mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-05-11 10:20:09 +02:00
Refine model change polling and move queries out of db_context
This commit is contained in:
@@ -3,6 +3,7 @@
|
||||
//! This module provides the Tauri plugin initialization and extension traits
|
||||
//! that allow accessing QueryManager and BlobManager from Tauri's Manager types.
|
||||
|
||||
use chrono::Utc;
|
||||
use log::error;
|
||||
use std::time::Duration;
|
||||
use tauri::plugin::TauriPlugin;
|
||||
@@ -15,10 +16,74 @@ use yaak_models::models::{AnyModel, GraphQlIntrospection, GrpcEvent, Settings, W
|
||||
use yaak_models::query_manager::QueryManager;
|
||||
use yaak_models::util::UpdateSource;
|
||||
|
||||
const MODEL_CHANGES_RETENTION_DAYS: i64 = 30;
|
||||
const MODEL_CHANGES_RETENTION_HOURS: i64 = 1;
|
||||
const MODEL_CHANGES_POLL_INTERVAL_MS: u64 = 250;
|
||||
const MODEL_CHANGES_POLL_BATCH_SIZE: usize = 200;
|
||||
|
||||
struct ModelChangeCursor {
|
||||
created_at: String,
|
||||
id: i64,
|
||||
}
|
||||
|
||||
impl ModelChangeCursor {
|
||||
fn from_launch_time() -> Self {
|
||||
Self {
|
||||
created_at: Utc::now().naive_utc().format("%Y-%m-%d %H:%M:%S%.3f").to_string(),
|
||||
id: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn drain_model_changes_batch<R: Runtime>(
|
||||
query_manager: &QueryManager,
|
||||
app_handle: &tauri::AppHandle<R>,
|
||||
cursor: &mut ModelChangeCursor,
|
||||
) -> bool {
|
||||
let changes = match query_manager.connect().list_model_changes_since(
|
||||
&cursor.created_at,
|
||||
cursor.id,
|
||||
MODEL_CHANGES_POLL_BATCH_SIZE,
|
||||
) {
|
||||
Ok(changes) => changes,
|
||||
Err(err) => {
|
||||
error!("Failed to poll model_changes rows: {err:?}");
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
if changes.is_empty() {
|
||||
return false;
|
||||
}
|
||||
|
||||
let fetched_count = changes.len();
|
||||
for change in changes {
|
||||
cursor.created_at = change.created_at;
|
||||
cursor.id = change.id;
|
||||
|
||||
// Local window-originated writes are forwarded immediately from the
|
||||
// in-memory model event channel.
|
||||
if matches!(change.payload.update_source, UpdateSource::Window { .. }) {
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = app_handle.emit("model_write", change.payload) {
|
||||
error!("Failed to emit model_write event: {err:?}");
|
||||
}
|
||||
}
|
||||
|
||||
fetched_count == MODEL_CHANGES_POLL_BATCH_SIZE
|
||||
}
|
||||
|
||||
async fn run_model_change_poller<R: Runtime>(
|
||||
query_manager: QueryManager,
|
||||
app_handle: tauri::AppHandle<R>,
|
||||
mut cursor: ModelChangeCursor,
|
||||
) {
|
||||
loop {
|
||||
while drain_model_changes_batch(&query_manager, &app_handle, &mut cursor) {}
|
||||
tokio::time::sleep(Duration::from_millis(MODEL_CHANGES_POLL_INTERVAL_MS)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Extension trait for accessing the QueryManager from Tauri Manager types.
|
||||
pub trait QueryManagerExt<'a, R> {
|
||||
fn db_manager(&'a self) -> State<'a, QueryManager>;
|
||||
@@ -255,7 +320,7 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
|
||||
let db_path = app_path.join("db.sqlite");
|
||||
let blob_path = app_path.join("blobs.sqlite");
|
||||
|
||||
let (query_manager, blob_manager, _rx) =
|
||||
let (query_manager, blob_manager, rx) =
|
||||
match yaak_models::init_standalone(&db_path, &blob_path) {
|
||||
Ok(result) => result,
|
||||
Err(e) => {
|
||||
@@ -269,16 +334,12 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
|
||||
};
|
||||
|
||||
let db = query_manager.connect();
|
||||
if let Err(err) = db.prune_model_changes_older_than_days(MODEL_CHANGES_RETENTION_DAYS) {
|
||||
if let Err(err) = db.prune_model_changes_older_than_hours(MODEL_CHANGES_RETENTION_HOURS)
|
||||
{
|
||||
error!("Failed to prune model_changes rows on startup: {err:?}");
|
||||
}
|
||||
let mut last_seen_change_id = match db.latest_model_change_id() {
|
||||
Ok(id) => id,
|
||||
Err(err) => {
|
||||
error!("Failed to read latest model_changes cursor: {err:?}");
|
||||
0
|
||||
}
|
||||
};
|
||||
// Only stream writes that happen after this app launch.
|
||||
let cursor = ModelChangeCursor::from_launch_time();
|
||||
|
||||
let poll_query_manager = query_manager.clone();
|
||||
|
||||
@@ -286,28 +347,23 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
|
||||
app_handle.manage(blob_manager);
|
||||
|
||||
// Poll model_changes so all writers (including external CLI processes) update the UI.
|
||||
let app_handle = app_handle.clone();
|
||||
let app_handle_poll = app_handle.clone();
|
||||
let query_manager = poll_query_manager;
|
||||
tauri::async_runtime::spawn(async move {
|
||||
loop {
|
||||
match query_manager.connect().list_model_changes_after(
|
||||
last_seen_change_id,
|
||||
MODEL_CHANGES_POLL_BATCH_SIZE,
|
||||
) {
|
||||
Ok(changes) => {
|
||||
for change in changes {
|
||||
last_seen_change_id = change.id;
|
||||
if let Err(err) = app_handle.emit("model_write", change.payload) {
|
||||
error!("Failed to emit model_write event: {err:?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Failed to poll model_changes rows: {err:?}");
|
||||
}
|
||||
}
|
||||
run_model_change_poller(query_manager, app_handle_poll, cursor).await;
|
||||
});
|
||||
|
||||
tokio::time::sleep(Duration::from_millis(MODEL_CHANGES_POLL_INTERVAL_MS)).await;
|
||||
// Fast path for local app writes initiated by frontend windows. This keeps the
|
||||
// current sync-model UX snappy, while DB polling handles external writers (CLI).
|
||||
let app_handle_local = app_handle.clone();
|
||||
tauri::async_runtime::spawn(async move {
|
||||
for payload in rx {
|
||||
if !matches!(payload.update_source, UpdateSource::Window { .. }) {
|
||||
continue;
|
||||
}
|
||||
if let Err(err) = app_handle_local.emit("model_write", payload) {
|
||||
error!("Failed to emit local model_write event: {err:?}");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
@@ -3,7 +3,6 @@ use crate::error::Error::ModelNotFound;
|
||||
use crate::error::Result;
|
||||
use crate::models::{AnyModel, UpsertModelInfo};
|
||||
use crate::util::{ModelChangeEvent, ModelPayload, UpdateSource};
|
||||
use rusqlite::types::Type;
|
||||
use rusqlite::{OptionalExtension, params};
|
||||
use sea_query::{
|
||||
Asterisk, Expr, Func, IntoColumnRef, IntoIden, IntoTableRef, OnConflict, Query, SimpleExpr,
|
||||
@@ -18,12 +17,6 @@ pub struct DbContext<'a> {
|
||||
pub(crate) conn: ConnectionOrTx<'a>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PersistedModelChange {
|
||||
pub id: i64,
|
||||
pub payload: ModelPayload,
|
||||
}
|
||||
|
||||
impl<'a> DbContext<'a> {
|
||||
pub(crate) fn find_one<'s, M>(
|
||||
&self,
|
||||
@@ -235,132 +228,4 @@ impl<'a> DbContext<'a> {
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn latest_model_change_id(&self) -> Result<i64> {
|
||||
let mut stmt = self.conn.prepare("SELECT COALESCE(MAX(id), 0) FROM model_changes")?;
|
||||
Ok(stmt.query_row([], |row| row.get(0))?)
|
||||
}
|
||||
|
||||
pub fn list_model_changes_after(
|
||||
&self,
|
||||
after_id: i64,
|
||||
limit: usize,
|
||||
) -> Result<Vec<PersistedModelChange>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
r#"
|
||||
SELECT id, payload
|
||||
FROM model_changes
|
||||
WHERE id > ?1
|
||||
ORDER BY id ASC
|
||||
LIMIT ?2
|
||||
"#,
|
||||
)?;
|
||||
|
||||
let items = stmt.query_map(params![after_id, limit as i64], |row| {
|
||||
let id: i64 = row.get(0)?;
|
||||
let payload_raw: String = row.get(1)?;
|
||||
let payload = serde_json::from_str::<ModelPayload>(&payload_raw).map_err(|e| {
|
||||
rusqlite::Error::FromSqlConversionFailure(1, Type::Text, Box::new(e))
|
||||
})?;
|
||||
Ok(PersistedModelChange { id, payload })
|
||||
})?;
|
||||
|
||||
Ok(items.collect::<std::result::Result<Vec<_>, rusqlite::Error>>()?)
|
||||
}
|
||||
|
||||
pub fn prune_model_changes_older_than_days(&self, days: i64) -> Result<usize> {
|
||||
let offset = format!("-{days} days");
|
||||
Ok(self.conn.resolve().execute(
|
||||
r#"
|
||||
DELETE FROM model_changes
|
||||
WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1)
|
||||
"#,
|
||||
params![offset],
|
||||
)?)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::init_in_memory;
|
||||
use crate::models::Workspace;
|
||||
|
||||
#[test]
|
||||
fn records_model_changes_for_upsert_and_delete() {
|
||||
let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB");
|
||||
let db = query_manager.connect();
|
||||
|
||||
let workspace = db
|
||||
.upsert_workspace(
|
||||
&Workspace {
|
||||
name: "Changes Test".to_string(),
|
||||
setting_follow_redirects: true,
|
||||
setting_validate_certificates: true,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Sync,
|
||||
)
|
||||
.expect("Failed to upsert workspace");
|
||||
|
||||
let created_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(created_changes.len(), 1);
|
||||
assert_eq!(created_changes[0].payload.model.id(), workspace.id);
|
||||
assert_eq!(created_changes[0].payload.model.model(), "workspace");
|
||||
assert!(matches!(
|
||||
created_changes[0].payload.change,
|
||||
ModelChangeEvent::Upsert { created: true }
|
||||
));
|
||||
assert!(matches!(created_changes[0].payload.update_source, UpdateSource::Sync));
|
||||
|
||||
db.delete_workspace_by_id(&workspace.id, &UpdateSource::Sync)
|
||||
.expect("Failed to delete workspace");
|
||||
|
||||
let all_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(all_changes.len(), 2);
|
||||
assert!(matches!(all_changes[1].payload.change, ModelChangeEvent::Delete));
|
||||
assert_eq!(
|
||||
db.latest_model_change_id().expect("Failed to read latest ID"),
|
||||
all_changes[1].id
|
||||
);
|
||||
|
||||
let changes_after_first = db
|
||||
.list_model_changes_after(all_changes[0].id, 10)
|
||||
.expect("Failed to list changes after cursor");
|
||||
assert_eq!(changes_after_first.len(), 1);
|
||||
assert!(matches!(changes_after_first[0].payload.change, ModelChangeEvent::Delete));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prunes_old_model_changes() {
|
||||
let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB");
|
||||
let db = query_manager.connect();
|
||||
|
||||
db.upsert_workspace(
|
||||
&Workspace {
|
||||
name: "Prune Test".to_string(),
|
||||
setting_follow_redirects: true,
|
||||
setting_validate_certificates: true,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Sync,
|
||||
)
|
||||
.expect("Failed to upsert workspace");
|
||||
|
||||
let changes = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(changes.len(), 1);
|
||||
|
||||
db.conn
|
||||
.resolve()
|
||||
.execute(
|
||||
"UPDATE model_changes SET created_at = '2000-01-01 00:00:00.000' WHERE id = ?1",
|
||||
params![changes[0].id],
|
||||
)
|
||||
.expect("Failed to age model change row");
|
||||
|
||||
let pruned =
|
||||
db.prune_model_changes_older_than_days(30).expect("Failed to prune model changes");
|
||||
assert_eq!(pruned, 1);
|
||||
assert!(db.list_model_changes_after(0, 10).expect("Failed to list changes").is_empty());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ mod http_requests;
|
||||
mod http_response_events;
|
||||
mod http_responses;
|
||||
mod key_values;
|
||||
mod model_changes;
|
||||
mod plugin_key_values;
|
||||
mod plugins;
|
||||
mod settings;
|
||||
@@ -20,6 +21,7 @@ mod websocket_events;
|
||||
mod websocket_requests;
|
||||
mod workspace_metas;
|
||||
pub mod workspaces;
|
||||
pub use model_changes::PersistedModelChange;
|
||||
|
||||
const MAX_HISTORY_ITEMS: usize = 20;
|
||||
|
||||
|
||||
243
crates/yaak-models/src/queries/model_changes.rs
Normal file
243
crates/yaak-models/src/queries/model_changes.rs
Normal file
@@ -0,0 +1,243 @@
|
||||
use crate::db_context::DbContext;
|
||||
use crate::error::Result;
|
||||
use crate::util::ModelPayload;
|
||||
use rusqlite::params;
|
||||
use rusqlite::types::Type;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PersistedModelChange {
|
||||
pub id: i64,
|
||||
pub created_at: String,
|
||||
pub payload: ModelPayload,
|
||||
}
|
||||
|
||||
impl<'a> DbContext<'a> {
|
||||
pub fn list_model_changes_after(
|
||||
&self,
|
||||
after_id: i64,
|
||||
limit: usize,
|
||||
) -> Result<Vec<PersistedModelChange>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
r#"
|
||||
SELECT id, created_at, payload
|
||||
FROM model_changes
|
||||
WHERE id > ?1
|
||||
ORDER BY id ASC
|
||||
LIMIT ?2
|
||||
"#,
|
||||
)?;
|
||||
|
||||
let items = stmt.query_map(params![after_id, limit as i64], |row| {
|
||||
let id: i64 = row.get(0)?;
|
||||
let created_at: String = row.get(1)?;
|
||||
let payload_raw: String = row.get(2)?;
|
||||
let payload = serde_json::from_str::<ModelPayload>(&payload_raw).map_err(|e| {
|
||||
rusqlite::Error::FromSqlConversionFailure(2, Type::Text, Box::new(e))
|
||||
})?;
|
||||
Ok(PersistedModelChange { id, created_at, payload })
|
||||
})?;
|
||||
|
||||
Ok(items.collect::<std::result::Result<Vec<_>, rusqlite::Error>>()?)
|
||||
}
|
||||
|
||||
pub fn list_model_changes_since(
|
||||
&self,
|
||||
since_created_at: &str,
|
||||
since_id: i64,
|
||||
limit: usize,
|
||||
) -> Result<Vec<PersistedModelChange>> {
|
||||
let mut stmt = self.conn.prepare(
|
||||
r#"
|
||||
SELECT id, created_at, payload
|
||||
FROM model_changes
|
||||
WHERE created_at > ?1
|
||||
OR (created_at = ?1 AND id > ?2)
|
||||
ORDER BY created_at ASC, id ASC
|
||||
LIMIT ?3
|
||||
"#,
|
||||
)?;
|
||||
|
||||
let items = stmt.query_map(params![since_created_at, since_id, limit as i64], |row| {
|
||||
let id: i64 = row.get(0)?;
|
||||
let created_at: String = row.get(1)?;
|
||||
let payload_raw: String = row.get(2)?;
|
||||
let payload = serde_json::from_str::<ModelPayload>(&payload_raw).map_err(|e| {
|
||||
rusqlite::Error::FromSqlConversionFailure(2, Type::Text, Box::new(e))
|
||||
})?;
|
||||
Ok(PersistedModelChange { id, created_at, payload })
|
||||
})?;
|
||||
|
||||
Ok(items.collect::<std::result::Result<Vec<_>, rusqlite::Error>>()?)
|
||||
}
|
||||
|
||||
pub fn prune_model_changes_older_than_days(&self, days: i64) -> Result<usize> {
|
||||
let offset = format!("-{days} days");
|
||||
Ok(self.conn.resolve().execute(
|
||||
r#"
|
||||
DELETE FROM model_changes
|
||||
WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1)
|
||||
"#,
|
||||
params![offset],
|
||||
)?)
|
||||
}
|
||||
|
||||
pub fn prune_model_changes_older_than_hours(&self, hours: i64) -> Result<usize> {
|
||||
let offset = format!("-{hours} hours");
|
||||
Ok(self.conn.resolve().execute(
|
||||
r#"
|
||||
DELETE FROM model_changes
|
||||
WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1)
|
||||
"#,
|
||||
params![offset],
|
||||
)?)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::init_in_memory;
|
||||
use crate::models::Workspace;
|
||||
use crate::util::{ModelChangeEvent, UpdateSource};
|
||||
|
||||
#[test]
|
||||
fn records_model_changes_for_upsert_and_delete() {
|
||||
let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB");
|
||||
let db = query_manager.connect();
|
||||
|
||||
let workspace = db
|
||||
.upsert_workspace(
|
||||
&Workspace {
|
||||
name: "Changes Test".to_string(),
|
||||
setting_follow_redirects: true,
|
||||
setting_validate_certificates: true,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Sync,
|
||||
)
|
||||
.expect("Failed to upsert workspace");
|
||||
|
||||
let created_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(created_changes.len(), 1);
|
||||
assert_eq!(created_changes[0].payload.model.id(), workspace.id);
|
||||
assert_eq!(created_changes[0].payload.model.model(), "workspace");
|
||||
assert!(matches!(
|
||||
created_changes[0].payload.change,
|
||||
ModelChangeEvent::Upsert { created: true }
|
||||
));
|
||||
assert!(matches!(created_changes[0].payload.update_source, UpdateSource::Sync));
|
||||
|
||||
db.delete_workspace_by_id(&workspace.id, &UpdateSource::Sync)
|
||||
.expect("Failed to delete workspace");
|
||||
|
||||
let all_changes = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(all_changes.len(), 2);
|
||||
assert!(matches!(all_changes[1].payload.change, ModelChangeEvent::Delete));
|
||||
assert!(all_changes[1].id > all_changes[0].id);
|
||||
|
||||
let changes_after_first = db
|
||||
.list_model_changes_after(all_changes[0].id, 10)
|
||||
.expect("Failed to list changes after cursor");
|
||||
assert_eq!(changes_after_first.len(), 1);
|
||||
assert!(matches!(changes_after_first[0].payload.change, ModelChangeEvent::Delete));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prunes_old_model_changes() {
|
||||
let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB");
|
||||
let db = query_manager.connect();
|
||||
|
||||
db.upsert_workspace(
|
||||
&Workspace {
|
||||
name: "Prune Test".to_string(),
|
||||
setting_follow_redirects: true,
|
||||
setting_validate_certificates: true,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Sync,
|
||||
)
|
||||
.expect("Failed to upsert workspace");
|
||||
|
||||
let changes = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(changes.len(), 1);
|
||||
|
||||
db.conn
|
||||
.resolve()
|
||||
.execute(
|
||||
"UPDATE model_changes SET created_at = '2000-01-01 00:00:00.000' WHERE id = ?1",
|
||||
params![changes[0].id],
|
||||
)
|
||||
.expect("Failed to age model change row");
|
||||
|
||||
let pruned =
|
||||
db.prune_model_changes_older_than_days(30).expect("Failed to prune model changes");
|
||||
assert_eq!(pruned, 1);
|
||||
assert!(db.list_model_changes_after(0, 10).expect("Failed to list changes").is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn list_model_changes_since_uses_timestamp_with_id_tiebreaker() {
|
||||
let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB");
|
||||
let db = query_manager.connect();
|
||||
|
||||
let workspace = db
|
||||
.upsert_workspace(
|
||||
&Workspace {
|
||||
name: "Cursor Test".to_string(),
|
||||
setting_follow_redirects: true,
|
||||
setting_validate_certificates: true,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Sync,
|
||||
)
|
||||
.expect("Failed to upsert workspace");
|
||||
db.delete_workspace_by_id(&workspace.id, &UpdateSource::Sync)
|
||||
.expect("Failed to delete workspace");
|
||||
|
||||
let all = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(all.len(), 2);
|
||||
|
||||
let fixed_ts = "2026-02-16 00:00:00.000";
|
||||
db.conn
|
||||
.resolve()
|
||||
.execute("UPDATE model_changes SET created_at = ?1", params![fixed_ts])
|
||||
.expect("Failed to normalize timestamps");
|
||||
|
||||
let after_first =
|
||||
db.list_model_changes_since(fixed_ts, all[0].id, 10).expect("Failed to query cursor");
|
||||
assert_eq!(after_first.len(), 1);
|
||||
assert_eq!(after_first[0].id, all[1].id);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn prunes_old_model_changes_by_hours() {
|
||||
let (query_manager, _blob_manager, _rx) = init_in_memory().expect("Failed to init DB");
|
||||
let db = query_manager.connect();
|
||||
|
||||
db.upsert_workspace(
|
||||
&Workspace {
|
||||
name: "Prune Hour Test".to_string(),
|
||||
setting_follow_redirects: true,
|
||||
setting_validate_certificates: true,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Sync,
|
||||
)
|
||||
.expect("Failed to upsert workspace");
|
||||
|
||||
let changes = db.list_model_changes_after(0, 10).expect("Failed to list changes");
|
||||
assert_eq!(changes.len(), 1);
|
||||
|
||||
db.conn
|
||||
.resolve()
|
||||
.execute(
|
||||
"UPDATE model_changes SET created_at = STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', '-2 hours') WHERE id = ?1",
|
||||
params![changes[0].id],
|
||||
)
|
||||
.expect("Failed to age model change row");
|
||||
|
||||
let pruned =
|
||||
db.prune_model_changes_older_than_hours(1).expect("Failed to prune model changes");
|
||||
assert_eq!(pruned, 1);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user