mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-17 23:14:03 +01:00
bidi hacked!
This commit is contained in:
@@ -130,6 +130,104 @@ async fn cmd_grpc_client_streaming(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
async fn cmd_grpc_bidi_streaming(
|
||||
endpoint: &str,
|
||||
service: &str,
|
||||
method: &str,
|
||||
app_handle: AppHandle<Wry>,
|
||||
grpc_handle: State<'_, Mutex<GrpcManager>>,
|
||||
) -> Result<String, String> {
|
||||
let (in_msg_tx, mut in_msg_rx) = tauri::async_runtime::channel::<String>(16);
|
||||
let maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone()));
|
||||
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
|
||||
|
||||
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?;
|
||||
let conn_id = generate_id(Some("grpc"));
|
||||
|
||||
let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx);
|
||||
|
||||
let mut stream = grpc_handle
|
||||
.lock()
|
||||
.await
|
||||
.bidi_streaming(&conn_id, uri, service, method, in_msg_stream)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
#[derive(serde::Deserialize)]
|
||||
enum GrpcMessage {
|
||||
Message(String),
|
||||
Commit,
|
||||
Cancel,
|
||||
}
|
||||
|
||||
let cb = {
|
||||
let cancelled_rx = cancelled_rx.clone();
|
||||
|
||||
move |ev: tauri::Event| {
|
||||
if *cancelled_rx.borrow() {
|
||||
// Stream is cancelled
|
||||
return;
|
||||
}
|
||||
|
||||
match serde_json::from_str::<GrpcMessage>(ev.payload().unwrap()) {
|
||||
Ok(GrpcMessage::Message(msg)) => {
|
||||
println!("Received message: {}", msg);
|
||||
in_msg_tx.try_send(msg).unwrap();
|
||||
}
|
||||
Ok(GrpcMessage::Commit) => {
|
||||
println!("Received commit");
|
||||
// TODO: Commit client streaming stream
|
||||
}
|
||||
Ok(GrpcMessage::Cancel) => {
|
||||
println!("Received cancel");
|
||||
cancelled_tx.send_replace(true);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to parse gRPC message: {:?}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
let event_handler = app_handle.listen_global("grpc_message_in", cb);
|
||||
|
||||
let app_handle2 = app_handle.clone();
|
||||
let grpc_listen = async move {
|
||||
loop {
|
||||
match stream.next().await {
|
||||
Some(Ok(item)) => {
|
||||
let item = serde_json::to_string_pretty(&item).unwrap();
|
||||
app_handle2
|
||||
.emit_all("grpc_message", item)
|
||||
.expect("Failed to emit");
|
||||
}
|
||||
Some(Err(e)) => {
|
||||
error!("gRPC stream error: {:?}", e);
|
||||
// TODO: Handle error
|
||||
}
|
||||
None => {
|
||||
info!("gRPC stream closed by sender");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tauri::async_runtime::spawn(async move {
|
||||
tokio::select! {
|
||||
_ = grpc_listen => {
|
||||
debug!("gRPC listen finished");
|
||||
},
|
||||
_ = cancelled_rx.changed() => {
|
||||
debug!("gRPC connection cancelled");
|
||||
},
|
||||
}
|
||||
app_handle.unlisten(event_handler);
|
||||
});
|
||||
|
||||
Ok(conn_id)
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
async fn cmd_grpc_server_streaming(
|
||||
endpoint: &str,
|
||||
@@ -1119,6 +1217,7 @@ fn main() {
|
||||
cmd_grpc_call_unary,
|
||||
cmd_grpc_client_streaming,
|
||||
cmd_grpc_server_streaming,
|
||||
cmd_grpc_bidi_streaming,
|
||||
cmd_grpc_reflect,
|
||||
cmd_import_data,
|
||||
cmd_list_cookie_jars,
|
||||
|
||||
Reference in New Issue
Block a user