fix(grpc): proto dep topo order to solve panic (#130)

This commit is contained in:
Hao Xiang
2024-10-30 05:19:11 +08:00
committed by GitHub
parent 6d2c3712c0
commit 1050ac5e3c
3 changed files with 179 additions and 35 deletions

1
src-tauri/Cargo.lock generated
View File

@@ -8038,6 +8038,7 @@ name = "yaak_grpc"
version = "0.1.0"
dependencies = [
"anyhow",
"async-recursion",
"dunce",
"hyper 0.14.30",
"hyper-rustls 0.24.2",

View File

@@ -22,3 +22,4 @@ tauri = { workspace = true }
tauri-plugin-shell = { workspace = true }
md5 = "0.7.0"
dunce = "1.0.4"
async-recursion = "1.1.1"

View File

@@ -4,6 +4,7 @@ use std::path::PathBuf;
use std::str::FromStr;
use anyhow::anyhow;
use async_recursion::async_recursion;
use hyper::client::HttpConnector;
use hyper::Client;
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
@@ -38,9 +39,8 @@ pub async fn fill_pool_from_files(
.expect("failed to resolve protoc include directory");
// HACK: Remove UNC prefix for Windows paths
let global_import_dir = dunce::simplified(global_import_dir.as_path())
.to_string_lossy()
.to_string();
let global_import_dir =
dunce::simplified(global_import_dir.as_path()).to_string_lossy().to_string();
let desc_path = dunce::simplified(desc_path.as_path());
let mut args = vec![
@@ -89,12 +89,9 @@ pub async fn fill_pool_from_files(
let bytes = fs::read(desc_path).await.map_err(|e| e.to_string())?;
let fdp = FileDescriptorSet::decode(bytes.deref()).map_err(|e| e.to_string())?;
pool.add_file_descriptor_set(fdp)
.map_err(|e| e.to_string())?;
pool.add_file_descriptor_set(fdp).map_err(|e| e.to_string())?;
fs::remove_file(desc_path)
.await
.map_err(|e| e.to_string())?;
fs::remove_file(desc_path).await.map_err(|e| e.to_string())?;
Ok(pool)
}
@@ -107,6 +104,10 @@ pub async fn fill_pool_from_reflection(uri: &Uri) -> Result<DescriptorPool, Stri
if service == "grpc.reflection.v1alpha.ServerReflection" {
continue;
}
if service == "grpc.reflection.v1.ServerReflection"{
// TODO: update reflection client to use v1
continue;
}
file_descriptor_set_from_service_name(&service, &mut pool, &mut client).await;
}
@@ -120,10 +121,7 @@ pub fn get_transport() -> Client<HttpsConnector<HttpConnector>, BoxBody> {
http_connector.enforce_http(false);
http_connector
});
Client::builder()
.pool_max_idle_per_host(0)
.http2_only(true)
.build(connector)
Client::builder().pool_max_idle_per_host(0).http2_only(true).build(connector)
}
async fn list_services(
@@ -137,11 +135,7 @@ async fn list_services(
_ => panic!("Expected a ListServicesResponse variant"),
};
Ok(list_services_response
.service
.iter()
.map(|s| s.name.clone())
.collect::<Vec<_>>())
Ok(list_services_response.service.iter().map(|s| s.name.clone()).collect::<Vec<_>>())
}
async fn file_descriptor_set_from_service_name(
@@ -153,14 +147,11 @@ async fn file_descriptor_set_from_service_name(
client,
MessageRequest::FileContainingSymbol(service_name.into()),
)
.await
.await
{
Ok(resp) => resp,
Err(e) => {
warn!(
"Error fetching file descriptor for service {}: {}",
service_name, e
);
warn!("Error fetching file descriptor for service {}: {}", service_name, e);
return;
}
};
@@ -170,16 +161,37 @@ async fn file_descriptor_set_from_service_name(
_ => panic!("Expected a FileDescriptorResponse variant"),
};
for fd in file_descriptor_response.file_descriptor_proto {
add_file_descriptors_to_pool(file_descriptor_response.file_descriptor_proto, pool, client)
.await;
}
#[async_recursion]
async fn add_file_descriptors_to_pool(
fds: Vec<Vec<u8>>,
pool: &mut DescriptorPool,
client: &mut ServerReflectionClient<Client<HttpsConnector<HttpConnector>, BoxBody>>,
) {
let mut topo_sort = topology::SimpleTopoSort::new();
let mut fd_mapping = std::collections::HashMap::with_capacity(fds.len());
for fd in fds {
let fdp = FileDescriptorProto::decode(fd.deref()).unwrap();
// Add deps first or else we'll get an error
for dep_name in fdp.clone().dependency {
file_descriptor_set_by_filename(&dep_name, pool, client).await;
}
topo_sort.insert(fdp.name().to_string(), fdp.dependency.clone());
fd_mapping.insert(fdp.name().to_string(), fdp);
}
pool.add_file_descriptor_proto(fdp)
.expect("add file descriptor proto");
for node in topo_sort {
match node {
Ok(node) => {
if let Some(fdp) = fd_mapping.remove(&node) {
pool.add_file_descriptor_proto(fdp).expect("add file descriptor proto");
} else {
file_descriptor_set_by_filename(node.as_str(), pool, client).await;
}
}
Err(_) => panic!("proto file got cycle!"),
}
}
}
@@ -206,11 +218,8 @@ async fn file_descriptor_set_by_filename(
}
};
for fd in file_descriptor_response.file_descriptor_proto {
let fdp = FileDescriptorProto::decode(fd.deref()).unwrap();
pool.add_file_descriptor_proto(fdp)
.expect("add file descriptor proto");
}
add_file_descriptors_to_pool(file_descriptor_response.file_descriptor_proto, pool, client)
.await;
}
async fn send_reflection_request(
@@ -249,4 +258,137 @@ pub fn method_desc_to_path(md: &MethodDescriptor) -> PathAndQuery {
.ok_or_else(|| anyhow!("invalid method path"))
.expect("invalid method path");
PathAndQuery::from_str(&format!("/{}/{}", namespace, method_name)).expect("invalid method path")
}
}
mod topology {
use std::collections::{HashMap, HashSet};
pub struct SimpleTopoSort<T> {
out_graph: HashMap<T, HashSet<T>>,
in_graph: HashMap<T, HashSet<T>>,
}
impl<T> SimpleTopoSort<T>
where
T: Eq + std::hash::Hash + Clone,
{
pub fn new() -> Self {
SimpleTopoSort {
out_graph: HashMap::new(),
in_graph: HashMap::new(),
}
}
pub fn insert<I: IntoIterator<Item = T>>(&mut self, node: T, deps: I) {
self.out_graph.entry(node.clone()).or_insert(HashSet::new());
for dep in deps {
self.out_graph.entry(node.clone()).or_insert(HashSet::new()).insert(dep.clone());
self.in_graph.entry(dep.clone()).or_insert(HashSet::new()).insert(node.clone());
}
}
}
impl<T> IntoIterator for SimpleTopoSort<T>
where
T: Eq + std::hash::Hash + Clone,
{
type IntoIter = SimpleTopoSortIter<T>;
type Item = <SimpleTopoSortIter<T> as Iterator>::Item;
fn into_iter(self) -> Self::IntoIter {
SimpleTopoSortIter::new(self)
}
}
pub struct SimpleTopoSortIter<T> {
data: SimpleTopoSort<T>,
zero_indegree: Vec<T>,
}
impl<T> SimpleTopoSortIter<T>
where
T: Eq + std::hash::Hash + Clone,
{
pub fn new(data: SimpleTopoSort<T>) -> Self {
let mut zero_indegree = Vec::new();
for (node, _) in data.in_graph.iter() {
if !data.out_graph.contains_key(node) {
zero_indegree.push(node.clone());
}
}
for (node, deps) in data.out_graph.iter(){
if deps.is_empty(){
zero_indegree.push(node.clone());
}
}
SimpleTopoSortIter {
data,
zero_indegree,
}
}
}
impl<T> Iterator for SimpleTopoSortIter<T>
where
T: Eq + std::hash::Hash + Clone,
{
type Item = Result<T, &'static str>;
fn next(&mut self) -> Option<Self::Item> {
if self.zero_indegree.is_empty() {
if self.data.out_graph.is_empty() {
return None;
}
return Some(Err("Cycle detected"));
}
let node = self.zero_indegree.pop().unwrap();
if let Some(parents) = self.data.in_graph.get(&node){
for parent in parents.iter(){
let deps = self.data.out_graph.get_mut(parent).unwrap();
deps.remove(&node);
if deps.is_empty() {
self.zero_indegree.push(parent.clone());
}
}
}
self.data.out_graph.remove(&node);
Some(Ok(node))
}
}
#[test]
fn test_sort(){
{
let mut topo_sort = SimpleTopoSort::new();
topo_sort.insert("a", []);
for node in topo_sort {
match node {
Ok(n) => assert_eq!(n, "a"),
Err(e) => panic!("err {}", e),
}
}
}
{
let mut topo_sort = SimpleTopoSort::new();
topo_sort.insert("a", ["b"]);
topo_sort.insert("b", []);
let mut iter = topo_sort.into_iter();
match iter.next() {
Some(Ok(n)) => assert_eq!(n, "b"),
_ => panic!("err"),
}
match iter.next() {
Some(Ok(n)) => assert_eq!(n, "a"),
_ => panic!("err"),
}
assert_eq!(iter.next(), None);
}
}
}