mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-06-30 01:51:37 +02:00
Add request message size setting
This commit is contained in:
Generated
+4
@@ -46,6 +46,7 @@ export type Folder = {
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingFollowRedirects: InheritedBoolSetting;
|
||||
settingRequestTimeout: InheritedIntSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type GrpcRequest = {
|
||||
@@ -69,6 +70,7 @@ export type GrpcRequest = {
|
||||
*/
|
||||
url: string;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type HttpRequest = {
|
||||
@@ -146,6 +148,7 @@ export type WebsocketRequest = {
|
||||
settingSendCookies: InheritedBoolSetting;
|
||||
settingStoreCookies: InheritedBoolSetting;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type Workspace = {
|
||||
@@ -162,6 +165,7 @@ export type Workspace = {
|
||||
settingValidateCertificates: boolean;
|
||||
settingFollowRedirects: boolean;
|
||||
settingRequestTimeout: number;
|
||||
settingRequestMessageSize: number;
|
||||
settingDnsOverrides: Array<DnsOverride>;
|
||||
settingSendCookies: boolean;
|
||||
settingStoreCookies: boolean;
|
||||
|
||||
@@ -33,15 +33,21 @@ impl AutoReflectionClient {
|
||||
uri: &Uri,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
max_message_size: usize,
|
||||
) -> Result<Self> {
|
||||
let client_v1 = v1::server_reflection_client::ServerReflectionClient::with_origin(
|
||||
get_transport(validate_certificates, client_cert.clone())?,
|
||||
uri.clone(),
|
||||
);
|
||||
let client_v1alpha = v1alpha::server_reflection_client::ServerReflectionClient::with_origin(
|
||||
get_transport(validate_certificates, client_cert.clone())?,
|
||||
uri.clone(),
|
||||
);
|
||||
)
|
||||
.max_decoding_message_size(max_message_size)
|
||||
.max_encoding_message_size(max_message_size);
|
||||
let client_v1alpha =
|
||||
v1alpha::server_reflection_client::ServerReflectionClient::with_origin(
|
||||
get_transport(validate_certificates, client_cert.clone())?,
|
||||
uri.clone(),
|
||||
)
|
||||
.max_decoding_message_size(max_message_size)
|
||||
.max_encoding_message_size(max_message_size);
|
||||
Ok(AutoReflectionClient { use_v1alpha: false, client_v1, client_v1alpha })
|
||||
}
|
||||
|
||||
|
||||
@@ -33,16 +33,13 @@ use tonic::transport::Uri;
|
||||
use tonic::{IntoRequest, IntoStreamingRequest, Request, Response, Status, Streaming};
|
||||
use yaak_tls::ClientCertificateConfig;
|
||||
|
||||
/// Maximum size for a single gRPC message (64 MB).
|
||||
/// Tonic defaults to 4 MB, which is too small for large responses.
|
||||
const GRPC_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GrpcConnection {
|
||||
pool: Arc<RwLock<DescriptorPool>>,
|
||||
conn: Client<HttpsConnector<HttpConnector>, BoxBody>,
|
||||
pub uri: Uri,
|
||||
use_reflection: bool,
|
||||
max_message_size: usize,
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
@@ -101,8 +98,15 @@ impl GrpcConnection {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<Response<DynamicMessage>> {
|
||||
if self.use_reflection {
|
||||
reflect_types_for_message(self.pool.clone(), &self.uri, message, metadata, client_cert)
|
||||
.await?;
|
||||
reflect_types_for_message(
|
||||
self.pool.clone(),
|
||||
&self.uri,
|
||||
message,
|
||||
metadata,
|
||||
client_cert,
|
||||
self.max_message_size,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
let method = &self.method(&service, &method).await?;
|
||||
let input_message = method.input();
|
||||
@@ -111,8 +115,7 @@ impl GrpcConnection {
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
|
||||
deserializer.end()?;
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
|
||||
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
|
||||
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
|
||||
|
||||
let mut req = req_message.into_request();
|
||||
decorate_req(metadata, &mut req)?;
|
||||
@@ -137,6 +140,7 @@ impl GrpcConnection {
|
||||
message,
|
||||
metadata,
|
||||
client_cert,
|
||||
self.max_message_size,
|
||||
)
|
||||
.await?;
|
||||
|
||||
@@ -176,6 +180,7 @@ impl GrpcConnection {
|
||||
let md = metadata.clone();
|
||||
let use_reflection = self.use_reflection.clone();
|
||||
let client_cert = client_cert.clone();
|
||||
let max_message_size = self.max_message_size;
|
||||
stream
|
||||
.then(move |json| {
|
||||
let pool = pool.clone();
|
||||
@@ -188,8 +193,15 @@ impl GrpcConnection {
|
||||
let json_clone = json.clone();
|
||||
async move {
|
||||
if use_reflection {
|
||||
if let Err(e) =
|
||||
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
|
||||
if let Err(e) = reflect_types_for_message(
|
||||
pool,
|
||||
&uri,
|
||||
&json,
|
||||
&md,
|
||||
client_cert,
|
||||
max_message_size,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("Failed to resolve Any types: {e}");
|
||||
}
|
||||
@@ -211,8 +223,7 @@ impl GrpcConnection {
|
||||
.filter_map(|x| x)
|
||||
};
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
|
||||
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
|
||||
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
|
||||
@@ -243,6 +254,7 @@ impl GrpcConnection {
|
||||
let md = metadata.clone();
|
||||
let use_reflection = self.use_reflection.clone();
|
||||
let client_cert = client_cert.clone();
|
||||
let max_message_size = self.max_message_size;
|
||||
stream
|
||||
.then(move |json| {
|
||||
let pool = pool.clone();
|
||||
@@ -255,8 +267,15 @@ impl GrpcConnection {
|
||||
let json_clone = json.clone();
|
||||
async move {
|
||||
if use_reflection {
|
||||
if let Err(e) =
|
||||
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
|
||||
if let Err(e) = reflect_types_for_message(
|
||||
pool,
|
||||
&uri,
|
||||
&json,
|
||||
&md,
|
||||
client_cert,
|
||||
max_message_size,
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!("Failed to resolve Any types: {e}");
|
||||
}
|
||||
@@ -278,8 +297,7 @@ impl GrpcConnection {
|
||||
.filter_map(|x| x)
|
||||
};
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
|
||||
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
|
||||
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
|
||||
@@ -307,8 +325,7 @@ impl GrpcConnection {
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
|
||||
deserializer.end()?;
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone())
|
||||
.max_decoding_message_size(GRPC_MAX_MESSAGE_SIZE);
|
||||
let mut client = grpc_client(self.conn.clone(), self.uri.clone(), self.max_message_size);
|
||||
|
||||
let mut req = req_message.into_request();
|
||||
decorate_req(metadata, &mut req)?;
|
||||
@@ -320,6 +337,23 @@ impl GrpcConnection {
|
||||
}
|
||||
}
|
||||
|
||||
fn grpc_client(
|
||||
conn: Client<HttpsConnector<HttpConnector>, BoxBody>,
|
||||
uri: Uri,
|
||||
max_message_size: usize,
|
||||
) -> tonic::client::Grpc<Client<HttpsConnector<HttpConnector>, BoxBody>> {
|
||||
tonic::client::Grpc::with_origin(conn, uri)
|
||||
.max_decoding_message_size(max_message_size)
|
||||
.max_encoding_message_size(max_message_size)
|
||||
}
|
||||
|
||||
fn message_size_limit(setting: i32) -> usize {
|
||||
match setting.try_into() {
|
||||
Ok(0) | Err(_) => usize::MAX,
|
||||
Ok(limit) => limit,
|
||||
}
|
||||
}
|
||||
|
||||
/// Configuration for GrpcHandle to compile proto files
|
||||
#[derive(Clone)]
|
||||
pub struct GrpcConfig {
|
||||
@@ -356,6 +390,7 @@ impl GrpcHandle {
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
request_message_size: i32,
|
||||
) -> Result<bool> {
|
||||
let server_reflection = proto_files.is_empty();
|
||||
let key = make_pool_key(id, uri, proto_files);
|
||||
@@ -367,7 +402,14 @@ impl GrpcHandle {
|
||||
|
||||
let pool = if server_reflection {
|
||||
let full_uri = uri_from_str(uri)?;
|
||||
fill_pool_from_reflection(&full_uri, metadata, validate_certificates, client_cert).await
|
||||
fill_pool_from_reflection(
|
||||
&full_uri,
|
||||
metadata,
|
||||
validate_certificates,
|
||||
client_cert,
|
||||
message_size_limit(request_message_size),
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
fill_pool_from_files(&self.config, proto_files).await
|
||||
}?;
|
||||
@@ -384,12 +426,21 @@ impl GrpcHandle {
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
request_message_size: i32,
|
||||
) -> Result<Vec<ServiceDefinition>> {
|
||||
// Ensure we have a pool; reflect only if missing
|
||||
if self.get_pool(id, uri, proto_files).is_none() {
|
||||
info!("Reflecting gRPC services for {} at {}", id, uri);
|
||||
self.reflect(id, uri, proto_files, metadata, validate_certificates, client_cert)
|
||||
.await?;
|
||||
self.reflect(
|
||||
id,
|
||||
uri,
|
||||
proto_files,
|
||||
metadata,
|
||||
validate_certificates,
|
||||
client_cert,
|
||||
request_message_size,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let pool = self
|
||||
@@ -429,8 +480,10 @@ impl GrpcHandle {
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
request_message_size: i32,
|
||||
) -> Result<GrpcConnection> {
|
||||
let use_reflection = proto_files.is_empty();
|
||||
let max_message_size = message_size_limit(request_message_size);
|
||||
if self.get_pool(id, uri, proto_files).is_none() {
|
||||
self.reflect(
|
||||
id,
|
||||
@@ -439,6 +492,7 @@ impl GrpcHandle {
|
||||
metadata,
|
||||
validate_certificates,
|
||||
client_cert.clone(),
|
||||
request_message_size,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
@@ -448,7 +502,13 @@ impl GrpcHandle {
|
||||
.clone();
|
||||
let uri = uri_from_str(uri)?;
|
||||
let conn = get_transport(validate_certificates, client_cert.clone())?;
|
||||
Ok(GrpcConnection { pool: Arc::new(RwLock::new(pool)), use_reflection, conn, uri })
|
||||
Ok(GrpcConnection {
|
||||
pool: Arc::new(RwLock::new(pool)),
|
||||
use_reflection,
|
||||
conn,
|
||||
uri,
|
||||
max_message_size,
|
||||
})
|
||||
}
|
||||
|
||||
fn get_pool(&self, id: &str, uri: &str, proto_files: &Vec<PathBuf>) -> Option<&DescriptorPool> {
|
||||
|
||||
@@ -119,9 +119,11 @@ pub async fn fill_pool_from_reflection(
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
max_message_size: usize,
|
||||
) -> Result<DescriptorPool> {
|
||||
let mut pool = DescriptorPool::new();
|
||||
let mut client = AutoReflectionClient::new(uri, validate_certificates, client_cert)?;
|
||||
let mut client =
|
||||
AutoReflectionClient::new(uri, validate_certificates, client_cert, max_message_size)?;
|
||||
|
||||
for service in list_services(&mut client, metadata).await? {
|
||||
if service == "grpc.reflection.v1alpha.ServerReflection" {
|
||||
@@ -192,6 +194,7 @@ pub(crate) async fn reflect_types_for_message(
|
||||
json: &str,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
max_message_size: usize,
|
||||
) -> Result<()> {
|
||||
// 1. Collect all Any types in the JSON
|
||||
let mut extra_types = Vec::new();
|
||||
@@ -201,7 +204,7 @@ pub(crate) async fn reflect_types_for_message(
|
||||
return Ok(()); // nothing to do
|
||||
}
|
||||
|
||||
let mut client = AutoReflectionClient::new(uri, false, client_cert)?;
|
||||
let mut client = AutoReflectionClient::new(uri, false, client_cert, max_message_size)?;
|
||||
for extra_type in extra_types {
|
||||
{
|
||||
let guard = pool.read().await;
|
||||
@@ -239,6 +242,7 @@ pub(crate) async fn reflect_types_for_dynamic_message(
|
||||
message: &DynamicMessage,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
max_message_size: usize,
|
||||
) -> Result<()> {
|
||||
let mut extra_types = HashSet::new();
|
||||
collect_any_types_from_dynamic_message(message, &mut extra_types);
|
||||
@@ -247,7 +251,7 @@ pub(crate) async fn reflect_types_for_dynamic_message(
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut client = AutoReflectionClient::new(uri, false, client_cert)?;
|
||||
let mut client = AutoReflectionClient::new(uri, false, client_cert, max_message_size)?;
|
||||
for extra_type in extra_types {
|
||||
{
|
||||
let guard = pool.read().await;
|
||||
|
||||
+4
@@ -109,6 +109,7 @@ export type Folder = {
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingFollowRedirects: InheritedBoolSetting;
|
||||
settingRequestTimeout: InheritedIntSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type GraphQlIntrospection = {
|
||||
@@ -184,6 +185,7 @@ export type GrpcRequest = {
|
||||
*/
|
||||
url: string;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type HttpRequest = {
|
||||
@@ -482,6 +484,7 @@ export type WebsocketRequest = {
|
||||
settingSendCookies: InheritedBoolSetting;
|
||||
settingStoreCookies: InheritedBoolSetting;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type Workspace = {
|
||||
@@ -498,6 +501,7 @@ export type Workspace = {
|
||||
settingValidateCertificates: boolean;
|
||||
settingFollowRedirects: boolean;
|
||||
settingRequestTimeout: number;
|
||||
settingRequestMessageSize: number;
|
||||
settingDnsOverrides: Array<DnsOverride>;
|
||||
settingSendCookies: boolean;
|
||||
settingStoreCookies: boolean;
|
||||
|
||||
@@ -0,0 +1,7 @@
|
||||
ALTER TABLE workspaces ADD COLUMN setting_request_message_size INTEGER DEFAULT 67108864 NOT NULL;
|
||||
|
||||
ALTER TABLE folders ADD COLUMN setting_request_message_size TEXT DEFAULT '{"enabled":false,"value":67108864}' NOT NULL;
|
||||
|
||||
ALTER TABLE websocket_requests ADD COLUMN setting_request_message_size TEXT DEFAULT '{"enabled":false,"value":67108864}' NOT NULL;
|
||||
|
||||
ALTER TABLE grpc_requests ADD COLUMN setting_request_message_size TEXT DEFAULT '{"enabled":false,"value":67108864}' NOT NULL;
|
||||
@@ -21,6 +21,8 @@ use ts_rs::TS;
|
||||
use yaak_database::{Result as DbResult, UpdateSource};
|
||||
pub use yaak_database::{UpsertModelInfo, upsert_date};
|
||||
|
||||
pub const DEFAULT_REQUEST_MESSAGE_SIZE: i32 = 64 * 1024 * 1024;
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! impl_model {
|
||||
($t:ty, $variant:ident) => {
|
||||
@@ -120,6 +122,7 @@ pub struct ResolvedHttpRequestSettings {
|
||||
pub validate_certificates: ResolvedSetting<bool>,
|
||||
pub follow_redirects: ResolvedSetting<bool>,
|
||||
pub request_timeout: ResolvedSetting<i32>,
|
||||
pub request_message_size: ResolvedSetting<i32>,
|
||||
pub send_cookies: ResolvedSetting<bool>,
|
||||
pub store_cookies: ResolvedSetting<bool>,
|
||||
}
|
||||
@@ -130,6 +133,7 @@ impl Default for ResolvedHttpRequestSettings {
|
||||
validate_certificates: ResolvedSetting::default_source(true),
|
||||
follow_redirects: ResolvedSetting::default_source(true),
|
||||
request_timeout: ResolvedSetting::default_source(0),
|
||||
request_message_size: ResolvedSetting::default_source(DEFAULT_REQUEST_MESSAGE_SIZE),
|
||||
send_cookies: ResolvedSetting::default_source(true),
|
||||
store_cookies: ResolvedSetting::default_source(true),
|
||||
}
|
||||
@@ -400,6 +404,8 @@ pub struct Workspace {
|
||||
#[serde(default = "default_true")]
|
||||
pub setting_follow_redirects: bool,
|
||||
pub setting_request_timeout: i32,
|
||||
#[serde(default = "default_request_message_size")]
|
||||
pub setting_request_message_size: i32,
|
||||
#[serde(default)]
|
||||
pub setting_dns_overrides: Vec<DnsOverride>,
|
||||
#[serde(default = "default_true")]
|
||||
@@ -445,6 +451,7 @@ impl UpsertModelInfo for Workspace {
|
||||
(EncryptionKeyChallenge, self.encryption_key_challenge.into()),
|
||||
(SettingFollowRedirects, self.setting_follow_redirects.into()),
|
||||
(SettingRequestTimeout, self.setting_request_timeout.into()),
|
||||
(SettingRequestMessageSize, self.setting_request_message_size.into()),
|
||||
(SettingValidateCertificates, self.setting_validate_certificates.into()),
|
||||
(SettingDnsOverrides, serde_json::to_string(&self.setting_dns_overrides)?.into()),
|
||||
(SettingSendCookies, self.setting_send_cookies.into()),
|
||||
@@ -463,7 +470,7 @@ impl UpsertModelInfo for Workspace {
|
||||
WorkspaceIden::EncryptionKeyChallenge,
|
||||
WorkspaceIden::SettingRequestTimeout,
|
||||
WorkspaceIden::SettingFollowRedirects,
|
||||
WorkspaceIden::SettingRequestTimeout,
|
||||
WorkspaceIden::SettingRequestMessageSize,
|
||||
WorkspaceIden::SettingValidateCertificates,
|
||||
WorkspaceIden::SettingDnsOverrides,
|
||||
WorkspaceIden::SettingSendCookies,
|
||||
@@ -491,6 +498,7 @@ impl UpsertModelInfo for Workspace {
|
||||
authentication_type: row.get("authentication_type")?,
|
||||
setting_follow_redirects: row.get("setting_follow_redirects")?,
|
||||
setting_request_timeout: row.get("setting_request_timeout")?,
|
||||
setting_request_message_size: row.get("setting_request_message_size")?,
|
||||
setting_validate_certificates: row.get("setting_validate_certificates")?,
|
||||
setting_dns_overrides: serde_json::from_str(&setting_dns_overrides).unwrap_or_default(),
|
||||
setting_send_cookies: row.get("setting_send_cookies")?,
|
||||
@@ -962,6 +970,8 @@ pub struct Folder {
|
||||
pub setting_validate_certificates: InheritedBoolSetting,
|
||||
pub setting_follow_redirects: InheritedBoolSetting,
|
||||
pub setting_request_timeout: InheritedIntSetting,
|
||||
#[serde(default = "default_request_message_size_setting")]
|
||||
pub setting_request_message_size: InheritedIntSetting,
|
||||
}
|
||||
|
||||
impl UpsertModelInfo for Folder {
|
||||
@@ -1009,6 +1019,10 @@ impl UpsertModelInfo for Folder {
|
||||
),
|
||||
(SettingFollowRedirects, serde_json::to_string(&self.setting_follow_redirects)?.into()),
|
||||
(SettingRequestTimeout, serde_json::to_string(&self.setting_request_timeout)?.into()),
|
||||
(
|
||||
SettingRequestMessageSize,
|
||||
serde_json::to_string(&self.setting_request_message_size)?.into(),
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
@@ -1027,6 +1041,7 @@ impl UpsertModelInfo for Folder {
|
||||
FolderIden::SettingValidateCertificates,
|
||||
FolderIden::SettingFollowRedirects,
|
||||
FolderIden::SettingRequestTimeout,
|
||||
FolderIden::SettingRequestMessageSize,
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1041,6 +1056,7 @@ impl UpsertModelInfo for Folder {
|
||||
let setting_validate_certificates: String = row.get("setting_validate_certificates")?;
|
||||
let setting_follow_redirects: String = row.get("setting_follow_redirects")?;
|
||||
let setting_request_timeout: String = row.get("setting_request_timeout")?;
|
||||
let setting_request_message_size: String = row.get("setting_request_message_size")?;
|
||||
Ok(Self {
|
||||
id: row.get("id")?,
|
||||
model: row.get("model")?,
|
||||
@@ -1062,6 +1078,8 @@ impl UpsertModelInfo for Folder {
|
||||
.unwrap_or_default(),
|
||||
setting_request_timeout: serde_json::from_str(&setting_request_timeout)
|
||||
.unwrap_or_default(),
|
||||
setting_request_message_size: serde_json::from_str(&setting_request_message_size)
|
||||
.unwrap_or_else(|_| default_request_message_size_setting()),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -1398,6 +1416,8 @@ pub struct WebsocketRequest {
|
||||
pub setting_send_cookies: InheritedBoolSetting,
|
||||
pub setting_store_cookies: InheritedBoolSetting,
|
||||
pub setting_validate_certificates: InheritedBoolSetting,
|
||||
#[serde(default = "default_request_message_size_setting")]
|
||||
pub setting_request_message_size: InheritedIntSetting,
|
||||
}
|
||||
|
||||
impl UpsertModelInfo for WebsocketRequest {
|
||||
@@ -1446,6 +1466,10 @@ impl UpsertModelInfo for WebsocketRequest {
|
||||
SettingValidateCertificates,
|
||||
serde_json::to_string(&self.setting_validate_certificates)?.into(),
|
||||
),
|
||||
(
|
||||
SettingRequestMessageSize,
|
||||
serde_json::to_string(&self.setting_request_message_size)?.into(),
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
@@ -1466,6 +1490,7 @@ impl UpsertModelInfo for WebsocketRequest {
|
||||
WebsocketRequestIden::SettingSendCookies,
|
||||
WebsocketRequestIden::SettingStoreCookies,
|
||||
WebsocketRequestIden::SettingValidateCertificates,
|
||||
WebsocketRequestIden::SettingRequestMessageSize,
|
||||
]
|
||||
}
|
||||
|
||||
@@ -1479,6 +1504,7 @@ impl UpsertModelInfo for WebsocketRequest {
|
||||
let setting_send_cookies: String = row.get("setting_send_cookies")?;
|
||||
let setting_store_cookies: String = row.get("setting_store_cookies")?;
|
||||
let setting_validate_certificates: String = row.get("setting_validate_certificates")?;
|
||||
let setting_request_message_size: String = row.get("setting_request_message_size")?;
|
||||
Ok(Self {
|
||||
id: row.get("id")?,
|
||||
model: row.get("model")?,
|
||||
@@ -1499,6 +1525,8 @@ impl UpsertModelInfo for WebsocketRequest {
|
||||
setting_store_cookies: serde_json::from_str(&setting_store_cookies).unwrap_or_default(),
|
||||
setting_validate_certificates: serde_json::from_str(&setting_validate_certificates)
|
||||
.unwrap_or_default(),
|
||||
setting_request_message_size: serde_json::from_str(&setting_request_message_size)
|
||||
.unwrap_or_else(|_| default_request_message_size_setting()),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2039,6 +2067,8 @@ pub struct GrpcRequest {
|
||||
/// Server URL (http for plaintext or https for secure)
|
||||
pub url: String,
|
||||
pub setting_validate_certificates: InheritedBoolSetting,
|
||||
#[serde(default = "default_request_message_size_setting")]
|
||||
pub setting_request_message_size: InheritedIntSetting,
|
||||
}
|
||||
|
||||
impl UpsertModelInfo for GrpcRequest {
|
||||
@@ -2086,6 +2116,10 @@ impl UpsertModelInfo for GrpcRequest {
|
||||
SettingValidateCertificates,
|
||||
serde_json::to_string(&self.setting_validate_certificates)?.into(),
|
||||
),
|
||||
(
|
||||
SettingRequestMessageSize,
|
||||
serde_json::to_string(&self.setting_request_message_size)?.into(),
|
||||
),
|
||||
])
|
||||
}
|
||||
|
||||
@@ -2105,6 +2139,7 @@ impl UpsertModelInfo for GrpcRequest {
|
||||
GrpcRequestIden::Authentication,
|
||||
GrpcRequestIden::Metadata,
|
||||
GrpcRequestIden::SettingValidateCertificates,
|
||||
GrpcRequestIden::SettingRequestMessageSize,
|
||||
]
|
||||
}
|
||||
|
||||
@@ -2115,6 +2150,7 @@ impl UpsertModelInfo for GrpcRequest {
|
||||
let authentication: String = row.get("authentication")?;
|
||||
let metadata: String = row.get("metadata")?;
|
||||
let setting_validate_certificates: String = row.get("setting_validate_certificates")?;
|
||||
let setting_request_message_size: String = row.get("setting_request_message_size")?;
|
||||
Ok(Self {
|
||||
id: row.get("id")?,
|
||||
model: row.get("model")?,
|
||||
@@ -2134,6 +2170,8 @@ impl UpsertModelInfo for GrpcRequest {
|
||||
metadata: serde_json::from_str(metadata.as_str()).unwrap_or_default(),
|
||||
setting_validate_certificates: serde_json::from_str(&setting_validate_certificates)
|
||||
.unwrap_or_default(),
|
||||
setting_request_message_size: serde_json::from_str(&setting_request_message_size)
|
||||
.unwrap_or_else(|_| default_request_message_size_setting()),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -2684,6 +2722,14 @@ fn default_true() -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn default_request_message_size() -> i32 {
|
||||
DEFAULT_REQUEST_MESSAGE_SIZE
|
||||
}
|
||||
|
||||
fn default_request_message_size_setting() -> InheritedIntSetting {
|
||||
InheritedIntSetting { enabled: false, value: DEFAULT_REQUEST_MESSAGE_SIZE }
|
||||
}
|
||||
|
||||
fn default_http_method() -> String {
|
||||
"GET".to_string()
|
||||
}
|
||||
|
||||
@@ -180,6 +180,14 @@ impl<'a> ClientDb<'a> {
|
||||
} else {
|
||||
parent.request_timeout
|
||||
},
|
||||
request_message_size: if folder.setting_request_message_size.enabled {
|
||||
ResolvedSetting::from_model(
|
||||
folder.setting_request_message_size.value,
|
||||
AnyModel::Folder(folder.clone()),
|
||||
)
|
||||
} else {
|
||||
parent.request_message_size
|
||||
},
|
||||
send_cookies: if folder.setting_send_cookies.enabled {
|
||||
ResolvedSetting::from_model(
|
||||
folder.setting_send_cookies.value,
|
||||
|
||||
@@ -129,6 +129,14 @@ impl<'a> ClientDb<'a> {
|
||||
} else {
|
||||
parent.validate_certificates
|
||||
},
|
||||
request_message_size: if grpc_request.setting_request_message_size.enabled {
|
||||
ResolvedSetting::from_model(
|
||||
grpc_request.setting_request_message_size.value,
|
||||
AnyModel::GrpcRequest(grpc_request.clone()),
|
||||
)
|
||||
} else {
|
||||
parent.request_message_size
|
||||
},
|
||||
..parent
|
||||
})
|
||||
}
|
||||
|
||||
@@ -131,6 +131,7 @@ impl<'a> ClientDb<'a> {
|
||||
} else {
|
||||
parent.request_timeout
|
||||
},
|
||||
request_message_size: parent.request_message_size,
|
||||
send_cookies: if http_request.setting_send_cookies.enabled {
|
||||
ResolvedSetting::from_model(
|
||||
http_request.setting_send_cookies.value,
|
||||
|
||||
@@ -139,6 +139,14 @@ impl<'a> ClientDb<'a> {
|
||||
} else {
|
||||
parent.validate_certificates
|
||||
},
|
||||
request_message_size: if websocket_request.setting_request_message_size.enabled {
|
||||
ResolvedSetting::from_model(
|
||||
websocket_request.setting_request_message_size.value,
|
||||
AnyModel::WebsocketRequest(websocket_request.clone()),
|
||||
)
|
||||
} else {
|
||||
parent.request_message_size
|
||||
},
|
||||
send_cookies: if websocket_request.setting_send_cookies.enabled {
|
||||
ResolvedSetting::from_model(
|
||||
websocket_request.setting_send_cookies.value,
|
||||
|
||||
@@ -21,6 +21,7 @@ impl<'a> ClientDb<'a> {
|
||||
&Workspace {
|
||||
name: "Yaak".to_string(),
|
||||
setting_follow_redirects: true,
|
||||
setting_request_message_size: crate::models::DEFAULT_REQUEST_MESSAGE_SIZE,
|
||||
setting_validate_certificates: true,
|
||||
..Default::default()
|
||||
},
|
||||
@@ -102,6 +103,10 @@ impl<'a> ClientDb<'a> {
|
||||
workspace.setting_request_timeout,
|
||||
AnyModel::Workspace(workspace.clone()),
|
||||
),
|
||||
request_message_size: ResolvedSetting::from_model(
|
||||
workspace.setting_request_message_size,
|
||||
AnyModel::Workspace(workspace.clone()),
|
||||
),
|
||||
send_cookies: ResolvedSetting::from_model(
|
||||
workspace.setting_send_cookies,
|
||||
AnyModel::Workspace(workspace.clone()),
|
||||
|
||||
+4
@@ -108,6 +108,7 @@ export type Folder = {
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingFollowRedirects: InheritedBoolSetting;
|
||||
settingRequestTimeout: InheritedIntSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type GraphQlIntrospection = {
|
||||
@@ -183,6 +184,7 @@ export type GrpcRequest = {
|
||||
*/
|
||||
url: string;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type HttpRequest = {
|
||||
@@ -450,6 +452,7 @@ export type WebsocketRequest = {
|
||||
settingSendCookies: InheritedBoolSetting;
|
||||
settingStoreCookies: InheritedBoolSetting;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type Workspace = {
|
||||
@@ -466,6 +469,7 @@ export type Workspace = {
|
||||
settingValidateCertificates: boolean;
|
||||
settingFollowRedirects: boolean;
|
||||
settingRequestTimeout: number;
|
||||
settingRequestMessageSize: number;
|
||||
settingDnsOverrides: Array<DnsOverride>;
|
||||
settingSendCookies: boolean;
|
||||
settingStoreCookies: boolean;
|
||||
|
||||
Generated
+4
@@ -46,6 +46,7 @@ export type Folder = {
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingFollowRedirects: InheritedBoolSetting;
|
||||
settingRequestTimeout: InheritedIntSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type GrpcRequest = {
|
||||
@@ -69,6 +70,7 @@ export type GrpcRequest = {
|
||||
*/
|
||||
url: string;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type HttpRequest = {
|
||||
@@ -159,6 +161,7 @@ export type WebsocketRequest = {
|
||||
settingSendCookies: InheritedBoolSetting;
|
||||
settingStoreCookies: InheritedBoolSetting;
|
||||
settingValidateCertificates: InheritedBoolSetting;
|
||||
settingRequestMessageSize: InheritedIntSetting;
|
||||
};
|
||||
|
||||
export type Workspace = {
|
||||
@@ -175,6 +178,7 @@ export type Workspace = {
|
||||
settingValidateCertificates: boolean;
|
||||
settingFollowRedirects: boolean;
|
||||
settingRequestTimeout: number;
|
||||
settingRequestMessageSize: number;
|
||||
settingDnsOverrides: Array<DnsOverride>;
|
||||
settingSendCookies: boolean;
|
||||
settingStoreCookies: boolean;
|
||||
|
||||
@@ -20,6 +20,7 @@ pub async fn ws_connect(
|
||||
headers: HeaderMap<HeaderValue>,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
request_message_size: i32,
|
||||
) -> Result<(WebSocketStream<MaybeTlsStream<TcpStream>>, Response)> {
|
||||
info!("Connecting to WS {url}");
|
||||
let tls_config = get_tls_config(validate_certificates, WITH_ALPN, client_cert.clone())?;
|
||||
@@ -34,7 +35,7 @@ pub async fn ws_connect(
|
||||
|
||||
let (stream, response) = connect_async_tls_with_config(
|
||||
req,
|
||||
Some(WebSocketConfig::default()),
|
||||
Some(websocket_config(request_message_size)),
|
||||
false,
|
||||
Some(Connector::Rustls(Arc::new(tls_config))),
|
||||
)
|
||||
@@ -48,3 +49,12 @@ pub async fn ws_connect(
|
||||
|
||||
Ok((stream, response))
|
||||
}
|
||||
|
||||
fn websocket_config(request_message_size: i32) -> WebSocketConfig {
|
||||
let max_message_size = message_size_limit(request_message_size);
|
||||
WebSocketConfig::default().max_message_size(max_message_size).max_frame_size(max_message_size)
|
||||
}
|
||||
|
||||
pub(crate) fn message_size_limit(setting: i32) -> Option<usize> {
|
||||
setting.try_into().ok().filter(|limit| *limit > 0)
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::connect::ws_connect;
|
||||
use crate::connect::{message_size_limit, ws_connect};
|
||||
use crate::error::Error::GenericError;
|
||||
use crate::error::Result;
|
||||
use futures_util::stream::SplitSink;
|
||||
use futures_util::{SinkExt, StreamExt};
|
||||
@@ -15,10 +16,16 @@ use tokio_tungstenite::tungstenite::http::HeaderValue;
|
||||
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
|
||||
use yaak_tls::ClientCertificateConfig;
|
||||
|
||||
type WebsocketSink = SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
|
||||
|
||||
struct WebsocketConnection {
|
||||
max_message_size: Option<usize>,
|
||||
sink: WebsocketSink,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct WebsocketManager {
|
||||
connections:
|
||||
Arc<Mutex<HashMap<String, SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>,
|
||||
connections: Arc<Mutex<HashMap<String, WebsocketConnection>>>,
|
||||
read_tasks: Arc<Mutex<HashMap<String, tokio::task::JoinHandle<()>>>>,
|
||||
}
|
||||
|
||||
@@ -35,14 +42,20 @@ impl WebsocketManager {
|
||||
receive_tx: mpsc::Sender<Message>,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
request_message_size: i32,
|
||||
) -> Result<Response> {
|
||||
let tx = receive_tx.clone();
|
||||
let max_message_size = message_size_limit(request_message_size);
|
||||
|
||||
let (stream, response) =
|
||||
ws_connect(url, headers, validate_certificates, client_cert).await?;
|
||||
ws_connect(url, headers, validate_certificates, client_cert, request_message_size)
|
||||
.await?;
|
||||
let (write, mut read) = stream.split();
|
||||
|
||||
self.connections.lock().await.insert(id.to_string(), write);
|
||||
self.connections
|
||||
.lock()
|
||||
.await
|
||||
.insert(id.to_string(), WebsocketConnection { max_message_size, sink: write });
|
||||
|
||||
let handle = {
|
||||
let connection_id = id.to_string();
|
||||
@@ -76,7 +89,15 @@ impl WebsocketManager {
|
||||
None => return Ok(()),
|
||||
Some(c) => c,
|
||||
};
|
||||
connection.send(msg).await?;
|
||||
if let Some(limit) = connection.max_message_size {
|
||||
let message_size = msg.len();
|
||||
if message_size > limit {
|
||||
return Err(GenericError(format!(
|
||||
"WebSocket message too large: found {message_size} bytes, the limit is {limit} bytes"
|
||||
)));
|
||||
}
|
||||
}
|
||||
connection.sink.send(msg).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -84,7 +105,7 @@ impl WebsocketManager {
|
||||
info!("Closing websocket");
|
||||
if let Some(mut connection) = self.connections.lock().await.remove(id) {
|
||||
// Wait a maximum of 1 second for the connection to close
|
||||
if let Err(e) = connection.close().await {
|
||||
if let Err(e) = connection.sink.close().await {
|
||||
warn!("Failed to close websocket connection {e:?}");
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user