From 603ae0650d82cc6f55c3245856ac362c46455b17 Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Mon, 16 Feb 2026 15:09:06 -0800 Subject: [PATCH] refactor send callers to shared plugin runtime helper --- Cargo.lock | 5 + crates-cli/yaak-cli/src/commands/request.rs | 23 +- crates-tauri/yaak-app/src/http_request.rs | 593 +---------------- crates/yaak/Cargo.toml | 5 + crates/yaak/src/send.rs | 671 ++++++++++++++++++-- 5 files changed, 650 insertions(+), 647 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d0d3b514..ffb476f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8243,13 +8243,18 @@ dependencies = [ name = "yaak" version = "0.1.0" dependencies = [ + "async-trait", "log 0.4.29", + "md5 0.8.0", "serde_json", "thiserror 2.0.17", "tokio", + "yaak-crypto", "yaak-http", "yaak-models", + "yaak-plugins", "yaak-templates", + "yaak-tls", ] [[package]] diff --git a/crates-cli/yaak-cli/src/commands/request.rs b/crates-cli/yaak-cli/src/commands/request.rs index f5682c5c..d762a2e4 100644 --- a/crates-cli/yaak-cli/src/commands/request.rs +++ b/crates-cli/yaak-cli/src/commands/request.rs @@ -6,12 +6,10 @@ use crate::commands::json::{ }; use crate::context::CliContext; use tokio::sync::mpsc; -use yaak::send::{SendHttpRequestByIdParams, send_http_request_by_id}; -use yaak_http::types::SendableHttpRequestOptions; +use yaak::send::{SendHttpRequestByIdWithPluginsParams, send_http_request_by_id_with_plugins}; use yaak_models::models::HttpRequest; use yaak_models::util::UpdateSource; -use yaak_plugins::events::{PluginContext, RenderPurpose}; -use yaak_plugins::template_callback::PluginTemplateCallback; +use yaak_plugins::events::PluginContext; pub async fn run( ctx: &CliContext, @@ -174,12 +172,6 @@ pub async fn send_request_by_id( ctx.db().get_http_request(request_id).map_err(|e| format!("Failed to get request: {e}"))?; let plugin_context = PluginContext::new(None, Some(request.workspace_id.clone())); - let template_callback = PluginTemplateCallback::new( - ctx.plugin_manager(), - ctx.encryption_manager.clone(), - &plugin_context, - RenderPurpose::Send, - ); let (event_tx, mut event_rx) = mpsc::channel(100); let event_handle = tokio::spawn(async move { @@ -191,17 +183,22 @@ pub async fn send_request_by_id( }); let response_dir = ctx.data_dir().join("responses"); - let result = send_http_request_by_id(SendHttpRequestByIdParams { + let result = send_http_request_by_id_with_plugins(SendHttpRequestByIdWithPluginsParams { query_manager: ctx.query_manager(), blob_manager: ctx.blob_manager(), request_id, environment_id: environment, - template_callback: &template_callback, - send_options: SendableHttpRequestOptions::default(), update_source: UpdateSource::Sync, + cookie_jar_id: None, + cookie_jar_update_source: UpdateSource::Sync, response_dir: &response_dir, persist_events: true, emit_events_to: Some(event_tx), + plugin_manager: ctx.plugin_manager(), + encryption_manager: ctx.encryption_manager.clone(), + plugin_context: &plugin_context, + cancelled_rx: None, + connection_manager: None, }) .await; diff --git a/crates-tauri/yaak-app/src/http_request.rs b/crates-tauri/yaak-app/src/http_request.rs index 4618bc87..4fdcf667 100644 --- a/crates-tauri/yaak-app/src/http_request.rs +++ b/crates-tauri/yaak-app/src/http_request.rs @@ -3,45 +3,18 @@ use crate::error::Error::GenericError; use crate::error::Result; use crate::models_ext::BlobManagerExt; use crate::models_ext::QueryManagerExt; -use crate::render::render_http_request; -use log::{debug, warn}; -use std::pin::Pin; +use log::warn; use std::sync::Arc; -use std::sync::atomic::{AtomicI32, Ordering}; -use std::time::{Duration, Instant}; +use std::time::Instant; use tauri::{AppHandle, Manager, Runtime, WebviewWindow}; -use tokio::fs::{File, create_dir_all}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt}; use tokio::sync::watch::Receiver; -use tokio_util::bytes::Bytes; +use yaak::send::{SendHttpRequestWithPluginsParams, send_http_request_with_plugins}; use yaak_crypto::manager::EncryptionManager; -use yaak_http::client::{ - HttpConnectionOptions, HttpConnectionProxySetting, HttpConnectionProxySettingAuth, -}; -use yaak_http::cookies::CookieStore; -use yaak_http::manager::{CachedClient, HttpConnectionManager}; -use yaak_http::sender::ReqwestSender; -use yaak_http::tee_reader::TeeReader; -use yaak_http::transaction::HttpTransaction; -use yaak_http::types::{ - SendableBody, SendableHttpRequest, SendableHttpRequestOptions, append_query_params, -}; -use yaak_models::blob_manager::BodyChunk; -use yaak_models::models::{ - CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseEvent, HttpResponseHeader, - HttpResponseState, ProxySetting, ProxySettingAuth, -}; +use yaak_http::manager::HttpConnectionManager; +use yaak_models::models::{CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseState}; use yaak_models::util::UpdateSource; -use yaak_plugins::events::{ - CallHttpAuthenticationRequest, HttpHeader, PluginContext, RenderPurpose, -}; +use yaak_plugins::events::PluginContext; use yaak_plugins::manager::PluginManager; -use yaak_plugins::template_callback::PluginTemplateCallback; -use yaak_templates::RenderOptions; -use yaak_tls::find_client_certificate; - -/// Chunk size for storing request bodies (1MB) -const REQUEST_BODY_CHUNK_SIZE: usize = 1024 * 1024; /// Context for managing response state during HTTP transactions. /// Handles both persisted responses (stored in DB) and ephemeral responses (in-memory only). @@ -168,148 +141,32 @@ async fn send_http_request_inner( let plugin_manager = Arc::new((*app_handle.state::()).clone()); let encryption_manager = Arc::new((*app_handle.state::()).clone()); let connection_manager = app_handle.state::(); - let settings = window.db().get_settings(); - let workspace_id = &unrendered_request.workspace_id; - let folder_id = unrendered_request.folder_id.as_deref(); let environment_id = environment.map(|e| e.id); - let workspace = window.db().get_workspace(workspace_id)?; - let (resolved, auth_context_id) = resolve_http_request(window, unrendered_request)?; - let cb = PluginTemplateCallback::new( - plugin_manager.clone(), - encryption_manager.clone(), - &plugin_context, - RenderPurpose::Send, - ); - let env_chain = - window.db().resolve_environments(&workspace.id, folder_id, environment_id.as_deref())?; - let mut cancel_rx = cancelled_rx.clone(); - let render_options = RenderOptions::throw(); - let request = tokio::select! { - result = render_http_request(&resolved, env_chain, &cb, &render_options) => result?, - _ = cancel_rx.changed() => { - return Err(GenericError("Request canceled".to_string())); - } - }; + let cookie_jar_id = cookie_jar.as_ref().map(|jar| jar.id.clone()); - // Build the sendable request using the new SendableHttpRequest type - let options = SendableHttpRequestOptions { - follow_redirects: workspace.setting_follow_redirects, - timeout: if workspace.setting_request_timeout > 0 { - Some(Duration::from_millis(workspace.setting_request_timeout.unsigned_abs() as u64)) - } else { - None - }, - }; - let mut sendable_request = SendableHttpRequest::from_http_request(&request, options).await?; + let response_dir = app_handle.path().app_data_dir()?.join("responses"); + let result = send_http_request_with_plugins(SendHttpRequestWithPluginsParams { + query_manager: app_handle.db_manager().inner(), + blob_manager: app_handle.blob_manager().inner(), + request: unrendered_request.clone(), + environment_id: environment_id.as_deref(), + update_source: response_ctx.update_source.clone(), + cookie_jar_id, + cookie_jar_update_source: UpdateSource::Background, + response_dir: &response_dir, + persist_events: true, + emit_events_to: None, + existing_response: Some(response_ctx.response().clone()), + plugin_manager, + encryption_manager, + plugin_context, + cancelled_rx: Some(cancelled_rx.clone()), + connection_manager: Some(connection_manager.inner()), + }) + .await + .map_err(|e| GenericError(e.to_string()))?; - debug!("Sending request to {} {}", sendable_request.method, sendable_request.url); - - let proxy_setting = match settings.proxy { - None => HttpConnectionProxySetting::System, - Some(ProxySetting::Disabled) => HttpConnectionProxySetting::Disabled, - Some(ProxySetting::Enabled { http, https, auth, bypass, disabled }) => { - if disabled { - HttpConnectionProxySetting::System - } else { - HttpConnectionProxySetting::Enabled { - http, - https, - bypass, - auth: match auth { - None => None, - Some(ProxySettingAuth { user, password }) => { - Some(HttpConnectionProxySettingAuth { user, password }) - } - }, - } - } - } - }; - - let client_certificate = - find_client_certificate(&sendable_request.url, &settings.client_certificates); - - // Create cookie store if a cookie jar is specified - let maybe_cookie_store = match cookie_jar.clone() { - Some(CookieJar { id, .. }) => { - // NOTE: We need to refetch the cookie jar because a chained request might have - // updated cookies when we rendered the request. - let cj = window.db().get_cookie_jar(&id)?; - let cookie_store = CookieStore::from_cookies(cj.cookies.clone()); - Some((cookie_store, cj)) - } - None => None, - }; - - let cached_client = connection_manager - .get_client(&HttpConnectionOptions { - id: plugin_context.id.clone(), - validate_certificates: workspace.setting_validate_certificates, - proxy: proxy_setting, - client_certificate, - dns_overrides: workspace.setting_dns_overrides.clone(), - }) - .await?; - - // Apply authentication to the request, racing against cancellation since - // auth plugins (e.g. OAuth2) can block indefinitely waiting for user action. - let mut cancel_rx = cancelled_rx.clone(); - tokio::select! { - result = apply_authentication( - &window, - &mut sendable_request, - &request, - auth_context_id, - &plugin_manager, - plugin_context, - ) => result?, - _ = cancel_rx.changed() => { - return Err(GenericError("Request canceled".to_string())); - } - }; - - let cookie_store = maybe_cookie_store.as_ref().map(|(cs, _)| cs.clone()); - let result = execute_transaction( - cached_client, - sendable_request, - response_ctx, - cancelled_rx.clone(), - cookie_store, - ) - .await; - - // Wait for blob writing to complete and check for errors - let final_result = match result { - Ok((response, maybe_blob_write_handle)) => { - // Check if blob writing failed - if let Some(handle) = maybe_blob_write_handle { - if let Ok(Err(e)) = handle.await { - // Update response with the storage error - let _ = response_ctx.update(|r| { - let error_msg = - format!("Request succeeded but failed to store request body: {}", e); - r.error = Some(match &r.error { - Some(existing) => format!("{}; {}", existing, error_msg), - None => error_msg, - }); - }); - } - } - Ok(response) - } - Err(e) => Err(e), - }; - - // Persist cookies back to the database after the request completes - if let Some((cookie_store, mut cj)) = maybe_cookie_store { - let cookies = cookie_store.get_all_cookies(); - cj.cookies = cookies; - if let Err(e) = window.db().upsert_cookie_jar(&cj, &UpdateSource::Background) { - warn!("Failed to persist cookies to database: {}", e); - } - } - - final_result + Ok(result.response) } pub fn resolve_http_request( @@ -328,395 +185,3 @@ pub fn resolve_http_request( Ok((new_request, authentication_context_id)) } - -async fn execute_transaction( - cached_client: CachedClient, - mut sendable_request: SendableHttpRequest, - response_ctx: &mut ResponseContext, - mut cancelled_rx: Receiver, - cookie_store: Option, -) -> Result<(HttpResponse, Option>>)> { - let app_handle = &response_ctx.app_handle.clone(); - let response_id = response_ctx.response().id.clone(); - let workspace_id = response_ctx.response().workspace_id.clone(); - let is_persisted = response_ctx.is_persisted(); - - // Keep a reference to the resolver for DNS timing events - let resolver = cached_client.resolver.clone(); - - let sender = ReqwestSender::with_client(cached_client.client); - let transaction = match cookie_store { - Some(cs) => HttpTransaction::with_cookie_store(sender, cs), - None => HttpTransaction::new(sender), - }; - let start = Instant::now(); - - // Capture request headers before sending - let request_headers: Vec = sendable_request - .headers - .iter() - .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() }) - .collect(); - - // Update response with headers info - response_ctx.update(|r| { - r.url = sendable_request.url.clone(); - r.request_headers = request_headers; - })?; - - // Create bounded channel for receiving events and spawn a task to store them in DB - // Buffer size of 100 events provides back pressure if DB writes are slow - let (event_tx, mut event_rx) = - tokio::sync::mpsc::channel::(100); - - // Set the event sender on the DNS resolver so it can emit DNS timing events - resolver.set_event_sender(Some(event_tx.clone())).await; - - // Shared state to capture DNS timing from the event processing task - let dns_elapsed = Arc::new(AtomicI32::new(0)); - - // Write events to DB in a task (only for persisted responses) - if is_persisted { - let response_id = response_id.clone(); - let app_handle = app_handle.clone(); - let update_source = response_ctx.update_source.clone(); - let workspace_id = workspace_id.clone(); - let dns_elapsed = dns_elapsed.clone(); - tokio::spawn(async move { - while let Some(event) = event_rx.recv().await { - // Capture DNS timing when we see a DNS event - if let yaak_http::sender::HttpResponseEvent::DnsResolved { duration, .. } = &event { - dns_elapsed.store(*duration as i32, Ordering::SeqCst); - } - let db_event = HttpResponseEvent::new(&response_id, &workspace_id, event.into()); - let _ = app_handle.db().upsert_http_response_event(&db_event, &update_source); - } - }); - } else { - // For ephemeral responses, just drain the events but still capture DNS timing - let dns_elapsed = dns_elapsed.clone(); - tokio::spawn(async move { - while let Some(event) = event_rx.recv().await { - if let yaak_http::sender::HttpResponseEvent::DnsResolved { duration, .. } = &event { - dns_elapsed.store(*duration as i32, Ordering::SeqCst); - } - } - }); - }; - - // Capture request body as it's sent (only for persisted responses) - let body_id = format!("{}.request", response_id); - let maybe_blob_write_handle = match sendable_request.body { - Some(SendableBody::Bytes(bytes)) => { - if is_persisted { - write_bytes_to_db_sync(response_ctx, &body_id, bytes.clone())?; - } - sendable_request.body = Some(SendableBody::Bytes(bytes)); - None - } - Some(SendableBody::Stream { data: stream, content_length }) => { - // Wrap stream with TeeReader to capture data as it's read - // Use unbounded channel to ensure all data is captured without blocking the HTTP request - let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::>(); - let tee_reader = TeeReader::new(stream, body_chunk_tx); - let pinned: Pin> = Box::pin(tee_reader); - - let handle = if is_persisted { - // Spawn task to write request body chunks to blob DB - let app_handle = app_handle.clone(); - let response_id = response_id.clone(); - let workspace_id = workspace_id.clone(); - let body_id = body_id.clone(); - let update_source = response_ctx.update_source.clone(); - Some(tauri::async_runtime::spawn(async move { - write_stream_chunks_to_db( - app_handle, - &body_id, - &workspace_id, - &response_id, - &update_source, - body_chunk_rx, - ) - .await - })) - } else { - // For ephemeral responses, just drain the body chunks - tauri::async_runtime::spawn(async move { - let mut rx = body_chunk_rx; - while rx.recv().await.is_some() {} - }); - None - }; - - sendable_request.body = Some(SendableBody::Stream { data: pinned, content_length }); - handle - } - None => { - sendable_request.body = None; - None - } - }; - - // Execute the transaction with cancellation support - // This returns the response with headers, but body is not yet consumed - // Events (headers, settings, chunks) are sent through the channel - let mut http_response = transaction - .execute_with_cancellation(sendable_request, cancelled_rx.clone(), event_tx) - .await?; - - // Prepare the response path before consuming the body - let body_path = if response_id.is_empty() { - // Ephemeral responses: use OS temp directory for automatic cleanup - let temp_dir = std::env::temp_dir().join("yaak-ephemeral-responses"); - create_dir_all(&temp_dir).await?; - temp_dir.join(uuid::Uuid::new_v4().to_string()) - } else { - // Persisted responses: use app data directory - let dir = app_handle.path().app_data_dir()?; - let base_dir = dir.join("responses"); - create_dir_all(&base_dir).await?; - base_dir.join(&response_id) - }; - - // Extract metadata before consuming the body (headers are available immediately) - // Url might change, so update again - response_ctx.update(|r| { - r.body_path = Some(body_path.to_string_lossy().to_string()); - r.elapsed_headers = start.elapsed().as_millis() as i32; - r.status = http_response.status as i32; - r.status_reason = http_response.status_reason.clone(); - r.url = http_response.url.clone(); - r.remote_addr = http_response.remote_addr.clone(); - r.version = http_response.version.clone(); - r.headers = http_response - .headers - .iter() - .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() }) - .collect(); - r.content_length = http_response.content_length.map(|l| l as i32); - r.state = HttpResponseState::Connected; - r.request_headers = http_response - .request_headers - .iter() - .map(|(n, v)| HttpResponseHeader { name: n.clone(), value: v.clone() }) - .collect(); - })?; - - // Get the body stream for manual consumption - let mut body_stream = http_response.into_body_stream()?; - - // Open file for writing - let mut file = File::options() - .create(true) - .truncate(true) - .write(true) - .open(&body_path) - .await - .map_err(|e| GenericError(format!("Failed to open file: {}", e)))?; - - // Stream body to file, with throttled DB updates to avoid excessive writes - let mut written_bytes: usize = 0; - let mut last_update_time = start; - let mut buf = [0u8; 8192]; - - // Throttle settings: update DB at most every 100ms - const UPDATE_INTERVAL_MS: u128 = 100; - - loop { - // Check for cancellation. If we already have headers/body, just close cleanly without error - if *cancelled_rx.borrow() { - break; - } - - // Use select! to race between reading and cancellation, so cancellation is immediate - let read_result = tokio::select! { - biased; - _ = cancelled_rx.changed() => { - break; - } - result = body_stream.read(&mut buf) => result, - }; - - match read_result { - Ok(0) => break, // EOF - Ok(n) => { - file.write_all(&buf[..n]) - .await - .map_err(|e| GenericError(format!("Failed to write to file: {}", e)))?; - file.flush() - .await - .map_err(|e| GenericError(format!("Failed to flush file: {}", e)))?; - written_bytes += n; - - // Throttle DB updates: only update if enough time has passed - let now = Instant::now(); - let elapsed_since_update = now.duration_since(last_update_time).as_millis(); - - if elapsed_since_update >= UPDATE_INTERVAL_MS { - response_ctx.update(|r| { - r.elapsed = start.elapsed().as_millis() as i32; - r.content_length = Some(written_bytes as i32); - })?; - last_update_time = now; - } - } - Err(e) => { - return Err(GenericError(format!("Failed to read response body: {}", e))); - } - } - } - - // Final update with closed state and accurate byte count - response_ctx.update(|r| { - r.elapsed = start.elapsed().as_millis() as i32; - r.elapsed_dns = dns_elapsed.load(Ordering::SeqCst); - r.content_length = Some(written_bytes as i32); - r.state = HttpResponseState::Closed; - })?; - - // Clear the event sender from the resolver since this request is done - resolver.set_event_sender(None).await; - - Ok((response_ctx.response().clone(), maybe_blob_write_handle)) -} - -fn write_bytes_to_db_sync( - response_ctx: &mut ResponseContext, - body_id: &str, - data: Bytes, -) -> Result<()> { - if data.is_empty() { - return Ok(()); - } - - // Write in chunks if data is large - let mut offset = 0; - let mut chunk_index = 0; - while offset < data.len() { - let end = std::cmp::min(offset + REQUEST_BODY_CHUNK_SIZE, data.len()); - let chunk_data = data.slice(offset..end).to_vec(); - let chunk = BodyChunk::new(body_id, chunk_index, chunk_data); - response_ctx.app_handle.blobs().insert_chunk(&chunk)?; - offset = end; - chunk_index += 1; - } - - // Update the response with the total request body size - response_ctx.update(|r| { - r.request_content_length = Some(data.len() as i32); - })?; - - Ok(()) -} - -async fn write_stream_chunks_to_db( - app_handle: AppHandle, - body_id: &str, - workspace_id: &str, - response_id: &str, - update_source: &UpdateSource, - mut rx: tokio::sync::mpsc::UnboundedReceiver>, -) -> Result<()> { - let mut buffer = Vec::with_capacity(REQUEST_BODY_CHUNK_SIZE); - let mut chunk_index = 0; - let mut total_bytes: usize = 0; - - while let Some(data) = rx.recv().await { - total_bytes += data.len(); - buffer.extend_from_slice(&data); - - // Flush when buffer reaches chunk size - while buffer.len() >= REQUEST_BODY_CHUNK_SIZE { - debug!("Writing chunk {chunk_index} to DB"); - let chunk_data: Vec = buffer.drain(..REQUEST_BODY_CHUNK_SIZE).collect(); - let chunk = BodyChunk::new(body_id, chunk_index, chunk_data); - app_handle.blobs().insert_chunk(&chunk)?; - app_handle.db().upsert_http_response_event( - &HttpResponseEvent::new( - response_id, - workspace_id, - yaak_http::sender::HttpResponseEvent::ChunkSent { - bytes: REQUEST_BODY_CHUNK_SIZE, - } - .into(), - ), - update_source, - )?; - chunk_index += 1; - } - } - - // Flush remaining data - if !buffer.is_empty() { - let chunk = BodyChunk::new(body_id, chunk_index, buffer); - debug!("Flushing remaining data {chunk_index} {}", chunk.data.len()); - app_handle.blobs().insert_chunk(&chunk)?; - app_handle.db().upsert_http_response_event( - &HttpResponseEvent::new( - response_id, - workspace_id, - yaak_http::sender::HttpResponseEvent::ChunkSent { bytes: chunk.data.len() }.into(), - ), - update_source, - )?; - } - - // Update the response with the total request body size - app_handle.with_tx(|tx| { - debug!("Updating final body length {total_bytes}"); - if let Ok(mut response) = tx.get_http_response(&response_id) { - response.request_content_length = Some(total_bytes as i32); - tx.update_http_response_if_id(&response, update_source)?; - } - Ok(()) - })?; - - Ok(()) -} - -async fn apply_authentication( - _window: &WebviewWindow, - sendable_request: &mut SendableHttpRequest, - request: &HttpRequest, - auth_context_id: String, - plugin_manager: &PluginManager, - plugin_context: &PluginContext, -) -> Result<()> { - match &request.authentication_type { - None => { - // No authentication found. Not even inherited - } - Some(authentication_type) if authentication_type == "none" => { - // Explicitly no authentication - } - Some(authentication_type) => { - let req = CallHttpAuthenticationRequest { - context_id: format!("{:x}", md5::compute(auth_context_id)), - values: serde_json::from_value(serde_json::to_value(&request.authentication)?)?, - url: sendable_request.url.clone(), - method: sendable_request.method.clone(), - headers: sendable_request - .headers - .iter() - .map(|(name, value)| HttpHeader { - name: name.to_string(), - value: value.to_string(), - }) - .collect(), - }; - let plugin_result = plugin_manager - .call_http_authentication(plugin_context, &authentication_type, req) - .await?; - - for header in plugin_result.set_headers.unwrap_or_default() { - sendable_request.insert_header((header.name, header.value)); - } - - if let Some(params) = plugin_result.set_query_parameters { - let params = params.into_iter().map(|p| (p.name, p.value)).collect::>(); - sendable_request.url = append_query_params(&sendable_request.url, params); - } - } - } - Ok(()) -} diff --git a/crates/yaak/Cargo.toml b/crates/yaak/Cargo.toml index 51fdfca2..c5eb14fb 100644 --- a/crates/yaak/Cargo.toml +++ b/crates/yaak/Cargo.toml @@ -5,10 +5,15 @@ edition = "2024" publish = false [dependencies] +async-trait = "0.1" log = { workspace = true } +md5 = "0.8.0" serde_json = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true, features = ["sync", "rt"] } yaak-http = { workspace = true } +yaak-crypto = { workspace = true } yaak-models = { workspace = true } +yaak-plugins = { workspace = true } yaak-templates = { workspace = true } +yaak-tls = { workspace = true } diff --git a/crates/yaak/src/send.rs b/crates/yaak/src/send.rs index 9cf5be67..84e3f2ae 100644 --- a/crates/yaak/src/send.rs +++ b/crates/yaak/src/send.rs @@ -1,16 +1,37 @@ use crate::render::render_http_request; +use async_trait::async_trait; use log::warn; use std::path::{Path, PathBuf}; +use std::sync::Arc; use std::time::Instant; use thiserror::Error; use tokio::sync::mpsc; -use yaak_http::sender::{HttpResponseEvent as SenderHttpResponseEvent, HttpSender, ReqwestSender}; -use yaak_http::types::{SendableBody, SendableHttpRequest, SendableHttpRequestOptions}; +use tokio::sync::watch; +use yaak_crypto::manager::EncryptionManager; +use yaak_http::client::{ + HttpConnectionOptions, HttpConnectionProxySetting, HttpConnectionProxySettingAuth, +}; +use yaak_http::cookies::CookieStore; +use yaak_http::manager::HttpConnectionManager; +use yaak_http::sender::{HttpResponseEvent as SenderHttpResponseEvent, ReqwestSender}; +use yaak_http::transaction::HttpTransaction; +use yaak_http::types::{ + SendableBody, SendableHttpRequest, SendableHttpRequestOptions, append_query_params, +}; use yaak_models::blob_manager::BlobManager; -use yaak_models::models::{HttpResponse, HttpResponseEvent, HttpResponseHeader, HttpResponseState}; +use yaak_models::models::{ + ClientCertificate, CookieJar, DnsOverride, Environment, HttpRequest, HttpResponse, + HttpResponseEvent, HttpResponseHeader, HttpResponseState, ProxySetting, ProxySettingAuth, +}; use yaak_models::query_manager::QueryManager; use yaak_models::util::UpdateSource; +use yaak_plugins::events::{ + CallHttpAuthenticationRequest, HttpHeader, PluginContext, RenderPurpose, +}; +use yaak_plugins::manager::PluginManager; +use yaak_plugins::template_callback::PluginTemplateCallback; use yaak_templates::{RenderOptions, TemplateCallback}; +use yaak_tls::find_client_certificate; const HTTP_EVENT_CHANNEL_CAPACITY: usize = 100; @@ -19,15 +40,27 @@ pub enum SendHttpRequestError { #[error("Failed to load request: {0}")] LoadRequest(#[source] yaak_models::error::Error), + #[error("Failed to load workspace: {0}")] + LoadWorkspace(#[source] yaak_models::error::Error), + #[error("Failed to resolve environments: {0}")] ResolveEnvironments(#[source] yaak_models::error::Error), #[error("Failed to resolve inherited request settings: {0}")] ResolveRequestInheritance(#[source] yaak_models::error::Error), + #[error("Failed to load cookie jar: {0}")] + LoadCookieJar(#[source] yaak_models::error::Error), + + #[error("Failed to persist cookie jar: {0}")] + PersistCookieJar(#[source] yaak_models::error::Error), + #[error("Failed to render request templates: {0}")] RenderRequest(#[source] yaak_templates::error::Error), + #[error("Failed to prepare request before send: {0}")] + PrepareSendableRequest(String), + #[error("Failed to persist response metadata: {0}")] PersistResponse(#[source] yaak_models::error::Error), @@ -60,51 +93,370 @@ pub enum SendHttpRequestError { pub type Result = std::result::Result; +#[async_trait] +pub trait PrepareSendableRequest: Send + Sync { + async fn prepare_sendable_request( + &self, + rendered_request: &HttpRequest, + auth_context_id: &str, + sendable_request: &mut SendableHttpRequest, + ) -> std::result::Result<(), String>; +} + +#[async_trait] +pub trait SendRequestExecutor: Send + Sync { + async fn send( + &self, + sendable_request: SendableHttpRequest, + event_tx: mpsc::Sender, + cookie_store: Option, + ) -> yaak_http::error::Result; +} + +struct DefaultSendRequestExecutor; + +#[async_trait] +impl SendRequestExecutor for DefaultSendRequestExecutor { + async fn send( + &self, + sendable_request: SendableHttpRequest, + event_tx: mpsc::Sender, + cookie_store: Option, + ) -> yaak_http::error::Result { + let sender = ReqwestSender::new()?; + let transaction = match cookie_store { + Some(store) => HttpTransaction::with_cookie_store(sender, store), + None => HttpTransaction::new(sender), + }; + let (_cancel_tx, cancel_rx) = watch::channel(false); + transaction.execute_with_cancellation(sendable_request, cancel_rx, event_tx).await + } +} + +struct PluginPrepareSendableRequest { + plugin_manager: Arc, + plugin_context: PluginContext, + cancelled_rx: Option>, +} + +#[async_trait] +impl PrepareSendableRequest for PluginPrepareSendableRequest { + async fn prepare_sendable_request( + &self, + rendered_request: &HttpRequest, + auth_context_id: &str, + sendable_request: &mut SendableHttpRequest, + ) -> std::result::Result<(), String> { + if let Some(cancelled_rx) = &self.cancelled_rx { + let mut cancelled_rx = cancelled_rx.clone(); + tokio::select! { + result = apply_plugin_authentication( + sendable_request, + rendered_request, + auth_context_id, + &self.plugin_manager, + &self.plugin_context, + ) => result, + _ = cancelled_rx.changed() => Err("Request canceled".to_string()), + } + } else { + apply_plugin_authentication( + sendable_request, + rendered_request, + auth_context_id, + &self.plugin_manager, + &self.plugin_context, + ) + .await + } + } +} + +struct ConnectionManagerSendRequestExecutor<'a> { + connection_manager: &'a HttpConnectionManager, + plugin_context_id: String, + query_manager: QueryManager, + workspace_id: String, + cancelled_rx: Option>, +} + +#[async_trait] +impl SendRequestExecutor for ConnectionManagerSendRequestExecutor<'_> { + async fn send( + &self, + sendable_request: SendableHttpRequest, + event_tx: mpsc::Sender, + cookie_store: Option, + ) -> yaak_http::error::Result { + let runtime_config = + resolve_http_send_runtime_config(&self.query_manager, &self.workspace_id) + .map_err(|e| yaak_http::error::Error::RequestError(e.to_string()))?; + let client_certificate = + find_client_certificate(&sendable_request.url, &runtime_config.client_certificates); + let cached_client = self + .connection_manager + .get_client(&HttpConnectionOptions { + id: self.plugin_context_id.clone(), + validate_certificates: runtime_config.validate_certificates, + proxy: runtime_config.proxy, + client_certificate, + dns_overrides: runtime_config.dns_overrides, + }) + .await?; + + cached_client.resolver.set_event_sender(Some(event_tx.clone())).await; + + let sender = ReqwestSender::with_client(cached_client.client); + let transaction = match cookie_store { + Some(cs) => HttpTransaction::with_cookie_store(sender, cs), + None => HttpTransaction::new(sender), + }; + + let result = if let Some(cancelled_rx) = self.cancelled_rx.clone() { + transaction.execute_with_cancellation(sendable_request, cancelled_rx, event_tx).await + } else { + let (_cancel_tx, cancel_rx) = watch::channel(false); + transaction.execute_with_cancellation(sendable_request, cancel_rx, event_tx).await + }; + cached_client.resolver.set_event_sender(None).await; + result + } +} + pub struct SendHttpRequestByIdParams<'a, T: TemplateCallback> { pub query_manager: &'a QueryManager, pub blob_manager: &'a BlobManager, pub request_id: &'a str, pub environment_id: Option<&'a str>, pub template_callback: &'a T, - pub send_options: SendableHttpRequestOptions, pub update_source: UpdateSource, + pub cookie_jar_id: Option, + pub cookie_jar_update_source: UpdateSource, pub response_dir: &'a Path, pub persist_events: bool, pub emit_events_to: Option>, + pub prepare_sendable_request: Option<&'a dyn PrepareSendableRequest>, + pub executor: Option<&'a dyn SendRequestExecutor>, +} + +pub struct SendHttpRequestParams<'a, T: TemplateCallback> { + pub query_manager: &'a QueryManager, + pub blob_manager: &'a BlobManager, + pub request: HttpRequest, + pub environment_id: Option<&'a str>, + pub template_callback: &'a T, + pub send_options: Option, + pub update_source: UpdateSource, + pub cookie_jar_id: Option, + pub cookie_jar_update_source: UpdateSource, + pub response_dir: &'a Path, + pub persist_events: bool, + pub emit_events_to: Option>, + pub auth_context_id: Option, + pub existing_response: Option, + pub prepare_sendable_request: Option<&'a dyn PrepareSendableRequest>, + pub executor: Option<&'a dyn SendRequestExecutor>, +} + +pub struct SendHttpRequestWithPluginsParams<'a> { + pub query_manager: &'a QueryManager, + pub blob_manager: &'a BlobManager, + pub request: HttpRequest, + pub environment_id: Option<&'a str>, + pub update_source: UpdateSource, + pub cookie_jar_id: Option, + pub cookie_jar_update_source: UpdateSource, + pub response_dir: &'a Path, + pub persist_events: bool, + pub emit_events_to: Option>, + pub existing_response: Option, + pub plugin_manager: Arc, + pub encryption_manager: Arc, + pub plugin_context: &'a PluginContext, + pub cancelled_rx: Option>, + pub connection_manager: Option<&'a HttpConnectionManager>, +} + +pub struct SendHttpRequestByIdWithPluginsParams<'a> { + pub query_manager: &'a QueryManager, + pub blob_manager: &'a BlobManager, + pub request_id: &'a str, + pub environment_id: Option<&'a str>, + pub update_source: UpdateSource, + pub cookie_jar_id: Option, + pub cookie_jar_update_source: UpdateSource, + pub response_dir: &'a Path, + pub persist_events: bool, + pub emit_events_to: Option>, + pub plugin_manager: Arc, + pub encryption_manager: Arc, + pub plugin_context: &'a PluginContext, + pub cancelled_rx: Option>, + pub connection_manager: Option<&'a HttpConnectionManager>, } pub struct SendHttpRequestResult { - pub rendered_request: yaak_models::models::HttpRequest, + pub rendered_request: HttpRequest, pub response: HttpResponse, pub response_body: Vec, } +pub struct HttpSendRuntimeConfig { + pub send_options: SendableHttpRequestOptions, + pub validate_certificates: bool, + pub proxy: HttpConnectionProxySetting, + pub dns_overrides: Vec, + pub client_certificates: Vec, +} + +pub fn resolve_http_send_runtime_config( + query_manager: &QueryManager, + workspace_id: &str, +) -> Result { + let db = query_manager.connect(); + let workspace = db.get_workspace(workspace_id).map_err(SendHttpRequestError::LoadWorkspace)?; + let settings = db.get_settings(); + + Ok(HttpSendRuntimeConfig { + send_options: SendableHttpRequestOptions { + follow_redirects: workspace.setting_follow_redirects, + timeout: if workspace.setting_request_timeout > 0 { + Some(std::time::Duration::from_millis( + workspace.setting_request_timeout.unsigned_abs() as u64, + )) + } else { + None + }, + }, + validate_certificates: workspace.setting_validate_certificates, + proxy: proxy_setting_from_settings(settings.proxy), + dns_overrides: workspace.setting_dns_overrides, + client_certificates: settings.client_certificates, + }) +} + +pub async fn send_http_request_by_id_with_plugins( + params: SendHttpRequestByIdWithPluginsParams<'_>, +) -> Result { + let request = params + .query_manager + .connect() + .get_http_request(params.request_id) + .map_err(SendHttpRequestError::LoadRequest)?; + + send_http_request_with_plugins(SendHttpRequestWithPluginsParams { + query_manager: params.query_manager, + blob_manager: params.blob_manager, + request, + environment_id: params.environment_id, + update_source: params.update_source, + cookie_jar_id: params.cookie_jar_id, + cookie_jar_update_source: params.cookie_jar_update_source, + response_dir: params.response_dir, + persist_events: params.persist_events, + emit_events_to: params.emit_events_to, + existing_response: None, + plugin_manager: params.plugin_manager, + encryption_manager: params.encryption_manager, + plugin_context: params.plugin_context, + cancelled_rx: params.cancelled_rx, + connection_manager: params.connection_manager, + }) + .await +} + +pub async fn send_http_request_with_plugins( + params: SendHttpRequestWithPluginsParams<'_>, +) -> Result { + let template_callback = PluginTemplateCallback::new( + params.plugin_manager.clone(), + params.encryption_manager.clone(), + params.plugin_context, + RenderPurpose::Send, + ); + let auth_hook = PluginPrepareSendableRequest { + plugin_manager: params.plugin_manager, + plugin_context: params.plugin_context.clone(), + cancelled_rx: params.cancelled_rx.clone(), + }; + let executor = + params.connection_manager.map(|connection_manager| ConnectionManagerSendRequestExecutor { + connection_manager, + plugin_context_id: params.plugin_context.id.clone(), + query_manager: params.query_manager.clone(), + workspace_id: params.request.workspace_id.clone(), + cancelled_rx: params.cancelled_rx.clone(), + }); + + send_http_request(SendHttpRequestParams { + query_manager: params.query_manager, + blob_manager: params.blob_manager, + request: params.request, + environment_id: params.environment_id, + template_callback: &template_callback, + send_options: None, + update_source: params.update_source, + cookie_jar_id: params.cookie_jar_id, + cookie_jar_update_source: params.cookie_jar_update_source, + response_dir: params.response_dir, + persist_events: params.persist_events, + emit_events_to: params.emit_events_to, + auth_context_id: None, + existing_response: params.existing_response, + prepare_sendable_request: Some(&auth_hook), + executor: executor.as_ref().map(|e| e as &dyn SendRequestExecutor), + }) + .await +} + pub async fn send_http_request_by_id( params: SendHttpRequestByIdParams<'_, T>, ) -> Result { - let db = params.query_manager.connect(); - let request = - db.get_http_request(params.request_id).map_err(SendHttpRequestError::LoadRequest)?; - let environment_chain = db - .resolve_environments( - &request.workspace_id, - request.folder_id.as_deref(), - params.environment_id, - ) - .map_err(SendHttpRequestError::ResolveEnvironments)?; + let request = params + .query_manager + .connect() + .get_http_request(params.request_id) + .map_err(SendHttpRequestError::LoadRequest)?; + let (request, auth_context_id) = resolve_inherited_request(params.query_manager, &request)?; - let (authentication_type, authentication, _auth_context_id) = db - .resolve_auth_for_http_request(&request) - .map_err(SendHttpRequestError::ResolveRequestInheritance)?; - let resolved_headers = db - .resolve_headers_for_http_request(&request) - .map_err(SendHttpRequestError::ResolveRequestInheritance)?; - drop(db); + send_http_request(SendHttpRequestParams { + query_manager: params.query_manager, + blob_manager: params.blob_manager, + request, + environment_id: params.environment_id, + template_callback: params.template_callback, + send_options: None, + update_source: params.update_source, + cookie_jar_id: params.cookie_jar_id, + cookie_jar_update_source: params.cookie_jar_update_source, + response_dir: params.response_dir, + persist_events: params.persist_events, + emit_events_to: params.emit_events_to, + existing_response: None, + prepare_sendable_request: params.prepare_sendable_request, + executor: params.executor, + auth_context_id: Some(auth_context_id), + }) + .await +} - let mut resolved_request = request.clone(); - resolved_request.authentication_type = authentication_type; - resolved_request.authentication = authentication; - resolved_request.headers = resolved_headers; +pub async fn send_http_request( + params: SendHttpRequestParams<'_, T>, +) -> Result { + let environment_chain = + resolve_environment_chain(params.query_manager, ¶ms.request, params.environment_id)?; + let (resolved_request, auth_context_id) = + if let Some(auth_context_id) = params.auth_context_id.clone() { + (params.request.clone(), auth_context_id) + } else { + resolve_inherited_request(params.query_manager, ¶ms.request)? + }; + let runtime_config = + resolve_http_send_runtime_config(params.query_manager, ¶ms.request.workspace_id)?; + let send_options = params.send_options.unwrap_or(runtime_config.send_options); + let mut cookie_jar = load_cookie_jar(params.query_manager, params.cookie_jar_id.as_deref())?; + let cookie_store = + cookie_jar.as_ref().map(|jar| CookieStore::from_cookies(jar.cookies.clone())); let rendered_request = render_http_request( &resolved_request, @@ -115,41 +467,52 @@ pub async fn send_http_request_by_id( .await .map_err(SendHttpRequestError::RenderRequest)?; - let sendable_request = - SendableHttpRequest::from_http_request(&rendered_request, params.send_options) + let mut sendable_request = + SendableHttpRequest::from_http_request(&rendered_request, send_options) .await .map_err(SendHttpRequestError::BuildSendableRequest)?; - let request_content_length = sendable_body_length(sendable_request.body.as_ref()); - let mut response = params + if let Some(hook) = params.prepare_sendable_request { + hook.prepare_sendable_request(&rendered_request, &auth_context_id, &mut sendable_request) + .await + .map_err(SendHttpRequestError::PrepareSendableRequest)?; + } + + let request_content_length = sendable_body_length(sendable_request.body.as_ref()); + let mut response = params.existing_response.unwrap_or_default(); + response.request_id = params.request.id.clone(); + response.workspace_id = params.request.workspace_id.clone(); + response.request_content_length = request_content_length; + response.request_headers = sendable_request + .headers + .iter() + .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() }) + .collect(); + response.url = sendable_request.url.clone(); + response.state = HttpResponseState::Initialized; + response.error = None; + response.content_length = None; + response.content_length_compressed = None; + response.body_path = None; + response.status = 0; + response.status_reason = None; + response.headers = Vec::new(); + response.remote_addr = None; + response.version = None; + response.elapsed = 0; + response.elapsed_headers = 0; + response.elapsed_dns = 0; + response = params .query_manager .connect() - .upsert_http_response( - &HttpResponse { - request_id: request.id.clone(), - workspace_id: request.workspace_id.clone(), - request_content_length, - request_headers: sendable_request - .headers - .iter() - .map(|(name, value)| HttpResponseHeader { - name: name.clone(), - value: value.clone(), - }) - .collect(), - url: sendable_request.url.clone(), - ..Default::default() - }, - ¶ms.update_source, - params.blob_manager, - ) + .upsert_http_response(&response, ¶ms.update_source, params.blob_manager) .map_err(SendHttpRequestError::PersistResponse)?; let (event_tx, mut event_rx) = mpsc::channel::(HTTP_EVENT_CHANNEL_CAPACITY); let event_query_manager = params.query_manager.clone(); let event_response_id = response.id.clone(); - let event_workspace_id = request.workspace_id.clone(); + let event_workspace_id = params.request.workspace_id.clone(); let event_update_source = params.update_source.clone(); let emit_events_to = params.emit_events_to.clone(); let persist_events = params.persist_events; @@ -175,34 +538,33 @@ pub async fn send_http_request_by_id( } }); - let sender = ReqwestSender::new().map_err(SendHttpRequestError::CreateHttpClient)?; + let default_executor = DefaultSendRequestExecutor; + let executor = params.executor.unwrap_or(&default_executor); let started_at = Instant::now(); let request_started_url = sendable_request.url.clone(); - let http_response = match sender.send(sendable_request, event_tx).await { + let http_response = match executor.send(sendable_request, event_tx, cookie_store.clone()).await + { Ok(response) => response, Err(err) => { - let _ = params - .query_manager - .connect() - .upsert_http_response( - &HttpResponse { - state: HttpResponseState::Closed, - elapsed: duration_to_i32(started_at.elapsed()), - elapsed_headers: duration_to_i32(started_at.elapsed()), - error: Some(err.to_string()), - url: request_started_url, - ..response - }, - ¶ms.update_source, - params.blob_manager, - ) - .map_err(SendHttpRequestError::PersistResponse)?; - + persist_cookie_jar( + params.query_manager, + cookie_jar.as_mut(), + cookie_store.as_ref(), + ¶ms.cookie_jar_update_source, + )?; + let _ = persist_response_error( + params.query_manager, + params.blob_manager, + ¶ms.update_source, + &response, + started_at, + err.to_string(), + request_started_url, + ); if let Err(join_err) = event_handle.await { warn!("Failed to join response event task: {}", join_err); } - return Err(SendHttpRequestError::SendRequest(err)); } }; @@ -279,10 +641,179 @@ pub async fn send_http_request_by_id( if let Err(join_err) = event_handle.await { warn!("Failed to join response event task: {}", join_err); } + persist_cookie_jar( + params.query_manager, + cookie_jar.as_mut(), + cookie_store.as_ref(), + ¶ms.cookie_jar_update_source, + )?; Ok(SendHttpRequestResult { rendered_request, response, response_body }) } +fn resolve_environment_chain( + query_manager: &QueryManager, + request: &HttpRequest, + environment_id: Option<&str>, +) -> Result> { + let db = query_manager.connect(); + db.resolve_environments(&request.workspace_id, request.folder_id.as_deref(), environment_id) + .map_err(SendHttpRequestError::ResolveEnvironments) +} + +fn resolve_inherited_request( + query_manager: &QueryManager, + request: &HttpRequest, +) -> Result<(HttpRequest, String)> { + let db = query_manager.connect(); + let (authentication_type, authentication, auth_context_id) = db + .resolve_auth_for_http_request(request) + .map_err(SendHttpRequestError::ResolveRequestInheritance)?; + let resolved_headers = db + .resolve_headers_for_http_request(request) + .map_err(SendHttpRequestError::ResolveRequestInheritance)?; + + let mut request = request.clone(); + request.authentication_type = authentication_type; + request.authentication = authentication; + request.headers = resolved_headers; + + Ok((request, auth_context_id)) +} + +fn load_cookie_jar( + query_manager: &QueryManager, + cookie_jar_id: Option<&str>, +) -> Result> { + let Some(cookie_jar_id) = cookie_jar_id else { + return Ok(None); + }; + + query_manager + .connect() + .get_cookie_jar(cookie_jar_id) + .map(Some) + .map_err(SendHttpRequestError::LoadCookieJar) +} + +fn persist_cookie_jar( + query_manager: &QueryManager, + cookie_jar: Option<&mut CookieJar>, + cookie_store: Option<&CookieStore>, + source: &UpdateSource, +) -> Result<()> { + match (cookie_jar, cookie_store) { + (Some(cookie_jar), Some(cookie_store)) => { + cookie_jar.cookies = cookie_store.get_all_cookies(); + query_manager + .connect() + .upsert_cookie_jar(cookie_jar, source) + .map_err(SendHttpRequestError::PersistCookieJar)?; + Ok(()) + } + _ => Ok(()), + } +} + +fn proxy_setting_from_settings(proxy: Option) -> HttpConnectionProxySetting { + match proxy { + None => HttpConnectionProxySetting::System, + Some(ProxySetting::Disabled) => HttpConnectionProxySetting::Disabled, + Some(ProxySetting::Enabled { http, https, auth, bypass, disabled }) => { + if disabled { + HttpConnectionProxySetting::System + } else { + HttpConnectionProxySetting::Enabled { + http, + https, + bypass, + auth: auth.map(|ProxySettingAuth { user, password }| { + HttpConnectionProxySettingAuth { user, password } + }), + } + } + } + } +} + +pub async fn apply_plugin_authentication( + sendable_request: &mut SendableHttpRequest, + request: &HttpRequest, + auth_context_id: &str, + plugin_manager: &PluginManager, + plugin_context: &PluginContext, +) -> std::result::Result<(), String> { + match &request.authentication_type { + None => {} + Some(authentication_type) if authentication_type == "none" => {} + Some(authentication_type) => { + let req = CallHttpAuthenticationRequest { + context_id: format!("{:x}", md5::compute(auth_context_id)), + values: serde_json::from_value( + serde_json::to_value(&request.authentication) + .map_err(|e| format!("Failed to serialize auth values: {e}"))?, + ) + .map_err(|e| format!("Failed to parse auth values: {e}"))?, + url: sendable_request.url.clone(), + method: sendable_request.method.clone(), + headers: sendable_request + .headers + .iter() + .map(|(name, value)| HttpHeader { + name: name.to_string(), + value: value.to_string(), + }) + .collect(), + }; + let plugin_result = plugin_manager + .call_http_authentication(plugin_context, authentication_type, req) + .await + .map_err(|e| format!("Failed to apply authentication plugin: {e}"))?; + + for header in plugin_result.set_headers.unwrap_or_default() { + sendable_request.insert_header((header.name, header.value)); + } + + if let Some(params) = plugin_result.set_query_parameters { + let params = params.into_iter().map(|p| (p.name, p.value)).collect::>(); + sendable_request.url = append_query_params(&sendable_request.url, params); + } + } + } + Ok(()) +} + +fn persist_response_error( + query_manager: &QueryManager, + blob_manager: &BlobManager, + update_source: &UpdateSource, + response: &HttpResponse, + started_at: Instant, + error: String, + fallback_url: String, +) -> Result { + let elapsed = duration_to_i32(started_at.elapsed()); + query_manager + .connect() + .upsert_http_response( + &HttpResponse { + state: HttpResponseState::Closed, + elapsed, + elapsed_headers: if response.elapsed_headers == 0 { + elapsed + } else { + response.elapsed_headers + }, + error: Some(error), + url: if response.url.is_empty() { fallback_url } else { response.url.clone() }, + ..response.clone() + }, + update_source, + blob_manager, + ) + .map_err(SendHttpRequestError::PersistResponse) +} + fn sendable_body_length(body: Option<&SendableBody>) -> Option { match body { Some(SendableBody::Bytes(bytes)) => Some(usize_to_i32(bytes.len())),