From 0a616eb5e2ac37b2ac7aaf75dd22d9ff088bafe6 Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Sun, 8 Mar 2026 15:18:31 -0700 Subject: [PATCH] Got models and event system working --- Cargo.lock | 1 + apps/yaak-proxy/rpc.ts | 18 ++++- .../yaak-proxy-lib/bindings/gen_models.ts | 4 + .../yaak-proxy-lib/bindings/gen_rpc.ts | 3 + .../migrations/00000001_init.sql | 2 +- crates-proxy/yaak-proxy-lib/src/lib.rs | 42 ++++++---- crates-proxy/yaak-proxy-lib/src/models.rs | 44 ++++++----- crates-tauri/yaak-app-proxy/src/lib.rs | 19 ++++- crates/common/yaak-database/Cargo.toml | 1 + crates/common/yaak-database/src/lib.rs | 2 +- .../common/yaak-database/src/update_source.rs | 8 ++ crates/common/yaak-rpc/src/lib.rs | 79 +++++++++++++++++-- crates/yaak-models/src/util.rs | 10 +-- 13 files changed, 181 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 607505d1..a85ee184 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10400,6 +10400,7 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.17", + "ts-rs", ] [[package]] diff --git a/apps/yaak-proxy/rpc.ts b/apps/yaak-proxy/rpc.ts index 0597ce44..17b1aac9 100644 --- a/apps/yaak-proxy/rpc.ts +++ b/apps/yaak-proxy/rpc.ts @@ -1,5 +1,9 @@ import { invoke } from "@tauri-apps/api/core"; -import type { RpcSchema } from "../../crates-proxy/yaak-proxy-lib/bindings/gen_rpc"; +import { listen as tauriListen } from "@tauri-apps/api/event"; +import type { + RpcEventSchema, + RpcSchema, +} from "../../crates-proxy/yaak-proxy-lib/bindings/gen_rpc"; type Req = RpcSchema[K][0]; type Res = RpcSchema[K][1]; @@ -10,3 +14,15 @@ export async function rpc( ): Promise> { return invoke("rpc", { cmd, payload }) as Promise>; } + +/** Subscribe to a backend event. Returns an unsubscribe function. */ +export function listen( + event: K & string, + callback: (payload: RpcEventSchema[K]) => void, +): () => void { + let unsub: (() => void) | null = null; + tauriListen(event, (e) => callback(e.payload)) + .then((fn) => (unsub = fn)) + .catch(console.error); + return () => unsub?.(); +} diff --git a/crates-proxy/yaak-proxy-lib/bindings/gen_models.ts b/crates-proxy/yaak-proxy-lib/bindings/gen_models.ts index 458536c7..a1e4941f 100644 --- a/crates-proxy/yaak-proxy-lib/bindings/gen_models.ts +++ b/crates-proxy/yaak-proxy-lib/bindings/gen_models.ts @@ -1,5 +1,9 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +export type ModelChangeEvent = { "type": "upsert" } | { "type": "delete" }; + +export type ModelPayload = { model: ProxyEntry, change: ModelChangeEvent, }; + export type ProxyEntry = { id: string, createdAt: string, updatedAt: string, url: string, method: string, reqHeaders: Array, reqBody: Array | null, resStatus: number | null, resHeaders: Array, resBody: Array | null, error: string | null, }; export type ProxyHeader = { name: string, value: string, }; diff --git a/crates-proxy/yaak-proxy-lib/bindings/gen_rpc.ts b/crates-proxy/yaak-proxy-lib/bindings/gen_rpc.ts index 37e74a3a..bc133b2e 100644 --- a/crates-proxy/yaak-proxy-lib/bindings/gen_rpc.ts +++ b/crates-proxy/yaak-proxy-lib/bindings/gen_rpc.ts @@ -1,4 +1,5 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ModelPayload } from "./gen_models"; export type ProxyStartRequest = { port: number | null, }; @@ -6,4 +7,6 @@ export type ProxyStartResponse = { port: number, alreadyRunning: boolean, }; export type ProxyStopRequest = Record; +export type RpcEventSchema = { model_write: ModelPayload, }; + export type RpcSchema = { proxy_start: [ProxyStartRequest, ProxyStartResponse], proxy_stop: [ProxyStopRequest, boolean], }; diff --git a/crates-proxy/yaak-proxy-lib/migrations/00000001_init.sql b/crates-proxy/yaak-proxy-lib/migrations/00000001_init.sql index 7b2b5a42..2eaf0c53 100644 --- a/crates-proxy/yaak-proxy-lib/migrations/00000001_init.sql +++ b/crates-proxy/yaak-proxy-lib/migrations/00000001_init.sql @@ -1,4 +1,4 @@ -CREATE TABLE proxy_entries +CREATE TABLE http_exchanges ( id TEXT NOT NULL PRIMARY KEY, created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, diff --git a/crates-proxy/yaak-proxy-lib/src/lib.rs b/crates-proxy/yaak-proxy-lib/src/lib.rs index fd3d92e0..649b5f9b 100644 --- a/crates-proxy/yaak-proxy-lib/src/lib.rs +++ b/crates-proxy/yaak-proxy-lib/src/lib.rs @@ -7,24 +7,26 @@ use std::sync::Mutex; use log::warn; use serde::{Deserialize, Serialize}; use ts_rs::TS; -use yaak_database::UpdateSource; +use yaak_database::{ModelChangeEvent, UpdateSource}; use yaak_proxy::{CapturedRequest, ProxyEvent, ProxyHandle, RequestState}; -use yaak_rpc::{RpcError, define_rpc}; +use yaak_rpc::{RpcError, RpcEventEmitter, define_rpc}; use crate::db::ProxyQueryManager; -use crate::models::{ProxyEntry, ProxyHeader}; +use crate::models::{HttpExchange, ModelPayload, ProxyHeader}; // -- Context -- pub struct ProxyCtx { handle: Mutex>, pub db: ProxyQueryManager, + pub events: RpcEventEmitter, } impl ProxyCtx { - pub fn new(db_path: &Path) -> Self { + pub fn new(db_path: &Path, events: RpcEventEmitter) -> Self { Self { handle: Mutex::new(None), db: ProxyQueryManager::new(db_path), + events, } } } @@ -68,7 +70,8 @@ fn proxy_start(ctx: &ProxyCtx, req: ProxyStartRequest) -> Result Result // -- Event loop -- -fn run_event_loop(rx: std::sync::mpsc::Receiver, db: ProxyQueryManager) { +fn run_event_loop(rx: std::sync::mpsc::Receiver, db: ProxyQueryManager, events: RpcEventEmitter) { let mut in_flight: HashMap = HashMap::new(); while let Ok(event) = rx.recv() { @@ -140,22 +143,22 @@ fn run_event_loop(rx: std::sync::mpsc::Receiver, db: ProxyQueryManag r.response_body_size = size; r.elapsed_ms = r.elapsed_ms.or(Some(elapsed_ms)); r.state = RequestState::Complete; - write_entry(&db, &r); + write_entry(&db, &events, &r); } } ProxyEvent::Error { id, error } => { if let Some(mut r) = in_flight.remove(&id) { r.error = Some(error); r.state = RequestState::Error; - write_entry(&db, &r); + write_entry(&db, &events, &r); } } } } } -fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) { - let entry = ProxyEntry { +fn write_entry(db: &ProxyQueryManager, events: &RpcEventEmitter, r: &CapturedRequest) { + let entry = HttpExchange { url: r.url.clone(), method: r.method.clone(), req_headers: r.request_headers.iter() @@ -171,8 +174,14 @@ fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) { ..Default::default() }; db.with_conn(|ctx| { - if let Err(e) = ctx.upsert(&entry, &UpdateSource::Background) { - warn!("Failed to write proxy entry: {e}"); + match ctx.upsert(&entry, &UpdateSource::Background) { + Ok((saved, created)) => { + events.emit("model_write", &ModelPayload { + model: saved, + change: ModelChangeEvent::Upsert { created }, + }); + } + Err(e) => warn!("Failed to write proxy entry: {e}"), } }); } @@ -181,6 +190,11 @@ fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) { define_rpc! { ProxyCtx; - "proxy_start" => proxy_start(ProxyStartRequest) -> ProxyStartResponse, - "proxy_stop" => proxy_stop(ProxyStopRequest) -> bool, + commands { + proxy_start(ProxyStartRequest) -> ProxyStartResponse, + proxy_stop(ProxyStopRequest) -> bool, + } + events { + model_write(ModelPayload), + } } diff --git a/crates-proxy/yaak-proxy-lib/src/models.rs b/crates-proxy/yaak-proxy-lib/src/models.rs index 09f1034b..3c1d5624 100644 --- a/crates-proxy/yaak-proxy-lib/src/models.rs +++ b/crates-proxy/yaak-proxy-lib/src/models.rs @@ -3,7 +3,7 @@ use rusqlite::Row; use sea_query::{IntoColumnRef, IntoIden, IntoTableRef, Order, SimpleExpr, enum_def}; use serde::{Deserialize, Serialize}; use ts_rs::TS; -use yaak_database::{Result as DbResult, UpdateSource, UpsertModelInfo, generate_prefixed_id, upsert_date}; +use yaak_database::{ModelChangeEvent, Result as DbResult, UpdateSource, UpsertModelInfo, generate_prefixed_id, upsert_date}; #[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] #[serde(rename_all = "camelCase")] @@ -16,8 +16,8 @@ pub struct ProxyHeader { #[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] #[serde(default, rename_all = "camelCase")] #[ts(export, export_to = "gen_models.ts")] -#[enum_def(table_name = "proxy_entries")] -pub struct ProxyEntry { +#[enum_def(table_name = "http_exchanges")] +pub struct HttpExchange { pub id: String, pub created_at: NaiveDateTime, pub updated_at: NaiveDateTime, @@ -31,21 +31,29 @@ pub struct ProxyEntry { pub error: Option, } -impl UpsertModelInfo for ProxyEntry { +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export, export_to = "gen_models.ts")] +pub struct ModelPayload { + pub model: HttpExchange, + pub change: ModelChangeEvent, +} + +impl UpsertModelInfo for HttpExchange { fn table_name() -> impl IntoTableRef + IntoIden { - ProxyEntryIden::Table + HttpExchangeIden::Table } fn id_column() -> impl IntoIden + Eq + Clone { - ProxyEntryIden::Id + HttpExchangeIden::Id } fn generate_id() -> String { - generate_prefixed_id("pe") + generate_prefixed_id("he") } fn order_by() -> (impl IntoColumnRef, Order) { - (ProxyEntryIden::CreatedAt, Order::Desc) + (HttpExchangeIden::CreatedAt, Order::Desc) } fn get_id(&self) -> String { @@ -56,7 +64,7 @@ impl UpsertModelInfo for ProxyEntry { self, source: &UpdateSource, ) -> DbResult)>> { - use ProxyEntryIden::*; + use HttpExchangeIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), (UpdatedAt, upsert_date(source, self.updated_at)), @@ -73,15 +81,15 @@ impl UpsertModelInfo for ProxyEntry { fn update_columns() -> Vec { vec![ - ProxyEntryIden::UpdatedAt, - ProxyEntryIden::Url, - ProxyEntryIden::Method, - ProxyEntryIden::ReqHeaders, - ProxyEntryIden::ReqBody, - ProxyEntryIden::ResStatus, - ProxyEntryIden::ResHeaders, - ProxyEntryIden::ResBody, - ProxyEntryIden::Error, + HttpExchangeIden::UpdatedAt, + HttpExchangeIden::Url, + HttpExchangeIden::Method, + HttpExchangeIden::ReqHeaders, + HttpExchangeIden::ReqBody, + HttpExchangeIden::ResStatus, + HttpExchangeIden::ResHeaders, + HttpExchangeIden::ResBody, + HttpExchangeIden::Error, ] } diff --git a/crates-tauri/yaak-app-proxy/src/lib.rs b/crates-tauri/yaak-app-proxy/src/lib.rs index e536bd21..1000d4e6 100644 --- a/crates-tauri/yaak-app-proxy/src/lib.rs +++ b/crates-tauri/yaak-app-proxy/src/lib.rs @@ -1,7 +1,7 @@ use log::error; -use tauri::{Manager, RunEvent, State}; +use tauri::{Emitter, Manager, RunEvent, State}; use yaak_proxy_lib::ProxyCtx; -use yaak_rpc::RpcRouter; +use yaak_rpc::{RpcEventEmitter, RpcRouter}; use yaak_window::window::CreateWindowConfig; #[tauri::command] @@ -20,8 +20,21 @@ pub fn run() { .setup(|app| { let data_dir = app.path().app_data_dir().expect("no app data dir"); std::fs::create_dir_all(&data_dir).expect("failed to create app data dir"); - app.manage(ProxyCtx::new(&data_dir.join("proxy.db"))); + + let (emitter, event_rx) = RpcEventEmitter::new(); + app.manage(ProxyCtx::new(&data_dir.join("proxy.db"), emitter)); app.manage(yaak_proxy_lib::build_router()); + + // Drain RPC events and forward as Tauri events + let app_handle = app.handle().clone(); + std::thread::spawn(move || { + for event in event_rx { + if let Err(e) = app_handle.emit(event.event, event.payload) { + error!("Failed to emit RPC event: {e}"); + } + } + }); + Ok(()) }) .invoke_handler(tauri::generate_handler![rpc]) diff --git a/crates/common/yaak-database/Cargo.toml b/crates/common/yaak-database/Cargo.toml index 35df46d1..f02c4fb2 100644 --- a/crates/common/yaak-database/Cargo.toml +++ b/crates/common/yaak-database/Cargo.toml @@ -17,3 +17,4 @@ sea-query-rusqlite = { version = "0.7.0", features = ["with-chrono"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } +ts-rs = { workspace = true } diff --git a/crates/common/yaak-database/src/lib.rs b/crates/common/yaak-database/src/lib.rs index c6b2643d..54929ea5 100644 --- a/crates/common/yaak-database/src/lib.rs +++ b/crates/common/yaak-database/src/lib.rs @@ -12,7 +12,7 @@ pub use db_context::DbContext; pub use error::{Error, Result}; pub use migrate::run_migrations; pub use traits::{UpsertModelInfo, upsert_date}; -pub use update_source::UpdateSource; +pub use update_source::{ModelChangeEvent, UpdateSource}; pub use util::{generate_id, generate_id_of_length, generate_prefixed_id}; // Re-export pool types that consumers will need diff --git a/crates/common/yaak-database/src/update_source.rs b/crates/common/yaak-database/src/update_source.rs index f448e04a..18afea4f 100644 --- a/crates/common/yaak-database/src/update_source.rs +++ b/crates/common/yaak-database/src/update_source.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use ts_rs::TS; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "snake_case", tag = "type")] @@ -15,3 +16,10 @@ impl UpdateSource { Self::Window { label: label.into() } } } + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "snake_case", tag = "type")] +pub enum ModelChangeEvent { + Upsert { created: bool }, + Delete, +} diff --git a/crates/common/yaak-rpc/src/lib.rs b/crates/common/yaak-rpc/src/lib.rs index 60837c48..8895c610 100644 --- a/crates/common/yaak-rpc/src/lib.rs +++ b/crates/common/yaak-rpc/src/lib.rs @@ -1,5 +1,6 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::sync::mpsc; /// Type-erased handler function: takes context + JSON payload, returns JSON or error. type HandlerFn = Box Result + Send + Sync>; @@ -101,29 +102,97 @@ impl RpcRouter { } } -/// Define RPC commands with a single source of truth. +/// A named event carrying a JSON payload, emitted from backend to frontend. +#[derive(Debug, Clone, Serialize)] +pub struct RpcEvent { + pub event: &'static str, + pub payload: serde_json::Value, +} + +/// Channel-based event emitter. The backend calls `emit()`, the transport +/// adapter (Tauri, WebSocket, etc.) drains the receiver and delivers events. +#[derive(Clone)] +pub struct RpcEventEmitter { + tx: mpsc::Sender, +} + +impl RpcEventEmitter { + pub fn new() -> (Self, mpsc::Receiver) { + let (tx, rx) = mpsc::channel(); + (Self { tx }, rx) + } + + /// Emit a typed event. Serializes the payload to JSON. + pub fn emit(&self, event: &'static str, payload: &T) { + if let Ok(value) = serde_json::to_value(payload) { + let _ = self.tx.send(RpcEvent { event, payload: value }); + } + } +} + +/// Define RPC commands and events with a single source of truth. /// /// Generates: /// - `build_router()` — creates an `RpcRouter` with all handlers registered /// - `RpcSchema` — a struct with ts-rs derives for TypeScript type generation +/// - `RpcEventSchema` — (if events declared) a struct mapping event names to payload types +/// +/// The wire name for each command/event is derived from `stringify!($ident)`. /// /// # Example /// ```ignore /// define_rpc! { /// ProxyCtx; -/// "proxy_start" => proxy_start(ProxyStartRequest) -> ProxyStartResponse, -/// "proxy_stop" => proxy_stop(ProxyStopRequest) -> bool, +/// commands { +/// proxy_start(ProxyStartRequest) -> ProxyStartResponse, +/// proxy_stop(ProxyStopRequest) -> bool, +/// } +/// events { +/// model_write(ModelPayload), +/// } /// } /// ``` #[macro_export] macro_rules! define_rpc { + // With both commands and events ( $ctx:ty; - $( $name:literal => $handler:ident ( $req:ty ) -> $res:ty ),* $(,)? + commands { + $( $handler:ident ( $req:ty ) -> $res:ty ),* $(,)? + } + events { + $( $evt_ident:ident ( $evt_payload:ty ) ),* $(,)? + } ) => { pub fn build_router() -> $crate::RpcRouter<$ctx> { let mut router = $crate::RpcRouter::new(); - $( router.register($name, $crate::rpc_handler!($handler)); )* + $( router.register(stringify!($handler), $crate::rpc_handler!($handler)); )* + router + } + + #[derive(ts_rs::TS)] + #[ts(export, export_to = "gen_rpc.ts")] + pub struct RpcSchema { + $( pub $handler: ($req, $res), )* + } + + #[derive(ts_rs::TS)] + #[ts(export, export_to = "gen_rpc.ts")] + pub struct RpcEventSchema { + $( pub $evt_ident: $evt_payload, )* + } + }; + + // Commands only (no events) + ( + $ctx:ty; + commands { + $( $handler:ident ( $req:ty ) -> $res:ty ),* $(,)? + } + ) => { + pub fn build_router() -> $crate::RpcRouter<$ctx> { + let mut router = $crate::RpcRouter::new(); + $( router.register(stringify!($handler), $crate::rpc_handler!($handler)); )* router } diff --git a/crates/yaak-models/src/util.rs b/crates/yaak-models/src/util.rs index 4f28f38d..a05a4ff1 100644 --- a/crates/yaak-models/src/util.rs +++ b/crates/yaak-models/src/util.rs @@ -10,7 +10,7 @@ use std::collections::BTreeMap; use ts_rs::TS; use yaak_core::WorkspaceContext; -pub use yaak_database::{generate_id, generate_id_of_length, generate_prefixed_id}; +pub use yaak_database::{ModelChangeEvent, generate_id, generate_id_of_length, generate_prefixed_id}; #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[serde(rename_all = "camelCase")] @@ -21,14 +21,6 @@ pub struct ModelPayload { pub change: ModelChangeEvent, } -#[derive(Debug, Clone, Serialize, Deserialize, TS)] -#[serde(rename_all = "snake_case", tag = "type")] -#[ts(export, export_to = "gen_models.ts")] -pub enum ModelChangeEvent { - Upsert { created: bool }, - Delete, -} - #[derive(Debug, Clone, Serialize, Deserialize, TS)] #[serde(rename_all = "snake_case", tag = "type")] #[ts(export, export_to = "gen_models.ts")]