Add configurable gRPC and WebSocket message size limit (#487)

This commit is contained in:
Gregory Schier
2026-06-30 09:14:41 -07:00
committed by GitHub
parent d1e6f8fb33
commit bbdfbcb9ca
34 changed files with 1669 additions and 1014 deletions
+11 -5
View File
@@ -33,15 +33,21 @@ impl AutoReflectionClient {
uri: &Uri,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<Self> {
let client_v1 = v1::server_reflection_client::ServerReflectionClient::with_origin(
get_transport(validate_certificates, client_cert.clone())?,
uri.clone(),
);
let client_v1alpha = v1alpha::server_reflection_client::ServerReflectionClient::with_origin(
get_transport(validate_certificates, client_cert.clone())?,
uri.clone(),
);
)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
let client_v1alpha =
v1alpha::server_reflection_client::ServerReflectionClient::with_origin(
get_transport(validate_certificates, client_cert.clone())?,
uri.clone(),
)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
Ok(AutoReflectionClient { use_v1alpha: false, client_v1, client_v1alpha })
}
+82 -22
View File
@@ -33,16 +33,13 @@ use tonic::transport::Uri;
use tonic::{IntoRequest, IntoStreamingRequest, Request, Response, Status, Streaming};
use yaak_tls::ClientCertificateConfig;
/// Maximum size for a single gRPC message (64 MB).
/// Tonic defaults to 4 MB, which is too small for large responses.
const GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
#[derive(Clone)]
pub struct GrpcConnection {
pool: Arc<RwLock<DescriptorPool>>,
conn: Client<HttpsConnector<HttpConnector>, BoxBody>,
pub uri: Uri,
use_reflection: bool,
max_message_size: usize,
}
#[derive(Default, Debug)]
@@ -101,8 +98,15 @@ impl GrpcConnection {
client_cert: Option<ClientCertificateConfig>,
) -> Result<Response<DynamicMessage>> {
if self.use_reflection {
reflect_types_for_message(self.pool.clone(), &self.uri, message, metadata, client_cert)
.await?;
reflect_types_for_message(
self.pool.clone(),
&self.uri,
message,
metadata,
client_cert,
self.max_message_size,
)
.await?;
}
let method = &self.method(&service, &method).await?;
let input_message = method.input();
@@ -111,8 +115,7 @@ impl GrpcConnection {
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
deserializer.end()?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let mut req = req_message.into_request();
decorate_req(metadata, &mut req)?;
@@ -137,6 +140,7 @@ impl GrpcConnection {
message,
metadata,
client_cert,
self.max_message_size,
)
.await?;
@@ -176,6 +180,7 @@ impl GrpcConnection {
let md = metadata.clone();
let use_reflection = self.use_reflection.clone();
let client_cert = client_cert.clone();
let max_message_size = self.max_message_size;
stream
.then(move |json| {
let pool = pool.clone();
@@ -188,8 +193,15 @@ impl GrpcConnection {
let json_clone = json.clone();
async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
if let Err(e) = reflect_types_for_message(
pool,
&uri,
&json,
&md,
client_cert,
max_message_size,
)
.await
{
warn!("Failed to resolve Any types: {e}");
}
@@ -211,8 +223,7 @@ impl GrpcConnection {
.filter_map(|x| x)
};
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -243,6 +254,7 @@ impl GrpcConnection {
let md = metadata.clone();
let use_reflection = self.use_reflection.clone();
let client_cert = client_cert.clone();
let max_message_size = self.max_message_size;
stream
.then(move |json| {
let pool = pool.clone();
@@ -255,8 +267,15 @@ impl GrpcConnection {
let json_clone = json.clone();
async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
if let Err(e) = reflect_types_for_message(
pool,
&uri,
&json,
&md,
client_cert,
max_message_size,
)
.await
{
warn!("Failed to resolve Any types: {e}");
}
@@ -278,8 +297,7 @@ impl GrpcConnection {
.filter_map(|x| x)
};
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -307,8 +325,7 @@ impl GrpcConnection {
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
deserializer.end()?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let mut req = req_message.into_request();
decorate_req(metadata, &mut req)?;
@@ -320,6 +337,23 @@ impl GrpcConnection {
}
}
fn grpc_client(
conn: Client<HttpsConnector<HttpConnector>, BoxBody>,
uri: Uri,
max_message_size: usize,
) -> tonic::client::Grpc<Client<HttpsConnector<HttpConnector>, BoxBody>> {
tonic::client::Grpc::with_origin(conn, uri)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size)
}
fn message_size_limit(setting: i32) -> usize {
match setting.try_into() {
Ok(0) | Err(_) => usize::MAX,
Ok(limit) => limit,
}
}
/// Configuration for GrpcHandle to compile proto files
#[derive(Clone)]
pub struct GrpcConfig {
@@ -356,6 +390,7 @@ impl GrpcHandle {
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<bool> {
let server_reflection = proto_files.is_empty();
let key = make_pool_key(id, uri, proto_files);
@@ -367,7 +402,14 @@ impl GrpcHandle {
let pool = if server_reflection {
let full_uri = uri_from_str(uri)?;
fill_pool_from_reflection(&full_uri, metadata, validate_certificates, client_cert).await
fill_pool_from_reflection(
&full_uri,
metadata,
validate_certificates,
client_cert,
message_size_limit(request_message_size),
)
.await
} else {
fill_pool_from_files(&self.config, proto_files).await
}?;
@@ -384,12 +426,21 @@ impl GrpcHandle {
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<Vec<ServiceDefinition>> {
// Ensure we have a pool; reflect only if missing
if self.get_pool(id, uri, proto_files).is_none() {
info!("Reflecting gRPC services for {} at {}", id, uri);
self.reflect(id, uri, proto_files, metadata, validate_certificates, client_cert)
.await?;
self.reflect(
id,
uri,
proto_files,
metadata,
validate_certificates,
client_cert,
request_message_size,
)
.await?;
}
let pool = self
@@ -429,8 +480,10 @@ impl GrpcHandle {
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<GrpcConnection> {
let use_reflection = proto_files.is_empty();
let max_message_size = message_size_limit(request_message_size);
if self.get_pool(id, uri, proto_files).is_none() {
self.reflect(
id,
@@ -439,6 +492,7 @@ impl GrpcHandle {
metadata,
validate_certificates,
client_cert.clone(),
request_message_size,
)
.await?;
}
@@ -448,7 +502,13 @@ impl GrpcHandle {
.clone();
let uri = uri_from_str(uri)?;
let conn = get_transport(validate_certificates, client_cert.clone())?;
Ok(GrpcConnection { pool: Arc::new(RwLock::new(pool)), use_reflection, conn, uri })
Ok(GrpcConnection {
pool: Arc::new(RwLock::new(pool)),
use_reflection,
conn,
uri,
max_message_size,
})
}
fn get_pool(&self, id: &str, uri: &str, proto_files: &Vec<PathBuf>) -> Option<&DescriptorPool> {
+7 -3
View File
@@ -119,9 +119,11 @@ pub async fn fill_pool_from_reflection(
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<DescriptorPool> {
let mut pool = DescriptorPool::new();
let mut client = AutoReflectionClient::new(uri, validate_certificates, client_cert)?;
let mut client =
AutoReflectionClient::new(uri, validate_certificates, client_cert, max_message_size)?;
for service in list_services(&mut client, metadata).await? {
if service == "grpc.reflection.v1alpha.ServerReflection" {
@@ -192,6 +194,7 @@ pub(crate) async fn reflect_types_for_message(
json: &str,
metadata: &BTreeMap<String, String>,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<()> {
// 1. Collect all Any types in the JSON
let mut extra_types = Vec::new();
@@ -201,7 +204,7 @@ pub(crate) async fn reflect_types_for_message(
return Ok(()); // nothing to do
}
let mut client = AutoReflectionClient::new(uri, false, client_cert)?;
let mut client = AutoReflectionClient::new(uri, false, client_cert, max_message_size)?;
for extra_type in extra_types {
{
let guard = pool.read().await;
@@ -239,6 +242,7 @@ pub(crate) async fn reflect_types_for_dynamic_message(
message: &DynamicMessage,
metadata: &BTreeMap<String, String>,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<()> {
let mut extra_types = HashSet::new();
collect_any_types_from_dynamic_message(message, &mut extra_types);
@@ -247,7 +251,7 @@ pub(crate) async fn reflect_types_for_dynamic_message(
return Ok(());
}
let mut client = AutoReflectionClient::new(uri, false, client_cert)?;
let mut client = AutoReflectionClient::new(uri, false, client_cert, max_message_size)?;
for extra_type in extra_types {
{
let guard = pool.read().await;