Cargo format

This commit is contained in:
Gregory Schier
2025-01-11 13:53:30 -08:00
parent 295aea4f2e
commit ba330047ca
17 changed files with 82 additions and 163 deletions

View File

@@ -33,8 +33,7 @@ impl Encoder for DynamicCodec {
type Error = Status;
fn encode(&mut self, item: Self::Item, dst: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
item.encode(dst)
.expect("buffer is too small to decode this message");
item.encode(dst).expect("buffer is too small to decode this message");
Ok(())
}
}
@@ -45,8 +44,7 @@ impl Decoder for DynamicCodec {
fn decode(&mut self, src: &mut DecodeBuf<'_>) -> Result<Option<Self::Item>, Self::Error> {
let mut msg = DynamicMessage::new(self.0.output());
msg.merge(src)
.map_err(|err| Status::internal(err.to_string()))?;
msg.merge(src).map_err(|err| Status::internal(err.to_string()))?;
Ok(Some(msg))
}
}

View File

@@ -30,15 +30,13 @@ pub struct MethodDefinition {
pub server_streaming: bool,
}
static SERIALIZE_OPTIONS: &'static SerializeOptions = &SerializeOptions::new()
.skip_default_fields(false)
.stringify_64_bit_integers(false);
static SERIALIZE_OPTIONS: &'static SerializeOptions =
&SerializeOptions::new().skip_default_fields(false).stringify_64_bit_integers(false);
pub fn serialize_message(msg: &DynamicMessage) -> Result<String, String> {
let mut buf = Vec::new();
let mut se = serde_json::Serializer::pretty(&mut buf);
msg.serialize_with_options(&mut se, SERIALIZE_OPTIONS)
.map_err(|e| e.to_string())?;
msg.serialize_with_options(&mut se, SERIALIZE_OPTIONS).map_err(|e| e.to_string())?;
let s = String::from_utf8(buf).expect("serde_json to emit valid utf8");
Ok(s)
}

View File

@@ -54,19 +54,14 @@ impl From<Status> for StreamError {
impl GrpcConnection {
pub fn service(&self, service: &str) -> Result<ServiceDescriptor, String> {
let service = self
.pool
.get_service_by_name(service)
.ok_or("Failed to find service")?;
let service = self.pool.get_service_by_name(service).ok_or("Failed to find service")?;
Ok(service)
}
pub fn method(&self, service: &str, method: &str) -> Result<MethodDescriptor, String> {
let service = self.service(service)?;
let method = service
.methods()
.find(|m| m.name() == method)
.ok_or("Failed to find method")?;
let method =
service.methods().find(|m| m.name() == method).ok_or("Failed to find method")?;
Ok(method)
}
@@ -132,13 +127,10 @@ impl GrpcConnection {
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
client.ready().await.unwrap();
client
.client_streaming(req, path, codec)
.await
.map_err(|e| StreamError {
message: e.message().to_string(),
status: Some(e),
})
client.client_streaming(req, path, codec).await.map_err(|e| StreamError {
message: e.message().to_string(),
status: Some(e),
})
}
pub async fn server_streaming(
@@ -197,8 +189,7 @@ impl GrpcHandle {
fill_pool_from_files(&self.app_handle, proto_files).await
}?;
self.pools
.insert(make_pool_key(id, uri, proto_files), pool.clone());
self.pools.insert(make_pool_key(id, uri, proto_files), pool.clone());
Ok(())
}
@@ -211,9 +202,7 @@ impl GrpcHandle {
// Ensure reflection is up-to-date
self.reflect(id, uri, proto_files).await?;
let pool = self
.get_pool(id, uri, proto_files)
.ok_or("Failed to get pool".to_string())?;
let pool = self.get_pool(id, uri, proto_files).ok_or("Failed to get pool".to_string())?;
Ok(self.services_from_pool(&pool))
}
@@ -234,7 +223,7 @@ impl GrpcHandle {
&pool,
input_message,
))
.unwrap(),
.unwrap(),
})
}
def
@@ -249,9 +238,7 @@ impl GrpcHandle {
proto_files: &Vec<PathBuf>,
) -> Result<GrpcConnection, String> {
self.reflect(id, uri, proto_files).await?;
let pool = self
.get_pool(id, uri, proto_files)
.ok_or("Failed to get pool")?;
let pool = self.get_pool(id, uri, proto_files).ok_or("Failed to get pool")?;
let uri = uri_from_str(uri)?;
let conn = get_transport();
@@ -301,4 +288,4 @@ fn make_pool_key(id: &str, uri: &str, proto_files: &Vec<PathBuf>) -> String {
);
format!("{:x}", md5::compute(pool_key))
}
}

View File

@@ -104,7 +104,7 @@ pub async fn fill_pool_from_reflection(uri: &Uri) -> Result<DescriptorPool, Stri
if service == "grpc.reflection.v1alpha.ServerReflection" {
continue;
}
if service == "grpc.reflection.v1.ServerReflection"{
if service == "grpc.reflection.v1.ServerReflection" {
// TODO: update reflection client to use v1
continue;
}
@@ -316,8 +316,8 @@ mod topology {
zero_indegree.push(node.clone());
}
}
for (node, deps) in data.out_graph.iter(){
if deps.is_empty(){
for (node, deps) in data.out_graph.iter() {
if deps.is_empty() {
zero_indegree.push(node.clone());
}
}
@@ -344,8 +344,8 @@ mod topology {
}
let node = self.zero_indegree.pop().unwrap();
if let Some(parents) = self.data.in_graph.get(&node){
for parent in parents.iter(){
if let Some(parents) = self.data.in_graph.get(&node) {
for parent in parents.iter() {
let deps = self.data.out_graph.get_mut(parent).unwrap();
deps.remove(&node);
if deps.is_empty() {
@@ -360,7 +360,7 @@ mod topology {
}
#[test]
fn test_sort(){
fn test_sort() {
{
let mut topo_sort = SimpleTopoSort::new();
topo_sort.insert("a", []);
@@ -390,5 +390,4 @@ mod topology {
assert_eq!(iter.next(), None);
}
}
}