diff --git a/src-tauri/grpc/src/manager.rs b/src-tauri/grpc/src/manager.rs index f51b6989..f9c98533 100644 --- a/src-tauri/grpc/src/manager.rs +++ b/src-tauri/grpc/src/manager.rs @@ -11,7 +11,7 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tonic::body::BoxBody; use tonic::transport::Uri; -use tonic::{IntoRequest, IntoStreamingRequest, Streaming}; +use tonic::{IntoRequest, IntoStreamingRequest, Response, Status, Streaming}; use crate::codec::DynamicCodec; use crate::proto::{fill_pool, fill_pool_from_files, get_transport, method_desc_to_path}; @@ -75,7 +75,7 @@ impl GrpcConnection { service: &str, method: &str, stream: ReceiverStream, - ) -> Result, String> { + ) -> Result>, Status>, String> { let method = &self.method(&service, &method)?; let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone()); @@ -92,12 +92,8 @@ impl GrpcConnection { .into_streaming_request(); let path = method_desc_to_path(method); let codec = DynamicCodec::new(method.clone()); - client.ready().await.unwrap(); - Ok(client - .streaming(req, path, codec) - .await - .map_err(|s| s.to_string())? - .into_inner()) + client.ready().await.map_err(|e| e.to_string())?; + Ok(client.streaming(req, path, codec).await) } pub async fn client_streaming( @@ -138,7 +134,7 @@ impl GrpcConnection { service: &str, method: &str, message: &str, - ) -> Result, String> { + ) -> Result>, Status>, String> { let method = &self.method(&service, &method)?; let input_message = method.input(); @@ -152,12 +148,8 @@ impl GrpcConnection { let req = req_message.into_request(); let path = method_desc_to_path(method); let codec = DynamicCodec::new(method.clone()); - client.ready().await.unwrap(); - Ok(client - .server_streaming(req, path, codec) - .await - .map_err(|s| s.to_string())? - .into_inner()) + client.ready().await.map_err(|e| e.to_string())?; + Ok(client.server_streaming(req, path, codec).await) } } @@ -230,7 +222,7 @@ impl GrpcHandle { service: &str, method: &str, message: &str, - ) -> Result, String> { + ) -> Result>, Status>, String> { self.connect(id, uri, proto_files) .await? .server_streaming(service, method, message) @@ -260,7 +252,7 @@ impl GrpcHandle { service: &str, method: &str, stream: ReceiverStream, - ) -> Result, String> { + ) -> Result>, Status>, String> { self.connect(id, uri, proto_files) .await? .streaming(service, method, stream) diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 4df8b89e..bacd0bd3 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -312,6 +312,7 @@ async fn cmd_grpc_go( } Some(Err(e)) => { // TODO: Make into error + println!("Error connecting: {:?}", e); upsert_grpc_message( &w, &GrpcMessage { @@ -328,9 +329,26 @@ async fn cmd_grpc_go( } let mut stream = match maybe_stream { - Some(Ok(s)) => s, + Some(Ok(Ok(s))) => s.into_inner(), + Some(Ok(Err(e))) => { + // TODO: Make into error, and use status + println!("Connection status error: {:?}", e); + upsert_grpc_message( + &w, + &GrpcMessage { + message: e.message().to_string(), + is_server: true, + is_info: true, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + return; + } Some(Err(e)) => { // TODO: Make into error + println!("Generic error: {:?}", e); upsert_grpc_message( &w, &GrpcMessage { @@ -370,7 +388,8 @@ async fn cmd_grpc_go( upsert_grpc_message( &w, &GrpcMessage { - message: "Connection completed".to_string(), + message: "Connection closed".to_string(), + is_info: true, ..base_msg.clone() }, ) @@ -378,12 +397,13 @@ async fn cmd_grpc_go( .unwrap(); break; } - Err(e) => { + Err(status) => { // TODO: Make into error + println!("Error status: {:?}", status); upsert_grpc_message( &w, &GrpcMessage { - message: e.to_string(), + message: status.message().to_string(), is_server: true, is_info: true, ..base_msg.clone()