Client streaming working

This commit is contained in:
Gregory Schier
2024-02-04 17:53:15 -08:00
parent 1dfdadde98
commit 4d2b101278
5 changed files with 353 additions and 86 deletions

View File

@@ -177,19 +177,217 @@ async fn cmd_grpc_call_unary(
#[tauri::command]
async fn cmd_grpc_client_streaming(
endpoint: &str,
service: &str,
method: &str,
message: &str,
// app_handle: AppHandle<Wry>,
// db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<(), String> {
let service = service.to_string();
let method = method.to_string();
let message = message.to_string();
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?;
tokio::spawn(async move { grpc::client_streaming(&uri, &service, &method, &message).await });
Ok(())
request_id: &str,
grpc_handle: State<'_, Mutex<GrpcManager>>,
app_handle: AppHandle<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
println!("CLIENT STREAMING");
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?
};
println!("CREATED CONN: {}", conn.clone().id);
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
let conn = conn.clone();
let req = req.clone();
let db = db.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx);
let (service, method) = {
let req = req.clone();
match (req.service, req.method) {
(Some(service), Some(method)) => (service, method),
_ => return Err("Service and method are required".to_string()),
}
};
#[derive(serde::Deserialize)]
enum IncomingMsg {
Message(String),
Commit,
Cancel,
}
let cb = {
let cancelled_rx = cancelled_rx.clone();
let app_handle = app_handle.clone();
let conn = conn.clone();
let req = req.clone();
move |ev: tauri::Event| {
if *cancelled_rx.borrow() {
// Stream is cancelled
return;
}
let mut maybe_in_msg_tx = maybe_in_msg_tx
.lock()
.expect("previous holder not to panic");
let in_msg_tx = if let Some(in_msg_tx) = maybe_in_msg_tx.as_ref() {
in_msg_tx
} else {
// This would mean that the stream is already committed because
// we have already dropped the sending half
return;
};
match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(IncomingMsg::Message(msg)) => {
in_msg_tx.try_send(msg.clone()).unwrap();
let app_handle = app_handle.clone();
let req = req.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
});
}
Ok(IncomingMsg::Commit) => {
println!("Received commit");
maybe_in_msg_tx.take();
// in_msg_stream.close();
}
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
Err(e) => {
error!("Failed to parse gRPC message: {:?}", e);
}
}
}
};
let event_handler =
app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let grpc_listen = {
let app_handle = app_handle.clone();
let conn = conn.clone();
let req = req.clone();
async move {
let grpc_handle = app_handle.state::<Mutex<GrpcManager>>();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
println!("STARTING CLIENT STREAM");
let msg = grpc_handle
.lock()
.await
.client_streaming(&conn.id, uri, &service, &method, in_msg_stream)
.await
.unwrap();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
db,
&GrpcMessage {
message: msg.to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
}
};
println!("ENDED CLIENT STREAM");
{
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
}
app_handle.unlisten(event_handler);
});
};
Ok(conn)
}
#[tauri::command]
@@ -371,17 +569,39 @@ async fn cmd_grpc_streaming(
}
};
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
},
}
app_handle.unlisten(event_handler);
});
{
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
}
app_handle.unlisten(event_handler);
});
};
Ok(conn.id)
}