diff --git a/Cargo.lock b/Cargo.lock index 78730d75..607505d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10595,9 +10595,17 @@ dependencies = [ name = "yaak-proxy-lib" version = "0.0.0" dependencies = [ + "chrono", + "include_dir", + "log 0.4.29", + "r2d2", + "r2d2_sqlite", + "rusqlite", + "sea-query", "serde", "serde_json", "ts-rs", + "yaak-database", "yaak-proxy", "yaak-rpc", ] diff --git a/crates-proxy/yaak-proxy-lib/Cargo.toml b/crates-proxy/yaak-proxy-lib/Cargo.toml index c1c8a762..a4492de7 100644 --- a/crates-proxy/yaak-proxy-lib/Cargo.toml +++ b/crates-proxy/yaak-proxy-lib/Cargo.toml @@ -6,8 +6,16 @@ authors = ["Gregory Schier"] publish = false [dependencies] +chrono = { workspace = true, features = ["serde"] } +log = { workspace = true } +include_dir = "0.7" +r2d2 = "0.8.10" +r2d2_sqlite = "0.25.0" +rusqlite = { version = "0.32.1", features = ["bundled", "chrono"] } +sea-query = { version = "0.32.1", features = ["with-chrono", "attr"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -ts-rs = { workspace = true } +ts-rs = { workspace = true, features = ["chrono-impl"] } +yaak-database = { workspace = true } yaak-proxy = { workspace = true } yaak-rpc = { workspace = true } diff --git a/crates-proxy/yaak-proxy-lib/bindings/gen_models.ts b/crates-proxy/yaak-proxy-lib/bindings/gen_models.ts new file mode 100644 index 00000000..458536c7 --- /dev/null +++ b/crates-proxy/yaak-proxy-lib/bindings/gen_models.ts @@ -0,0 +1,5 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ProxyEntry = { id: string, createdAt: string, updatedAt: string, url: string, method: string, reqHeaders: Array, reqBody: Array | null, resStatus: number | null, resHeaders: Array, resBody: Array | null, error: string | null, }; + +export type ProxyHeader = { name: string, value: string, }; diff --git a/crates-proxy/yaak-proxy-lib/migrations/00000001_init.sql b/crates-proxy/yaak-proxy-lib/migrations/00000001_init.sql new file mode 100644 index 00000000..7b2b5a42 --- /dev/null +++ b/crates-proxy/yaak-proxy-lib/migrations/00000001_init.sql @@ -0,0 +1,14 @@ +CREATE TABLE proxy_entries +( + id TEXT NOT NULL PRIMARY KEY, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, + url TEXT NOT NULL DEFAULT '', + method TEXT NOT NULL DEFAULT '', + req_headers TEXT NOT NULL DEFAULT '[]', + req_body BLOB, + res_status INTEGER, + res_headers TEXT NOT NULL DEFAULT '[]', + res_body BLOB, + error TEXT +); diff --git a/crates-proxy/yaak-proxy-lib/src/db.rs b/crates-proxy/yaak-proxy-lib/src/db.rs new file mode 100644 index 00000000..d0332738 --- /dev/null +++ b/crates-proxy/yaak-proxy-lib/src/db.rs @@ -0,0 +1,33 @@ +use include_dir::{Dir, include_dir}; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; +use std::path::Path; +use yaak_database::{ConnectionOrTx, DbContext, run_migrations}; + +static MIGRATIONS: Dir<'static> = include_dir!("$CARGO_MANIFEST_DIR/migrations"); + +#[derive(Clone)] +pub struct ProxyQueryManager { + pool: Pool, +} + +impl ProxyQueryManager { + pub fn new(db_path: &Path) -> Self { + let manager = SqliteConnectionManager::file(db_path); + let pool = Pool::builder() + .max_size(5) + .build(manager) + .expect("Failed to create proxy DB pool"); + run_migrations(&pool, &MIGRATIONS).expect("Failed to run proxy DB migrations"); + Self { pool } + } + + pub fn with_conn(&self, func: F) -> T + where + F: FnOnce(&DbContext) -> T, + { + let conn = self.pool.get().expect("Failed to get proxy DB connection"); + let ctx = DbContext::new(ConnectionOrTx::Connection(conn)); + func(&ctx) + } +} diff --git a/crates-proxy/yaak-proxy-lib/src/lib.rs b/crates-proxy/yaak-proxy-lib/src/lib.rs index a218db85..fd3d92e0 100644 --- a/crates-proxy/yaak-proxy-lib/src/lib.rs +++ b/crates-proxy/yaak-proxy-lib/src/lib.rs @@ -1,19 +1,30 @@ -use serde::{Deserialize, Serialize}; -use std::sync::Mutex; -use ts_rs::TS; -use yaak_proxy::ProxyHandle; -use yaak_rpc::{RpcError, define_rpc}; +pub mod db; +pub mod models; -// -- Context shared across all RPC handlers -- +use std::collections::HashMap; +use std::path::Path; +use std::sync::Mutex; +use log::warn; +use serde::{Deserialize, Serialize}; +use ts_rs::TS; +use yaak_database::UpdateSource; +use yaak_proxy::{CapturedRequest, ProxyEvent, ProxyHandle, RequestState}; +use yaak_rpc::{RpcError, define_rpc}; +use crate::db::ProxyQueryManager; +use crate::models::{ProxyEntry, ProxyHeader}; + +// -- Context -- pub struct ProxyCtx { handle: Mutex>, + pub db: ProxyQueryManager, } impl ProxyCtx { - pub fn new() -> Self { + pub fn new(db_path: &Path) -> Self { Self { handle: Mutex::new(None), + db: ProxyQueryManager::new(db_path), } } } @@ -47,21 +58,21 @@ fn proxy_start(ctx: &ProxyCtx, req: ProxyStartRequest) -> Result Result { @@ -72,6 +83,100 @@ fn proxy_stop(ctx: &ProxyCtx, _req: ProxyStopRequest) -> Result Ok(handle.take().is_some()) } +// -- Event loop -- + +fn run_event_loop(rx: std::sync::mpsc::Receiver, db: ProxyQueryManager) { + let mut in_flight: HashMap = HashMap::new(); + + while let Ok(event) = rx.recv() { + match event { + ProxyEvent::RequestStart { id, method, url, http_version } => { + in_flight.insert(id, CapturedRequest { + id, + method, + url, + http_version, + status: None, + elapsed_ms: None, + remote_http_version: None, + request_headers: vec![], + request_body: None, + response_headers: vec![], + response_body: None, + response_body_size: 0, + state: RequestState::Sending, + error: None, + }); + } + ProxyEvent::RequestHeader { id, name, value } => { + if let Some(r) = in_flight.get_mut(&id) { + r.request_headers.push((name, value)); + } + } + ProxyEvent::RequestBody { id, body } => { + if let Some(r) = in_flight.get_mut(&id) { + r.request_body = Some(body); + } + } + ProxyEvent::ResponseStart { id, status, http_version, elapsed_ms } => { + if let Some(r) = in_flight.get_mut(&id) { + r.status = Some(status); + r.remote_http_version = Some(http_version); + r.elapsed_ms = Some(elapsed_ms); + r.state = RequestState::Receiving; + } + } + ProxyEvent::ResponseHeader { id, name, value } => { + if let Some(r) = in_flight.get_mut(&id) { + r.response_headers.push((name, value)); + } + } + ProxyEvent::ResponseBodyChunk { .. } => { + // Progress only — no action needed + } + ProxyEvent::ResponseBodyComplete { id, body, size, elapsed_ms } => { + if let Some(mut r) = in_flight.remove(&id) { + r.response_body = body; + r.response_body_size = size; + r.elapsed_ms = r.elapsed_ms.or(Some(elapsed_ms)); + r.state = RequestState::Complete; + write_entry(&db, &r); + } + } + ProxyEvent::Error { id, error } => { + if let Some(mut r) = in_flight.remove(&id) { + r.error = Some(error); + r.state = RequestState::Error; + write_entry(&db, &r); + } + } + } + } +} + +fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) { + let entry = ProxyEntry { + url: r.url.clone(), + method: r.method.clone(), + req_headers: r.request_headers.iter() + .map(|(n, v)| ProxyHeader { name: n.clone(), value: v.clone() }) + .collect(), + req_body: r.request_body.clone(), + res_status: r.status.map(|s| s as i32), + res_headers: r.response_headers.iter() + .map(|(n, v)| ProxyHeader { name: n.clone(), value: v.clone() }) + .collect(), + res_body: r.response_body.clone(), + error: r.error.clone(), + ..Default::default() + }; + db.with_conn(|ctx| { + if let Err(e) = ctx.upsert(&entry, &UpdateSource::Background) { + warn!("Failed to write proxy entry: {e}"); + } + }); +} + // -- Router + Schema -- define_rpc! { diff --git a/crates-proxy/yaak-proxy-lib/src/models.rs b/crates-proxy/yaak-proxy-lib/src/models.rs new file mode 100644 index 00000000..09f1034b --- /dev/null +++ b/crates-proxy/yaak-proxy-lib/src/models.rs @@ -0,0 +1,108 @@ +use chrono::NaiveDateTime; +use rusqlite::Row; +use sea_query::{IntoColumnRef, IntoIden, IntoTableRef, Order, SimpleExpr, enum_def}; +use serde::{Deserialize, Serialize}; +use ts_rs::TS; +use yaak_database::{Result as DbResult, UpdateSource, UpsertModelInfo, generate_prefixed_id, upsert_date}; + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export, export_to = "gen_models.ts")] +pub struct ProxyHeader { + pub name: String, + pub value: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export, export_to = "gen_models.ts")] +#[enum_def(table_name = "proxy_entries")] +pub struct ProxyEntry { + pub id: String, + pub created_at: NaiveDateTime, + pub updated_at: NaiveDateTime, + pub url: String, + pub method: String, + pub req_headers: Vec, + pub req_body: Option>, + pub res_status: Option, + pub res_headers: Vec, + pub res_body: Option>, + pub error: Option, +} + +impl UpsertModelInfo for ProxyEntry { + fn table_name() -> impl IntoTableRef + IntoIden { + ProxyEntryIden::Table + } + + fn id_column() -> impl IntoIden + Eq + Clone { + ProxyEntryIden::Id + } + + fn generate_id() -> String { + generate_prefixed_id("pe") + } + + fn order_by() -> (impl IntoColumnRef, Order) { + (ProxyEntryIden::CreatedAt, Order::Desc) + } + + fn get_id(&self) -> String { + self.id.clone() + } + + fn insert_values( + self, + source: &UpdateSource, + ) -> DbResult)>> { + use ProxyEntryIden::*; + Ok(vec![ + (CreatedAt, upsert_date(source, self.created_at)), + (UpdatedAt, upsert_date(source, self.updated_at)), + (Url, self.url.into()), + (Method, self.method.into()), + (ReqHeaders, serde_json::to_string(&self.req_headers)?.into()), + (ReqBody, self.req_body.into()), + (ResStatus, self.res_status.into()), + (ResHeaders, serde_json::to_string(&self.res_headers)?.into()), + (ResBody, self.res_body.into()), + (Error, self.error.into()), + ]) + } + + fn update_columns() -> Vec { + vec![ + ProxyEntryIden::UpdatedAt, + ProxyEntryIden::Url, + ProxyEntryIden::Method, + ProxyEntryIden::ReqHeaders, + ProxyEntryIden::ReqBody, + ProxyEntryIden::ResStatus, + ProxyEntryIden::ResHeaders, + ProxyEntryIden::ResBody, + ProxyEntryIden::Error, + ] + } + + fn from_row(r: &Row) -> rusqlite::Result + where + Self: Sized, + { + let req_headers: String = r.get("req_headers")?; + let res_headers: String = r.get("res_headers")?; + Ok(Self { + id: r.get("id")?, + created_at: r.get("created_at")?, + updated_at: r.get("updated_at")?, + url: r.get("url")?, + method: r.get("method")?, + req_headers: serde_json::from_str(&req_headers).unwrap_or_default(), + req_body: r.get("req_body")?, + res_status: r.get("res_status")?, + res_headers: serde_json::from_str(&res_headers).unwrap_or_default(), + res_body: r.get("res_body")?, + error: r.get("error")?, + }) + } +} diff --git a/crates-tauri/yaak-app-proxy/src/lib.rs b/crates-tauri/yaak-app-proxy/src/lib.rs index d0b6e3a5..e536bd21 100644 --- a/crates-tauri/yaak-app-proxy/src/lib.rs +++ b/crates-tauri/yaak-app-proxy/src/lib.rs @@ -1,5 +1,5 @@ use log::error; -use tauri::{RunEvent, State}; +use tauri::{Manager, RunEvent, State}; use yaak_proxy_lib::ProxyCtx; use yaak_rpc::RpcRouter; use yaak_window::window::CreateWindowConfig; @@ -17,8 +17,13 @@ fn rpc( pub fn run() { tauri::Builder::default() .plugin(tauri_plugin_os::init()) - .manage(ProxyCtx::new()) - .manage(yaak_proxy_lib::build_router()) + .setup(|app| { + let data_dir = app.path().app_data_dir().expect("no app data dir"); + std::fs::create_dir_all(&data_dir).expect("failed to create app data dir"); + app.manage(ProxyCtx::new(&data_dir.join("proxy.db"))); + app.manage(yaak_proxy_lib::build_router()); + Ok(()) + }) .invoke_handler(tauri::generate_handler![rpc]) .build(tauri::generate_context!()) .expect("error while building yaak proxy tauri application") diff --git a/crates/yaak-proxy/src/lib.rs b/crates/yaak-proxy/src/lib.rs index b16d8043..80f2b82a 100644 --- a/crates/yaak-proxy/src/lib.rs +++ b/crates/yaak-proxy/src/lib.rs @@ -82,11 +82,18 @@ pub enum RequestState { pub struct ProxyHandle { shutdown_tx: Option>, thread_handle: Option>, - pub event_rx: std_mpsc::Receiver, + event_rx: Option>, pub port: u16, pub ca_pem: String, } +impl ProxyHandle { + /// Take the event receiver. Can only be called once — returns `None` after the first call. + pub fn take_event_rx(&mut self) -> Option> { + self.event_rx.take() + } +} + impl Drop for ProxyHandle { fn drop(&mut self) { if let Some(tx) = self.shutdown_tx.take() { @@ -158,7 +165,7 @@ pub fn start_proxy(port: u16) -> Result { Ok(Ok(bound_port)) => Ok(ProxyHandle { shutdown_tx: Some(shutdown_tx), thread_handle: Some(thread_handle), - event_rx, + event_rx: Some(event_rx), port: bound_port, ca_pem, }),