More messages

This commit is contained in:
Gregory Schier
2024-02-04 19:08:31 -08:00
parent 4d2b101278
commit 5ed1ea07ef
5 changed files with 178 additions and 106 deletions

View File

@@ -39,11 +39,6 @@ impl GrpcConnection {
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
println!(
"\n---------- SENDING -----------------\n{}",
serde_json::to_string_pretty(&req_message).expect("json")
);
let req = req_message.into_request();
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -52,7 +47,6 @@ impl GrpcConnection {
let resp = client.unary(req, path, codec).await.unwrap();
let msg = resp.into_inner();
let response_json = serde_json::to_string_pretty(&msg).expect("json to string");
println!("\n---------- RECEIVING ---------------\n{}", response_json,);
Ok(response_json)
}
@@ -140,11 +134,6 @@ impl GrpcConnection {
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
println!(
"\n---------- SENDING -----------------\n{}",
serde_json::to_string_pretty(&req_message).expect("json")
);
let req = req_message.into_request();
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -184,7 +173,6 @@ impl GrpcManager {
method: &str,
message: &str,
) -> Result<Streaming<DynamicMessage>> {
println!("Server streaming {}", id);
self.connect(id, uri)
.await
.server_streaming(service, method, message)

View File

@@ -182,7 +182,6 @@ async fn cmd_grpc_client_streaming(
app_handle: AppHandle<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
println!("CLIENT STREAMING");
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
.await
@@ -200,7 +199,6 @@ async fn cmd_grpc_client_streaming(
.await
.map_err(|e| e.to_string())?
};
println!("CREATED CONN: {}", conn.clone().id);
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
@@ -300,12 +298,9 @@ async fn cmd_grpc_client_streaming(
});
}
Ok(IncomingMsg::Commit) => {
println!("Received commit");
maybe_in_msg_tx.take();
// in_msg_stream.close();
}
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
Err(e) => {
@@ -324,7 +319,6 @@ async fn cmd_grpc_client_streaming(
async move {
let grpc_handle = app_handle.state::<Mutex<GrpcManager>>();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
println!("STARTING CLIENT STREAM");
let msg = grpc_handle
.lock()
.await
@@ -352,18 +346,34 @@ async fn cmd_grpc_client_streaming(
}
};
println!("ENDED CLIENT STREAM");
{
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
@@ -440,7 +450,6 @@ async fn cmd_grpc_streaming(
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
@@ -465,7 +474,6 @@ async fn cmd_grpc_streaming(
#[derive(serde::Deserialize)]
enum IncomingMsg {
Message(String),
Commit,
Cancel,
}
@@ -508,12 +516,7 @@ async fn cmd_grpc_streaming(
);
});
}
Ok(IncomingMsg::Commit) => {
println!("Received commit");
// TODO: Commit client streaming stream
}
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
Err(e) => {
@@ -574,12 +577,29 @@ async fn cmd_grpc_streaming(
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
@@ -617,6 +637,7 @@ async fn cmd_grpc_server_streaming(
let req = get_grpc_request(db, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
@@ -632,6 +653,28 @@ async fn cmd_grpc_server_streaming(
};
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
let req = req.clone();
let conn = conn.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
}
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let (service, method) = match (&req.service, &req.method) {
@@ -649,8 +692,6 @@ async fn cmd_grpc_server_streaming(
#[derive(serde::Deserialize)]
enum IncomingMsg {
Message(String),
Commit,
Cancel,
}
@@ -664,15 +705,7 @@ async fn cmd_grpc_server_streaming(
}
match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(IncomingMsg::Message(msg)) => {
println!("Received message: {}", msg);
}
Ok(IncomingMsg::Commit) => {
println!("Received commit");
// TODO: Commit client streaming stream
}
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
Err(e) => {
@@ -688,6 +721,7 @@ async fn cmd_grpc_server_streaming(
let db = db.clone();
let conn_id = conn.clone().id;
let app_handle = app_handle.clone();
let req = req.clone();
async move {
loop {
let req = req.clone();
@@ -724,17 +758,57 @@ async fn cmd_grpc_server_streaming(
}
};
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
},
}
app_handle.unlisten(event_handler);
});
{
let conn = conn.clone();
let req = req.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
}
app_handle.unlisten(event_handler);
});
}
Ok(conn)
}
@@ -1087,7 +1161,6 @@ async fn cmd_update_cookie_jar(
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<CookieJar, String> {
let db = &*db_state.lock().await;
println!("Updating cookie jar {}", cookie_jar.cookies.len());
let updated = upsert_cookie_jar(db, &cookie_jar)
.await