diff --git a/Cargo.lock b/Cargo.lock index 6aafaa39..30cecb2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10387,12 +10387,15 @@ dependencies = [ name = "yaak-database" version = "0.1.0" dependencies = [ + "chrono", "include_dir", "log 0.4.29", "nanoid", "r2d2", "r2d2_sqlite", "rusqlite", + "sea-query", + "sea-query-rusqlite", "serde", "serde_json", "thiserror 2.0.17", @@ -10587,6 +10590,23 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "yaak-proxy-models" +version = "0.1.0" +dependencies = [ + "chrono", + "include_dir", + "log 0.4.29", + "r2d2", + "r2d2_sqlite", + "rusqlite", + "sea-query", + "serde", + "serde_json", + "thiserror 2.0.17", + "yaak-database", +] + [[package]] name = "yaak-sse" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 0315692c..13b69c00 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,8 @@ members = [ "crates/yaak-ws", "crates/yaak-api", "crates/yaak-proxy", + # Proxy-specific crates + "crates-proxy/yaak-proxy-models", # CLI crates "crates-cli/yaak-cli", # Tauri-specific crates @@ -73,6 +75,9 @@ yaak-ws = { path = "crates/yaak-ws" } yaak-api = { path = "crates/yaak-api" } yaak-proxy = { path = "crates/yaak-proxy" } +# Internal crates - proxy +yaak-proxy-models = { path = "crates-proxy/yaak-proxy-models" } + # Internal crates - Tauri-specific yaak-fonts = { path = "crates-tauri/yaak-fonts" } yaak-license = { path = "crates-tauri/yaak-license" } diff --git a/crates-cli/yaak-cli/src/context.rs b/crates-cli/yaak-cli/src/context.rs index 8e839f14..da3f2e20 100644 --- a/crates-cli/yaak-cli/src/context.rs +++ b/crates-cli/yaak-cli/src/context.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use tokio::sync::Mutex; use yaak_crypto::manager::EncryptionManager; use yaak_models::blob_manager::BlobManager; -use yaak_models::db_context::DbContext; +use yaak_models::client_db::ClientDb; use yaak_models::query_manager::QueryManager; use yaak_plugins::events::PluginContext; use yaak_plugins::manager::PluginManager; @@ -108,7 +108,7 @@ impl CliContext { &self.data_dir } - pub fn db(&self) -> DbContext<'_> { + pub fn db(&self) -> ClientDb<'_> { self.query_manager.connect() } diff --git a/crates-proxy/yaak-proxy-models/Cargo.toml b/crates-proxy/yaak-proxy-models/Cargo.toml new file mode 100644 index 00000000..d73e6905 --- /dev/null +++ b/crates-proxy/yaak-proxy-models/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "yaak-proxy-models" +version = "0.1.0" +edition = "2024" +publish = false + +[dependencies] +chrono = { version = "0.4.38", features = ["serde"] } +include_dir = "0.7" +log = { workspace = true } +r2d2 = "0.8.10" +r2d2_sqlite = { version = "0.25.0" } +rusqlite = { version = "0.32.1", features = ["bundled", "chrono"] } +sea-query = { version = "0.32.1", features = ["with-chrono", "attr"] } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +thiserror = { workspace = true } +yaak-database = { workspace = true } diff --git a/crates-proxy/yaak-proxy-models/migrations/00000001_init.sql b/crates-proxy/yaak-proxy-models/migrations/00000001_init.sql new file mode 100644 index 00000000..eb92e876 --- /dev/null +++ b/crates-proxy/yaak-proxy-models/migrations/00000001_init.sql @@ -0,0 +1,35 @@ +-- Proxy version of http_responses, duplicated from client. +-- No workspace_id/request_id foreign keys — proxy captures raw traffic. +CREATE TABLE proxy_http_responses ( + id TEXT NOT NULL PRIMARY KEY, + model TEXT DEFAULT 'proxy_http_response' NOT NULL, + proxy_request_id INTEGER NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, + elapsed INTEGER NOT NULL DEFAULT 0, + elapsed_headers INTEGER NOT NULL DEFAULT 0, + elapsed_dns INTEGER NOT NULL DEFAULT 0, + status INTEGER NOT NULL DEFAULT 0, + status_reason TEXT, + url TEXT NOT NULL, + headers TEXT NOT NULL DEFAULT '[]', + request_headers TEXT NOT NULL DEFAULT '[]', + error TEXT, + body_path TEXT, + content_length INTEGER, + content_length_compressed INTEGER, + request_content_length INTEGER, + remote_addr TEXT, + version TEXT, + state TEXT DEFAULT 'initialized' NOT NULL +); +CREATE INDEX idx_proxy_http_responses_created_at ON proxy_http_responses (created_at DESC); + +-- Inline body storage (proxy keeps everything self-contained in one DB file) +CREATE TABLE proxy_http_response_bodies ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + response_id TEXT NOT NULL REFERENCES proxy_http_responses(id) ON DELETE CASCADE, + body_type TEXT NOT NULL, + data BLOB NOT NULL, + UNIQUE(response_id, body_type) +); diff --git a/crates-proxy/yaak-proxy-models/src/error.rs b/crates-proxy/yaak-proxy-models/src/error.rs new file mode 100644 index 00000000..2e4bfc78 --- /dev/null +++ b/crates-proxy/yaak-proxy-models/src/error.rs @@ -0,0 +1 @@ +pub use yaak_database::error::{Error, Result}; diff --git a/crates-proxy/yaak-proxy-models/src/lib.rs b/crates-proxy/yaak-proxy-models/src/lib.rs new file mode 100644 index 00000000..3ce268b0 --- /dev/null +++ b/crates-proxy/yaak-proxy-models/src/lib.rs @@ -0,0 +1,73 @@ +use crate::error::{Error, Result}; +use include_dir::{Dir, include_dir}; +use log::info; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; +use std::fs::create_dir_all; +use std::path::Path; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use yaak_database::{ConnectionOrTx, DbContext}; + +pub mod error; +pub mod models; + +static MIGRATIONS_DIR: Dir = include_dir!("$CARGO_MANIFEST_DIR/migrations"); + +/// Manages the proxy session database pool. +/// Use `connect()` to get a `DbContext` for running queries. +#[derive(Debug, Clone)] +pub struct ProxyDb { + pool: Arc>>, +} + +impl ProxyDb { + pub fn connect(&self) -> DbContext<'_> { + let conn = self + .pool + .lock() + .expect("Failed to gain lock on proxy DB") + .get() + .expect("Failed to get proxy DB connection from pool"); + DbContext::new(ConnectionOrTx::Connection(conn)) + } +} + +pub fn init_standalone(db_path: impl AsRef) -> Result { + let db_path = db_path.as_ref(); + + if let Some(parent) = db_path.parent() { + create_dir_all(parent)?; + } + + info!("Initializing proxy session database {db_path:?}"); + let manager = SqliteConnectionManager::file(db_path); + let pool = Pool::builder() + .max_size(100) + .connection_timeout(Duration::from_secs(10)) + .build(manager) + .map_err(|e| Error::Database(e.to_string()))?; + + pool.get()?.execute_batch( + "PRAGMA journal_mode=WAL; + PRAGMA foreign_keys=ON;", + )?; + + yaak_database::run_migrations(&pool, &MIGRATIONS_DIR)?; + + Ok(ProxyDb { pool: Arc::new(Mutex::new(pool)) }) +} + +pub fn init_in_memory() -> Result { + let manager = SqliteConnectionManager::memory(); + let pool = Pool::builder() + .max_size(1) + .build(manager) + .map_err(|e| Error::Database(e.to_string()))?; + + pool.get()?.execute_batch("PRAGMA foreign_keys=ON;")?; + + yaak_database::run_migrations(&pool, &MIGRATIONS_DIR)?; + + Ok(ProxyDb { pool: Arc::new(Mutex::new(pool)) }) +} diff --git a/crates-proxy/yaak-proxy-models/src/models.rs b/crates-proxy/yaak-proxy-models/src/models.rs new file mode 100644 index 00000000..84a5926b --- /dev/null +++ b/crates-proxy/yaak-proxy-models/src/models.rs @@ -0,0 +1,160 @@ +use chrono::NaiveDateTime; +use rusqlite::Row; +use sea_query::Order::Desc; +use sea_query::{IntoColumnRef, IntoIden, IntoTableRef, Order, SimpleExpr, enum_def}; +use serde::{Deserialize, Serialize}; +use yaak_database::{ + UpsertModelInfo, UpdateSource, Result as DbResult, + generate_prefixed_id, upsert_date, +}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +pub enum HttpResponseState { + Initialized, + Connected, + Closed, +} + +impl Default for HttpResponseState { + fn default() -> Self { + Self::Initialized + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct HttpResponseHeader { + pub name: String, + pub value: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(default, rename_all = "camelCase")] +#[enum_def(table_name = "proxy_http_responses")] +pub struct HttpResponse { + pub model: String, + pub id: String, + pub proxy_request_id: i64, + pub created_at: NaiveDateTime, + pub updated_at: NaiveDateTime, + pub elapsed: i32, + pub elapsed_headers: i32, + pub elapsed_dns: i32, + pub status: i32, + pub status_reason: Option, + pub url: String, + pub headers: Vec, + pub request_headers: Vec, + pub error: Option, + pub body_path: Option, + pub content_length: Option, + pub content_length_compressed: Option, + pub request_content_length: Option, + pub remote_addr: Option, + pub version: Option, + pub state: HttpResponseState, +} + +impl UpsertModelInfo for HttpResponse { + fn table_name() -> impl IntoTableRef + IntoIden { + HttpResponseIden::Table + } + + fn id_column() -> impl IntoIden + Eq + Clone { + HttpResponseIden::Id + } + + fn generate_id() -> String { + generate_prefixed_id("rs") + } + + fn order_by() -> (impl IntoColumnRef, Order) { + (HttpResponseIden::CreatedAt, Desc) + } + + fn get_id(&self) -> String { + self.id.clone() + } + + fn insert_values( + self, + source: &UpdateSource, + ) -> DbResult)>> { + use HttpResponseIden::*; + Ok(vec![ + (CreatedAt, upsert_date(source, self.created_at)), + (UpdatedAt, upsert_date(source, self.updated_at)), + (ProxyRequestId, self.proxy_request_id.into()), + (BodyPath, self.body_path.into()), + (ContentLength, self.content_length.into()), + (ContentLengthCompressed, self.content_length_compressed.into()), + (Elapsed, self.elapsed.into()), + (ElapsedHeaders, self.elapsed_headers.into()), + (ElapsedDns, self.elapsed_dns.into()), + (Error, self.error.into()), + (Headers, serde_json::to_string(&self.headers)?.into()), + (RemoteAddr, self.remote_addr.into()), + (RequestContentLength, self.request_content_length.into()), + (RequestHeaders, serde_json::to_string(&self.request_headers)?.into()), + (State, serde_json::to_value(&self.state)?.as_str().into()), + (Status, self.status.into()), + (StatusReason, self.status_reason.into()), + (Url, self.url.into()), + (Version, self.version.into()), + ]) + } + + fn update_columns() -> Vec { + vec![ + HttpResponseIden::UpdatedAt, + HttpResponseIden::BodyPath, + HttpResponseIden::ContentLength, + HttpResponseIden::ContentLengthCompressed, + HttpResponseIden::Elapsed, + HttpResponseIden::ElapsedHeaders, + HttpResponseIden::ElapsedDns, + HttpResponseIden::Error, + HttpResponseIden::Headers, + HttpResponseIden::RemoteAddr, + HttpResponseIden::RequestContentLength, + HttpResponseIden::RequestHeaders, + HttpResponseIden::State, + HttpResponseIden::Status, + HttpResponseIden::StatusReason, + HttpResponseIden::Url, + HttpResponseIden::Version, + ] + } + + fn from_row(r: &Row) -> rusqlite::Result + where + Self: Sized, + { + let headers: String = r.get("headers")?; + let request_headers: String = r.get("request_headers")?; + let state: String = r.get("state")?; + Ok(Self { + id: r.get("id")?, + model: r.get("model")?, + proxy_request_id: r.get("proxy_request_id")?, + created_at: r.get("created_at")?, + updated_at: r.get("updated_at")?, + error: r.get("error")?, + url: r.get("url")?, + content_length: r.get("content_length")?, + content_length_compressed: r.get("content_length_compressed").unwrap_or_default(), + version: r.get("version")?, + elapsed: r.get("elapsed")?, + elapsed_headers: r.get("elapsed_headers")?, + elapsed_dns: r.get("elapsed_dns").unwrap_or_default(), + remote_addr: r.get("remote_addr")?, + status: r.get("status")?, + status_reason: r.get("status_reason")?, + state: serde_json::from_str(format!(r#""{state}""#).as_str()).unwrap_or_default(), + body_path: r.get("body_path")?, + headers: serde_json::from_str(headers.as_str()).unwrap_or_default(), + request_content_length: r.get("request_content_length").unwrap_or_default(), + request_headers: serde_json::from_str(request_headers.as_str()).unwrap_or_default(), + }) + } +} diff --git a/crates-tauri/yaak-app-client/src/models_ext.rs b/crates-tauri/yaak-app-client/src/models_ext.rs index 14b5eebf..795af40f 100644 --- a/crates-tauri/yaak-app-client/src/models_ext.rs +++ b/crates-tauri/yaak-app-client/src/models_ext.rs @@ -10,7 +10,7 @@ use tauri::plugin::TauriPlugin; use tauri::{Emitter, Manager, Runtime, State}; use tauri_plugin_dialog::{DialogExt, MessageDialogKind}; use yaak_models::blob_manager::BlobManager; -use yaak_models::db_context::DbContext; +use yaak_models::client_db::ClientDb; use yaak_models::error::Result; use yaak_models::models::{AnyModel, GraphQlIntrospection, GrpcEvent, Settings, WebsocketEvent}; use yaak_models::query_manager::QueryManager; @@ -88,10 +88,10 @@ async fn run_model_change_poller( /// Extension trait for accessing the QueryManager from Tauri Manager types. pub trait QueryManagerExt<'a, R> { fn db_manager(&'a self) -> State<'a, QueryManager>; - fn db(&'a self) -> DbContext<'a>; + fn db(&'a self) -> ClientDb<'a>; fn with_tx(&'a self, func: F) -> Result where - F: FnOnce(&DbContext) -> Result; + F: FnOnce(&ClientDb) -> Result; } impl<'a, R: Runtime, M: Manager> QueryManagerExt<'a, R> for M { @@ -99,14 +99,14 @@ impl<'a, R: Runtime, M: Manager> QueryManagerExt<'a, R> for M { self.state::() } - fn db(&'a self) -> DbContext<'a> { + fn db(&'a self) -> ClientDb<'a> { let qm = self.state::(); qm.inner().connect() } fn with_tx(&'a self, func: F) -> Result where - F: FnOnce(&DbContext) -> Result, + F: FnOnce(&ClientDb) -> Result, { let qm = self.state::(); qm.inner().with_tx(func) diff --git a/crates-tauri/yaak-license/src/license.rs b/crates-tauri/yaak-license/src/license.rs index f0732b8a..dcc3fd12 100644 --- a/crates-tauri/yaak-license/src/license.rs +++ b/crates-tauri/yaak-license/src/license.rs @@ -9,18 +9,18 @@ use tauri::{AppHandle, Emitter, Manager, Runtime, WebviewWindow, is_dev}; use ts_rs::TS; use yaak_api::{ApiClientKind, yaak_api_client}; use yaak_common::platform::get_os_str; -use yaak_models::db_context::DbContext; +use yaak_models::client_db::ClientDb; use yaak_models::query_manager::QueryManager; use yaak_models::util::UpdateSource; /// Extension trait for accessing the QueryManager from Tauri Manager types. /// This is needed temporarily until all crates are refactored to not use Tauri. trait QueryManagerExt<'a, R> { - fn db(&'a self) -> DbContext<'a>; + fn db(&'a self) -> ClientDb<'a>; } impl<'a, R: Runtime, M: Manager> QueryManagerExt<'a, R> for M { - fn db(&'a self) -> DbContext<'a> { + fn db(&'a self) -> ClientDb<'a> { let qm = self.state::(); qm.inner().connect() } diff --git a/crates/common/yaak-database/Cargo.toml b/crates/common/yaak-database/Cargo.toml index 749dc5f2..35df46d1 100644 --- a/crates/common/yaak-database/Cargo.toml +++ b/crates/common/yaak-database/Cargo.toml @@ -5,12 +5,15 @@ edition = "2024" publish = false [dependencies] +chrono = { version = "0.4.38", features = ["serde"] } include_dir = "0.7" log = { workspace = true } nanoid = "0.4.0" r2d2 = "0.8.10" r2d2_sqlite = { version = "0.25.0" } rusqlite = { version = "0.32.1", features = ["bundled", "chrono"] } +sea-query = { version = "0.32.1", features = ["with-chrono", "attr"] } +sea-query-rusqlite = { version = "0.7.0", features = ["with-chrono"] } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } thiserror = { workspace = true } diff --git a/crates/yaak-models/src/db_context.rs b/crates/common/yaak-database/src/db_context.rs similarity index 56% rename from crates/yaak-models/src/db_context.rs rename to crates/common/yaak-database/src/db_context.rs index 1e25fc1a..83e56711 100644 --- a/crates/yaak-models/src/db_context.rs +++ b/crates/common/yaak-database/src/db_context.rs @@ -1,33 +1,37 @@ use crate::connection_or_tx::ConnectionOrTx; use crate::error::Error::ModelNotFound; use crate::error::Result; -use crate::models::{AnyModel, UpsertModelInfo}; -use crate::util::{ModelChangeEvent, ModelPayload, UpdateSource}; -use rusqlite::{OptionalExtension, params}; +use crate::traits::UpsertModelInfo; +use crate::update_source::UpdateSource; use sea_query::{ - Asterisk, Expr, Func, IntoColumnRef, IntoIden, IntoTableRef, OnConflict, Query, SimpleExpr, + Asterisk, Expr, Func, IntoColumnRef, IntoIden, OnConflict, Query, SimpleExpr, SqliteQueryBuilder, }; use sea_query_rusqlite::RusqliteBinder; use std::fmt::Debug; -use std::sync::mpsc; pub struct DbContext<'a> { - pub(crate) _events_tx: mpsc::Sender, - pub(crate) conn: ConnectionOrTx<'a>, + conn: ConnectionOrTx<'a>, } impl<'a> DbContext<'a> { - pub(crate) fn find_one<'s, M>( + pub fn new(conn: ConnectionOrTx<'a>) -> Self { + Self { conn } + } + + pub fn conn(&self) -> &ConnectionOrTx<'a> { + &self.conn + } + + pub fn find_one( &self, col: impl IntoColumnRef + IntoIden + Clone, value: impl Into + Debug, ) -> Result where - M: Into + Clone + UpsertModelInfo, + M: UpsertModelInfo, { let value_debug = format!("{:?}", value); - let value_expr = value.into(); let (sql, params) = Query::select() .from(M::table_name()) @@ -47,13 +51,13 @@ impl<'a> DbContext<'a> { } } - pub(crate) fn find_optional<'s, M>( + pub fn find_optional( &self, col: impl IntoColumnRef, value: impl Into, ) -> Option where - M: Into + Clone + UpsertModelInfo, + M: UpsertModelInfo, { let (sql, params) = Query::select() .from(M::table_name()) @@ -62,13 +66,12 @@ impl<'a> DbContext<'a> { .build_rusqlite(SqliteQueryBuilder); let mut stmt = self.conn.prepare(sql.as_str()).expect("Failed to prepare query"); stmt.query_row(&*params.as_params(), M::from_row) - .optional() - .expect("Failed to run find on DB") + .ok() } - pub(crate) fn find_all<'s, M>(&self) -> Result> + pub fn find_all(&self) -> Result> where - M: Into + Clone + UpsertModelInfo, + M: UpsertModelInfo, { let (order_by_col, order_by_dir) = M::order_by(); let (sql, params) = Query::select() @@ -81,16 +84,15 @@ impl<'a> DbContext<'a> { Ok(items.map(|v| v.unwrap()).collect()) } - pub(crate) fn find_many<'s, M>( + pub fn find_many( &self, col: impl IntoColumnRef, value: impl Into, limit: Option, ) -> Result> where - M: Into + Clone + UpsertModelInfo, + M: UpsertModelInfo, { - // TODO: Figure out how to do this conditional builder better let (order_by_col, order_by_dir) = M::order_by(); let (sql, params) = if let Some(limit) = limit { Query::select() @@ -114,46 +116,30 @@ impl<'a> DbContext<'a> { Ok(items.map(|v| v.unwrap()).collect()) } - pub(crate) fn upsert(&self, model: &M, source: &UpdateSource) -> Result + /// Upsert a model. Returns `(model, created)` where `created` is true if a new row was inserted. + pub fn upsert(&self, model: &M, source: &UpdateSource) -> Result<(M, bool)> where - M: Into + From + UpsertModelInfo + Clone, + M: UpsertModelInfo + Clone, { - self.upsert_one( - M::table_name(), - M::id_column(), - model.get_id().as_str(), - model.clone().insert_values(source)?, - M::update_columns(), - source, - ) - } + let id_iden = M::id_column().into_iden(); + let id_val = model.get_id(); + let other_values = model.clone().insert_values(source)?; - fn upsert_one( - &self, - table: impl IntoTableRef, - id_col: impl IntoIden + Eq + Clone, - id_val: &str, - other_values: Vec<(impl IntoIden + Eq, impl Into)>, - update_columns: Vec, - source: &UpdateSource, - ) -> Result - where - M: Into + From + UpsertModelInfo + Clone, - { - let id_iden = id_col.into_iden(); let mut column_vec = vec![id_iden.clone()]; - let mut value_vec = - vec![if id_val == "" { M::generate_id().into() } else { id_val.into() }]; + let mut value_vec = vec![ + if id_val.is_empty() { M::generate_id().into() } else { id_val.into() }, + ]; for (col, val) in other_values { value_vec.push(val.into()); column_vec.push(col.into_iden()); } - let on_conflict = OnConflict::column(id_iden).update_columns(update_columns).to_owned(); + let on_conflict = + OnConflict::column(id_iden).update_columns(M::update_columns()).to_owned(); let (sql, params) = Query::insert() - .into_table(table) + .into_table(M::table_name()) .columns(column_vec) .values_panic(value_vec) .on_conflict(on_conflict) @@ -173,59 +159,19 @@ impl<'a> DbContext<'a> { }) })?; - let payload = ModelPayload { - model: m.clone().into(), - update_source: source.clone(), - change: ModelChangeEvent::Upsert { created }, - }; - - self.record_model_change(&payload)?; - let _ = self._events_tx.send(payload); - - Ok(m) + Ok((m, created)) } - pub(crate) fn delete<'s, M>(&self, m: &M, source: &UpdateSource) -> Result + /// Delete a model by its ID. Returns the number of rows deleted. + pub fn delete(&self, m: &M) -> Result where - M: Into + Clone + UpsertModelInfo, + M: UpsertModelInfo, { let (sql, params) = Query::delete() .from_table(M::table_name()) .cond_where(Expr::col(M::id_column().into_iden()).eq(m.get_id())) .build_rusqlite(SqliteQueryBuilder); - self.conn.execute(sql.as_str(), &*params.as_params())?; - - let payload = ModelPayload { - model: m.clone().into(), - update_source: source.clone(), - change: ModelChangeEvent::Delete, - }; - - self.record_model_change(&payload)?; - let _ = self._events_tx.send(payload); - - Ok(m.clone()) - } - - fn record_model_change(&self, payload: &ModelPayload) -> Result<()> { - let payload_json = serde_json::to_string(payload)?; - let source_json = serde_json::to_string(&payload.update_source)?; - let change_json = serde_json::to_string(&payload.change)?; - - self.conn.resolve().execute( - r#" - INSERT INTO model_changes (model, model_id, change, update_source, payload) - VALUES (?1, ?2, ?3, ?4, ?5) - "#, - params![ - payload.model.model(), - payload.model.id(), - change_json, - source_json, - payload_json, - ], - )?; - - Ok(()) + let count = self.conn.execute(sql.as_str(), &*params.as_params())?; + Ok(count) } } diff --git a/crates/common/yaak-database/src/lib.rs b/crates/common/yaak-database/src/lib.rs index 9e1aa0ed..c6b2643d 100644 --- a/crates/common/yaak-database/src/lib.rs +++ b/crates/common/yaak-database/src/lib.rs @@ -1,15 +1,23 @@ pub mod connection_or_tx; +pub mod db_context; pub mod error; pub mod migrate; +pub mod traits; +pub mod update_source; pub mod util; // Re-export key types for convenience pub use connection_or_tx::ConnectionOrTx; +pub use db_context::DbContext; pub use error::{Error, Result}; pub use migrate::run_migrations; +pub use traits::{UpsertModelInfo, upsert_date}; +pub use update_source::UpdateSource; pub use util::{generate_id, generate_id_of_length, generate_prefixed_id}; // Re-export pool types that consumers will need pub use r2d2; pub use r2d2_sqlite; pub use rusqlite; +pub use sea_query; +pub use sea_query_rusqlite; diff --git a/crates/common/yaak-database/src/traits.rs b/crates/common/yaak-database/src/traits.rs new file mode 100644 index 00000000..33e15044 --- /dev/null +++ b/crates/common/yaak-database/src/traits.rs @@ -0,0 +1,36 @@ +use crate::error::Result; +use crate::update_source::UpdateSource; +use chrono::{NaiveDateTime, Utc}; +use rusqlite::Row; +use sea_query::{IntoColumnRef, IntoIden, IntoTableRef, Order, SimpleExpr}; + +pub trait UpsertModelInfo { + fn table_name() -> impl IntoTableRef + IntoIden; + fn id_column() -> impl IntoIden + Eq + Clone; + fn generate_id() -> String; + fn order_by() -> (impl IntoColumnRef, Order); + fn get_id(&self) -> String; + fn insert_values( + self, + source: &UpdateSource, + ) -> Result)>>; + fn update_columns() -> Vec; + fn from_row(row: &Row) -> rusqlite::Result + where + Self: Sized; +} + +/// Generate timestamps for upsert operations. +/// Sync and import operations preserve existing timestamps; other sources use current time. +pub fn upsert_date(update_source: &UpdateSource, dt: NaiveDateTime) -> SimpleExpr { + match update_source { + UpdateSource::Sync | UpdateSource::Import => { + if dt.and_utc().timestamp() == 0 { + Utc::now().naive_utc().into() + } else { + dt.into() + } + } + _ => Utc::now().naive_utc().into(), + } +} diff --git a/crates/common/yaak-database/src/update_source.rs b/crates/common/yaak-database/src/update_source.rs new file mode 100644 index 00000000..f448e04a --- /dev/null +++ b/crates/common/yaak-database/src/update_source.rs @@ -0,0 +1,17 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case", tag = "type")] +pub enum UpdateSource { + Background, + Import, + Plugin, + Sync, + Window { label: String }, +} + +impl UpdateSource { + pub fn from_window_label(label: impl Into) -> Self { + Self::Window { label: label.into() } + } +} diff --git a/crates/yaak-models/src/client_db.rs b/crates/yaak-models/src/client_db.rs new file mode 100644 index 00000000..470a99a8 --- /dev/null +++ b/crates/yaak-models/src/client_db.rs @@ -0,0 +1,127 @@ +use crate::error::Result; +use crate::models::{AnyModel, UpsertModelInfo}; +use crate::util::{ModelChangeEvent, ModelPayload, UpdateSource}; +use rusqlite::params; +use sea_query::{IntoColumnRef, IntoIden, SimpleExpr}; +use std::fmt::Debug; +use std::sync::mpsc; +use yaak_database::DbContext; + +pub struct ClientDb<'a> { + pub(crate) ctx: DbContext<'a>, + pub(crate) events_tx: mpsc::Sender, +} + +impl<'a> ClientDb<'a> { + pub fn new(ctx: DbContext<'a>, events_tx: mpsc::Sender) -> Self { + Self { ctx, events_tx } + } + + /// Access the underlying connection for custom queries. + pub(crate) fn conn(&self) -> &yaak_database::ConnectionOrTx<'a> { + self.ctx.conn() + } + + // --- Read delegates (thin wrappers over DbContext) --- + + pub(crate) fn find_one( + &self, + col: impl IntoColumnRef + IntoIden + Clone, + value: impl Into + Debug, + ) -> Result + where + M: UpsertModelInfo, + { + Ok(self.ctx.find_one(col, value)?) + } + + pub(crate) fn find_optional( + &self, + col: impl IntoColumnRef, + value: impl Into, + ) -> Option + where + M: UpsertModelInfo, + { + self.ctx.find_optional(col, value) + } + + pub(crate) fn find_all(&self) -> Result> + where + M: UpsertModelInfo, + { + Ok(self.ctx.find_all()?) + } + + pub(crate) fn find_many( + &self, + col: impl IntoColumnRef, + value: impl Into, + limit: Option, + ) -> Result> + where + M: UpsertModelInfo, + { + Ok(self.ctx.find_many(col, value, limit)?) + } + + // --- Write operations (with event recording) --- + + pub(crate) fn upsert(&self, model: &M, source: &UpdateSource) -> Result + where + M: Into + UpsertModelInfo + Clone, + { + let (m, created) = self.ctx.upsert(model, &source.to_db())?; + + let payload = ModelPayload { + model: m.clone().into(), + update_source: source.clone(), + change: ModelChangeEvent::Upsert { created }, + }; + + self.record_model_change(&payload)?; + let _ = self.events_tx.send(payload); + + Ok(m) + } + + pub(crate) fn delete(&self, m: &M, source: &UpdateSource) -> Result + where + M: Into + Clone + UpsertModelInfo, + { + self.ctx.delete(m)?; + + let payload = ModelPayload { + model: m.clone().into(), + update_source: source.clone(), + change: ModelChangeEvent::Delete, + }; + + self.record_model_change(&payload)?; + let _ = self.events_tx.send(payload); + + Ok(m.clone()) + } + + fn record_model_change(&self, payload: &ModelPayload) -> Result<()> { + let payload_json = serde_json::to_string(payload)?; + let source_json = serde_json::to_string(&payload.update_source)?; + let change_json = serde_json::to_string(&payload.change)?; + + self.ctx.conn().resolve().execute( + r#" + INSERT INTO model_changes (model, model_id, change, update_source, payload) + VALUES (?1, ?2, ?3, ?4, ?5) + "#, + params![ + payload.model.model(), + payload.model.id(), + change_json, + source_json, + payload_json, + ], + )?; + + Ok(()) + } +} diff --git a/crates/yaak-models/src/error.rs b/crates/yaak-models/src/error.rs index 3be44ab0..7726ed8c 100644 --- a/crates/yaak-models/src/error.rs +++ b/crates/yaak-models/src/error.rs @@ -40,6 +40,20 @@ pub enum Error { Unknown, } +impl From for Error { + fn from(e: yaak_database::Error) -> Self { + match e { + yaak_database::Error::SqlError(e) => Error::SqlError(e), + yaak_database::Error::SqlPoolError(e) => Error::SqlPoolError(e), + yaak_database::Error::Database(s) => Error::Database(s), + yaak_database::Error::Io(e) => Error::Io(e), + yaak_database::Error::JsonError(e) => Error::JsonError(e), + yaak_database::Error::ModelNotFound(s) => Error::ModelNotFound(s), + yaak_database::Error::MigrationError(s) => Error::MigrationError(s), + } + } +} + impl Serialize for Error { fn serialize(&self, serializer: S) -> std::result::Result where diff --git a/crates/yaak-models/src/lib.rs b/crates/yaak-models/src/lib.rs index 7e102546..d6107f66 100644 --- a/crates/yaak-models/src/lib.rs +++ b/crates/yaak-models/src/lib.rs @@ -12,8 +12,8 @@ use std::sync::mpsc; use std::time::Duration; pub mod blob_manager; +pub mod client_db; mod connection_or_tx; -pub mod db_context; pub mod error; pub mod migrate; pub mod models; diff --git a/crates/yaak-models/src/models.rs b/crates/yaak-models/src/models.rs index 9c206b0a..d7efe1e5 100644 --- a/crates/yaak-models/src/models.rs +++ b/crates/yaak-models/src/models.rs @@ -3,7 +3,7 @@ use crate::models::HttpRequestIden::{ Authentication, AuthenticationType, Body, BodyType, CreatedAt, Description, FolderId, Headers, Method, Name, SortPriority, UpdatedAt, Url, UrlParameters, WorkspaceId, }; -use crate::util::{UpdateSource, generate_prefixed_id}; +use crate::util::generate_prefixed_id; use chrono::{NaiveDateTime, Utc}; use rusqlite::Row; use schemars::JsonSchema; @@ -16,6 +16,8 @@ use std::collections::HashMap; use std::fmt::{Debug, Display}; use std::str::FromStr; use ts_rs::TS; +pub use yaak_database::{UpsertModelInfo, upsert_date}; +use yaak_database::{UpdateSource, Result as DbResult}; #[macro_export] macro_rules! impl_model { @@ -190,7 +192,7 @@ impl UpsertModelInfo for Settings { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use SettingsIden::*; let proxy = match self.proxy { None => None, @@ -346,7 +348,7 @@ impl UpsertModelInfo for Workspace { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use WorkspaceIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -453,7 +455,7 @@ impl UpsertModelInfo for WorkspaceMeta { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use WorkspaceMetaIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -554,7 +556,7 @@ impl UpsertModelInfo for CookieJar { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use CookieJarIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -642,7 +644,7 @@ impl UpsertModelInfo for Environment { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use EnvironmentIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -775,7 +777,7 @@ impl UpsertModelInfo for Folder { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use FolderIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -909,7 +911,7 @@ impl UpsertModelInfo for HttpRequest { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), (UpdatedAt, upsert_date(source, self.updated_at)), @@ -1036,7 +1038,7 @@ impl UpsertModelInfo for WebsocketConnection { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use WebsocketConnectionIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -1151,7 +1153,7 @@ impl UpsertModelInfo for WebsocketRequest { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use WebsocketRequestIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -1276,7 +1278,7 @@ impl UpsertModelInfo for WebsocketEvent { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use WebsocketEventIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -1397,7 +1399,7 @@ impl UpsertModelInfo for HttpResponse { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use HttpResponseIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -1593,7 +1595,7 @@ impl UpsertModelInfo for HttpResponseEvent { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use HttpResponseEventIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -1681,7 +1683,7 @@ impl UpsertModelInfo for GraphQlIntrospection { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use GraphQlIntrospectionIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -1766,7 +1768,7 @@ impl UpsertModelInfo for GrpcRequest { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use GrpcRequestIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -1893,7 +1895,7 @@ impl UpsertModelInfo for GrpcConnection { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use GrpcConnectionIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -2013,7 +2015,7 @@ impl UpsertModelInfo for GrpcEvent { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use GrpcEventIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -2144,7 +2146,7 @@ impl UpsertModelInfo for Plugin { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use PluginIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -2229,7 +2231,7 @@ impl UpsertModelInfo for SyncState { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use SyncStateIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -2312,7 +2314,7 @@ impl UpsertModelInfo for KeyValue { fn insert_values( self, source: &UpdateSource, - ) -> Result)>> { + ) -> DbResult)>> { use KeyValueIden::*; Ok(vec![ (CreatedAt, upsert_date(source, self.created_at)), @@ -2525,36 +2527,3 @@ impl AnyModel { } } -pub trait UpsertModelInfo { - fn table_name() -> impl IntoTableRef + IntoIden; - fn id_column() -> impl IntoIden + Eq + Clone; - fn generate_id() -> String; - fn order_by() -> (impl IntoColumnRef, Order); - fn get_id(&self) -> String; - fn insert_values( - self, - source: &UpdateSource, - ) -> Result)>>; - fn update_columns() -> Vec; - fn from_row(row: &Row) -> rusqlite::Result - where - Self: Sized; -} - -// Generate the created_at or updated_at timestamps for an upsert operation, depending on the ID -// provided. -fn upsert_date(update_source: &UpdateSource, dt: NaiveDateTime) -> SimpleExpr { - match update_source { - // Sync and import operations always preserve timestamps - UpdateSource::Sync | UpdateSource::Import => { - if dt.and_utc().timestamp() == 0 { - // Sometimes data won't have timestamps (partial data) - Utc::now().naive_utc().into() - } else { - dt.into() - } - } - // Other sources will always update to the latest time - _ => Utc::now().naive_utc().into(), - } -} diff --git a/crates/yaak-models/src/queries/any_request.rs b/crates/yaak-models/src/queries/any_request.rs index bde0a6fa..bff17b5e 100644 --- a/crates/yaak-models/src/queries/any_request.rs +++ b/crates/yaak-models/src/queries/any_request.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{GrpcRequest, HttpRequest, WebsocketRequest}; @@ -8,7 +8,7 @@ pub enum AnyRequest { WebsocketRequest(WebsocketRequest), } -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_any_request(&self, id: &str) -> Result { if let Ok(http_request) = self.get_http_request(id) { Ok(AnyRequest::HttpRequest(http_request)) diff --git a/crates/yaak-models/src/queries/batch.rs b/crates/yaak-models/src/queries/batch.rs index b5a711e0..f4163724 100644 --- a/crates/yaak-models/src/queries/batch.rs +++ b/crates/yaak-models/src/queries/batch.rs @@ -1,10 +1,10 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{Environment, Folder, GrpcRequest, HttpRequest, WebsocketRequest, Workspace}; use crate::util::{BatchUpsertResult, UpdateSource}; use log::info; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn batch_upsert( &self, workspaces: Vec, diff --git a/crates/yaak-models/src/queries/cookie_jars.rs b/crates/yaak-models/src/queries/cookie_jars.rs index 7d656176..3c4f6695 100644 --- a/crates/yaak-models/src/queries/cookie_jars.rs +++ b/crates/yaak-models/src/queries/cookie_jars.rs @@ -1,9 +1,9 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{CookieJar, CookieJarIden}; use crate::util::UpdateSource; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_cookie_jar(&self, id: &str) -> Result { self.find_one(CookieJarIden::Id, id) } diff --git a/crates/yaak-models/src/queries/environments.rs b/crates/yaak-models/src/queries/environments.rs index 7b003d15..a0aaf2fb 100644 --- a/crates/yaak-models/src/queries/environments.rs +++ b/crates/yaak-models/src/queries/environments.rs @@ -1,11 +1,11 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Error::{MissingBaseEnvironment, MultipleBaseEnvironments}; use crate::error::Result; use crate::models::{Environment, EnvironmentIden, EnvironmentVariable}; use crate::util::UpdateSource; use log::{info, warn}; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_environment(&self, id: &str) -> Result { self.find_one(EnvironmentIden::Id, id) } diff --git a/crates/yaak-models/src/queries/folders.rs b/crates/yaak-models/src/queries/folders.rs index 75fa7ce0..20d60ce3 100644 --- a/crates/yaak-models/src/queries/folders.rs +++ b/crates/yaak-models/src/queries/folders.rs @@ -1,5 +1,5 @@ use crate::connection_or_tx::ConnectionOrTx; -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{ Environment, EnvironmentIden, Folder, FolderIden, GrpcRequest, GrpcRequestIden, HttpRequest, @@ -9,7 +9,7 @@ use crate::util::UpdateSource; use serde_json::Value; use std::collections::BTreeMap; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_folder(&self, id: &str) -> Result { self.find_one(FolderIden::Id, id) } @@ -19,7 +19,7 @@ impl<'a> DbContext<'a> { } pub fn delete_folder(&self, folder: &Folder, source: &UpdateSource) -> Result { - match self.conn { + match self.conn() { ConnectionOrTx::Connection(_) => {} ConnectionOrTx::Transaction(_) => {} } diff --git a/crates/yaak-models/src/queries/graphql_introspections.rs b/crates/yaak-models/src/queries/graphql_introspections.rs index 111ee1c4..2edbd2e3 100644 --- a/crates/yaak-models/src/queries/graphql_introspections.rs +++ b/crates/yaak-models/src/queries/graphql_introspections.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{GraphQlIntrospection, GraphQlIntrospectionIden}; use crate::util::UpdateSource; @@ -6,7 +6,7 @@ use chrono::{Duration, Utc}; use sea_query::{Expr, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_graphql_introspection(&self, request_id: &str) -> Option { self.find_optional(GraphQlIntrospectionIden::RequestId, request_id) } @@ -44,7 +44,7 @@ impl<'a> DbContext<'a> { .cond_where(Expr::col(GraphQlIntrospectionIden::UpdatedAt).lt(cutoff)) .build_rusqlite(SqliteQueryBuilder); - let mut stmt = self.conn.resolve().prepare(sql.as_str())?; + let mut stmt = self.conn().resolve().prepare(sql.as_str())?; stmt.execute(&*params.as_params())?; Ok(()) } diff --git a/crates/yaak-models/src/queries/grpc_connections.rs b/crates/yaak-models/src/queries/grpc_connections.rs index 49d75351..32212092 100644 --- a/crates/yaak-models/src/queries/grpc_connections.rs +++ b/crates/yaak-models/src/queries/grpc_connections.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{GrpcConnection, GrpcConnectionIden, GrpcConnectionState}; use crate::queries::MAX_HISTORY_ITEMS; @@ -7,7 +7,7 @@ use log::debug; use sea_query::{Expr, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_grpc_connection(&self, id: &str) -> Result { self.find_one(GrpcConnectionIden::Id, id) } @@ -71,7 +71,7 @@ impl<'a> DbContext<'a> { .values([(GrpcConnectionIden::State, closed.as_str().into())]) .cond_where(Expr::col(GrpcConnectionIden::State).ne(closed.as_str())) .build_rusqlite(SqliteQueryBuilder); - let mut stmt = self.conn.prepare(sql.as_str())?; + let mut stmt = self.conn().prepare(sql.as_str())?; stmt.execute(&*params.as_params())?; Ok(()) } diff --git a/crates/yaak-models/src/queries/grpc_events.rs b/crates/yaak-models/src/queries/grpc_events.rs index 1794d5f2..3e1d1479 100644 --- a/crates/yaak-models/src/queries/grpc_events.rs +++ b/crates/yaak-models/src/queries/grpc_events.rs @@ -1,9 +1,9 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{GrpcEvent, GrpcEventIden}; use crate::util::UpdateSource; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_grpc_events(&self, id: &str) -> Result { self.find_one(GrpcEventIden::Id, id) } diff --git a/crates/yaak-models/src/queries/grpc_requests.rs b/crates/yaak-models/src/queries/grpc_requests.rs index 5b65cb52..2fbd4977 100644 --- a/crates/yaak-models/src/queries/grpc_requests.rs +++ b/crates/yaak-models/src/queries/grpc_requests.rs @@ -1,12 +1,12 @@ use super::dedupe_headers; -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{Folder, FolderIden, GrpcRequest, GrpcRequestIden, HttpRequestHeader}; use crate::util::UpdateSource; use serde_json::Value; use std::collections::BTreeMap; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_grpc_request(&self, id: &str) -> Result { self.find_one(GrpcRequestIden::Id, id) } diff --git a/crates/yaak-models/src/queries/http_requests.rs b/crates/yaak-models/src/queries/http_requests.rs index c0de36a0..46286f4f 100644 --- a/crates/yaak-models/src/queries/http_requests.rs +++ b/crates/yaak-models/src/queries/http_requests.rs @@ -1,12 +1,12 @@ use super::dedupe_headers; -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{Folder, FolderIden, HttpRequest, HttpRequestHeader, HttpRequestIden}; use crate::util::UpdateSource; use serde_json::Value; use std::collections::BTreeMap; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_http_request(&self, id: &str) -> Result { self.find_one(HttpRequestIden::Id, id) } diff --git a/crates/yaak-models/src/queries/http_response_events.rs b/crates/yaak-models/src/queries/http_response_events.rs index 145ea444..2c873ecc 100644 --- a/crates/yaak-models/src/queries/http_response_events.rs +++ b/crates/yaak-models/src/queries/http_response_events.rs @@ -1,9 +1,9 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{HttpResponseEvent, HttpResponseEventIden}; use crate::util::UpdateSource; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn list_http_response_events(&self, response_id: &str) -> Result> { self.find_many(HttpResponseEventIden::ResponseId, response_id, None) } diff --git a/crates/yaak-models/src/queries/http_responses.rs b/crates/yaak-models/src/queries/http_responses.rs index a555b276..d1282418 100644 --- a/crates/yaak-models/src/queries/http_responses.rs +++ b/crates/yaak-models/src/queries/http_responses.rs @@ -1,5 +1,5 @@ use crate::blob_manager::BlobManager; -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{HttpResponse, HttpResponseIden, HttpResponseState}; use crate::queries::MAX_HISTORY_ITEMS; @@ -9,7 +9,7 @@ use sea_query::{Expr, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; use std::fs; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_http_response(&self, id: &str) -> Result { self.find_one(HttpResponseIden::Id, id) } @@ -101,7 +101,7 @@ impl<'a> DbContext<'a> { .values([(HttpResponseIden::State, closed.as_str().into())]) .cond_where(Expr::col(HttpResponseIden::State).ne(closed.as_str())) .build_rusqlite(SqliteQueryBuilder); - let mut stmt = self.conn.prepare(sql.as_str())?; + let mut stmt = self.conn().prepare(sql.as_str())?; stmt.execute(&*params.as_params())?; Ok(()) } diff --git a/crates/yaak-models/src/queries/key_values.rs b/crates/yaak-models/src/queries/key_values.rs index 038627d1..56da7513 100644 --- a/crates/yaak-models/src/queries/key_values.rs +++ b/crates/yaak-models/src/queries/key_values.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{KeyValue, KeyValueIden, UpsertModelInfo}; use crate::util::UpdateSource; @@ -7,7 +7,7 @@ use log::error; use sea_query::{Asterisk, Cond, Expr, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn list_key_values(&self) -> Result> { let (sql, params) = Query::select() .from(KeyValueIden::Table) @@ -18,7 +18,7 @@ impl<'a> DbContext<'a> { // TODO: Add migration to delete key/values with NULL IDs later on, then remove this .cond_where(Expr::col(KeyValueIden::Id).is_not_null()) .build_rusqlite(SqliteQueryBuilder); - let mut stmt = self.conn.prepare(sql.as_str())?; + let mut stmt = self.conn().prepare(sql.as_str())?; let items = stmt.query_map(&*params.as_params(), KeyValue::from_row)?; Ok(items.map(|v| v.unwrap()).collect()) } @@ -86,7 +86,7 @@ impl<'a> DbContext<'a> { .add(Expr::col(KeyValueIden::Key).eq(key)), ) .build_rusqlite(SqliteQueryBuilder); - self.conn.resolve().query_row(sql.as_str(), &*params.as_params(), KeyValue::from_row).ok() + self.conn().resolve().query_row(sql.as_str(), &*params.as_params(), KeyValue::from_row).ok() } pub fn set_key_value_dte( diff --git a/crates/yaak-models/src/queries/model_changes.rs b/crates/yaak-models/src/queries/model_changes.rs index 141796f7..de263a05 100644 --- a/crates/yaak-models/src/queries/model_changes.rs +++ b/crates/yaak-models/src/queries/model_changes.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::util::ModelPayload; use rusqlite::params; @@ -11,13 +11,13 @@ pub struct PersistedModelChange { pub payload: ModelPayload, } -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn list_model_changes_after( &self, after_id: i64, limit: usize, ) -> Result> { - let mut stmt = self.conn.prepare( + let mut stmt = self.conn().prepare( r#" SELECT id, created_at, payload FROM model_changes @@ -46,7 +46,7 @@ impl<'a> DbContext<'a> { since_id: i64, limit: usize, ) -> Result> { - let mut stmt = self.conn.prepare( + let mut stmt = self.conn().prepare( r#" SELECT id, created_at, payload FROM model_changes @@ -72,7 +72,7 @@ impl<'a> DbContext<'a> { pub fn prune_model_changes_older_than_days(&self, days: i64) -> Result { let offset = format!("-{days} days"); - Ok(self.conn.resolve().execute( + Ok(self.conn().resolve().execute( r#" DELETE FROM model_changes WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1) @@ -83,7 +83,7 @@ impl<'a> DbContext<'a> { pub fn prune_model_changes_older_than_hours(&self, hours: i64) -> Result { let offset = format!("-{hours} hours"); - Ok(self.conn.resolve().execute( + Ok(self.conn().resolve().execute( r#" DELETE FROM model_changes WHERE created_at < STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW', ?1) diff --git a/crates/yaak-models/src/queries/plugin_key_values.rs b/crates/yaak-models/src/queries/plugin_key_values.rs index fe0bba0a..9d5c664a 100644 --- a/crates/yaak-models/src/queries/plugin_key_values.rs +++ b/crates/yaak-models/src/queries/plugin_key_values.rs @@ -1,11 +1,11 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{PluginKeyValue, PluginKeyValueIden}; use sea_query::Keyword::CurrentTimestamp; use sea_query::{Asterisk, Cond, Expr, OnConflict, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_plugin_key_value(&self, plugin_name: &str, key: &str) -> Option { let (sql, params) = Query::select() .from(PluginKeyValueIden::Table) @@ -16,7 +16,7 @@ impl<'a> DbContext<'a> { .add(Expr::col(PluginKeyValueIden::Key).eq(key)), ) .build_rusqlite(SqliteQueryBuilder); - self.conn.resolve().query_row(sql.as_str(), &*params.as_params(), |row| row.try_into()).ok() + self.conn().resolve().query_row(sql.as_str(), &*params.as_params(), |row| row.try_into()).ok() } pub fn set_plugin_key_value( @@ -52,7 +52,7 @@ impl<'a> DbContext<'a> { .build_rusqlite(SqliteQueryBuilder); let mut stmt = - self.conn.prepare(sql.as_str()).expect("Failed to prepare PluginKeyValue upsert"); + self.conn().prepare(sql.as_str()).expect("Failed to prepare PluginKeyValue upsert"); let m: PluginKeyValue = stmt .query_row(&*params.as_params(), |row| row.try_into()) .expect("Failed to upsert KeyValue"); @@ -73,7 +73,7 @@ impl<'a> DbContext<'a> { .add(Expr::col(PluginKeyValueIden::Key).eq(key)), ) .build_rusqlite(SqliteQueryBuilder); - self.conn.execute(sql.as_str(), &*params.as_params())?; + self.conn().execute(sql.as_str(), &*params.as_params())?; Ok(true) } } diff --git a/crates/yaak-models/src/queries/plugins.rs b/crates/yaak-models/src/queries/plugins.rs index c3b97e14..48dd1998 100644 --- a/crates/yaak-models/src/queries/plugins.rs +++ b/crates/yaak-models/src/queries/plugins.rs @@ -1,9 +1,9 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{Plugin, PluginIden}; use crate::util::UpdateSource; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_plugin(&self, id: &str) -> Result { self.find_one(PluginIden::Id, id) } diff --git a/crates/yaak-models/src/queries/settings.rs b/crates/yaak-models/src/queries/settings.rs index b44aee3a..660e6e42 100644 --- a/crates/yaak-models/src/queries/settings.rs +++ b/crates/yaak-models/src/queries/settings.rs @@ -1,11 +1,11 @@ use std::collections::HashMap; -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{EditorKeymap, Settings, SettingsIden}; use crate::util::UpdateSource; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_settings(&self) -> Settings { let id = "default".to_string(); diff --git a/crates/yaak-models/src/queries/sync_states.rs b/crates/yaak-models/src/queries/sync_states.rs index ee8ac9de..5d112a29 100644 --- a/crates/yaak-models/src/queries/sync_states.rs +++ b/crates/yaak-models/src/queries/sync_states.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{SyncState, SyncStateIden, UpsertModelInfo}; use crate::util::UpdateSource; @@ -6,7 +6,7 @@ use sea_query::{Asterisk, Cond, Expr, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; use std::path::Path; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_sync_state(&self, id: &str) -> Result { self.find_one(SyncStateIden::Id, id) } @@ -29,7 +29,7 @@ impl<'a> DbContext<'a> { .add(Expr::col(SyncStateIden::SyncDir).eq(sync_dir.to_string_lossy())), ) .build_rusqlite(SqliteQueryBuilder); - let mut stmt = self.conn.prepare(sql.as_str())?; + let mut stmt = self.conn().prepare(sql.as_str())?; let items = stmt.query_map(&*params.as_params(), SyncState::from_row)?; Ok(items.map(|v| v.unwrap()).collect()) } diff --git a/crates/yaak-models/src/queries/websocket_connections.rs b/crates/yaak-models/src/queries/websocket_connections.rs index 0799c7ff..eb6f45c6 100644 --- a/crates/yaak-models/src/queries/websocket_connections.rs +++ b/crates/yaak-models/src/queries/websocket_connections.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{WebsocketConnection, WebsocketConnectionIden, WebsocketConnectionState}; use crate::queries::MAX_HISTORY_ITEMS; @@ -7,7 +7,7 @@ use log::debug; use sea_query::{Expr, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_websocket_connection(&self, id: &str) -> Result { self.find_one(WebsocketConnectionIden::Id, id) } @@ -90,7 +90,7 @@ impl<'a> DbContext<'a> { .values([(WebsocketConnectionIden::State, closed.as_str().into())]) .cond_where(Expr::col(WebsocketConnectionIden::State).ne(closed.as_str())) .build_rusqlite(SqliteQueryBuilder); - let mut stmt = self.conn.prepare(sql.as_str())?; + let mut stmt = self.conn().prepare(sql.as_str())?; stmt.execute(&*params.as_params())?; Ok(()) } diff --git a/crates/yaak-models/src/queries/websocket_events.rs b/crates/yaak-models/src/queries/websocket_events.rs index 299658cb..458f516c 100644 --- a/crates/yaak-models/src/queries/websocket_events.rs +++ b/crates/yaak-models/src/queries/websocket_events.rs @@ -1,9 +1,9 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{WebsocketEvent, WebsocketEventIden}; use crate::util::UpdateSource; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_websocket_event(&self, id: &str) -> Result { self.find_one(WebsocketEventIden::Id, id) } diff --git a/crates/yaak-models/src/queries/websocket_requests.rs b/crates/yaak-models/src/queries/websocket_requests.rs index 77a8e19c..1e449d5a 100644 --- a/crates/yaak-models/src/queries/websocket_requests.rs +++ b/crates/yaak-models/src/queries/websocket_requests.rs @@ -1,5 +1,5 @@ use super::dedupe_headers; -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{ Folder, FolderIden, HttpRequestHeader, WebsocketRequest, WebsocketRequestIden, @@ -8,7 +8,7 @@ use crate::util::UpdateSource; use serde_json::Value; use std::collections::BTreeMap; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_websocket_request(&self, id: &str) -> Result { self.find_one(WebsocketRequestIden::Id, id) } diff --git a/crates/yaak-models/src/queries/workspace_metas.rs b/crates/yaak-models/src/queries/workspace_metas.rs index 26c51c86..a977c524 100644 --- a/crates/yaak-models/src/queries/workspace_metas.rs +++ b/crates/yaak-models/src/queries/workspace_metas.rs @@ -1,10 +1,10 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{WorkspaceMeta, WorkspaceMetaIden}; use crate::util::UpdateSource; use log::info; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_workspace_meta(&self, workspace_id: &str) -> Option { self.find_optional(WorkspaceMetaIden::WorkspaceId, workspace_id) } diff --git a/crates/yaak-models/src/queries/workspaces.rs b/crates/yaak-models/src/queries/workspaces.rs index 978caade..ecb8a3d1 100644 --- a/crates/yaak-models/src/queries/workspaces.rs +++ b/crates/yaak-models/src/queries/workspaces.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{ EnvironmentIden, FolderIden, GrpcRequestIden, HttpRequestHeader, HttpRequestIden, @@ -8,7 +8,7 @@ use crate::util::UpdateSource; use serde_json::Value; use std::collections::BTreeMap; -impl<'a> DbContext<'a> { +impl<'a> ClientDb<'a> { pub fn get_workspace(&self, id: &str) -> Result { self.find_one(WorkspaceIden::Id, id) } diff --git a/crates/yaak-models/src/query_manager.rs b/crates/yaak-models/src/query_manager.rs index fc222074..ebd7f87d 100644 --- a/crates/yaak-models/src/query_manager.rs +++ b/crates/yaak-models/src/query_manager.rs @@ -1,11 +1,11 @@ -use crate::connection_or_tx::ConnectionOrTx; -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Error::GenericError; use crate::util::ModelPayload; use r2d2::Pool; use r2d2_sqlite::SqliteConnectionManager; use rusqlite::TransactionBehavior; use std::sync::{Arc, Mutex, mpsc}; +use yaak_database::{ConnectionOrTx, DbContext}; #[derive(Debug, Clone)] pub struct QueryManager { @@ -18,19 +18,20 @@ impl QueryManager { QueryManager { pool: Arc::new(Mutex::new(pool)), events_tx } } - pub fn connect(&self) -> DbContext<'_> { + pub fn connect(&self) -> ClientDb<'_> { let conn = self .pool .lock() .expect("Failed to gain lock on DB") .get() .expect("Failed to get a new DB connection from the pool"); - DbContext { _events_tx: self.events_tx.clone(), conn: ConnectionOrTx::Connection(conn) } + let ctx = DbContext::new(ConnectionOrTx::Connection(conn)); + ClientDb::new(ctx, self.events_tx.clone()) } pub fn with_conn(&self, func: F) -> T where - F: FnOnce(&DbContext) -> T, + F: FnOnce(&ClientDb) -> T, { let conn = self .pool @@ -39,17 +40,15 @@ impl QueryManager { .get() .expect("Failed to get new DB connection from the pool"); - let db_context = DbContext { - _events_tx: self.events_tx.clone(), - conn: ConnectionOrTx::Connection(conn), - }; + let ctx = DbContext::new(ConnectionOrTx::Connection(conn)); + let db = ClientDb::new(ctx, self.events_tx.clone()); - func(&db_context) + func(&db) } pub fn with_tx( &self, - func: impl FnOnce(&DbContext) -> std::result::Result, + func: impl FnOnce(&ClientDb) -> std::result::Result, ) -> std::result::Result where E: From, @@ -64,12 +63,10 @@ impl QueryManager { .transaction_with_behavior(TransactionBehavior::Immediate) .expect("Failed to start DB transaction"); - let db_context = DbContext { - _events_tx: self.events_tx.clone(), - conn: ConnectionOrTx::Transaction(&tx), - }; + let ctx = DbContext::new(ConnectionOrTx::Transaction(&tx)); + let db = ClientDb::new(ctx, self.events_tx.clone()); - match func(&db_context) { + match func(&db) { Ok(val) => { tx.commit() .map_err(|e| GenericError(format!("Failed to commit transaction {e:?}")))?; diff --git a/crates/yaak-models/src/util.rs b/crates/yaak-models/src/util.rs index 0b026f11..4f28f38d 100644 --- a/crates/yaak-models/src/util.rs +++ b/crates/yaak-models/src/util.rs @@ -1,4 +1,4 @@ -use crate::db_context::DbContext; +use crate::client_db::ClientDb; use crate::error::Result; use crate::models::{ AnyModel, Environment, Folder, GrpcRequest, HttpRequest, UpsertModelInfo, WebsocketRequest, @@ -44,6 +44,30 @@ impl UpdateSource { pub fn from_window_label(label: impl Into) -> Self { Self::Window { label: label.into() } } + + pub fn to_db(&self) -> yaak_database::UpdateSource { + match self { + UpdateSource::Background => yaak_database::UpdateSource::Background, + UpdateSource::Import => yaak_database::UpdateSource::Import, + UpdateSource::Plugin => yaak_database::UpdateSource::Plugin, + UpdateSource::Sync => yaak_database::UpdateSource::Sync, + UpdateSource::Window { label } => { + yaak_database::UpdateSource::Window { label: label.clone() } + } + } + } +} + +impl From for UpdateSource { + fn from(source: yaak_database::UpdateSource) -> Self { + match source { + yaak_database::UpdateSource::Background => UpdateSource::Background, + yaak_database::UpdateSource::Import => UpdateSource::Import, + yaak_database::UpdateSource::Plugin => UpdateSource::Plugin, + yaak_database::UpdateSource::Sync => UpdateSource::Sync, + yaak_database::UpdateSource::Window { label } => UpdateSource::Window { label }, + } + } } #[derive(Default, Debug, Deserialize, Serialize)] @@ -68,7 +92,7 @@ pub struct BatchUpsertResult { } pub fn get_workspace_export_resources( - db: &DbContext, + db: &ClientDb, yaak_version: &str, workspace_ids: Vec<&str>, include_private_environments: bool, diff --git a/crates/yaak-sync/src/sync.rs b/crates/yaak-sync/src/sync.rs index 2d2287b2..b2021cb3 100644 --- a/crates/yaak-sync/src/sync.rs +++ b/crates/yaak-sync/src/sync.rs @@ -10,7 +10,7 @@ use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; use ts_rs::TS; -use yaak_models::db_context::DbContext; +use yaak_models::client_db::ClientDb; use yaak_models::models::{SyncState, WorkspaceMeta}; use yaak_models::util::{UpdateSource, get_workspace_export_resources}; @@ -106,7 +106,7 @@ pub struct FsCandidate { } pub fn get_db_candidates( - db: &DbContext, + db: &ClientDb, version: &str, workspace_id: &str, sync_dir: &Path, @@ -296,7 +296,7 @@ pub fn compute_sync_ops( .collect() } -fn workspace_models(db: &DbContext, version: &str, workspace_id: &str) -> Result> { +fn workspace_models(db: &ClientDb, version: &str, workspace_id: &str) -> Result> { // We want to include private environments here so that we can take them into account during // the sync process. Otherwise, they would be treated as deleted. let include_private_environments = true; @@ -338,7 +338,7 @@ fn workspace_models(db: &DbContext, version: &str, workspace_id: &str) -> Result /// Apply sync operations to the filesystem and database. /// Returns a list of SyncStateOps that should be applied afterward. pub fn apply_sync_ops( - db: &DbContext, + db: &ClientDb, workspace_id: &str, sync_dir: &Path, sync_ops: Vec, @@ -502,7 +502,7 @@ pub enum SyncStateOp { } pub fn apply_sync_state_ops( - db: &DbContext, + db: &ClientDb, workspace_id: &str, sync_dir: &Path, ops: Vec, @@ -547,7 +547,7 @@ fn derive_model_filename(m: &SyncModel) -> PathBuf { Path::new(&rel).to_path_buf() } -fn delete_model(db: &DbContext, model: &SyncModel) -> Result<()> { +fn delete_model(db: &ClientDb, model: &SyncModel) -> Result<()> { match model { SyncModel::Workspace(m) => { db.delete_workspace(&m, &UpdateSource::Sync)?;