diff --git a/crates-tauri/yaak-app/src/models_ext.rs b/crates-tauri/yaak-app/src/models_ext.rs index 7ed7ebd4..2163e07e 100644 --- a/crates-tauri/yaak-app/src/models_ext.rs +++ b/crates-tauri/yaak-app/src/models_ext.rs @@ -3,6 +3,8 @@ //! This module provides the Tauri plugin initialization and extension traits //! that allow accessing QueryManager and BlobManager from Tauri's Manager types. +use log::error; +use std::time::Duration; use tauri::plugin::TauriPlugin; use tauri::{Emitter, Manager, Runtime, State}; use tauri_plugin_dialog::{DialogExt, MessageDialogKind}; @@ -13,6 +15,10 @@ use yaak_models::models::{AnyModel, GraphQlIntrospection, GrpcEvent, Settings, W use yaak_models::query_manager::QueryManager; use yaak_models::util::UpdateSource; +const MODEL_CHANGES_RETENTION_DAYS: i64 = 30; +const MODEL_CHANGES_POLL_INTERVAL_MS: u64 = 250; +const MODEL_CHANGES_POLL_BATCH_SIZE: usize = 200; + /// Extension trait for accessing the QueryManager from Tauri Manager types. pub trait QueryManagerExt<'a, R> { fn db_manager(&'a self) -> State<'a, QueryManager>; @@ -249,7 +255,7 @@ pub fn init() -> TauriPlugin { let db_path = app_path.join("db.sqlite"); let blob_path = app_path.join("blobs.sqlite"); - let (query_manager, blob_manager, rx) = + let (query_manager, blob_manager, _rx) = match yaak_models::init_standalone(&db_path, &blob_path) { Ok(result) => result, Err(e) => { @@ -262,14 +268,46 @@ pub fn init() -> TauriPlugin { } }; + let db = query_manager.connect(); + if let Err(err) = db.prune_model_changes_older_than_days(MODEL_CHANGES_RETENTION_DAYS) { + error!("Failed to prune model_changes rows on startup: {err:?}"); + } + let mut last_seen_change_id = match db.latest_model_change_id() { + Ok(id) => id, + Err(err) => { + error!("Failed to read latest model_changes cursor: {err:?}"); + 0 + } + }; + + let poll_query_manager = query_manager.clone(); + app_handle.manage(query_manager); app_handle.manage(blob_manager); - // Forward model change events to the frontend + // Poll model_changes so all writers (including external CLI processes) update the UI. let app_handle = app_handle.clone(); + let query_manager = poll_query_manager; tauri::async_runtime::spawn(async move { - for payload in rx { - app_handle.emit("model_write", payload).unwrap(); + loop { + match query_manager.connect().list_model_changes_after( + last_seen_change_id, + MODEL_CHANGES_POLL_BATCH_SIZE, + ) { + Ok(changes) => { + for change in changes { + last_seen_change_id = change.id; + if let Err(err) = app_handle.emit("model_write", change.payload) { + error!("Failed to emit model_write event: {err:?}"); + } + } + } + Err(err) => { + error!("Failed to poll model_changes rows: {err:?}"); + } + } + + tokio::time::sleep(Duration::from_millis(MODEL_CHANGES_POLL_INTERVAL_MS)).await; } }); diff --git a/crates/yaak-models/migrations/20260216000000_model-changes.sql b/crates/yaak-models/migrations/20260216000000_model-changes.sql new file mode 100644 index 00000000..419853c0 --- /dev/null +++ b/crates/yaak-models/migrations/20260216000000_model-changes.sql @@ -0,0 +1,12 @@ +CREATE TABLE model_changes +( + id INTEGER PRIMARY KEY AUTOINCREMENT, + model TEXT NOT NULL, + model_id TEXT NOT NULL, + change TEXT NOT NULL, + update_source TEXT NOT NULL, + payload TEXT NOT NULL, + created_at DATETIME DEFAULT (STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL +); + +CREATE INDEX idx_model_changes_created_at ON model_changes (created_at); diff --git a/crates/yaak-models/src/db_context.rs b/crates/yaak-models/src/db_context.rs index 31479ccf..782f174b 100644 --- a/crates/yaak-models/src/db_context.rs +++ b/crates/yaak-models/src/db_context.rs @@ -3,8 +3,8 @@ use crate::error::Error::ModelNotFound; use crate::error::Result; use crate::models::{AnyModel, UpsertModelInfo}; use crate::util::{ModelChangeEvent, ModelPayload, UpdateSource}; -use log::error; -use rusqlite::OptionalExtension; +use rusqlite::types::Type; +use rusqlite::{OptionalExtension, params}; use sea_query::{ Asterisk, Expr, Func, IntoColumnRef, IntoIden, IntoTableRef, OnConflict, Query, SimpleExpr, SqliteQueryBuilder, @@ -14,10 +14,16 @@ use std::fmt::Debug; use std::sync::mpsc; pub struct DbContext<'a> { - pub(crate) events_tx: mpsc::Sender, + pub(crate) _events_tx: mpsc::Sender, pub(crate) conn: ConnectionOrTx<'a>, } +#[derive(Debug, Clone)] +pub struct PersistedModelChange { + pub id: i64, + pub payload: ModelPayload, +} + impl<'a> DbContext<'a> { pub(crate) fn find_one<'s, M>( &self, @@ -180,9 +186,8 @@ impl<'a> DbContext<'a> { change: ModelChangeEvent::Upsert { created }, }; - if let Err(e) = self.events_tx.send(payload.clone()) { - error!("Failed to send model change {source:?}: {e:?}"); - } + self.record_model_change(&payload)?; + let _ = self._events_tx.send(payload); Ok(m) } @@ -203,9 +208,159 @@ impl<'a> DbContext<'a> { change: ModelChangeEvent::Delete, }; - if let Err(e) = self.events_tx.send(payload) { - error!("Failed to send model change {source:?}: {e:?}"); - } + self.record_model_change(&payload)?; + let _ = self._events_tx.send(payload); + Ok(m.clone()) } + + fn record_model_change(&self, payload: &ModelPayload) -> Result<()> { + let payload_json = serde_json::to_string(payload)?; + let source_json = serde_json::to_string(&payload.update_source)?; + let change_json = serde_json::to_string(&payload.change)?; + + self.conn.resolve().execute( + r#" + INSERT INTO model_changes (model, model_id, change, update_source, payload) + VALUES (?1, ?2, ?3, ?4, ?5) + "#, + params![ + payload.model.model(), + payload.model.id(), + change_json, + source_json, + payload_json, + ], + )?; + + Ok(()) + } + + pub fn latest_model_change_id(&self) -> Result { + let mut stmt = self.conn.prepare("SELECT COALESCE(MAX(id), 0) FROM model_changes")?; + Ok(stmt.query_row([], |row| row.get(0))?) + } + + pub fn list_model_changes_after( + &self, + after_id: i64, + limit: usize, + ) -> Result> { + let mut stmt = self.conn.prepare( + r#" + SELECT id, payload + FROM model_changes + WHERE id > ?1 + ORDER BY id ASC + LIMIT ?2 + "#, + )?; + + let items = stmt.query_map(params![after_id, limit as i64], |row| { + let id: i64 = row.get(0)?; + let payload_raw: String = row.get(1)?; + let payload = serde_json::from_str::(&payload_raw).map_err(|e| { + rusqlite::Error::FromSqlConversionFailure(1, Type::Text, Box::new(e)) + })?; + Ok(PersistedModelChange { id, payload }) + })?; + + Ok(items.collect::, rusqlite::Error>>()?) + } + + pub fn prune_model_changes_older_than_days(&self, days: i64) -> Result { + let offset = format!("-{days} days"); + Ok(self.conn.resolve().execute( + r#" + DELETE FROM model_changes + WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1) + "#, + params![offset], + )?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::init_in_memory; + use crate::models::Workspace; + + #[test] + fn records_model_changes_for_upsert_and_delete() { + let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); + let db = query_manager.connect(); + + let workspace = db + .upsert_workspace( + &Workspace { + name: "Changes Test".to_string(), + setting_follow_redirects: true, + setting_validate_certificates: true, + ..Default::default() + }, + &UpdateSource::Sync, + ) + .expect("Failed to upsert workspace"); + + let created_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(created_changes.len(), 1); + assert_eq!(created_changes[0].payload.model.id(), workspace.id); + assert_eq!(created_changes[0].payload.model.model(), "workspace"); + assert!(matches!( + created_changes[0].payload.change, + ModelChangeEvent::Upsert { created: true } + )); + assert!(matches!(created_changes[0].payload.update_source, UpdateSource::Sync)); + + db.delete_workspace_by_id(&workspace.id, &UpdateSource::Sync) + .expect("Failed to delete workspace"); + + let all_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(all_changes.len(), 2); + assert!(matches!(all_changes[1].payload.change, ModelChangeEvent::Delete)); + assert_eq!( + db.latest_model_change_id().expect("Failed to read latest ID"), + all_changes[1].id + ); + + let changes_after_first = db + .list_model_changes_after(all_changes[0].id, 10) + .expect("Failed to list changes after cursor"); + assert_eq!(changes_after_first.len(), 1); + assert!(matches!(changes_after_first[0].payload.change, ModelChangeEvent::Delete)); + } + + #[test] + fn prunes_old_model_changes() { + let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB"); + let db = query_manager.connect(); + + db.upsert_workspace( + &Workspace { + name: "Prune Test".to_string(), + setting_follow_redirects: true, + setting_validate_certificates: true, + ..Default::default() + }, + &UpdateSource::Sync, + ) + .expect("Failed to upsert workspace"); + + let changes = db.list_model_changes_after(0, 10).expect("Failed to list changes"); + assert_eq!(changes.len(), 1); + + db.conn + .resolve() + .execute( + "UPDATE model_changes SET created_at = '2000-01-01 00:00:00.000' WHERE id = ?1", + params![changes[0].id], + ) + .expect("Failed to age model change row"); + + let pruned = + db.prune_model_changes_older_than_days(30).expect("Failed to prune model changes"); + assert_eq!(pruned, 1); + assert!(db.list_model_changes_after(0, 10).expect("Failed to list changes").is_empty()); + } } diff --git a/crates/yaak-models/src/models.rs b/crates/yaak-models/src/models.rs index 05401770..77e76824 100644 --- a/crates/yaak-models/src/models.rs +++ b/crates/yaak-models/src/models.rs @@ -2347,6 +2347,15 @@ macro_rules! define_any_model { )* } } + + #[inline] + pub fn model(&self) -> &str { + match self { + $( + AnyModel::$type(inner) => &inner.model, + )* + } + } } $( diff --git a/crates/yaak-models/src/query_manager.rs b/crates/yaak-models/src/query_manager.rs index c123c4ff..fc222074 100644 --- a/crates/yaak-models/src/query_manager.rs +++ b/crates/yaak-models/src/query_manager.rs @@ -25,7 +25,7 @@ impl QueryManager { .expect("Failed to gain lock on DB") .get() .expect("Failed to get a new DB connection from the pool"); - DbContext { events_tx: self.events_tx.clone(), conn: ConnectionOrTx::Connection(conn) } + DbContext { _events_tx: self.events_tx.clone(), conn: ConnectionOrTx::Connection(conn) } } pub fn with_conn(&self, func: F) -> T @@ -39,8 +39,10 @@ impl QueryManager { .get() .expect("Failed to get new DB connection from the pool"); - let db_context = - DbContext { events_tx: self.events_tx.clone(), conn: ConnectionOrTx::Connection(conn) }; + let db_context = DbContext { + _events_tx: self.events_tx.clone(), + conn: ConnectionOrTx::Connection(conn), + }; func(&db_context) } @@ -62,8 +64,10 @@ impl QueryManager { .transaction_with_behavior(TransactionBehavior::Immediate) .expect("Failed to start DB transaction"); - let db_context = - DbContext { events_tx: self.events_tx.clone(), conn: ConnectionOrTx::Transaction(&tx) }; + let db_context = DbContext { + _events_tx: self.events_tx.clone(), + conn: ConnectionOrTx::Transaction(&tx), + }; match func(&db_context) { Ok(val) => {