mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-24 18:31:16 +01:00
Got models and event system working
This commit is contained in:
@@ -7,24 +7,26 @@ use std::sync::Mutex;
|
||||
use log::warn;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ts_rs::TS;
|
||||
use yaak_database::UpdateSource;
|
||||
use yaak_database::{ModelChangeEvent, UpdateSource};
|
||||
use yaak_proxy::{CapturedRequest, ProxyEvent, ProxyHandle, RequestState};
|
||||
use yaak_rpc::{RpcError, define_rpc};
|
||||
use yaak_rpc::{RpcError, RpcEventEmitter, define_rpc};
|
||||
use crate::db::ProxyQueryManager;
|
||||
use crate::models::{ProxyEntry, ProxyHeader};
|
||||
use crate::models::{HttpExchange, ModelPayload, ProxyHeader};
|
||||
|
||||
// -- Context --
|
||||
|
||||
pub struct ProxyCtx {
|
||||
handle: Mutex<Option<ProxyHandle>>,
|
||||
pub db: ProxyQueryManager,
|
||||
pub events: RpcEventEmitter,
|
||||
}
|
||||
|
||||
impl ProxyCtx {
|
||||
pub fn new(db_path: &Path) -> Self {
|
||||
pub fn new(db_path: &Path, events: RpcEventEmitter) -> Self {
|
||||
Self {
|
||||
handle: Mutex::new(None),
|
||||
db: ProxyQueryManager::new(db_path),
|
||||
events,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -68,7 +70,8 @@ fn proxy_start(ctx: &ProxyCtx, req: ProxyStartRequest) -> Result<ProxyStartRespo
|
||||
// Spawn event loop before storing the handle
|
||||
if let Some(event_rx) = proxy_handle.take_event_rx() {
|
||||
let db = ctx.db.clone();
|
||||
std::thread::spawn(move || run_event_loop(event_rx, db));
|
||||
let events = ctx.events.clone();
|
||||
std::thread::spawn(move || run_event_loop(event_rx, db, events));
|
||||
}
|
||||
|
||||
*handle = Some(proxy_handle);
|
||||
@@ -85,7 +88,7 @@ fn proxy_stop(ctx: &ProxyCtx, _req: ProxyStopRequest) -> Result<bool, RpcError>
|
||||
|
||||
// -- Event loop --
|
||||
|
||||
fn run_event_loop(rx: std::sync::mpsc::Receiver<ProxyEvent>, db: ProxyQueryManager) {
|
||||
fn run_event_loop(rx: std::sync::mpsc::Receiver<ProxyEvent>, db: ProxyQueryManager, events: RpcEventEmitter) {
|
||||
let mut in_flight: HashMap<u64, CapturedRequest> = HashMap::new();
|
||||
|
||||
while let Ok(event) = rx.recv() {
|
||||
@@ -140,22 +143,22 @@ fn run_event_loop(rx: std::sync::mpsc::Receiver<ProxyEvent>, db: ProxyQueryManag
|
||||
r.response_body_size = size;
|
||||
r.elapsed_ms = r.elapsed_ms.or(Some(elapsed_ms));
|
||||
r.state = RequestState::Complete;
|
||||
write_entry(&db, &r);
|
||||
write_entry(&db, &events, &r);
|
||||
}
|
||||
}
|
||||
ProxyEvent::Error { id, error } => {
|
||||
if let Some(mut r) = in_flight.remove(&id) {
|
||||
r.error = Some(error);
|
||||
r.state = RequestState::Error;
|
||||
write_entry(&db, &r);
|
||||
write_entry(&db, &events, &r);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) {
|
||||
let entry = ProxyEntry {
|
||||
fn write_entry(db: &ProxyQueryManager, events: &RpcEventEmitter, r: &CapturedRequest) {
|
||||
let entry = HttpExchange {
|
||||
url: r.url.clone(),
|
||||
method: r.method.clone(),
|
||||
req_headers: r.request_headers.iter()
|
||||
@@ -171,8 +174,14 @@ fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) {
|
||||
..Default::default()
|
||||
};
|
||||
db.with_conn(|ctx| {
|
||||
if let Err(e) = ctx.upsert(&entry, &UpdateSource::Background) {
|
||||
warn!("Failed to write proxy entry: {e}");
|
||||
match ctx.upsert(&entry, &UpdateSource::Background) {
|
||||
Ok((saved, created)) => {
|
||||
events.emit("model_write", &ModelPayload {
|
||||
model: saved,
|
||||
change: ModelChangeEvent::Upsert { created },
|
||||
});
|
||||
}
|
||||
Err(e) => warn!("Failed to write proxy entry: {e}"),
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -181,6 +190,11 @@ fn write_entry(db: &ProxyQueryManager, r: &CapturedRequest) {
|
||||
|
||||
define_rpc! {
|
||||
ProxyCtx;
|
||||
"proxy_start" => proxy_start(ProxyStartRequest) -> ProxyStartResponse,
|
||||
"proxy_stop" => proxy_stop(ProxyStopRequest) -> bool,
|
||||
commands {
|
||||
proxy_start(ProxyStartRequest) -> ProxyStartResponse,
|
||||
proxy_stop(ProxyStopRequest) -> bool,
|
||||
}
|
||||
events {
|
||||
model_write(ModelPayload),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,7 @@ use rusqlite::Row;
|
||||
use sea_query::{IntoColumnRef, IntoIden, IntoTableRef, Order, SimpleExpr, enum_def};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use ts_rs::TS;
|
||||
use yaak_database::{Result as DbResult, UpdateSource, UpsertModelInfo, generate_prefixed_id, upsert_date};
|
||||
use yaak_database::{ModelChangeEvent, Result as DbResult, UpdateSource, UpsertModelInfo, generate_prefixed_id, upsert_date};
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
@@ -16,8 +16,8 @@ pub struct ProxyHeader {
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export, export_to = "gen_models.ts")]
|
||||
#[enum_def(table_name = "proxy_entries")]
|
||||
pub struct ProxyEntry {
|
||||
#[enum_def(table_name = "http_exchanges")]
|
||||
pub struct HttpExchange {
|
||||
pub id: String,
|
||||
pub created_at: NaiveDateTime,
|
||||
pub updated_at: NaiveDateTime,
|
||||
@@ -31,21 +31,29 @@ pub struct ProxyEntry {
|
||||
pub error: Option<String>,
|
||||
}
|
||||
|
||||
impl UpsertModelInfo for ProxyEntry {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export, export_to = "gen_models.ts")]
|
||||
pub struct ModelPayload {
|
||||
pub model: HttpExchange,
|
||||
pub change: ModelChangeEvent,
|
||||
}
|
||||
|
||||
impl UpsertModelInfo for HttpExchange {
|
||||
fn table_name() -> impl IntoTableRef + IntoIden {
|
||||
ProxyEntryIden::Table
|
||||
HttpExchangeIden::Table
|
||||
}
|
||||
|
||||
fn id_column() -> impl IntoIden + Eq + Clone {
|
||||
ProxyEntryIden::Id
|
||||
HttpExchangeIden::Id
|
||||
}
|
||||
|
||||
fn generate_id() -> String {
|
||||
generate_prefixed_id("pe")
|
||||
generate_prefixed_id("he")
|
||||
}
|
||||
|
||||
fn order_by() -> (impl IntoColumnRef, Order) {
|
||||
(ProxyEntryIden::CreatedAt, Order::Desc)
|
||||
(HttpExchangeIden::CreatedAt, Order::Desc)
|
||||
}
|
||||
|
||||
fn get_id(&self) -> String {
|
||||
@@ -56,7 +64,7 @@ impl UpsertModelInfo for ProxyEntry {
|
||||
self,
|
||||
source: &UpdateSource,
|
||||
) -> DbResult<Vec<(impl IntoIden + Eq, impl Into<SimpleExpr>)>> {
|
||||
use ProxyEntryIden::*;
|
||||
use HttpExchangeIden::*;
|
||||
Ok(vec![
|
||||
(CreatedAt, upsert_date(source, self.created_at)),
|
||||
(UpdatedAt, upsert_date(source, self.updated_at)),
|
||||
@@ -73,15 +81,15 @@ impl UpsertModelInfo for ProxyEntry {
|
||||
|
||||
fn update_columns() -> Vec<impl IntoIden> {
|
||||
vec![
|
||||
ProxyEntryIden::UpdatedAt,
|
||||
ProxyEntryIden::Url,
|
||||
ProxyEntryIden::Method,
|
||||
ProxyEntryIden::ReqHeaders,
|
||||
ProxyEntryIden::ReqBody,
|
||||
ProxyEntryIden::ResStatus,
|
||||
ProxyEntryIden::ResHeaders,
|
||||
ProxyEntryIden::ResBody,
|
||||
ProxyEntryIden::Error,
|
||||
HttpExchangeIden::UpdatedAt,
|
||||
HttpExchangeIden::Url,
|
||||
HttpExchangeIden::Method,
|
||||
HttpExchangeIden::ReqHeaders,
|
||||
HttpExchangeIden::ReqBody,
|
||||
HttpExchangeIden::ResStatus,
|
||||
HttpExchangeIden::ResHeaders,
|
||||
HttpExchangeIden::ResBody,
|
||||
HttpExchangeIden::Error,
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user