refactor send callers to shared plugin runtime helper

This commit is contained in:
Gregory Schier
2026-02-16 15:09:06 -08:00
parent 7cd47ae811
commit 603ae0650d
5 changed files with 650 additions and 647 deletions

5
Cargo.lock generated
View File

@@ -8243,13 +8243,18 @@ dependencies = [
name = "yaak"
version = "0.1.0"
dependencies = [
"async-trait",
"log 0.4.29",
"md5 0.8.0",
"serde_json",
"thiserror 2.0.17",
"tokio",
"yaak-crypto",
"yaak-http",
"yaak-models",
"yaak-plugins",
"yaak-templates",
"yaak-tls",
]
[[package]]

View File

@@ -6,12 +6,10 @@ use crate::commands::json::{
};
use crate::context::CliContext;
use tokio::sync::mpsc;
use yaak::send::{SendHttpRequestByIdParams, send_http_request_by_id};
use yaak_http::types::SendableHttpRequestOptions;
use yaak::send::{SendHttpRequestByIdWithPluginsParams, send_http_request_by_id_with_plugins};
use yaak_models::models::HttpRequest;
use yaak_models::util::UpdateSource;
use yaak_plugins::events::{PluginContext, RenderPurpose};
use yaak_plugins::template_callback::PluginTemplateCallback;
use yaak_plugins::events::PluginContext;
pub async fn run(
ctx: &CliContext,
@@ -174,12 +172,6 @@ pub async fn send_request_by_id(
ctx.db().get_http_request(request_id).map_err(|e| format!("Failed to get request: {e}"))?;
let plugin_context = PluginContext::new(None, Some(request.workspace_id.clone()));
let template_callback = PluginTemplateCallback::new(
ctx.plugin_manager(),
ctx.encryption_manager.clone(),
&plugin_context,
RenderPurpose::Send,
);
let (event_tx, mut event_rx) = mpsc::channel(100);
let event_handle = tokio::spawn(async move {
@@ -191,17 +183,22 @@ pub async fn send_request_by_id(
});
let response_dir = ctx.data_dir().join("responses");
let result = send_http_request_by_id(SendHttpRequestByIdParams {
let result = send_http_request_by_id_with_plugins(SendHttpRequestByIdWithPluginsParams {
query_manager: ctx.query_manager(),
blob_manager: ctx.blob_manager(),
request_id,
environment_id: environment,
template_callback: &template_callback,
send_options: SendableHttpRequestOptions::default(),
update_source: UpdateSource::Sync,
cookie_jar_id: None,
cookie_jar_update_source: UpdateSource::Sync,
response_dir: &response_dir,
persist_events: true,
emit_events_to: Some(event_tx),
plugin_manager: ctx.plugin_manager(),
encryption_manager: ctx.encryption_manager.clone(),
plugin_context: &plugin_context,
cancelled_rx: None,
connection_manager: None,
})
.await;

View File

@@ -3,45 +3,18 @@ use crate::error::Error::GenericError;
use crate::error::Result;
use crate::models_ext::BlobManagerExt;
use crate::models_ext::QueryManagerExt;
use crate::render::render_http_request;
use log::{debug, warn};
use std::pin::Pin;
use log::warn;
use std::sync::Arc;
use std::sync::atomic::{AtomicI32, Ordering};
use std::time::{Duration, Instant};
use std::time::Instant;
use tauri::{AppHandle, Manager, Runtime, WebviewWindow};
use tokio::fs::{File, create_dir_all};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch::Receiver;
use tokio_util::bytes::Bytes;
use yaak::send::{SendHttpRequestWithPluginsParams, send_http_request_with_plugins};
use yaak_crypto::manager::EncryptionManager;
use yaak_http::client::{
HttpConnectionOptions, HttpConnectionProxySetting, HttpConnectionProxySettingAuth,
};
use yaak_http::cookies::CookieStore;
use yaak_http::manager::{CachedClient, HttpConnectionManager};
use yaak_http::sender::ReqwestSender;
use yaak_http::tee_reader::TeeReader;
use yaak_http::transaction::HttpTransaction;
use yaak_http::types::{
SendableBody, SendableHttpRequest, SendableHttpRequestOptions, append_query_params,
};
use yaak_models::blob_manager::BodyChunk;
use yaak_models::models::{
CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseEvent, HttpResponseHeader,
HttpResponseState, ProxySetting, ProxySettingAuth,
};
use yaak_http::manager::HttpConnectionManager;
use yaak_models::models::{CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseState};
use yaak_models::util::UpdateSource;
use yaak_plugins::events::{
CallHttpAuthenticationRequest, HttpHeader, PluginContext, RenderPurpose,
};
use yaak_plugins::events::PluginContext;
use yaak_plugins::manager::PluginManager;
use yaak_plugins::template_callback::PluginTemplateCallback;
use yaak_templates::RenderOptions;
use yaak_tls::find_client_certificate;
/// Chunk size for storing request bodies (1MB)
const REQUEST_BODY_CHUNK_SIZE: usize = 1024 * 1024;
/// Context for managing response state during HTTP transactions.
/// Handles both persisted responses (stored in DB) and ephemeral responses (in-memory only).
@@ -168,148 +141,32 @@ async fn send_http_request_inner<R: Runtime>(
let plugin_manager = Arc::new((*app_handle.state::<PluginManager>()).clone());
let encryption_manager = Arc::new((*app_handle.state::<EncryptionManager>()).clone());
let connection_manager = app_handle.state::<HttpConnectionManager>();
let settings = window.db().get_settings();
let workspace_id = &unrendered_request.workspace_id;
let folder_id = unrendered_request.folder_id.as_deref();
let environment_id = environment.map(|e| e.id);
let workspace = window.db().get_workspace(workspace_id)?;
let (resolved, auth_context_id) = resolve_http_request(window, unrendered_request)?;
let cb = PluginTemplateCallback::new(
plugin_manager.clone(),
encryption_manager.clone(),
&plugin_context,
RenderPurpose::Send,
);
let env_chain =
window.db().resolve_environments(&workspace.id, folder_id, environment_id.as_deref())?;
let mut cancel_rx = cancelled_rx.clone();
let render_options = RenderOptions::throw();
let request = tokio::select! {
result = render_http_request(&resolved, env_chain, &cb, &render_options) => result?,
_ = cancel_rx.changed() => {
return Err(GenericError("Request canceled".to_string()));
}
};
let cookie_jar_id = cookie_jar.as_ref().map(|jar| jar.id.clone());
// Build the sendable request using the new SendableHttpRequest type
let options = SendableHttpRequestOptions {
follow_redirects: workspace.setting_follow_redirects,
timeout: if workspace.setting_request_timeout > 0 {
Some(Duration::from_millis(workspace.setting_request_timeout.unsigned_abs() as u64))
} else {
None
},
};
let mut sendable_request = SendableHttpRequest::from_http_request(&request, options).await?;
let response_dir = app_handle.path().app_data_dir()?.join("responses");
let result = send_http_request_with_plugins(SendHttpRequestWithPluginsParams {
query_manager: app_handle.db_manager().inner(),
blob_manager: app_handle.blob_manager().inner(),
request: unrendered_request.clone(),
environment_id: environment_id.as_deref(),
update_source: response_ctx.update_source.clone(),
cookie_jar_id,
cookie_jar_update_source: UpdateSource::Background,
response_dir: &response_dir,
persist_events: true,
emit_events_to: None,
existing_response: Some(response_ctx.response().clone()),
plugin_manager,
encryption_manager,
plugin_context,
cancelled_rx: Some(cancelled_rx.clone()),
connection_manager: Some(connection_manager.inner()),
})
.await
.map_err(|e| GenericError(e.to_string()))?;
debug!("Sending request to {} {}", sendable_request.method, sendable_request.url);
let proxy_setting = match settings.proxy {
None => HttpConnectionProxySetting::System,
Some(ProxySetting::Disabled) => HttpConnectionProxySetting::Disabled,
Some(ProxySetting::Enabled { http, https, auth, bypass, disabled }) => {
if disabled {
HttpConnectionProxySetting::System
} else {
HttpConnectionProxySetting::Enabled {
http,
https,
bypass,
auth: match auth {
None => None,
Some(ProxySettingAuth { user, password }) => {
Some(HttpConnectionProxySettingAuth { user, password })
}
},
}
}
}
};
let client_certificate =
find_client_certificate(&sendable_request.url, &settings.client_certificates);
// Create cookie store if a cookie jar is specified
let maybe_cookie_store = match cookie_jar.clone() {
Some(CookieJar { id, .. }) => {
// NOTE: We need to refetch the cookie jar because a chained request might have
// updated cookies when we rendered the request.
let cj = window.db().get_cookie_jar(&id)?;
let cookie_store = CookieStore::from_cookies(cj.cookies.clone());
Some((cookie_store, cj))
}
None => None,
};
let cached_client = connection_manager
.get_client(&HttpConnectionOptions {
id: plugin_context.id.clone(),
validate_certificates: workspace.setting_validate_certificates,
proxy: proxy_setting,
client_certificate,
dns_overrides: workspace.setting_dns_overrides.clone(),
})
.await?;
// Apply authentication to the request, racing against cancellation since
// auth plugins (e.g. OAuth2) can block indefinitely waiting for user action.
let mut cancel_rx = cancelled_rx.clone();
tokio::select! {
result = apply_authentication(
&window,
&mut sendable_request,
&request,
auth_context_id,
&plugin_manager,
plugin_context,
) => result?,
_ = cancel_rx.changed() => {
return Err(GenericError("Request canceled".to_string()));
}
};
let cookie_store = maybe_cookie_store.as_ref().map(|(cs, _)| cs.clone());
let result = execute_transaction(
cached_client,
sendable_request,
response_ctx,
cancelled_rx.clone(),
cookie_store,
)
.await;
// Wait for blob writing to complete and check for errors
let final_result = match result {
Ok((response, maybe_blob_write_handle)) => {
// Check if blob writing failed
if let Some(handle) = maybe_blob_write_handle {
if let Ok(Err(e)) = handle.await {
// Update response with the storage error
let _ = response_ctx.update(|r| {
let error_msg =
format!("Request succeeded but failed to store request body: {}", e);
r.error = Some(match &r.error {
Some(existing) => format!("{}; {}", existing, error_msg),
None => error_msg,
});
});
}
}
Ok(response)
}
Err(e) => Err(e),
};
// Persist cookies back to the database after the request completes
if let Some((cookie_store, mut cj)) = maybe_cookie_store {
let cookies = cookie_store.get_all_cookies();
cj.cookies = cookies;
if let Err(e) = window.db().upsert_cookie_jar(&cj, &UpdateSource::Background) {
warn!("Failed to persist cookies to database: {}", e);
}
}
final_result
Ok(result.response)
}
pub fn resolve_http_request<R: Runtime>(
@@ -328,395 +185,3 @@ pub fn resolve_http_request<R: Runtime>(
Ok((new_request, authentication_context_id))
}
async fn execute_transaction<R: Runtime>(
cached_client: CachedClient,
mut sendable_request: SendableHttpRequest,
response_ctx: &mut ResponseContext<R>,
mut cancelled_rx: Receiver<bool>,
cookie_store: Option<CookieStore>,
) -> Result<(HttpResponse, Option<tauri::async_runtime::JoinHandle<Result<()>>>)> {
let app_handle = &response_ctx.app_handle.clone();
let response_id = response_ctx.response().id.clone();
let workspace_id = response_ctx.response().workspace_id.clone();
let is_persisted = response_ctx.is_persisted();
// Keep a reference to the resolver for DNS timing events
let resolver = cached_client.resolver.clone();
let sender = ReqwestSender::with_client(cached_client.client);
let transaction = match cookie_store {
Some(cs) => HttpTransaction::with_cookie_store(sender, cs),
None => HttpTransaction::new(sender),
};
let start = Instant::now();
// Capture request headers before sending
let request_headers: Vec<HttpResponseHeader> = sendable_request
.headers
.iter()
.map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
.collect();
// Update response with headers info
response_ctx.update(|r| {
r.url = sendable_request.url.clone();
r.request_headers = request_headers;
})?;
// Create bounded channel for receiving events and spawn a task to store them in DB
// Buffer size of 100 events provides back pressure if DB writes are slow
let (event_tx, mut event_rx) =
tokio::sync::mpsc::channel::<yaak_http::sender::HttpResponseEvent>(100);
// Set the event sender on the DNS resolver so it can emit DNS timing events
resolver.set_event_sender(Some(event_tx.clone())).await;
// Shared state to capture DNS timing from the event processing task
let dns_elapsed = Arc::new(AtomicI32::new(0));
// Write events to DB in a task (only for persisted responses)
if is_persisted {
let response_id = response_id.clone();
let app_handle = app_handle.clone();
let update_source = response_ctx.update_source.clone();
let workspace_id = workspace_id.clone();
let dns_elapsed = dns_elapsed.clone();
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
// Capture DNS timing when we see a DNS event
if let yaak_http::sender::HttpResponseEvent::DnsResolved { duration, .. } = &event {
dns_elapsed.store(*duration as i32, Ordering::SeqCst);
}
let db_event = HttpResponseEvent::new(&response_id, &workspace_id, event.into());
let _ = app_handle.db().upsert_http_response_event(&db_event, &update_source);
}
});
} else {
// For ephemeral responses, just drain the events but still capture DNS timing
let dns_elapsed = dns_elapsed.clone();
tokio::spawn(async move {
while let Some(event) = event_rx.recv().await {
if let yaak_http::sender::HttpResponseEvent::DnsResolved { duration, .. } = &event {
dns_elapsed.store(*duration as i32, Ordering::SeqCst);
}
}
});
};
// Capture request body as it's sent (only for persisted responses)
let body_id = format!("{}.request", response_id);
let maybe_blob_write_handle = match sendable_request.body {
Some(SendableBody::Bytes(bytes)) => {
if is_persisted {
write_bytes_to_db_sync(response_ctx, &body_id, bytes.clone())?;
}
sendable_request.body = Some(SendableBody::Bytes(bytes));
None
}
Some(SendableBody::Stream { data: stream, content_length }) => {
// Wrap stream with TeeReader to capture data as it's read
// Use unbounded channel to ensure all data is captured without blocking the HTTP request
let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
let tee_reader = TeeReader::new(stream, body_chunk_tx);
let pinned: Pin<Box<dyn AsyncRead + Send + 'static>> = Box::pin(tee_reader);
let handle = if is_persisted {
// Spawn task to write request body chunks to blob DB
let app_handle = app_handle.clone();
let response_id = response_id.clone();
let workspace_id = workspace_id.clone();
let body_id = body_id.clone();
let update_source = response_ctx.update_source.clone();
Some(tauri::async_runtime::spawn(async move {
write_stream_chunks_to_db(
app_handle,
&body_id,
&workspace_id,
&response_id,
&update_source,
body_chunk_rx,
)
.await
}))
} else {
// For ephemeral responses, just drain the body chunks
tauri::async_runtime::spawn(async move {
let mut rx = body_chunk_rx;
while rx.recv().await.is_some() {}
});
None
};
sendable_request.body = Some(SendableBody::Stream { data: pinned, content_length });
handle
}
None => {
sendable_request.body = None;
None
}
};
// Execute the transaction with cancellation support
// This returns the response with headers, but body is not yet consumed
// Events (headers, settings, chunks) are sent through the channel
let mut http_response = transaction
.execute_with_cancellation(sendable_request, cancelled_rx.clone(), event_tx)
.await?;
// Prepare the response path before consuming the body
let body_path = if response_id.is_empty() {
// Ephemeral responses: use OS temp directory for automatic cleanup
let temp_dir = std::env::temp_dir().join("yaak-ephemeral-responses");
create_dir_all(&temp_dir).await?;
temp_dir.join(uuid::Uuid::new_v4().to_string())
} else {
// Persisted responses: use app data directory
let dir = app_handle.path().app_data_dir()?;
let base_dir = dir.join("responses");
create_dir_all(&base_dir).await?;
base_dir.join(&response_id)
};
// Extract metadata before consuming the body (headers are available immediately)
// Url might change, so update again
response_ctx.update(|r| {
r.body_path = Some(body_path.to_string_lossy().to_string());
r.elapsed_headers = start.elapsed().as_millis() as i32;
r.status = http_response.status as i32;
r.status_reason = http_response.status_reason.clone();
r.url = http_response.url.clone();
r.remote_addr = http_response.remote_addr.clone();
r.version = http_response.version.clone();
r.headers = http_response
.headers
.iter()
.map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
.collect();
r.content_length = http_response.content_length.map(|l| l as i32);
r.state = HttpResponseState::Connected;
r.request_headers = http_response
.request_headers
.iter()
.map(|(n, v)| HttpResponseHeader { name: n.clone(), value: v.clone() })
.collect();
})?;
// Get the body stream for manual consumption
let mut body_stream = http_response.into_body_stream()?;
// Open file for writing
let mut file = File::options()
.create(true)
.truncate(true)
.write(true)
.open(&body_path)
.await
.map_err(|e| GenericError(format!("Failed to open file: {}", e)))?;
// Stream body to file, with throttled DB updates to avoid excessive writes
let mut written_bytes: usize = 0;
let mut last_update_time = start;
let mut buf = [0u8; 8192];
// Throttle settings: update DB at most every 100ms
const UPDATE_INTERVAL_MS: u128 = 100;
loop {
// Check for cancellation. If we already have headers/body, just close cleanly without error
if *cancelled_rx.borrow() {
break;
}
// Use select! to race between reading and cancellation, so cancellation is immediate
let read_result = tokio::select! {
biased;
_ = cancelled_rx.changed() => {
break;
}
result = body_stream.read(&mut buf) => result,
};
match read_result {
Ok(0) => break, // EOF
Ok(n) => {
file.write_all(&buf[..n])
.await
.map_err(|e| GenericError(format!("Failed to write to file: {}", e)))?;
file.flush()
.await
.map_err(|e| GenericError(format!("Failed to flush file: {}", e)))?;
written_bytes += n;
// Throttle DB updates: only update if enough time has passed
let now = Instant::now();
let elapsed_since_update = now.duration_since(last_update_time).as_millis();
if elapsed_since_update >= UPDATE_INTERVAL_MS {
response_ctx.update(|r| {
r.elapsed = start.elapsed().as_millis() as i32;
r.content_length = Some(written_bytes as i32);
})?;
last_update_time = now;
}
}
Err(e) => {
return Err(GenericError(format!("Failed to read response body: {}", e)));
}
}
}
// Final update with closed state and accurate byte count
response_ctx.update(|r| {
r.elapsed = start.elapsed().as_millis() as i32;
r.elapsed_dns = dns_elapsed.load(Ordering::SeqCst);
r.content_length = Some(written_bytes as i32);
r.state = HttpResponseState::Closed;
})?;
// Clear the event sender from the resolver since this request is done
resolver.set_event_sender(None).await;
Ok((response_ctx.response().clone(), maybe_blob_write_handle))
}
fn write_bytes_to_db_sync<R: Runtime>(
response_ctx: &mut ResponseContext<R>,
body_id: &str,
data: Bytes,
) -> Result<()> {
if data.is_empty() {
return Ok(());
}
// Write in chunks if data is large
let mut offset = 0;
let mut chunk_index = 0;
while offset < data.len() {
let end = std::cmp::min(offset + REQUEST_BODY_CHUNK_SIZE, data.len());
let chunk_data = data.slice(offset..end).to_vec();
let chunk = BodyChunk::new(body_id, chunk_index, chunk_data);
response_ctx.app_handle.blobs().insert_chunk(&chunk)?;
offset = end;
chunk_index += 1;
}
// Update the response with the total request body size
response_ctx.update(|r| {
r.request_content_length = Some(data.len() as i32);
})?;
Ok(())
}
async fn write_stream_chunks_to_db<R: Runtime>(
app_handle: AppHandle<R>,
body_id: &str,
workspace_id: &str,
response_id: &str,
update_source: &UpdateSource,
mut rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
) -> Result<()> {
let mut buffer = Vec::with_capacity(REQUEST_BODY_CHUNK_SIZE);
let mut chunk_index = 0;
let mut total_bytes: usize = 0;
while let Some(data) = rx.recv().await {
total_bytes += data.len();
buffer.extend_from_slice(&data);
// Flush when buffer reaches chunk size
while buffer.len() >= REQUEST_BODY_CHUNK_SIZE {
debug!("Writing chunk {chunk_index} to DB");
let chunk_data: Vec<u8> = buffer.drain(..REQUEST_BODY_CHUNK_SIZE).collect();
let chunk = BodyChunk::new(body_id, chunk_index, chunk_data);
app_handle.blobs().insert_chunk(&chunk)?;
app_handle.db().upsert_http_response_event(
&HttpResponseEvent::new(
response_id,
workspace_id,
yaak_http::sender::HttpResponseEvent::ChunkSent {
bytes: REQUEST_BODY_CHUNK_SIZE,
}
.into(),
),
update_source,
)?;
chunk_index += 1;
}
}
// Flush remaining data
if !buffer.is_empty() {
let chunk = BodyChunk::new(body_id, chunk_index, buffer);
debug!("Flushing remaining data {chunk_index} {}", chunk.data.len());
app_handle.blobs().insert_chunk(&chunk)?;
app_handle.db().upsert_http_response_event(
&HttpResponseEvent::new(
response_id,
workspace_id,
yaak_http::sender::HttpResponseEvent::ChunkSent { bytes: chunk.data.len() }.into(),
),
update_source,
)?;
}
// Update the response with the total request body size
app_handle.with_tx(|tx| {
debug!("Updating final body length {total_bytes}");
if let Ok(mut response) = tx.get_http_response(&response_id) {
response.request_content_length = Some(total_bytes as i32);
tx.update_http_response_if_id(&response, update_source)?;
}
Ok(())
})?;
Ok(())
}
async fn apply_authentication<R: Runtime>(
_window: &WebviewWindow<R>,
sendable_request: &mut SendableHttpRequest,
request: &HttpRequest,
auth_context_id: String,
plugin_manager: &PluginManager,
plugin_context: &PluginContext,
) -> Result<()> {
match &request.authentication_type {
None => {
// No authentication found. Not even inherited
}
Some(authentication_type) if authentication_type == "none" => {
// Explicitly no authentication
}
Some(authentication_type) => {
let req = CallHttpAuthenticationRequest {
context_id: format!("{:x}", md5::compute(auth_context_id)),
values: serde_json::from_value(serde_json::to_value(&request.authentication)?)?,
url: sendable_request.url.clone(),
method: sendable_request.method.clone(),
headers: sendable_request
.headers
.iter()
.map(|(name, value)| HttpHeader {
name: name.to_string(),
value: value.to_string(),
})
.collect(),
};
let plugin_result = plugin_manager
.call_http_authentication(plugin_context, &authentication_type, req)
.await?;
for header in plugin_result.set_headers.unwrap_or_default() {
sendable_request.insert_header((header.name, header.value));
}
if let Some(params) = plugin_result.set_query_parameters {
let params = params.into_iter().map(|p| (p.name, p.value)).collect::<Vec<_>>();
sendable_request.url = append_query_params(&sendable_request.url, params);
}
}
}
Ok(())
}

View File

@@ -5,10 +5,15 @@ edition = "2024"
publish = false
[dependencies]
async-trait = "0.1"
log = { workspace = true }
md5 = "0.8.0"
serde_json = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync", "rt"] }
yaak-http = { workspace = true }
yaak-crypto = { workspace = true }
yaak-models = { workspace = true }
yaak-plugins = { workspace = true }
yaak-templates = { workspace = true }
yaak-tls = { workspace = true }

View File

@@ -1,16 +1,37 @@
use crate::render::render_http_request;
use async_trait::async_trait;
use log::warn;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use thiserror::Error;
use tokio::sync::mpsc;
use yaak_http::sender::{HttpResponseEvent as SenderHttpResponseEvent, HttpSender, ReqwestSender};
use yaak_http::types::{SendableBody, SendableHttpRequest, SendableHttpRequestOptions};
use tokio::sync::watch;
use yaak_crypto::manager::EncryptionManager;
use yaak_http::client::{
HttpConnectionOptions, HttpConnectionProxySetting, HttpConnectionProxySettingAuth,
};
use yaak_http::cookies::CookieStore;
use yaak_http::manager::HttpConnectionManager;
use yaak_http::sender::{HttpResponseEvent as SenderHttpResponseEvent, ReqwestSender};
use yaak_http::transaction::HttpTransaction;
use yaak_http::types::{
SendableBody, SendableHttpRequest, SendableHttpRequestOptions, append_query_params,
};
use yaak_models::blob_manager::BlobManager;
use yaak_models::models::{HttpResponse, HttpResponseEvent, HttpResponseHeader, HttpResponseState};
use yaak_models::models::{
ClientCertificate, CookieJar, DnsOverride, Environment, HttpRequest, HttpResponse,
HttpResponseEvent, HttpResponseHeader, HttpResponseState, ProxySetting, ProxySettingAuth,
};
use yaak_models::query_manager::QueryManager;
use yaak_models::util::UpdateSource;
use yaak_plugins::events::{
CallHttpAuthenticationRequest, HttpHeader, PluginContext, RenderPurpose,
};
use yaak_plugins::manager::PluginManager;
use yaak_plugins::template_callback::PluginTemplateCallback;
use yaak_templates::{RenderOptions, TemplateCallback};
use yaak_tls::find_client_certificate;
const HTTP_EVENT_CHANNEL_CAPACITY: usize = 100;
@@ -19,15 +40,27 @@ pub enum SendHttpRequestError {
#[error("Failed to load request: {0}")]
LoadRequest(#[source] yaak_models::error::Error),
#[error("Failed to load workspace: {0}")]
LoadWorkspace(#[source] yaak_models::error::Error),
#[error("Failed to resolve environments: {0}")]
ResolveEnvironments(#[source] yaak_models::error::Error),
#[error("Failed to resolve inherited request settings: {0}")]
ResolveRequestInheritance(#[source] yaak_models::error::Error),
#[error("Failed to load cookie jar: {0}")]
LoadCookieJar(#[source] yaak_models::error::Error),
#[error("Failed to persist cookie jar: {0}")]
PersistCookieJar(#[source] yaak_models::error::Error),
#[error("Failed to render request templates: {0}")]
RenderRequest(#[source] yaak_templates::error::Error),
#[error("Failed to prepare request before send: {0}")]
PrepareSendableRequest(String),
#[error("Failed to persist response metadata: {0}")]
PersistResponse(#[source] yaak_models::error::Error),
@@ -60,51 +93,370 @@ pub enum SendHttpRequestError {
pub type Result<T> = std::result::Result<T, SendHttpRequestError>;
#[async_trait]
pub trait PrepareSendableRequest: Send + Sync {
async fn prepare_sendable_request(
&self,
rendered_request: &HttpRequest,
auth_context_id: &str,
sendable_request: &mut SendableHttpRequest,
) -> std::result::Result<(), String>;
}
#[async_trait]
pub trait SendRequestExecutor: Send + Sync {
async fn send(
&self,
sendable_request: SendableHttpRequest,
event_tx: mpsc::Sender<SenderHttpResponseEvent>,
cookie_store: Option<CookieStore>,
) -> yaak_http::error::Result<yaak_http::sender::HttpResponse>;
}
struct DefaultSendRequestExecutor;
#[async_trait]
impl SendRequestExecutor for DefaultSendRequestExecutor {
async fn send(
&self,
sendable_request: SendableHttpRequest,
event_tx: mpsc::Sender<SenderHttpResponseEvent>,
cookie_store: Option<CookieStore>,
) -> yaak_http::error::Result<yaak_http::sender::HttpResponse> {
let sender = ReqwestSender::new()?;
let transaction = match cookie_store {
Some(store) => HttpTransaction::with_cookie_store(sender, store),
None => HttpTransaction::new(sender),
};
let (_cancel_tx, cancel_rx) = watch::channel(false);
transaction.execute_with_cancellation(sendable_request, cancel_rx, event_tx).await
}
}
struct PluginPrepareSendableRequest {
plugin_manager: Arc<PluginManager>,
plugin_context: PluginContext,
cancelled_rx: Option<watch::Receiver<bool>>,
}
#[async_trait]
impl PrepareSendableRequest for PluginPrepareSendableRequest {
async fn prepare_sendable_request(
&self,
rendered_request: &HttpRequest,
auth_context_id: &str,
sendable_request: &mut SendableHttpRequest,
) -> std::result::Result<(), String> {
if let Some(cancelled_rx) = &self.cancelled_rx {
let mut cancelled_rx = cancelled_rx.clone();
tokio::select! {
result = apply_plugin_authentication(
sendable_request,
rendered_request,
auth_context_id,
&self.plugin_manager,
&self.plugin_context,
) => result,
_ = cancelled_rx.changed() => Err("Request canceled".to_string()),
}
} else {
apply_plugin_authentication(
sendable_request,
rendered_request,
auth_context_id,
&self.plugin_manager,
&self.plugin_context,
)
.await
}
}
}
struct ConnectionManagerSendRequestExecutor<'a> {
connection_manager: &'a HttpConnectionManager,
plugin_context_id: String,
query_manager: QueryManager,
workspace_id: String,
cancelled_rx: Option<watch::Receiver<bool>>,
}
#[async_trait]
impl SendRequestExecutor for ConnectionManagerSendRequestExecutor<'_> {
async fn send(
&self,
sendable_request: SendableHttpRequest,
event_tx: mpsc::Sender<SenderHttpResponseEvent>,
cookie_store: Option<CookieStore>,
) -> yaak_http::error::Result<yaak_http::sender::HttpResponse> {
let runtime_config =
resolve_http_send_runtime_config(&self.query_manager, &self.workspace_id)
.map_err(|e| yaak_http::error::Error::RequestError(e.to_string()))?;
let client_certificate =
find_client_certificate(&sendable_request.url, &runtime_config.client_certificates);
let cached_client = self
.connection_manager
.get_client(&HttpConnectionOptions {
id: self.plugin_context_id.clone(),
validate_certificates: runtime_config.validate_certificates,
proxy: runtime_config.proxy,
client_certificate,
dns_overrides: runtime_config.dns_overrides,
})
.await?;
cached_client.resolver.set_event_sender(Some(event_tx.clone())).await;
let sender = ReqwestSender::with_client(cached_client.client);
let transaction = match cookie_store {
Some(cs) => HttpTransaction::with_cookie_store(sender, cs),
None => HttpTransaction::new(sender),
};
let result = if let Some(cancelled_rx) = self.cancelled_rx.clone() {
transaction.execute_with_cancellation(sendable_request, cancelled_rx, event_tx).await
} else {
let (_cancel_tx, cancel_rx) = watch::channel(false);
transaction.execute_with_cancellation(sendable_request, cancel_rx, event_tx).await
};
cached_client.resolver.set_event_sender(None).await;
result
}
}
pub struct SendHttpRequestByIdParams<'a, T: TemplateCallback> {
pub query_manager: &'a QueryManager,
pub blob_manager: &'a BlobManager,
pub request_id: &'a str,
pub environment_id: Option<&'a str>,
pub template_callback: &'a T,
pub send_options: SendableHttpRequestOptions,
pub update_source: UpdateSource,
pub cookie_jar_id: Option<String>,
pub cookie_jar_update_source: UpdateSource,
pub response_dir: &'a Path,
pub persist_events: bool,
pub emit_events_to: Option<mpsc::Sender<SenderHttpResponseEvent>>,
pub prepare_sendable_request: Option<&'a dyn PrepareSendableRequest>,
pub executor: Option<&'a dyn SendRequestExecutor>,
}
pub struct SendHttpRequestParams<'a, T: TemplateCallback> {
pub query_manager: &'a QueryManager,
pub blob_manager: &'a BlobManager,
pub request: HttpRequest,
pub environment_id: Option<&'a str>,
pub template_callback: &'a T,
pub send_options: Option<SendableHttpRequestOptions>,
pub update_source: UpdateSource,
pub cookie_jar_id: Option<String>,
pub cookie_jar_update_source: UpdateSource,
pub response_dir: &'a Path,
pub persist_events: bool,
pub emit_events_to: Option<mpsc::Sender<SenderHttpResponseEvent>>,
pub auth_context_id: Option<String>,
pub existing_response: Option<HttpResponse>,
pub prepare_sendable_request: Option<&'a dyn PrepareSendableRequest>,
pub executor: Option<&'a dyn SendRequestExecutor>,
}
pub struct SendHttpRequestWithPluginsParams<'a> {
pub query_manager: &'a QueryManager,
pub blob_manager: &'a BlobManager,
pub request: HttpRequest,
pub environment_id: Option<&'a str>,
pub update_source: UpdateSource,
pub cookie_jar_id: Option<String>,
pub cookie_jar_update_source: UpdateSource,
pub response_dir: &'a Path,
pub persist_events: bool,
pub emit_events_to: Option<mpsc::Sender<SenderHttpResponseEvent>>,
pub existing_response: Option<HttpResponse>,
pub plugin_manager: Arc<PluginManager>,
pub encryption_manager: Arc<EncryptionManager>,
pub plugin_context: &'a PluginContext,
pub cancelled_rx: Option<watch::Receiver<bool>>,
pub connection_manager: Option<&'a HttpConnectionManager>,
}
pub struct SendHttpRequestByIdWithPluginsParams<'a> {
pub query_manager: &'a QueryManager,
pub blob_manager: &'a BlobManager,
pub request_id: &'a str,
pub environment_id: Option<&'a str>,
pub update_source: UpdateSource,
pub cookie_jar_id: Option<String>,
pub cookie_jar_update_source: UpdateSource,
pub response_dir: &'a Path,
pub persist_events: bool,
pub emit_events_to: Option<mpsc::Sender<SenderHttpResponseEvent>>,
pub plugin_manager: Arc<PluginManager>,
pub encryption_manager: Arc<EncryptionManager>,
pub plugin_context: &'a PluginContext,
pub cancelled_rx: Option<watch::Receiver<bool>>,
pub connection_manager: Option<&'a HttpConnectionManager>,
}
pub struct SendHttpRequestResult {
pub rendered_request: yaak_models::models::HttpRequest,
pub rendered_request: HttpRequest,
pub response: HttpResponse,
pub response_body: Vec<u8>,
}
pub struct HttpSendRuntimeConfig {
pub send_options: SendableHttpRequestOptions,
pub validate_certificates: bool,
pub proxy: HttpConnectionProxySetting,
pub dns_overrides: Vec<DnsOverride>,
pub client_certificates: Vec<ClientCertificate>,
}
pub fn resolve_http_send_runtime_config(
query_manager: &QueryManager,
workspace_id: &str,
) -> Result<HttpSendRuntimeConfig> {
let db = query_manager.connect();
let workspace = db.get_workspace(workspace_id).map_err(SendHttpRequestError::LoadWorkspace)?;
let settings = db.get_settings();
Ok(HttpSendRuntimeConfig {
send_options: SendableHttpRequestOptions {
follow_redirects: workspace.setting_follow_redirects,
timeout: if workspace.setting_request_timeout > 0 {
Some(std::time::Duration::from_millis(
workspace.setting_request_timeout.unsigned_abs() as u64,
))
} else {
None
},
},
validate_certificates: workspace.setting_validate_certificates,
proxy: proxy_setting_from_settings(settings.proxy),
dns_overrides: workspace.setting_dns_overrides,
client_certificates: settings.client_certificates,
})
}
pub async fn send_http_request_by_id_with_plugins(
params: SendHttpRequestByIdWithPluginsParams<'_>,
) -> Result<SendHttpRequestResult> {
let request = params
.query_manager
.connect()
.get_http_request(params.request_id)
.map_err(SendHttpRequestError::LoadRequest)?;
send_http_request_with_plugins(SendHttpRequestWithPluginsParams {
query_manager: params.query_manager,
blob_manager: params.blob_manager,
request,
environment_id: params.environment_id,
update_source: params.update_source,
cookie_jar_id: params.cookie_jar_id,
cookie_jar_update_source: params.cookie_jar_update_source,
response_dir: params.response_dir,
persist_events: params.persist_events,
emit_events_to: params.emit_events_to,
existing_response: None,
plugin_manager: params.plugin_manager,
encryption_manager: params.encryption_manager,
plugin_context: params.plugin_context,
cancelled_rx: params.cancelled_rx,
connection_manager: params.connection_manager,
})
.await
}
pub async fn send_http_request_with_plugins(
params: SendHttpRequestWithPluginsParams<'_>,
) -> Result<SendHttpRequestResult> {
let template_callback = PluginTemplateCallback::new(
params.plugin_manager.clone(),
params.encryption_manager.clone(),
params.plugin_context,
RenderPurpose::Send,
);
let auth_hook = PluginPrepareSendableRequest {
plugin_manager: params.plugin_manager,
plugin_context: params.plugin_context.clone(),
cancelled_rx: params.cancelled_rx.clone(),
};
let executor =
params.connection_manager.map(|connection_manager| ConnectionManagerSendRequestExecutor {
connection_manager,
plugin_context_id: params.plugin_context.id.clone(),
query_manager: params.query_manager.clone(),
workspace_id: params.request.workspace_id.clone(),
cancelled_rx: params.cancelled_rx.clone(),
});
send_http_request(SendHttpRequestParams {
query_manager: params.query_manager,
blob_manager: params.blob_manager,
request: params.request,
environment_id: params.environment_id,
template_callback: &template_callback,
send_options: None,
update_source: params.update_source,
cookie_jar_id: params.cookie_jar_id,
cookie_jar_update_source: params.cookie_jar_update_source,
response_dir: params.response_dir,
persist_events: params.persist_events,
emit_events_to: params.emit_events_to,
auth_context_id: None,
existing_response: params.existing_response,
prepare_sendable_request: Some(&auth_hook),
executor: executor.as_ref().map(|e| e as &dyn SendRequestExecutor),
})
.await
}
pub async fn send_http_request_by_id<T: TemplateCallback>(
params: SendHttpRequestByIdParams<'_, T>,
) -> Result<SendHttpRequestResult> {
let db = params.query_manager.connect();
let request =
db.get_http_request(params.request_id).map_err(SendHttpRequestError::LoadRequest)?;
let environment_chain = db
.resolve_environments(
&request.workspace_id,
request.folder_id.as_deref(),
params.environment_id,
)
.map_err(SendHttpRequestError::ResolveEnvironments)?;
let request = params
.query_manager
.connect()
.get_http_request(params.request_id)
.map_err(SendHttpRequestError::LoadRequest)?;
let (request, auth_context_id) = resolve_inherited_request(params.query_manager, &request)?;
let (authentication_type, authentication, _auth_context_id) = db
.resolve_auth_for_http_request(&request)
.map_err(SendHttpRequestError::ResolveRequestInheritance)?;
let resolved_headers = db
.resolve_headers_for_http_request(&request)
.map_err(SendHttpRequestError::ResolveRequestInheritance)?;
drop(db);
send_http_request(SendHttpRequestParams {
query_manager: params.query_manager,
blob_manager: params.blob_manager,
request,
environment_id: params.environment_id,
template_callback: params.template_callback,
send_options: None,
update_source: params.update_source,
cookie_jar_id: params.cookie_jar_id,
cookie_jar_update_source: params.cookie_jar_update_source,
response_dir: params.response_dir,
persist_events: params.persist_events,
emit_events_to: params.emit_events_to,
existing_response: None,
prepare_sendable_request: params.prepare_sendable_request,
executor: params.executor,
auth_context_id: Some(auth_context_id),
})
.await
}
let mut resolved_request = request.clone();
resolved_request.authentication_type = authentication_type;
resolved_request.authentication = authentication;
resolved_request.headers = resolved_headers;
pub async fn send_http_request<T: TemplateCallback>(
params: SendHttpRequestParams<'_, T>,
) -> Result<SendHttpRequestResult> {
let environment_chain =
resolve_environment_chain(params.query_manager, &params.request, params.environment_id)?;
let (resolved_request, auth_context_id) =
if let Some(auth_context_id) = params.auth_context_id.clone() {
(params.request.clone(), auth_context_id)
} else {
resolve_inherited_request(params.query_manager, &params.request)?
};
let runtime_config =
resolve_http_send_runtime_config(params.query_manager, &params.request.workspace_id)?;
let send_options = params.send_options.unwrap_or(runtime_config.send_options);
let mut cookie_jar = load_cookie_jar(params.query_manager, params.cookie_jar_id.as_deref())?;
let cookie_store =
cookie_jar.as_ref().map(|jar| CookieStore::from_cookies(jar.cookies.clone()));
let rendered_request = render_http_request(
&resolved_request,
@@ -115,41 +467,52 @@ pub async fn send_http_request_by_id<T: TemplateCallback>(
.await
.map_err(SendHttpRequestError::RenderRequest)?;
let sendable_request =
SendableHttpRequest::from_http_request(&rendered_request, params.send_options)
let mut sendable_request =
SendableHttpRequest::from_http_request(&rendered_request, send_options)
.await
.map_err(SendHttpRequestError::BuildSendableRequest)?;
let request_content_length = sendable_body_length(sendable_request.body.as_ref());
let mut response = params
if let Some(hook) = params.prepare_sendable_request {
hook.prepare_sendable_request(&rendered_request, &auth_context_id, &mut sendable_request)
.await
.map_err(SendHttpRequestError::PrepareSendableRequest)?;
}
let request_content_length = sendable_body_length(sendable_request.body.as_ref());
let mut response = params.existing_response.unwrap_or_default();
response.request_id = params.request.id.clone();
response.workspace_id = params.request.workspace_id.clone();
response.request_content_length = request_content_length;
response.request_headers = sendable_request
.headers
.iter()
.map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
.collect();
response.url = sendable_request.url.clone();
response.state = HttpResponseState::Initialized;
response.error = None;
response.content_length = None;
response.content_length_compressed = None;
response.body_path = None;
response.status = 0;
response.status_reason = None;
response.headers = Vec::new();
response.remote_addr = None;
response.version = None;
response.elapsed = 0;
response.elapsed_headers = 0;
response.elapsed_dns = 0;
response = params
.query_manager
.connect()
.upsert_http_response(
&HttpResponse {
request_id: request.id.clone(),
workspace_id: request.workspace_id.clone(),
request_content_length,
request_headers: sendable_request
.headers
.iter()
.map(|(name, value)| HttpResponseHeader {
name: name.clone(),
value: value.clone(),
})
.collect(),
url: sendable_request.url.clone(),
..Default::default()
},
&params.update_source,
params.blob_manager,
)
.upsert_http_response(&response, &params.update_source, params.blob_manager)
.map_err(SendHttpRequestError::PersistResponse)?;
let (event_tx, mut event_rx) =
mpsc::channel::<SenderHttpResponseEvent>(HTTP_EVENT_CHANNEL_CAPACITY);
let event_query_manager = params.query_manager.clone();
let event_response_id = response.id.clone();
let event_workspace_id = request.workspace_id.clone();
let event_workspace_id = params.request.workspace_id.clone();
let event_update_source = params.update_source.clone();
let emit_events_to = params.emit_events_to.clone();
let persist_events = params.persist_events;
@@ -175,34 +538,33 @@ pub async fn send_http_request_by_id<T: TemplateCallback>(
}
});
let sender = ReqwestSender::new().map_err(SendHttpRequestError::CreateHttpClient)?;
let default_executor = DefaultSendRequestExecutor;
let executor = params.executor.unwrap_or(&default_executor);
let started_at = Instant::now();
let request_started_url = sendable_request.url.clone();
let http_response = match sender.send(sendable_request, event_tx).await {
let http_response = match executor.send(sendable_request, event_tx, cookie_store.clone()).await
{
Ok(response) => response,
Err(err) => {
let _ = params
.query_manager
.connect()
.upsert_http_response(
&HttpResponse {
state: HttpResponseState::Closed,
elapsed: duration_to_i32(started_at.elapsed()),
elapsed_headers: duration_to_i32(started_at.elapsed()),
error: Some(err.to_string()),
url: request_started_url,
..response
},
&params.update_source,
params.blob_manager,
)
.map_err(SendHttpRequestError::PersistResponse)?;
persist_cookie_jar(
params.query_manager,
cookie_jar.as_mut(),
cookie_store.as_ref(),
&params.cookie_jar_update_source,
)?;
let _ = persist_response_error(
params.query_manager,
params.blob_manager,
&params.update_source,
&response,
started_at,
err.to_string(),
request_started_url,
);
if let Err(join_err) = event_handle.await {
warn!("Failed to join response event task: {}", join_err);
}
return Err(SendHttpRequestError::SendRequest(err));
}
};
@@ -279,10 +641,179 @@ pub async fn send_http_request_by_id<T: TemplateCallback>(
if let Err(join_err) = event_handle.await {
warn!("Failed to join response event task: {}", join_err);
}
persist_cookie_jar(
params.query_manager,
cookie_jar.as_mut(),
cookie_store.as_ref(),
&params.cookie_jar_update_source,
)?;
Ok(SendHttpRequestResult { rendered_request, response, response_body })
}
fn resolve_environment_chain(
query_manager: &QueryManager,
request: &HttpRequest,
environment_id: Option<&str>,
) -> Result<Vec<Environment>> {
let db = query_manager.connect();
db.resolve_environments(&request.workspace_id, request.folder_id.as_deref(), environment_id)
.map_err(SendHttpRequestError::ResolveEnvironments)
}
fn resolve_inherited_request(
query_manager: &QueryManager,
request: &HttpRequest,
) -> Result<(HttpRequest, String)> {
let db = query_manager.connect();
let (authentication_type, authentication, auth_context_id) = db
.resolve_auth_for_http_request(request)
.map_err(SendHttpRequestError::ResolveRequestInheritance)?;
let resolved_headers = db
.resolve_headers_for_http_request(request)
.map_err(SendHttpRequestError::ResolveRequestInheritance)?;
let mut request = request.clone();
request.authentication_type = authentication_type;
request.authentication = authentication;
request.headers = resolved_headers;
Ok((request, auth_context_id))
}
fn load_cookie_jar(
query_manager: &QueryManager,
cookie_jar_id: Option<&str>,
) -> Result<Option<CookieJar>> {
let Some(cookie_jar_id) = cookie_jar_id else {
return Ok(None);
};
query_manager
.connect()
.get_cookie_jar(cookie_jar_id)
.map(Some)
.map_err(SendHttpRequestError::LoadCookieJar)
}
fn persist_cookie_jar(
query_manager: &QueryManager,
cookie_jar: Option<&mut CookieJar>,
cookie_store: Option<&CookieStore>,
source: &UpdateSource,
) -> Result<()> {
match (cookie_jar, cookie_store) {
(Some(cookie_jar), Some(cookie_store)) => {
cookie_jar.cookies = cookie_store.get_all_cookies();
query_manager
.connect()
.upsert_cookie_jar(cookie_jar, source)
.map_err(SendHttpRequestError::PersistCookieJar)?;
Ok(())
}
_ => Ok(()),
}
}
fn proxy_setting_from_settings(proxy: Option<ProxySetting>) -> HttpConnectionProxySetting {
match proxy {
None => HttpConnectionProxySetting::System,
Some(ProxySetting::Disabled) => HttpConnectionProxySetting::Disabled,
Some(ProxySetting::Enabled { http, https, auth, bypass, disabled }) => {
if disabled {
HttpConnectionProxySetting::System
} else {
HttpConnectionProxySetting::Enabled {
http,
https,
bypass,
auth: auth.map(|ProxySettingAuth { user, password }| {
HttpConnectionProxySettingAuth { user, password }
}),
}
}
}
}
}
pub async fn apply_plugin_authentication(
sendable_request: &mut SendableHttpRequest,
request: &HttpRequest,
auth_context_id: &str,
plugin_manager: &PluginManager,
plugin_context: &PluginContext,
) -> std::result::Result<(), String> {
match &request.authentication_type {
None => {}
Some(authentication_type) if authentication_type == "none" => {}
Some(authentication_type) => {
let req = CallHttpAuthenticationRequest {
context_id: format!("{:x}", md5::compute(auth_context_id)),
values: serde_json::from_value(
serde_json::to_value(&request.authentication)
.map_err(|e| format!("Failed to serialize auth values: {e}"))?,
)
.map_err(|e| format!("Failed to parse auth values: {e}"))?,
url: sendable_request.url.clone(),
method: sendable_request.method.clone(),
headers: sendable_request
.headers
.iter()
.map(|(name, value)| HttpHeader {
name: name.to_string(),
value: value.to_string(),
})
.collect(),
};
let plugin_result = plugin_manager
.call_http_authentication(plugin_context, authentication_type, req)
.await
.map_err(|e| format!("Failed to apply authentication plugin: {e}"))?;
for header in plugin_result.set_headers.unwrap_or_default() {
sendable_request.insert_header((header.name, header.value));
}
if let Some(params) = plugin_result.set_query_parameters {
let params = params.into_iter().map(|p| (p.name, p.value)).collect::<Vec<_>>();
sendable_request.url = append_query_params(&sendable_request.url, params);
}
}
}
Ok(())
}
fn persist_response_error(
query_manager: &QueryManager,
blob_manager: &BlobManager,
update_source: &UpdateSource,
response: &HttpResponse,
started_at: Instant,
error: String,
fallback_url: String,
) -> Result<HttpResponse> {
let elapsed = duration_to_i32(started_at.elapsed());
query_manager
.connect()
.upsert_http_response(
&HttpResponse {
state: HttpResponseState::Closed,
elapsed,
elapsed_headers: if response.elapsed_headers == 0 {
elapsed
} else {
response.elapsed_headers
},
error: Some(error),
url: if response.url.is_empty() { fallback_url } else { response.url.clone() },
..response.clone()
},
update_source,
blob_manager,
)
.map_err(SendHttpRequestError::PersistResponse)
}
fn sendable_body_length(body: Option<&SendableBody>) -> Option<i32> {
match body {
Some(SendableBody::Bytes(bytes)) => Some(usize_to_i32(bytes.len())),