Surface gRPC message deserialization errors to UI

Previously, when a gRPC streaming message failed to deserialize (e.g., wrong
type like int instead of string), the error was silently logged and the message
was dropped. Now errors are surfaced to the UI as GrpcEventType::Error events.

Changed the streaming/client_streaming methods to accept an on_message callback
that handles both success (logs ClientMessage) and error (logs Error) cases,
rather than logging the client message prematurely before deserialization.
This commit is contained in:
Gregory Schier
2026-01-10 14:41:49 -08:00
parent aa79fb05f9
commit 4d75b8ef06
2 changed files with 64 additions and 24 deletions

View File

@@ -360,10 +360,8 @@ async fn cmd_grpc_go<R: Runtime>(
let cb = {
let cancelled_rx = cancelled_rx.clone();
let app_handle = app_handle.clone();
let environment_chain = environment_chain.clone();
let window = window.clone();
let base_msg = base_msg.clone();
let plugin_manager = plugin_manager.clone();
let encryption_manager = encryption_manager.clone();
@@ -385,8 +383,6 @@ async fn cmd_grpc_go<R: Runtime>(
match serde_json::from_str::<IncomingMsg>(ev.payload()) {
Ok(IncomingMsg::Message(msg)) => {
let window = window.clone();
let app_handle = app_handle.clone();
let base_msg = base_msg.clone();
let environment_chain = environment_chain.clone();
let plugin_manager = plugin_manager.clone();
let encryption_manager = encryption_manager.clone();
@@ -411,19 +407,6 @@ async fn cmd_grpc_go<R: Runtime>(
})
});
in_msg_tx.try_send(msg.clone()).unwrap();
tauri::async_runtime::spawn(async move {
app_handle
.db()
.upsert_grpc_event(
&GrpcEvent {
content: msg,
event_type: GrpcEventType::ClientMessage,
..base_msg.clone()
},
&UpdateSource::from_window_label(window.label()),
)
.unwrap();
});
}
Ok(IncomingMsg::Commit) => {
maybe_in_msg_tx.take();
@@ -470,12 +453,48 @@ async fn cmd_grpc_go<R: Runtime>(
)?;
async move {
// Create callback for streaming methods that handles both success and error
let on_message = {
let app_handle = app_handle.clone();
let base_event = base_event.clone();
let window_label = window.label().to_string();
move |result: std::result::Result<String, String>| match result {
Ok(msg) => {
let _ = app_handle.db().upsert_grpc_event(
&GrpcEvent {
content: msg,
event_type: GrpcEventType::ClientMessage,
..base_event.clone()
},
&UpdateSource::from_window_label(&window_label),
);
}
Err(error) => {
let _ = app_handle.db().upsert_grpc_event(
&GrpcEvent {
content: format!("Failed to send message: {}", error),
event_type: GrpcEventType::Error,
..base_event.clone()
},
&UpdateSource::from_window_label(&window_label),
);
}
}
};
let (maybe_stream, maybe_msg) =
match (method_desc.is_client_streaming(), method_desc.is_server_streaming()) {
(true, true) => (
Some(
connection
.streaming(&service, &method, in_msg_stream, &metadata, client_cert)
.streaming(
&service,
&method,
in_msg_stream,
&metadata,
client_cert,
on_message.clone(),
)
.await,
),
None,
@@ -490,6 +509,7 @@ async fn cmd_grpc_go<R: Runtime>(
in_msg_stream,
&metadata,
client_cert,
on_message.clone(),
)
.await,
),

View File

@@ -115,14 +115,18 @@ impl GrpcConnection {
Ok(client.unary(req, path, codec).await?)
}
pub async fn streaming(
pub async fn streaming<F>(
&self,
service: &str,
method: &str,
stream: ReceiverStream<String>,
metadata: &BTreeMap<String, String>,
client_cert: Option<ClientCertificateConfig>,
) -> Result<Response<Streaming<DynamicMessage>>> {
on_message: F,
) -> Result<Response<Streaming<DynamicMessage>>>
where
F: Fn(std::result::Result<String, String>) + Send + Sync + Clone + 'static,
{
let method = &self.method(&service, &method).await?;
let mapped_stream = {
let input_message = method.input();
@@ -139,6 +143,8 @@ impl GrpcConnection {
let md = md.clone();
let use_reflection = use_reflection.clone();
let client_cert = client_cert.clone();
let on_message = on_message.clone();
let json_clone = json.clone();
async move {
if use_reflection {
if let Err(e) =
@@ -149,9 +155,13 @@ impl GrpcConnection {
}
let mut de = Deserializer::from_str(&json);
match DynamicMessage::deserialize(input_message, &mut de) {
Ok(m) => Some(m),
Ok(m) => {
on_message(Ok(json_clone));
Some(m)
}
Err(e) => {
warn!("Failed to deserialize message: {e}");
on_message(Err(e.to_string()));
None
}
}
@@ -171,14 +181,18 @@ impl GrpcConnection {
Ok(client.streaming(req, path, codec).await?)
}
pub async fn client_streaming(
pub async fn client_streaming<F>(
&self,
service: &str,
method: &str,
stream: ReceiverStream<String>,
metadata: &BTreeMap<String, String>,
client_cert: Option<ClientCertificateConfig>,
) -> Result<Response<DynamicMessage>> {
on_message: F,
) -> Result<Response<DynamicMessage>>
where
F: Fn(std::result::Result<String, String>) + Send + Sync + Clone + 'static,
{
let method = &self.method(&service, &method).await?;
let mapped_stream = {
let input_message = method.input();
@@ -195,6 +209,8 @@ impl GrpcConnection {
let md = md.clone();
let use_reflection = use_reflection.clone();
let client_cert = client_cert.clone();
let on_message = on_message.clone();
let json_clone = json.clone();
async move {
if use_reflection {
if let Err(e) =
@@ -205,9 +221,13 @@ impl GrpcConnection {
}
let mut de = Deserializer::from_str(&json);
match DynamicMessage::deserialize(input_message, &mut de) {
Ok(m) => Some(m),
Ok(m) => {
on_message(Ok(json_clone));
Some(m)
}
Err(e) => {
warn!("Failed to deserialize message: {e}");
on_message(Err(e.to_string()));
None
}
}