From 60773cab53bc10d25baba14367ee743df23e92af Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Sun, 4 Feb 2024 21:17:05 -0800 Subject: [PATCH] Fix DB mutex deadlock --- ...a452230ff4ecbdfb5be54a034ec7c4fb1cabe.json | 12 - ...244c83de39048e77c5af0b0b5d188d3279ca4.json | 12 + src-tauri/grpc/src/lib.rs | 2 - src-tauri/grpc/src/manager.rs | 1 - src-tauri/src/http.rs | 4 +- src-tauri/src/main.rs | 427 +++++++----------- src-tauri/src/models.rs | 70 ++- src-web/components/GlobalHooks.tsx | 21 +- src-web/components/GrpcConnectionLayout.tsx | 111 +++-- 9 files changed, 312 insertions(+), 348 deletions(-) delete mode 100644 src-tauri/.sqlx/query-1ad8a2581417acb2d1b06fc2727a452230ff4ecbdfb5be54a034ec7c4fb1cabe.json create mode 100644 src-tauri/.sqlx/query-f7df06213eff80e2ce5100b77ec244c83de39048e77c5af0b0b5d188d3279ca4.json diff --git a/src-tauri/.sqlx/query-1ad8a2581417acb2d1b06fc2727a452230ff4ecbdfb5be54a034ec7c4fb1cabe.json b/src-tauri/.sqlx/query-1ad8a2581417acb2d1b06fc2727a452230ff4ecbdfb5be54a034ec7c4fb1cabe.json deleted file mode 100644 index a89b3c8d..00000000 --- a/src-tauri/.sqlx/query-1ad8a2581417acb2d1b06fc2727a452230ff4ecbdfb5be54a034ec7c4fb1cabe.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO grpc_connections (\n id, workspace_id, request_id, service, method\n )\n VALUES ( ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n service = excluded.service,\n method = excluded.method\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 5 - }, - "nullable": [] - }, - "hash": "1ad8a2581417acb2d1b06fc2727a452230ff4ecbdfb5be54a034ec7c4fb1cabe" -} diff --git a/src-tauri/.sqlx/query-f7df06213eff80e2ce5100b77ec244c83de39048e77c5af0b0b5d188d3279ca4.json b/src-tauri/.sqlx/query-f7df06213eff80e2ce5100b77ec244c83de39048e77c5af0b0b5d188d3279ca4.json new file mode 100644 index 00000000..cc93d656 --- /dev/null +++ b/src-tauri/.sqlx/query-f7df06213eff80e2ce5100b77ec244c83de39048e77c5af0b0b5d188d3279ca4.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO grpc_connections (\n id, workspace_id, request_id, service, method\n )\n VALUES (?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n service = excluded.service,\n method = excluded.method\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 5 + }, + "nullable": [] + }, + "hash": "f7df06213eff80e2ce5100b77ec244c83de39048e77c5af0b0b5d188d3279ca4" +} diff --git a/src-tauri/grpc/src/lib.rs b/src-tauri/grpc/src/lib.rs index cc318486..e0146375 100644 --- a/src-tauri/grpc/src/lib.rs +++ b/src-tauri/grpc/src/lib.rs @@ -1,8 +1,6 @@ use prost_reflect::SerializeOptions; use serde::{Deserialize, Serialize}; -use tokio_stream::Stream; use tonic::transport::Uri; -use tonic::IntoRequest; use crate::proto::fill_pool; diff --git a/src-tauri/grpc/src/manager.rs b/src-tauri/grpc/src/manager.rs index 70c38e32..c8ad4f1f 100644 --- a/src-tauri/grpc/src/manager.rs +++ b/src-tauri/grpc/src/manager.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use hyper::client::connect::Connect; use hyper::client::HttpConnector; use hyper::Client; use hyper_rustls::HttpsConnector; diff --git a/src-tauri/src/http.rs b/src-tauri/src/http.rs index 307166a2..cc670229 100644 --- a/src-tauri/src/http.rs +++ b/src-tauri/src/http.rs @@ -366,7 +366,7 @@ pub async fn send_http_request( .await .expect("Failed to update response"); if !request.id.is_empty() { - emit_side_effect(app_handle.clone(), "updated_model", &response); + emit_side_effect(app_handle.clone(), "upserted_model", &response); } // Copy response to download path, if specified @@ -399,7 +399,7 @@ pub async fn send_http_request( cookie_jar.cookies = json_cookies; match models::upsert_cookie_jar(db, &cookie_jar).await { Ok(updated_jar) => { - emit_side_effect(app_handle, "updated_model", &updated_jar); + emit_side_effect(app_handle, "upserted_model", &updated_jar); } Err(e) => { error!("Failed to update cookie jar: {}", e); diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 16f0a512..23f802de 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -102,16 +102,14 @@ async fn cmd_grpc_call_unary( request_id: &str, app_handle: AppHandle, grpc_handle: State<'_, Mutex>, - db_state: State<'_, Mutex>>, ) -> Result { - let db = &*db_state.lock().await; - let req = get_grpc_request(db, request_id) + let req = get_grpc_request(&app_handle, request_id) .await .map_err(|e| e.to_string())?; let conn = { let req = req.clone(); upsert_grpc_connection( - db, + &app_handle, &GrpcConnection { workspace_id: req.workspace_id, request_id: req.id, @@ -127,7 +125,7 @@ async fn cmd_grpc_call_unary( let req = req.clone(); let conn = conn.clone(); upsert_grpc_message( - db, + &app_handle, &GrpcMessage { workspace_id: req.workspace_id, request_id: req.id, @@ -157,7 +155,7 @@ async fn cmd_grpc_call_unary( { Ok(msg) => { upsert_grpc_message( - db, + &app_handle, &GrpcMessage { message: msg, workspace_id: req.workspace_id, @@ -178,18 +176,15 @@ async fn cmd_grpc_call_unary( #[tauri::command] async fn cmd_grpc_client_streaming( request_id: &str, - grpc_handle: State<'_, Mutex>, app_handle: AppHandle, - db_state: State<'_, Mutex>>, ) -> Result { - let db = &*db_state.lock().await; - let req = get_grpc_request(db, request_id) + let req = get_grpc_request(&app_handle, request_id) .await .map_err(|e| e.to_string())?; let conn = { let req = req.clone(); upsert_grpc_connection( - db, + &app_handle, &GrpcConnection { workspace_id: req.workspace_id, request_id: req.id, @@ -204,24 +199,19 @@ async fn cmd_grpc_client_streaming( { let conn = conn.clone(); let req = req.clone(); - let db = db.clone(); - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: "Initiating connection".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_info: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: "Initiating connection".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .unwrap(); }; let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::(16); @@ -277,24 +267,18 @@ async fn cmd_grpc_client_streaming( let req = req.clone(); let conn = conn.clone(); tauri::async_runtime::spawn(async move { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: msg, - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: msg, + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + ..Default::default() + }, + ) + .await + .unwrap(); }); } Ok(IncomingMsg::Commit) => { @@ -318,79 +302,61 @@ async fn cmd_grpc_client_streaming( let req = req.clone(); async move { let grpc_handle = app_handle.state::>(); - let db_state = app_handle.state::>>(); let msg = grpc_handle .lock() .await .client_streaming(&conn.id, uri, &service, &method, in_msg_stream) .await .unwrap(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - db, - &GrpcMessage { - message: msg.to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_server: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: msg.to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_server: true, + ..Default::default() + }, + ) + .await + .unwrap(); } }; { let conn = conn.clone(); + let app_handle = app_handle.clone(); tauri::async_runtime::spawn(async move { tokio::select! { _ = grpc_listen => { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: "Connection completed".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_info: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: "Connection completed".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await.map_err(|e| e.to_string()).unwrap(); }, _ = cancelled_rx.changed() => { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: "Connection cancelled".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_info: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: "Connection cancelled".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .map_err(|e| e.to_string()).unwrap(); }, } app_handle.unlisten(event_handler); @@ -405,16 +371,14 @@ async fn cmd_grpc_streaming( request_id: &str, app_handle: AppHandle, grpc_handle: State<'_, Mutex>, - db_state: State<'_, Mutex>>, ) -> Result { - let db = &*db_state.lock().await; - let req = get_grpc_request(db, request_id) + let req = get_grpc_request(&app_handle, request_id) .await .map_err(|e| e.to_string())?; let conn = { let req = req.clone(); upsert_grpc_connection( - db, + &app_handle, &GrpcConnection { workspace_id: req.workspace_id, request_id: req.id, @@ -429,24 +393,19 @@ async fn cmd_grpc_streaming( { let conn = conn.clone(); let req = req.clone(); - let db = db.clone(); - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: "Initiating connection".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_info: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: "Initiating connection".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"); }; let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::(16); @@ -496,24 +455,19 @@ async fn cmd_grpc_streaming( let req = req.clone(); let conn = conn.clone(); tauri::async_runtime::spawn(async move { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: msg, - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: msg, + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + ..Default::default() + }, + ) + .await + .map_err(|e| e.to_string()) + .unwrap(); }); } Ok(IncomingMsg::Cancel) => { @@ -539,25 +493,20 @@ async fn cmd_grpc_streaming( let item = serde_json::to_string_pretty(&item).unwrap(); let req = req.clone(); let conn = conn.clone(); - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: item, - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_server: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: item, + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_server: true, + ..Default::default() + }, + ) + .await + .map_err(|e| e.to_string()) + .unwrap(); } Some(Err(e)) => { error!("gRPC stream error: {:?}", e); @@ -575,36 +524,25 @@ async fn cmd_grpc_streaming( { let conn = conn.clone(); tauri::async_runtime::spawn(async move { + let app_handle = app_handle.clone(); tokio::select! { _ = grpc_listen => { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: "Connection completed".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_info: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: "Connection completed".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await.map_err(|e| e.to_string()).unwrap(); }, _ = cancelled_rx.changed() => { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", upsert_grpc_message( - &db, + &app_handle, &GrpcMessage { message: "Connection cancelled".to_string(), workspace_id: req.workspace_id, @@ -614,9 +552,7 @@ async fn cmd_grpc_streaming( ..Default::default() }, ) - .await - .expect("Failed to upsert message"), - ); + .await.map_err(|e| e.to_string()).unwrap(); }, } app_handle.unlisten(event_handler); @@ -631,17 +567,15 @@ async fn cmd_grpc_server_streaming( request_id: &str, app_handle: AppHandle, grpc_handle: State<'_, Mutex>, - db_state: State<'_, Mutex>>, ) -> Result { - let db = &*db_state.lock().await; - let req = get_grpc_request(db, request_id) + let req = get_grpc_request(&app_handle, request_id) .await .map_err(|e| e.to_string())?; let conn = { let req = req.clone(); upsert_grpc_connection( - db, + &app_handle, &GrpcConnection { workspace_id: req.workspace_id, request_id: req.id, @@ -656,23 +590,19 @@ async fn cmd_grpc_server_streaming( { let req = req.clone(); let conn = conn.clone(); - emit_side_effect( - app_handle.clone(), - "created_model", - upsert_grpc_message( - &db, - &GrpcMessage { - message: "Initiating connection".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_info: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"), - ); + upsert_grpc_message( + &app_handle, + &GrpcMessage { + message: "Initiating connection".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .unwrap(); } let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); @@ -718,7 +648,6 @@ async fn cmd_grpc_server_streaming( app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb); let grpc_listen = { - let db = db.clone(); let conn_id = conn.clone().id; let app_handle = app_handle.clone(); let req = req.clone(); @@ -726,11 +655,12 @@ async fn cmd_grpc_server_streaming( loop { let req = req.clone(); let conn_id = conn_id.clone(); + let app_handle = app_handle.clone(); match stream.next().await { Some(Ok(item)) => { let item = serde_json::to_string_pretty(&item).unwrap(); - let msg = upsert_grpc_message( - &db, + upsert_grpc_message( + &app_handle, &GrpcMessage { message: item, workspace_id: req.workspace_id, @@ -743,7 +673,6 @@ async fn cmd_grpc_server_streaming( .await .map_err(|e| e.to_string()) .expect("Failed to upsert message"); - emit_side_effect(app_handle.clone(), "created_model", msg); } Some(Err(e)) => { error!("gRPC stream error: {:?}", e); @@ -761,16 +690,12 @@ async fn cmd_grpc_server_streaming( { let conn = conn.clone(); let req = req.clone(); + let app_handle = app_handle.clone(); tauri::async_runtime::spawn(async move { tokio::select! { _ = grpc_listen => { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", upsert_grpc_message( - &db, + &app_handle, &GrpcMessage { message: "Connection completed".to_string(), workspace_id: req.workspace_id, @@ -780,18 +705,11 @@ async fn cmd_grpc_server_streaming( ..Default::default() }, ) - .await - .expect("Failed to upsert message"), - ); + .await.unwrap(); }, _ = cancelled_rx.changed() => { - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; - emit_side_effect( - app_handle.clone(), - "created_model", upsert_grpc_message( - &db, + &app_handle, &GrpcMessage { message: "Connection cancelled".to_string(), workspace_id: req.workspace_id, @@ -801,9 +719,7 @@ async fn cmd_grpc_server_streaming( ..Default::default() }, ) - .await - .expect("Failed to upsert message"), - ); + .await.unwrap(); }, } app_handle.unlisten(event_handler); @@ -1076,7 +992,7 @@ async fn response_err( response = update_response_if_id(db, &response) .await .expect("Failed to update response"); - emit_side_effect(app_handle, "updated_model", &response); + emit_side_effect(app_handle, "upserted_model", &response); Ok(response) } @@ -1136,7 +1052,7 @@ async fn cmd_set_key_value( if created { emit_and_return(&window, "created_model", key_value) } else { - emit_and_return(&window, "updated_model", key_value) + emit_and_return(&window, "upserted_model", key_value) } } @@ -1166,7 +1082,7 @@ async fn cmd_update_cookie_jar( .await .expect("Failed to update cookie jar"); - emit_and_return(&window, "updated_model", updated) + emit_and_return(&window, "upserted_model", updated) } #[tauri::command] @@ -1235,11 +1151,10 @@ async fn cmd_create_grpc_request( sort_priority: f64, folder_id: Option<&str>, window: Window, - db_state: State<'_, Mutex>>, + app_handle: AppHandle, ) -> Result { - let db = &*db_state.lock().await; let created = upsert_grpc_request( - db, + &app_handle, &GrpcRequest { workspace_id: workspace_id.to_string(), name: name.to_string(), @@ -1258,10 +1173,9 @@ async fn cmd_create_grpc_request( async fn cmd_duplicate_grpc_request( id: &str, window: Window, - db_state: State<'_, Mutex>>, + app_handle: AppHandle, ) -> Result { - let db = &*db_state.lock().await; - let request = duplicate_grpc_request(db, id) + let request = duplicate_grpc_request(&app_handle, id) .await .expect("Failed to duplicate grpc request"); emit_and_return(&window, "created_model", request) @@ -1318,7 +1232,7 @@ async fn cmd_update_workspace( .await .expect("Failed to update request"); - emit_and_return(&window, "updated_model", updated_workspace) + emit_and_return(&window, "upserted_model", updated_workspace) } #[tauri::command] @@ -1332,20 +1246,19 @@ async fn cmd_update_environment( .await .expect("Failed to update environment"); - emit_and_return(&window, "updated_model", updated_environment) + emit_and_return(&window, "upserted_model", updated_environment) } #[tauri::command] async fn cmd_update_grpc_request( request: GrpcRequest, window: Window, - db_state: State<'_, Mutex>>, + app_handle: AppHandle, ) -> Result { - let db = &*db_state.lock().await; - let updated_request = upsert_grpc_request(db, &request) + let updated_request = upsert_grpc_request(&app_handle, &request) .await .expect("Failed to update grpc request"); - emit_and_return(&window, "updated_model", updated_request) + emit_and_return(&window, "upserted_model", updated_request) } #[tauri::command] @@ -1358,7 +1271,7 @@ async fn cmd_update_http_request( let updated_request = upsert_http_request(db, request) .await .expect("Failed to update request"); - emit_and_return(&window, "updated_model", updated_request) + emit_and_return(&window, "upserted_model", updated_request) } #[tauri::command] @@ -1434,7 +1347,7 @@ async fn cmd_update_folder( let updated_folder = upsert_folder(db, folder) .await .expect("Failed to update request"); - emit_and_return(&window, "updated_model", updated_folder) + emit_and_return(&window, "upserted_model", updated_folder) } #[tauri::command] @@ -1540,7 +1453,7 @@ async fn cmd_update_settings( .await .expect("Failed to update settings"); - emit_and_return(&window, "updated_model", updated_settings) + emit_and_return(&window, "upserted_model", updated_settings) } #[tauri::command] @@ -1553,12 +1466,10 @@ async fn cmd_get_folder( } #[tauri::command] -async fn cmd_get_grpc_request( - id: &str, - db_state: State<'_, Mutex>>, -) -> Result { - let db = &*db_state.lock().await; - get_grpc_request(db, id).await.map_err(|e| e.to_string()) +async fn cmd_get_grpc_request(id: &str, app_handle: AppHandle) -> Result { + get_grpc_request(&app_handle, id) + .await + .map_err(|e| e.to_string()) } #[tauri::command] diff --git a/src-tauri/src/models.rs b/src-tauri/src/models.rs index 16f018d0..41576d6a 100644 --- a/src-tauri/src/models.rs +++ b/src-tauri/src/models.rs @@ -7,7 +7,8 @@ use serde::{Deserialize, Serialize}; use sqlx::types::chrono::NaiveDateTime; use sqlx::types::{Json, JsonValue}; use sqlx::{Pool, Sqlite}; -use tauri::AppHandle; +use tauri::{AppHandle, Manager}; +use tokio::sync::Mutex; fn default_true() -> bool { true @@ -457,18 +458,19 @@ pub async fn delete_cookie_jar(db: &Pool, id: &str) -> Result, + app_handle: &AppHandle, id: &str, ) -> Result { - let mut request = get_grpc_request(db, id).await?.clone(); + let mut request = get_grpc_request(app_handle, id).await?.clone(); request.id = "".to_string(); - upsert_grpc_request(db, &request).await + upsert_grpc_request(app_handle, &request).await } pub async fn upsert_grpc_request( - db: &Pool, + app_handle: &AppHandle, request: &GrpcRequest, ) -> Result { + let db = get_db(app_handle).await; let id = match request.id.as_str() { "" => generate_id(Some("gr")), _ => request.id.to_string(), @@ -500,13 +502,19 @@ pub async fn upsert_grpc_request( request.method, request.message, ) - .execute(db) + .execute(&db) .await?; - get_grpc_request(db, &id).await + get_grpc_request(app_handle, &id).await } -pub async fn get_grpc_request(db: &Pool, id: &str) -> Result { +pub async fn get_grpc_request( + app_handle: &AppHandle, + id: &str, +) -> Result { + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + sqlx::query_as!( GrpcRequest, r#" @@ -542,9 +550,10 @@ pub async fn list_grpc_requests( } pub async fn upsert_grpc_connection( - db: &Pool, + app_handle: &AppHandle, connection: &GrpcConnection, ) -> Result { + let db = get_db(&app_handle).await; let id = match connection.id.as_str() { "" => generate_id(Some("gc")), _ => connection.id.to_string(), @@ -554,7 +563,7 @@ pub async fn upsert_grpc_connection( INSERT INTO grpc_connections ( id, workspace_id, request_id, service, method ) - VALUES ( ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?) ON CONFLICT (id) DO UPDATE SET updated_at = CURRENT_TIMESTAMP, service = excluded.service, @@ -566,16 +575,17 @@ pub async fn upsert_grpc_connection( connection.service, connection.method, ) - .execute(db) + .execute(&db) .await?; - get_grpc_connection(db, &id).await + get_grpc_connection(app_handle, &id).await } pub async fn get_grpc_connection( - db: &Pool, + app_handle: &AppHandle, id: &str, ) -> Result { + let db = get_db(&app_handle).await; sqlx::query_as!( GrpcConnection, r#" @@ -585,7 +595,7 @@ pub async fn get_grpc_connection( "#, id, ) - .fetch_one(db) + .fetch_one(&db) .await } @@ -608,9 +618,10 @@ pub async fn list_grpc_connections( } pub async fn upsert_grpc_message( - db: &Pool, + app_handle: &AppHandle, message: &GrpcMessage, ) -> Result { + let db = get_db(app_handle).await; let id = match message.id.as_str() { "" => generate_id(Some("gm")), _ => message.id.to_string(), @@ -635,13 +646,21 @@ pub async fn upsert_grpc_message( message.is_server, message.is_info, ) - .execute(db) + .execute(&db) .await?; - get_grpc_message(db, &id).await + let msg = get_grpc_message(app_handle, &id).await; + match msg { + Ok(msg) => Ok(emit_upserted_model(app_handle, msg.clone()).await), + Err(e) => Err(e), + } } -pub async fn get_grpc_message(db: &Pool, id: &str) -> Result { +pub async fn get_grpc_message( + app_handle: &AppHandle, + id: &str, +) -> Result { + let db = get_db(app_handle).await; sqlx::query_as!( GrpcMessage, r#" @@ -653,7 +672,7 @@ pub async fn get_grpc_message(db: &Pool, id: &str) -> Result(app_handle: &AppHandle, model: S) -> S { + app_handle + .emit_all("upserted_model", model.clone()) + .unwrap(); + model +} + +async fn get_db(app_handle: &AppHandle) -> Pool { + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + db.clone() +} diff --git a/src-web/components/GlobalHooks.tsx b/src-web/components/GlobalHooks.tsx index 1c916d02..5f5e7b29 100644 --- a/src-web/components/GlobalHooks.tsx +++ b/src-web/components/GlobalHooks.tsx @@ -86,7 +86,7 @@ export function GlobalHooks() { } }); - useListenToTauriEvent('updated_model', ({ payload, windowLabel }) => { + useListenToTauriEvent('upserted_model', ({ payload, windowLabel }) => { if (shouldIgnoreEvent(payload, windowLabel)) return; const queryKey = @@ -119,12 +119,21 @@ export function GlobalHooks() { wasUpdatedExternally(payload.id); } + const pushToFront = (['http_response', 'grpc_connection'] as Model['model'][]).includes( + payload.model, + ); + if (!shouldIgnoreModel(payload)) { - console.time('set query date'); - queryClient.setQueryData(queryKey, (values) => - values?.map((v) => (modelsEq(v, payload) ? payload : v)), - ); - console.timeEnd('set query date'); + queryClient.setQueryData(queryKey, (values = []) => { + const index = values.findIndex((v) => modelsEq(v, payload)) ?? -1; + if (index >= 0) { + console.log('UPDATED MODEL', payload); + return [...values.slice(0, index), payload, ...values.slice(index + 1)]; + } else { + console.log('INSERTED MODEL', payload); + return pushToFront ? [payload, ...(values ?? [])] : [...(values ?? []), payload]; + } + }); } }); diff --git a/src-web/components/GrpcConnectionLayout.tsx b/src-web/components/GrpcConnectionLayout.tsx index 10aedda7..e7259206 100644 --- a/src-web/components/GrpcConnectionLayout.tsx +++ b/src-web/components/GrpcConnectionLayout.tsx @@ -19,6 +19,7 @@ import { RadioDropdown } from './core/RadioDropdown'; import { Separator } from './core/Separator'; import { SplitLayout } from './core/SplitLayout'; import { HStack, VStack } from './core/Stacks'; +import { StatusTag } from './core/StatusTag'; import { GrpcEditor } from './GrpcEditor'; import { UrlBar } from './UrlBar'; @@ -200,22 +201,22 @@ export function GrpcConnectionLayout({ style }: Props) { {select.options.find((o) => o.value === select.value)?.label} - grpc.cancel.mutateAsync() : handleConnect} - disabled={grpc.isStreaming} - spin={grpc.isStreaming || grpc.unary.isLoading} - icon={ - grpc.isStreaming - ? 'refresh' - : messageType === 'unary' - ? 'sendHorizontal' - : 'arrowUpDown' - } - /> + {!grpc.isStreaming && ( + + )} {grpc.isStreaming && ( ( -
- {...messages.map((m) => ( - { - if (m.id === activeMessageId) setActiveMessageId(null); - else setActiveMessageId(m.id); - }} - alignItems="center" - className={classNames( - 'px-2 py-1 font-mono', - m === activeMessage && 'bg-highlight', +
+ +
+ {grpc.isStreaming ? ( + + + Connected + + ) : ( + 'Done' )} - > - -
{m.message}
-
- {format(m.createdAt, 'HH:mm:ss')} -
- - ))} +
+
+
+ {...messages.map((m) => ( + { + if (m.id === activeMessageId) setActiveMessageId(null); + else setActiveMessageId(m.id); + }} + alignItems="center" + className={classNames( + 'px-2 py-1 font-mono', + m === activeMessage && 'bg-highlight', + )} + > + +
{m.message}
+
+ {format(m.createdAt, 'HH:mm:ss')} +
+
+ ))} +
)} secondSlot={