Fix DB mutex deadlock

This commit is contained in:
Gregory Schier
2024-02-04 21:17:05 -08:00
parent 5ed1ea07ef
commit 1f71d4372f
9 changed files with 312 additions and 348 deletions

View File

@@ -366,7 +366,7 @@ pub async fn send_http_request(
.await
.expect("Failed to update response");
if !request.id.is_empty() {
emit_side_effect(app_handle.clone(), "updated_model", &response);
emit_side_effect(app_handle.clone(), "upserted_model", &response);
}
// Copy response to download path, if specified
@@ -399,7 +399,7 @@ pub async fn send_http_request(
cookie_jar.cookies = json_cookies;
match models::upsert_cookie_jar(db, &cookie_jar).await {
Ok(updated_jar) => {
emit_side_effect(app_handle, "updated_model", &updated_jar);
emit_side_effect(app_handle, "upserted_model", &updated_jar);
}
Err(e) => {
error!("Failed to update cookie jar: {}", e);

View File

@@ -102,16 +102,14 @@ async fn cmd_grpc_call_unary(
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcMessage, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -127,7 +125,7 @@ async fn cmd_grpc_call_unary(
let req = req.clone();
let conn = conn.clone();
upsert_grpc_message(
db,
&app_handle,
&GrpcMessage {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -157,7 +155,7 @@ async fn cmd_grpc_call_unary(
{
Ok(msg) => {
upsert_grpc_message(
db,
&app_handle,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
@@ -178,18 +176,15 @@ async fn cmd_grpc_call_unary(
#[tauri::command]
async fn cmd_grpc_client_streaming(
request_id: &str,
grpc_handle: State<'_, Mutex<GrpcManager>>,
app_handle: AppHandle<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -204,24 +199,19 @@ async fn cmd_grpc_client_streaming(
{
let conn = conn.clone();
let req = req.clone();
let db = db.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.unwrap();
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
@@ -277,24 +267,18 @@ async fn cmd_grpc_client_streaming(
let req = req.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.unwrap();
});
}
Ok(IncomingMsg::Commit) => {
@@ -318,79 +302,61 @@ async fn cmd_grpc_client_streaming(
let req = req.clone();
async move {
let grpc_handle = app_handle.state::<Mutex<GrpcManager>>();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let msg = grpc_handle
.lock()
.await
.client_streaming(&conn.id, uri, &service, &method, in_msg_stream)
.await
.unwrap();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
db,
&GrpcMessage {
message: msg.to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: msg.to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.unwrap();
}
};
{
let conn = conn.clone();
let app_handle = app_handle.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await.map_err(|e| e.to_string()).unwrap();
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.map_err(|e| e.to_string()).unwrap();
},
}
app_handle.unlisten(event_handler);
@@ -405,16 +371,14 @@ async fn cmd_grpc_streaming(
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<String, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -429,24 +393,19 @@ async fn cmd_grpc_streaming(
{
let conn = conn.clone();
let req = req.clone();
let db = db.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message");
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
@@ -496,24 +455,19 @@ async fn cmd_grpc_streaming(
let req = req.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())
.unwrap();
});
}
Ok(IncomingMsg::Cancel) => {
@@ -539,25 +493,20 @@ async fn cmd_grpc_streaming(
let item = serde_json::to_string_pretty(&item).unwrap();
let req = req.clone();
let conn = conn.clone();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())
.unwrap();
}
Some(Err(e)) => {
error!("gRPC stream error: {:?}", e);
@@ -575,36 +524,25 @@ async fn cmd_grpc_streaming(
{
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let app_handle = app_handle.clone();
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await.map_err(|e| e.to_string()).unwrap();
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&app_handle,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
@@ -614,9 +552,7 @@ async fn cmd_grpc_streaming(
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
.await.map_err(|e| e.to_string()).unwrap();
},
}
app_handle.unlisten(event_handler);
@@ -631,17 +567,15 @@ async fn cmd_grpc_server_streaming(
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&app_handle,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
@@ -656,23 +590,19 @@ async fn cmd_grpc_server_streaming(
{
let req = req.clone();
let conn = conn.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.unwrap();
}
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
@@ -718,7 +648,6 @@ async fn cmd_grpc_server_streaming(
app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let grpc_listen = {
let db = db.clone();
let conn_id = conn.clone().id;
let app_handle = app_handle.clone();
let req = req.clone();
@@ -726,11 +655,12 @@ async fn cmd_grpc_server_streaming(
loop {
let req = req.clone();
let conn_id = conn_id.clone();
let app_handle = app_handle.clone();
match stream.next().await {
Some(Ok(item)) => {
let item = serde_json::to_string_pretty(&item).unwrap();
let msg = upsert_grpc_message(
&db,
upsert_grpc_message(
&app_handle,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
@@ -743,7 +673,6 @@ async fn cmd_grpc_server_streaming(
.await
.map_err(|e| e.to_string())
.expect("Failed to upsert message");
emit_side_effect(app_handle.clone(), "created_model", msg);
}
Some(Err(e)) => {
error!("gRPC stream error: {:?}", e);
@@ -761,16 +690,12 @@ async fn cmd_grpc_server_streaming(
{
let conn = conn.clone();
let req = req.clone();
let app_handle = app_handle.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&app_handle,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
@@ -780,18 +705,11 @@ async fn cmd_grpc_server_streaming(
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
.await.unwrap();
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&app_handle,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
@@ -801,9 +719,7 @@ async fn cmd_grpc_server_streaming(
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
.await.unwrap();
},
}
app_handle.unlisten(event_handler);
@@ -1076,7 +992,7 @@ async fn response_err(
response = update_response_if_id(db, &response)
.await
.expect("Failed to update response");
emit_side_effect(app_handle, "updated_model", &response);
emit_side_effect(app_handle, "upserted_model", &response);
Ok(response)
}
@@ -1136,7 +1052,7 @@ async fn cmd_set_key_value(
if created {
emit_and_return(&window, "created_model", key_value)
} else {
emit_and_return(&window, "updated_model", key_value)
emit_and_return(&window, "upserted_model", key_value)
}
}
@@ -1166,7 +1082,7 @@ async fn cmd_update_cookie_jar(
.await
.expect("Failed to update cookie jar");
emit_and_return(&window, "updated_model", updated)
emit_and_return(&window, "upserted_model", updated)
}
#[tauri::command]
@@ -1235,11 +1151,10 @@ async fn cmd_create_grpc_request(
sort_priority: f64,
folder_id: Option<&str>,
window: Window<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
app_handle: AppHandle,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
let created = upsert_grpc_request(
db,
&app_handle,
&GrpcRequest {
workspace_id: workspace_id.to_string(),
name: name.to_string(),
@@ -1258,10 +1173,9 @@ async fn cmd_create_grpc_request(
async fn cmd_duplicate_grpc_request(
id: &str,
window: Window<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
app_handle: AppHandle,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
let request = duplicate_grpc_request(db, id)
let request = duplicate_grpc_request(&app_handle, id)
.await
.expect("Failed to duplicate grpc request");
emit_and_return(&window, "created_model", request)
@@ -1318,7 +1232,7 @@ async fn cmd_update_workspace(
.await
.expect("Failed to update request");
emit_and_return(&window, "updated_model", updated_workspace)
emit_and_return(&window, "upserted_model", updated_workspace)
}
#[tauri::command]
@@ -1332,20 +1246,19 @@ async fn cmd_update_environment(
.await
.expect("Failed to update environment");
emit_and_return(&window, "updated_model", updated_environment)
emit_and_return(&window, "upserted_model", updated_environment)
}
#[tauri::command]
async fn cmd_update_grpc_request(
request: GrpcRequest,
window: Window<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
app_handle: AppHandle,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
let updated_request = upsert_grpc_request(db, &request)
let updated_request = upsert_grpc_request(&app_handle, &request)
.await
.expect("Failed to update grpc request");
emit_and_return(&window, "updated_model", updated_request)
emit_and_return(&window, "upserted_model", updated_request)
}
#[tauri::command]
@@ -1358,7 +1271,7 @@ async fn cmd_update_http_request(
let updated_request = upsert_http_request(db, request)
.await
.expect("Failed to update request");
emit_and_return(&window, "updated_model", updated_request)
emit_and_return(&window, "upserted_model", updated_request)
}
#[tauri::command]
@@ -1434,7 +1347,7 @@ async fn cmd_update_folder(
let updated_folder = upsert_folder(db, folder)
.await
.expect("Failed to update request");
emit_and_return(&window, "updated_model", updated_folder)
emit_and_return(&window, "upserted_model", updated_folder)
}
#[tauri::command]
@@ -1540,7 +1453,7 @@ async fn cmd_update_settings(
.await
.expect("Failed to update settings");
emit_and_return(&window, "updated_model", updated_settings)
emit_and_return(&window, "upserted_model", updated_settings)
}
#[tauri::command]
@@ -1553,12 +1466,10 @@ async fn cmd_get_folder(
}
#[tauri::command]
async fn cmd_get_grpc_request(
id: &str,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcRequest, String> {
let db = &*db_state.lock().await;
get_grpc_request(db, id).await.map_err(|e| e.to_string())
async fn cmd_get_grpc_request(id: &str, app_handle: AppHandle<Wry>) -> Result<GrpcRequest, String> {
get_grpc_request(&app_handle, id)
.await
.map_err(|e| e.to_string())
}
#[tauri::command]

View File

@@ -7,7 +7,8 @@ use serde::{Deserialize, Serialize};
use sqlx::types::chrono::NaiveDateTime;
use sqlx::types::{Json, JsonValue};
use sqlx::{Pool, Sqlite};
use tauri::AppHandle;
use tauri::{AppHandle, Manager};
use tokio::sync::Mutex;
fn default_true() -> bool {
true
@@ -457,18 +458,19 @@ pub async fn delete_cookie_jar(db: &Pool<Sqlite>, id: &str) -> Result<CookieJar,
}
pub async fn duplicate_grpc_request(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcRequest, sqlx::Error> {
let mut request = get_grpc_request(db, id).await?.clone();
let mut request = get_grpc_request(app_handle, id).await?.clone();
request.id = "".to_string();
upsert_grpc_request(db, &request).await
upsert_grpc_request(app_handle, &request).await
}
pub async fn upsert_grpc_request(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
request: &GrpcRequest,
) -> Result<GrpcRequest, sqlx::Error> {
let db = get_db(app_handle).await;
let id = match request.id.as_str() {
"" => generate_id(Some("gr")),
_ => request.id.to_string(),
@@ -500,13 +502,19 @@ pub async fn upsert_grpc_request(
request.method,
request.message,
)
.execute(db)
.execute(&db)
.await?;
get_grpc_request(db, &id).await
get_grpc_request(app_handle, &id).await
}
pub async fn get_grpc_request(db: &Pool<Sqlite>, id: &str) -> Result<GrpcRequest, sqlx::Error> {
pub async fn get_grpc_request(
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcRequest, sqlx::Error> {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
sqlx::query_as!(
GrpcRequest,
r#"
@@ -542,9 +550,10 @@ pub async fn list_grpc_requests(
}
pub async fn upsert_grpc_connection(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
connection: &GrpcConnection,
) -> Result<GrpcConnection, sqlx::Error> {
let db = get_db(&app_handle).await;
let id = match connection.id.as_str() {
"" => generate_id(Some("gc")),
_ => connection.id.to_string(),
@@ -554,7 +563,7 @@ pub async fn upsert_grpc_connection(
INSERT INTO grpc_connections (
id, workspace_id, request_id, service, method
)
VALUES ( ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
service = excluded.service,
@@ -566,16 +575,17 @@ pub async fn upsert_grpc_connection(
connection.service,
connection.method,
)
.execute(db)
.execute(&db)
.await?;
get_grpc_connection(db, &id).await
get_grpc_connection(app_handle, &id).await
}
pub async fn get_grpc_connection(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcConnection, sqlx::Error> {
let db = get_db(&app_handle).await;
sqlx::query_as!(
GrpcConnection,
r#"
@@ -585,7 +595,7 @@ pub async fn get_grpc_connection(
"#,
id,
)
.fetch_one(db)
.fetch_one(&db)
.await
}
@@ -608,9 +618,10 @@ pub async fn list_grpc_connections(
}
pub async fn upsert_grpc_message(
db: &Pool<Sqlite>,
app_handle: &AppHandle,
message: &GrpcMessage,
) -> Result<GrpcMessage, sqlx::Error> {
let db = get_db(app_handle).await;
let id = match message.id.as_str() {
"" => generate_id(Some("gm")),
_ => message.id.to_string(),
@@ -635,13 +646,21 @@ pub async fn upsert_grpc_message(
message.is_server,
message.is_info,
)
.execute(db)
.execute(&db)
.await?;
get_grpc_message(db, &id).await
let msg = get_grpc_message(app_handle, &id).await;
match msg {
Ok(msg) => Ok(emit_upserted_model(app_handle, msg.clone()).await),
Err(e) => Err(e),
}
}
pub async fn get_grpc_message(db: &Pool<Sqlite>, id: &str) -> Result<GrpcMessage, sqlx::Error> {
pub async fn get_grpc_message(
app_handle: &AppHandle,
id: &str,
) -> Result<GrpcMessage, sqlx::Error> {
let db = get_db(app_handle).await;
sqlx::query_as!(
GrpcMessage,
r#"
@@ -653,7 +672,7 @@ pub async fn get_grpc_message(db: &Pool<Sqlite>, id: &str) -> Result<GrpcMessage
"#,
id,
)
.fetch_one(db)
.fetch_one(&db)
.await
}
@@ -1327,3 +1346,16 @@ pub async fn get_workspace_export_resources(
},
};
}
async fn emit_upserted_model<S: Serialize + Clone>(app_handle: &AppHandle, model: S) -> S {
app_handle
.emit_all("upserted_model", model.clone())
.unwrap();
model
}
async fn get_db(app_handle: &AppHandle) -> Pool<Sqlite> {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
db.clone()
}