mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-04-22 08:38:29 +02:00
Fix HTTP/2 requests failing with duplicate Content-Length (#391)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8167,6 +8167,7 @@ dependencies = [
|
|||||||
"cookie",
|
"cookie",
|
||||||
"flate2",
|
"flate2",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
"http-body",
|
||||||
"hyper-util",
|
"hyper-util",
|
||||||
"log",
|
"log",
|
||||||
"mime_guess",
|
"mime_guess",
|
||||||
|
|||||||
@@ -414,7 +414,7 @@ async fn execute_transaction<R: Runtime>(
|
|||||||
sendable_request.body = Some(SendableBody::Bytes(bytes));
|
sendable_request.body = Some(SendableBody::Bytes(bytes));
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
Some(SendableBody::Stream(stream)) => {
|
Some(SendableBody::Stream { data: stream, content_length }) => {
|
||||||
// Wrap stream with TeeReader to capture data as it's read
|
// Wrap stream with TeeReader to capture data as it's read
|
||||||
// Use unbounded channel to ensure all data is captured without blocking the HTTP request
|
// Use unbounded channel to ensure all data is captured without blocking the HTTP request
|
||||||
let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
|
let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
|
||||||
@@ -448,7 +448,7 @@ async fn execute_transaction<R: Runtime>(
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
sendable_request.body = Some(SendableBody::Stream(pinned));
|
sendable_request.body = Some(SendableBody::Stream { data: pinned, content_length });
|
||||||
handle
|
handle
|
||||||
}
|
}
|
||||||
None => {
|
None => {
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ bytes = "1.11.1"
|
|||||||
cookie = "0.18.1"
|
cookie = "0.18.1"
|
||||||
flate2 = "1"
|
flate2 = "1"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
|
http-body = "1"
|
||||||
url = "2"
|
url = "2"
|
||||||
zstd = "0.13"
|
zstd = "0.13"
|
||||||
hyper-util = { version = "0.1.17", default-features = false, features = ["client-legacy"] }
|
hyper-util = { version = "0.1.17", default-features = false, features = ["client-legacy"] }
|
||||||
|
|||||||
@@ -2,7 +2,9 @@ use crate::decompress::{ContentEncoding, streaming_decoder};
|
|||||||
use crate::error::{Error, Result};
|
use crate::error::{Error, Result};
|
||||||
use crate::types::{SendableBody, SendableHttpRequest};
|
use crate::types::{SendableBody, SendableHttpRequest};
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use bytes::Bytes;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
use http_body::{Body as HttpBody, Frame, SizeHint};
|
||||||
use reqwest::{Client, Method, Version};
|
use reqwest::{Client, Method, Version};
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
@@ -413,10 +415,16 @@ impl HttpSender for ReqwestSender {
|
|||||||
Some(SendableBody::Bytes(bytes)) => {
|
Some(SendableBody::Bytes(bytes)) => {
|
||||||
req_builder = req_builder.body(bytes);
|
req_builder = req_builder.body(bytes);
|
||||||
}
|
}
|
||||||
Some(SendableBody::Stream(stream)) => {
|
Some(SendableBody::Stream { data, content_length }) => {
|
||||||
// Convert AsyncRead stream to reqwest Body
|
// Convert AsyncRead stream to reqwest Body. If content length is
|
||||||
let stream = tokio_util::io::ReaderStream::new(stream);
|
// known, wrap with a SizedBody so hyper can set Content-Length
|
||||||
let body = reqwest::Body::wrap_stream(stream);
|
// automatically (for both HTTP/1.1 and HTTP/2).
|
||||||
|
let stream = tokio_util::io::ReaderStream::new(data);
|
||||||
|
let body = if let Some(len) = content_length {
|
||||||
|
reqwest::Body::wrap(SizedBody::new(stream, len))
|
||||||
|
} else {
|
||||||
|
reqwest::Body::wrap_stream(stream)
|
||||||
|
};
|
||||||
req_builder = req_builder.body(body);
|
req_builder = req_builder.body(body);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -520,6 +528,51 @@ impl HttpSender for ReqwestSender {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A wrapper around a byte stream that reports a known content length via
|
||||||
|
/// `size_hint()`. This lets hyper set the `Content-Length` header
|
||||||
|
/// automatically based on the body size, without us having to add it as an
|
||||||
|
/// explicit header — which can cause duplicate `Content-Length` headers and
|
||||||
|
/// break HTTP/2.
|
||||||
|
struct SizedBody<S> {
|
||||||
|
stream: std::sync::Mutex<S>,
|
||||||
|
remaining: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> SizedBody<S> {
|
||||||
|
fn new(stream: S, content_length: u64) -> Self {
|
||||||
|
Self { stream: std::sync::Mutex::new(stream), remaining: content_length }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> HttpBody for SizedBody<S>
|
||||||
|
where
|
||||||
|
S: futures_util::Stream<Item = std::result::Result<Bytes, std::io::Error>> + Send + Unpin + 'static,
|
||||||
|
{
|
||||||
|
type Data = Bytes;
|
||||||
|
type Error = std::io::Error;
|
||||||
|
|
||||||
|
fn poll_frame(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
|
||||||
|
let this = self.get_mut();
|
||||||
|
let mut stream = this.stream.lock().unwrap();
|
||||||
|
match stream.poll_next_unpin(cx) {
|
||||||
|
Poll::Ready(Some(Ok(chunk))) => {
|
||||||
|
this.remaining = this.remaining.saturating_sub(chunk.len() as u64);
|
||||||
|
Poll::Ready(Some(Ok(Frame::data(chunk))))
|
||||||
|
}
|
||||||
|
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
|
||||||
|
Poll::Ready(None) => Poll::Ready(None),
|
||||||
|
Poll::Pending => Poll::Pending,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn size_hint(&self) -> SizeHint {
|
||||||
|
SizeHint::with_exact(self.remaining)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn version_to_str(version: &Version) -> String {
|
fn version_to_str(version: &Version) -> String {
|
||||||
match *version {
|
match *version {
|
||||||
Version::HTTP_09 => "HTTP/0.9".to_string(),
|
Version::HTTP_09 => "HTTP/0.9".to_string(),
|
||||||
|
|||||||
@@ -16,7 +16,13 @@ pub(crate) const MULTIPART_BOUNDARY: &str = "------YaakFormBoundary";
|
|||||||
|
|
||||||
pub enum SendableBody {
|
pub enum SendableBody {
|
||||||
Bytes(Bytes),
|
Bytes(Bytes),
|
||||||
Stream(Pin<Box<dyn AsyncRead + Send + 'static>>),
|
Stream {
|
||||||
|
data: Pin<Box<dyn AsyncRead + Send + 'static>>,
|
||||||
|
/// Known content length for the stream, if available. This is used by
|
||||||
|
/// the sender to set the body size hint so that hyper can set
|
||||||
|
/// Content-Length automatically for both HTTP/1.1 and HTTP/2.
|
||||||
|
content_length: Option<u64>,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
enum SendableBodyWithMeta {
|
enum SendableBodyWithMeta {
|
||||||
@@ -31,7 +37,10 @@ impl From<SendableBodyWithMeta> for SendableBody {
|
|||||||
fn from(value: SendableBodyWithMeta) -> Self {
|
fn from(value: SendableBodyWithMeta) -> Self {
|
||||||
match value {
|
match value {
|
||||||
SendableBodyWithMeta::Bytes(b) => SendableBody::Bytes(b),
|
SendableBodyWithMeta::Bytes(b) => SendableBody::Bytes(b),
|
||||||
SendableBodyWithMeta::Stream { data, .. } => SendableBody::Stream(data),
|
SendableBodyWithMeta::Stream { data, content_length } => SendableBody::Stream {
|
||||||
|
data,
|
||||||
|
content_length: content_length.map(|l| l as u64),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -186,23 +195,11 @@ async fn build_body(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if Transfer-Encoding: chunked is already set
|
// NOTE: Content-Length is NOT set as an explicit header here. Instead, the
|
||||||
let has_chunked_encoding = headers.iter().any(|h| {
|
// body's content length is carried via SendableBody::Stream { content_length }
|
||||||
h.0.to_lowercase() == "transfer-encoding" && h.1.to_lowercase().contains("chunked")
|
// and used by the sender to set the body size hint. This lets hyper handle
|
||||||
});
|
// Content-Length automatically for both HTTP/1.1 and HTTP/2, avoiding the
|
||||||
|
// duplicate Content-Length that breaks HTTP/2 servers.
|
||||||
// Add a Content-Length header only if chunked encoding is not being used
|
|
||||||
if !has_chunked_encoding {
|
|
||||||
let content_length = match body {
|
|
||||||
Some(SendableBodyWithMeta::Bytes(ref bytes)) => Some(bytes.len()),
|
|
||||||
Some(SendableBodyWithMeta::Stream { content_length, .. }) => content_length,
|
|
||||||
None => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Some(cl) = content_length {
|
|
||||||
headers.push(("Content-Length".to_string(), cl.to_string()));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok((body.map(|b| b.into()), headers))
|
Ok((body.map(|b| b.into()), headers))
|
||||||
}
|
}
|
||||||
@@ -928,7 +925,27 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_no_content_length_with_chunked_encoding() -> Result<()> {
|
async fn test_no_content_length_header_added_by_build_body() -> Result<()> {
|
||||||
|
let mut body = BTreeMap::new();
|
||||||
|
body.insert("text".to_string(), json!("Hello, World!"));
|
||||||
|
|
||||||
|
let headers = vec![];
|
||||||
|
|
||||||
|
let (_, result_headers) =
|
||||||
|
build_body("POST", &Some("text/plain".to_string()), &body, headers).await?;
|
||||||
|
|
||||||
|
// Content-Length should NOT be set as an explicit header. Instead, the
|
||||||
|
// sender uses the body's size_hint to let hyper set it automatically,
|
||||||
|
// which works correctly for both HTTP/1.1 and HTTP/2.
|
||||||
|
let has_content_length =
|
||||||
|
result_headers.iter().any(|h| h.0.to_lowercase() == "content-length");
|
||||||
|
assert!(!has_content_length, "Content-Length should not be set as an explicit header");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_chunked_encoding_header_preserved() -> Result<()> {
|
||||||
let mut body = BTreeMap::new();
|
let mut body = BTreeMap::new();
|
||||||
body.insert("text".to_string(), json!("Hello, World!"));
|
body.insert("text".to_string(), json!("Hello, World!"));
|
||||||
|
|
||||||
@@ -938,11 +955,6 @@ mod tests {
|
|||||||
let (_, result_headers) =
|
let (_, result_headers) =
|
||||||
build_body("POST", &Some("text/plain".to_string()), &body, headers).await?;
|
build_body("POST", &Some("text/plain".to_string()), &body, headers).await?;
|
||||||
|
|
||||||
// Verify that Content-Length is NOT present when Transfer-Encoding: chunked is set
|
|
||||||
let has_content_length =
|
|
||||||
result_headers.iter().any(|h| h.0.to_lowercase() == "content-length");
|
|
||||||
assert!(!has_content_length, "Content-Length should not be present with chunked encoding");
|
|
||||||
|
|
||||||
// Verify that the Transfer-Encoding header is still present
|
// Verify that the Transfer-Encoding header is still present
|
||||||
let has_chunked = result_headers.iter().any(|h| {
|
let has_chunked = result_headers.iter().any(|h| {
|
||||||
h.0.to_lowercase() == "transfer-encoding" && h.1.to_lowercase().contains("chunked")
|
h.0.to_lowercase() == "transfer-encoding" && h.1.to_lowercase().contains("chunked")
|
||||||
@@ -951,31 +963,4 @@ mod tests {
|
|||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_content_length_without_chunked_encoding() -> Result<()> {
|
|
||||||
let mut body = BTreeMap::new();
|
|
||||||
body.insert("text".to_string(), json!("Hello, World!"));
|
|
||||||
|
|
||||||
// Headers without Transfer-Encoding: chunked
|
|
||||||
let headers = vec![];
|
|
||||||
|
|
||||||
let (_, result_headers) =
|
|
||||||
build_body("POST", &Some("text/plain".to_string()), &body, headers).await?;
|
|
||||||
|
|
||||||
// Verify that Content-Length IS present when Transfer-Encoding: chunked is NOT set
|
|
||||||
let content_length_header =
|
|
||||||
result_headers.iter().find(|h| h.0.to_lowercase() == "content-length");
|
|
||||||
assert!(
|
|
||||||
content_length_header.is_some(),
|
|
||||||
"Content-Length should be present without chunked encoding"
|
|
||||||
);
|
|
||||||
assert_eq!(
|
|
||||||
content_length_header.unwrap().1,
|
|
||||||
"13",
|
|
||||||
"Content-Length should match the body size"
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user