Fix DB mutex deadlock

This commit is contained in:
Gregory Schier
2024-02-04 21:17:05 -08:00
parent acb01cf086
commit 7bb620e6d5
9 changed files with 312 additions and 348 deletions

View File

@@ -102,16 +102,14 @@ async fn cmd_grpc_call_unary(
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcMessage, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -127,7 +125,7 @@ async fn cmd_grpc_call_unary(
let req = req.clone();
let conn = conn.clone();
upsert_grpc_message(
db,
&app_handle,
&GrpcMessage {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -157,7 +155,7 @@ async fn cmd_grpc_call_unary(
{
Ok(msg) => {
upsert_grpc_message(
db,
&app_handle,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
@@ -178,18 +176,15 @@ async fn cmd_grpc_call_unary(
#[tauri::command]
async fn cmd_grpc_client_streaming(
request_id: &str,
grpc_handle: State<'_, Mutex<GrpcManager>>,
app_handle: AppHandle<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -204,24 +199,19 @@ async fn cmd_grpc_client_streaming(
{
let conn = conn.clone();
let req = req.clone();
let db = db.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.unwrap();
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
@@ -277,24 +267,18 @@ async fn cmd_grpc_client_streaming(
let req = req.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.unwrap();
});
}
Ok(IncomingMsg::Commit) => {
@@ -318,79 +302,61 @@ async fn cmd_grpc_client_streaming(
let req = req.clone();
async move {
let grpc_handle = app_handle.state::<Mutex<GrpcManager>>();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let msg = grpc_handle
.lock()
.await
.client_streaming(&conn.id, uri, &service, &method, in_msg_stream)
.await
.unwrap();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
db,
&GrpcMessage {
message: msg.to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: msg.to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.unwrap();
}
};
{
let conn = conn.clone();
let app_handle = app_handle.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await.map_err(|e| e.to_string()).unwrap();
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.map_err(|e| e.to_string()).unwrap();
},
}
app_handle.unlisten(event_handler);
@@ -405,16 +371,14 @@ async fn cmd_grpc_streaming(
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<String, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -429,24 +393,19 @@ async fn cmd_grpc_streaming(
{
let conn = conn.clone();
let req = req.clone();
let db = db.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message");
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
@@ -496,24 +455,19 @@ async fn cmd_grpc_streaming(
let req = req.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())
.unwrap();
});
}
Ok(IncomingMsg::Cancel) => {
@@ -539,25 +493,20 @@ async fn cmd_grpc_streaming(
let item = serde_json::to_string_pretty(&item).unwrap();
let req = req.clone();
let conn = conn.clone();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())
.unwrap();
}
Some(Err(e)) => {
error!("gRPC stream error: {:?}", e);
@@ -575,36 +524,25 @@ async fn cmd_grpc_streaming(
{
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let app_handle = app_handle.clone();
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await.map_err(|e| e.to_string()).unwrap();
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&app_handle,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
@@ -614,9 +552,7 @@ async fn cmd_grpc_streaming(
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
.await.map_err(|e| e.to_string()).unwrap();
},
}
app_handle.unlisten(event_handler);
@@ -631,17 +567,15 @@ async fn cmd_grpc_server_streaming(
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -656,23 +590,19 @@ async fn cmd_grpc_server_streaming(
{
let req = req.clone();
let conn = conn.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.unwrap();
}
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
@@ -718,7 +648,6 @@ async fn cmd_grpc_server_streaming(
app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let grpc_listen = {
let db = db.clone();
let conn_id = conn.clone().id;
let app_handle = app_handle.clone();
let req = req.clone();
@@ -726,11 +655,12 @@ async fn cmd_grpc_server_streaming(
loop {
let req = req.clone();
let conn_id = conn_id.clone();
let app_handle = app_handle.clone();
match stream.next().await {
Some(Ok(item)) => {
let item = serde_json::to_string_pretty(&item).unwrap();
let msg = upsert_grpc_message(
&db,
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
@@ -743,7 +673,6 @@ async fn cmd_grpc_server_streaming(
.await
.map_err(|e| e.to_string())
.expect("Failed to upsert message");
emit_side_effect(app_handle.clone(), "created_model", msg);
}
Some(Err(e)) => {
error!("gRPC stream error: {:?}", e);
@@ -761,16 +690,12 @@ async fn cmd_grpc_server_streaming(
{
let conn = conn.clone();
let req = req.clone();
let app_handle = app_handle.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&app_handle,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
@@ -780,18 +705,11 @@ async fn cmd_grpc_server_streaming(
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
.await.unwrap();
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&app_handle,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
@@ -801,9 +719,7 @@ async fn cmd_grpc_server_streaming(
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
.await.unwrap();
},
}
app_handle.unlisten(event_handler);
@@ -1076,7 +992,7 @@ async fn response_err(
response = update_response_if_id(db, &response)
.await
.expect("Failed to update response");
emit_side_effect(app_handle, "updated_model", &response);
emit_side_effect(app_handle, "upserted_model", &response);
Ok(response)
}
@@ -1136,7 +1052,7 @@ async fn cmd_set_key_value(
if created {
emit_and_return(&window, "created_model", key_value)
} else {
emit_and_return(&window, "updated_model", key_value)
emit_and_return(&window, "upserted_model", key_value)
}
}
@@ -1166,7 +1082,7 @@ async fn cmd_update_cookie_jar(
.await
.expect("Failed to update cookie jar");
emit_and_return(&window, "updated_model", updated)
emit_and_return(&window, "upserted_model", updated)
}
#[tauri::command]
@@ -1235,11 +1151,10 @@ async fn cmd_create_grpc_request(
sort_priority: f64,
folder_id: Option<&str>,
window: Window<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
app_handle: AppHandle,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
let created = upsert_grpc_request(
db,
&app_handle,
&GrpcRequest {
workspace_id: workspace_id.to_string(),
name: name.to_string(),
@@ -1258,10 +1173,9 @@ async fn cmd_create_grpc_request(
async fn cmd_duplicate_grpc_request(
id: &str,
window: Window<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
app_handle: AppHandle,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
let request = duplicate_grpc_request(db, id)
let request = duplicate_grpc_request(&app_handle, id)
.await
.expect("Failed to duplicate grpc request");
emit_and_return(&window, "created_model", request)
@@ -1318,7 +1232,7 @@ async fn cmd_update_workspace(
.await
.expect("Failed to update request");
emit_and_return(&window, "updated_model", updated_workspace)
emit_and_return(&window, "upserted_model", updated_workspace)
}
#[tauri::command]
@@ -1332,20 +1246,19 @@ async fn cmd_update_environment(
.await
.expect("Failed to update environment");
emit_and_return(&window, "updated_model", updated_environment)
emit_and_return(&window, "upserted_model", updated_environment)
}
#[tauri::command]
async fn cmd_update_grpc_request(
request: GrpcRequest,
window: Window<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
app_handle: AppHandle,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
let updated_request = upsert_grpc_request(db, &request)
let updated_request = upsert_grpc_request(&app_handle, &request)
.await
.expect("Failed to update grpc request");
emit_and_return(&window, "updated_model", updated_request)
emit_and_return(&window, "upserted_model", updated_request)
}
#[tauri::command]
@@ -1358,7 +1271,7 @@ async fn cmd_update_http_request(
let updated_request = upsert_http_request(db, request)
.await
.expect("Failed to update request");
emit_and_return(&window, "updated_model", updated_request)
emit_and_return(&window, "upserted_model", updated_request)
}
#[tauri::command]
@@ -1434,7 +1347,7 @@ async fn cmd_update_folder(
let updated_folder = upsert_folder(db, folder)
.await
.expect("Failed to update request");
emit_and_return(&window, "updated_model", updated_folder)
emit_and_return(&window, "upserted_model", updated_folder)
}
#[tauri::command]
@@ -1540,7 +1453,7 @@ async fn cmd_update_settings(
.await
.expect("Failed to update settings");
emit_and_return(&window, "updated_model", updated_settings)
emit_and_return(&window, "upserted_model", updated_settings)
}
#[tauri::command]
@@ -1553,12 +1466,10 @@ async fn cmd_get_folder(
}
#[tauri::command]
async fn cmd_get_grpc_request(
id: &str,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
get_grpc_request(db, id).await.map_err(|e| e.to_string())
async fn cmd_get_grpc_request(id: &str, app_handle: AppHandle<Wry>) -> Result<GrpcRequest, String> {
get_grpc_request(&app_handle, id)
.await
.map_err(|e| e.to_string())
}
#[tauri::command]