Revert hyper v1 for gRPC

This commit is contained in:
Gregory Schier
2024-10-12 22:05:17 -07:00
parent f1beabcb6f
commit 8090e67b9e
4 changed files with 340 additions and 315 deletions
+28 -14
View File
@@ -1,9 +1,10 @@
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::str::FromStr;
use hyper::client::HttpConnector;
use hyper::Client;
use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::Client;
use hyper_util::client::legacy::connect::HttpConnector;
pub use prost_reflect::DynamicMessage;
use prost_reflect::{DescriptorPool, MethodDescriptor, ServiceDescriptor};
use serde_json::Deserializer;
@@ -53,14 +54,19 @@ impl From<Status> for StreamError {
impl GrpcConnection {
pub fn service(&self, service: &str) -> Result<ServiceDescriptor, String> {
let service = self.pool.get_service_by_name(service).ok_or("Failed to find service")?;
let service = self
.pool
.get_service_by_name(service)
.ok_or("Failed to find service")?;
Ok(service)
}
pub fn method(&self, service: &str, method: &str) -> Result<MethodDescriptor, String> {
let service = self.service(service)?;
let method =
service.methods().find(|m| m.name() == method).ok_or("Failed to find method")?;
let method = service
.methods()
.find(|m| m.name() == method)
.ok_or("Failed to find method")?;
Ok(method)
}
@@ -126,10 +132,13 @@ impl GrpcConnection {
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
client.ready().await.unwrap();
client.client_streaming(req, path, codec).await.map_err(|e| StreamError {
message: e.message().to_string(),
status: Some(e),
})
client
.client_streaming(req, path, codec)
.await
.map_err(|e| StreamError {
message: e.message().to_string(),
status: Some(e),
})
}
pub async fn server_streaming(
@@ -188,7 +197,8 @@ impl GrpcHandle {
fill_pool_from_files(&self.app_handle, proto_files).await
}?;
self.pools.insert(make_pool_key(id, uri, proto_files), pool.clone());
self.pools
.insert(make_pool_key(id, uri, proto_files), pool.clone());
Ok(())
}
@@ -201,7 +211,9 @@ impl GrpcHandle {
// Ensure reflection is up-to-date
self.reflect(id, uri, proto_files).await?;
let pool = self.get_pool(id, uri, proto_files).ok_or("Failed to get pool".to_string())?;
let pool = self
.get_pool(id, uri, proto_files)
.ok_or("Failed to get pool".to_string())?;
Ok(self.services_from_pool(&pool))
}
@@ -222,7 +234,7 @@ impl GrpcHandle {
&pool,
input_message,
))
.unwrap(),
.unwrap(),
})
}
def
@@ -237,7 +249,9 @@ impl GrpcHandle {
proto_files: &Vec<PathBuf>,
) -> Result<GrpcConnection, String> {
self.reflect(id, uri, proto_files).await?;
let pool = self.get_pool(id, uri, proto_files).ok_or("Failed to get pool")?;
let pool = self
.get_pool(id, uri, proto_files)
.ok_or("Failed to get pool")?;
let uri = uri_from_str(uri)?;
let conn = get_transport();
@@ -287,4 +301,4 @@ fn make_pool_key(id: &str, uri: &str, proto_files: &Vec<PathBuf>) -> String {
);
format!("{:x}", md5::compute(pool_key))
}
}