diff --git a/.gitattributes b/.gitattributes
index 6438d80c..0b6a9cb7 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -1,2 +1,7 @@
src-tauri/vendored/**/* linguist-generated=true
src-tauri/gen/schemas/**/* linguist-generated=true
+**/bindings/* linguist-generated=true
+src-tauri/yaak-templates/pkg/* linguist-generated=true
+
+# Ensure consistent line endings for test files that check exact content
+src-tauri/yaak-http/tests/test.txt text eol=lf
diff --git a/.github/workflows/claude.yml b/.github/workflows/claude.yml
new file mode 100644
index 00000000..d300267f
--- /dev/null
+++ b/.github/workflows/claude.yml
@@ -0,0 +1,50 @@
+name: Claude Code
+
+on:
+ issue_comment:
+ types: [created]
+ pull_request_review_comment:
+ types: [created]
+ issues:
+ types: [opened, assigned]
+ pull_request_review:
+ types: [submitted]
+
+jobs:
+ claude:
+ if: |
+ (github.event_name == 'issue_comment' && contains(github.event.comment.body, '@claude')) ||
+ (github.event_name == 'pull_request_review_comment' && contains(github.event.comment.body, '@claude')) ||
+ (github.event_name == 'pull_request_review' && contains(github.event.review.body, '@claude')) ||
+ (github.event_name == 'issues' && (contains(github.event.issue.body, '@claude') || contains(github.event.issue.title, '@claude')))
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+ pull-requests: read
+ issues: read
+ id-token: write
+ actions: read # Required for Claude to read CI results on PRs
+ steps:
+ - name: Checkout repository
+ uses: actions/checkout@v4
+ with:
+ fetch-depth: 1
+
+ - name: Run Claude Code
+ id: claude
+ uses: anthropics/claude-code-action@v1
+ with:
+ claude_code_oauth_token: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
+
+ # This is an optional setting that allows Claude to read CI results on PRs
+ additional_permissions: |
+ actions: read
+
+ # Optional: Give a custom prompt to Claude. If this is not specified, Claude will perform the instructions specified in the comment that tagged it.
+ # prompt: 'Update the pull request description to include a summary of changes.'
+
+ # Optional: Add claude_args to customize behavior and configuration
+ # See https://github.com/anthropics/claude-code-action/blob/main/docs/usage.md
+ # or https://code.claude.com/docs/en/cli-reference for available options
+ # claude_args: '--allowed-tools Bash(gh pr:*)'
+
diff --git a/.gitignore b/.gitignore
index ee2c7428..e4995499 100644
--- a/.gitignore
+++ b/.gitignore
@@ -34,3 +34,5 @@ out
.tmp
tmp
+.zed
+codebook.toml
diff --git a/README.md b/README.md
index 8bb37fb7..327cfd0d 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@
-
+
diff --git a/package-lock.json b/package-lock.json
index 2a3de264..86c09beb 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -1700,6 +1700,21 @@
"integrity": "sha512-l0h88YhZFyKdXIFNfSWpyjStDjGHwZ/U7iobcK1cQQD8sejsONdQtTVU+1wVN1PBw40PiiHB1vA5S7VTfQiP9g==",
"license": "MIT"
},
+ "node_modules/@mjackson/headers": {
+ "version": "0.11.1",
+ "resolved": "https://registry.npmjs.org/@mjackson/headers/-/headers-0.11.1.tgz",
+ "integrity": "sha512-uXXhd4rtDdDwkqAuGef1nuafkCa1NlTmEc1Jzc0NL4YiA1yON1NFXuqJ3hOuKvNKQwkiDwdD+JJlKVyz4dunFA==",
+ "license": "MIT"
+ },
+ "node_modules/@mjackson/multipart-parser": {
+ "version": "0.10.1",
+ "resolved": "https://registry.npmjs.org/@mjackson/multipart-parser/-/multipart-parser-0.10.1.tgz",
+ "integrity": "sha512-cHMD6+ErH/DrEfC0N6Ru/+1eAdavxdV0C35PzSb5/SD7z3XoaDMc16xPJcb8CahWjSpqHY+Too9sAb6/UNuq7A==",
+ "license": "MIT",
+ "dependencies": {
+ "@mjackson/headers": "^0.11.1"
+ }
+ },
"node_modules/@mrmlnc/readdir-enhanced": {
"version": "2.2.1",
"resolved": "https://registry.npmjs.org/@mrmlnc/readdir-enhanced/-/readdir-enhanced-2.2.1.tgz",
@@ -18680,6 +18695,7 @@
"@gilbarbara/deep-equal": "^0.3.1",
"@lezer/highlight": "^1.1.3",
"@lezer/lr": "^1.3.3",
+ "@mjackson/multipart-parser": "^0.10.1",
"@prantlf/jsonlint": "^16.0.0",
"@replit/codemirror-emacs": "^6.1.0",
"@replit/codemirror-vim": "^6.3.0",
diff --git a/packages/plugin-runtime-types/src/bindings/gen_models.ts b/packages/plugin-runtime-types/src/bindings/gen_models.ts
index 6b2eb5c8..454903fe 100644
--- a/packages/plugin-runtime-types/src/bindings/gen_models.ts
+++ b/packages/plugin-runtime-types/src/bindings/gen_models.ts
@@ -12,7 +12,7 @@ export type HttpRequest = { model: "http_request", id: string, createdAt: string
export type HttpRequestHeader = { enabled?: boolean, name: string, value: string, id?: string, };
-export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array, remoteAddr: string | null, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
+export type HttpResponse = { model: "http_response", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, bodyPath: string | null, contentLength: number | null, contentLengthCompressed: number | null, elapsed: number, elapsedHeaders: number, error: string | null, headers: Array, remoteAddr: string | null, requestContentLength: number | null, requestHeaders: Array, status: number, statusReason: string | null, state: HttpResponseState, url: string, version: string | null, };
export type HttpResponseHeader = { name: string, value: string, };
diff --git a/plugins/importer-curl/src/index.ts b/plugins/importer-curl/src/index.ts
index 49834ecc..5e4b6d98 100644
--- a/plugins/importer-curl/src/index.ts
+++ b/plugins/importer-curl/src/index.ts
@@ -194,11 +194,17 @@ function importCommand(parseEntries: ParseEntry[], workspaceId: string) {
let value: string | boolean;
const nextEntry = parseEntries[i + 1];
const hasValue = !BOOLEAN_FLAGS.includes(name);
+ // Check if nextEntry looks like a flag:
+ // - Single dash followed by a letter: -X, -H, -d
+ // - Double dash followed by a letter: --data-raw, --header
+ // This prevents mistaking data that starts with dashes (like multipart boundaries ------) as flags
+ const nextEntryIsFlag = typeof nextEntry === 'string' &&
+ (nextEntry.match(/^-[a-zA-Z]/) || nextEntry.match(/^--[a-zA-Z]/));
if (isSingleDash && name.length > 1) {
// Handle squished arguments like -XPOST
value = name.slice(1);
name = name.slice(0, 1);
- } else if (typeof nextEntry === 'string' && hasValue && !nextEntry.startsWith('-')) {
+ } else if (typeof nextEntry === 'string' && hasValue && !nextEntryIsFlag) {
// Next arg is not a flag, so assign it as the value
value = nextEntry;
i++; // Skip next one
@@ -305,11 +311,32 @@ function importCommand(parseEntries: ParseEntry[], workspaceId: string) {
}
// Body (Text or Blob)
- const dataParameters = pairsToDataParameters(flagsByName);
const contentTypeHeader = headers.find((header) => header.name.toLowerCase() === 'content-type');
- const mimeType = contentTypeHeader ? contentTypeHeader.value.split(';')[0] : null;
+ const mimeType = contentTypeHeader ? contentTypeHeader.value.split(';')[0]?.trim() : null;
- // Body (Multipart Form Data)
+ // Extract boundary from Content-Type header for multipart parsing
+ const boundaryMatch = contentTypeHeader?.value.match(/boundary=([^\s;]+)/i);
+ const boundary = boundaryMatch?.[1];
+
+ // Get raw data from --data-raw flags (before splitting by &)
+ const rawDataValues = [
+ ...((flagsByName['data-raw'] as string[] | undefined) || []),
+ ...((flagsByName.d as string[] | undefined) || []),
+ ...((flagsByName.data as string[] | undefined) || []),
+ ...((flagsByName['data-binary'] as string[] | undefined) || []),
+ ...((flagsByName['data-ascii'] as string[] | undefined) || []),
+ ];
+
+ // Check if this is multipart form data in --data-raw (Chrome DevTools format)
+ let multipartFormDataFromRaw: { name: string; value?: string; file?: string; enabled: boolean }[] | null = null;
+ if (mimeType === 'multipart/form-data' && boundary && rawDataValues.length > 0) {
+ const rawBody = rawDataValues.join('');
+ multipartFormDataFromRaw = parseMultipartFormData(rawBody, boundary);
+ }
+
+ const dataParameters = pairsToDataParameters(flagsByName);
+
+ // Body (Multipart Form Data from -F flags)
const formDataParams = [
...((flagsByName.form as string[] | undefined) || []),
...((flagsByName.F as string[] | undefined) || []),
@@ -336,7 +363,13 @@ function importCommand(parseEntries: ParseEntry[], workspaceId: string) {
let bodyType: string | null = null;
const bodyAsGET = getPairValue(flagsByName, false, ['G', 'get']);
- if (dataParameters.length > 0 && bodyAsGET) {
+ if (multipartFormDataFromRaw) {
+ // Handle multipart form data parsed from --data-raw (Chrome DevTools format)
+ bodyType = 'multipart/form-data';
+ body = {
+ form: multipartFormDataFromRaw,
+ };
+ } else if (dataParameters.length > 0 && bodyAsGET) {
urlParameters.push(...dataParameters);
} else if (
dataParameters.length > 0 &&
@@ -473,6 +506,71 @@ function splitOnce(str: string, sep: string): string[] {
return [str];
}
+/**
+ * Parses multipart form data from a raw body string
+ * Used when Chrome DevTools exports a cURL with --data-raw containing multipart data
+ */
+function parseMultipartFormData(
+ rawBody: string,
+ boundary: string,
+): { name: string; value?: string; file?: string; enabled: boolean }[] | null {
+ const results: { name: string; value?: string; file?: string; enabled: boolean }[] = [];
+
+ // The boundary in the body typically has -- prefix
+ const boundaryMarker = `--${boundary}`;
+ const parts = rawBody.split(boundaryMarker);
+
+ for (const part of parts) {
+ // Skip empty parts and the closing boundary marker
+ if (!part || part.trim() === '--' || part.trim() === '--\r\n') {
+ continue;
+ }
+
+ // Each part has headers and content separated by \r\n\r\n
+ const headerContentSplit = part.indexOf('\r\n\r\n');
+ if (headerContentSplit === -1) {
+ continue;
+ }
+
+ const headerSection = part.slice(0, headerContentSplit);
+ let content = part.slice(headerContentSplit + 4); // Skip \r\n\r\n
+
+ // Remove trailing \r\n from content
+ if (content.endsWith('\r\n')) {
+ content = content.slice(0, -2);
+ }
+
+ // Parse Content-Disposition header to get name and filename
+ const contentDispositionMatch = headerSection.match(
+ /Content-Disposition:\s*form-data;\s*name="([^"]+)"(?:;\s*filename="([^"]+)")?/i,
+ );
+
+ if (!contentDispositionMatch) {
+ continue;
+ }
+
+ const name = contentDispositionMatch[1] ?? '';
+ const filename = contentDispositionMatch[2];
+
+ const item: { name: string; value?: string; file?: string; enabled: boolean } = {
+ name,
+ enabled: true,
+ };
+
+ if (filename) {
+ // This is a file upload field
+ item.file = filename;
+ } else {
+ // This is a regular text field
+ item.value = content;
+ }
+
+ results.push(item);
+ }
+
+ return results.length > 0 ? results : null;
+}
+
const idCount: Partial> = {};
function generateId(model: string): string {
diff --git a/plugins/importer-curl/tests/index.test.ts b/plugins/importer-curl/tests/index.test.ts
index 71e8cac1..6f75b8e4 100644
--- a/plugins/importer-curl/tests/index.test.ts
+++ b/plugins/importer-curl/tests/index.test.ts
@@ -441,6 +441,72 @@ describe('importer-curl', () => {
},
});
});
+
+ test('Imports multipart form data from --data-raw (Chrome DevTools format)', () => {
+ // This is the format Chrome DevTools uses when copying a multipart form submission as cURL
+ const curlCommand = `curl 'http://localhost:8080/system' \
+ -H 'Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryHwsXKi4rKA6P5VBd' \
+ --data-raw $'------WebKitFormBoundaryHwsXKi4rKA6P5VBd\r\nContent-Disposition: form-data; name="username"\r\n\r\njsgj\r\n------WebKitFormBoundaryHwsXKi4rKA6P5VBd\r\nContent-Disposition: form-data; name="password"\r\n\r\n654321\r\n------WebKitFormBoundaryHwsXKi4rKA6P5VBd\r\nContent-Disposition: form-data; name="captcha"; filename="test.xlsx"\r\nContent-Type: application/vnd.openxmlformats-officedocument.spreadsheetml.sheet\r\n\r\n\r\n------WebKitFormBoundaryHwsXKi4rKA6P5VBd--\r\n'`;
+
+ expect(convertCurl(curlCommand)).toEqual({
+ resources: {
+ workspaces: [baseWorkspace()],
+ httpRequests: [
+ baseRequest({
+ url: 'http://localhost:8080/system',
+ method: 'POST',
+ headers: [
+ {
+ name: 'Content-Type',
+ value: 'multipart/form-data; boundary=----WebKitFormBoundaryHwsXKi4rKA6P5VBd',
+ enabled: true,
+ },
+ ],
+ bodyType: 'multipart/form-data',
+ body: {
+ form: [
+ { name: 'username', value: 'jsgj', enabled: true },
+ { name: 'password', value: '654321', enabled: true },
+ { name: 'captcha', file: 'test.xlsx', enabled: true },
+ ],
+ },
+ }),
+ ],
+ },
+ });
+ });
+
+ test('Imports multipart form data with text-only fields from --data-raw', () => {
+ const curlCommand = `curl 'http://example.com/api' \
+ -H 'Content-Type: multipart/form-data; boundary=----FormBoundary123' \
+ --data-raw $'------FormBoundary123\r\nContent-Disposition: form-data; name="field1"\r\n\r\nvalue1\r\n------FormBoundary123\r\nContent-Disposition: form-data; name="field2"\r\n\r\nvalue2\r\n------FormBoundary123--\r\n'`;
+
+ expect(convertCurl(curlCommand)).toEqual({
+ resources: {
+ workspaces: [baseWorkspace()],
+ httpRequests: [
+ baseRequest({
+ url: 'http://example.com/api',
+ method: 'POST',
+ headers: [
+ {
+ name: 'Content-Type',
+ value: 'multipart/form-data; boundary=----FormBoundary123',
+ enabled: true,
+ },
+ ],
+ bodyType: 'multipart/form-data',
+ body: {
+ form: [
+ { name: 'field1', value: 'value1', enabled: true },
+ { name: 'field2', value: 'value2', enabled: true },
+ ],
+ },
+ }),
+ ],
+ },
+ });
+ });
});
const idCount: Partial> = {};
diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock
index 30ab8329..a91f4c0a 100644
--- a/src-tauri/Cargo.lock
+++ b/src-tauri/Cargo.lock
@@ -192,12 +192,14 @@ version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b37fc50485c4f3f736a4fb14199f6d5f5ba008d7f28fe710306c92780f004c07"
dependencies = [
- "brotli",
+ "brotli 8.0.1",
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
+ "zstd",
+ "zstd-safe",
]
[[package]]
@@ -536,6 +538,17 @@ dependencies = [
"syn 2.0.101",
]
+[[package]]
+name = "brotli"
+version = "7.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
+ "brotli-decompressor 4.0.3",
+]
+
[[package]]
name = "brotli"
version = "8.0.1"
@@ -544,7 +557,17 @@ checksum = "9991eea70ea4f293524138648e41ee89b0b2b12ddef3b255effa43c8056e0e0d"
dependencies = [
"alloc-no-stdlib",
"alloc-stdlib",
- "brotli-decompressor",
+ "brotli-decompressor 5.0.0",
+]
+
+[[package]]
+name = "brotli-decompressor"
+version = "4.0.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd"
+dependencies = [
+ "alloc-no-stdlib",
+ "alloc-stdlib",
]
[[package]]
@@ -5762,7 +5785,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9fa9844cefcf99554a16e0a278156ae73b0d8680bbc0e2ad1e4287aadd8489cf"
dependencies = [
"base64 0.22.1",
- "brotli",
+ "brotli 8.0.1",
"ico",
"json-patch",
"plist",
@@ -6094,7 +6117,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76a423c51176eb3616ee9b516a9fa67fed5f0e78baaba680e44eb5dd2cc37490"
dependencies = [
"anyhow",
- "brotli",
+ "brotli 8.0.1",
"cargo_metadata",
"ctor",
"dunce",
@@ -7903,6 +7926,7 @@ dependencies = [
"thiserror 2.0.17",
"tokio",
"tokio-stream",
+ "tokio-util",
"ts-rs",
"uuid",
"yaak-common",
@@ -7929,6 +7953,7 @@ dependencies = [
"regex",
"reqwest",
"serde",
+ "serde_json",
"tauri",
"thiserror 2.0.17",
]
@@ -8010,19 +8035,30 @@ dependencies = [
name = "yaak-http"
version = "0.1.0"
dependencies = [
+ "async-compression",
+ "async-trait",
+ "brotli 7.0.0",
+ "bytes",
+ "flate2",
+ "futures-util",
"hyper-util",
"log",
+ "mime_guess",
"regex",
"reqwest",
"reqwest_cookie_store",
"serde",
+ "serde_json",
"tauri",
"thiserror 2.0.17",
"tokio",
+ "tokio-util",
"tower-service",
"urlencoding",
+ "yaak-common",
"yaak-models",
"yaak-tls",
+ "zstd",
]
[[package]]
diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml
index af5ce6c2..19be0850 100644
--- a/src-tauri/Cargo.toml
+++ b/src-tauri/Cargo.toml
@@ -74,6 +74,7 @@ tauri-plugin-window-state = "2.4.1"
thiserror = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
tokio-stream = "0.1.17"
+tokio-util = { version = "0.7", features = ["codec"] }
ts-rs = { workspace = true }
uuid = "1.12.1"
yaak-common = { workspace = true }
diff --git a/src-tauri/src/error.rs b/src-tauri/src/error.rs
index 4797fb0d..e0541b51 100644
--- a/src-tauri/src/error.rs
+++ b/src-tauri/src/error.rs
@@ -59,7 +59,7 @@ pub enum Error {
#[error("Request error: {0}")]
RequestError(#[from] reqwest::Error),
- #[error("Generic error: {0}")]
+ #[error("{0}")]
GenericError(String),
}
diff --git a/src-tauri/src/http_request.rs b/src-tauri/src/http_request.rs
index dfde8946..64230cbf 100644
--- a/src-tauri/src/http_request.rs
+++ b/src-tauri/src/http_request.rs
@@ -1,33 +1,30 @@
use crate::error::Error::GenericError;
use crate::error::Result;
use crate::render::render_http_request;
-use crate::response_err;
-use http::header::{ACCEPT, USER_AGENT};
-use http::{HeaderMap, HeaderName, HeaderValue};
-use log::{debug, error, warn};
-use mime_guess::Mime;
-use reqwest::{Method, Response};
-use reqwest::{Url, multipart};
+use log::{debug, warn};
use reqwest_cookie_store::{CookieStore, CookieStoreMutex};
-use serde_json::Value;
-use std::collections::BTreeMap;
-use std::path::PathBuf;
-use std::str::FromStr;
+use std::pin::Pin;
use std::sync::Arc;
-use std::time::Duration;
-use tauri::{Manager, Runtime, WebviewWindow};
-use tokio::fs;
+use std::time::{Duration, Instant};
+use tauri::{AppHandle, Manager, Runtime, WebviewWindow};
use tokio::fs::{File, create_dir_all};
-use tokio::io::AsyncWriteExt;
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt};
use tokio::sync::watch::Receiver;
-use tokio::sync::{Mutex, oneshot};
+use tokio_util::bytes::Bytes;
use yaak_http::client::{
HttpConnectionOptions, HttpConnectionProxySetting, HttpConnectionProxySettingAuth,
};
use yaak_http::manager::HttpConnectionManager;
+use yaak_http::sender::ReqwestSender;
+use yaak_http::tee_reader::TeeReader;
+use yaak_http::transaction::HttpTransaction;
+use yaak_http::types::{
+ SendableBody, SendableHttpRequest, SendableHttpRequestOptions, append_query_params,
+};
+use yaak_models::blob_manager::{BlobManagerExt, BodyChunk};
use yaak_models::models::{
- Cookie, CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseHeader,
- HttpResponseState, ProxySetting, ProxySettingAuth,
+ Cookie, CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseEvent,
+ HttpResponseHeader, HttpResponseState, ProxySetting, ProxySettingAuth,
};
use yaak_models::query_manager::QueryManagerExt;
use yaak_models::util::UpdateSource;
@@ -36,9 +33,58 @@ use yaak_plugins::events::{
};
use yaak_plugins::manager::PluginManager;
use yaak_plugins::template_callback::PluginTemplateCallback;
-use yaak_templates::{RenderErrorBehavior, RenderOptions};
+use yaak_templates::RenderOptions;
use yaak_tls::find_client_certificate;
+/// Chunk size for storing request bodies (1MB)
+const REQUEST_BODY_CHUNK_SIZE: usize = 1024 * 1024;
+
+/// Context for managing response state during HTTP transactions.
+/// Handles both persisted responses (stored in DB) and ephemeral responses (in-memory only).
+struct ResponseContext {
+ app_handle: AppHandle,
+ response: HttpResponse,
+ update_source: UpdateSource,
+}
+
+impl ResponseContext {
+ fn new(app_handle: AppHandle, response: HttpResponse, update_source: UpdateSource) -> Self {
+ Self { app_handle, response, update_source }
+ }
+
+ /// Whether this response is persisted (has a non-empty ID)
+ fn is_persisted(&self) -> bool {
+ !self.response.id.is_empty()
+ }
+
+ /// Update the response state. For persisted responses, fetches from DB, applies the
+ /// closure, and updates the DB. For ephemeral responses, just applies the closure
+ /// to the in-memory response.
+ fn update(&mut self, func: F) -> Result<()>
+ where
+ F: FnOnce(&mut HttpResponse),
+ {
+ if self.is_persisted() {
+ let r = self.app_handle.with_tx(|tx| {
+ let mut r = tx.get_http_response(&self.response.id)?;
+ func(&mut r);
+ tx.update_http_response_if_id(&r, &self.update_source)?;
+ Ok(r)
+ })?;
+ self.response = r;
+ Ok(())
+ } else {
+ func(&mut self.response);
+ Ok(())
+ }
+ }
+
+ /// Get the current response state
+ fn response(&self) -> &HttpResponse {
+ &self.response
+ }
+}
+
pub async fn send_http_request(
window: &WebviewWindow,
unrendered_request: &HttpRequest,
@@ -65,62 +111,81 @@ pub async fn send_http_request_with_context(
og_response: &HttpResponse,
environment: Option,
cookie_jar: Option,
- cancelled_rx: &mut Receiver,
+ cancelled_rx: &Receiver,
plugin_context: &PluginContext,
+) -> Result {
+ let app_handle = window.app_handle().clone();
+ let update_source = UpdateSource::from_window(window);
+ let mut response_ctx =
+ ResponseContext::new(app_handle.clone(), og_response.clone(), update_source);
+
+ // Execute the inner send logic and handle errors consistently
+ let start = Instant::now();
+ let result = send_http_request_inner(
+ window,
+ unrendered_request,
+ environment,
+ cookie_jar,
+ cancelled_rx,
+ plugin_context,
+ &mut response_ctx,
+ )
+ .await;
+
+ match result {
+ Ok(response) => Ok(response),
+ Err(e) => {
+ let error = e.to_string();
+ let elapsed = start.elapsed().as_millis() as i32;
+ warn!("Failed to send request: {error:?}");
+ let _ = response_ctx.update(|r| {
+ r.state = HttpResponseState::Closed;
+ r.elapsed = elapsed;
+ if r.elapsed_headers == 0 {
+ r.elapsed_headers = elapsed;
+ }
+ r.error = Some(error);
+ });
+ Ok(response_ctx.response().clone())
+ }
+ }
+}
+
+async fn send_http_request_inner(
+ window: &WebviewWindow,
+ unrendered_request: &HttpRequest,
+ environment: Option,
+ cookie_jar: Option,
+ cancelled_rx: &Receiver,
+ plugin_context: &PluginContext,
+ response_ctx: &mut ResponseContext,
) -> Result {
let app_handle = window.app_handle().clone();
let plugin_manager = app_handle.state::();
let connection_manager = app_handle.state::();
let settings = window.db().get_settings();
- let workspace = window.db().get_workspace(&unrendered_request.workspace_id)?;
+ let workspace_id = &unrendered_request.workspace_id;
+ let folder_id = unrendered_request.folder_id.as_deref();
let environment_id = environment.map(|e| e.id);
- let environment_chain = window.db().resolve_environments(
- &unrendered_request.workspace_id,
- unrendered_request.folder_id.as_deref(),
- environment_id.as_deref(),
- )?;
-
- let response_id = og_response.id.clone();
- let response = Arc::new(Mutex::new(og_response.clone()));
-
- let update_source = UpdateSource::from_window(window);
-
- let (resolved_request, auth_context_id) = match resolve_http_request(window, unrendered_request)
- {
- Ok(r) => r,
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- e.to_string(),
- &update_source,
- ));
- }
- };
-
+ let workspace = window.db().get_workspace(workspace_id)?;
+ let (resolved, auth_context_id) = resolve_http_request(window, unrendered_request)?;
let cb = PluginTemplateCallback::new(window.app_handle(), &plugin_context, RenderPurpose::Send);
+ let env_chain =
+ window.db().resolve_environments(&workspace.id, folder_id, environment_id.as_deref())?;
+ let request = render_http_request(&resolved, env_chain, &cb, &RenderOptions::throw()).await?;
- let opt = RenderOptions { error_behavior: RenderErrorBehavior::Throw };
-
- let request = match render_http_request(&resolved_request, environment_chain, &cb, &opt).await {
- Ok(r) => r,
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- e.to_string(),
- &update_source,
- ));
- }
+ // Build the sendable request using the new SendableHttpRequest type
+ let options = SendableHttpRequestOptions {
+ follow_redirects: workspace.setting_follow_redirects,
+ timeout: if workspace.setting_request_timeout > 0 {
+ Some(Duration::from_millis(workspace.setting_request_timeout.unsigned_abs() as u64))
+ } else {
+ None
+ },
};
+ let mut sendable_request = SendableHttpRequest::from_http_request(&request, options).await?;
- let mut url_string = request.url.clone();
-
- url_string = ensure_proto(&url_string);
- if !url_string.starts_with("http://") && !url_string.starts_with("https://") {
- url_string = format!("http://{}", url_string);
- }
- debug!("Sending request to {} {url_string}", request.method);
+ debug!("Sending request to {} {}", sendable_request.method, sendable_request.url);
let proxy_setting = match settings.proxy {
None => HttpConnectionProxySetting::System,
@@ -144,7 +209,8 @@ pub async fn send_http_request_with_context(
}
};
- let client_certificate = find_client_certificate(&url_string, &settings.client_certificates);
+ let client_certificate =
+ find_client_certificate(&sendable_request.url, &settings.client_certificates);
// Add cookie store if specified
let maybe_cookie_manager = match cookie_jar.clone() {
@@ -175,523 +241,73 @@ pub async fn send_http_request_with_context(
let client = connection_manager
.get_client(&HttpConnectionOptions {
id: plugin_context.id.clone(),
- follow_redirects: workspace.setting_follow_redirects,
validate_certificates: workspace.setting_validate_certificates,
proxy: proxy_setting,
cookie_provider: maybe_cookie_manager.as_ref().map(|(p, _)| Arc::clone(&p)),
client_certificate,
- timeout: if workspace.setting_request_timeout > 0 {
- Some(Duration::from_millis(workspace.setting_request_timeout.unsigned_abs() as u64))
- } else {
- None
- },
})
.await?;
- // Render query parameters
- let mut query_params = Vec::new();
- for p in request.url_parameters.clone() {
- if !p.enabled || p.name.is_empty() {
- continue;
- }
- query_params.push((p.name, p.value));
- }
+ // Apply authentication to the request
+ apply_authentication(
+ &window,
+ &mut sendable_request,
+ &request,
+ auth_context_id,
+ &plugin_manager,
+ plugin_context,
+ )
+ .await?;
- let url = match Url::from_str(&url_string) {
- Ok(u) => u,
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- format!("Failed to parse URL \"{}\": {}", url_string, e.to_string()),
- &update_source,
- ));
+ let result =
+ execute_transaction(client, sendable_request, response_ctx, cancelled_rx.clone()).await;
+
+ // Wait for blob writing to complete and check for errors
+ let final_result = match result {
+ Ok((response, maybe_blob_write_handle)) => {
+ // Check if blob writing failed
+ if let Some(handle) = maybe_blob_write_handle {
+ if let Ok(Err(e)) = handle.await {
+ // Update response with the storage error
+ let _ = response_ctx.update(|r| {
+ let error_msg =
+ format!("Request succeeded but failed to store request body: {}", e);
+ r.error = Some(match &r.error {
+ Some(existing) => format!("{}; {}", existing, error_msg),
+ None => error_msg,
+ });
+ });
+ }
+ }
+ Ok(response)
}
+ Err(e) => Err(e),
};
- let m = Method::from_str(&request.method.to_uppercase())
- .map_err(|e| GenericError(e.to_string()))?;
- let mut request_builder = client.request(m, url).query(&query_params);
-
- let mut headers = HeaderMap::new();
- headers.insert(USER_AGENT, HeaderValue::from_static("yaak"));
- headers.insert(ACCEPT, HeaderValue::from_static("*/*"));
-
- // TODO: Set cookie header ourselves once we also handle redirects. We need to do this
- // because reqwest doesn't give us a way to inspect the headers it sent (we have to do
- // everything manually to know that).
- // if let Some(cookie_store) = maybe_cookie_store.clone() {
- // let values1 = cookie_store.get_request_values(&url);
- // let raw_value = cookie_store.get_request_values(&url)
- // .map(|(name, value)| format!("{}={}", name, value))
- // .collect::>()
- // .join("; ");
- // headers.insert(
- // COOKIE,
- // HeaderValue::from_str(&raw_value).expect("Failed to create cookie header"),
- // );
- // }
-
- for h in request.headers.clone() {
- if h.name.is_empty() && h.value.is_empty() {
- continue;
- }
-
- if !h.enabled {
- continue;
- }
-
- let header_name = match HeaderName::from_str(&h.name) {
- Ok(n) => n,
- Err(e) => {
- error!("Failed to create header name: {}", e);
- continue;
- }
- };
- let header_value = match HeaderValue::from_str(&h.value) {
- Ok(n) => n,
- Err(e) => {
- error!("Failed to create header value: {}", e);
- continue;
- }
- };
-
- headers.insert(header_name, header_value);
- }
-
- let request_body = request.body.clone();
- if let Some(body_type) = &request.body_type.clone() {
- if body_type == "graphql" {
- let query = get_str_h(&request_body, "query");
- let variables = get_str_h(&request_body, "variables");
- if request.method.to_lowercase() == "get" {
- request_builder = request_builder.query(&[("query", query)]);
- if !variables.trim().is_empty() {
- request_builder = request_builder.query(&[("variables", variables)]);
- }
- } else {
- let body = if variables.trim().is_empty() {
- format!(r#"{{"query":{}}}"#, serde_json::to_string(query).unwrap_or_default())
- } else {
- format!(
- r#"{{"query":{},"variables":{variables}}}"#,
- serde_json::to_string(query).unwrap_or_default()
- )
- };
- request_builder = request_builder.body(body.to_owned());
- }
- } else if body_type == "application/x-www-form-urlencoded"
- && request_body.contains_key("form")
- {
- let mut form_params = Vec::new();
- let form = request_body.get("form");
- if let Some(f) = form {
- match f.as_array() {
- None => {}
- Some(a) => {
- for p in a {
- let enabled = get_bool(p, "enabled", true);
- let name = get_str(p, "name");
- if !enabled || name.is_empty() {
- continue;
- }
- let value = get_str(p, "value");
- form_params.push((name, value));
- }
- }
- }
- }
- request_builder = request_builder.form(&form_params);
- } else if body_type == "binary" && request_body.contains_key("filePath") {
- let file_path = request_body
- .get("filePath")
- .ok_or(GenericError("filePath not set".to_string()))?
- .as_str()
- .unwrap_or_default();
-
- match fs::read(file_path).await.map_err(|e| e.to_string()) {
- Ok(f) => {
- request_builder = request_builder.body(f);
- }
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- e,
- &update_source,
- ));
- }
- }
- } else if body_type == "multipart/form-data" && request_body.contains_key("form") {
- let mut multipart_form = multipart::Form::new();
- if let Some(form_definition) = request_body.get("form") {
- match form_definition.as_array() {
- None => {}
- Some(fd) => {
- for p in fd {
- let enabled = get_bool(p, "enabled", true);
- let name = get_str(p, "name").to_string();
-
- if !enabled || name.is_empty() {
- continue;
- }
-
- let file_path = get_str(p, "file").to_owned();
- let value = get_str(p, "value").to_owned();
-
- let mut part = if file_path.is_empty() {
- multipart::Part::text(value.clone())
- } else {
- match fs::read(file_path.clone()).await {
- Ok(f) => multipart::Part::bytes(f),
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- e.to_string(),
- &update_source,
- ));
- }
- }
- };
-
- let content_type = get_str(p, "contentType");
-
- // Set or guess mimetype
- if !content_type.is_empty() {
- part = match part.mime_str(content_type) {
- Ok(p) => p,
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- format!("Invalid mime for multi-part entry {e:?}"),
- &update_source,
- ));
- }
- };
- } else if !file_path.is_empty() {
- let default_mime =
- Mime::from_str("application/octet-stream").unwrap();
- let mime =
- mime_guess::from_path(file_path.clone()).first_or(default_mime);
- part = match part.mime_str(mime.essence_str()) {
- Ok(p) => p,
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- format!("Invalid mime for multi-part entry {e:?}"),
- &update_source,
- ));
- }
- };
- }
-
- // Set a file path if it is not empty
- if !file_path.is_empty() {
- let user_filename = get_str(p, "filename").to_owned();
- let filename = if user_filename.is_empty() {
- PathBuf::from(file_path)
- .file_name()
- .unwrap_or_default()
- .to_string_lossy()
- .to_string()
- } else {
- user_filename
- };
- part = part.file_name(filename);
- }
-
- multipart_form = multipart_form.part(name, part);
- }
- }
- }
- }
- headers.remove("Content-Type"); // reqwest will add this automatically
- request_builder = request_builder.multipart(multipart_form);
- } else if request_body.contains_key("text") {
- let body = get_str_h(&request_body, "text");
- request_builder = request_builder.body(body.to_owned());
- } else {
- warn!("Unsupported body type: {}", body_type);
- }
- } else {
- // No body set
- let method = request.method.to_ascii_lowercase();
- let is_body_method = method == "post" || method == "put" || method == "patch";
- // Add Content-Length for methods that commonly accept a body because some servers
- // will error if they don't receive it.
- if is_body_method && !headers.contains_key("content-length") {
- headers.insert("Content-Length", HeaderValue::from_static("0"));
- }
- }
-
- // Add headers last, because previous steps may modify them
- request_builder = request_builder.headers(headers);
-
- let mut sendable_req = match request_builder.build() {
- Ok(r) => r,
- Err(e) => {
- warn!("Failed to build request builder {e:?}");
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- e.to_string(),
- &update_source,
- ));
- }
- };
-
- match request.authentication_type {
- None => {
- // No authentication found. Not even inherited
- }
- Some(authentication_type) if authentication_type == "none" => {
- // Explicitly no authentication
- }
- Some(authentication_type) => {
- let req = CallHttpAuthenticationRequest {
- context_id: format!("{:x}", md5::compute(auth_context_id)),
- values: serde_json::from_value(serde_json::to_value(&request.authentication)?)?,
- url: sendable_req.url().to_string(),
- method: sendable_req.method().to_string(),
- headers: sendable_req
- .headers()
- .iter()
- .map(|(name, value)| HttpHeader {
- name: name.to_string(),
- value: value.to_str().unwrap_or_default().to_string(),
+ // Persist cookies back to the database after the request completes
+ if let Some((cookie_store, mut cj)) = maybe_cookie_manager {
+ match cookie_store.lock() {
+ Ok(store) => {
+ let cookies: Vec<Cookie> = store
+ .iter_any()
+ .filter_map(|c| {
+ // Convert cookie_store::Cookie -> yaak_models::Cookie via serde
+ let json_cookie = serde_json::to_value(c).ok()?;
+ serde_json::from_value(json_cookie).ok()
})
- .collect(),
- };
- let auth_result = plugin_manager
- .call_http_authentication(&window, &authentication_type, req, plugin_context)
- .await;
- let plugin_result = match auth_result {
- Ok(r) => r,
- Err(e) => {
- return Ok(response_err(
- &app_handle,
- &*response.lock().await,
- e.to_string(),
- &update_source,
- ));
+ .collect();
+ cj.cookies = cookies;
+ if let Err(e) = window.db().upsert_cookie_jar(&cj, &UpdateSource::Background) {
+ warn!("Failed to persist cookies to database: {}", e);
}
- };
-
- let headers = sendable_req.headers_mut();
- for header in plugin_result.set_headers.unwrap_or_default() {
- match (HeaderName::from_str(&header.name), HeaderValue::from_str(&header.value)) {
- (Ok(name), Ok(value)) => {
- headers.insert(name, value);
- }
- _ => continue,
- };
}
-
- if let Some(params) = plugin_result.set_query_parameters {
- let mut query_pairs = sendable_req.url_mut().query_pairs_mut();
- for p in params {
- query_pairs.append_pair(&p.name, &p.value);
- }
+ Err(e) => {
+ warn!("Failed to lock cookie store: {}", e);
}
}
}
- let (resp_tx, resp_rx) = oneshot::channel::<Result<reqwest::Response, reqwest::Error>>();
- let (done_tx, done_rx) = oneshot::channel::<HttpResponse>();
-
- let start = std::time::Instant::now();
-
- tokio::spawn(async move {
- let _ = resp_tx.send(client.execute(sendable_req).await);
- });
-
- let raw_response = tokio::select! {
- Ok(r) = resp_rx => r,
- _ = cancelled_rx.changed() => {
- let mut r = response.lock().await;
- r.elapsed_headers = start.elapsed().as_millis() as i32;
- r.elapsed = start.elapsed().as_millis() as i32;
- return Ok(response_err(&app_handle, &r, "Request was cancelled".to_string(), &update_source));
- }
- };
-
- {
- let app_handle = app_handle.clone();
- let window = window.clone();
- let cancelled_rx = cancelled_rx.clone();
- let response_id = response_id.clone();
- let response = response.clone();
- let update_source = update_source.clone();
- tokio::spawn(async move {
- match raw_response {
- Ok(mut v) => {
- let content_length = v.content_length();
- let response_headers = v.headers().clone();
- let dir = app_handle.path().app_data_dir().unwrap();
- let base_dir = dir.join("responses");
- create_dir_all(base_dir.clone()).await.expect("Failed to create responses dir");
- let body_path = if response_id.is_empty() {
- base_dir.join(uuid::Uuid::new_v4().to_string())
- } else {
- base_dir.join(response_id.clone())
- };
-
- {
- let mut r = response.lock().await;
- r.body_path = Some(body_path.to_str().unwrap().to_string());
- r.elapsed_headers = start.elapsed().as_millis() as i32;
- r.elapsed = start.elapsed().as_millis() as i32;
- r.status = v.status().as_u16() as i32;
- r.status_reason = v.status().canonical_reason().map(|s| s.to_string());
- r.headers = response_headers
- .iter()
- .map(|(k, v)| HttpResponseHeader {
- name: k.as_str().to_string(),
- value: v.to_str().unwrap_or_default().to_string(),
- })
- .collect();
- r.url = v.url().to_string();
- r.remote_addr = v.remote_addr().map(|a| a.to_string());
- r.version = match v.version() {
- reqwest::Version::HTTP_09 => Some("HTTP/0.9".to_string()),
- reqwest::Version::HTTP_10 => Some("HTTP/1.0".to_string()),
- reqwest::Version::HTTP_11 => Some("HTTP/1.1".to_string()),
- reqwest::Version::HTTP_2 => Some("HTTP/2".to_string()),
- reqwest::Version::HTTP_3 => Some("HTTP/3".to_string()),
- _ => None,
- };
-
- r.state = HttpResponseState::Connected;
- app_handle
- .db()
- .update_http_response_if_id(&r, &update_source)
- .expect("Failed to update response after connected");
- }
-
- // Write body to FS
- let mut f = File::options()
- .create(true)
- .truncate(true)
- .write(true)
- .open(&body_path)
- .await
- .expect("Failed to open file");
-
- let mut written_bytes: usize = 0;
- loop {
- let chunk = v.chunk().await;
- if *cancelled_rx.borrow() {
- // Request was canceled
- return;
- }
- match chunk {
- Ok(Some(bytes)) => {
- let mut r = response.lock().await;
- r.elapsed = start.elapsed().as_millis() as i32;
- f.write_all(&bytes).await.expect("Failed to write to file");
- f.flush().await.expect("Failed to flush file");
- written_bytes += bytes.len();
- r.content_length = Some(written_bytes as i32);
- app_handle
- .db()
- .update_http_response_if_id(&r, &update_source)
- .expect("Failed to update response");
- }
- Ok(None) => {
- break;
- }
- Err(e) => {
- response_err(
- &app_handle,
- &*response.lock().await,
- e.to_string(),
- &update_source,
- );
- break;
- }
- }
- }
-
- // Set the final content length
- {
- let mut r = response.lock().await;
- r.content_length = match content_length {
- Some(l) => Some(l as i32),
- None => Some(written_bytes as i32),
- };
- r.state = HttpResponseState::Closed;
- app_handle
- .db()
- .update_http_response_if_id(&r, &UpdateSource::from_window(&window))
- .expect("Failed to update response");
- };
-
- // Add cookie store if specified
- if let Some((cookie_store, mut cookie_jar)) = maybe_cookie_manager {
- // let cookies = response_headers.get_all(SET_COOKIE).iter().map(|h| {
- // println!("RESPONSE COOKIE: {}", h.to_str().unwrap());
- // cookie_store::RawCookie::from_str(h.to_str().unwrap())
- // .expect("Failed to parse cookie")
- // });
- // store.store_response_cookies(cookies, &url);
-
- let json_cookies: Vec<Cookie> = cookie_store
- .lock()
- .unwrap()
- .iter_any()
- .map(|c| {
- let json_cookie =
- serde_json::to_value(&c).expect("Failed to serialize cookie");
- serde_json::from_value(json_cookie)
- .expect("Failed to deserialize cookie")
- })
- .collect::<Vec<Cookie>>();
- cookie_jar.cookies = json_cookies;
- if let Err(e) = app_handle
- .db()
- .upsert_cookie_jar(&cookie_jar, &UpdateSource::from_window(&window))
- {
- error!("Failed to update cookie jar: {}", e);
- };
- }
- }
- Err(e) => {
- warn!("Failed to execute request {e}");
- response_err(
- &app_handle,
- &*response.lock().await,
- format!("{e} → {e:?}"),
- &update_source,
- );
- }
- };
-
- let r = response.lock().await.clone();
- done_tx.send(r).unwrap();
- });
- };
-
- let app_handle = app_handle.clone();
- Ok(tokio::select! {
- Ok(r) = done_rx => r,
- _ = cancelled_rx.changed() => {
- match app_handle.with_db(|c| c.get_http_response(&response_id)) {
- Ok(mut r) => {
- r.state = HttpResponseState::Closed;
- r.elapsed = start.elapsed().as_millis() as i32;
- r.elapsed_headers = start.elapsed().as_millis() as i32;
- app_handle.db().update_http_response_if_id(&r, &UpdateSource::from_window(window))
- .expect("Failed to update response")
- },
- _ => {
- response_err(&app_handle, &*response.lock().await, "Ephemeral request was cancelled".to_string(), &update_source)
- }.clone(),
- }
- }
- })
+ final_result
}
pub fn resolve_http_request(
@@ -711,46 +327,365 @@ pub fn resolve_http_request(
Ok((new_request, authentication_context_id))
}
-fn ensure_proto(url_str: &str) -> String {
- if url_str.starts_with("http://") || url_str.starts_with("https://") {
- return url_str.to_string();
- }
+async fn execute_transaction(
+ client: reqwest::Client,
+ mut sendable_request: SendableHttpRequest,
+ response_ctx: &mut ResponseContext,
+ mut cancelled_rx: Receiver<bool>,
+ ) -> Result<(HttpResponse, Option<JoinHandle<Result<()>>>)> {
+ let app_handle = &response_ctx.app_handle.clone();
+ let response_id = response_ctx.response().id.clone();
+ let workspace_id = response_ctx.response().workspace_id.clone();
+ let is_persisted = response_ctx.is_persisted();
- // Url::from_str will fail without a proto, so add one
- let parseable_url = format!("http://{}", url_str);
- if let Ok(u) = Url::from_str(parseable_url.as_str()) {
- match u.host() {
- Some(host) => {
- let h = host.to_string();
- // These TLDs force HTTPS
- if h.ends_with(".app") || h.ends_with(".dev") || h.ends_with(".page") {
- return format!("https://{url_str}");
+ let sender = ReqwestSender::with_client(client);
+ let transaction = HttpTransaction::new(sender);
+ let start = Instant::now();
+
+ // Capture request headers before sending
+ let request_headers: Vec<HttpResponseHeader> = sendable_request
+ .headers
+ .iter()
+ .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
+ .collect();
+
+ // Update response with headers info
+ response_ctx.update(|r| {
+ r.url = sendable_request.url.clone();
+ r.request_headers = request_headers;
+ })?;
+
+ // Create bounded channel for receiving events and spawn a task to store them in DB
+ // Buffer size of 100 events provides backpressure if DB writes are slow
+ let (event_tx, mut event_rx) =
+ tokio::sync::mpsc::channel::<yaak_http::sender::HttpResponseEvent>(100);
+
+ // Write events to DB in a task (only for persisted responses)
+ if is_persisted {
+ let response_id = response_id.clone();
+ let app_handle = app_handle.clone();
+ let update_source = response_ctx.update_source.clone();
+ let workspace_id = workspace_id.clone();
+ tokio::spawn(async move {
+ while let Some(event) = event_rx.recv().await {
+ let db_event = HttpResponseEvent::new(&response_id, &workspace_id, event.into());
+ let _ = app_handle.db().upsert_http_response_event(&db_event, &update_source);
+ }
+ });
+ } else {
+ // For ephemeral responses, just drain the events
+ tokio::spawn(async move { while event_rx.recv().await.is_some() {} });
+ };
+
+ // Capture request body as it's sent (only for persisted responses)
+ let body_id = format!("{}.request", response_id);
+ let maybe_blob_write_handle = match sendable_request.body {
+ Some(SendableBody::Bytes(bytes)) => {
+ if is_persisted {
+ write_bytes_to_db_sync(response_ctx, &body_id, bytes.clone())?;
+ }
+ sendable_request.body = Some(SendableBody::Bytes(bytes));
+ None
+ }
+ Some(SendableBody::Stream(stream)) => {
+ // Wrap stream with TeeReader to capture data as it's read
+ // Use unbounded channel to ensure all data is captured without blocking the HTTP request
+ let (body_chunk_tx, body_chunk_rx) = tokio::sync::mpsc::unbounded_channel::<Vec<u8>>();
+ let tee_reader = TeeReader::new(stream, body_chunk_tx);
+ let pinned: Pin<Box<dyn AsyncRead + Send>> = Box::pin(tee_reader);
+
+ let handle = if is_persisted {
+ // Spawn task to write request body chunks to blob DB
+ let app_handle = app_handle.clone();
+ let response_id = response_id.clone();
+ let workspace_id = workspace_id.clone();
+ let body_id = body_id.clone();
+ let update_source = response_ctx.update_source.clone();
+ Some(tauri::async_runtime::spawn(async move {
+ write_stream_chunks_to_db(
+ app_handle,
+ &body_id,
+ &workspace_id,
+ &response_id,
+ &update_source,
+ body_chunk_rx,
+ )
+ .await
+ }))
+ } else {
+ // For ephemeral responses, just drain the body chunks
+ tauri::async_runtime::spawn(async move {
+ let mut rx = body_chunk_rx;
+ while rx.recv().await.is_some() {}
+ });
+ None
+ };
+
+ sendable_request.body = Some(SendableBody::Stream(pinned));
+ handle
+ }
+ None => {
+ sendable_request.body = None;
+ None
+ }
+ };
+
+ // Execute the transaction with cancellation support
+ // This returns the response with headers, but body is not yet consumed
+ // Events (headers, settings, chunks) are sent through the channel
+ let mut http_response = transaction
+ .execute_with_cancellation(sendable_request, cancelled_rx.clone(), event_tx)
+ .await?;
+
+ // Prepare the response path before consuming the body
+ let body_path = if response_id.is_empty() {
+ // Ephemeral responses: use OS temp directory for automatic cleanup
+ let temp_dir = std::env::temp_dir().join("yaak-ephemeral-responses");
+ create_dir_all(&temp_dir).await?;
+ temp_dir.join(uuid::Uuid::new_v4().to_string())
+ } else {
+ // Persisted responses: use app data directory
+ let dir = app_handle.path().app_data_dir()?;
+ let base_dir = dir.join("responses");
+ create_dir_all(&base_dir).await?;
+ base_dir.join(&response_id)
+ };
+
+ // Extract metadata before consuming the body (headers are available immediately)
+ // Url might change, so update again
+ response_ctx.update(|r| {
+ r.body_path = Some(body_path.to_string_lossy().to_string());
+ r.elapsed_headers = start.elapsed().as_millis() as i32;
+ r.status = http_response.status as i32;
+ r.status_reason = http_response.status_reason.clone();
+ r.url = http_response.url.clone();
+ r.remote_addr = http_response.remote_addr.clone();
+ r.version = http_response.version.clone();
+ r.headers = http_response
+ .headers
+ .iter()
+ .map(|(name, value)| HttpResponseHeader { name: name.clone(), value: value.clone() })
+ .collect();
+ r.content_length = http_response.content_length.map(|l| l as i64);
+ r.state = HttpResponseState::Connected;
+ r.request_headers = http_response
+ .request_headers
+ .iter()
+ .map(|(n, v)| HttpResponseHeader { name: n.clone(), value: v.clone() })
+ .collect();
+ })?;
+
+ // Get the body stream for manual consumption
+ let mut body_stream = http_response.into_body_stream()?;
+
+ // Open file for writing
+ let mut file = File::options()
+ .create(true)
+ .truncate(true)
+ .write(true)
+ .open(&body_path)
+ .await
+ .map_err(|e| GenericError(format!("Failed to open file: {}", e)))?;
+
+ // Stream body to file, with throttled DB updates to avoid excessive writes
+ let mut written_bytes: usize = 0;
+ let mut last_update_time = start;
+ let mut buf = [0u8; 8192];
+
+ // Throttle settings: update DB at most every 100ms
+ const UPDATE_INTERVAL_MS: u128 = 100;
+
+ loop {
+ // Check for cancellation. If we already have headers/body, just close cleanly without error
+ if *cancelled_rx.borrow() {
+ break;
+ }
+
+ // Use select! to race between reading and cancellation, so cancellation is immediate
+ let read_result = tokio::select! {
+ biased;
+ _ = cancelled_rx.changed() => {
+ break;
+ }
+ result = body_stream.read(&mut buf) => result,
+ };
+
+ match read_result {
+ Ok(0) => break, // EOF
+ Ok(n) => {
+ file.write_all(&buf[..n])
+ .await
+ .map_err(|e| GenericError(format!("Failed to write to file: {}", e)))?;
+ file.flush()
+ .await
+ .map_err(|e| GenericError(format!("Failed to flush file: {}", e)))?;
+ written_bytes += n;
+
+ // Throttle DB updates: only update if enough time has passed
+ let now = Instant::now();
+ let elapsed_since_update = now.duration_since(last_update_time).as_millis();
+
+ if elapsed_since_update >= UPDATE_INTERVAL_MS {
+ response_ctx.update(|r| {
+ r.elapsed = start.elapsed().as_millis() as i32;
+ r.content_length = Some(written_bytes as i64);
+ })?;
+ last_update_time = now;
}
}
- None => {}
+ Err(e) => {
+ return Err(GenericError(format!("Failed to read response body: {}", e)));
+ }
}
}
- format!("http://{url_str}")
+ // Final update with closed state and accurate byte count
+ response_ctx.update(|r| {
+ r.elapsed = start.elapsed().as_millis() as i32;
+ r.content_length = Some(written_bytes as i64);
+ r.state = HttpResponseState::Closed;
+ })?;
+
+ Ok((response_ctx.response().clone(), maybe_blob_write_handle))
}
-fn get_bool(v: &Value, key: &str, fallback: bool) -> bool {
- match v.get(key) {
- None => fallback,
- Some(v) => v.as_bool().unwrap_or(fallback),
+fn write_bytes_to_db_sync(
+ response_ctx: &mut ResponseContext,
+ body_id: &str,
+ data: Bytes,
+) -> Result<()> {
+ if data.is_empty() {
+ return Ok(());
}
+
+ // Write in chunks if data is large
+ let mut offset = 0;
+ let mut chunk_index = 0;
+ while offset < data.len() {
+ let end = std::cmp::min(offset + REQUEST_BODY_CHUNK_SIZE, data.len());
+ let chunk_data = data.slice(offset..end).to_vec();
+ let chunk = BodyChunk::new(body_id, chunk_index, chunk_data);
+ response_ctx.app_handle.blobs().insert_chunk(&chunk)?;
+ offset = end;
+ chunk_index += 1;
+ }
+
+ // Update the response with the total request body size
+ response_ctx.update(|r| {
+ r.request_content_length = Some(data.len() as i64);
+ })?;
+
+ Ok(())
}
-fn get_str<'a>(v: &'a Value, key: &str) -> &'a str {
- match v.get(key) {
- None => "",
- Some(v) => v.as_str().unwrap_or_default(),
+async fn write_stream_chunks_to_db(
+ app_handle: AppHandle,
+ body_id: &str,
+ workspace_id: &str,
+ response_id: &str,
+ update_source: &UpdateSource,
+ mut rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
+) -> Result<()> {
+ let mut buffer = Vec::with_capacity(REQUEST_BODY_CHUNK_SIZE);
+ let mut chunk_index = 0;
+ let mut total_bytes: usize = 0;
+
+ while let Some(data) = rx.recv().await {
+ total_bytes += data.len();
+ buffer.extend_from_slice(&data);
+
+ // Flush when buffer reaches chunk size
+ while buffer.len() >= REQUEST_BODY_CHUNK_SIZE {
+ debug!("Writing chunk {chunk_index} to DB");
+ let chunk_data: Vec<u8> = buffer.drain(..REQUEST_BODY_CHUNK_SIZE).collect();
+ let chunk = BodyChunk::new(body_id, chunk_index, chunk_data);
+ app_handle.blobs().insert_chunk(&chunk)?;
+ app_handle.db().upsert_http_response_event(
+ &HttpResponseEvent::new(
+ response_id,
+ workspace_id,
+ yaak_http::sender::HttpResponseEvent::ChunkSent {
+ bytes: REQUEST_BODY_CHUNK_SIZE,
+ }
+ .into(),
+ ),
+ update_source,
+ )?;
+ chunk_index += 1;
+ }
}
+
+ // Flush remaining data
+ if !buffer.is_empty() {
+ let chunk = BodyChunk::new(body_id, chunk_index, buffer);
+ debug!("Flushing remaining data {chunk_index} {}", chunk.data.len());
+ app_handle.blobs().insert_chunk(&chunk)?;
+ app_handle.db().upsert_http_response_event(
+ &HttpResponseEvent::new(
+ response_id,
+ workspace_id,
+ yaak_http::sender::HttpResponseEvent::ChunkSent { bytes: chunk.data.len() }.into(),
+ ),
+ update_source,
+ )?;
+ }
+
+ // Update the response with the total request body size
+ app_handle.with_tx(|tx| {
+ debug!("Updating final body length {total_bytes}");
+ if let Ok(mut response) = tx.get_http_response(&response_id) {
+ response.request_content_length = Some(total_bytes as i64);
+ tx.update_http_response_if_id(&response, update_source)?;
+ }
+ Ok(())
+ })?;
+
+ Ok(())
}
-fn get_str_h<'a>(v: &'a BTreeMap, key: &str) -> &'a str {
- match v.get(key) {
- None => "",
- Some(v) => v.as_str().unwrap_or_default(),
+async fn apply_authentication(
+ window: &WebviewWindow,
+ sendable_request: &mut SendableHttpRequest,
+ request: &HttpRequest,
+ auth_context_id: String,
+ plugin_manager: &PluginManager,
+ plugin_context: &PluginContext,
+) -> Result<()> {
+ match &request.authentication_type {
+ None => {
+ // No authentication found. Not even inherited
+ }
+ Some(authentication_type) if authentication_type == "none" => {
+ // Explicitly no authentication
+ }
+ Some(authentication_type) => {
+ let req = CallHttpAuthenticationRequest {
+ context_id: format!("{:x}", md5::compute(auth_context_id)),
+ values: serde_json::from_value(serde_json::to_value(&request.authentication)?)?,
+ url: sendable_request.url.clone(),
+ method: sendable_request.method.clone(),
+ headers: sendable_request
+ .headers
+ .iter()
+ .map(|(name, value)| HttpHeader {
+ name: name.to_string(),
+ value: value.to_string(),
+ })
+ .collect(),
+ };
+ let plugin_result = plugin_manager
+ .call_http_authentication(&window, &authentication_type, req, plugin_context)
+ .await?;
+
+ for header in plugin_result.set_headers.unwrap_or_default() {
+ sendable_request.insert_header((header.name, header.value));
+ }
+
+ if let Some(params) = plugin_result.set_query_parameters {
+ let params = params.into_iter().map(|p| (p.name, p.value)).collect::<Vec<_>>();
+ sendable_request.url = append_query_params(&sendable_request.url, params);
+ }
+ }
}
+ Ok(())
}
diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs
index 680758b3..89be63ae 100644
--- a/src-tauri/src/lib.rs
+++ b/src-tauri/src/lib.rs
@@ -32,10 +32,11 @@ use yaak_common::window::WorkspaceWindowTrait;
use yaak_grpc::manager::GrpcHandle;
use yaak_grpc::{Code, ServiceDefinition, serialize_message};
use yaak_mac_window::AppHandleMacWindowExt;
+use yaak_models::blob_manager::BlobManagerExt;
use yaak_models::models::{
AnyModel, CookieJar, Environment, GrpcConnection, GrpcConnectionState, GrpcEvent,
- GrpcEventType, GrpcRequest, HttpRequest, HttpResponse, HttpResponseState, Plugin, Workspace,
- WorkspaceMeta,
+ GrpcEventType, GrpcRequest, HttpRequest, HttpResponse, HttpResponseEvent, HttpResponseState,
+ Plugin, Workspace, WorkspaceMeta,
};
use yaak_models::query_manager::QueryManagerExt;
use yaak_models::util::{BatchUpsertResult, UpdateSource, get_workspace_export_resources};
@@ -785,7 +786,7 @@ async fn cmd_http_response_body(
) -> YaakResult<FilterResponse> {
let body_path = match response.body_path {
None => {
- return Err(GenericError("Response body path not set".to_string()));
+ return Ok(FilterResponse { content: String::new(), error: None });
}
Some(p) => p,
};
@@ -810,6 +811,23 @@ async fn cmd_http_response_body(
}
}
+#[tauri::command]
+async fn cmd_http_request_body(
+ app_handle: AppHandle,
+ response_id: &str,
+) -> YaakResult