Use basemsg

This commit is contained in:
Gregory Schier
2024-02-11 08:29:57 -08:00
parent 09c7c2cb91
commit 00289734c7

View File

@@ -18,7 +18,6 @@ use std::str::FromStr;
use ::http::uri::InvalidUri; use ::http::uri::InvalidUri;
use ::http::Uri; use ::http::Uri;
use fern::colors::ColoredLevelConfig; use fern::colors::ColoredLevelConfig;
use futures::StreamExt;
use log::{debug, error, info, warn}; use log::{debug, error, info, warn};
use rand::random; use rand::random;
use serde_json::{json, Value}; use serde_json::{json, Value};
@@ -149,24 +148,24 @@ async fn cmd_grpc_go(
.map_err(|e| e.to_string())? .map_err(|e| e.to_string())?
}; };
{ let base_msg = GrpcMessage {
let conn = conn.clone(); workspace_id: req.clone().workspace_id,
let req = req.clone(); request_id: req.clone().id,
upsert_grpc_message( connection_id: conn.clone().id,
&w, ..Default::default()
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message");
}; };
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Initiating connection".to_string(),
is_info: true,
..base_msg.clone()
},
)
.await
.expect("Failed to upsert message");
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16); let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone())); let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
@@ -212,8 +211,7 @@ async fn cmd_grpc_go(
let cb = { let cb = {
let cancelled_rx = cancelled_rx.clone(); let cancelled_rx = cancelled_rx.clone();
let w = w.clone(); let w = w.clone();
let conn = conn.clone(); let base_msg = base_msg.clone();
let req = req.clone();
move |ev: tauri::Event| { move |ev: tauri::Event| {
if *cancelled_rx.borrow() { if *cancelled_rx.borrow() {
@@ -236,17 +234,13 @@ async fn cmd_grpc_go(
Ok(IncomingMsg::Message(msg)) => { Ok(IncomingMsg::Message(msg)) => {
in_msg_tx.try_send(msg.clone()).unwrap(); in_msg_tx.try_send(msg.clone()).unwrap();
let w = w.clone(); let w = w.clone();
let req = req.clone(); let base_msg = base_msg.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move { tauri::async_runtime::spawn(async move {
upsert_grpc_message( upsert_grpc_message(
&w, &w,
&GrpcMessage { &GrpcMessage {
message: msg, message: msg,
workspace_id: req.workspace_id, ..base_msg.clone()
request_id: req.id,
connection_id: conn.id,
..Default::default()
}, },
) )
.await .await
@@ -270,7 +264,7 @@ async fn cmd_grpc_go(
let grpc_listen = { let grpc_listen = {
let w = w.clone(); let w = w.clone();
let conn = conn.clone(); let base_msg = base_msg.clone();
let req = req.clone(); let req = req.clone();
async move { async move {
let (maybe_stream, maybe_msg) = match ( let (maybe_stream, maybe_msg) = match (
@@ -278,12 +272,7 @@ async fn cmd_grpc_go(
method_desc.is_server_streaming(), method_desc.is_server_streaming(),
) { ) {
(true, true) => ( (true, true) => (
Some( Some(connection.streaming(&service, &method, in_msg_stream).await),
connection
.streaming(&service, &method, in_msg_stream)
.await
.unwrap(),
),
None, None,
), ),
(true, false) => ( (true, false) => (
@@ -291,81 +280,118 @@ async fn cmd_grpc_go(
Some( Some(
connection connection
.client_streaming(&service, &method, in_msg_stream) .client_streaming(&service, &method, in_msg_stream)
.await .await,
.map_err(|e| e.to_string())
.unwrap(),
), ),
), ),
(false, true) => ( (false, true) => (
Some( Some(
connection connection
.server_streaming(&service, &method, &req.message) .server_streaming(&service, &method, &req.message)
.await .await,
.unwrap(),
), ),
None, None,
), ),
(false, false) => ( (false, false) => (
None, None,
Some( Some(connection.unary(&service, &method, &req.message).await),
connection
.unary(&service, &method, &req.message)
.await
.unwrap(),
),
), ),
}; };
if let Some(msg) = maybe_msg { match maybe_msg {
let req = req.clone(); Some(Ok(msg)) => {
let conn = conn.clone(); upsert_grpc_message(
upsert_grpc_message( &w,
&w, &GrpcMessage {
&GrpcMessage { message: serialize_message(&msg).unwrap(),
message: serialize_message(&msg).unwrap(), is_server: true,
workspace_id: req.workspace_id, ..base_msg.clone()
request_id: req.id, },
connection_id: conn.id, )
is_server: true, .await
..Default::default() .unwrap();
}, }
) Some(Err(e)) => {
.await // TODO: Make into error
.map_err(|e| e.to_string()) upsert_grpc_message(
.unwrap(); &w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
},
)
.await
.unwrap();
}
None => {}
} }
let mut stream = match maybe_stream { let mut stream = match maybe_stream {
Some(s) => s, Some(Ok(s)) => s,
Some(Err(e)) => {
// TODO: Make into error
upsert_grpc_message(
&w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
},
)
.await
.unwrap();
return;
}
None => return, None => return,
}; };
loop { loop {
match stream.next().await { match stream.message().await {
Some(Ok(item)) => { Ok(Some(msg)) => {
let item = serialize_message(&item).unwrap(); let message = serialize_message(&msg).unwrap();
let req = req.clone();
let conn = conn.clone();
upsert_grpc_message( upsert_grpc_message(
&w, &w,
&GrpcMessage { &GrpcMessage {
message: item, message,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true, is_server: true,
..Default::default() ..base_msg.clone()
}, },
) )
.await .await
.unwrap(); .unwrap();
} }
Some(Err(e)) => { Ok(None) => {
error!("gRPC stream error: {:?}", e); // TODO: Store trailers on connection
// TODO: Handle error let trailers = stream.trailers().await.unwrap_or_default();
} info!("gRPC stream closed by sender {:?}", trailers,);
None => { // TODO: Mark this on connection instead
info!("gRPC stream closed by sender"); upsert_grpc_message(
&w,
&GrpcMessage {
message: "Connection completed".to_string(),
..base_msg.clone()
},
)
.await
.unwrap();
break; break;
} }
Err(e) => {
// TODO: Make into error
upsert_grpc_message(
&w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
},
)
.await
.unwrap();
}
} }
} }
} }
@@ -375,21 +401,8 @@ async fn cmd_grpc_go(
let conn = conn.clone(); let conn = conn.clone();
tauri::async_runtime::spawn(async move { tauri::async_runtime::spawn(async move {
let w = w.clone(); let w = w.clone();
let req = req.clone();
tokio::select! { tokio::select! {
_ = grpc_listen => { _ = grpc_listen => {
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.clone().id,
is_info: true,
..Default::default()
},
)
.await.unwrap();
upsert_grpc_connection( upsert_grpc_connection(
&w, &w,
&GrpcConnection{ &GrpcConnection{
@@ -403,11 +416,8 @@ async fn cmd_grpc_go(
&w, &w,
&GrpcMessage { &GrpcMessage {
message: "Connection cancelled".to_string(), message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.clone().id,
is_info: true, is_info: true,
..Default::default() ..base_msg.clone()
}, },
) )
.await.unwrap(); .await.unwrap();