From aa79fb05f95182057f141d36618439815b908e2c Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Sat, 10 Jan 2026 14:31:39 -0800 Subject: [PATCH] Fix gRPC stream panic: use async stream combinators instead of block_on The gRPC streaming code was using tokio::runtime::Handle::current().block_on() inside filter_map closures, which caused a panic ('Cannot start a runtime from within a runtime') when called from an async context. Fixed by replacing the pattern with .then(async move { ... }).filter_map(|x| x) which properly handles async operations in stream pipelines. This fixes the gRPC Ping/Pong freeze issue and restores request cancellation. --- crates-tauri/yaak-app/src/lib.rs | 6 +-- crates/yaak-grpc/src/manager.rs | 88 +++++++++++++++++--------------- 2 files changed, 49 insertions(+), 45 deletions(-) diff --git a/crates-tauri/yaak-app/src/lib.rs b/crates-tauri/yaak-app/src/lib.rs index ee3e281b..7b0311d7 100644 --- a/crates-tauri/yaak-app/src/lib.rs +++ b/crates-tauri/yaak-app/src/lib.rs @@ -392,7 +392,7 @@ async fn cmd_grpc_go( let encryption_manager = encryption_manager.clone(); let msg = block_in_place(|| { tauri::async_runtime::block_on(async { - render_template( + let result = render_template( msg.as_str(), environment_chain, &PluginTemplateCallback::new( @@ -406,8 +406,8 @@ async fn cmd_grpc_go( ), &RenderOptions { error_behavior: RenderErrorBehavior::Throw }, ) - .await - .expect("Failed to render template") + .await; + result.expect("Failed to render template") }) }); in_msg_tx.try_send(msg.clone()).unwrap(); diff --git a/crates/yaak-grpc/src/manager.rs b/crates/yaak-grpc/src/manager.rs index 3dc36c2c..48294e7f 100644 --- a/crates/yaak-grpc/src/manager.rs +++ b/crates/yaak-grpc/src/manager.rs @@ -131,31 +131,33 @@ impl GrpcConnection { let md = metadata.clone(); let use_reflection = self.use_reflection.clone(); let client_cert = client_cert.clone(); - stream.filter_map(move |json| { - let pool = pool.clone(); - let uri = uri.clone(); - let input_message = input_message.clone(); - let md = md.clone(); - let use_reflection = use_reflection.clone(); - let client_cert = client_cert.clone(); - tokio::runtime::Handle::current().block_on(async move { - if use_reflection { - if let Err(e) = - reflect_types_for_message(pool, &uri, &json, &md, client_cert).await - { - warn!("Failed to resolve Any types: {e}"); + stream + .then(move |json| { + let pool = pool.clone(); + let uri = uri.clone(); + let input_message = input_message.clone(); + let md = md.clone(); + let use_reflection = use_reflection.clone(); + let client_cert = client_cert.clone(); + async move { + if use_reflection { + if let Err(e) = + reflect_types_for_message(pool, &uri, &json, &md, client_cert).await + { + warn!("Failed to resolve Any types: {e}"); + } } - } - let mut de = Deserializer::from_str(&json); - match DynamicMessage::deserialize(input_message, &mut de) { - Ok(m) => Some(m), - Err(e) => { - warn!("Failed to deserialize message: {e}"); - None + let mut de = Deserializer::from_str(&json); + match DynamicMessage::deserialize(input_message, &mut de) { + Ok(m) => Some(m), + Err(e) => { + warn!("Failed to deserialize message: {e}"); + None + } } } }) - }) + .filter_map(|x| x) }; let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone()); @@ -185,31 +187,33 @@ impl GrpcConnection { let md = metadata.clone(); let use_reflection = self.use_reflection.clone(); let client_cert = client_cert.clone(); - stream.filter_map(move |json| { - let pool = pool.clone(); - let uri = uri.clone(); - let input_message = input_message.clone(); - let md = md.clone(); - let use_reflection = use_reflection.clone(); - let client_cert = client_cert.clone(); - tokio::runtime::Handle::current().block_on(async move { - if use_reflection { - if let Err(e) = - reflect_types_for_message(pool, &uri, &json, &md, client_cert).await - { - warn!("Failed to resolve Any types: {e}"); + stream + .then(move |json| { + let pool = pool.clone(); + let uri = uri.clone(); + let input_message = input_message.clone(); + let md = md.clone(); + let use_reflection = use_reflection.clone(); + let client_cert = client_cert.clone(); + async move { + if use_reflection { + if let Err(e) = + reflect_types_for_message(pool, &uri, &json, &md, client_cert).await + { + warn!("Failed to resolve Any types: {e}"); + } } - } - let mut de = Deserializer::from_str(&json); - match DynamicMessage::deserialize(input_message, &mut de) { - Ok(m) => Some(m), - Err(e) => { - warn!("Failed to deserialize message: {e}"); - None + let mut de = Deserializer::from_str(&json); + match DynamicMessage::deserialize(input_message, &mut de) { + Ok(m) => Some(m), + Err(e) => { + warn!("Failed to deserialize message: {e}"); + None + } } } }) - }) + .filter_map(|x| x) }; let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());