From a2004106972cc1bf207abb53c58ae6b41d9e245c Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Wed, 6 May 2026 07:42:35 -0700 Subject: [PATCH] Fix gRPC Any response reflection (#451) --- crates-tauri/yaak-app/src/lib.rs | 60 ++++++++++++++-- crates/yaak-grpc/src/lib.rs | 2 +- crates/yaak-grpc/src/manager.rs | 38 +++++++++- crates/yaak-grpc/src/reflection.rs | 79 ++++++++++++++++++++- packages/plugin-runtime-types/tsconfig.json | 1 + packages/plugin-runtime/tsconfig.json | 1 + 6 files changed, 171 insertions(+), 10 deletions(-) diff --git a/crates-tauri/yaak-app/src/lib.rs b/crates-tauri/yaak-app/src/lib.rs index 966c692b..93157d25 100644 --- a/crates-tauri/yaak-app/src/lib.rs +++ b/crates-tauri/yaak-app/src/lib.rs @@ -34,8 +34,7 @@ use tokio::time; use yaak_common::command::new_checked_command; use yaak_crypto::manager::EncryptionManager; use yaak_grpc::manager::{GrpcConfig, GrpcHandle}; -use yaak_templates::strip_json_comments::strip_json_comments; -use yaak_grpc::{Code, ServiceDefinition, serialize_message}; +use yaak_grpc::{Code, ServiceDefinition}; use yaak_mac_window::AppHandleMacWindowExt; use yaak_models::models::{ AnyModel, CookieJar, Environment, GrpcConnection, GrpcConnectionState, GrpcEvent, @@ -60,6 +59,7 @@ use yaak_plugins::template_callback::PluginTemplateCallback; use yaak_sse::sse::ServerSentEvent; use yaak_tauri_utils::window::WorkspaceWindowTrait; use yaak_templates::format_json::format_json; +use yaak_templates::strip_json_comments::strip_json_comments; use yaak_templates::{RenderErrorBehavior, RenderOptions, Tokens, transform_args}; use yaak_tls::find_client_certificate; @@ -522,7 +522,7 @@ async fn cmd_grpc_go( &method, in_msg_stream, &metadata, - client_cert, + client_cert.clone(), on_message.clone(), ) .await, @@ -538,7 +538,7 @@ async fn cmd_grpc_go( &method, in_msg_stream, &metadata, - client_cert, + client_cert.clone(), on_message.clone(), ) .await, @@ -551,7 +551,9 @@ async fn cmd_grpc_go( (false, false) => ( None, Some( - connection.unary(&service, &method, &msg, &metadata, client_cert).await, + connection + .unary(&service, &method, &msg, &metadata, client_cert.clone()) + .await, ), ), }; @@ -589,11 +591,34 @@ async fn cmd_grpc_go( &UpdateSource::from_window_label(window.label()), ) .unwrap(); + let response_message = msg.into_inner(); + let content = match connection + .serialize_message(&response_message, &metadata, client_cert.clone()) + .await + { + Ok(content) => content, + Err(err) => { + app_handle + .db() + .upsert_grpc_event( + &GrpcEvent { + content: "Failed to read response".to_string(), + error: Some(err.to_string()), + status: Some(Code::Internal as i32), + event_type: GrpcEventType::ConnectionEnd, + ..base_event.clone() + }, + &UpdateSource::from_window_label(window.label()), + ) + .unwrap(); + return; + } + }; app_handle .db() .upsert_grpc_event( &GrpcEvent { - content: serialize_message(&msg.into_inner()).unwrap(), + content, event_type: GrpcEventType::ServerMessage, ..base_event.clone() }, @@ -728,7 +753,28 @@ async fn cmd_grpc_go( loop { match stream.message().await { Ok(Some(msg)) => { - let message = serialize_message(&msg).unwrap(); + let message = match connection + .serialize_message(&msg, &metadata, client_cert.clone()) + .await + { + Ok(message) => message, + Err(err) => { + app_handle + .db() + .upsert_grpc_event( + &GrpcEvent { + content: "Failed to read response".to_string(), + error: Some(err.to_string()), + status: Some(Code::Internal as i32), + event_type: GrpcEventType::ConnectionEnd, + ..base_event.clone() + }, + &UpdateSource::from_window_label(window.label()), + ) + .unwrap(); + break; + } + }; app_handle .db() .upsert_grpc_event( diff --git a/crates/yaak-grpc/src/lib.rs b/crates/yaak-grpc/src/lib.rs index f38cf64a..a8c770a8 100644 --- a/crates/yaak-grpc/src/lib.rs +++ b/crates/yaak-grpc/src/lib.rs @@ -37,7 +37,7 @@ pub struct MethodDefinition { static SERIALIZE_OPTIONS: &'static SerializeOptions = &SerializeOptions::new().skip_default_fields(false).stringify_64_bit_integers(false); -pub fn serialize_message(msg: &DynamicMessage) -> Result { +pub(crate) fn serialize_dynamic_message_json(msg: &DynamicMessage) -> Result { let mut buf = Vec::new(); let mut se = serde_json::Serializer::pretty(&mut buf); msg.serialize_with_options(&mut se, SERIALIZE_OPTIONS).map_err(|e| e.to_string())?; diff --git a/crates/yaak-grpc/src/manager.rs b/crates/yaak-grpc/src/manager.rs index dd2c1c53..80a71b55 100644 --- a/crates/yaak-grpc/src/manager.rs +++ b/crates/yaak-grpc/src/manager.rs @@ -2,7 +2,8 @@ use crate::codec::DynamicCodec; use crate::error::Error::GenericError; use crate::error::Result; use crate::reflection::{ - fill_pool_from_files, fill_pool_from_reflection, method_desc_to_path, reflect_types_for_message, + fill_pool_from_files, fill_pool_from_reflection, method_desc_to_path, + reflect_types_for_dynamic_message, reflect_types_for_message, }; use crate::transport::get_transport; use crate::{MethodDefinition, ServiceDefinition, json_schema}; @@ -11,8 +12,11 @@ use hyper_util::client::legacy::Client; use hyper_util::client::legacy::connect::HttpConnector; use log::{info, warn}; pub use prost_reflect::DynamicMessage; +use prost_reflect::ReflectMessage; +use prost_reflect::prost::Message; use prost_reflect::{DescriptorPool, MethodDescriptor, ServiceDescriptor}; use serde_json::Deserializer; +use std::borrow::Cow; use std::collections::BTreeMap; use std::error::Error; use std::fmt; @@ -115,6 +119,38 @@ impl GrpcConnection { Ok(client.unary(req, path, codec).await?) } + pub async fn serialize_message( + &self, + message: &DynamicMessage, + metadata: &BTreeMap, + client_cert: Option, + ) -> Result { + let message = if self.use_reflection { + reflect_types_for_dynamic_message( + self.pool.clone(), + &self.uri, + message, + metadata, + client_cert, + ) + .await?; + + let message_name = message.descriptor().full_name().to_string(); + let message_desc = { + let pool = self.pool.read().await; + pool.get_message_by_name(&message_name) + .ok_or(GenericError(format!("Failed to find message {message_name}")))? + }; + let mut message_with_updated_pool = DynamicMessage::new(message_desc); + message_with_updated_pool.merge(message.encode_to_vec().as_slice())?; + Cow::Owned(message_with_updated_pool) + } else { + Cow::Borrowed(message) + }; + + crate::serialize_dynamic_message_json(message.as_ref()).map_err(GenericError) + } + pub async fn streaming( &self, service: &str, diff --git a/crates/yaak-grpc/src/reflection.rs b/crates/yaak-grpc/src/reflection.rs index ca41a9c1..7ea95266 100644 --- a/crates/yaak-grpc/src/reflection.rs +++ b/crates/yaak-grpc/src/reflection.rs @@ -7,7 +7,7 @@ use anyhow::anyhow; use async_recursion::async_recursion; use log::{debug, info, warn}; use prost::Message; -use prost_reflect::{DescriptorPool, MethodDescriptor}; +use prost_reflect::{DescriptorPool, DynamicMessage, MethodDescriptor, ReflectMessage, Value}; use prost_types::{FileDescriptorProto, FileDescriptorSet}; use std::collections::{BTreeMap, HashSet}; use std::env::temp_dir; @@ -233,6 +233,83 @@ pub(crate) async fn reflect_types_for_message( Ok(()) } +pub(crate) async fn reflect_types_for_dynamic_message( + pool: Arc>, + uri: &Uri, + message: &DynamicMessage, + metadata: &BTreeMap, + client_cert: Option, +) -> Result<()> { + let mut extra_types = HashSet::new(); + collect_any_types_from_dynamic_message(message, &mut extra_types); + + if extra_types.is_empty() { + return Ok(()); + } + + let mut client = AutoReflectionClient::new(uri, false, client_cert)?; + for extra_type in extra_types { + { + let guard = pool.read().await; + if guard.get_message_by_name(&extra_type).is_some() { + continue; + } + } + info!("Adding response file descriptor for {:?} from reflection", extra_type); + let req = MessageRequest::FileContainingSymbol(extra_type.clone().into()); + let resp = match client.send_reflection_request(req, metadata).await { + Ok(r) => r, + Err(e) => { + return Err(GenericError(format!( + "Error sending reflection request for response @type \"{extra_type}\": {e:?}", + ))); + } + }; + let files = match resp { + MessageResponse::FileDescriptorResponse(resp) => resp.file_descriptor_proto, + _ => panic!("Expected a FileDescriptorResponse variant"), + }; + + { + let mut guard = pool.write().await; + add_file_descriptors_to_pool(files, &mut *guard, &mut client, metadata).await; + } + } + + Ok(()) +} + +fn collect_any_types_from_dynamic_message(message: &DynamicMessage, out: &mut HashSet) { + if message.descriptor().full_name() == "google.protobuf.Any" { + if let Some(Value::String(type_url)) = message.get_field_by_name("type_url").as_deref() { + if let Some(full_name) = type_url.rsplit_once('/').map(|(_, name)| name) { + out.insert(full_name.to_string()); + } + } + } + + for (_, value) in message.fields() { + collect_any_types_from_value(value, out); + } +} + +fn collect_any_types_from_value(value: &Value, out: &mut HashSet) { + match value { + Value::Message(message) => collect_any_types_from_dynamic_message(message, out), + Value::List(values) => { + for value in values { + collect_any_types_from_value(value, out); + } + } + Value::Map(values) => { + for value in values.values() { + collect_any_types_from_value(value, out); + } + } + _ => {} + } +} + #[async_recursion] pub(crate) async fn add_file_descriptors_to_pool( fds: Vec>, diff --git a/packages/plugin-runtime-types/tsconfig.json b/packages/plugin-runtime-types/tsconfig.json index 9e49c5e4..9faeff51 100644 --- a/packages/plugin-runtime-types/tsconfig.json +++ b/packages/plugin-runtime-types/tsconfig.json @@ -5,6 +5,7 @@ "lib": ["es2021", "dom"], "declaration": true, "declarationDir": "./lib", + "rootDir": "./src", "outDir": "./lib", "strict": true, "types": ["node"] diff --git a/packages/plugin-runtime/tsconfig.json b/packages/plugin-runtime/tsconfig.json index 7b51fbe2..6d7ac8c9 100644 --- a/packages/plugin-runtime/tsconfig.json +++ b/packages/plugin-runtime/tsconfig.json @@ -10,6 +10,7 @@ "moduleResolution": "node16", "resolveJsonModule": true, "sourceMap": true, + "rootDir": "src", "outDir": "build" }, "include": ["src"]