Database access refactor (#190)

This commit is contained in:
Gregory Schier
2025-03-25 08:35:10 -07:00
committed by GitHub
parent 445c30f3a9
commit 1d37d46130
72 changed files with 4895 additions and 4702 deletions

View File

@@ -10,15 +10,12 @@ use tokio::sync::{mpsc, Mutex};
use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::Message;
use yaak_http::apply_path_placeholders;
use yaak_models::manager::QueryManagerExt;
use yaak_models::models::{
HttpResponseHeader, WebsocketConnection, WebsocketConnectionState, WebsocketEvent,
WebsocketEventType, WebsocketRequest,
};
use yaak_models::queries;
use yaak_models::queries::{
get_base_environment, get_cookie_jar, get_environment, get_websocket_connection,
get_websocket_request, upsert_websocket_connection, upsert_websocket_event, UpdateSource,
};
use yaak_models::queries_legacy::UpdateSource;
use yaak_plugins::events::{
CallHttpAuthenticationRequest, HttpHeader, RenderPurpose, WindowContext,
};
@@ -31,8 +28,11 @@ pub(crate) async fn upsert_request<R: Runtime>(
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketRequest> {
Ok(queries::upsert_websocket_request(&app_handle, request, &UpdateSource::from_window(&window))
.await?)
Ok(app_handle
.queries()
.connect()
.await?
.upsert_websocket_request(&request, &UpdateSource::from_window(&window))?)
}
#[tauri::command]
@@ -41,12 +41,9 @@ pub(crate) async fn duplicate_request<R: Runtime>(
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketRequest> {
Ok(queries::duplicate_websocket_request(
&app_handle,
request_id,
&UpdateSource::from_window(&window),
)
.await?)
let db = app_handle.queries().connect().await?;
let request = db.get_websocket_request(request_id)?.unwrap();
Ok(db.duplicate_websocket_request(&request, &UpdateSource::from_window(&window))?)
}
#[tauri::command]
@@ -55,7 +52,11 @@ pub(crate) async fn delete_request<R: Runtime>(
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketRequest> {
Ok(queries::delete_websocket_request(&app_handle, request_id, &UpdateSource::from_window(&window)).await?)
Ok(app_handle
.queries()
.connect()
.await?
.delete_websocket_request_by_id(request_id, &UpdateSource::from_window(&window))?)
}
#[tauri::command]
@@ -64,8 +65,11 @@ pub(crate) async fn delete_connection<R: Runtime>(
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketConnection> {
Ok(queries::delete_websocket_connection(&app_handle, connection_id, &UpdateSource::from_window(&window))
.await?)
Ok(app_handle
.queries()
.connect()
.await?
.delete_websocket_connection_by_id(connection_id, &UpdateSource::from_window(&window))?)
}
#[tauri::command]
@@ -74,8 +78,10 @@ pub(crate) async fn delete_connections<R: Runtime>(
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<()> {
Ok(queries::delete_all_websocket_connections(&app_handle, request_id, &UpdateSource::from_window(&window))
.await?)
Ok(app_handle.queries().connect().await?.delete_all_websocket_connections_for_request(
request_id,
&UpdateSource::from_window(&window),
)?)
}
#[tauri::command]
@@ -83,7 +89,7 @@ pub(crate) async fn list_events<R: Runtime>(
connection_id: &str,
app_handle: AppHandle<R>,
) -> Result<Vec<WebsocketEvent>> {
Ok(queries::list_websocket_events(&app_handle, connection_id).await?)
Ok(app_handle.queries().connect().await?.list_websocket_events(connection_id)?)
}
#[tauri::command]
@@ -91,7 +97,7 @@ pub(crate) async fn list_requests<R: Runtime>(
workspace_id: &str,
app_handle: AppHandle<R>,
) -> Result<Vec<WebsocketRequest>> {
Ok(queries::list_websocket_requests(&app_handle, workspace_id).await?)
Ok(app_handle.queries().connect().await?.list_websocket_requests(workspace_id)?)
}
#[tauri::command]
@@ -99,7 +105,11 @@ pub(crate) async fn list_connections<R: Runtime>(
workspace_id: &str,
app_handle: AppHandle<R>,
) -> Result<Vec<WebsocketConnection>> {
Ok(queries::list_websocket_connections_for_workspace(&app_handle, workspace_id).await?)
Ok(app_handle
.queries()
.connect()
.await?
.list_websocket_connections_for_workspace(workspace_id)?)
}
#[tauri::command]
@@ -110,16 +120,23 @@ pub(crate) async fn send<R: Runtime>(
window: WebviewWindow<R>,
ws_manager: State<'_, Mutex<WebsocketManager>>,
) -> Result<WebsocketConnection> {
let connection = get_websocket_connection(&app_handle, connection_id).await?;
let unrendered_request = get_websocket_request(&app_handle, &connection.request_id)
.await?
.ok_or(GenericError("WebSocket Request not found".to_string()))?;
let (connection, unrendered_request) = {
let db = app_handle.queries().connect().await?;
let connection = db.get_websocket_connection(connection_id)?;
let unrendered_request = db
.get_websocket_request(&connection.request_id)?
.ok_or(GenericError("WebSocket Request not found".to_string()))?;
(connection, unrendered_request)
};
let environment = match environment_id {
Some(id) => Some(get_environment(&app_handle, id).await?),
Some(id) => Some(app_handle.queries().connect().await?.get_environment(id)?),
None => None,
};
let base_environment =
get_base_environment(&app_handle, &unrendered_request.workspace_id).await?;
let base_environment = app_handle
.queries()
.connect()
.await?
.get_base_environment(&unrendered_request.workspace_id)?;
let request = render_request(
&unrendered_request,
&base_environment,
@@ -135,9 +152,8 @@ pub(crate) async fn send<R: Runtime>(
let mut ws_manager = ws_manager.lock().await;
ws_manager.send(&connection.id, Message::Text(request.message.clone().into())).await?;
upsert_websocket_event(
&app_handle,
WebsocketEvent {
app_handle.queries().connect().await?.upsert_websocket_event(
&WebsocketEvent {
connection_id: connection.id.clone(),
request_id: request.id.clone(),
workspace_id: connection.workspace_id.clone(),
@@ -147,9 +163,7 @@ pub(crate) async fn send<R: Runtime>(
..Default::default()
},
&UpdateSource::from_window(&window),
)
.await
.unwrap();
)?;
Ok(connection)
}
@@ -161,17 +175,17 @@ pub(crate) async fn close<R: Runtime>(
window: WebviewWindow<R>,
ws_manager: State<'_, Mutex<WebsocketManager>>,
) -> Result<WebsocketConnection> {
let connection = get_websocket_connection(&app_handle, connection_id).await?;
let connection = upsert_websocket_connection(
&app_handle,
&WebsocketConnection {
state: WebsocketConnectionState::Closing,
..connection
},
&UpdateSource::from_window(&window),
)
.await
.unwrap();
let connection = {
let db = app_handle.queries().connect().await?;
let connection = db.get_websocket_connection(connection_id)?;
db.upsert_websocket_connection(
&WebsocketConnection {
state: WebsocketConnectionState::Closing,
..connection
},
&UpdateSource::from_window(&window),
)?
};
let mut ws_manager = ws_manager.lock().await;
if let Err(e) = ws_manager.close(&connection.id).await {
@@ -191,15 +205,21 @@ pub(crate) async fn connect<R: Runtime>(
plugin_manager: State<'_, PluginManager>,
ws_manager: State<'_, Mutex<WebsocketManager>>,
) -> Result<WebsocketConnection> {
let unrendered_request = get_websocket_request(&app_handle, request_id)
let unrendered_request = app_handle
.queries()
.connect()
.await?
.get_websocket_request(request_id)?
.ok_or(GenericError("Failed to find GRPC request".to_string()))?;
let environment = match environment_id {
Some(id) => Some(get_environment(&app_handle, id).await?),
Some(id) => Some(app_handle.queries().connect().await?.get_environment(id)?),
None => None,
};
let base_environment =
get_base_environment(&app_handle, &unrendered_request.workspace_id).await?;
let base_environment = app_handle
.queries()
.connect()
.await?
.get_base_environment(&unrendered_request.workspace_id)?;
let request = render_request(
&unrendered_request,
&base_environment,
@@ -242,20 +262,18 @@ pub(crate) async fn connect<R: Runtime>(
// TODO: Handle cookies
let _cookie_jar = match cookie_jar_id {
Some(id) => Some(get_cookie_jar(&app_handle, id).await?),
Some(id) => Some(app_handle.queries().connect().await?.get_cookie_jar(id)?),
None => None,
};
let connection = upsert_websocket_connection(
&app_handle,
let connection = app_handle.queries().connect().await?.upsert_websocket_connection(
&WebsocketConnection {
workspace_id: request.workspace_id.clone(),
request_id: request_id.to_string(),
..Default::default()
},
&UpdateSource::from_window(&window),
)
.await?;
)?;
let (receive_tx, mut receive_rx) = mpsc::channel::<Message>(128);
let mut ws_manager = ws_manager.lock().await;
@@ -278,22 +296,19 @@ pub(crate) async fn connect<R: Runtime>(
{
Ok(r) => r,
Err(e) => {
return Ok(upsert_websocket_connection(
&app_handle,
return Ok(app_handle.queries().connect().await?.upsert_websocket_connection(
&WebsocketConnection {
error: Some(format!("{e:?}")),
state: WebsocketConnectionState::Closed,
..connection
},
&UpdateSource::from_window(&window),
)
.await?);
)?);
}
};
upsert_websocket_event(
&app_handle,
WebsocketEvent {
app_handle.queries().connect().await?.upsert_websocket_event(
&WebsocketEvent {
connection_id: connection.id.clone(),
request_id: request.id.clone(),
workspace_id: connection.workspace_id.clone(),
@@ -302,9 +317,7 @@ pub(crate) async fn connect<R: Runtime>(
..Default::default()
},
&UpdateSource::from_window(&window),
)
.await
.unwrap();
)?;
let response_headers = response
.headers()
@@ -315,8 +328,7 @@ pub(crate) async fn connect<R: Runtime>(
})
.collect::<Vec<HttpResponseHeader>>();
let connection = upsert_websocket_connection(
&app_handle,
let connection = app_handle.queries().connect().await?.upsert_websocket_connection(
&WebsocketConnection {
state: WebsocketConnectionState::Connected,
headers: response_headers,
@@ -325,8 +337,7 @@ pub(crate) async fn connect<R: Runtime>(
..connection
},
&UpdateSource::from_window(&window),
)
.await?;
)?;
{
let connection_id = connection.id.clone();
@@ -340,59 +351,68 @@ pub(crate) async fn connect<R: Runtime>(
has_written_close = true;
}
upsert_websocket_event(
&app_handle,
WebsocketEvent {
connection_id: connection_id.clone(),
request_id: request_id.clone(),
workspace_id: workspace_id.clone(),
is_server: true,
message_type: match message {
Message::Text(_) => WebsocketEventType::Text,
Message::Binary(_) => WebsocketEventType::Binary,
Message::Ping(_) => WebsocketEventType::Ping,
Message::Pong(_) => WebsocketEventType::Pong,
Message::Close(_) => WebsocketEventType::Close,
// Raw frame will never happen during a read
Message::Frame(_) => WebsocketEventType::Frame,
app_handle
.queries()
.connect()
.await
.unwrap()
.upsert_websocket_event(
&WebsocketEvent {
connection_id: connection_id.clone(),
request_id: request_id.clone(),
workspace_id: workspace_id.clone(),
is_server: true,
message_type: match message {
Message::Text(_) => WebsocketEventType::Text,
Message::Binary(_) => WebsocketEventType::Binary,
Message::Ping(_) => WebsocketEventType::Ping,
Message::Pong(_) => WebsocketEventType::Pong,
Message::Close(_) => WebsocketEventType::Close,
// Raw frame will never happen during a read
Message::Frame(_) => WebsocketEventType::Frame,
},
message: message.into_data().into(),
..Default::default()
},
message: message.into_data().into(),
..Default::default()
},
&UpdateSource::from_window(&window),
)
.await
.unwrap();
&UpdateSource::from_window(&window),
)
.unwrap();
}
info!("Websocket connection closed");
if !has_written_close {
upsert_websocket_event(
&app_handle,
WebsocketEvent {
connection_id: connection_id.clone(),
request_id: request_id.clone(),
workspace_id: workspace_id.clone(),
is_server: true,
message_type: WebsocketEventType::Close,
..Default::default()
app_handle
.queries()
.connect()
.await
.unwrap()
.upsert_websocket_event(
&WebsocketEvent {
connection_id: connection_id.clone(),
request_id: request_id.clone(),
workspace_id: workspace_id.clone(),
is_server: true,
message_type: WebsocketEventType::Close,
..Default::default()
},
&UpdateSource::from_window(&window),
)
.unwrap();
}
app_handle
.queries()
.connect()
.await
.unwrap()
.upsert_websocket_connection(
&WebsocketConnection {
workspace_id: request.workspace_id.clone(),
request_id: request_id.to_string(),
state: WebsocketConnectionState::Closed,
..connection
},
&UpdateSource::from_window(&window),
)
.await
.unwrap();
}
upsert_websocket_connection(
&app_handle,
&WebsocketConnection {
workspace_id: request.workspace_id.clone(),
request_id: request_id.to_string(),
state: WebsocketConnectionState::Closed,
..connection
},
&UpdateSource::from_window(&window),
)
.await
.unwrap();
});
}