Use basemsg

This commit is contained in:
Gregory Schier
2024-02-11 08:29:57 -08:00
parent d274b85db1
commit 90b0112f14

View File

@@ -18,7 +18,6 @@ use std::str::FromStr;
use ::http::uri::InvalidUri;
use ::http::Uri;
use fern::colors::ColoredLevelConfig;
use futures::StreamExt;
use log::{debug, error, info, warn};
use rand::random;
use serde_json::{json, Value};
@@ -149,24 +148,24 @@ async fn cmd_grpc_go(
.map_err(|e| e.to_string())?
};
{
let conn = conn.clone();
let req = req.clone();
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message");
let base_msg = GrpcMessage {
workspace_id: req.clone().workspace_id,
request_id: req.clone().id,
connection_id: conn.clone().id,
..Default::default()
};
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Initiating connection".to_string(),
is_info: true,
..base_msg.clone()
},
)
.await
.expect("Failed to upsert message");
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
@@ -212,8 +211,7 @@ async fn cmd_grpc_go(
let cb = {
let cancelled_rx = cancelled_rx.clone();
let w = w.clone();
let conn = conn.clone();
let req = req.clone();
let base_msg = base_msg.clone();
move |ev: tauri::Event| {
if *cancelled_rx.borrow() {
@@ -236,17 +234,13 @@ async fn cmd_grpc_go(
Ok(IncomingMsg::Message(msg)) => {
in_msg_tx.try_send(msg.clone()).unwrap();
let w = w.clone();
let req = req.clone();
let conn = conn.clone();
let base_msg = base_msg.clone();
tauri::async_runtime::spawn(async move {
upsert_grpc_message(
&w,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
..base_msg.clone()
},
)
.await
@@ -270,7 +264,7 @@ async fn cmd_grpc_go(
let grpc_listen = {
let w = w.clone();
let conn = conn.clone();
let base_msg = base_msg.clone();
let req = req.clone();
async move {
let (maybe_stream, maybe_msg) = match (
@@ -278,12 +272,7 @@ async fn cmd_grpc_go(
method_desc.is_server_streaming(),
) {
(true, true) => (
Some(
connection
.streaming(&service, &method, in_msg_stream)
.await
.unwrap(),
),
Some(connection.streaming(&service, &method, in_msg_stream).await),
None,
),
(true, false) => (
@@ -291,81 +280,118 @@ async fn cmd_grpc_go(
Some(
connection
.client_streaming(&service, &method, in_msg_stream)
.await
.map_err(|e| e.to_string())
.unwrap(),
.await,
),
),
(false, true) => (
Some(
connection
.server_streaming(&service, &method, &req.message)
.await
.unwrap(),
.await,
),
None,
),
(false, false) => (
None,
Some(
connection
.unary(&service, &method, &req.message)
.await
.unwrap(),
),
Some(connection.unary(&service, &method, &req.message).await),
),
};
if let Some(msg) = maybe_msg {
let req = req.clone();
let conn = conn.clone();
upsert_grpc_message(
&w,
&GrpcMessage {
message: serialize_message(&msg).unwrap(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())
.unwrap();
match maybe_msg {
Some(Ok(msg)) => {
upsert_grpc_message(
&w,
&GrpcMessage {
message: serialize_message(&msg).unwrap(),
is_server: true,
..base_msg.clone()
},
)
.await
.unwrap();
}
Some(Err(e)) => {
// TODO: Make into error
upsert_grpc_message(
&w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
},
)
.await
.unwrap();
}
None => {}
}
let mut stream = match maybe_stream {
Some(s) => s,
Some(Ok(s)) => s,
Some(Err(e)) => {
// TODO: Make into error
upsert_grpc_message(
&w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
},
)
.await
.unwrap();
return;
}
None => return,
};
loop {
match stream.next().await {
Some(Ok(item)) => {
let item = serialize_message(&item).unwrap();
let req = req.clone();
let conn = conn.clone();
match stream.message().await {
Ok(Some(msg)) => {
let message = serialize_message(&msg).unwrap();
upsert_grpc_message(
&w,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
message,
is_server: true,
..Default::default()
..base_msg.clone()
},
)
.await
.unwrap();
}
Some(Err(e)) => {
error!("gRPC stream error: {:?}", e);
// TODO: Handle error
}
None => {
info!("gRPC stream closed by sender");
Ok(None) => {
// TODO: Store trailers on connection
let trailers = stream.trailers().await.unwrap_or_default();
info!("gRPC stream closed by sender {:?}", trailers,);
// TODO: Mark this on connection instead
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Connection completed".to_string(),
..base_msg.clone()
},
)
.await
.unwrap();
break;
}
Err(e) => {
// TODO: Make into error
upsert_grpc_message(
&w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
},
)
.await
.unwrap();
}
}
}
}
@@ -375,21 +401,8 @@ async fn cmd_grpc_go(
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let w = w.clone();
let req = req.clone();
tokio::select! {
_ = grpc_listen => {
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.clone().id,
is_info: true,
..Default::default()
},
)
.await.unwrap();
upsert_grpc_connection(
&w,
&GrpcConnection{
@@ -403,11 +416,8 @@ async fn cmd_grpc_go(
&w,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.clone().id,
is_info: true,
..Default::default()
..base_msg.clone()
},
)
.await.unwrap();