Combine grpc handlers, fix duplicate

This commit is contained in:
Gregory Schier
2024-02-10 10:41:45 -08:00
parent bbe62abd20
commit 09c7c2cb91
10 changed files with 135 additions and 551 deletions

View File

@@ -4,8 +4,8 @@ use std::path::PathBuf;
use hyper::client::HttpConnector;
use hyper::Client;
use hyper_rustls::HttpsConnector;
use prost_reflect::DescriptorPool;
pub use prost_reflect::DynamicMessage;
use prost_reflect::{DescriptorPool, MethodDescriptor, ServiceDescriptor};
use serde_json::Deserializer;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
@@ -25,14 +25,30 @@ pub struct GrpcConnection {
}
impl GrpcConnection {
pub fn service(&self, service: &str) -> Result<ServiceDescriptor, String> {
let service = self
.pool
.get_service_by_name(service)
.ok_or("Failed to find service")?;
Ok(service)
}
pub fn method(&self, service: &str, method: &str) -> Result<MethodDescriptor, String> {
let service = self.service(service)?;
let method = service
.methods()
.find(|m| m.name() == method)
.ok_or("Failed to find method")?;
Ok(method)
}
pub async fn unary(
&self,
service: &str,
method: &str,
message: &str,
) -> Result<DynamicMessage, String> {
let service = self.pool.get_service_by_name(service).unwrap();
let method = &service.methods().find(|m| m.name() == method).unwrap();
let method = &self.method(&service, &method)?;
let input_message = method.input();
let mut deserializer = Deserializer::from_str(message);
@@ -60,9 +76,7 @@ impl GrpcConnection {
method: &str,
stream: ReceiverStream<String>,
) -> Result<Streaming<DynamicMessage>, String> {
let service = self.pool.get_service_by_name(service).unwrap();
let method = &service.methods().find(|m| m.name() == method).unwrap();
let method = &self.method(&service, &method)?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
let method2 = method.clone();
@@ -92,8 +106,7 @@ impl GrpcConnection {
method: &str,
stream: ReceiverStream<String>,
) -> Result<DynamicMessage, String> {
let service = self.pool.get_service_by_name(service).unwrap();
let method = &service.methods().find(|m| m.name() == method).unwrap();
let method = &self.method(&service, &method)?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
let req = {
@@ -126,8 +139,7 @@ impl GrpcConnection {
method: &str,
message: &str,
) -> Result<Streaming<DynamicMessage>, String> {
let service = self.pool.get_service_by_name(service).unwrap();
let method = &service.methods().find(|m| m.name() == method).unwrap();
let method = &self.method(&service, &method)?;
let input_message = method.input();
let mut deserializer = Deserializer::from_str(message);