use crate::connection_or_tx::ConnectionOrTx; use crate::error::Error::ModelNotFound; use crate::error::Result; use crate::models::{AnyModel, UpsertModelInfo}; use crate::util::{ModelChangeEvent, ModelPayload, UpdateSource}; use rusqlite::{OptionalExtension, params}; use sea_query::{ Asterisk, Expr, Func, IntoColumnRef, IntoIden, IntoTableRef, OnConflict, Query, SimpleExpr, SqliteQueryBuilder, }; use sea_query_rusqlite::RusqliteBinder; use std::fmt::Debug; use std::sync::mpsc; pub struct DbContext<'a> { pub(crate) _events_tx: mpsc::Sender, pub(crate) conn: ConnectionOrTx<'a>, } impl<'a> DbContext<'a> { pub(crate) fn find_one<'s, M>( &self, col: impl IntoColumnRef + IntoIden + Clone, value: impl Into + Debug, ) -> Result where M: Into + Clone + UpsertModelInfo, { let value_debug = format!("{:?}", value); let value_expr = value.into(); let (sql, params) = Query::select() .from(M::table_name()) .column(Asterisk) .cond_where(Expr::col(col.clone()).eq(value_expr)) .build_rusqlite(SqliteQueryBuilder); let mut stmt = self.conn.prepare(sql.as_str()).expect("Failed to prepare query"); match stmt.query_row(&*params.as_params(), M::from_row) { Ok(result) => Ok(result), Err(rusqlite::Error::QueryReturnedNoRows) => Err(ModelNotFound(format!( r#"table "{}" {} == {}"#, M::table_name().into_iden().to_string(), col.into_iden().to_string(), value_debug ))), Err(e) => Err(crate::error::Error::SqlError(e)), } } pub(crate) fn find_optional<'s, M>( &self, col: impl IntoColumnRef, value: impl Into, ) -> Option where M: Into + Clone + UpsertModelInfo, { let (sql, params) = Query::select() .from(M::table_name()) .column(Asterisk) .cond_where(Expr::col(col).eq(value)) .build_rusqlite(SqliteQueryBuilder); let mut stmt = self.conn.prepare(sql.as_str()).expect("Failed to prepare query"); stmt.query_row(&*params.as_params(), M::from_row) .optional() .expect("Failed to run find on DB") } pub(crate) fn find_all<'s, M>(&self) -> Result> where M: Into + Clone + UpsertModelInfo, { let (order_by_col, order_by_dir) = M::order_by(); let (sql, params) = Query::select() .from(M::table_name()) .column(Asterisk) .order_by(order_by_col, order_by_dir) .build_rusqlite(SqliteQueryBuilder); let mut stmt = self.conn.resolve().prepare(sql.as_str())?; let items = stmt.query_map(&*params.as_params(), M::from_row)?; Ok(items.map(|v| v.unwrap()).collect()) } pub(crate) fn find_many<'s, M>( &self, col: impl IntoColumnRef, value: impl Into, limit: Option, ) -> Result> where M: Into + Clone + UpsertModelInfo, { // TODO: Figure out how to do this conditional builder better let (order_by_col, order_by_dir) = M::order_by(); let (sql, params) = if let Some(limit) = limit { Query::select() .from(M::table_name()) .column(Asterisk) .cond_where(Expr::col(col).eq(value)) .limit(limit) .order_by(order_by_col, order_by_dir) .build_rusqlite(SqliteQueryBuilder) } else { Query::select() .from(M::table_name()) .column(Asterisk) .cond_where(Expr::col(col).eq(value)) .order_by(order_by_col, order_by_dir) .build_rusqlite(SqliteQueryBuilder) }; let mut stmt = self.conn.resolve().prepare(sql.as_str())?; let items = stmt.query_map(&*params.as_params(), M::from_row)?; Ok(items.map(|v| v.unwrap()).collect()) } pub(crate) fn upsert(&self, model: &M, source: &UpdateSource) -> Result where M: Into + From + UpsertModelInfo + Clone, { self.upsert_one( M::table_name(), M::id_column(), model.get_id().as_str(), model.clone().insert_values(source)?, M::update_columns(), source, ) } fn upsert_one( &self, table: impl IntoTableRef, id_col: impl IntoIden + Eq + Clone, id_val: &str, other_values: Vec<(impl IntoIden + Eq, impl Into)>, update_columns: Vec, source: &UpdateSource, ) -> Result where M: Into + From + UpsertModelInfo + Clone, { let id_iden = id_col.into_iden(); let mut column_vec = vec![id_iden.clone()]; let mut value_vec = vec![if id_val == "" { M::generate_id().into() } else { id_val.into() }]; for (col, val) in other_values { value_vec.push(val.into()); column_vec.push(col.into_iden()); } let on_conflict = OnConflict::column(id_iden).update_columns(update_columns).to_owned(); let (sql, params) = Query::insert() .into_table(table) .columns(column_vec) .values_panic(value_vec) .on_conflict(on_conflict) .returning(Query::returning().exprs(vec![ Expr::col(Asterisk), Expr::expr(Func::cust("last_insert_rowid")), Expr::col("rowid"), ])) .build_rusqlite(SqliteQueryBuilder); let mut stmt = self.conn.resolve().prepare(sql.as_str())?; let (m, created): (M, bool) = stmt.query_row(&*params.as_params(), |row| { M::from_row(row).and_then(|m| { let rowid: i64 = row.get("rowid")?; let last_rowid: i64 = row.get("last_insert_rowid()")?; Ok((m, rowid == last_rowid)) }) })?; let payload = ModelPayload { model: m.clone().into(), update_source: source.clone(), change: ModelChangeEvent::Upsert { created }, }; self.record_model_change(&payload)?; let _ = self._events_tx.send(payload); Ok(m) } pub(crate) fn delete<'s, M>(&self, m: &M, source: &UpdateSource) -> Result where M: Into + Clone + UpsertModelInfo, { let (sql, params) = Query::delete() .from_table(M::table_name()) .cond_where(Expr::col(M::id_column().into_iden()).eq(m.get_id())) .build_rusqlite(SqliteQueryBuilder); self.conn.execute(sql.as_str(), &*params.as_params())?; let payload = ModelPayload { model: m.clone().into(), update_source: source.clone(), change: ModelChangeEvent::Delete, }; self.record_model_change(&payload)?; let _ = self._events_tx.send(payload); Ok(m.clone()) } fn record_model_change(&self, payload: &ModelPayload) -> Result<()> { let payload_json = serde_json::to_string(payload)?; let source_json = serde_json::to_string(&payload.update_source)?; let change_json = serde_json::to_string(&payload.change)?; self.conn.resolve().execute( r#" INSERT INTO model_changes (model, model_id, change, update_source, payload) VALUES (?1, ?2, ?3, ?4, ?5) "#, params![ payload.model.model(), payload.model.id(), change_json, source_json, payload_json, ], )?; Ok(()) } }