From 4d75b8ef06fa9978f3a9a11c4961d5592f05ef0d Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Sat, 10 Jan 2026 14:41:49 -0800 Subject: [PATCH] Surface gRPC message deserialization errors to UI Previously, when a gRPC streaming message failed to deserialize (e.g., wrong type like int instead of string), the error was silently logged and the message was dropped. Now errors are surfaced to the UI as GrpcEventType::Error events. Changed the streaming/client_streaming methods to accept an on_message callback that handles both success (logs ClientMessage) and error (logs Error) cases, rather than logging the client message prematurely before deserialization. --- crates-tauri/yaak-app/src/lib.rs | 56 ++++++++++++++++++++++---------- crates/yaak-grpc/src/manager.rs | 32 ++++++++++++++---- 2 files changed, 64 insertions(+), 24 deletions(-) diff --git a/crates-tauri/yaak-app/src/lib.rs b/crates-tauri/yaak-app/src/lib.rs index 7b0311d7..d176f9bc 100644 --- a/crates-tauri/yaak-app/src/lib.rs +++ b/crates-tauri/yaak-app/src/lib.rs @@ -360,10 +360,8 @@ async fn cmd_grpc_go( let cb = { let cancelled_rx = cancelled_rx.clone(); - let app_handle = app_handle.clone(); let environment_chain = environment_chain.clone(); let window = window.clone(); - let base_msg = base_msg.clone(); let plugin_manager = plugin_manager.clone(); let encryption_manager = encryption_manager.clone(); @@ -385,8 +383,6 @@ async fn cmd_grpc_go( match serde_json::from_str::(ev.payload()) { Ok(IncomingMsg::Message(msg)) => { let window = window.clone(); - let app_handle = app_handle.clone(); - let base_msg = base_msg.clone(); let environment_chain = environment_chain.clone(); let plugin_manager = plugin_manager.clone(); let encryption_manager = encryption_manager.clone(); @@ -411,19 +407,6 @@ async fn cmd_grpc_go( }) }); in_msg_tx.try_send(msg.clone()).unwrap(); - tauri::async_runtime::spawn(async move { - app_handle - .db() - .upsert_grpc_event( - &GrpcEvent { - content: msg, - event_type: GrpcEventType::ClientMessage, - ..base_msg.clone() - }, - &UpdateSource::from_window_label(window.label()), - ) - .unwrap(); - }); } Ok(IncomingMsg::Commit) => { maybe_in_msg_tx.take(); @@ -470,12 +453,48 @@ async fn cmd_grpc_go( )?; async move { + // Create callback for streaming methods that handles both success and error + let on_message = { + let app_handle = app_handle.clone(); + let base_event = base_event.clone(); + let window_label = window.label().to_string(); + move |result: std::result::Result| match result { + Ok(msg) => { + let _ = app_handle.db().upsert_grpc_event( + &GrpcEvent { + content: msg, + event_type: GrpcEventType::ClientMessage, + ..base_event.clone() + }, + &UpdateSource::from_window_label(&window_label), + ); + } + Err(error) => { + let _ = app_handle.db().upsert_grpc_event( + &GrpcEvent { + content: format!("Failed to send message: {}", error), + event_type: GrpcEventType::Error, + ..base_event.clone() + }, + &UpdateSource::from_window_label(&window_label), + ); + } + } + }; + let (maybe_stream, maybe_msg) = match (method_desc.is_client_streaming(), method_desc.is_server_streaming()) { (true, true) => ( Some( connection - .streaming(&service, &method, in_msg_stream, &metadata, client_cert) + .streaming( + &service, + &method, + in_msg_stream, + &metadata, + client_cert, + on_message.clone(), + ) .await, ), None, @@ -490,6 +509,7 @@ async fn cmd_grpc_go( in_msg_stream, &metadata, client_cert, + on_message.clone(), ) .await, ), diff --git a/crates/yaak-grpc/src/manager.rs b/crates/yaak-grpc/src/manager.rs index 48294e7f..bd1dd17a 100644 --- a/crates/yaak-grpc/src/manager.rs +++ b/crates/yaak-grpc/src/manager.rs @@ -115,14 +115,18 @@ impl GrpcConnection { Ok(client.unary(req, path, codec).await?) } - pub async fn streaming( + pub async fn streaming( &self, service: &str, method: &str, stream: ReceiverStream, metadata: &BTreeMap, client_cert: Option, - ) -> Result>> { + on_message: F, + ) -> Result>> + where + F: Fn(std::result::Result) + Send + Sync + Clone + 'static, + { let method = &self.method(&service, &method).await?; let mapped_stream = { let input_message = method.input(); @@ -139,6 +143,8 @@ impl GrpcConnection { let md = md.clone(); let use_reflection = use_reflection.clone(); let client_cert = client_cert.clone(); + let on_message = on_message.clone(); + let json_clone = json.clone(); async move { if use_reflection { if let Err(e) = @@ -149,9 +155,13 @@ impl GrpcConnection { } let mut de = Deserializer::from_str(&json); match DynamicMessage::deserialize(input_message, &mut de) { - Ok(m) => Some(m), + Ok(m) => { + on_message(Ok(json_clone)); + Some(m) + } Err(e) => { warn!("Failed to deserialize message: {e}"); + on_message(Err(e.to_string())); None } } @@ -171,14 +181,18 @@ impl GrpcConnection { Ok(client.streaming(req, path, codec).await?) } - pub async fn client_streaming( + pub async fn client_streaming( &self, service: &str, method: &str, stream: ReceiverStream, metadata: &BTreeMap, client_cert: Option, - ) -> Result> { + on_message: F, + ) -> Result> + where + F: Fn(std::result::Result) + Send + Sync + Clone + 'static, + { let method = &self.method(&service, &method).await?; let mapped_stream = { let input_message = method.input(); @@ -195,6 +209,8 @@ impl GrpcConnection { let md = md.clone(); let use_reflection = use_reflection.clone(); let client_cert = client_cert.clone(); + let on_message = on_message.clone(); + let json_clone = json.clone(); async move { if use_reflection { if let Err(e) = @@ -205,9 +221,13 @@ impl GrpcConnection { } let mut de = Deserializer::from_str(&json); match DynamicMessage::deserialize(input_message, &mut de) { - Ok(m) => Some(m), + Ok(m) => { + on_message(Ok(json_clone)); + Some(m) + } Err(e) => { warn!("Failed to deserialize message: {e}"); + on_message(Err(e.to_string())); None } }