Http response events (#326)

This commit is contained in:
Gregory Schier
2025-12-21 14:34:37 -08:00
committed by GitHub
parent 7e0aa919fb
commit 089c7e8dce
18 changed files with 779 additions and 74 deletions

View File

@@ -7,20 +7,45 @@ use reqwest::{Client, Method, Version};
use std::collections::HashMap;
use std::fmt::Display;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
use tokio::io::{AsyncRead, AsyncReadExt, BufReader, ReadBuf};
use tokio::sync::mpsc;
use tokio_util::io::StreamReader;
#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum RedirectBehavior {
/// 307/308: Method and body are preserved
Preserve,
/// 303 or 301/302 with POST: Method changed to GET, body dropped
DropBody,
}
#[derive(Debug, Clone)]
pub enum HttpResponseEvent {
Setting(String, String),
Info(String),
SendUrl { method: String, path: String },
ReceiveUrl { version: Version, status: String },
Redirect {
url: String,
status: u16,
behavior: RedirectBehavior,
},
SendUrl {
method: String,
path: String,
},
ReceiveUrl {
version: Version,
status: String,
},
HeaderUp(String, String),
HeaderDown(String, String),
HeaderUpDone,
HeaderDownDone,
ChunkSent {
bytes: usize,
},
ChunkReceived {
bytes: usize,
},
}
impl Display for HttpResponseEvent {
@@ -28,14 +53,47 @@ impl Display for HttpResponseEvent {
match self {
HttpResponseEvent::Setting(name, value) => write!(f, "* Setting {}={}", name, value),
HttpResponseEvent::Info(s) => write!(f, "* {}", s),
HttpResponseEvent::Redirect { url, status, behavior } => {
let behavior_str = match behavior {
RedirectBehavior::Preserve => "preserve",
RedirectBehavior::DropBody => "drop body",
};
write!(f, "* Redirect {} -> {} ({})", status, url, behavior_str)
}
HttpResponseEvent::SendUrl { method, path } => write!(f, "> {} {}", method, path),
HttpResponseEvent::ReceiveUrl { version, status } => {
write!(f, "< {} {}", version_to_str(version), status)
}
HttpResponseEvent::HeaderUp(name, value) => write!(f, "> {}: {}", name, value),
HttpResponseEvent::HeaderUpDone => write!(f, ">"),
HttpResponseEvent::HeaderDown(name, value) => write!(f, "< {}: {}", name, value),
HttpResponseEvent::HeaderDownDone => write!(f, "<"),
HttpResponseEvent::ChunkSent { bytes } => write!(f, "> [{} bytes sent]", bytes),
HttpResponseEvent::ChunkReceived { bytes } => write!(f, "< [{} bytes received]", bytes),
}
}
}
impl From<HttpResponseEvent> for yaak_models::models::HttpResponseEventData {
fn from(event: HttpResponseEvent) -> Self {
use yaak_models::models::HttpResponseEventData as D;
match event {
HttpResponseEvent::Setting(name, value) => D::Setting { name, value },
HttpResponseEvent::Info(message) => D::Info { message },
HttpResponseEvent::Redirect { url, status, behavior } => D::Redirect {
url,
status,
behavior: match behavior {
RedirectBehavior::Preserve => "preserve".to_string(),
RedirectBehavior::DropBody => "drop_body".to_string(),
},
},
HttpResponseEvent::SendUrl { method, path } => D::SendUrl { method, path },
HttpResponseEvent::ReceiveUrl { version, status } => {
D::ReceiveUrl { version: format!("{:?}", version), status }
}
HttpResponseEvent::HeaderUp(name, value) => D::HeaderUp { name, value },
HttpResponseEvent::HeaderDown(name, value) => D::HeaderDown { name, value },
HttpResponseEvent::ChunkSent { bytes } => D::ChunkSent { bytes },
HttpResponseEvent::ChunkReceived { bytes } => D::ChunkReceived { bytes },
}
}
}
@@ -49,6 +107,40 @@ pub struct BodyStats {
pub size_decompressed: u64,
}
/// An AsyncRead wrapper that sends chunk events as data is read
pub struct TrackingRead<R> {
inner: R,
event_tx: mpsc::UnboundedSender<HttpResponseEvent>,
ended: bool,
}
impl<R> TrackingRead<R> {
pub fn new(inner: R, event_tx: mpsc::UnboundedSender<HttpResponseEvent>) -> Self {
Self { inner, event_tx, ended: false }
}
}
impl<R: AsyncRead + Unpin> AsyncRead for TrackingRead<R> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
let before = buf.filled().len();
let result = Pin::new(&mut self.inner).poll_read(cx, buf);
if let Poll::Ready(Ok(())) = &result {
let bytes_read = buf.filled().len() - before;
if bytes_read > 0 {
// Ignore send errors - receiver may have been dropped
let _ = self.event_tx.send(HttpResponseEvent::ChunkReceived { bytes: bytes_read });
} else if !self.ended {
self.ended = true;
}
}
result
}
}
/// Type alias for the body stream
type BodyStream = Pin<Box<dyn AsyncRead + Send>>;
@@ -215,10 +307,11 @@ impl HttpResponse {
pub trait HttpSender: Send + Sync {
/// Send an HTTP request and return the response with headers.
/// The body is not consumed until you call bytes(), text(), write_to_file(), or drain().
/// Events are sent through the provided channel.
async fn send(
&self,
request: SendableHttpRequest,
events: &mut Vec<HttpResponseEvent>,
event_tx: mpsc::UnboundedSender<HttpResponseEvent>,
) -> Result<HttpResponse>;
}
@@ -245,8 +338,13 @@ impl HttpSender for ReqwestSender {
async fn send(
&self,
request: SendableHttpRequest,
events: &mut Vec<HttpResponseEvent>,
event_tx: mpsc::UnboundedSender<HttpResponseEvent>,
) -> Result<HttpResponse> {
// Helper to send events (ignores errors if receiver is dropped)
let send_event = |event: HttpResponseEvent| {
let _ = event_tx.send(event);
};
// Parse the HTTP method
let method = Method::from_bytes(request.method.as_bytes())
.map_err(|e| Error::RequestError(format!("Invalid HTTP method: {}", e)))?;
@@ -282,7 +380,7 @@ impl HttpSender for ReqwestSender {
// Send the request
let sendable_req = req_builder.build()?;
events.push(HttpResponseEvent::Setting(
send_event(HttpResponseEvent::Setting(
"timeout".to_string(),
if request.options.timeout.unwrap_or_default().is_zero() {
"Infinity".to_string()
@@ -291,7 +389,7 @@ impl HttpSender for ReqwestSender {
},
));
events.push(HttpResponseEvent::SendUrl {
send_event(HttpResponseEvent::SendUrl {
path: sendable_req.url().path().to_string(),
method: sendable_req.method().to_string(),
});
@@ -300,10 +398,9 @@ impl HttpSender for ReqwestSender {
for (name, value) in sendable_req.headers() {
let v = value.to_str().unwrap_or_default().to_string();
request_headers.insert(name.to_string(), v.clone());
events.push(HttpResponseEvent::HeaderUp(name.to_string(), v));
send_event(HttpResponseEvent::HeaderUp(name.to_string(), v));
}
events.push(HttpResponseEvent::HeaderUpDone);
events.push(HttpResponseEvent::Info("Sending request to server".to_string()));
send_event(HttpResponseEvent::Info("Sending request to server".to_string()));
// Map some errors to our own, so they look nicer
let response = self.client.execute(sendable_req).await.map_err(|e| {
@@ -323,7 +420,7 @@ impl HttpSender for ReqwestSender {
let version = Some(version_to_str(&response.version()));
let content_length = response.content_length();
events.push(HttpResponseEvent::ReceiveUrl {
send_event(HttpResponseEvent::ReceiveUrl {
version: response.version(),
status: response.status().to_string(),
});
@@ -332,11 +429,10 @@ impl HttpSender for ReqwestSender {
let mut headers = HashMap::new();
for (key, value) in response.headers() {
if let Ok(v) = value.to_str() {
events.push(HttpResponseEvent::HeaderDown(key.to_string(), v.to_string()));
send_event(HttpResponseEvent::HeaderDown(key.to_string(), v.to_string()));
headers.insert(key.to_string(), v.to_string());
}
}
events.push(HttpResponseEvent::HeaderDownDone);
// Determine content encoding for decompression
// HTTP headers are case-insensitive, so we need to search for any casing
@@ -355,7 +451,9 @@ impl HttpSender for ReqwestSender {
byte_stream.map(|result| result.map_err(|e| std::io::Error::other(e))),
);
let body_stream: BodyStream = Box::pin(stream_reader);
// Wrap the stream with tracking to emit chunk received events via the same channel
let tracking_reader = TrackingRead::new(stream_reader, event_tx);
let body_stream: BodyStream = Box::pin(tracking_reader);
Ok(HttpResponse::new(
status,

View File

@@ -1,6 +1,7 @@
use crate::error::Result;
use crate::sender::{HttpResponse, HttpResponseEvent, HttpSender};
use crate::sender::{HttpResponse, HttpResponseEvent, HttpSender, RedirectBehavior};
use crate::types::SendableHttpRequest;
use tokio::sync::mpsc;
use tokio::sync::watch::Receiver;
/// HTTP Transaction that manages the lifecycle of a request, including redirect handling
@@ -22,17 +23,23 @@ impl<S: HttpSender> HttpTransaction<S> {
/// Execute the request with cancellation support.
/// Returns an HttpResponse with unconsumed body - caller decides how to consume it.
/// Events are sent through the provided channel.
pub async fn execute_with_cancellation(
&self,
request: SendableHttpRequest,
mut cancelled_rx: Receiver<bool>,
) -> Result<(HttpResponse, Vec<HttpResponseEvent>)> {
event_tx: mpsc::UnboundedSender<HttpResponseEvent>,
) -> Result<HttpResponse> {
let mut redirect_count = 0;
let mut current_url = request.url;
let mut current_method = request.method;
let mut current_headers = request.headers;
let mut current_body = request.body;
let mut events = Vec::new();
// Helper to send events (ignores errors if receiver is dropped)
let send_event = |event: HttpResponseEvent| {
let _ = event_tx.send(event);
};
loop {
// Check for cancellation before each request
@@ -50,14 +57,14 @@ impl<S: HttpSender> HttpTransaction<S> {
};
// Send the request
events.push(HttpResponseEvent::Setting(
send_event(HttpResponseEvent::Setting(
"redirects".to_string(),
request.options.follow_redirects.to_string(),
));
// Execute with cancellation support
let response = tokio::select! {
result = self.sender.send(req, &mut events) => result?,
result = self.sender.send(req, event_tx.clone()) => result?,
_ = cancelled_rx.changed() => {
return Err(crate::error::Error::RequestCanceledError);
}
@@ -65,12 +72,12 @@ impl<S: HttpSender> HttpTransaction<S> {
if !Self::is_redirect(response.status) {
// Not a redirect - return the response for caller to consume body
return Ok((response, events));
return Ok(response);
}
if !request.options.follow_redirects {
// Redirects disabled - return the redirect response as-is
return Ok((response, events));
return Ok(response);
}
// Check if we've exceeded max redirects
@@ -99,7 +106,7 @@ impl<S: HttpSender> HttpTransaction<S> {
// Also get status before draining
let status = response.status;
events.push(HttpResponseEvent::Info("Ignoring the response body".to_string()));
send_event(HttpResponseEvent::Info("Ignoring the response body".to_string()));
// Drain the redirect response body before following
response.drain().await?;
@@ -118,38 +125,36 @@ impl<S: HttpSender> HttpTransaction<S> {
format!("{}/{}", base_path, location)
};
events.push(HttpResponseEvent::Info(format!(
"Issuing redirect {} to: {}",
redirect_count + 1,
current_url
)));
// Determine redirect behavior based on status code and method
let behavior = if status == 303 {
// 303 See Other always changes to GET
RedirectBehavior::DropBody
} else if (status == 301 || status == 302) && current_method == "POST" {
// For 301/302, change POST to GET (common browser behavior)
RedirectBehavior::DropBody
} else {
// For 307 and 308, the method and body are preserved
// Also for 301/302 with non-POST methods
RedirectBehavior::Preserve
};
send_event(HttpResponseEvent::Redirect {
url: current_url.clone(),
status,
behavior: behavior.clone(),
});
// Handle method changes for certain redirect codes
if status == 303 {
// 303 See Other always changes to GET
if matches!(behavior, RedirectBehavior::DropBody) {
if current_method != "GET" {
current_method = "GET".to_string();
events.push(HttpResponseEvent::Info("Changing method to GET".to_string()));
}
// Remove content-related headers
current_headers.retain(|h| {
let name_lower = h.0.to_lowercase();
!name_lower.starts_with("content-") && name_lower != "transfer-encoding"
});
} else if status == 301 || status == 302 {
// For 301/302, change POST to GET (common browser behavior)
// but keep other methods as-is
if current_method == "POST" {
events.push(HttpResponseEvent::Info("Changing method to GET".to_string()));
current_method = "GET".to_string();
// Remove content-related headers
current_headers.retain(|h| {
let name_lower = h.0.to_lowercase();
!name_lower.starts_with("content-") && name_lower != "transfer-encoding"
});
}
}
// For 307 and 308, the method and body are preserved
// Reset body for next iteration (since it was moved in the send call)
// For redirects that change method to GET or for all redirects since body was consumed
@@ -231,7 +236,7 @@ mod tests {
async fn send(
&self,
_request: SendableHttpRequest,
_events: &mut Vec<HttpResponseEvent>,
_event_tx: mpsc::UnboundedSender<HttpResponseEvent>,
) -> Result<HttpResponse> {
let mut responses = self.responses.lock().await;
if responses.is_empty() {
@@ -271,7 +276,8 @@ mod tests {
};
let (_tx, rx) = tokio::sync::watch::channel(false);
let (result, _) = transaction.execute_with_cancellation(request, rx).await.unwrap();
let (event_tx, _event_rx) = mpsc::unbounded_channel();
let result = transaction.execute_with_cancellation(request, rx, event_tx).await.unwrap();
assert_eq!(result.status, 200);
// Consume the body to verify it
@@ -303,7 +309,8 @@ mod tests {
};
let (_tx, rx) = tokio::sync::watch::channel(false);
let (result, _) = transaction.execute_with_cancellation(request, rx).await.unwrap();
let (event_tx, _event_rx) = mpsc::unbounded_channel();
let result = transaction.execute_with_cancellation(request, rx, event_tx).await.unwrap();
assert_eq!(result.status, 200);
let (body, _) = result.bytes().await.unwrap();
@@ -334,7 +341,8 @@ mod tests {
};
let (_tx, rx) = tokio::sync::watch::channel(false);
let result = transaction.execute_with_cancellation(request, rx).await;
let (event_tx, _event_rx) = mpsc::unbounded_channel();
let result = transaction.execute_with_cancellation(request, rx, event_tx).await;
if let Err(crate::error::Error::RequestError(msg)) = result {
assert!(msg.contains("Maximum redirect limit"));
} else {