mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-04-22 08:38:29 +02:00
Support client certificates (#319)
This commit is contained in:
@@ -1,3 +1,5 @@
|
||||
use crate::error::Error::GenericError;
|
||||
use crate::error::Result;
|
||||
use crate::manager::decorate_req;
|
||||
use crate::transport::get_transport;
|
||||
use async_recursion::async_recursion;
|
||||
@@ -18,6 +20,7 @@ use tonic_reflection::pb::v1::{
|
||||
};
|
||||
use tonic_reflection::pb::v1::{ExtensionRequest, FileDescriptorResponse};
|
||||
use tonic_reflection::pb::{v1, v1alpha};
|
||||
use yaak_tls::ClientCertificateConfig;
|
||||
|
||||
pub struct AutoReflectionClient<T = Client<HttpsConnector<HttpConnector>, BoxBody>> {
|
||||
use_v1alpha: bool,
|
||||
@@ -26,20 +29,24 @@ pub struct AutoReflectionClient<T = Client<HttpsConnector<HttpConnector>, BoxBod
|
||||
}
|
||||
|
||||
impl AutoReflectionClient {
|
||||
pub fn new(uri: &Uri, validate_certificates: bool) -> Self {
|
||||
pub fn new(
|
||||
uri: &Uri,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<Self> {
|
||||
let client_v1 = v1::server_reflection_client::ServerReflectionClient::with_origin(
|
||||
get_transport(validate_certificates),
|
||||
get_transport(validate_certificates, client_cert.clone())?,
|
||||
uri.clone(),
|
||||
);
|
||||
let client_v1alpha = v1alpha::server_reflection_client::ServerReflectionClient::with_origin(
|
||||
get_transport(validate_certificates),
|
||||
get_transport(validate_certificates, client_cert.clone())?,
|
||||
uri.clone(),
|
||||
);
|
||||
AutoReflectionClient {
|
||||
Ok(AutoReflectionClient {
|
||||
use_v1alpha: false,
|
||||
client_v1,
|
||||
client_v1alpha,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
#[async_recursion]
|
||||
@@ -47,36 +54,40 @@ impl AutoReflectionClient {
|
||||
&mut self,
|
||||
message: MessageRequest,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
) -> Result<MessageResponse, String> {
|
||||
) -> Result<MessageResponse> {
|
||||
let reflection_request = ServerReflectionRequest {
|
||||
host: "".into(), // Doesn't matter
|
||||
message_request: Some(message.clone()),
|
||||
};
|
||||
|
||||
if self.use_v1alpha {
|
||||
let mut request = Request::new(tokio_stream::once(to_v1alpha_request(reflection_request)));
|
||||
decorate_req(metadata, &mut request).map_err(|e| e.to_string())?;
|
||||
let mut request =
|
||||
Request::new(tokio_stream::once(to_v1alpha_request(reflection_request)));
|
||||
decorate_req(metadata, &mut request)?;
|
||||
|
||||
self.client_v1alpha
|
||||
.server_reflection_info(request)
|
||||
.await
|
||||
.map_err(|e| match e.code() {
|
||||
tonic::Code::Unavailable => "Failed to connect to endpoint".to_string(),
|
||||
tonic::Code::Unauthenticated => "Authentication failed".to_string(),
|
||||
tonic::Code::DeadlineExceeded => "Deadline exceeded".to_string(),
|
||||
_ => e.to_string(),
|
||||
tonic::Code::Unavailable => {
|
||||
GenericError("Failed to connect to endpoint".to_string())
|
||||
}
|
||||
tonic::Code::Unauthenticated => {
|
||||
GenericError("Authentication failed".to_string())
|
||||
}
|
||||
tonic::Code::DeadlineExceeded => GenericError("Deadline exceeded".to_string()),
|
||||
_ => GenericError(e.to_string()),
|
||||
})?
|
||||
.into_inner()
|
||||
.next()
|
||||
.await
|
||||
.expect("steamed response")
|
||||
.map_err(|e| e.to_string())?
|
||||
.ok_or(GenericError("Missing reflection message".to_string()))??
|
||||
.message_response
|
||||
.ok_or("No reflection response".to_string())
|
||||
.ok_or(GenericError("No reflection response".to_string()))
|
||||
.map(|resp| to_v1_msg_response(resp))
|
||||
} else {
|
||||
let mut request = Request::new(tokio_stream::once(reflection_request));
|
||||
decorate_req(metadata, &mut request).map_err(|e| e.to_string())?;
|
||||
decorate_req(metadata, &mut request)?;
|
||||
|
||||
let resp = self.client_v1.server_reflection_info(request).await;
|
||||
match resp {
|
||||
@@ -92,18 +103,19 @@ impl AutoReflectionClient {
|
||||
},
|
||||
}
|
||||
.map_err(|e| match e.code() {
|
||||
tonic::Code::Unavailable => "Failed to connect to endpoint".to_string(),
|
||||
tonic::Code::Unauthenticated => "Authentication failed".to_string(),
|
||||
tonic::Code::DeadlineExceeded => "Deadline exceeded".to_string(),
|
||||
_ => e.to_string(),
|
||||
tonic::Code::Unavailable => {
|
||||
GenericError("Failed to connect to endpoint".to_string())
|
||||
}
|
||||
tonic::Code::Unauthenticated => GenericError("Authentication failed".to_string()),
|
||||
tonic::Code::DeadlineExceeded => GenericError("Deadline exceeded".to_string()),
|
||||
_ => GenericError(e.to_string()),
|
||||
})?
|
||||
.into_inner()
|
||||
.next()
|
||||
.await
|
||||
.expect("steamed response")
|
||||
.map_err(|e| e.to_string())?
|
||||
.ok_or(GenericError("Missing reflection message".to_string()))??
|
||||
.message_response
|
||||
.ok_or("No reflection response".to_string())
|
||||
.ok_or(GenericError("No reflection response".to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
51
src-tauri/yaak-grpc/src/error.rs
Normal file
51
src-tauri/yaak-grpc/src/error.rs
Normal file
@@ -0,0 +1,51 @@
|
||||
use crate::manager::GrpcStreamError;
|
||||
use serde::{Serialize, Serializer};
|
||||
use serde_json::Error as SerdeJsonError;
|
||||
use std::io;
|
||||
use prost::DecodeError;
|
||||
use thiserror::Error;
|
||||
use tonic::Status;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error(transparent)]
|
||||
TlsError(#[from] yaak_tls::error::Error),
|
||||
|
||||
#[error(transparent)]
|
||||
TonicError(#[from] Status),
|
||||
|
||||
#[error("Prost reflect error: {0:?}")]
|
||||
ProstReflectError(#[from] prost_reflect::DescriptorError),
|
||||
|
||||
#[error(transparent)]
|
||||
DeserializerError(#[from] SerdeJsonError),
|
||||
|
||||
#[error(transparent)]
|
||||
GrpcStreamError(#[from] GrpcStreamError),
|
||||
|
||||
#[error(transparent)]
|
||||
GrpcDecodeError(#[from] DecodeError),
|
||||
|
||||
#[error(transparent)]
|
||||
GrpcInvalidMetadataKeyError(#[from] tonic::metadata::errors::InvalidMetadataKey),
|
||||
|
||||
#[error(transparent)]
|
||||
GrpcInvalidMetadataValueError(#[from] tonic::metadata::errors::InvalidMetadataValue),
|
||||
|
||||
#[error(transparent)]
|
||||
IOError(#[from] io::Error),
|
||||
|
||||
#[error("GRPC error: {0}")]
|
||||
GenericError(String),
|
||||
}
|
||||
|
||||
impl Serialize for Error {
|
||||
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
serializer.serialize_str(self.to_string().as_ref())
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
@@ -9,6 +9,7 @@ pub mod manager;
|
||||
mod reflection;
|
||||
mod transport;
|
||||
mod any;
|
||||
pub mod error;
|
||||
|
||||
pub use tonic::metadata::*;
|
||||
pub use tonic::Code;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
use crate::codec::DynamicCodec;
|
||||
use crate::error::Error::GenericError;
|
||||
use crate::error::Result;
|
||||
use crate::reflection::{
|
||||
fill_pool_from_files, fill_pool_from_reflection, method_desc_to_path, reflect_types_for_message,
|
||||
};
|
||||
@@ -12,6 +14,9 @@ pub use prost_reflect::DynamicMessage;
|
||||
use prost_reflect::{DescriptorPool, MethodDescriptor, ServiceDescriptor};
|
||||
use serde_json::Deserializer;
|
||||
use std::collections::BTreeMap;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::fmt::Display;
|
||||
use std::path::PathBuf;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
@@ -23,6 +28,7 @@ use tonic::body::BoxBody;
|
||||
use tonic::metadata::{MetadataKey, MetadataValue};
|
||||
use tonic::transport::Uri;
|
||||
use tonic::{IntoRequest, IntoStreamingRequest, Request, Response, Status, Streaming};
|
||||
use yaak_tls::ClientCertificateConfig;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GrpcConnection {
|
||||
@@ -33,23 +39,34 @@ pub struct GrpcConnection {
|
||||
}
|
||||
|
||||
#[derive(Default, Debug)]
|
||||
pub struct StreamError {
|
||||
pub struct GrpcStreamError {
|
||||
pub message: String,
|
||||
pub status: Option<Status>,
|
||||
}
|
||||
|
||||
impl From<String> for StreamError {
|
||||
impl Error for GrpcStreamError {}
|
||||
|
||||
impl Display for GrpcStreamError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match &self.status {
|
||||
Some(status) => write!(f, "[{}] {}", status, self.message),
|
||||
None => write!(f, "{}", self.message),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for GrpcStreamError {
|
||||
fn from(value: String) -> Self {
|
||||
StreamError {
|
||||
GrpcStreamError {
|
||||
message: value.to_string(),
|
||||
status: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<Status> for StreamError {
|
||||
impl From<Status> for GrpcStreamError {
|
||||
fn from(s: Status) -> Self {
|
||||
StreamError {
|
||||
GrpcStreamError {
|
||||
message: s.message().to_string(),
|
||||
status: Some(s),
|
||||
}
|
||||
@@ -57,16 +74,20 @@ impl From<Status> for StreamError {
|
||||
}
|
||||
|
||||
impl GrpcConnection {
|
||||
pub async fn method(&self, service: &str, method: &str) -> Result<MethodDescriptor, String> {
|
||||
pub async fn method(&self, service: &str, method: &str) -> Result<MethodDescriptor> {
|
||||
let service = self.service(service).await?;
|
||||
let method =
|
||||
service.methods().find(|m| m.name() == method).ok_or("Failed to find method")?;
|
||||
let method = service
|
||||
.methods()
|
||||
.find(|m| m.name() == method)
|
||||
.ok_or(GenericError("Failed to find method".to_string()))?;
|
||||
Ok(method)
|
||||
}
|
||||
|
||||
async fn service(&self, service: &str) -> Result<ServiceDescriptor, String> {
|
||||
async fn service(&self, service: &str) -> Result<ServiceDescriptor> {
|
||||
let pool = self.pool.read().await;
|
||||
let service = pool.get_service_by_name(service).ok_or("Failed to find service")?;
|
||||
let service = pool
|
||||
.get_service_by_name(service)
|
||||
.ok_or(GenericError("Failed to find service".to_string()))?;
|
||||
Ok(service)
|
||||
}
|
||||
|
||||
@@ -76,26 +97,27 @@ impl GrpcConnection {
|
||||
method: &str,
|
||||
message: &str,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
) -> Result<Response<DynamicMessage>, StreamError> {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<Response<DynamicMessage>> {
|
||||
if self.use_reflection {
|
||||
reflect_types_for_message(self.pool.clone(), &self.uri, message, metadata).await?;
|
||||
reflect_types_for_message(self.pool.clone(), &self.uri, message, metadata, client_cert)
|
||||
.await?;
|
||||
}
|
||||
let method = &self.method(&service, &method).await?;
|
||||
let input_message = method.input();
|
||||
|
||||
let mut deserializer = Deserializer::from_str(message);
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)
|
||||
.map_err(|e| e.to_string())?;
|
||||
deserializer.end().unwrap();
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
|
||||
deserializer.end()?;
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
|
||||
|
||||
let mut req = req_message.into_request();
|
||||
decorate_req(metadata, &mut req).map_err(|e| e.to_string())?;
|
||||
decorate_req(metadata, &mut req)?;
|
||||
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
client.ready().await.unwrap();
|
||||
client.ready().await.map_err(|e| GenericError(format!("Failed to connect: {}", e)))?;
|
||||
|
||||
Ok(client.unary(req, path, codec).await?)
|
||||
}
|
||||
@@ -106,7 +128,8 @@ impl GrpcConnection {
|
||||
method: &str,
|
||||
stream: ReceiverStream<String>,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
) -> Result<Response<Streaming<DynamicMessage>>, StreamError> {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<Response<Streaming<DynamicMessage>>> {
|
||||
let method = &self.method(&service, &method).await?;
|
||||
let mapped_stream = {
|
||||
let input_message = method.input();
|
||||
@@ -114,15 +137,19 @@ impl GrpcConnection {
|
||||
let uri = self.uri.clone();
|
||||
let md = metadata.clone();
|
||||
let use_reflection = self.use_reflection.clone();
|
||||
let client_cert = client_cert.clone();
|
||||
stream.filter_map(move |json| {
|
||||
let pool = pool.clone();
|
||||
let uri = uri.clone();
|
||||
let input_message = input_message.clone();
|
||||
let md = md.clone();
|
||||
let use_reflection = use_reflection.clone();
|
||||
let client_cert = client_cert.clone();
|
||||
tauri::async_runtime::block_on(async move {
|
||||
if use_reflection {
|
||||
if let Err(e) = reflect_types_for_message(pool, &uri, &json, &md).await {
|
||||
if let Err(e) =
|
||||
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
|
||||
{
|
||||
warn!("Failed to resolve Any types: {e}");
|
||||
}
|
||||
}
|
||||
@@ -143,9 +170,9 @@ impl GrpcConnection {
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
|
||||
let mut req = mapped_stream.into_streaming_request();
|
||||
decorate_req(metadata, &mut req).map_err(|e| e.to_string())?;
|
||||
decorate_req(metadata, &mut req)?;
|
||||
|
||||
client.ready().await.map_err(|e| e.to_string())?;
|
||||
client.ready().await.map_err(|e| GenericError(format!("Failed to connect: {}", e)))?;
|
||||
Ok(client.streaming(req, path, codec).await?)
|
||||
}
|
||||
|
||||
@@ -155,7 +182,8 @@ impl GrpcConnection {
|
||||
method: &str,
|
||||
stream: ReceiverStream<String>,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
) -> Result<Response<DynamicMessage>, StreamError> {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<Response<DynamicMessage>> {
|
||||
let method = &self.method(&service, &method).await?;
|
||||
let mapped_stream = {
|
||||
let input_message = method.input();
|
||||
@@ -163,15 +191,19 @@ impl GrpcConnection {
|
||||
let uri = self.uri.clone();
|
||||
let md = metadata.clone();
|
||||
let use_reflection = self.use_reflection.clone();
|
||||
let client_cert = client_cert.clone();
|
||||
stream.filter_map(move |json| {
|
||||
let pool = pool.clone();
|
||||
let uri = uri.clone();
|
||||
let input_message = input_message.clone();
|
||||
let md = md.clone();
|
||||
let use_reflection = use_reflection.clone();
|
||||
let client_cert = client_cert.clone();
|
||||
tauri::async_runtime::block_on(async move {
|
||||
if use_reflection {
|
||||
if let Err(e) = reflect_types_for_message(pool, &uri, &json, &md).await {
|
||||
if let Err(e) =
|
||||
reflect_types_for_message(pool, &uri, &json, &md, client_cert).await
|
||||
{
|
||||
warn!("Failed to resolve Any types: {e}");
|
||||
}
|
||||
}
|
||||
@@ -192,13 +224,13 @@ impl GrpcConnection {
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
|
||||
let mut req = mapped_stream.into_streaming_request();
|
||||
decorate_req(metadata, &mut req).map_err(|e| e.to_string())?;
|
||||
decorate_req(metadata, &mut req)?;
|
||||
|
||||
client.ready().await.unwrap();
|
||||
client.client_streaming(req, path, codec).await.map_err(|e| StreamError {
|
||||
client.ready().await.map_err(|e| GenericError(format!("Failed to connect: {}", e)))?;
|
||||
Ok(client.client_streaming(req, path, codec).await.map_err(|e| GrpcStreamError {
|
||||
message: e.message().to_string(),
|
||||
status: Some(e),
|
||||
})
|
||||
})?)
|
||||
}
|
||||
|
||||
pub async fn server_streaming(
|
||||
@@ -207,23 +239,22 @@ impl GrpcConnection {
|
||||
method: &str,
|
||||
message: &str,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
) -> Result<Response<Streaming<DynamicMessage>>, StreamError> {
|
||||
) -> Result<Response<Streaming<DynamicMessage>>> {
|
||||
let method = &self.method(&service, &method).await?;
|
||||
let input_message = method.input();
|
||||
|
||||
let mut deserializer = Deserializer::from_str(message);
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)
|
||||
.map_err(|e| e.to_string())?;
|
||||
deserializer.end().unwrap();
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)?;
|
||||
deserializer.end()?;
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
|
||||
|
||||
let mut req = req_message.into_request();
|
||||
decorate_req(metadata, &mut req).map_err(|e| e.to_string())?;
|
||||
decorate_req(metadata, &mut req)?;
|
||||
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
client.ready().await.map_err(|e| e.to_string())?;
|
||||
client.ready().await.map_err(|e| GenericError(format!("Failed to connect: {}", e)))?;
|
||||
Ok(client.server_streaming(req, path, codec).await?)
|
||||
}
|
||||
}
|
||||
@@ -257,7 +288,8 @@ impl GrpcHandle {
|
||||
proto_files: &Vec<PathBuf>,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
) -> Result<bool, String> {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<bool> {
|
||||
let server_reflection = proto_files.is_empty();
|
||||
let key = make_pool_key(id, uri, proto_files);
|
||||
|
||||
@@ -268,7 +300,7 @@ impl GrpcHandle {
|
||||
|
||||
let pool = if server_reflection {
|
||||
let full_uri = uri_from_str(uri)?;
|
||||
fill_pool_from_reflection(&full_uri, metadata, validate_certificates).await
|
||||
fill_pool_from_reflection(&full_uri, metadata, validate_certificates, client_cert).await
|
||||
} else {
|
||||
fill_pool_from_files(&self.app_handle, proto_files).await
|
||||
}?;
|
||||
@@ -284,15 +316,19 @@ impl GrpcHandle {
|
||||
proto_files: &Vec<PathBuf>,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
skip_cache: bool,
|
||||
) -> Result<Vec<ServiceDefinition>, String> {
|
||||
) -> Result<Vec<ServiceDefinition>> {
|
||||
// Ensure we have a pool; reflect only if missing
|
||||
if skip_cache || self.get_pool(id, uri, proto_files).is_none() {
|
||||
info!("Reflecting gRPC services for {} at {}", id, uri);
|
||||
self.reflect(id, uri, proto_files, metadata, validate_certificates).await?;
|
||||
self.reflect(id, uri, proto_files, metadata, validate_certificates, client_cert)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let pool = self.get_pool(id, uri, proto_files).ok_or("Failed to get pool".to_string())?;
|
||||
let pool = self
|
||||
.get_pool(id, uri, proto_files)
|
||||
.ok_or(GenericError("Failed to get pool".to_string()))?;
|
||||
Ok(self.services_from_pool(&pool))
|
||||
}
|
||||
|
||||
@@ -313,7 +349,7 @@ impl GrpcHandle {
|
||||
&pool,
|
||||
input_message,
|
||||
))
|
||||
.unwrap(),
|
||||
.expect("Failed to serialize JSON schema"),
|
||||
})
|
||||
}
|
||||
def
|
||||
@@ -328,14 +364,26 @@ impl GrpcHandle {
|
||||
proto_files: &Vec<PathBuf>,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
) -> Result<GrpcConnection, String> {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<GrpcConnection> {
|
||||
let use_reflection = proto_files.is_empty();
|
||||
if self.get_pool(id, uri, proto_files).is_none() {
|
||||
self.reflect(id, uri, proto_files, metadata, validate_certificates).await?;
|
||||
self.reflect(
|
||||
id,
|
||||
uri,
|
||||
proto_files,
|
||||
metadata,
|
||||
validate_certificates,
|
||||
client_cert.clone(),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
let pool = self.get_pool(id, uri, proto_files).ok_or("Failed to get pool")?.clone();
|
||||
let pool = self
|
||||
.get_pool(id, uri, proto_files)
|
||||
.ok_or(GenericError("Failed to get pool".to_string()))?
|
||||
.clone();
|
||||
let uri = uri_from_str(uri)?;
|
||||
let conn = get_transport(validate_certificates);
|
||||
let conn = get_transport(validate_certificates, client_cert.clone())?;
|
||||
Ok(GrpcConnection {
|
||||
pool: Arc::new(RwLock::new(pool)),
|
||||
use_reflection,
|
||||
@@ -352,22 +400,20 @@ impl GrpcHandle {
|
||||
pub(crate) fn decorate_req<T>(
|
||||
metadata: &BTreeMap<String, String>,
|
||||
req: &mut Request<T>,
|
||||
) -> Result<(), String> {
|
||||
) -> Result<()> {
|
||||
for (k, v) in metadata {
|
||||
req.metadata_mut().insert(
|
||||
MetadataKey::from_str(k.as_str()).map_err(|e| e.to_string())?,
|
||||
MetadataValue::from_str(v.as_str()).map_err(|e| e.to_string())?,
|
||||
);
|
||||
req.metadata_mut()
|
||||
.insert(MetadataKey::from_str(k.as_str())?, MetadataValue::from_str(v.as_str())?);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn uri_from_str(uri_str: &str) -> Result<Uri, String> {
|
||||
fn uri_from_str(uri_str: &str) -> Result<Uri> {
|
||||
match Uri::from_str(uri_str) {
|
||||
Ok(uri) => Ok(uri),
|
||||
Err(err) => {
|
||||
// Uri::from_str basically only returns "invalid format" so we add more context here
|
||||
Err(format!("Failed to parse URL, {}", err.to_string()))
|
||||
Err(GenericError(format!("Failed to parse URL, {}", err.to_string())))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use crate::any::collect_any_types;
|
||||
use crate::client::AutoReflectionClient;
|
||||
use crate::error::Error::GenericError;
|
||||
use crate::error::Result;
|
||||
use anyhow::anyhow;
|
||||
use async_recursion::async_recursion;
|
||||
use log::{debug, info, warn};
|
||||
@@ -21,11 +23,12 @@ use tonic::codegen::http::uri::PathAndQuery;
|
||||
use tonic::transport::Uri;
|
||||
use tonic_reflection::pb::v1::server_reflection_request::MessageRequest;
|
||||
use tonic_reflection::pb::v1::server_reflection_response::MessageResponse;
|
||||
use yaak_tls::ClientCertificateConfig;
|
||||
|
||||
pub async fn fill_pool_from_files(
|
||||
app_handle: &AppHandle,
|
||||
paths: &Vec<PathBuf>,
|
||||
) -> Result<DescriptorPool, String> {
|
||||
) -> Result<DescriptorPool> {
|
||||
let mut pool = DescriptorPool::new();
|
||||
let random_file_name = format!("{}.desc", uuid::Uuid::new_v4());
|
||||
let desc_path = temp_dir().join(random_file_name);
|
||||
@@ -103,18 +106,18 @@ pub async fn fill_pool_from_files(
|
||||
.expect("yaakprotoc failed to run");
|
||||
|
||||
if !out.status.success() {
|
||||
return Err(format!(
|
||||
return Err(GenericError(format!(
|
||||
"protoc failed with status {}: {}",
|
||||
out.status.code().unwrap(),
|
||||
String::from_utf8_lossy(out.stderr.as_slice())
|
||||
));
|
||||
)));
|
||||
}
|
||||
|
||||
let bytes = fs::read(desc_path).await.map_err(|e| e.to_string())?;
|
||||
let fdp = FileDescriptorSet::decode(bytes.deref()).map_err(|e| e.to_string())?;
|
||||
pool.add_file_descriptor_set(fdp).map_err(|e| e.to_string())?;
|
||||
let bytes = fs::read(desc_path).await?;
|
||||
let fdp = FileDescriptorSet::decode(bytes.deref())?;
|
||||
pool.add_file_descriptor_set(fdp)?;
|
||||
|
||||
fs::remove_file(desc_path).await.map_err(|e| e.to_string())?;
|
||||
fs::remove_file(desc_path).await?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
@@ -123,9 +126,10 @@ pub async fn fill_pool_from_reflection(
|
||||
uri: &Uri,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
validate_certificates: bool,
|
||||
) -> Result<DescriptorPool, String> {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<DescriptorPool> {
|
||||
let mut pool = DescriptorPool::new();
|
||||
let mut client = AutoReflectionClient::new(uri, validate_certificates);
|
||||
let mut client = AutoReflectionClient::new(uri, validate_certificates, client_cert)?;
|
||||
|
||||
for service in list_services(&mut client, metadata).await? {
|
||||
if service == "grpc.reflection.v1alpha.ServerReflection" {
|
||||
@@ -144,7 +148,7 @@ pub async fn fill_pool_from_reflection(
|
||||
async fn list_services(
|
||||
client: &mut AutoReflectionClient,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
) -> Result<Vec<String>, String> {
|
||||
) -> Result<Vec<String>> {
|
||||
let response =
|
||||
client.send_reflection_request(MessageRequest::ListServices("".into()), metadata).await?;
|
||||
|
||||
@@ -171,7 +175,7 @@ async fn file_descriptor_set_from_service_name(
|
||||
{
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
warn!("Error fetching file descriptor for service {}: {}", service_name, e);
|
||||
warn!("Error fetching file descriptor for service {}: {:?}", service_name, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
@@ -195,7 +199,8 @@ pub(crate) async fn reflect_types_for_message(
|
||||
uri: &Uri,
|
||||
json: &str,
|
||||
metadata: &BTreeMap<String, String>,
|
||||
) -> Result<(), String> {
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<()> {
|
||||
// 1. Collect all Any types in the JSON
|
||||
let mut extra_types = Vec::new();
|
||||
collect_any_types(json, &mut extra_types);
|
||||
@@ -204,7 +209,7 @@ pub(crate) async fn reflect_types_for_message(
|
||||
return Ok(()); // nothing to do
|
||||
}
|
||||
|
||||
let mut client = AutoReflectionClient::new(uri, false);
|
||||
let mut client = AutoReflectionClient::new(uri, false, client_cert)?;
|
||||
for extra_type in extra_types {
|
||||
{
|
||||
let guard = pool.read().await;
|
||||
@@ -217,9 +222,9 @@ pub(crate) async fn reflect_types_for_message(
|
||||
let resp = match client.send_reflection_request(req, metadata).await {
|
||||
Ok(r) => r,
|
||||
Err(e) => {
|
||||
return Err(format!(
|
||||
"Error sending reflection request for @type \"{extra_type}\": {e}",
|
||||
));
|
||||
return Err(GenericError(format!(
|
||||
"Error sending reflection request for @type \"{extra_type}\": {e:?}",
|
||||
)));
|
||||
}
|
||||
};
|
||||
let files = match resp {
|
||||
@@ -286,7 +291,7 @@ async fn file_descriptor_set_by_filename(
|
||||
panic!("Expected a FileDescriptorResponse variant")
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error fetching file descriptor for {}: {}", filename, e);
|
||||
warn!("Error fetching file descriptor for {}: {:?}", filename, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,25 +1,41 @@
|
||||
use crate::error::Result;
|
||||
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
|
||||
use hyper_util::client::legacy::connect::HttpConnector;
|
||||
use hyper_util::client::legacy::Client;
|
||||
use hyper_util::rt::TokioExecutor;
|
||||
use log::info;
|
||||
use tonic::body::BoxBody;
|
||||
use yaak_tls::{get_tls_config, ClientCertificateConfig};
|
||||
|
||||
// I think ALPN breaks this because we're specifying http2_only
|
||||
const WITH_ALPN: bool = false;
|
||||
|
||||
pub(crate) fn get_transport(validate_certificates: bool) -> Client<HttpsConnector<HttpConnector>, BoxBody> {
|
||||
let tls_config = yaak_http::tls::get_config(validate_certificates, WITH_ALPN);
|
||||
pub(crate) fn get_transport(
|
||||
validate_certificates: bool,
|
||||
client_cert: Option<ClientCertificateConfig>,
|
||||
) -> Result<Client<HttpsConnector<HttpConnector>, BoxBody>> {
|
||||
let tls_config =
|
||||
get_tls_config(validate_certificates, WITH_ALPN, client_cert.clone())?;
|
||||
|
||||
let mut http = HttpConnector::new();
|
||||
http.enforce_http(false);
|
||||
|
||||
let connector =
|
||||
HttpsConnectorBuilder::new().with_tls_config(tls_config).https_or_http().enable_http2().build();
|
||||
let connector = HttpsConnectorBuilder::new()
|
||||
.with_tls_config(tls_config)
|
||||
.https_or_http()
|
||||
.enable_http2()
|
||||
.build();
|
||||
|
||||
let client = Client::builder(TokioExecutor::new())
|
||||
.pool_max_idle_per_host(0)
|
||||
.http2_only(true)
|
||||
.build(connector);
|
||||
|
||||
client
|
||||
info!(
|
||||
"Created gRPC client validate_certs={} client_cert={}",
|
||||
validate_certificates,
|
||||
client_cert.is_some()
|
||||
);
|
||||
|
||||
Ok(client)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user