diff --git a/crates-tauri/yaak-app/src/models_ext.rs b/crates-tauri/yaak-app/src/models_ext.rs index 2163e07e..a9f7a8e3 100644 --- a/crates-tauri/yaak-app/src/models_ext.rs +++ b/crates-tauri/yaak-app/src/models_ext.rs @@ -3,6 +3,7 @@ //! This module provides the Tauri plugin initialization and extension traits //! that allow accessing QueryManager and BlobManager from Tauri's Manager types. +use chrono::Utc; use log::error; use std::time::Duration; use tauri::plugin::TauriPlugin; @@ -15,10 +16,74 @@ use yaak_models::models::{AnyModel, GraphQlIntrospection, GrpcEvent, Settings, W use yaak_models::query_manager::QueryManager; use yaak_models::util::UpdateSource; -const MODEL_CHANGES_RETENTION_DAYS: i64 = 30; +const MODEL_CHANGES_RETENTION_HOURS: i64 = 1; const MODEL_CHANGES_POLL_INTERVAL_MS: u64 = 250; const MODEL_CHANGES_POLL_BATCH_SIZE: usize = 200; +struct ModelChangeCursor { + created_at: String, + id: i64, +} + +impl ModelChangeCursor { + fn from_launch_time() -> Self { + Self { + created_at: Utc::now().naive_utc().format("%Y-%m-%d %H:%M:%S%.3f").to_string(), + id: 0, + } + } +} + +fn drain_model_changes_batch( + query_manager: &QueryManager, + app_handle: &tauri::AppHandle, + cursor: &mut ModelChangeCursor, +) -> bool { + let changes = match query_manager.connect().list_model_changes_since( + &cursor.created_at, + cursor.id, + MODEL_CHANGES_POLL_BATCH_SIZE, + ) { + Ok(changes) => changes, + Err(err) => { + error!("Failed to poll model_changes rows: {err:?}"); + return false; + } + }; + + if changes.is_empty() { + return false; + } + + let fetched_count = changes.len(); + for change in changes { + cursor.created_at = change.created_at; + cursor.id = change.id; + + // Local window-originated writes are forwarded immediately from the + // in-memory model event channel. + if matches!(change.payload.update_source, UpdateSource::Window { .. }) { + continue; + } + if let Err(err) = app_handle.emit("model_write", change.payload) { + error!("Failed to emit model_write event: {err:?}"); + } + } + + fetched_count == MODEL_CHANGES_POLL_BATCH_SIZE +} + +async fn run_model_change_poller( + query_manager: QueryManager, + app_handle: tauri::AppHandle, + mut cursor: ModelChangeCursor, +) { + loop { + while drain_model_changes_batch(&query_manager, &app_handle, &mut cursor) {} + tokio::time::sleep(Duration::from_millis(MODEL_CHANGES_POLL_INTERVAL_MS)).await; + } +} + /// Extension trait for accessing the QueryManager from Tauri Manager types. pub trait QueryManagerExt<'a, R> { fn db_manager(&'a self) -> State<'a, QueryManager>; @@ -255,7 +320,7 @@ pub fn init() -> TauriPlugin { let db_path = app_path.join("db.sqlite"); let blob_path = app_path.join("blobs.sqlite"); - let (query_manager, blob_manager, _rx) = + let (query_manager, blob_manager, rx) = match yaak_models::init_standalone(&db_path, &blob_path) { Ok(result) => result, Err(e) => { @@ -269,16 +334,12 @@ pub fn init() -> TauriPlugin { }; let db = query_manager.connect(); - if let Err(err) = db.prune_model_changes_older_than_days(MODEL_CHANGES_RETENTION_DAYS) { + if let Err(err) = db.prune_model_changes_older_than_hours(MODEL_CHANGES_RETENTION_HOURS) + { error!("Failed to prune model_changes rows on startup: {err:?}"); } - let mut last_seen_change_id = match db.latest_model_change_id() { - Ok(id) => id, - Err(err) => { - error!("Failed to read latest model_changes cursor: {err:?}"); - 0 - } - }; + // Only stream writes that happen after this app launch. + let cursor = ModelChangeCursor::from_launch_time(); let poll_query_manager = query_manager.clone(); @@ -286,28 +347,23 @@ pub fn init() -> TauriPlugin { app_handle.manage(blob_manager); // Poll model_changes so all writers (including external CLI processes) update the UI. - let app_handle = app_handle.clone(); + let app_handle_poll = app_handle.clone(); let query_manager = poll_query_manager; tauri::async_runtime::spawn(async move { - loop { - match query_manager.connect().list_model_changes_after( - last_seen_change_id, - MODEL_CHANGES_POLL_BATCH_SIZE, - ) { - Ok(changes) => { - for change in changes { - last_seen_change_id = change.id; - if let Err(err) = app_handle.emit("model_write", change.payload) { - error!("Failed to emit model_write event: {err:?}"); - } - } - } - Err(err) => { - error!("Failed to poll model_changes rows: {err:?}"); - } - } + run_model_change_poller(query_manager, app_handle_poll, cursor).await; + }); - tokio::time::sleep(Duration::from_millis(MODEL_CHANGES_POLL_INTERVAL_MS)).await; + // Fast path for local app writes initiated by frontend windows. This keeps the + // current sync-model UX snappy, while DB polling handles external writers (CLI). + let app_handle_local = app_handle.clone(); + tauri::async_runtime::spawn(async move { + for payload in rx { + if !matches!(payload.update_source, UpdateSource::Window { .. }) { + continue; + } + if let Err(err) = app_handle_local.emit("model_write", payload) { + error!("Failed to emit local model_write event: {err:?}"); + } } }); diff --git a/crates/yaak-models/src/db_context.rs b/crates/yaak-models/src/db_context.rs index 782f174b..1e25fc1a 100644 --- a/crates/yaak-models/src/db_context.rs +++ b/crates/yaak-models/src/db_context.rs @@ -3,7 +3,6 @@ use crate::error::Error::ModelNotFound; use crate::error::Result; use crate::models::{AnyModel, UpsertModelInfo}; use crate::util::{ModelChangeEvent, ModelPayload, UpdateSource}; -use rusqlite::types::Type; use rusqlite::{OptionalExtension, params}; use sea_query::{ Asterisk, Expr, Func, IntoColumnRef, IntoIden, IntoTableRef, OnConflict, Query, SimpleExpr, @@ -18,12 +17,6 @@ pub struct DbContext<'a> { pub(crate) conn: ConnectionOrTx<'a>, } -#[derive(Debug, Clone)] -pub struct PersistedModelChange { - pub id: i64, - pub payload: ModelPayload, -} - impl<'a> DbContext<'a> { pub(crate) fn find_one<'s, M>( &self, @@ -235,132 +228,4 @@ impl<'a> DbContext<'a> { Ok(()) } - - pub fn latest_model_change_id(&self) -> Result { - let mut stmt = self.conn.prepare("SELECT COALESCE(MAX(id), 0) FROM model_changes")?; - Ok(stmt.query_row([], |row| row.get(0))?) - } - - pub fn list_model_changes_after( - &self, - after_id: i64, - limit: usize, - ) -> Result> { - let mut stmt = self.conn.prepare( - r#" - SELECT id, payload - FROM model_changes - WHERE id > ?1 - ORDER BY id ASC - LIMIT ?2 - "#, - )?; - - let items = stmt.query_map(params![after_id, limit as i64], |row| { - let id: i64 = row.get(0)?; - let payload_raw: String = row.get(1)?; - let payload = serde_json::from_str::(&payload_raw).map_err(|e| { - rusqlite::Error::FromSqlConversionFailure(1, Type::Text, Box::new(e)) - })?; - Ok(PersistedModelChange { id, payload }) - })?; - - Ok(items.collect::, rusqlite::Error>>()?) - } - - pub fn prune_model_changes_older_than_days(&self, days: i64) -> Result { - let offset = format!("-{days} days"); - Ok(self.conn.resolve().execute( - r#" - DELETE FROM model_changes - WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1) - "#, - params![offset], - )?) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::init_in_memory; - use crate::models::Workspace; - - #[test] - fn records_model_changes_for_upsert_and_delete() { - let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); - let db = query_manager.connect(); - - let workspace = db - .upsert_workspace( - &Workspace { - name: "Changes Test".to_string(), - setting_follow_redirects: true, - setting_validate_certificates: true, - ..Default::default() - }, - &UpdateSource::Sync, - ) - .expect("Failed to upsert workspace"); - - let created_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); - assert_eq!(created_changes.len(), 1); - assert_eq!(created_changes[0].payload.model.id(), workspace.id); - assert_eq!(created_changes[0].payload.model.model(), "workspace"); - assert!(matches!( - created_changes[0].payload.change, - ModelChangeEvent::Upsert { created: true } - )); - assert!(matches!(created_changes[0].payload.update_source, UpdateSource::Sync)); - - db.delete_workspace_by_id(&workspace.id, &UpdateSource::Sync) - .expect("Failed to delete workspace"); - - let all_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); - assert_eq!(all_changes.len(), 2); - assert!(matches!(all_changes[1].payload.change, ModelChangeEvent::Delete)); - assert_eq!( - db.latest_model_change_id().expect("Failed to read latest ID"), - all_changes[1].id - ); - - let changes_after_first = db - .list_model_changes_after(all_changes[0].id, 10) - .expect("Failed to list changes after cursor"); - assert_eq!(changes_after_first.len(), 1); - assert!(matches!(changes_after_first[0].payload.change, ModelChangeEvent::Delete)); - } - - #[test] - fn prunes_old_model_changes() { - let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); - let db = query_manager.connect(); - - db.upsert_workspace( - &Workspace { - name: "Prune Test".to_string(), - setting_follow_redirects: true, - setting_validate_certificates: true, - ..Default::default() - }, - &UpdateSource::Sync, - ) - .expect("Failed to upsert workspace"); - - let changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); - assert_eq!(changes.len(), 1); - - db.conn - .resolve() - .execute( - "UPDATE model_changes SET created_at = '2000-01-01 00:00:00.000' WHERE id = ?1", - params![changes[0].id], - ) - .expect("Failed to age model change row"); - - let pruned = - db.prune_model_changes_older_than_days(30).expect("Failed to prune model changes"); - assert_eq!(pruned, 1); - assert!(db.list_model_changes_after(0, 10).expect("Failed to list changes").is_empty()); - } } diff --git a/crates/yaak-models/src/queries/mod.rs b/crates/yaak-models/src/queries/mod.rs index 2b89c233..a27361b4 100644 --- a/crates/yaak-models/src/queries/mod.rs +++ b/crates/yaak-models/src/queries/mod.rs @@ -11,6 +11,7 @@ mod http_requests; mod http_response_events; mod http_responses; mod key_values; +mod model_changes; mod plugin_key_values; mod plugins; mod settings; @@ -20,6 +21,7 @@ mod websocket_events; mod websocket_requests; mod workspace_metas; pub mod workspaces; +pub use model_changes::PersistedModelChange; const MAX_HISTORY_ITEMS: usize = 20; diff --git a/crates/yaak-models/src/queries/model_changes.rs b/crates/yaak-models/src/queries/model_changes.rs new file mode 100644 index 00000000..b3e44462 --- /dev/null +++ b/crates/yaak-models/src/queries/model_changes.rs @@ -0,0 +1,243 @@ +use crate::db_context::DbContext; +use crate::error::Result; +use crate::util::ModelPayload; +use rusqlite::params; +use rusqlite::types::Type; + +#[derive(Debug, Clone)] +pub struct PersistedModelChange { + pub id: i64, + pub created_at: String, + pub payload: ModelPayload, +} + +impl<'a> DbContext<'a> { + pub fn list_model_changes_after( + &self, + after_id: i64, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, created_at, payload + FROM model_changes + WHERE id > ?1 + ORDER BY id ASC + LIMIT ?2 + "#, + )?; + + let items = stmt.query_map(params![after_id, limit as i64], |row| { + let id: i64 = row.get(0)?; + let created_at: String = row.get(1)?; + let payload_raw: String = row.get(2)?; + let payload = serde_json::from_str::(&payload_raw).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(2, Type::Text, Box::new(e)) + })?; + Ok(PersistedModelChange { id, created_at, payload }) + })?; + + Ok(items.collect::, rusqlite::Error>>()?) + } + + pub fn list_model_changes_since( + &self, + since_created_at: &str, + since_id: i64, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, created_at, payload + FROM model_changes + WHERE created_at > ?1 + OR (created_at = ?1 AND id > ?2) + ORDER BY created_at ASC, id ASC + LIMIT ?3 + "#, + )?; + + let items = stmt.query_map(params![since_created_at, since_id, limit as i64], |row| { + let id: i64 = row.get(0)?; + let created_at: String = row.get(1)?; + let payload_raw: String = row.get(2)?; + let payload = serde_json::from_str::(&payload_raw).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(2, Type::Text, Box::new(e)) + })?; + Ok(PersistedModelChange { id, created_at, payload }) + })?; + + Ok(items.collect::, rusqlite::Error>>()?) + } + + pub fn prune_model_changes_older_than_days(&self, days: i64) -> Result { + let offset = format!("-{days} days"); + Ok(self.conn.resolve().execute( + r#" + DELETE FROM model_changes + WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1) + "#, + params![offset], + )?) + } + + pub fn prune_model_changes_older_than_hours(&self, hours: i64) -> Result { + let offset = format!("-{hours} hours"); + Ok(self.conn.resolve().execute( + r#" + DELETE FROM model_changes + WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1) + "#, + params![offset], + )?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::init_in_memory; + use crate::models::Workspace; + use crate::util::{ModelChangeEvent, UpdateSource}; + + #[test] + fn records_model_changes_for_upsert_and_delete() { + let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); + let db = query_manager.connect(); + + let workspace = db + .upsert_workspace( + &Workspace { + name: "Changes Test".to_string(), + setting_follow_redirects: true, + setting_validate_certificates: true, + ..Default::default() + }, + &UpdateSource::Sync, + ) + .expect("Failed to upsert workspace"); + + let created_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(created_changes.len(), 1); + assert_eq!(created_changes[0].payload.model.id(), workspace.id); + assert_eq!(created_changes[0].payload.model.model(), "workspace"); + assert!(matches!( + created_changes[0].payload.change, + ModelChangeEvent::Upsert { created: true } + )); + assert!(matches!(created_changes[0].payload.update_source, UpdateSource::Sync)); + + db.delete_workspace_by_id(&workspace.id, &UpdateSource::Sync) + .expect("Failed to delete workspace"); + + let all_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(all_changes.len(), 2); + assert!(matches!(all_changes[1].payload.change, ModelChangeEvent::Delete)); + assert!(all_changes[1].id > all_changes[0].id); + + let changes_after_first = db + .list_model_changes_after(all_changes[0].id, 10) + .expect("Failed to list changes after cursor"); + assert_eq!(changes_after_first.len(), 1); + assert!(matches!(changes_after_first[0].payload.change, ModelChangeEvent::Delete)); + } + + #[test] + fn prunes_old_model_changes() { + let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); + let db = query_manager.connect(); + + db.upsert_workspace( + &Workspace { + name: "Prune Test".to_string(), + setting_follow_redirects: true, + setting_validate_certificates: true, + ..Default::default() + }, + &UpdateSource::Sync, + ) + .expect("Failed to upsert workspace"); + + let changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(changes.len(), 1); + + db.conn + .resolve() + .execute( + "UPDATE model_changes SET created_at = '2000-01-01 00:00:00.000' WHERE id = ?1", + params![changes[0].id], + ) + .expect("Failed to age model change row"); + + let pruned = + db.prune_model_changes_older_than_days(30).expect("Failed to prune model changes"); + assert_eq!(pruned, 1); + assert!(db.list_model_changes_after(0, 10).expect("Failed to list changes").is_empty()); + } + + #[test] + fn list_model_changes_since_uses_timestamp_with_id_tiebreaker() { + let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); + let db = query_manager.connect(); + + let workspace = db + .upsert_workspace( + &Workspace { + name: "Cursor Test".to_string(), + setting_follow_redirects: true, + setting_validate_certificates: true, + ..Default::default() + }, + &UpdateSource::Sync, + ) + .expect("Failed to upsert workspace"); + db.delete_workspace_by_id(&workspace.id, &UpdateSource::Sync) + .expect("Failed to delete workspace"); + + let all = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(all.len(), 2); + + let fixed_ts = "2026-02-16 00:00:00.000"; + db.conn + .resolve() + .execute("UPDATE model_changes SET created_at = ?1", params![fixed_ts]) + .expect("Failed to normalize timestamps"); + + let after_first = + db.list_model_changes_since(fixed_ts, all[0].id, 10).expect("Failed to query cursor"); + assert_eq!(after_first.len(), 1); + assert_eq!(after_first[0].id, all[1].id); + } + + #[test] + fn prunes_old_model_changes_by_hours() { + let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); + let db = query_manager.connect(); + + db.upsert_workspace( + &Workspace { + name: "Prune Hour Test".to_string(), + setting_follow_redirects: true, + setting_validate_certificates: true, + ..Default::default() + }, + &UpdateSource::Sync, + ) + .expect("Failed to upsert workspace"); + + let changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(changes.len(), 1); + + db.conn + .resolve() + .execute( + "UPDATE model_changes SET created_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', '-2 hours') WHERE id = ?1", + params![changes[0].id], + ) + .expect("Failed to age model change row"); + + let pruned = + db.prune_model_changes_older_than_hours(1).expect("Failed to prune model changes"); + assert_eq!(pruned, 1); + } +}