Add request message size setting

This commit is contained in:
Gregory Schier
2026-06-29 16:29:42 -07:00
parent c3aecfdc0c
commit a9be57e6d9
22 changed files with 528 additions and 71 deletions
+4
View File
@@ -46,6 +46,7 @@ export type Folder = {
settingValidateCertificates: InheritedBoolSetting;
settingFollowRedirects: InheritedBoolSetting;
settingRequestTimeout: InheritedIntSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type GrpcRequest = {
@@ -69,6 +70,7 @@ export type GrpcRequest = {
*/
url: string;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type HttpRequest = {
@@ -146,6 +148,7 @@ export type WebsocketRequest = {
settingSendCookies: InheritedBoolSetting;
settingStoreCookies: InheritedBoolSetting;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type Workspace = {
@@ -162,6 +165,7 @@ export type Workspace = {
settingValidateCertificates: boolean;
settingFollowRedirects: boolean;
settingRequestTimeout: number;
settingRequestMessageSize: number;
settingDnsOverrides: Array<DnsOverride>;
settingSendCookies: boolean;
settingStoreCookies: boolean;
+11 -5
View File
@@ -33,15 +33,21 @@ impl AutoReflectionClient {
uri: &Uri,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<Self> {
let client_v1 = v1::server_reflection_client::ServerReflectionClient::with_origin(
get_transport(validate_certificates, client_cert.clone())?,
uri.clone(),
);
let client_v1alpha = v1alpha::server_reflection_client::ServerReflectionClient::with_origin(
get_transport(validate_certificates, client_cert.clone())?,
uri.clone(),
);
)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
let client_v1alpha =
v1alpha::server_reflection_client::ServerReflectionClient::with_origin(
get_transport(validate_certificates, client_cert.clone())?,
uri.clone(),
)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size);
Ok(AutoReflectionClient { use_v1alpha: false, client_v1, client_v1alpha })
}
+82 -22
View File
@@ -33,16 +33,13 @@ use tonic::transport::Uri;
use tonic::{IntoRequest, IntoStreamingRequest, Request, Response, Status, Streaming};
use yaak_tls::ClientCertificateConfig;
/// Maximum size for a single gRPC message (64 MB).
/// Tonic defaults to 4 MB, which is too small for large responses.
const GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
#[derive(Clone)]
pub struct GrpcConnection {
pool: Arc<RwLock<DescriptorPool>>,
conn: Client<HttpsConnector<HttpConnector>, BoxBody>,
pub uri: Uri,
use_reflection: bool,
max_message_size: usize,
}
#[derive(Default, Debug)]
@@ -101,8 +98,15 @@ impl GrpcConnection {
client_cert: Option<ClientCertificateConfig>,
) -> Result<Response<DynamicMessage>> {
if self.use_reflection {
reflect_types_for_message(self.pool.clone(), &self.uri, message, metadata, client_cert)
.await?;
reflect_types_for_message(
self.pool.clone(),
&self.uri,
message,
metadata,
client_cert,
self.max_message_size,
)
.await?;
}
let method = &self.method(&service, &method).await?;
let input_message = method.input();
@@ -111,8 +115,7 @@ impl GrpcConnection {
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
deserializer.end()?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let mut req = req_message.into_request();
decorate_req(metadata, &mut req)?;
@@ -137,6 +140,7 @@ impl GrpcConnection {
message,
metadata,
client_cert,
self.max_message_size,
)
.await?;
@@ -176,6 +180,7 @@ impl GrpcConnection {
let md = metadata.clone();
let use_reflection = self.use_reflection.clone();
let client_cert = client_cert.clone();
let max_message_size = self.max_message_size;
stream
.then(move |json| {
let pool = pool.clone();
@@ -188,8 +193,15 @@ impl GrpcConnection {
let json_clone = json.clone();
async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
if let Err(e) = reflect_types_for_message(
pool,
&uri,
&json,
&md,
client_cert,
max_message_size,
)
.await
{
warn!("Failed to resolve Any types: {e}");
}
@@ -211,8 +223,7 @@ impl GrpcConnection {
.filter_map(|x| x)
};
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -243,6 +254,7 @@ impl GrpcConnection {
let md = metadata.clone();
let use_reflection = self.use_reflection.clone();
let client_cert = client_cert.clone();
let max_message_size = self.max_message_size;
stream
.then(move |json| {
let pool = pool.clone();
@@ -255,8 +267,15 @@ impl GrpcConnection {
let json_clone = json.clone();
async move {
if use_reflection {
if let Err(e) =
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
if let Err(e) = reflect_types_for_message(
pool,
&uri,
&json,
&md,
client_cert,
max_message_size,
)
.await
{
warn!("Failed to resolve Any types: {e}");
}
@@ -278,8 +297,7 @@ impl GrpcConnection {
.filter_map(|x| x)
};
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -307,8 +325,7 @@ impl GrpcConnection {
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
deserializer.end()?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
let mut req = req_message.into_request();
decorate_req(metadata, &mut req)?;
@@ -320,6 +337,23 @@ impl GrpcConnection {
}
}
fn grpc_client(
conn: Client<HttpsConnector<HttpConnector>, BoxBody>,
uri: Uri,
max_message_size: usize,
) -> tonic::client::Grpc<Client<HttpsConnector<HttpConnector>, BoxBody>> {
tonic::client::Grpc::with_origin(conn, uri)
.max_decoding_message_size(max_message_size)
.max_encoding_message_size(max_message_size)
}
fn message_size_limit(setting: i32) -> usize {
match setting.try_into() {
Ok(0) | Err(_) => usize::MAX,
Ok(limit) => limit,
}
}
/// Configuration for GrpcHandle to compile proto files
#[derive(Clone)]
pub struct GrpcConfig {
@@ -356,6 +390,7 @@ impl GrpcHandle {
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<bool> {
let server_reflection = proto_files.is_empty();
let key = make_pool_key(id, uri, proto_files);
@@ -367,7 +402,14 @@ impl GrpcHandle {
let pool = if server_reflection {
let full_uri = uri_from_str(uri)?;
fill_pool_from_reflection(&full_uri, metadata, validate_certificates, client_cert).await
fill_pool_from_reflection(
&full_uri,
metadata,
validate_certificates,
client_cert,
message_size_limit(request_message_size),
)
.await
} else {
fill_pool_from_files(&self.config, proto_files).await
}?;
@@ -384,12 +426,21 @@ impl GrpcHandle {
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<Vec<ServiceDefinition>> {
// Ensure we have a pool; reflect only if missing
if self.get_pool(id, uri, proto_files).is_none() {
info!("Reflecting gRPC services for {} at {}", id, uri);
self.reflect(id, uri, proto_files, metadata, validate_certificates, client_cert)
.await?;
self.reflect(
id,
uri,
proto_files,
metadata,
validate_certificates,
client_cert,
request_message_size,
)
.await?;
}
let pool = self
@@ -429,8 +480,10 @@ impl GrpcHandle {
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<GrpcConnection> {
let use_reflection = proto_files.is_empty();
let max_message_size = message_size_limit(request_message_size);
if self.get_pool(id, uri, proto_files).is_none() {
self.reflect(
id,
@@ -439,6 +492,7 @@ impl GrpcHandle {
metadata,
validate_certificates,
client_cert.clone(),
request_message_size,
)
.await?;
}
@@ -448,7 +502,13 @@ impl GrpcHandle {
.clone();
let uri = uri_from_str(uri)?;
let conn = get_transport(validate_certificates, client_cert.clone())?;
Ok(GrpcConnection { pool: Arc::new(RwLock::new(pool)), use_reflection, conn, uri })
Ok(GrpcConnection {
pool: Arc::new(RwLock::new(pool)),
use_reflection,
conn,
uri,
max_message_size,
})
}
fn get_pool(&self, id: &str, uri: &str, proto_files: &Vec<PathBuf>) -> Option<&DescriptorPool> {
+7 -3
View File
@@ -119,9 +119,11 @@ pub async fn fill_pool_from_reflection(
metadata: &BTreeMap<String, String>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<DescriptorPool> {
let mut pool = DescriptorPool::new();
let mut client = AutoReflectionClient::new(uri, validate_certificates, client_cert)?;
let mut client =
AutoReflectionClient::new(uri, validate_certificates, client_cert, max_message_size)?;
for service in list_services(&mut client, metadata).await? {
if service == "grpc.reflection.v1alpha.ServerReflection" {
@@ -192,6 +194,7 @@ pub(crate) async fn reflect_types_for_message(
json: &str,
metadata: &BTreeMap<String, String>,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<()> {
// 1. Collect all Any types in the JSON
let mut extra_types = Vec::new();
@@ -201,7 +204,7 @@ pub(crate) async fn reflect_types_for_message(
return Ok(()); // nothing to do
}
let mut client = AutoReflectionClient::new(uri, false, client_cert)?;
let mut client = AutoReflectionClient::new(uri, false, client_cert, max_message_size)?;
for extra_type in extra_types {
{
let guard = pool.read().await;
@@ -239,6 +242,7 @@ pub(crate) async fn reflect_types_for_dynamic_message(
message: &DynamicMessage,
metadata: &BTreeMap<String, String>,
client_cert: Option<ClientCertificateConfig>,
max_message_size: usize,
) -> Result<()> {
let mut extra_types = HashSet::new();
collect_any_types_from_dynamic_message(message, &mut extra_types);
@@ -247,7 +251,7 @@ pub(crate) async fn reflect_types_for_dynamic_message(
return Ok(());
}
let mut client = AutoReflectionClient::new(uri, false, client_cert)?;
let mut client = AutoReflectionClient::new(uri, false, client_cert, max_message_size)?;
for extra_type in extra_types {
{
let guard = pool.read().await;
+4
View File
@@ -109,6 +109,7 @@ export type Folder = {
settingValidateCertificates: InheritedBoolSetting;
settingFollowRedirects: InheritedBoolSetting;
settingRequestTimeout: InheritedIntSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type GraphQlIntrospection = {
@@ -184,6 +185,7 @@ export type GrpcRequest = {
*/
url: string;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type HttpRequest = {
@@ -482,6 +484,7 @@ export type WebsocketRequest = {
settingSendCookies: InheritedBoolSetting;
settingStoreCookies: InheritedBoolSetting;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type Workspace = {
@@ -498,6 +501,7 @@ export type Workspace = {
settingValidateCertificates: boolean;
settingFollowRedirects: boolean;
settingRequestTimeout: number;
settingRequestMessageSize: number;
settingDnsOverrides: Array<DnsOverride>;
settingSendCookies: boolean;
settingStoreCookies: boolean;
@@ -0,0 +1,7 @@
ALTER TABLE workspaces ADD COLUMN setting_request_message_size INTEGER DEFAULT 67108864 NOT NULL;
ALTER TABLE folders ADD COLUMN setting_request_message_size TEXT DEFAULT '{"enabled":false,"value":67108864}' NOT NULL;
ALTER TABLE websocket_requests ADD COLUMN setting_request_message_size TEXT DEFAULT '{"enabled":false,"value":67108864}' NOT NULL;
ALTER TABLE grpc_requests ADD COLUMN setting_request_message_size TEXT DEFAULT '{"enabled":false,"value":67108864}' NOT NULL;
+47 -1
View File
@@ -21,6 +21,8 @@ use ts_rs::TS;
use yaak_database::{Result as DbResult, UpdateSource};
pub use yaak_database::{UpsertModelInfo, upsert_date};
pub const DEFAULT_REQUEST_MESSAGE_SIZE: i32 = 64 * 1024 * 1024;
#[macro_export]
macro_rules! impl_model {
($t:ty, $variant:ident) => {
@@ -120,6 +122,7 @@ pub struct ResolvedHttpRequestSettings {
pub validate_certificates: ResolvedSetting<bool>,
pub follow_redirects: ResolvedSetting<bool>,
pub request_timeout: ResolvedSetting<i32>,
pub request_message_size: ResolvedSetting<i32>,
pub send_cookies: ResolvedSetting<bool>,
pub store_cookies: ResolvedSetting<bool>,
}
@@ -130,6 +133,7 @@ impl Default for ResolvedHttpRequestSettings {
validate_certificates: ResolvedSetting::default_source(true),
follow_redirects: ResolvedSetting::default_source(true),
request_timeout: ResolvedSetting::default_source(0),
request_message_size: ResolvedSetting::default_source(DEFAULT_REQUEST_MESSAGE_SIZE),
send_cookies: ResolvedSetting::default_source(true),
store_cookies: ResolvedSetting::default_source(true),
}
@@ -400,6 +404,8 @@ pub struct Workspace {
#[serde(default = "default_true")]
pub setting_follow_redirects: bool,
pub setting_request_timeout: i32,
#[serde(default = "default_request_message_size")]
pub setting_request_message_size: i32,
#[serde(default)]
pub setting_dns_overrides: Vec<DnsOverride>,
#[serde(default = "default_true")]
@@ -445,6 +451,7 @@ impl UpsertModelInfo for Workspace {
(EncryptionKeyChallenge, self.encryption_key_challenge.into()),
(SettingFollowRedirects, self.setting_follow_redirects.into()),
(SettingRequestTimeout, self.setting_request_timeout.into()),
(SettingRequestMessageSize, self.setting_request_message_size.into()),
(SettingValidateCertificates, self.setting_validate_certificates.into()),
(SettingDnsOverrides, serde_json::to_string(&self.setting_dns_overrides)?.into()),
(SettingSendCookies, self.setting_send_cookies.into()),
@@ -463,7 +470,7 @@ impl UpsertModelInfo for Workspace {
WorkspaceIden::EncryptionKeyChallenge,
WorkspaceIden::SettingRequestTimeout,
WorkspaceIden::SettingFollowRedirects,
WorkspaceIden::SettingRequestTimeout,
WorkspaceIden::SettingRequestMessageSize,
WorkspaceIden::SettingValidateCertificates,
WorkspaceIden::SettingDnsOverrides,
WorkspaceIden::SettingSendCookies,
@@ -491,6 +498,7 @@ impl UpsertModelInfo for Workspace {
authentication_type: row.get("authentication_type")?,
setting_follow_redirects: row.get("setting_follow_redirects")?,
setting_request_timeout: row.get("setting_request_timeout")?,
setting_request_message_size: row.get("setting_request_message_size")?,
setting_validate_certificates: row.get("setting_validate_certificates")?,
setting_dns_overrides: serde_json::from_str(&setting_dns_overrides).unwrap_or_default(),
setting_send_cookies: row.get("setting_send_cookies")?,
@@ -962,6 +970,8 @@ pub struct Folder {
pub setting_validate_certificates: InheritedBoolSetting,
pub setting_follow_redirects: InheritedBoolSetting,
pub setting_request_timeout: InheritedIntSetting,
#[serde(default = "default_request_message_size_setting")]
pub setting_request_message_size: InheritedIntSetting,
}
impl UpsertModelInfo for Folder {
@@ -1009,6 +1019,10 @@ impl UpsertModelInfo for Folder {
),
(SettingFollowRedirects, serde_json::to_string(&self.setting_follow_redirects)?.into()),
(SettingRequestTimeout, serde_json::to_string(&self.setting_request_timeout)?.into()),
(
SettingRequestMessageSize,
serde_json::to_string(&self.setting_request_message_size)?.into(),
),
])
}
@@ -1027,6 +1041,7 @@ impl UpsertModelInfo for Folder {
FolderIden::SettingValidateCertificates,
FolderIden::SettingFollowRedirects,
FolderIden::SettingRequestTimeout,
FolderIden::SettingRequestMessageSize,
]
}
@@ -1041,6 +1056,7 @@ impl UpsertModelInfo for Folder {
let setting_validate_certificates: String = row.get("setting_validate_certificates")?;
let setting_follow_redirects: String = row.get("setting_follow_redirects")?;
let setting_request_timeout: String = row.get("setting_request_timeout")?;
let setting_request_message_size: String = row.get("setting_request_message_size")?;
Ok(Self {
id: row.get("id")?,
model: row.get("model")?,
@@ -1062,6 +1078,8 @@ impl UpsertModelInfo for Folder {
.unwrap_or_default(),
setting_request_timeout: serde_json::from_str(&setting_request_timeout)
.unwrap_or_default(),
setting_request_message_size: serde_json::from_str(&setting_request_message_size)
.unwrap_or_else(|_| default_request_message_size_setting()),
})
}
}
@@ -1398,6 +1416,8 @@ pub struct WebsocketRequest {
pub setting_send_cookies: InheritedBoolSetting,
pub setting_store_cookies: InheritedBoolSetting,
pub setting_validate_certificates: InheritedBoolSetting,
#[serde(default = "default_request_message_size_setting")]
pub setting_request_message_size: InheritedIntSetting,
}
impl UpsertModelInfo for WebsocketRequest {
@@ -1446,6 +1466,10 @@ impl UpsertModelInfo for WebsocketRequest {
SettingValidateCertificates,
serde_json::to_string(&self.setting_validate_certificates)?.into(),
),
(
SettingRequestMessageSize,
serde_json::to_string(&self.setting_request_message_size)?.into(),
),
])
}
@@ -1466,6 +1490,7 @@ impl UpsertModelInfo for WebsocketRequest {
WebsocketRequestIden::SettingSendCookies,
WebsocketRequestIden::SettingStoreCookies,
WebsocketRequestIden::SettingValidateCertificates,
WebsocketRequestIden::SettingRequestMessageSize,
]
}
@@ -1479,6 +1504,7 @@ impl UpsertModelInfo for WebsocketRequest {
let setting_send_cookies: String = row.get("setting_send_cookies")?;
let setting_store_cookies: String = row.get("setting_store_cookies")?;
let setting_validate_certificates: String = row.get("setting_validate_certificates")?;
let setting_request_message_size: String = row.get("setting_request_message_size")?;
Ok(Self {
id: row.get("id")?,
model: row.get("model")?,
@@ -1499,6 +1525,8 @@ impl UpsertModelInfo for WebsocketRequest {
setting_store_cookies: serde_json::from_str(&setting_store_cookies).unwrap_or_default(),
setting_validate_certificates: serde_json::from_str(&setting_validate_certificates)
.unwrap_or_default(),
setting_request_message_size: serde_json::from_str(&setting_request_message_size)
.unwrap_or_else(|_| default_request_message_size_setting()),
})
}
}
@@ -2039,6 +2067,8 @@ pub struct GrpcRequest {
/// Server URL (http for plaintext or https for secure)
pub url: String,
pub setting_validate_certificates: InheritedBoolSetting,
#[serde(default = "default_request_message_size_setting")]
pub setting_request_message_size: InheritedIntSetting,
}
impl UpsertModelInfo for GrpcRequest {
@@ -2086,6 +2116,10 @@ impl UpsertModelInfo for GrpcRequest {
SettingValidateCertificates,
serde_json::to_string(&self.setting_validate_certificates)?.into(),
),
(
SettingRequestMessageSize,
serde_json::to_string(&self.setting_request_message_size)?.into(),
),
])
}
@@ -2105,6 +2139,7 @@ impl UpsertModelInfo for GrpcRequest {
GrpcRequestIden::Authentication,
GrpcRequestIden::Metadata,
GrpcRequestIden::SettingValidateCertificates,
GrpcRequestIden::SettingRequestMessageSize,
]
}
@@ -2115,6 +2150,7 @@ impl UpsertModelInfo for GrpcRequest {
let authentication: String = row.get("authentication")?;
let metadata: String = row.get("metadata")?;
let setting_validate_certificates: String = row.get("setting_validate_certificates")?;
let setting_request_message_size: String = row.get("setting_request_message_size")?;
Ok(Self {
id: row.get("id")?,
model: row.get("model")?,
@@ -2134,6 +2170,8 @@ impl UpsertModelInfo for GrpcRequest {
metadata: serde_json::from_str(metadata.as_str()).unwrap_or_default(),
setting_validate_certificates: serde_json::from_str(&setting_validate_certificates)
.unwrap_or_default(),
setting_request_message_size: serde_json::from_str(&setting_request_message_size)
.unwrap_or_else(|_| default_request_message_size_setting()),
})
}
}
@@ -2684,6 +2722,14 @@ fn default_true() -> bool {
true
}
fn default_request_message_size() -> i32 {
DEFAULT_REQUEST_MESSAGE_SIZE
}
fn default_request_message_size_setting() -> InheritedIntSetting {
InheritedIntSetting { enabled: false, value: DEFAULT_REQUEST_MESSAGE_SIZE }
}
fn default_http_method() -> String {
"GET".to_string()
}
@@ -180,6 +180,14 @@ impl<'a> ClientDb<'a> {
} else {
parent.request_timeout
},
request_message_size: if folder.setting_request_message_size.enabled {
ResolvedSetting::from_model(
folder.setting_request_message_size.value,
AnyModel::Folder(folder.clone()),
)
} else {
parent.request_message_size
},
send_cookies: if folder.setting_send_cookies.enabled {
ResolvedSetting::from_model(
folder.setting_send_cookies.value,
@@ -129,6 +129,14 @@ impl<'a> ClientDb<'a> {
} else {
parent.validate_certificates
},
request_message_size: if grpc_request.setting_request_message_size.enabled {
ResolvedSetting::from_model(
grpc_request.setting_request_message_size.value,
AnyModel::GrpcRequest(grpc_request.clone()),
)
} else {
parent.request_message_size
},
..parent
})
}
@@ -131,6 +131,7 @@ impl<'a> ClientDb<'a> {
} else {
parent.request_timeout
},
request_message_size: parent.request_message_size,
send_cookies: if http_request.setting_send_cookies.enabled {
ResolvedSetting::from_model(
http_request.setting_send_cookies.value,
@@ -139,6 +139,14 @@ impl<'a> ClientDb<'a> {
} else {
parent.validate_certificates
},
request_message_size: if websocket_request.setting_request_message_size.enabled {
ResolvedSetting::from_model(
websocket_request.setting_request_message_size.value,
AnyModel::WebsocketRequest(websocket_request.clone()),
)
} else {
parent.request_message_size
},
send_cookies: if websocket_request.setting_send_cookies.enabled {
ResolvedSetting::from_model(
websocket_request.setting_send_cookies.value,
@@ -21,6 +21,7 @@ impl<'a> ClientDb<'a> {
&Workspace {
name: "Yaak".to_string(),
setting_follow_redirects: true,
setting_request_message_size: crate::models::DEFAULT_REQUEST_MESSAGE_SIZE,
setting_validate_certificates: true,
..Default::default()
},
@@ -102,6 +103,10 @@ impl<'a> ClientDb<'a> {
workspace.setting_request_timeout,
AnyModel::Workspace(workspace.clone()),
),
request_message_size: ResolvedSetting::from_model(
workspace.setting_request_message_size,
AnyModel::Workspace(workspace.clone()),
),
send_cookies: ResolvedSetting::from_model(
workspace.setting_send_cookies,
AnyModel::Workspace(workspace.clone()),
+4
View File
@@ -108,6 +108,7 @@ export type Folder = {
settingValidateCertificates: InheritedBoolSetting;
settingFollowRedirects: InheritedBoolSetting;
settingRequestTimeout: InheritedIntSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type GraphQlIntrospection = {
@@ -183,6 +184,7 @@ export type GrpcRequest = {
*/
url: string;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type HttpRequest = {
@@ -450,6 +452,7 @@ export type WebsocketRequest = {
settingSendCookies: InheritedBoolSetting;
settingStoreCookies: InheritedBoolSetting;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type Workspace = {
@@ -466,6 +469,7 @@ export type Workspace = {
settingValidateCertificates: boolean;
settingFollowRedirects: boolean;
settingRequestTimeout: number;
settingRequestMessageSize: number;
settingDnsOverrides: Array<DnsOverride>;
settingSendCookies: boolean;
settingStoreCookies: boolean;
+4
View File
@@ -46,6 +46,7 @@ export type Folder = {
settingValidateCertificates: InheritedBoolSetting;
settingFollowRedirects: InheritedBoolSetting;
settingRequestTimeout: InheritedIntSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type GrpcRequest = {
@@ -69,6 +70,7 @@ export type GrpcRequest = {
*/
url: string;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type HttpRequest = {
@@ -159,6 +161,7 @@ export type WebsocketRequest = {
settingSendCookies: InheritedBoolSetting;
settingStoreCookies: InheritedBoolSetting;
settingValidateCertificates: InheritedBoolSetting;
settingRequestMessageSize: InheritedIntSetting;
};
export type Workspace = {
@@ -175,6 +178,7 @@ export type Workspace = {
settingValidateCertificates: boolean;
settingFollowRedirects: boolean;
settingRequestTimeout: number;
settingRequestMessageSize: number;
settingDnsOverrides: Array<DnsOverride>;
settingSendCookies: boolean;
settingStoreCookies: boolean;
+11 -1
View File
@@ -20,6 +20,7 @@ pub async fn ws_connect(
headers: HeaderMap<HeaderValue>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response)> {
info!("Connecting to WS {url}");
let tls_config = get_tls_config(validate_certificates, WITH_ALPN, client_cert.clone())?;
@@ -34,7 +35,7 @@ pub async fn ws_connect(
let (stream, response) = connect_async_tls_with_config(
req,
Some(WebSocketConfig::default()),
Some(websocket_config(request_message_size)),
false,
Some(Connector::Rustls(Arc::new(tls_config))),
)
@@ -48,3 +49,12 @@ pub async fn ws_connect(
Ok((stream, response))
}
fn websocket_config(request_message_size: i32) -> WebSocketConfig {
let max_message_size = message_size_limit(request_message_size);
WebSocketConfig::default().max_message_size(max_message_size).max_frame_size(max_message_size)
}
pub(crate) fn message_size_limit(setting: i32) -> Option<usize> {
setting.try_into().ok().filter(|limit| *limit > 0)
}
+28 -7
View File
@@ -1,4 +1,5 @@
use crate::connect::ws_connect;
use crate::connect::{message_size_limit, ws_connect};
use crate::error::Error::GenericError;
use crate::error::Result;
use futures_util::stream::SplitSink;
use futures_util::{SinkExt, StreamExt};
@@ -15,10 +16,16 @@ use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use yaak_tls::ClientCertificateConfig;
type WebsocketSink = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
struct WebsocketConnection {
max_message_size: Option<usize>,
sink: WebsocketSink,
}
#[derive(Clone)]
pub struct WebsocketManager {
connections:
Arc<Mutex<HashMap<String, SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
connections: Arc<Mutex<HashMap<String, WebsocketConnection>>>,
read_tasks: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
}
@@ -35,14 +42,20 @@ impl WebsocketManager {
receive_tx: mpsc::Sender<Message>,
validate_certificates: bool,
client_cert: Option<ClientCertificateConfig>,
request_message_size: i32,
) -> Result<Response> {
let tx = receive_tx.clone();
let max_message_size = message_size_limit(request_message_size);
let (stream, response) =
ws_connect(url, headers, validate_certificates, client_cert).await?;
ws_connect(url, headers, validate_certificates, client_cert, request_message_size)
.await?;
let (write, mut read) = stream.split();
self.connections.lock().await.insert(id.to_string(), write);
self.connections
.lock()
.await
.insert(id.to_string(), WebsocketConnection { max_message_size, sink: write });
let handle = {
let connection_id = id.to_string();
@@ -76,7 +89,15 @@ impl WebsocketManager {
None => return Ok(()),
Some(c) => c,
};
connection.send(msg).await?;
if let Some(limit) = connection.max_message_size {
let message_size = msg.len();
if message_size > limit {
return Err(GenericError(format!(
"WebSocket message too large: found {message_size} bytes, the limit is {limit} bytes"
)));
}
}
connection.sink.send(msg).await?;
Ok(())
}
@@ -84,7 +105,7 @@ impl WebsocketManager {
info!("Closing websocket");
if let Some(mut connection) = self.connections.lock().await.remove(id) {
// Wait a maximum of 1 second for the connection to close
if let Err(e) = connection.close().await {
if let Err(e) = connection.sink.close().await {
warn!("Failed to close websocket connection {e:?}");
};
}