mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-27 20:01:10 +01:00
gRPC Support (#20)
This commit is contained in:
52
src-tauri/grpc/src/codec.rs
Normal file
52
src-tauri/grpc/src/codec.rs
Normal file
@@ -0,0 +1,52 @@
|
||||
use prost_reflect::prost::Message;
|
||||
use prost_reflect::{DynamicMessage, MethodDescriptor};
|
||||
use tonic::codec::{Codec, DecodeBuf, Decoder, EncodeBuf, Encoder};
|
||||
use tonic::Status;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DynamicCodec(MethodDescriptor);
|
||||
|
||||
impl DynamicCodec {
|
||||
#[allow(dead_code)]
|
||||
pub fn new(md: MethodDescriptor) -> Self {
|
||||
Self(md)
|
||||
}
|
||||
}
|
||||
|
||||
impl Codec for DynamicCodec {
|
||||
type Encode = DynamicMessage;
|
||||
type Decode = DynamicMessage;
|
||||
type Encoder = Self;
|
||||
type Decoder = Self;
|
||||
|
||||
fn encoder(&mut self) -> Self::Encoder {
|
||||
self.clone()
|
||||
}
|
||||
|
||||
fn decoder(&mut self) -> Self::Decoder {
|
||||
self.clone()
|
||||
}
|
||||
}
|
||||
|
||||
impl Encoder for DynamicCodec {
|
||||
type Item = DynamicMessage;
|
||||
type Error = Status;
|
||||
|
||||
fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
|
||||
item.encode(dst)
|
||||
.expect("buffer is too small to decode this message");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl Decoder for DynamicCodec {
|
||||
type Item = DynamicMessage;
|
||||
type Error = Status;
|
||||
|
||||
fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
|
||||
let mut msg = DynamicMessage::new(self.0.output());
|
||||
msg.merge(src)
|
||||
.map_err(|err| Status::internal(err.to_string()))?;
|
||||
Ok(Some(msg))
|
||||
}
|
||||
}
|
||||
179
src-tauri/grpc/src/json_schema.rs
Normal file
179
src-tauri/grpc/src/json_schema.rs
Normal file
@@ -0,0 +1,179 @@
|
||||
use prost_reflect::{DescriptorPool, MessageDescriptor};
|
||||
use prost_types::field_descriptor_proto;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Default, Serialize, Deserialize)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
pub struct JsonSchemaEntry {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
title: Option<String>,
|
||||
|
||||
#[serde(rename = "type")]
|
||||
type_: JsonType,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
description: Option<String>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
properties: Option<HashMap<String, JsonSchemaEntry>>,
|
||||
|
||||
#[serde(rename = "enum", skip_serializing_if = "Option::is_none")]
|
||||
enum_: Option<Vec<String>>,
|
||||
|
||||
/// Don't allow any other properties in the object
|
||||
additional_properties: bool,
|
||||
|
||||
/// Set all properties to required
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
required: Option<Vec<String>>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
items: Option<Box<JsonSchemaEntry>>,
|
||||
}
|
||||
|
||||
enum JsonType {
|
||||
String,
|
||||
Number,
|
||||
Object,
|
||||
Array,
|
||||
Boolean,
|
||||
Null,
|
||||
_UNKNOWN,
|
||||
}
|
||||
|
||||
impl Default for JsonType {
|
||||
fn default() -> Self {
|
||||
JsonType::_UNKNOWN
|
||||
}
|
||||
}
|
||||
|
||||
impl serde::Serialize for JsonType {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
match self {
|
||||
JsonType::String => serializer.serialize_str("string"),
|
||||
JsonType::Number => serializer.serialize_str("number"),
|
||||
JsonType::Object => serializer.serialize_str("object"),
|
||||
JsonType::Array => serializer.serialize_str("array"),
|
||||
JsonType::Boolean => serializer.serialize_str("boolean"),
|
||||
JsonType::Null => serializer.serialize_str("null"),
|
||||
JsonType::_UNKNOWN => serializer.serialize_str("unknown"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> serde::Deserialize<'de> for JsonType {
|
||||
fn deserialize<D>(deserializer: D) -> Result<JsonType, D::Error>
|
||||
where
|
||||
D: serde::Deserializer<'de>,
|
||||
{
|
||||
let s = String::deserialize(deserializer)?;
|
||||
match s.as_str() {
|
||||
"string" => Ok(JsonType::String),
|
||||
"number" => Ok(JsonType::Number),
|
||||
"object" => Ok(JsonType::Object),
|
||||
"array" => Ok(JsonType::Array),
|
||||
"boolean" => Ok(JsonType::Boolean),
|
||||
"null" => Ok(JsonType::Null),
|
||||
_ => Ok(JsonType::_UNKNOWN),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn message_to_json_schema(
|
||||
pool: &DescriptorPool,
|
||||
message: MessageDescriptor,
|
||||
) -> JsonSchemaEntry {
|
||||
let mut schema = JsonSchemaEntry {
|
||||
title: Some(message.name().to_string()),
|
||||
type_: JsonType::Object, // Messages are objects
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let mut properties = HashMap::new();
|
||||
message.fields().for_each(|f| match f.kind() {
|
||||
prost_reflect::Kind::Message(m) => {
|
||||
properties.insert(f.name().to_string(), message_to_json_schema(pool, m));
|
||||
}
|
||||
prost_reflect::Kind::Enum(e) => {
|
||||
properties.insert(
|
||||
f.name().to_string(),
|
||||
JsonSchemaEntry {
|
||||
type_: map_proto_type_to_json_type(f.field_descriptor_proto().r#type()),
|
||||
enum_: Some(e.values().map(|v| v.name().to_string()).collect::<Vec<_>>()),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
// TODO: Handle repeated label
|
||||
match f.field_descriptor_proto().label() {
|
||||
field_descriptor_proto::Label::Repeated => {
|
||||
// TODO: Handle more complex repeated types. This just handles primitives for now
|
||||
properties.insert(
|
||||
f.name().to_string(),
|
||||
JsonSchemaEntry {
|
||||
type_: JsonType::Array,
|
||||
items: Some(Box::new(JsonSchemaEntry {
|
||||
type_: map_proto_type_to_json_type(
|
||||
f.field_descriptor_proto().r#type(),
|
||||
),
|
||||
..Default::default()
|
||||
})),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
}
|
||||
_ => {
|
||||
// Regular JSON field
|
||||
properties.insert(
|
||||
f.name().to_string(),
|
||||
JsonSchemaEntry {
|
||||
type_: map_proto_type_to_json_type(f.field_descriptor_proto().r#type()),
|
||||
..Default::default()
|
||||
},
|
||||
);
|
||||
}
|
||||
};
|
||||
}
|
||||
});
|
||||
|
||||
schema.properties = Some(properties);
|
||||
|
||||
// All proto 3 fields are optional, so maybe we could
|
||||
// make this a setting?
|
||||
// schema.required = Some(
|
||||
// message
|
||||
// .fields()
|
||||
// .map(|f| f.name().to_string())
|
||||
// .collect::<Vec<_>>(),
|
||||
// );
|
||||
|
||||
schema
|
||||
}
|
||||
|
||||
fn map_proto_type_to_json_type(proto_type: field_descriptor_proto::Type) -> JsonType {
|
||||
match proto_type {
|
||||
field_descriptor_proto::Type::Double => JsonType::Number,
|
||||
field_descriptor_proto::Type::Float => JsonType::Number,
|
||||
field_descriptor_proto::Type::Int64 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Uint64 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Int32 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Fixed64 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Fixed32 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Bool => JsonType::Boolean,
|
||||
field_descriptor_proto::Type::String => JsonType::String,
|
||||
field_descriptor_proto::Type::Group => JsonType::_UNKNOWN,
|
||||
field_descriptor_proto::Type::Message => JsonType::Object,
|
||||
field_descriptor_proto::Type::Bytes => JsonType::String,
|
||||
field_descriptor_proto::Type::Uint32 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Enum => JsonType::String,
|
||||
field_descriptor_proto::Type::Sfixed32 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Sfixed64 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Sint32 => JsonType::Number,
|
||||
field_descriptor_proto::Type::Sint64 => JsonType::Number,
|
||||
}
|
||||
}
|
||||
40
src-tauri/grpc/src/lib.rs
Normal file
40
src-tauri/grpc/src/lib.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
use prost_reflect::{DynamicMessage, SerializeOptions};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
mod codec;
|
||||
mod json_schema;
|
||||
pub mod manager;
|
||||
mod proto;
|
||||
|
||||
pub fn serialize_options() -> SerializeOptions {
|
||||
SerializeOptions::new().skip_default_fields(false)
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
pub struct ServiceDefinition {
|
||||
pub name: String,
|
||||
pub methods: Vec<MethodDefinition>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Default)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
pub struct MethodDefinition {
|
||||
pub name: String,
|
||||
pub schema: String,
|
||||
pub client_streaming: bool,
|
||||
pub server_streaming: bool,
|
||||
}
|
||||
|
||||
static SERIALIZE_OPTIONS: &'static SerializeOptions = &SerializeOptions::new()
|
||||
.skip_default_fields(false)
|
||||
.stringify_64_bit_integers(false);
|
||||
|
||||
pub fn serialize_message(msg: &DynamicMessage) -> Result<String, String> {
|
||||
let mut buf = Vec::new();
|
||||
let mut se = serde_json::Serializer::pretty(&mut buf);
|
||||
msg.serialize_with_options(&mut se, SERIALIZE_OPTIONS)
|
||||
.map_err(|e| e.to_string())?;
|
||||
let s = String::from_utf8(buf).expect("serde_json to emit valid utf8");
|
||||
Ok(s)
|
||||
}
|
||||
280
src-tauri/grpc/src/manager.rs
Normal file
280
src-tauri/grpc/src/manager.rs
Normal file
@@ -0,0 +1,280 @@
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::Client;
|
||||
use hyper_rustls::HttpsConnector;
|
||||
use prost_reflect::DescriptorPool;
|
||||
pub use prost_reflect::DynamicMessage;
|
||||
use serde_json::Deserializer;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::body::BoxBody;
|
||||
use tonic::transport::Uri;
|
||||
use tonic::{IntoRequest, IntoStreamingRequest, Streaming};
|
||||
|
||||
use crate::codec::DynamicCodec;
|
||||
use crate::proto::{fill_pool, fill_pool_from_files, get_transport, method_desc_to_path};
|
||||
use crate::{json_schema, MethodDefinition, ServiceDefinition};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct GrpcConnection {
|
||||
pool: DescriptorPool,
|
||||
conn: Client<HttpsConnector<HttpConnector>, BoxBody>,
|
||||
pub uri: Uri,
|
||||
}
|
||||
|
||||
impl GrpcConnection {
|
||||
pub async fn unary(
|
||||
&self,
|
||||
service: &str,
|
||||
method: &str,
|
||||
message: &str,
|
||||
) -> Result<DynamicMessage, String> {
|
||||
let service = self.pool.get_service_by_name(service).unwrap();
|
||||
let method = &service.methods().find(|m| m.name() == method).unwrap();
|
||||
let input_message = method.input();
|
||||
|
||||
let mut deserializer = Deserializer::from_str(message);
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)
|
||||
.map_err(|e| e.to_string())?;
|
||||
deserializer.end().unwrap();
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
|
||||
|
||||
let req = req_message.into_request();
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
client.ready().await.unwrap();
|
||||
|
||||
Ok(client
|
||||
.unary(req, path, codec)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?
|
||||
.into_inner())
|
||||
}
|
||||
|
||||
pub async fn streaming(
|
||||
&self,
|
||||
service: &str,
|
||||
method: &str,
|
||||
stream: ReceiverStream<String>,
|
||||
) -> Result<Streaming<DynamicMessage>, String> {
|
||||
let service = self.pool.get_service_by_name(service).unwrap();
|
||||
let method = &service.methods().find(|m| m.name() == method).unwrap();
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
|
||||
|
||||
let method2 = method.clone();
|
||||
let req = stream
|
||||
.map(move |s| {
|
||||
let mut deserializer = Deserializer::from_str(&s);
|
||||
let req_message = DynamicMessage::deserialize(method2.input(), &mut deserializer)
|
||||
.map_err(|e| e.to_string())
|
||||
.unwrap();
|
||||
deserializer.end().unwrap();
|
||||
req_message
|
||||
})
|
||||
.into_streaming_request();
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
client.ready().await.unwrap();
|
||||
Ok(client
|
||||
.streaming(req, path, codec)
|
||||
.await
|
||||
.map_err(|s| s.to_string())?
|
||||
.into_inner())
|
||||
}
|
||||
|
||||
pub async fn client_streaming(
|
||||
&self,
|
||||
service: &str,
|
||||
method: &str,
|
||||
stream: ReceiverStream<String>,
|
||||
) -> Result<DynamicMessage, String> {
|
||||
let service = self.pool.get_service_by_name(service).unwrap();
|
||||
let method = &service.methods().find(|m| m.name() == method).unwrap();
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
|
||||
|
||||
let req = {
|
||||
let method = method.clone();
|
||||
stream
|
||||
.map(move |s| {
|
||||
let mut deserializer = Deserializer::from_str(&s);
|
||||
let req_message =
|
||||
DynamicMessage::deserialize(method.input(), &mut deserializer)
|
||||
.map_err(|e| e.to_string())
|
||||
.unwrap();
|
||||
deserializer.end().unwrap();
|
||||
req_message
|
||||
})
|
||||
.into_streaming_request()
|
||||
};
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
client.ready().await.unwrap();
|
||||
Ok(client
|
||||
.client_streaming(req, path, codec)
|
||||
.await
|
||||
.map_err(|s| s.to_string())?
|
||||
.into_inner())
|
||||
}
|
||||
|
||||
pub async fn server_streaming(
|
||||
&self,
|
||||
service: &str,
|
||||
method: &str,
|
||||
message: &str,
|
||||
) -> Result<Streaming<DynamicMessage>, String> {
|
||||
let service = self.pool.get_service_by_name(service).unwrap();
|
||||
let method = &service.methods().find(|m| m.name() == method).unwrap();
|
||||
let input_message = method.input();
|
||||
|
||||
let mut deserializer = Deserializer::from_str(message);
|
||||
let req_message = DynamicMessage::deserialize(input_message, &mut deserializer)
|
||||
.map_err(|e| e.to_string())?;
|
||||
deserializer.end().unwrap();
|
||||
|
||||
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
|
||||
|
||||
let req = req_message.into_request();
|
||||
let path = method_desc_to_path(method);
|
||||
let codec = DynamicCodec::new(method.clone());
|
||||
client.ready().await.unwrap();
|
||||
Ok(client
|
||||
.server_streaming(req, path, codec)
|
||||
.await
|
||||
.map_err(|s| s.to_string())?
|
||||
.into_inner())
|
||||
}
|
||||
}
|
||||
|
||||
pub struct GrpcHandle {
|
||||
pools: HashMap<String, DescriptorPool>,
|
||||
}
|
||||
|
||||
impl Default for GrpcHandle {
|
||||
fn default() -> Self {
|
||||
let pools = HashMap::new();
|
||||
Self { pools }
|
||||
}
|
||||
}
|
||||
|
||||
impl GrpcHandle {
|
||||
pub async fn services_from_files(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: &Uri,
|
||||
paths: Vec<PathBuf>,
|
||||
) -> Result<Vec<ServiceDefinition>, String> {
|
||||
let pool = fill_pool_from_files(paths).await?;
|
||||
self.pools.insert(self.get_pool_key(id, uri), pool.clone());
|
||||
Ok(self.services_from_pool(&pool))
|
||||
}
|
||||
pub async fn services_from_reflection(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: &Uri,
|
||||
) -> Result<Vec<ServiceDefinition>, String> {
|
||||
let pool = fill_pool(uri).await?;
|
||||
self.pools.insert(self.get_pool_key(id, uri), pool.clone());
|
||||
Ok(self.services_from_pool(&pool))
|
||||
}
|
||||
|
||||
fn get_pool_key(&self, id: &str, uri: &Uri) -> String {
|
||||
format!("{}-{}", id, uri)
|
||||
}
|
||||
|
||||
fn services_from_pool(&self, pool: &DescriptorPool) -> Vec<ServiceDefinition> {
|
||||
pool.services()
|
||||
.map(|s| {
|
||||
let mut def = ServiceDefinition {
|
||||
name: s.full_name().to_string(),
|
||||
methods: vec![],
|
||||
};
|
||||
for method in s.methods() {
|
||||
let input_message = method.input();
|
||||
def.methods.push(MethodDefinition {
|
||||
name: method.name().to_string(),
|
||||
server_streaming: method.is_server_streaming(),
|
||||
client_streaming: method.is_client_streaming(),
|
||||
schema: serde_json::to_string_pretty(&json_schema::message_to_json_schema(
|
||||
&pool,
|
||||
input_message,
|
||||
))
|
||||
.unwrap(),
|
||||
})
|
||||
}
|
||||
def
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
}
|
||||
|
||||
pub async fn server_streaming(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: Uri,
|
||||
proto_files: Vec<PathBuf>,
|
||||
service: &str,
|
||||
method: &str,
|
||||
message: &str,
|
||||
) -> Result<Streaming<DynamicMessage>, String> {
|
||||
self.connect(id, uri, proto_files)
|
||||
.await?
|
||||
.server_streaming(service, method, message)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn client_streaming(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: Uri,
|
||||
proto_files: Vec<PathBuf>,
|
||||
service: &str,
|
||||
method: &str,
|
||||
stream: ReceiverStream<String>,
|
||||
) -> Result<DynamicMessage, String> {
|
||||
self.connect(id, uri, proto_files)
|
||||
.await?
|
||||
.client_streaming(service, method, stream)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn streaming(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: Uri,
|
||||
proto_files: Vec<PathBuf>,
|
||||
service: &str,
|
||||
method: &str,
|
||||
stream: ReceiverStream<String>,
|
||||
) -> Result<Streaming<DynamicMessage>, String> {
|
||||
self.connect(id, uri, proto_files)
|
||||
.await?
|
||||
.streaming(service, method, stream)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn connect(
|
||||
&mut self,
|
||||
id: &str,
|
||||
uri: Uri,
|
||||
proto_files: Vec<PathBuf>,
|
||||
) -> Result<GrpcConnection, String> {
|
||||
let pool = match self.pools.get(id) {
|
||||
Some(p) => p.clone(),
|
||||
None => match proto_files.len() {
|
||||
0 => fill_pool(&uri).await?,
|
||||
_ => {
|
||||
let pool = fill_pool_from_files(proto_files).await?;
|
||||
self.pools.insert(id.to_string(), pool.clone());
|
||||
pool
|
||||
}
|
||||
},
|
||||
};
|
||||
|
||||
let conn = get_transport();
|
||||
let connection = GrpcConnection { pool, conn, uri };
|
||||
Ok(connection)
|
||||
}
|
||||
}
|
||||
226
src-tauri/grpc/src/proto.rs
Normal file
226
src-tauri/grpc/src/proto.rs
Normal file
@@ -0,0 +1,226 @@
|
||||
use std::env::temp_dir;
|
||||
use std::ops::Deref;
|
||||
use std::path::PathBuf;
|
||||
use std::process::Command;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::anyhow;
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::Client;
|
||||
use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder};
|
||||
use log::{debug, warn};
|
||||
use prost::Message;
|
||||
use prost_reflect::{DescriptorPool, MethodDescriptor};
|
||||
use prost_types::{FileDescriptorProto, FileDescriptorSet};
|
||||
use tokio::fs;
|
||||
use tokio_stream::StreamExt;
|
||||
use tonic::body::BoxBody;
|
||||
use tonic::codegen::http::uri::PathAndQuery;
|
||||
use tonic::transport::Uri;
|
||||
use tonic::Request;
|
||||
use tonic_reflection::pb::server_reflection_client::ServerReflectionClient;
|
||||
use tonic_reflection::pb::server_reflection_request::MessageRequest;
|
||||
use tonic_reflection::pb::server_reflection_response::MessageResponse;
|
||||
use tonic_reflection::pb::ServerReflectionRequest;
|
||||
|
||||
pub async fn fill_pool_from_files(paths: Vec<PathBuf>) -> Result<DescriptorPool, String> {
|
||||
let mut pool = DescriptorPool::new();
|
||||
let random_file_name = format!("{}.desc", uuid::Uuid::new_v4());
|
||||
let desc_path = temp_dir().join(random_file_name);
|
||||
let bin = protoc_bin_vendored::protoc_bin_path().unwrap();
|
||||
|
||||
let mut cmd = Command::new(bin.clone());
|
||||
cmd.arg("--include_imports")
|
||||
.arg("--include_source_info")
|
||||
.arg("-o")
|
||||
.arg(&desc_path);
|
||||
|
||||
for p in paths {
|
||||
if p.as_path().exists() {
|
||||
cmd.arg(p.as_path().to_string_lossy().as_ref());
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
||||
let parent = p.as_path().parent();
|
||||
if let Some(parent_path) = parent {
|
||||
cmd.arg("-I").arg(parent_path);
|
||||
cmd.arg("-I").arg(parent_path.parent().unwrap());
|
||||
} else {
|
||||
debug!("ignoring {:?} since it does not exist.", parent)
|
||||
}
|
||||
}
|
||||
|
||||
let output = cmd.output().map_err(|e| e.to_string())?;
|
||||
if !output.status.success() {
|
||||
return Err(format!(
|
||||
"protoc failed: {}",
|
||||
String::from_utf8_lossy(&output.stderr)
|
||||
));
|
||||
}
|
||||
|
||||
let bytes = fs::read(desc_path.as_path())
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
let fdp = FileDescriptorSet::decode(bytes.deref()).map_err(|e| e.to_string())?;
|
||||
pool.add_file_descriptor_set(fdp)
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
fs::remove_file(desc_path.as_path())
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
pub async fn fill_pool(uri: &Uri) -> Result<DescriptorPool, String> {
|
||||
let mut pool = DescriptorPool::new();
|
||||
let mut client = ServerReflectionClient::with_origin(get_transport(), uri.clone());
|
||||
|
||||
for service in list_services(&mut client).await? {
|
||||
if service == "grpc.reflection.v1alpha.ServerReflection" {
|
||||
continue;
|
||||
}
|
||||
file_descriptor_set_from_service_name(&service, &mut pool, &mut client).await;
|
||||
}
|
||||
|
||||
Ok(pool)
|
||||
}
|
||||
|
||||
pub fn get_transport() -> Client<HttpsConnector<HttpConnector>, BoxBody> {
|
||||
let connector = HttpsConnectorBuilder::new().with_native_roots();
|
||||
let connector = connector.https_or_http().enable_http2().wrap_connector({
|
||||
let mut http_connector = HttpConnector::new();
|
||||
http_connector.enforce_http(false);
|
||||
http_connector
|
||||
});
|
||||
Client::builder()
|
||||
.pool_max_idle_per_host(0)
|
||||
.http2_only(true)
|
||||
.build(connector)
|
||||
}
|
||||
|
||||
async fn list_services(
|
||||
reflect_client: &mut ServerReflectionClient<Client<HttpsConnector<HttpConnector>, BoxBody>>,
|
||||
) -> Result<Vec<String>, String> {
|
||||
let response =
|
||||
send_reflection_request(reflect_client, MessageRequest::ListServices("".into())).await?;
|
||||
|
||||
let list_services_response = match response {
|
||||
MessageResponse::ListServicesResponse(resp) => resp,
|
||||
_ => panic!("Expected a ListServicesResponse variant"),
|
||||
};
|
||||
|
||||
Ok(list_services_response
|
||||
.service
|
||||
.iter()
|
||||
.map(|s| s.name.clone())
|
||||
.collect::<Vec<_>>())
|
||||
}
|
||||
|
||||
async fn file_descriptor_set_from_service_name(
|
||||
service_name: &str,
|
||||
pool: &mut DescriptorPool,
|
||||
client: &mut ServerReflectionClient<Client<HttpsConnector<HttpConnector>, BoxBody>>,
|
||||
) {
|
||||
let response = match send_reflection_request(
|
||||
client,
|
||||
MessageRequest::FileContainingSymbol(service_name.into()),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(resp) => resp,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Error fetching file descriptor for service {}: {}",
|
||||
service_name, e
|
||||
);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let file_descriptor_response = match response {
|
||||
MessageResponse::FileDescriptorResponse(resp) => resp,
|
||||
_ => panic!("Expected a FileDescriptorResponse variant"),
|
||||
};
|
||||
|
||||
for fd in file_descriptor_response.file_descriptor_proto {
|
||||
let fdp = FileDescriptorProto::decode(fd.deref()).unwrap();
|
||||
|
||||
// Add deps first or else we'll get an error
|
||||
for dep_name in fdp.clone().dependency {
|
||||
file_descriptor_set_by_filename(&dep_name, pool, client).await;
|
||||
}
|
||||
|
||||
pool.add_file_descriptor_proto(fdp)
|
||||
.expect("add file descriptor proto");
|
||||
}
|
||||
}
|
||||
|
||||
async fn file_descriptor_set_by_filename(
|
||||
filename: &str,
|
||||
pool: &mut DescriptorPool,
|
||||
client: &mut ServerReflectionClient<Client<HttpsConnector<HttpConnector>, BoxBody>>,
|
||||
) {
|
||||
// We already fetched this file
|
||||
if let Some(_) = pool.get_file_by_name(filename) {
|
||||
return;
|
||||
}
|
||||
|
||||
let response =
|
||||
send_reflection_request(client, MessageRequest::FileByFilename(filename.into())).await;
|
||||
let file_descriptor_response = match response {
|
||||
Ok(MessageResponse::FileDescriptorResponse(resp)) => resp,
|
||||
Ok(_) => {
|
||||
panic!("Expected a FileDescriptorResponse variant")
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error fetching file descriptor for {}: {}", filename, e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
for fd in file_descriptor_response.file_descriptor_proto {
|
||||
let fdp = FileDescriptorProto::decode(fd.deref()).unwrap();
|
||||
pool.add_file_descriptor_proto(fdp)
|
||||
.expect("add file descriptor proto");
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_reflection_request(
|
||||
client: &mut ServerReflectionClient<Client<HttpsConnector<HttpConnector>, BoxBody>>,
|
||||
message: MessageRequest,
|
||||
) -> Result<MessageResponse, String> {
|
||||
let reflection_request = ServerReflectionRequest {
|
||||
host: "".into(), // Doesn't matter
|
||||
message_request: Some(message),
|
||||
};
|
||||
|
||||
let request = Request::new(tokio_stream::once(reflection_request));
|
||||
|
||||
client
|
||||
.server_reflection_info(request)
|
||||
.await
|
||||
.map_err(|e| match e.code() {
|
||||
tonic::Code::Unavailable => "Failed to connect to endpoint".to_string(),
|
||||
tonic::Code::Unauthenticated => "Authentication failed".to_string(),
|
||||
tonic::Code::DeadlineExceeded => "Deadline exceeded".to_string(),
|
||||
_ => e.to_string(),
|
||||
})?
|
||||
.into_inner()
|
||||
.next()
|
||||
.await
|
||||
.expect("steamed response")
|
||||
.map_err(|e| e.to_string())?
|
||||
.message_response
|
||||
.ok_or("No reflection response".to_string())
|
||||
}
|
||||
|
||||
pub fn method_desc_to_path(md: &MethodDescriptor) -> PathAndQuery {
|
||||
let full_name = md.full_name();
|
||||
let (namespace, method_name) = full_name
|
||||
.rsplit_once('.')
|
||||
.ok_or_else(|| anyhow!("invalid method path"))
|
||||
.expect("invalid method path");
|
||||
PathAndQuery::from_str(&format!("/{}/{}", namespace, method_name)).expect("invalid method path")
|
||||
}
|
||||
Reference in New Issue
Block a user