Bidirectional working

This commit is contained in:
Gregory Schier
2024-02-04 14:10:38 -08:00
parent d19729869e
commit 1dfdadde98
4 changed files with 127 additions and 38 deletions

View File

@@ -56,7 +56,7 @@ impl GrpcConnection {
Ok(response_json)
}
pub async fn bidi_streaming(
pub async fn streaming(
&self,
service: &str,
method: &str,
@@ -156,7 +156,7 @@ impl GrpcManager {
.await
}
pub async fn bidi_streaming(
pub async fn streaming(
&mut self,
id: &str,
uri: Uri,
@@ -164,10 +164,9 @@ impl GrpcManager {
method: &str,
stream: ReceiverStream<String>,
) -> Result<Streaming<DynamicMessage>> {
println!("Bidi streaming {}", id);
self.connect(id, uri)
.await
.bidi_streaming(service, method, stream)
.streaming(service, method, stream)
.await
}
pub async fn connect(&mut self, id: &str, uri: Uri) -> GrpcConnection {

View File

@@ -193,31 +193,79 @@ async fn cmd_grpc_client_streaming(
}
#[tauri::command]
async fn cmd_grpc_bidi_streaming(
endpoint: &str,
service: &str,
method: &str,
async fn cmd_grpc_streaming(
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<String, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?
};
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
let conn = conn.clone();
let req = req.clone();
let db = db.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?;
let conn_id = generate_id(Some("grpc"));
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx);
let (service, method) = {
let req = req.clone();
match (req.service, req.method) {
(Some(service), Some(method)) => (service, method),
_ => return Err("Service and method are required".to_string()),
}
};
let mut stream = grpc_handle
.lock()
.await
.bidi_streaming(&conn_id, uri, service, method, in_msg_stream)
.streaming(&conn.id, uri, &service, &method, in_msg_stream)
.await
.unwrap();
#[derive(serde::Deserialize)]
enum GrpcMessage {
enum IncomingMsg {
Message(String),
Commit,
Cancel,
@@ -225,6 +273,9 @@ async fn cmd_grpc_bidi_streaming(
let cb = {
let cancelled_rx = cancelled_rx.clone();
let app_handle = app_handle.clone();
let conn = conn.clone();
let req = req.clone();
move |ev: tauri::Event| {
if *cancelled_rx.borrow() {
@@ -232,16 +283,38 @@ async fn cmd_grpc_bidi_streaming(
return;
}
match serde_json::from_str::<GrpcMessage>(ev.payload().unwrap()) {
Ok(GrpcMessage::Message(msg)) => {
println!("Received message: {}", msg);
in_msg_tx.try_send(msg).unwrap();
match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(IncomingMsg::Message(msg)) => {
in_msg_tx.try_send(msg.clone()).unwrap();
let app_handle = app_handle.clone();
let req = req.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
});
}
Ok(GrpcMessage::Commit) => {
Ok(IncomingMsg::Commit) => {
println!("Received commit");
// TODO: Commit client streaming stream
}
Ok(GrpcMessage::Cancel) => {
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
@@ -252,19 +325,38 @@ async fn cmd_grpc_bidi_streaming(
}
};
let event_handler =
app_handle.listen_global(format!("grpc_client_msg_{}", conn_id).as_str(), cb);
app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let grpc_listen = {
let app_handle = app_handle.clone();
let conn_id = conn_id.clone();
let conn = conn.clone();
let req = req.clone();
async move {
loop {
match stream.next().await {
Some(Ok(item)) => {
let item = serde_json::to_string_pretty(&item).unwrap();
app_handle
.emit_all(format!("grpc_server_msg_{}", &conn_id).as_str(), item)
.expect("Failed to emit");
let req = req.clone();
let conn = conn.clone();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
}
Some(Err(e)) => {
error!("gRPC stream error: {:?}", e);
@@ -291,7 +383,7 @@ async fn cmd_grpc_bidi_streaming(
app_handle.unlisten(event_handler);
});
Ok(conn_id)
Ok(conn.id)
}
#[tauri::command]
@@ -1434,7 +1526,7 @@ fn main() {
cmd_grpc_call_unary,
cmd_grpc_client_streaming,
cmd_grpc_server_streaming,
cmd_grpc_bidi_streaming,
cmd_grpc_streaming,
cmd_grpc_reflect,
cmd_import_data,
cmd_list_cookie_jars,