Initial DB implementation

This commit is contained in:
Gregory Schier
2026-03-08 14:39:00 -07:00
parent a5433fbc74
commit 7382287bef
9 changed files with 316 additions and 23 deletions

View File

@@ -0,0 +1,33 @@
use include_dir::{Dir, include_dir};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use std::path::Path;
use yaak_database::{ConnectionOrTx, DbContext, run_migrations};
static MIGRATIONS: Dir<'static> = include_dir!("$CARGO_MANIFEST_DIR/migrations");
#[derive(Clone)]
pub struct ProxyQueryManager {
pool: Pool<SqliteConnectionManager>,
}
impl ProxyQueryManager {
pub fn new(db_path: &Path) -> Self {
let manager = SqliteConnectionManager::file(db_path);
let pool = Pool::builder()
.max_size(5)
.build(manager)
.expect("Failed to create proxy DB pool");
run_migrations(&pool, &MIGRATIONS).expect("Failed to run proxy DB migrations");
Self { pool }
}
pub fn with_conn<F, T>(&self, func: F) -> T
where
F: FnOnce(&DbContext) -> T,
{
let conn = self.pool.get().expect("Failed to get proxy DB connection");
let ctx = DbContext::new(ConnectionOrTx::Connection(conn));
func(&ctx)
}
}

View File

