mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-28 12:11:53 +01:00
Refactor gRPC reflection!
This commit is contained in:
@@ -16,7 +16,9 @@ use tonic::transport::Uri;
|
||||
use tonic::{IntoRequest, IntoStreamingRequest, Request, Response, Status, Streaming};
|
||||
|
||||
use crate::codec::DynamicCodec;
|
||||
use crate::proto::{fill_pool, fill_pool_from_files, get_transport, method_desc_to_path};
|
||||
use crate::proto::{
|
||||
fill_pool_from_files, fill_pool_from_reflection, get_transport, method_desc_to_path,
|
||||
};
|
||||
use crate::{json_schema, MethodDefinition, ServiceDefinition};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -182,39 +184,36 @@ impl GrpcHandle {
|
||||
}
|
||||
|
||||
impl GrpcHandle {
|
||||
pub async fn services_from_files(
|
||||
&mut self,
|
||||
id: &str,
|
||||
paths: Vec<PathBuf>,
|
||||
) -> Result<Vec<ServiceDefinition>, String> {
|
||||
let pool_key = format!(
|
||||
"{}-{}",
|
||||
id,
|
||||
paths
|
||||
.iter()
|
||||
.map(|p| p.to_string_lossy().to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join(":")
|
||||
);
|
||||
let pool = fill_pool_from_files(&self.app_handle, paths).await?;
|
||||
self.pools.insert(pool_key, pool.clone());
|
||||
Ok(self.services_from_pool(&pool))
|
||||
}
|
||||
|
||||
pub async fn services_from_reflection(
|
||||
pub async fn reflect(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: &str,
|
||||
) -> Result<Vec<ServiceDefinition>, String> {
|
||||
// Short-circuit if no URL is set
|
||||
if uri.is_empty() {
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
proto_files: &Vec<PathBuf>,
|
||||
) -> Result<(), String> {
|
||||
let pool = if proto_files.is_empty() {
|
||||
let full_uri = uri_from_str(uri)?;
|
||||
fill_pool_from_reflection(&full_uri).await
|
||||
} else {
|
||||
fill_pool_from_files(&self.app_handle, proto_files).await
|
||||
}?;
|
||||
|
||||
let uri = uri_from_str(uri)?;
|
||||
let pool = fill_pool(&uri).await?;
|
||||
let pool_key = format!("{}-{}", id, uri);
|
||||
self.pools.insert(pool_key, pool.clone());
|
||||
self.pools
|
||||
.insert(make_pool_key(id, uri, proto_files), pool.clone());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn services(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: &str,
|
||||
proto_files: &Vec<PathBuf>,
|
||||
) -> Result<Vec<ServiceDefinition>, String> {
|
||||
// Ensure reflection is up-to-date
|
||||
self.reflect(id, uri, proto_files).await?;
|
||||
|
||||
let pool = self
|
||||
.get_pool(id, uri, proto_files)
|
||||
.ok_or("Failed to get pool".to_string())?;
|
||||
Ok(self.services_from_pool(&pool))
|
||||
}
|
||||
|
||||
@@ -247,25 +246,26 @@ impl GrpcHandle {
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: &str,
|
||||
proto_files: Vec<PathBuf>,
|
||||
proto_files: &Vec<PathBuf>,
|
||||
) -> Result<GrpcConnection, String> {
|
||||
let uri = uri_from_str(uri)?;
|
||||
let pool = match self.pools.get(id) {
|
||||
Some(p) => p.clone(),
|
||||
None => match proto_files.len() {
|
||||
0 => fill_pool(&uri).await?,
|
||||
_ => {
|
||||
let pool = fill_pool_from_files(&self.app_handle, proto_files).await?;
|
||||
self.pools.insert(id.to_string(), pool.clone());
|
||||
pool
|
||||
}
|
||||
},
|
||||
};
|
||||
self.reflect(id, uri, proto_files).await?;
|
||||
let pool = self
|
||||
.get_pool(id, uri, proto_files)
|
||||
.ok_or("Failed to get pool")?;
|
||||
|
||||
let uri = uri_from_str(uri)?;
|
||||
let conn = get_transport();
|
||||
let connection = GrpcConnection { pool, conn, uri };
|
||||
let connection = GrpcConnection {
|
||||
pool: pool.clone(),
|
||||
conn,
|
||||
uri,
|
||||
};
|
||||
Ok(connection)
|
||||
}
|
||||
|
||||
fn get_pool(&self, id: &str, uri: &str, proto_files: &Vec<PathBuf>) -> Option<&DescriptorPool> {
|
||||
self.pools.get(make_pool_key(id, uri, proto_files).as_str())
|
||||
}
|
||||
}
|
||||
|
||||
fn decorate_req<T>(metadata: HashMap<String, String>, req: &mut Request<T>) -> Result<(), String> {
|
||||
@@ -287,3 +287,18 @@ fn uri_from_str(uri_str: &str) -> Result<Uri, String> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn make_pool_key(id: &str, uri: &str, proto_files: &Vec<PathBuf>) -> String {
|
||||
let pool_key = format!(
|
||||
"{}::{}::{}",
|
||||
id,
|
||||
uri,
|
||||
proto_files
|
||||
.iter()
|
||||
.map(|p| p.to_string_lossy().to_string())
|
||||
.collect::<Vec<String>>()
|
||||
.join(":")
|
||||
);
|
||||
|
||||
format!("{:x}", md5::compute(pool_key))
|
||||
}
|
||||
|
||||
@@ -28,7 +28,7 @@ use tonic_reflection::pb::ServerReflectionRequest;
|
||||
|
||||
pub async fn fill_pool_from_files(
|
||||
app_handle: &AppHandle,
|
||||
paths: Vec<PathBuf>,
|
||||
paths: &Vec<PathBuf>,
|
||||
) -> Result<DescriptorPool, String> {
|
||||
let mut pool = DescriptorPool::new();
|
||||
let random_file_name = format!("{}.desc", uuid::Uuid::new_v4());
|
||||
@@ -121,7 +121,7 @@ pub async fn fill_pool_from_files(
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
pub async fn fill_pool(uri: &Uri) -> Result<DescriptorPool, String> {
|
||||
pub async fn fill_pool_from_reflection(uri: &Uri) -> Result<DescriptorPool, String> {
|
||||
let mut pool = DescriptorPool::new();
|
||||
let mut client = ServerReflectionClient::with_origin(get_transport(), uri.clone());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user