mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-02-25 12:04:56 +01:00
Fix HTTP/2 requests failing with duplicate Content-Length
When Yaak moved compression, redirect, and multipart handling out of reqwest, it started explicitly adding Content-Length as an HTTP header. Over HTTP/2, hyper also sets content-length internally via DATA frame sizing, causing a duplicate that HTTP/2 servers (like Node.js) reject with PROTOCOL_ERROR. Instead of setting Content-Length as an explicit header, carry the content length through SendableBody::Stream and use http_body::Body's size_hint() to let hyper handle it automatically for both HTTP/1.1 and HTTP/2. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8167,6 +8167,7 @@ dependencies = [
|
||||
"cookie",
|
||||
"flate2",
|
||||
"futures-util",
|
||||
"http-body",
|
||||
"hyper-util",
|
||||
"log",
|
||||
"mime_guess",
|
||||
|
||||
@@ -414,7 +414,7 @@ async fn execute_transaction<R: Runtime>(
|
||||
sendable_request.body = Some(SendableBody::Bytes(bytes));
|
||||
None
|
||||
}
|
||||
Some(SendableBody::Stream(stream)) => {
|
||||
Some(SendableBody::Stream { data: stream, content_length }) => {
|
||||
// Wrap stream with TeeReader to capture data as it's read
|
||||
// Use unbounded channel to ensure all data is captured without blocking the HTTP request
|
||||
let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
|
||||
@@ -448,7 +448,7 @@ async fn execute_transaction<R: Runtime>(
|
||||
None
|
||||
};
|
||||
|
||||
sendable_request.body = Some(SendableBody::Stream(pinned));
|
||||
sendable_request.body = Some(SendableBody::Stream { data: pinned, content_length });
|
||||
handle
|
||||
}
|
||||
None => {
|
||||
|
||||
@@ -12,6 +12,7 @@ bytes = "1.11.1"
|
||||
cookie = "0.18.1"
|
||||
flate2 = "1"
|
||||
futures-util = "0.3"
|
||||
http-body = "1"
|
||||
url = "2"
|
||||
zstd = "0.13"
|
||||
hyper-util = { version = "0.1.17", default-features = false, features = ["client-legacy"] }
|
||||
|
||||
@@ -2,7 +2,9 @@ use crate::decompress::{ContentEncoding, streaming_decoder};
|
||||
use crate::error::{Error, Result};
|
||||
use crate::types::{SendableBody, SendableHttpRequest};
|
||||
use async_trait::async_trait;
|
||||
use bytes::Bytes;
|
||||
use futures_util::StreamExt;
|
||||
use http_body::{Body as HttpBody, Frame, SizeHint};
|
||||
use reqwest::{Client, Method, Version};
|
||||
use std::fmt::Display;
|
||||
use std::pin::Pin;
|
||||
@@ -413,10 +415,16 @@ impl HttpSender for ReqwestSender {
|
||||
Some(SendableBody::Bytes(bytes)) => {
|
||||
req_builder = req_builder.body(bytes);
|
||||
}
|
||||
Some(SendableBody::Stream(stream)) => {
|
||||
// Convert AsyncRead stream to reqwest Body
|
||||
let stream = tokio_util::io::ReaderStream::new(stream);
|
||||
let body = reqwest::Body::wrap_stream(stream);
|
||||
Some(SendableBody::Stream { data, content_length }) => {
|
||||
// Convert AsyncRead stream to reqwest Body. If content length is
|
||||
// known, wrap with a SizedBody so hyper can set Content-Length
|
||||
// automatically (for both HTTP/1.1 and HTTP/2).
|
||||
let stream = tokio_util::io::ReaderStream::new(data);
|
||||
let body = if let Some(len) = content_length {
|
||||
reqwest::Body::wrap(SizedBody::new(stream, len))
|
||||
} else {
|
||||
reqwest::Body::wrap_stream(stream)
|
||||
};
|
||||
req_builder = req_builder.body(body);
|
||||
}
|
||||
}
|
||||
@@ -520,6 +528,55 @@ impl HttpSender for ReqwestSender {
|
||||
}
|
||||
}
|
||||
|
||||
/// A wrapper around a byte stream that reports a known content length via
|
||||
/// `size_hint()`. This lets hyper set `Content-Length` automatically (as a
|
||||
/// pseudo-header in HTTP/2, or a regular header in HTTP/1.1) without us
|
||||
/// having to add it as an explicit header — which causes duplicates and
|
||||
/// breaks HTTP/2.
|
||||
struct SizedBody<S> {
|
||||
stream: S,
|
||||
remaining: u64,
|
||||
}
|
||||
|
||||
// SAFETY: SizedBody is only ever accessed via Pin<&mut Self> through the
|
||||
// HttpBody::poll_frame method. It is never actually shared across threads.
|
||||
// This is needed because reqwest::Body::wrap requires Sync, but our inner
|
||||
// stream (ReaderStream<Pin<Box<dyn AsyncRead + Send>>>) is not Sync.
|
||||
unsafe impl<S: Send> Sync for SizedBody<S> {}
|
||||
|
||||
impl<S> SizedBody<S> {
|
||||
fn new(stream: S, content_length: u64) -> Self {
|
||||
Self { stream, remaining: content_length }
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> HttpBody for SizedBody<S>
|
||||
where
|
||||
S: futures_util::Stream<Item = std::result::Result<Bytes, std::io::Error>> + Send + Unpin + 'static,
|
||||
{
|
||||
type Data = Bytes;
|
||||
type Error = std::io::Error;
|
||||
|
||||
fn poll_frame(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
) -> Poll<Option<std::result::Result<Frame<Self::Data>, Self::Error>>> {
|
||||
match Pin::new(&mut self.stream).poll_next(cx) {
|
||||
Poll::Ready(Some(Ok(chunk))) => {
|
||||
self.remaining = self.remaining.saturating_sub(chunk.len() as u64);
|
||||
Poll::Ready(Some(Ok(Frame::data(chunk))))
|
||||
}
|
||||
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
|
||||
Poll::Ready(None) => Poll::Ready(None),
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
fn size_hint(&self) -> SizeHint {
|
||||
SizeHint::with_exact(self.remaining)
|
||||
}
|
||||
}
|
||||
|
||||
fn version_to_str(version: &Version) -> String {
|
||||
match *version {
|
||||
Version::HTTP_09 => "HTTP/0.9".to_string(),
|
||||
|
||||
@@ -16,7 +16,13 @@ pub(crate) const MULTIPART_BOUNDARY: &str = "------YaakFormBoundary";
|
||||
|
||||
pub enum SendableBody {
|
||||
Bytes(Bytes),
|
||||
Stream(Pin<Box<dyn AsyncRead + Send + 'static>>),
|
||||
Stream {
|
||||
data: Pin<Box<dyn AsyncRead + Send + 'static>>,
|
||||
/// Known content length for the stream, if available. This is used by
|
||||
/// the sender to set the body size hint so that hyper can set
|
||||
/// Content-Length automatically for both HTTP/1.1 and HTTP/2.
|
||||
content_length: Option<u64>,
|
||||
},
|
||||
}
|
||||
|
||||
enum SendableBodyWithMeta {
|
||||
@@ -31,7 +37,10 @@ impl From<SendableBodyWithMeta> for SendableBody {
|
||||
fn from(value: SendableBodyWithMeta) -> Self {
|
||||
match value {
|
||||
SendableBodyWithMeta::Bytes(b) => SendableBody::Bytes(b),
|
||||
SendableBodyWithMeta::Stream { data, .. } => SendableBody::Stream(data),
|
||||
SendableBodyWithMeta::Stream { data, content_length } => SendableBody::Stream {
|
||||
data,
|
||||
content_length: content_length.map(|l| l as u64),
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -186,23 +195,11 @@ async fn build_body(
|
||||
}
|
||||
}
|
||||
|
||||
// Check if Transfer-Encoding: chunked is already set
|
||||
let has_chunked_encoding = headers.iter().any(|h| {
|
||||
h.0.to_lowercase() == "transfer-encoding" && h.1.to_lowercase().contains("chunked")
|
||||
});
|
||||
|
||||
// Add a Content-Length header only if chunked encoding is not being used
|
||||
if !has_chunked_encoding {
|
||||
let content_length = match body {
|
||||
Some(SendableBodyWithMeta::Bytes(ref bytes)) => Some(bytes.len()),
|
||||
Some(SendableBodyWithMeta::Stream { content_length, .. }) => content_length,
|
||||
None => None,
|
||||
};
|
||||
|
||||
if let Some(cl) = content_length {
|
||||
headers.push(("Content-Length".to_string(), cl.to_string()));
|
||||
}
|
||||
}
|
||||
// NOTE: Content-Length is NOT set as an explicit header here. Instead, the
|
||||
// body's content length is carried via SendableBody::Stream { content_length }
|
||||
// and used by the sender to set the body size hint. This lets hyper handle
|
||||
// Content-Length automatically for both HTTP/1.1 and HTTP/2, avoiding the
|
||||
// duplicate Content-Length that breaks HTTP/2 servers.
|
||||
|
||||
Ok((body.map(|b| b.into()), headers))
|
||||
}
|
||||
@@ -928,7 +925,27 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_no_content_length_with_chunked_encoding() -> Result<()> {
|
||||
async fn test_no_content_length_header_added_by_build_body() -> Result<()> {
|
||||
let mut body = BTreeMap::new();
|
||||
body.insert("text".to_string(), json!("Hello, World!"));
|
||||
|
||||
let headers = vec![];
|
||||
|
||||
let (_, result_headers) =
|
||||
build_body("POST", &Some("text/plain".to_string()), &body, headers).await?;
|
||||
|
||||
// Content-Length should NOT be set as an explicit header. Instead, the
|
||||
// sender uses the body's size_hint to let hyper set it automatically,
|
||||
// which works correctly for both HTTP/1.1 and HTTP/2.
|
||||
let has_content_length =
|
||||
result_headers.iter().any(|h| h.0.to_lowercase() == "content-length");
|
||||
assert!(!has_content_length, "Content-Length should not be set as an explicit header");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_chunked_encoding_header_preserved() -> Result<()> {
|
||||
let mut body = BTreeMap::new();
|
||||
body.insert("text".to_string(), json!("Hello, World!"));
|
||||
|
||||
@@ -938,11 +955,6 @@ mod tests {
|
||||
let (_, result_headers) =
|
||||
build_body("POST", &Some("text/plain".to_string()), &body, headers).await?;
|
||||
|
||||
// Verify that Content-Length is NOT present when Transfer-Encoding: chunked is set
|
||||
let has_content_length =
|
||||
result_headers.iter().any(|h| h.0.to_lowercase() == "content-length");
|
||||
assert!(!has_content_length, "Content-Length should not be present with chunked encoding");
|
||||
|
||||
// Verify that the Transfer-Encoding header is still present
|
||||
let has_chunked = result_headers.iter().any(|h| {
|
||||
h.0.to_lowercase() == "transfer-encoding" && h.1.to_lowercase().contains("chunked")
|
||||
@@ -951,31 +963,4 @@ mod tests {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_content_length_without_chunked_encoding() -> Result<()> {
|
||||
let mut body = BTreeMap::new();
|
||||
body.insert("text".to_string(), json!("Hello, World!"));
|
||||
|
||||
// Headers without Transfer-Encoding: chunked
|
||||
let headers = vec![];
|
||||
|
||||
let (_, result_headers) =
|
||||
build_body("POST", &Some("text/plain".to_string()), &body, headers).await?;
|
||||
|
||||
// Verify that Content-Length IS present when Transfer-Encoding: chunked is NOT set
|
||||
let content_length_header =
|
||||
result_headers.iter().find(|h| h.0.to_lowercase() == "content-length");
|
||||
assert!(
|
||||
content_length_header.is_some(),
|
||||
"Content-Length should be present without chunked encoding"
|
||||
);
|
||||
assert_eq!(
|
||||
content_length_header.unwrap().1,
|
||||
"13",
|
||||
"Content-Length should match the body size"
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
114
crates/yaak-http/tests/h2_repro.rs
Normal file
114
crates/yaak-http/tests/h2_repro.rs
Normal file
@@ -0,0 +1,114 @@
|
||||
//! Integration test to reproduce the HTTP/2 "HTTP/1 specific headers are forbidden" bug.
//!
//! Requires a local HTTP/2-only server listening on <https://localhost:8443>.
//! Start one with `node h2server.mjs` (see tmp dir for the script).
//!
//! Run with: `cargo test -p yaak-http --test h2_repro -- --nocapture`
|
||||
use reqwest::redirect;
|
||||
use tokio::sync::mpsc;
|
||||
use yaak_http::sender::{HttpSender, ReqwestSender};
|
||||
use yaak_http::types::SendableHttpRequest;
|
||||
use yaak_tls::get_tls_config;
|
||||
|
||||
fn build_yaak_client() -> reqwest::Client {
|
||||
let tls_config = get_tls_config(false, true, None).unwrap();
|
||||
reqwest::Client::builder()
|
||||
.redirect(redirect::Policy::none())
|
||||
.no_gzip()
|
||||
.no_brotli()
|
||||
.no_deflate()
|
||||
.referer(false)
|
||||
.tls_info(true)
|
||||
.pool_max_idle_per_host(0)
|
||||
.use_preconfigured_tls(tls_config)
|
||||
.build()
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
async fn send_and_print(name: &str, request: SendableHttpRequest) {
|
||||
println!("\n=== {} ===", name);
|
||||
let sender = ReqwestSender::with_client(build_yaak_client());
|
||||
let (event_tx, mut event_rx) = mpsc::channel(100);
|
||||
|
||||
let test_name = name.to_string();
|
||||
let handle = tokio::spawn(async move {
|
||||
while let Some(event) = event_rx.recv().await {
|
||||
println!(" [{}] {}", test_name, event);
|
||||
}
|
||||
});
|
||||
|
||||
match sender.send(request, event_tx).await {
|
||||
Ok(response) => {
|
||||
println!(" [{}] OK: HTTP {} {:?}", name, response.status, response.version);
|
||||
}
|
||||
Err(e) => {
|
||||
println!(" [{}] FAILED: {}", name, e);
|
||||
}
|
||||
}
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_h2_post_with_body() {
|
||||
// Narrow down: POST with body but no extra headers
|
||||
send_and_print("post-body-no-headers", SendableHttpRequest {
|
||||
url: "https://localhost:8443/".to_string(),
|
||||
method: "POST".to_string(),
|
||||
headers: vec![],
|
||||
body: Some(yaak_http::types::SendableBody::Bytes(
|
||||
bytes::Bytes::from(r#"{"hello":"world"}"#),
|
||||
)),
|
||||
options: Default::default(),
|
||||
}).await;
|
||||
|
||||
// POST with body + content-type only
|
||||
send_and_print("post-body-content-type", SendableHttpRequest {
|
||||
url: "https://localhost:8443/".to_string(),
|
||||
method: "POST".to_string(),
|
||||
headers: vec![
|
||||
("Content-Type".to_string(), "application/json".to_string()),
|
||||
],
|
||||
body: Some(yaak_http::types::SendableBody::Bytes(
|
||||
bytes::Bytes::from(r#"{"hello":"world"}"#),
|
||||
)),
|
||||
options: Default::default(),
|
||||
}).await;
|
||||
|
||||
// POST with body + content-length only
|
||||
send_and_print("post-body-content-length", SendableHttpRequest {
|
||||
url: "https://localhost:8443/".to_string(),
|
||||
method: "POST".to_string(),
|
||||
headers: vec![
|
||||
("Content-Length".to_string(), "18".to_string()),
|
||||
],
|
||||
body: Some(yaak_http::types::SendableBody::Bytes(
|
||||
bytes::Bytes::from(r#"{"hello":"world"}"#),
|
||||
)),
|
||||
options: Default::default(),
|
||||
}).await;
|
||||
|
||||
// POST with body + all typical Yaak headers
|
||||
send_and_print("post-body-all-yaak-headers", SendableHttpRequest {
|
||||
url: "https://localhost:8443/".to_string(),
|
||||
method: "POST".to_string(),
|
||||
headers: vec![
|
||||
("Content-Type".to_string(), "application/json".to_string()),
|
||||
("Content-Length".to_string(), "18".to_string()),
|
||||
("User-Agent".to_string(), "yaak".to_string()),
|
||||
("Accept".to_string(), "*/*".to_string()),
|
||||
],
|
||||
body: Some(yaak_http::types::SendableBody::Bytes(
|
||||
bytes::Bytes::from(r#"{"hello":"world"}"#),
|
||||
)),
|
||||
options: Default::default(),
|
||||
}).await;
|
||||
|
||||
// GET with no body (control — should work)
|
||||
send_and_print("get-control", SendableHttpRequest {
|
||||
url: "https://localhost:8443/".to_string(),
|
||||
method: "GET".to_string(),
|
||||
headers: vec![],
|
||||
body: None,
|
||||
options: Default::default(),
|
||||
}).await;
|
||||
}
|
||||
Reference in New Issue
Block a user