@@ -1,19 +1,30 @@
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use ts_rs::TS;
use yaak_proxy::ProxyHandle;
use yaak_rpc::{RpcError, define_rpc};
pub mod db;
pub mod models;
// -- Context shared across all RPC handlers --
use std::collections::HashMap;
use std::path::Path;
use std::sync::Mutex;
use log::warn;
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use yaak_database::UpdateSource;
use yaak_proxy::{CapturedRequest, ProxyEvent, ProxyHandle, RequestState};
use yaak_rpc::{RpcError, define_rpc};
use crate::db::ProxyQueryManager;
use crate::models::{ProxyEntry, ProxyHeader};
// -- Context --
pub struct ProxyCtx {
handle: Mutex<Option<ProxyHandle>>,
pub db: ProxyQueryManager,
}
impl ProxyCtx {
pub fn new() -> Self {
pub fn new(db_path: &Path) -> Self {
Self {
handle: Mutex::new(None),
db: ProxyQueryManager::new(db_path),
}
}
}
@@ -47,21 +58,21 @@ fn proxy_start(ctx: &ProxyCtx, req: ProxyStartRequest) -> Result<ProxyStartRespo
.map_err(|_| RpcError { message: "lock poisoned".into() })?;
if let Some(existing) = handle.as_ref() {
return Ok(ProxyStartResponse {
port: existing.port,
already_running: true,
});
return Ok(ProxyStartResponse { port: existing.port, already_running: true });
}
let proxy_handle = yaak_proxy::start_proxy(req.port.unwrap_or(0))
let mut proxy_handle = yaak_proxy::start_proxy(req.port.unwrap_or(0))
.map_err(|e| RpcError { message: e })?;
let port = proxy_handle.port;
*handle = Some(proxy_handle);
Ok(ProxyStartResponse {
port,
already_running: false,
})
// Spawn event loop before storing the handle
if let Some(event_rx) = proxy_handle.take_event_rx() {
let db = ctx.db.clone();
std::thread::spawn(move || run_event_loop(event_rx, db));
}
*handle = Some(proxy_handle);
Ok(ProxyStartResponse { port, already_running: false })
}
fn proxy_stop(ctx: &ProxyCtx, _req: ProxyStopRequest) -> Result<bool, RpcError> {
@@ -72,6 +83,100 @@ fn proxy_stop(ctx: &ProxyCtx, _req: ProxyStopRequest) -> Result<bool, RpcError>
Ok(handle.take().is_some())
}
// -- Event loop --
fn run_event_loop(rx: std::sync::mpsc::Receiver<ProxyEvent>, db: ProxyQueryManager) {
let mut in_flight: HashMap<u64, CapturedRequest> = HashMap::new();
while let Ok(event) = rx.recv() {
match event {
ProxyEvent::RequestStart { id, method, url, http_version } => {
in_flight.insert(id, CapturedRequest {
id,
method,
url,
http_version,
status: None,
elapsed_ms: None,
remote_http_version: None,
request_headers: vec![],
request_body: None,
response_headers: vec![],
response_body: None,
response_body_size: 0,
state: RequestState::Sending,
error: None,
});
}
ProxyEvent::RequestHeader { id, name, value } => {
if let Some(r) = in_flight.get_mut(&id) {
r.request_headers.push((name, value));
}
}
ProxyEvent::RequestBody { id, body } => {
if let Some(r) = in_flight.get_mut(&id) {
r.request_body = Some(body);
}
}
ProxyEvent::ResponseStart { id, status, http_version, elapsed_ms } => {
if let Some(r) = in_flight.get_mut(&id) {
r.status = Some(status);
r.remote_http_version = Some(http_version);
r.elapsed_ms = Some(elapsed_ms);
r.state = RequestState::Receiving;
}
}
ProxyEvent::ResponseHeader { id, name, value } => {
if let Some(r) = in_flight.get_mut(&id) {
r.response_headers.push((name, value));
}
}
ProxyEvent::ResponseBodyChunk { .. } => {
// Progress only — no action needed
}
ProxyEvent::ResponseBodyComplete { id, body, size, elapsed_ms } => {
if let Some(mut r) = in_flight.remove(&id) {
r.response_body = body;
r.response_body_size = size;
r.elapsed_ms = r.elapsed_ms.or(Some(elapsed_ms));
r.state = RequestState::Complete;
write_entry(&db, &r);
}
}
ProxyEvent::Error { id, error } => {
if let Some(mut r) = in_flight.remove(&id) {
r.error = Some(error);
r.state = RequestState::Error;
write_entry(&db, &r);
}
}
}
}
}
fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) {
let entry = ProxyEntry {
url: r.url.clone(),
method: r.method.clone(),
req_headers: r.request_headers.iter()
.map(|(n, v)| ProxyHeader { name: n.clone(), value: v.clone() })
.collect(),
req_body: r.request_body.clone(),
res_status: r.status.map(|s| s as i32),
res_headers: r.response_headers.iter()
.map(|(n, v)| ProxyHeader { name: n.clone(), value: v.clone() })
.collect(),
res_body: r.response_body.clone(),
error: r.error.clone(),
..Default::default()
};
db.with_conn(|ctx| {
if let Err(e) = ctx.upsert(&entry, &UpdateSource::Background) {
warn!("Failed to write proxy entry: {e}");
}
});
}
// -- Router + Schema --
define_rpc! {

View File

@@ -0,0 +1,108 @@
use chrono::NaiveDateTime;
use rusqlite::Row;
use sea_query::{IntoColumnRef, IntoIden, IntoTableRef, Order, SimpleExpr, enum_def};
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use yaak_database::{Result as DbResult, UpdateSource, UpsertModelInfo, generate_prefixed_id, upsert_date};
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export, export_to = "gen_models.ts")]
pub struct ProxyHeader {
pub name: String,
pub value: String,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
#[serde(default, rename_all = "camelCase")]
#[ts(export, export_to = "gen_models.ts")]
#[enum_def(table_name = "proxy_entries")]
pub struct ProxyEntry {
pub id: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub url: String,
pub method: String,
pub req_headers: Vec<ProxyHeader>,
pub req_body: Option<Vec<u8>>,
pub res_status: Option<i32>,
pub res_headers: Vec<ProxyHeader>,
pub res_body: Option<Vec<u8>>,
pub error: Option<String>,
}
impl UpsertModelInfo for ProxyEntry {
fn table_name() -> impl IntoTableRef + IntoIden {
ProxyEntryIden::Table
}
fn id_column() -> impl IntoIden + Eq + Clone {
ProxyEntryIden::Id
}
fn generate_id() -> String {
generate_prefixed_id("pe")
}
fn order_by() -> (impl IntoColumnRef, Order) {
(ProxyEntryIden::CreatedAt, Order::Desc)
}
fn get_id(&self) -> String {
self.id.clone()
}
fn insert_values(
self,
source: &UpdateSource,
) -> DbResult<Vec<(impl IntoIden + Eq, impl Into<SimpleExpr>)>> {
use ProxyEntryIden::*;
Ok(vec![
(CreatedAt, upsert_date(source, self.created_at)),
(UpdatedAt, upsert_date(source, self.updated_at)),
(Url, self.url.into()),
(Method, self.method.into()),
(ReqHeaders, serde_json::to_string(&self.req_headers)?.into()),
(ReqBody, self.req_body.into()),
(ResStatus, self.res_status.into()),
(ResHeaders, serde_json::to_string(&self.res_headers)?.into()),
(ResBody, self.res_body.into()),
(Error, self.error.into()),
])
}
fn update_columns() -> Vec<impl IntoIden> {
vec![
ProxyEntryIden::UpdatedAt,
ProxyEntryIden::Url,
ProxyEntryIden::Method,
ProxyEntryIden::ReqHeaders,
ProxyEntryIden::ReqBody,
ProxyEntryIden::ResStatus,
ProxyEntryIden::ResHeaders,
ProxyEntryIden::ResBody,
ProxyEntryIden::Error,
]
}
fn from_row(r: &Row) -> rusqlite::Result<Self>
where
Self: Sized,
{
let req_headers: String = r.get("req_headers")?;
let res_headers: String = r.get("res_headers")?;
Ok(Self {
id: r.get("id")?,
created_at: r.get("created_at")?,
updated_at: r.get("updated_at")?,
url: r.get("url")?,
method: r.get("method")?,
req_headers: serde_json::from_str(&req_headers).unwrap_or_default(),
req_body: r.get("req_body")?,
res_status: r.get("res_status")?,
res_headers: serde_json::from_str(&res_headers).unwrap_or_default(),
res_body: r.get("res_body")?,
error: r.get("error")?,
})
}
}