diff --git a/Cargo.lock b/Cargo.lock index aa08505f..d0d3b514 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8239,6 +8239,19 @@ dependencies = [ "rustix 1.0.7", ] +[[package]] +name = "yaak" +version = "0.1.0" +dependencies = [ + "log 0.4.29", + "serde_json", + "thiserror 2.0.17", + "tokio", + "yaak-http", + "yaak-models", + "yaak-templates", +] + [[package]] name = "yaak-api" version = "0.1.0" @@ -8290,6 +8303,7 @@ dependencies = [ "ts-rs", "url", "uuid", + "yaak", "yaak-api", "yaak-common", "yaak-core", @@ -8324,6 +8338,7 @@ dependencies = [ "serde_json", "tempfile", "tokio", + "yaak", "yaak-crypto", "yaak-http", "yaak-models", diff --git a/Cargo.toml b/Cargo.toml index 34cb3ee7..4b625794 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,7 @@ [workspace] resolver = "2" members = [ + "crates/yaak", # Shared crates (no Tauri dependency) "crates/yaak-core", "crates/yaak-common", @@ -47,6 +48,7 @@ ts-rs = "11.1.0" # Internal crates - shared yaak-core = { path = "crates/yaak-core" } +yaak = { path = "crates/yaak" } yaak-common = { path = "crates/yaak-common" } yaak-crypto = { path = "crates/yaak-crypto" } yaak-git = { path = "crates/yaak-git" } diff --git a/crates-cli/yaak-cli/Cargo.toml b/crates-cli/yaak-cli/Cargo.toml index 919c0d60..c13011e3 100644 --- a/crates-cli/yaak-cli/Cargo.toml +++ b/crates-cli/yaak-cli/Cargo.toml @@ -16,6 +16,7 @@ log = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +yaak = { workspace = true } yaak-crypto = { workspace = true } yaak-http = { workspace = true } yaak-models = { workspace = true } diff --git a/crates-cli/yaak-cli/src/commands/request.rs b/crates-cli/yaak-cli/src/commands/request.rs index acf1a078..f5682c5c 100644 --- a/crates-cli/yaak-cli/src/commands/request.rs +++ b/crates-cli/yaak-cli/src/commands/request.rs @@ -5,19 +5,13 @@ use crate::commands::json::{ validate_create_id, }; use crate::context::CliContext; -use log::info; -use serde_json::Value; -use std::collections::BTreeMap; use tokio::sync::mpsc; -use yaak_http::path_placeholders::apply_path_placeholders; -use yaak_http::sender::{HttpSender, ReqwestSender}; -use yaak_http::types::{SendableHttpRequest, SendableHttpRequestOptions}; -use yaak_models::models::{Environment, HttpRequest, HttpRequestHeader, HttpUrlParameter}; -use yaak_models::render::make_vars_hashmap; +use yaak::send::{SendHttpRequestByIdParams, send_http_request_by_id}; +use yaak_http::types::SendableHttpRequestOptions; +use yaak_models::models::HttpRequest; use yaak_models::util::UpdateSource; use yaak_plugins::events::{PluginContext, RenderPurpose}; use yaak_plugins::template_callback::PluginTemplateCallback; -use yaak_templates::{RenderOptions, parse_and_render, render_json_value_raw}; pub async fn run( ctx: &CliContext, @@ -179,11 +173,6 @@ pub async fn send_request_by_id( let request = ctx.db().get_http_request(request_id).map_err(|e| format!("Failed to get request: {e}"))?; - let environment_chain = ctx - .db() - .resolve_environments(&request.workspace_id, request.folder_id.as_deref(), environment) - .map_err(|e| format!("Failed to resolve environments: {e}"))?; - let plugin_context = PluginContext::new(None, Some(request.workspace_id.clone())); let template_callback = PluginTemplateCallback::new( ctx.plugin_manager(), @@ -192,147 +181,49 @@ pub async fn send_request_by_id( RenderPurpose::Send, ); - let rendered_request = render_http_request( - &request, - environment_chain, - &template_callback, - &RenderOptions::throw(), - ) - .await - .map_err(|e| format!("Failed to render request templates: {e}"))?; - - if verbose { - println!("> {} {}", rendered_request.method, rendered_request.url); - } - - let sendable = SendableHttpRequest::from_http_request( - &rendered_request, - SendableHttpRequestOptions::default(), - ) - .await - .map_err(|e| format!("Failed to build request: {e}"))?; - let (event_tx, mut event_rx) = mpsc::channel(100); - - let verbose_handle = if verbose { - Some(tokio::spawn(async move { - while let Some(event) = event_rx.recv().await { + let event_handle = tokio::spawn(async move { + while let Some(event) = event_rx.recv().await { + if verbose { println!("{}", event); } - })) - } else { - tokio::spawn(async move { while event_rx.recv().await.is_some() {} }); - None - }; + } + }); + let response_dir = ctx.data_dir().join("responses"); - let sender = ReqwestSender::new().map_err(|e| format!("Failed to create HTTP client: {e}"))?; - let response = sender - .send(sendable, event_tx) - .await - .map_err(|e| format!("Failed to send request: {e}"))?; + let result = send_http_request_by_id(SendHttpRequestByIdParams { + query_manager: ctx.query_manager(), + blob_manager: ctx.blob_manager(), + request_id, + environment_id: environment, + template_callback: &template_callback, + send_options: SendableHttpRequestOptions::default(), + update_source: UpdateSource::Sync, + response_dir: &response_dir, + persist_events: true, + emit_events_to: Some(event_tx), + }) + .await; - if let Some(handle) = verbose_handle { - let _ = handle.await; - } + let _ = event_handle.await; + let result = result.map_err(|e| e.to_string())?; if verbose { println!(); } - println!("HTTP {} {}", response.status, response.status_reason.as_deref().unwrap_or("")); - + println!( + "HTTP {} {}", + result.response.status, + result.response.status_reason.as_deref().unwrap_or("") + ); if verbose { - for (name, value) in &response.headers { - println!("{}: {}", name, value); + for header in &result.response.headers { + println!("{}: {}", header.name, header.value); } println!(); } - - let (body, _stats) = - response.text().await.map_err(|e| format!("Failed to read response body: {e}"))?; + let body = String::from_utf8(result.response_body) + .map_err(|e| format!("Failed to read response body: {e}"))?; println!("{}", body); Ok(()) } - -/// Render an HTTP request with template variables and plugin functions. -async fn render_http_request( - request: &HttpRequest, - environment_chain: Vec, - callback: &PluginTemplateCallback, - options: &RenderOptions, -) -> yaak_templates::error::Result { - let vars = &make_vars_hashmap(environment_chain); - - let mut url_parameters = Vec::new(); - for parameter in request.url_parameters.clone() { - if !parameter.enabled { - continue; - } - - url_parameters.push(HttpUrlParameter { - enabled: parameter.enabled, - name: parse_and_render(parameter.name.as_str(), vars, callback, options).await?, - value: parse_and_render(parameter.value.as_str(), vars, callback, options).await?, - id: parameter.id, - }) - } - - let mut headers = Vec::new(); - for header in request.headers.clone() { - if !header.enabled { - continue; - } - - headers.push(HttpRequestHeader { - enabled: header.enabled, - name: parse_and_render(header.name.as_str(), vars, callback, options).await?, - value: parse_and_render(header.value.as_str(), vars, callback, options).await?, - id: header.id, - }) - } - - let mut body = BTreeMap::new(); - for (key, value) in request.body.clone() { - body.insert(key, render_json_value_raw(value, vars, callback, options).await?); - } - - let authentication = { - let mut disabled = false; - let mut auth = BTreeMap::new(); - - match request.authentication.get("disabled") { - Some(Value::Bool(true)) => { - disabled = true; - } - Some(Value::String(template)) => { - disabled = parse_and_render(template.as_str(), vars, callback, options) - .await - .unwrap_or_default() - .is_empty(); - info!( - "Rendering authentication.disabled as a template: {disabled} from \"{template}\"" - ); - } - _ => {} - } - - if disabled { - auth.insert("disabled".to_string(), Value::Bool(true)); - } else { - for (key, value) in request.authentication.clone() { - if key == "disabled" { - auth.insert(key, Value::Bool(false)); - } else { - auth.insert(key, render_json_value_raw(value, vars, callback, options).await?); - } - } - } - - auth - }; - - let url = parse_and_render(request.url.clone().as_str(), vars, callback, options).await?; - - let (url, url_parameters) = apply_path_placeholders(&url, &url_parameters); - - Ok(HttpRequest { url, url_parameters, headers, body, authentication, ..request.to_owned() }) -} diff --git a/crates-cli/yaak-cli/src/context.rs b/crates-cli/yaak-cli/src/context.rs index 471e962e..2002bb33 100644 --- a/crates-cli/yaak-cli/src/context.rs +++ b/crates-cli/yaak-cli/src/context.rs @@ -1,13 +1,16 @@ -use std::path::PathBuf; +use std::path::{Path, PathBuf}; use std::sync::Arc; use yaak_crypto::manager::EncryptionManager; +use yaak_models::blob_manager::BlobManager; use yaak_models::db_context::DbContext; use yaak_models::query_manager::QueryManager; use yaak_plugins::events::PluginContext; use yaak_plugins::manager::PluginManager; pub struct CliContext { + data_dir: PathBuf, query_manager: QueryManager, + blob_manager: BlobManager, pub encryption_manager: Arc, plugin_manager: Option>, } @@ -17,9 +20,8 @@ impl CliContext { let db_path = data_dir.join("db.sqlite"); let blob_path = data_dir.join("blobs.sqlite"); - let (query_manager, _blob_manager, _rx) = - yaak_models::init_standalone(&db_path, &blob_path) - .expect("Failed to initialize database"); + let (query_manager, blob_manager, _rx) = yaak_models::init_standalone(&db_path, &blob_path) + .expect("Failed to initialize database"); let encryption_manager = Arc::new(EncryptionManager::new(query_manager.clone(), app_id)); @@ -63,13 +65,25 @@ impl CliContext { None }; - Self { query_manager, encryption_manager, plugin_manager } + Self { data_dir, query_manager, blob_manager, encryption_manager, plugin_manager } + } + + pub fn data_dir(&self) -> &Path { + &self.data_dir } pub fn db(&self) -> DbContext<'_> { self.query_manager.connect() } + pub fn query_manager(&self) -> &QueryManager { + &self.query_manager + } + + pub fn blob_manager(&self) -> &BlobManager { + &self.blob_manager + } + pub fn plugin_manager(&self) -> Arc { self.plugin_manager.clone().expect("Plugin manager was not initialized for this command") } diff --git a/crates-cli/yaak-cli/tests/common/http_server.rs b/crates-cli/yaak-cli/tests/common/http_server.rs new file mode 100644 index 00000000..5a004d32 --- /dev/null +++ b/crates-cli/yaak-cli/tests/common/http_server.rs @@ -0,0 +1,42 @@ +use std::io::{Read, Write}; +use std::net::TcpListener; +use std::thread; + +pub struct TestHttpServer { + pub url: String, + handle: Option>, +} + +impl TestHttpServer { + pub fn spawn_ok(body: &'static str) -> Self { + let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind test HTTP server"); + let addr = listener.local_addr().expect("Failed to get local addr"); + let url = format!("http://{addr}/test"); + let body_bytes = body.as_bytes().to_vec(); + + let handle = thread::spawn(move || { + if let Ok((mut stream, _)) = listener.accept() { + let mut request_buf = [0u8; 4096]; + let _ = stream.read(&mut request_buf); + + let response = format!( + "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\nContent-Length: {}\r\nConnection: close\r\n\r\n", + body_bytes.len() + ); + let _ = stream.write_all(response.as_bytes()); + let _ = stream.write_all(&body_bytes); + let _ = stream.flush(); + } + }); + + Self { url, handle: Some(handle) } + } +} + +impl Drop for TestHttpServer { + fn drop(&mut self) { + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} diff --git a/crates-cli/yaak-cli/tests/common/mod.rs b/crates-cli/yaak-cli/tests/common/mod.rs index 61aab3b2..4043a81e 100644 --- a/crates-cli/yaak-cli/tests/common/mod.rs +++ b/crates-cli/yaak-cli/tests/common/mod.rs @@ -1,5 +1,7 @@ #![allow(dead_code)] +pub mod http_server; + use assert_cmd::Command; use assert_cmd::cargo::cargo_bin_cmd; use std::path::Path; diff --git a/crates-cli/yaak-cli/tests/request_commands.rs b/crates-cli/yaak-cli/tests/request_commands.rs index d2386a56..6de0e652 100644 --- a/crates-cli/yaak-cli/tests/request_commands.rs +++ b/crates-cli/yaak-cli/tests/request_commands.rs @@ -1,8 +1,10 @@ mod common; +use common::http_server::TestHttpServer; use common::{cli_cmd, parse_created_id, query_manager, seed_request, seed_workspace}; use predicates::str::contains; use tempfile::TempDir; +use yaak_models::models::HttpResponseState; #[test] fn show_and_delete_yes_round_trip() { @@ -105,3 +107,53 @@ fn update_requires_id_in_json_payload() { .failure() .stderr(contains("request update requires a non-empty \"id\" field")); } + +#[test] +fn request_send_persists_response_body_and_events() { + let temp_dir = TempDir::new().expect("Failed to create temp dir"); + let data_dir = temp_dir.path(); + seed_workspace(data_dir, "wk_test"); + + let server = TestHttpServer::spawn_ok("hello from integration test"); + + let create_assert = cli_cmd(data_dir) + .args([ + "request", + "create", + "wk_test", + "--name", + "Send Test", + "--url", + &server.url, + ]) + .assert() + .success(); + let request_id = parse_created_id(&create_assert.get_output().stdout, "request create"); + + cli_cmd(data_dir) + .args(["request", "send", &request_id]) + .assert() + .success() + .stdout(contains("HTTP 200 OK")) + .stdout(contains("hello from integration test")); + + let qm = query_manager(data_dir); + let db = qm.connect(); + let responses = + db.list_http_responses_for_request(&request_id, None).expect("Failed to load responses"); + assert_eq!(responses.len(), 1, "expected exactly one persisted response"); + + let response = &responses[0]; + assert_eq!(response.status, 200); + assert!(matches!(response.state, HttpResponseState::Closed)); + assert!(response.error.is_none()); + + let body_path = + response.body_path.as_ref().expect("expected persisted response body path").to_string(); + let body = std::fs::read_to_string(&body_path).expect("Failed to read response body file"); + assert_eq!(body, "hello from integration test"); + + let events = + db.list_http_response_events(&response.id).expect("Failed to load response events"); + assert!(!events.is_empty(), "expected at least one persisted response event"); +} diff --git a/crates-tauri/yaak-app/Cargo.toml b/crates-tauri/yaak-app/Cargo.toml index 96f05ba0..cf1e9113 100644 --- a/crates-tauri/yaak-app/Cargo.toml +++ b/crates-tauri/yaak-app/Cargo.toml @@ -61,6 +61,7 @@ yaak-api = { workspace = true } yaak-common = { workspace = true } yaak-tauri-utils = { workspace = true } yaak-core = { workspace = true } +yaak = { workspace = true } yaak-crypto = { workspace = true } yaak-fonts = { workspace = true } yaak-git = { workspace = true } diff --git a/crates-tauri/yaak-app/src/render.rs b/crates-tauri/yaak-app/src/render.rs index e63f525f..8cb3fd00 100644 --- a/crates-tauri/yaak-app/src/render.rs +++ b/crates-tauri/yaak-app/src/render.rs @@ -1,10 +1,8 @@ use log::info; use serde_json::Value; use std::collections::BTreeMap; -use yaak_http::path_placeholders::apply_path_placeholders; -use yaak_models::models::{ - Environment, GrpcRequest, HttpRequest, HttpRequestHeader, HttpUrlParameter, -}; +pub use yaak::render::render_http_request; +use yaak_models::models::{Environment, GrpcRequest, HttpRequestHeader}; use yaak_models::render::make_vars_hashmap; use yaak_templates::{RenderOptions, TemplateCallback, parse_and_render, render_json_value_raw}; @@ -85,151 +83,3 @@ pub async fn render_grpc_request( Ok(GrpcRequest { url, metadata, authentication, ..r.to_owned() }) } - -pub async fn render_http_request( - r: &HttpRequest, - environment_chain: Vec, - cb: &T, - opt: &RenderOptions, -) -> yaak_templates::error::Result { - let vars = &make_vars_hashmap(environment_chain); - - let mut url_parameters = Vec::new(); - for p in r.url_parameters.clone() { - if !p.enabled { - continue; - } - url_parameters.push(HttpUrlParameter { - enabled: p.enabled, - name: parse_and_render(p.name.as_str(), vars, cb, &opt).await?, - value: parse_and_render(p.value.as_str(), vars, cb, &opt).await?, - id: p.id, - }) - } - - let mut headers = Vec::new(); - for p in r.headers.clone() { - if !p.enabled { - continue; - } - headers.push(HttpRequestHeader { - enabled: p.enabled, - name: parse_and_render(p.name.as_str(), vars, cb, &opt).await?, - value: parse_and_render(p.value.as_str(), vars, cb, &opt).await?, - id: p.id, - }) - } - - let mut body = BTreeMap::new(); - for (k, v) in r.body.clone() { - let v = if k == "form" { strip_disabled_form_entries(v) } else { v }; - body.insert(k, render_json_value_raw(v, vars, cb, &opt).await?); - } - - let authentication = { - let mut disabled = false; - let mut auth = BTreeMap::new(); - match r.authentication.get("disabled") { - Some(Value::Bool(true)) => { - disabled = true; - } - Some(Value::String(tmpl)) => { - disabled = parse_and_render(tmpl.as_str(), vars, cb, &opt) - .await - .unwrap_or_default() - .is_empty(); - info!( - "Rendering authentication.disabled as a template: {disabled} from \"{tmpl}\"" - ); - } - _ => {} - } - if disabled { - auth.insert("disabled".to_string(), Value::Bool(true)); - } else { - for (k, v) in r.authentication.clone() { - if k == "disabled" { - auth.insert(k, Value::Bool(false)); - } else { - auth.insert(k, render_json_value_raw(v, vars, cb, &opt).await?); - } - } - } - auth - }; - - let url = parse_and_render(r.url.clone().as_str(), vars, cb, &opt).await?; - - // This doesn't fit perfectly with the concept of "rendering" but it kind of does - let (url, url_parameters) = apply_path_placeholders(&url, &url_parameters); - - Ok(HttpRequest { url, url_parameters, headers, body, authentication, ..r.to_owned() }) -} - -/// Strip disabled entries from a JSON array of form objects. -fn strip_disabled_form_entries(v: Value) -> Value { - match v { - Value::Array(items) => Value::Array( - items - .into_iter() - .filter(|item| item.get("enabled").and_then(|e| e.as_bool()).unwrap_or(true)) - .collect(), - ), - v => v, - } -} - -#[cfg(test)] -mod tests { - use super::*; - use serde_json::json; - - #[test] - fn test_strip_disabled_form_entries() { - let input = json!([ - {"enabled": true, "name": "foo", "value": "bar"}, - {"enabled": false, "name": "disabled", "value": "gone"}, - {"enabled": true, "name": "baz", "value": "qux"}, - ]); - let result = strip_disabled_form_entries(input); - assert_eq!( - result, - json!([ - {"enabled": true, "name": "foo", "value": "bar"}, - {"enabled": true, "name": "baz", "value": "qux"}, - ]) - ); - } - - #[test] - fn test_strip_disabled_form_entries_all_disabled() { - let input = json!([ - {"enabled": false, "name": "a", "value": "b"}, - {"enabled": false, "name": "c", "value": "d"}, - ]); - let result = strip_disabled_form_entries(input); - assert_eq!(result, json!([])); - } - - #[test] - fn test_strip_disabled_form_entries_missing_enabled_defaults_to_kept() { - let input = json!([ - {"name": "no_enabled_field", "value": "kept"}, - {"enabled": false, "name": "disabled", "value": "gone"}, - ]); - let result = strip_disabled_form_entries(input); - assert_eq!( - result, - json!([ - {"name": "no_enabled_field", "value": "kept"}, - ]) - ); - } - - #[test] - fn test_strip_disabled_form_entries_non_array_passthrough() { - let input = json!("just a string"); - let result = strip_disabled_form_entries(input.clone()); - assert_eq!(result, input); - } -} diff --git a/crates/yaak/Cargo.toml b/crates/yaak/Cargo.toml new file mode 100644 index 00000000..51fdfca2 --- /dev/null +++ b/crates/yaak/Cargo.toml @@ -0,0 +1,14 @@ +[package] +name = "yaak" +version = "0.1.0" +edition = "2024" +publish = false + +[dependencies] +log = { workspace = true } +serde_json = { workspace = true } +thiserror = { workspace = true } +tokio = { workspace = true, features = ["sync", "rt"] } +yaak-http = { workspace = true } +yaak-models = { workspace = true } +yaak-templates = { workspace = true } diff --git a/crates/yaak/src/error.rs b/crates/yaak/src/error.rs new file mode 100644 index 00000000..322c78d8 --- /dev/null +++ b/crates/yaak/src/error.rs @@ -0,0 +1,9 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error(transparent)] + Send(#[from] crate::send::SendHttpRequestError), +} + +pub type Result = std::result::Result; diff --git a/crates/yaak/src/lib.rs b/crates/yaak/src/lib.rs new file mode 100644 index 00000000..2f068b24 --- /dev/null +++ b/crates/yaak/src/lib.rs @@ -0,0 +1,6 @@ +pub mod error; +pub mod render; +pub mod send; + +pub use error::Error; +pub type Result = error::Result; diff --git a/crates/yaak/src/render.rs b/crates/yaak/src/render.rs new file mode 100644 index 00000000..64b5e04e --- /dev/null +++ b/crates/yaak/src/render.rs @@ -0,0 +1,157 @@ +use log::info; +use serde_json::Value; +use std::collections::BTreeMap; +use yaak_http::path_placeholders::apply_path_placeholders; +use yaak_models::models::{Environment, HttpRequest, HttpRequestHeader, HttpUrlParameter}; +use yaak_models::render::make_vars_hashmap; +use yaak_templates::{RenderOptions, TemplateCallback, parse_and_render, render_json_value_raw}; + +pub async fn render_http_request( + request: &HttpRequest, + environment_chain: Vec, + callback: &T, + options: &RenderOptions, +) -> yaak_templates::error::Result { + let vars = &make_vars_hashmap(environment_chain); + + let mut url_parameters = Vec::new(); + for parameter in request.url_parameters.clone() { + if !parameter.enabled { + continue; + } + + url_parameters.push(HttpUrlParameter { + enabled: parameter.enabled, + name: parse_and_render(parameter.name.as_str(), vars, callback, options).await?, + value: parse_and_render(parameter.value.as_str(), vars, callback, options).await?, + id: parameter.id, + }) + } + + let mut headers = Vec::new(); + for header in request.headers.clone() { + if !header.enabled { + continue; + } + + headers.push(HttpRequestHeader { + enabled: header.enabled, + name: parse_and_render(header.name.as_str(), vars, callback, options).await?, + value: parse_and_render(header.value.as_str(), vars, callback, options).await?, + id: header.id, + }) + } + + let mut body = BTreeMap::new(); + for (key, value) in request.body.clone() { + let value = if key == "form" { strip_disabled_form_entries(value) } else { value }; + body.insert(key, render_json_value_raw(value, vars, callback, options).await?); + } + + let authentication = { + let mut disabled = false; + let mut auth = BTreeMap::new(); + + match request.authentication.get("disabled") { + Some(Value::Bool(true)) => { + disabled = true; + } + Some(Value::String(template)) => { + disabled = parse_and_render(template.as_str(), vars, callback, options) + .await + .unwrap_or_default() + .is_empty(); + info!( + "Rendering authentication.disabled as a template: {disabled} from \"{template}\"" + ); + } + _ => {} + } + + if disabled { + auth.insert("disabled".to_string(), Value::Bool(true)); + } else { + for (key, value) in request.authentication.clone() { + if key == "disabled" { + auth.insert(key, Value::Bool(false)); + } else { + auth.insert(key, render_json_value_raw(value, vars, callback, options).await?); + } + } + } + + auth + }; + + let url = parse_and_render(request.url.clone().as_str(), vars, callback, options).await?; + let (url, url_parameters) = apply_path_placeholders(&url, &url_parameters); + + Ok(HttpRequest { url, url_parameters, headers, body, authentication, ..request.to_owned() }) +} + +fn strip_disabled_form_entries(v: Value) -> Value { + match v { + Value::Array(items) => Value::Array( + items + .into_iter() + .filter(|item| item.get("enabled").and_then(|e| e.as_bool()).unwrap_or(true)) + .collect(), + ), + v => v, + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_strip_disabled_form_entries() { + let input = json!([ + {"enabled": true, "name": "foo", "value": "bar"}, + {"enabled": false, "name": "disabled", "value": "gone"}, + {"enabled": true, "name": "baz", "value": "qux"}, + ]); + let result = strip_disabled_form_entries(input); + assert_eq!( + result, + json!([ + {"enabled": true, "name": "foo", "value": "bar"}, + {"enabled": true, "name": "baz", "value": "qux"}, + ]) + ); + } + + #[test] + fn test_strip_disabled_form_entries_all_disabled() { + let input = json!([ + {"enabled": false, "name": "a", "value": "b"}, + {"enabled": false, "name": "c", "value": "d"}, + ]); + let result = strip_disabled_form_entries(input); + assert_eq!(result, json!([])); + } + + #[test] + fn test_strip_disabled_form_entries_missing_enabled_defaults_to_kept() { + let input = json!([ + {"name": "no_enabled_field", "value": "kept"}, + {"enabled": false, "name": "disabled", "value": "gone"}, + ]); + let result = strip_disabled_form_entries(input); + assert_eq!( + result, + json!([ + {"name": "no_enabled_field", "value": "kept"}, + ]) + ); + } + + #[test] + fn test_strip_disabled_form_entries_non_array_passthrough() { + let input = json!("just a string"); + let result = strip_disabled_form_entries(input.clone()); + assert_eq!(result, input); + } +} diff --git a/crates/yaak/src/send.rs b/crates/yaak/src/send.rs new file mode 100644 index 00000000..9cf5be67 --- /dev/null +++ b/crates/yaak/src/send.rs @@ -0,0 +1,310 @@ +use crate::render::render_http_request; +use log::warn; +use std::path::{Path, PathBuf}; +use std::time::Instant; +use thiserror::Error; +use tokio::sync::mpsc; +use yaak_http::sender::{HttpResponseEvent as SenderHttpResponseEvent, HttpSender, ReqwestSender}; +use yaak_http::types::{SendableBody, SendableHttpRequest, SendableHttpRequestOptions}; +use yaak_models::blob_manager::BlobManager; +use yaak_models::models::{HttpResponse, HttpResponseEvent, HttpResponseHeader, HttpResponseState}; +use yaak_models::query_manager::QueryManager; +use yaak_models::util::UpdateSource; +use yaak_templates::{RenderOptions, TemplateCallback}; + +const HTTP_EVENT_CHANNEL_CAPACITY: usize = 100; + +#[derive(Debug, Error)] +pub enum SendHttpRequestError { + #[error("Failed to load request: {0}")] + LoadRequest(#[source] yaak_models::error::Error), + + #[error("Failed to resolve environments: {0}")] + ResolveEnvironments(#[source] yaak_models::error::Error), + + #[error("Failed to resolve inherited request settings: {0}")] + ResolveRequestInheritance(#[source] yaak_models::error::Error), + + #[error("Failed to render request templates: {0}")] + RenderRequest(#[source] yaak_templates::error::Error), + + #[error("Failed to persist response metadata: {0}")] + PersistResponse(#[source] yaak_models::error::Error), + + #[error("Failed to create HTTP client: {0}")] + CreateHttpClient(#[source] yaak_http::error::Error), + + #[error("Failed to build sendable request: {0}")] + BuildSendableRequest(#[source] yaak_http::error::Error), + + #[error("Failed to send request: {0}")] + SendRequest(#[source] yaak_http::error::Error), + + #[error("Failed to read response body: {0}")] + ReadResponseBody(#[source] yaak_http::error::Error), + + #[error("Failed to create response directory {path:?}: {source}")] + CreateResponseDirectory { + path: PathBuf, + #[source] + source: std::io::Error, + }, + + #[error("Failed to write response body to {path:?}: {source}")] + WriteResponseBody { + path: PathBuf, + #[source] + source: std::io::Error, + }, +} + +pub type Result = std::result::Result; + +pub struct SendHttpRequestByIdParams<'a, T: TemplateCallback> { + pub query_manager: &'a QueryManager, + pub blob_manager: &'a BlobManager, + pub request_id: &'a str, + pub environment_id: Option<&'a str>, + pub template_callback: &'a T, + pub send_options: SendableHttpRequestOptions, + pub update_source: UpdateSource, + pub response_dir: &'a Path, + pub persist_events: bool, + pub emit_events_to: Option>, +} + +pub struct SendHttpRequestResult { + pub rendered_request: yaak_models::models::HttpRequest, + pub response: HttpResponse, + pub response_body: Vec, +} + +pub async fn send_http_request_by_id( + params: SendHttpRequestByIdParams<'_, T>, +) -> Result { + let db = params.query_manager.connect(); + let request = + db.get_http_request(params.request_id).map_err(SendHttpRequestError::LoadRequest)?; + let environment_chain = db + .resolve_environments( + &request.workspace_id, + request.folder_id.as_deref(), + params.environment_id, + ) + .map_err(SendHttpRequestError::ResolveEnvironments)?; + + let (authentication_type, authentication, _auth_context_id) = db + .resolve_auth_for_http_request(&request) + .map_err(SendHttpRequestError::ResolveRequestInheritance)?; + let resolved_headers = db + .resolve_headers_for_http_request(&request) + .map_err(SendHttpRequestError::ResolveRequestInheritance)?; + drop(db); + + let mut resolved_request = request.clone(); + resolved_request.authentication_type = authentication_type; + resolved_request.authentication = authentication; + resolved_request.headers = resolved_headers; + + let rendered_request = render_http_request( + &resolved_request, + environment_chain, + params.template_callback, + &RenderOptions::throw(), + ) + .await + .map_err(SendHttpRequestError::RenderRequest)?; + + let sendable_request = + SendableHttpRequest::from_http_request(&rendered_request, params.send_options) + .await + .map_err(SendHttpRequestError::BuildSendableRequest)?; + let request_content_length = sendable_body_length(sendable_request.body.as_ref()); + + let mut response = params + .query_manager + .connect() + .upsert_http_response( + &HttpResponse { + request_id: request.id.clone(), + workspace_id: request.workspace_id.clone(), + request_content_length, + request_headers: sendable_request + .headers + .iter() + .map(|(name, value)| HttpResponseHeader { + name: name.clone(), + value: value.clone(), + }) + .collect(), + url: sendable_request.url.clone(), + ..Default::default() + }, + ¶ms.update_source, + params.blob_manager, + ) + .map_err(SendHttpRequestError::PersistResponse)?; + + let (event_tx, mut event_rx) = + mpsc::channel::(HTTP_EVENT_CHANNEL_CAPACITY); + let event_query_manager = params.query_manager.clone(); + let event_response_id = response.id.clone(); + let event_workspace_id = request.workspace_id.clone(); + let event_update_source = params.update_source.clone(); + let emit_events_to = params.emit_events_to.clone(); + let persist_events = params.persist_events; + let event_handle = tokio::spawn(async move { + while let Some(event) = event_rx.recv().await { + if persist_events { + let db_event = HttpResponseEvent::new( + &event_response_id, + &event_workspace_id, + event.clone().into(), + ); + if let Err(err) = event_query_manager + .connect() + .upsert_http_response_event(&db_event, &event_update_source) + { + warn!("Failed to persist HTTP response event: {}", err); + } + } + + if let Some(tx) = emit_events_to.as_ref() { + let _ = tx.try_send(event); + } + } + }); + + let sender = ReqwestSender::new().map_err(SendHttpRequestError::CreateHttpClient)?; + let started_at = Instant::now(); + let request_started_url = sendable_request.url.clone(); + + let http_response = match sender.send(sendable_request, event_tx).await { + Ok(response) => response, + Err(err) => { + let _ = params + .query_manager + .connect() + .upsert_http_response( + &HttpResponse { + state: HttpResponseState::Closed, + elapsed: duration_to_i32(started_at.elapsed()), + elapsed_headers: duration_to_i32(started_at.elapsed()), + error: Some(err.to_string()), + url: request_started_url, + ..response + }, + ¶ms.update_source, + params.blob_manager, + ) + .map_err(SendHttpRequestError::PersistResponse)?; + + if let Err(join_err) = event_handle.await { + warn!("Failed to join response event task: {}", join_err); + } + + return Err(SendHttpRequestError::SendRequest(err)); + } + }; + + let headers_elapsed = duration_to_i32(started_at.elapsed()); + response = params + .query_manager + .connect() + .upsert_http_response( + &HttpResponse { + state: HttpResponseState::Connected, + elapsed_headers: headers_elapsed, + status: i32::from(http_response.status), + status_reason: http_response.status_reason.clone(), + url: http_response.url.clone(), + remote_addr: http_response.remote_addr.clone(), + version: http_response.version.clone(), + headers: http_response + .headers + .iter() + .map(|(name, value)| HttpResponseHeader { + name: name.clone(), + value: value.clone(), + }) + .collect(), + request_headers: http_response + .request_headers + .iter() + .map(|(name, value)| HttpResponseHeader { + name: name.clone(), + value: value.clone(), + }) + .collect(), + ..response + }, + ¶ms.update_source, + params.blob_manager, + ) + .map_err(SendHttpRequestError::PersistResponse)?; + + let (response_body, body_stats) = + http_response.bytes().await.map_err(SendHttpRequestError::ReadResponseBody)?; + + std::fs::create_dir_all(params.response_dir).map_err(|source| { + SendHttpRequestError::CreateResponseDirectory { + path: params.response_dir.to_path_buf(), + source, + } + })?; + + let body_path = params.response_dir.join(&response.id); + std::fs::write(&body_path, &response_body).map_err(|source| { + SendHttpRequestError::WriteResponseBody { path: body_path.clone(), source } + })?; + + response = params + .query_manager + .connect() + .upsert_http_response( + &HttpResponse { + body_path: Some(body_path.to_string_lossy().to_string()), + content_length: Some(usize_to_i32(response_body.len())), + content_length_compressed: Some(u64_to_i32(body_stats.size_compressed)), + elapsed: duration_to_i32(started_at.elapsed()), + elapsed_headers: headers_elapsed, + state: HttpResponseState::Closed, + ..response + }, + ¶ms.update_source, + params.blob_manager, + ) + .map_err(SendHttpRequestError::PersistResponse)?; + + if let Err(join_err) = event_handle.await { + warn!("Failed to join response event task: {}", join_err); + } + + Ok(SendHttpRequestResult { rendered_request, response, response_body }) +} + +fn sendable_body_length(body: Option<&SendableBody>) -> Option { + match body { + Some(SendableBody::Bytes(bytes)) => Some(usize_to_i32(bytes.len())), + Some(SendableBody::Stream { content_length: Some(length), .. }) => { + Some(u64_to_i32(*length)) + } + _ => None, + } +} + +fn duration_to_i32(duration: std::time::Duration) -> i32 { + u128_to_i32(duration.as_millis()) +} + +fn usize_to_i32(value: usize) -> i32 { + if value > i32::MAX as usize { i32::MAX } else { value as i32 } +} + +fn u64_to_i32(value: u64) -> i32 { + if value > i32::MAX as u64 { i32::MAX } else { value as i32 } +} + +fn u128_to_i32(value: u128) -> i32 { + if value > i32::MAX as u128 { i32::MAX } else { value as i32 } +}