diff --git a/crates-tauri/yaak-app/src/lib.rs b/crates-tauri/yaak-app/src/lib.rs index 7b0311d7..d176f9bc 100644 --- a/crates-tauri/yaak-app/src/lib.rs +++ b/crates-tauri/yaak-app/src/lib.rs @@ -360,10 +360,8 @@ async fn cmd_grpc_go( let cb = { let cancelled_rx = cancelled_rx.clone(); - let app_handle = app_handle.clone(); let environment_chain = environment_chain.clone(); let window = window.clone(); - let base_msg = base_msg.clone(); let plugin_manager = plugin_manager.clone(); let encryption_manager = encryption_manager.clone(); @@ -385,8 +383,6 @@ async fn cmd_grpc_go( match serde_json::from_str::(ev.payload()) { Ok(IncomingMsg::Message(msg)) => { let window = window.clone(); - let app_handle = app_handle.clone(); - let base_msg = base_msg.clone(); let environment_chain = environment_chain.clone(); let plugin_manager = plugin_manager.clone(); let encryption_manager = encryption_manager.clone(); @@ -411,19 +407,6 @@ async fn cmd_grpc_go( }) }); in_msg_tx.try_send(msg.clone()).unwrap(); - tauri::async_runtime::spawn(async move { - app_handle - .db() - .upsert_grpc_event( - &GrpcEvent { - content: msg, - event_type: GrpcEventType::ClientMessage, - ..base_msg.clone() - }, - &UpdateSource::from_window_label(window.label()), - ) - .unwrap(); - }); } Ok(IncomingMsg::Commit) => { maybe_in_msg_tx.take(); @@ -470,12 +453,48 @@ async fn cmd_grpc_go( )?; async move { + // Create callback for streaming methods that handles both success and error + let on_message = { + let app_handle = app_handle.clone(); + let base_event = base_event.clone(); + let window_label = window.label().to_string(); + move |result: std::result::Result| match result { + Ok(msg) => { + let _ = app_handle.db().upsert_grpc_event( + &GrpcEvent { + content: msg, + event_type: GrpcEventType::ClientMessage, + ..base_event.clone() + }, + &UpdateSource::from_window_label(&window_label), + ); + } + Err(error) => { + let _ = app_handle.db().upsert_grpc_event( + &GrpcEvent { + content: format!("Failed to send message: {}", error), + event_type: GrpcEventType::Error, + ..base_event.clone() + }, + &UpdateSource::from_window_label(&window_label), + ); + } + } + }; + let (maybe_stream, maybe_msg) = match (method_desc.is_client_streaming(), method_desc.is_server_streaming()) { (true, true) => ( Some( connection - .streaming(&service, &method, in_msg_stream, &metadata, client_cert) + .streaming( + &service, + &method, + in_msg_stream, + &metadata, + client_cert, + on_message.clone(), + ) .await, ), None, @@ -490,6 +509,7 @@ async fn cmd_grpc_go( in_msg_stream, &metadata, client_cert, + on_message.clone(), ) .await, ), diff --git a/crates/yaak-grpc/src/manager.rs b/crates/yaak-grpc/src/manager.rs index 48294e7f..bd1dd17a 100644 --- a/crates/yaak-grpc/src/manager.rs +++ b/crates/yaak-grpc/src/manager.rs @@ -115,14 +115,18 @@ impl GrpcConnection { Ok(client.unary(req, path, codec).await?) } - pub async fn streaming( + pub async fn streaming( &self, service: &str, method: &str, stream: ReceiverStream, metadata: &BTreeMap, client_cert: Option, - ) -> Result>> { + on_message: F, + ) -> Result>> + where + F: Fn(std::result::Result) + Send + Sync + Clone + 'static, + { let method = &self.method(&service, &method).await?; let mapped_stream = { let input_message = method.input(); @@ -139,6 +143,8 @@ impl GrpcConnection { let md = md.clone(); let use_reflection = use_reflection.clone(); let client_cert = client_cert.clone(); + let on_message = on_message.clone(); + let json_clone = json.clone(); async move { if use_reflection { if let Err(e) = @@ -149,9 +155,13 @@ impl GrpcConnection { } let mut de = Deserializer::from_str(&json); match DynamicMessage::deserialize(input_message, &mut de) { - Ok(m) => Some(m), + Ok(m) => { + on_message(Ok(json_clone)); + Some(m) + } Err(e) => { warn!("Failed to deserialize message: {e}"); + on_message(Err(e.to_string())); None } } @@ -171,14 +181,18 @@ impl GrpcConnection { Ok(client.streaming(req, path, codec).await?) } - pub async fn client_streaming( + pub async fn client_streaming( &self, service: &str, method: &str, stream: ReceiverStream, metadata: &BTreeMap, client_cert: Option, - ) -> Result> { + on_message: F, + ) -> Result> + where + F: Fn(std::result::Result) + Send + Sync + Clone + 'static, + { let method = &self.method(&service, &method).await?; let mapped_stream = { let input_message = method.input(); @@ -195,6 +209,8 @@ impl GrpcConnection { let md = md.clone(); let use_reflection = use_reflection.clone(); let client_cert = client_cert.clone(); + let on_message = on_message.clone(); + let json_clone = json.clone(); async move { if use_reflection { if let Err(e) = @@ -205,9 +221,13 @@ impl GrpcConnection { } let mut de = Deserializer::from_str(&json); match DynamicMessage::deserialize(input_message, &mut de) { - Ok(m) => Some(m), + Ok(m) => { + on_message(Ok(json_clone)); + Some(m) + } Err(e) => { warn!("Failed to deserialize message: {e}"); + on_message(Err(e.to_string())); None } }