Fix DB mutex deadlock

This commit is contained in:
Gregory Schier
2024-02-04 21:17:05 -08:00
parent e2c17873ae
commit 60773cab53
9 changed files with 312 additions and 348 deletions

View File

@@ -7,7 +7,8 @@ use serde::{Deserialize, Serialize};
use sqlx::types::chrono::NaiveDateTime;
use sqlx::types::{Json, JsonValue};
use sqlx::{Pool, Sqlite};
use tauri::AppHandle;
use tauri::{AppHandle, Manager};
use tokio::sync::Mutex;
fn default_true() -> bool {
true
@@ -457,18 +458,19 @@ pub async fn delete_cookie_jar(db: &Pool<Sqlite>, id: &str) -> Result<CookieJar,
}
pub async fn duplicate_grpc_request(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcRequest, sqlx::Error> {
let mut request = get_grpc_request(db, id).await?.clone();
let mut request = get_grpc_request(app_handle, id).await?.clone();
request.id = "".to_string();
upsert_grpc_request(db, &request).await
upsert_grpc_request(app_handle, &request).await
}
pub async fn upsert_grpc_request(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
request: &GrpcRequest,
) -> Result<GrpcRequest, sqlx::Error> {
let db = get_db(app_handle).await;
let id = match request.id.as_str() {
"" => generate_id(Some("gr")),
_ => request.id.to_string(),
@@ -500,13 +502,19 @@ pub async fn upsert_grpc_request(
request.method,
request.message,
)
.execute(db)
.execute(&db)
.await?;
get_grpc_request(db, &id).await
get_grpc_request(app_handle, &id).await
}
pub async fn get_grpc_request(db: &Pool<Sqlite>, id: &str) -> Result<GrpcRequest, sqlx::Error> {
pub async fn get_grpc_request(
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcRequest, sqlx::Error> {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
sqlx::query_as!(
GrpcRequest,
r#"
@@ -542,9 +550,10 @@ pub async fn list_grpc_requests(
}
pub async fn upsert_grpc_connection(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
connection: &GrpcConnection,
) -> Result<GrpcConnection, sqlx::Error> {
let db = get_db(&app_handle).await;
let id = match connection.id.as_str() {
"" => generate_id(Some("gc")),
_ => connection.id.to_string(),
@@ -554,7 +563,7 @@ pub async fn upsert_grpc_connection(
INSERT INTO grpc_connections (
id, workspace_id, request_id, service, method
)
VALUES ( ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
service = excluded.service,
@@ -566,16 +575,17 @@ pub async fn upsert_grpc_connection(
connection.service,
connection.method,
)
.execute(db)
.execute(&db)
.await?;
get_grpc_connection(db, &id).await
get_grpc_connection(app_handle, &id).await
}
pub async fn get_grpc_connection(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcConnection, sqlx::Error> {
let db = get_db(&app_handle).await;
sqlx::query_as!(
GrpcConnection,
r#"
@@ -585,7 +595,7 @@ pub async fn get_grpc_connection(
"#,
id,
)
.fetch_one(db)
.fetch_one(&db)
.await
}
@@ -608,9 +618,10 @@ pub async fn list_grpc_connections(
}
pub async fn upsert_grpc_message(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
message: &GrpcMessage,
) -> Result<GrpcMessage, sqlx::Error> {
let db = get_db(app_handle).await;
let id = match message.id.as_str() {
"" => generate_id(Some("gm")),
_ => message.id.to_string(),
@@ -635,13 +646,21 @@ pub async fn upsert_grpc_message(
message.is_server,
message.is_info,
)
.execute(db)
.execute(&db)
.await?;
get_grpc_message(db, &id).await
let msg = get_grpc_message(app_handle, &id).await;
match msg {
Ok(msg) => Ok(emit_upserted_model(app_handle, msg.clone()).await),
Err(e) => Err(e),
}
}
pub async fn get_grpc_message(db: &Pool<Sqlite>, id: &str) -> Result<GrpcMessage, sqlx::Error> {
pub async fn get_grpc_message(
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcMessage, sqlx::Error> {
let db = get_db(app_handle).await;
sqlx::query_as!(
GrpcMessage,
r#"
@@ -653,7 +672,7 @@ pub async fn get_grpc_message(db: &Pool<Sqlite>, id: &str) -> Result<GrpcMessage
"#,
id,
)
.fetch_one(db)
.fetch_one(&db)
.await
}
@@ -1327,3 +1346,16 @@ pub async fn get_workspace_export_resources(
},
};
}
async fn emit_upserted_model<S: Serialize + Clone>(app_handle: &AppHandle, model: S) -> S {
app_handle
.emit_all("upserted_model", model.clone())
.unwrap();
model
}
async fn get_db(app_handle: &AppHandle) -> Pool<Sqlite> {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
db.clone()
}