Fix gRPC stream panic: use async stream combinators instead of block_on

The gRPC streaming code was using tokio::runtime::Handle::current().block_on()
inside filter_map closures, which caused a panic ('Cannot start a runtime from
within a runtime') when called from an async context.

Fixed by replacing the pattern with .then(async move { ... }).filter_map(|x| x)
which properly handles async operations in stream pipelines.

This fixes the gRPC Ping/Pong freeze issue and restores request cancellation.
This commit is contained in:
Gregory Schier
2026-01-10 14:31:39 -08:00
parent fe01796536
commit aa79fb05f9
2 changed files with 49 additions and 45 deletions

View File

@@ -131,31 +131,33 @@ impl GrpcConnection {
let md = metadata.clone();
let use_reflection = self.use_reflection.clone();
let client_cert = client_cert.clone();
stream.filter_map(move |json| {
let pool = pool.clone();
let uri = uri.clone();
let input_message = input_message.clone();
let md = md.clone();
let use_reflection = use_reflection.clone();
let client_cert = client_cert.clone();
tokio::runtime::Handle::current().block_on(async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
{
warn!("Failed to resolve Any types: {e}");
stream
.then(move |json| {
let pool = pool.clone();
let uri = uri.clone();
let input_message = input_message.clone();
let md = md.clone();
let use_reflection = use_reflection.clone();
let client_cert = client_cert.clone();
async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
{
warn!("Failed to resolve Any types: {e}");
}
}
}
let mut de = Deserializer::from_str(&json);
match DynamicMessage::deserialize(input_message, &mut de) {
Ok(m) => Some(m),
Err(e) => {
warn!("Failed to deserialize message: {e}");
None
let mut de = Deserializer::from_str(&json);
match DynamicMessage::deserialize(input_message, &mut de) {
Ok(m) => Some(m),
Err(e) => {
warn!("Failed to deserialize message: {e}");
None
}
}
}
})
})
.filter_map(|x| x)
};
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
@@ -185,31 +187,33 @@ impl GrpcConnection {
let md = metadata.clone();
let use_reflection = self.use_reflection.clone();
let client_cert = client_cert.clone();
stream.filter_map(move |json| {
let pool = pool.clone();
let uri = uri.clone();
let input_message = input_message.clone();
let md = md.clone();
let use_reflection = use_reflection.clone();
let client_cert = client_cert.clone();
tokio::runtime::Handle::current().block_on(async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
{
warn!("Failed to resolve Any types: {e}");
stream
.then(move |json| {
let pool = pool.clone();
let uri = uri.clone();
let input_message = input_message.clone();
let md = md.clone();
let use_reflection = use_reflection.clone();
let client_cert = client_cert.clone();
async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
{
warn!("Failed to resolve Any types: {e}");
}
}
}
let mut de = Deserializer::from_str(&json);
match DynamicMessage::deserialize(input_message, &mut de) {
Ok(m) => Some(m),
Err(e) => {
warn!("Failed to deserialize message: {e}");
None
let mut de = Deserializer::from_str(&json);
match DynamicMessage::deserialize(input_message, &mut de) {
Ok(m) => Some(m),
Err(e) => {
warn!("Failed to deserialize message: {e}");
None
}
}
}
})
})
.filter_map(|x| x)
};
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());