From 00289734c776b7e0227930bd2a0408effde0ca1b Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Sun, 11 Feb 2024 08:29:57 -0800 Subject: [PATCH] Use basemsg --- src-tauri/src/main.rs | 198 ++++++++++++++++++++++-------------------- 1 file changed, 104 insertions(+), 94 deletions(-) diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 2b845a59..4df8b89e 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -18,7 +18,6 @@ use std::str::FromStr; use ::http::uri::InvalidUri; use ::http::Uri; use fern::colors::ColoredLevelConfig; -use futures::StreamExt; use log::{debug, error, info, warn}; use rand::random; use serde_json::{json, Value}; @@ -149,24 +148,24 @@ async fn cmd_grpc_go( .map_err(|e| e.to_string())? }; - { - let conn = conn.clone(); - let req = req.clone(); - upsert_grpc_message( - &w, - &GrpcMessage { - message: "Initiating connection".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_info: true, - ..Default::default() - }, - ) - .await - .expect("Failed to upsert message"); + let base_msg = GrpcMessage { + workspace_id: req.clone().workspace_id, + request_id: req.clone().id, + connection_id: conn.clone().id, + ..Default::default() }; + upsert_grpc_message( + &w, + &GrpcMessage { + message: "Initiating connection".to_string(), + is_info: true, + ..base_msg.clone() + }, + ) + .await + .expect("Failed to upsert message"); + let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::(16); let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone())); let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); @@ -212,8 +211,7 @@ async fn cmd_grpc_go( let cb = { let cancelled_rx = cancelled_rx.clone(); let w = w.clone(); - let conn = conn.clone(); - let req = req.clone(); + let base_msg = base_msg.clone(); move |ev: tauri::Event| { if *cancelled_rx.borrow() { @@ -236,17 +234,13 @@ async fn cmd_grpc_go( Ok(IncomingMsg::Message(msg)) => { in_msg_tx.try_send(msg.clone()).unwrap(); let w = w.clone(); - let req = req.clone(); - let conn = conn.clone(); + let base_msg = base_msg.clone(); tauri::async_runtime::spawn(async move { upsert_grpc_message( &w, &GrpcMessage { message: msg, - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - ..Default::default() + ..base_msg.clone() }, ) .await @@ -270,7 +264,7 @@ async fn cmd_grpc_go( let grpc_listen = { let w = w.clone(); - let conn = conn.clone(); + let base_msg = base_msg.clone(); let req = req.clone(); async move { let (maybe_stream, maybe_msg) = match ( @@ -278,12 +272,7 @@ async fn cmd_grpc_go( method_desc.is_server_streaming(), ) { (true, true) => ( - Some( - connection - .streaming(&service, &method, in_msg_stream) - .await - .unwrap(), - ), + Some(connection.streaming(&service, &method, in_msg_stream).await), None, ), (true, false) => ( @@ -291,81 +280,118 @@ async fn cmd_grpc_go( Some( connection .client_streaming(&service, &method, in_msg_stream) - .await - .map_err(|e| e.to_string()) - .unwrap(), + .await, ), ), (false, true) => ( Some( connection .server_streaming(&service, &method, &req.message) - .await - .unwrap(), + .await, ), None, ), (false, false) => ( None, - Some( - connection - .unary(&service, &method, &req.message) - .await - .unwrap(), - ), + Some(connection.unary(&service, &method, &req.message).await), ), }; - if let Some(msg) = maybe_msg { - let req = req.clone(); - let conn = conn.clone(); - upsert_grpc_message( - &w, - &GrpcMessage { - message: serialize_message(&msg).unwrap(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, - is_server: true, - ..Default::default() - }, - ) - .await - .map_err(|e| e.to_string()) - .unwrap(); + match maybe_msg { + Some(Ok(msg)) => { + upsert_grpc_message( + &w, + &GrpcMessage { + message: serialize_message(&msg).unwrap(), + is_server: true, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + } + Some(Err(e)) => { + // TODO: Make into error + upsert_grpc_message( + &w, + &GrpcMessage { + message: e.to_string(), + is_server: true, + is_info: true, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + } + None => {} } + let mut stream = match maybe_stream { - Some(s) => s, + Some(Ok(s)) => s, + Some(Err(e)) => { + // TODO: Make into error + upsert_grpc_message( + &w, + &GrpcMessage { + message: e.to_string(), + is_server: true, + is_info: true, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + return; + } None => return, }; + loop { - match stream.next().await { - Some(Ok(item)) => { - let item = serialize_message(&item).unwrap(); - let req = req.clone(); - let conn = conn.clone(); + match stream.message().await { + Ok(Some(msg)) => { + let message = serialize_message(&msg).unwrap(); upsert_grpc_message( &w, &GrpcMessage { - message: item, - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.id, + message, is_server: true, - ..Default::default() + ..base_msg.clone() }, ) .await .unwrap(); } - Some(Err(e)) => { - error!("gRPC stream error: {:?}", e); - // TODO: Handle error - } - None => { - info!("gRPC stream closed by sender"); + Ok(None) => { + // TODO: Store trailers on connection + let trailers = stream.trailers().await.unwrap_or_default(); + info!("gRPC stream closed by sender {:?}", trailers,); + // TODO: Mark this on connection instead + upsert_grpc_message( + &w, + &GrpcMessage { + message: "Connection completed".to_string(), + ..base_msg.clone() + }, + ) + .await + .unwrap(); break; } + Err(e) => { + // TODO: Make into error + upsert_grpc_message( + &w, + &GrpcMessage { + message: e.to_string(), + is_server: true, + is_info: true, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + } } } } @@ -375,21 +401,8 @@ async fn cmd_grpc_go( let conn = conn.clone(); tauri::async_runtime::spawn(async move { let w = w.clone(); - let req = req.clone(); tokio::select! { _ = grpc_listen => { - upsert_grpc_message( - &w, - &GrpcMessage { - message: "Connection completed".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.clone().id, - is_info: true, - ..Default::default() - }, - ) - .await.unwrap(); upsert_grpc_connection( &w, &GrpcConnection{ @@ -403,11 +416,8 @@ async fn cmd_grpc_go( &w, &GrpcMessage { message: "Connection cancelled".to_string(), - workspace_id: req.workspace_id, - request_id: req.id, - connection_id: conn.clone().id, is_info: true, - ..Default::default() + ..base_msg.clone() }, ) .await.unwrap();