Bidirectional working

This commit is contained in:
Gregory Schier
2024-02-04 14:10:38 -08:00
parent 8c15274786
commit 722c8a1c6b
4 changed files with 127 additions and 38 deletions

View File

@@ -56,7 +56,7 @@ impl GrpcConnection {
Ok(response_json) Ok(response_json)
} }
pub async fn bidi_streaming( pub async fn streaming(
&self, &self,
service: &str, service: &str,
method: &str, method: &str,
@@ -156,7 +156,7 @@ impl GrpcManager {
.await .await
} }
pub async fn bidi_streaming( pub async fn streaming(
&mut self, &mut self,
id: &str, id: &str,
uri: Uri, uri: Uri,
@@ -164,10 +164,9 @@ impl GrpcManager {
method: &str, method: &str,
stream: ReceiverStream<String>, stream: ReceiverStream<String>,
) -> Result<Streaming<DynamicMessage>> { ) -> Result<Streaming<DynamicMessage>> {
println!("Bidi streaming {}", id);
self.connect(id, uri) self.connect(id, uri)
.await .await
.bidi_streaming(service, method, stream) .streaming(service, method, stream)
.await .await
} }
pub async fn connect(&mut self, id: &str, uri: Uri) -> GrpcConnection { pub async fn connect(&mut self, id: &str, uri: Uri) -> GrpcConnection {

View File

@@ -193,31 +193,79 @@ async fn cmd_grpc_client_streaming(
} }
#[tauri::command] #[tauri::command]
async fn cmd_grpc_bidi_streaming( async fn cmd_grpc_streaming(
endpoint: &str, request_id: &str,
service: &str,
method: &str,
app_handle: AppHandle<Wry>, app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>, grpc_handle: State<'_, Mutex<GrpcManager>>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<String, String> { ) -> Result<String, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?
};
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
let conn = conn.clone();
let req = req.clone();
let db = db.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16); let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone())); let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?; let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
let conn_id = generate_id(Some("grpc"));
let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx); let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx);
let (service, method) = {
let req = req.clone();
match (req.service, req.method) {
(Some(service), Some(method)) => (service, method),
_ => return Err("Service and method are required".to_string()),
}
};
let mut stream = grpc_handle let mut stream = grpc_handle
.lock() .lock()
.await .await
.bidi_streaming(&conn_id, uri, service, method, in_msg_stream) .streaming(&conn.id, uri, &service, &method, in_msg_stream)
.await .await
.unwrap(); .unwrap();
#[derive(serde::Deserialize)] #[derive(serde::Deserialize)]
enum GrpcMessage { enum IncomingMsg {
Message(String), Message(String),
Commit, Commit,
Cancel, Cancel,
@@ -225,6 +273,9 @@ async fn cmd_grpc_bidi_streaming(
let cb = { let cb = {
let cancelled_rx = cancelled_rx.clone(); let cancelled_rx = cancelled_rx.clone();
let app_handle = app_handle.clone();
let conn = conn.clone();
let req = req.clone();
move |ev: tauri::Event| { move |ev: tauri::Event| {
if *cancelled_rx.borrow() { if *cancelled_rx.borrow() {
@@ -232,16 +283,38 @@ async fn cmd_grpc_bidi_streaming(
return; return;
} }
match serde_json::from_str::<GrpcMessage>(ev.payload().unwrap()) { match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(GrpcMessage::Message(msg)) => { Ok(IncomingMsg::Message(msg)) => {
println!("Received message: {}", msg); in_msg_tx.try_send(msg.clone()).unwrap();
in_msg_tx.try_send(msg).unwrap(); let app_handle = app_handle.clone();
let req = req.clone();
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
});
} }
Ok(GrpcMessage::Commit) => { Ok(IncomingMsg::Commit) => {
println!("Received commit"); println!("Received commit");
// TODO: Commit client streaming stream // TODO: Commit client streaming stream
} }
Ok(GrpcMessage::Cancel) => { Ok(IncomingMsg::Cancel) => {
println!("Received cancel"); println!("Received cancel");
cancelled_tx.send_replace(true); cancelled_tx.send_replace(true);
} }
@@ -252,19 +325,38 @@ async fn cmd_grpc_bidi_streaming(
} }
}; };
let event_handler = let event_handler =
app_handle.listen_global(format!("grpc_client_msg_{}", conn_id).as_str(), cb); app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let grpc_listen = { let grpc_listen = {
let app_handle = app_handle.clone(); let app_handle = app_handle.clone();
let conn_id = conn_id.clone(); let conn = conn.clone();
let req = req.clone();
async move { async move {
loop { loop {
match stream.next().await { match stream.next().await {
Some(Ok(item)) => { Some(Ok(item)) => {
let item = serde_json::to_string_pretty(&item).unwrap(); let item = serde_json::to_string_pretty(&item).unwrap();
app_handle let req = req.clone();
.emit_all(format!("grpc_server_msg_{}", &conn_id).as_str(), item) let conn = conn.clone();
.expect("Failed to emit"); let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
} }
Some(Err(e)) => { Some(Err(e)) => {
error!("gRPC stream error: {:?}", e); error!("gRPC stream error: {:?}", e);
@@ -291,7 +383,7 @@ async fn cmd_grpc_bidi_streaming(
app_handle.unlisten(event_handler); app_handle.unlisten(event_handler);
}); });
Ok(conn_id) Ok(conn.id)
} }
#[tauri::command] #[tauri::command]
@@ -1434,7 +1526,7 @@ fn main() {
cmd_grpc_call_unary, cmd_grpc_call_unary,
cmd_grpc_client_streaming, cmd_grpc_client_streaming,
cmd_grpc_server_streaming, cmd_grpc_server_streaming,
cmd_grpc_bidi_streaming, cmd_grpc_streaming,
cmd_grpc_reflect, cmd_grpc_reflect,
cmd_import_data, cmd_import_data,
cmd_list_cookie_jars, cmd_list_cookie_jars,

View File

@@ -61,14 +61,14 @@ export function GrpcConnectionLayout({ style }: Props) {
}); });
} }
if (activeMethod.clientStreaming && activeMethod.serverStreaming) { if (activeMethod.clientStreaming && activeMethod.serverStreaming) {
await grpc.bidiStreaming.mutateAsync(activeRequest); await grpc.streaming.mutateAsync(activeRequest.id);
} else if (activeMethod.serverStreaming && !activeMethod.clientStreaming) { } else if (activeMethod.serverStreaming && !activeMethod.clientStreaming) {
await grpc.serverStreaming.mutateAsync(activeRequest.id); await grpc.serverStreaming.mutateAsync(activeRequest.id);
} else { } else {
await grpc.unary.mutateAsync(activeRequest.id); await grpc.unary.mutateAsync(activeRequest.id);
} }
}, },
[activeMethod, activeRequest, alert, grpc.bidiStreaming, grpc.serverStreaming, grpc.unary], [activeMethod, activeRequest, alert, grpc.streaming, grpc.serverStreaming, grpc.unary],
); );
useEffect(() => { useEffect(() => {

View File

@@ -2,7 +2,7 @@ import { useMutation, useQuery } from '@tanstack/react-query';
import { invoke } from '@tauri-apps/api'; import { invoke } from '@tauri-apps/api';
import { emit } from '@tauri-apps/api/event'; import { emit } from '@tauri-apps/api/event';
import { useEffect, useState } from 'react'; import { useEffect, useState } from 'react';
import type { GrpcConnection, GrpcMessage, GrpcRequest } from '../lib/models'; import type { GrpcConnection, GrpcMessage } from '../lib/models';
import { useKeyValue } from './useKeyValue'; import { useKeyValue } from './useKeyValue';
interface ReflectResponseService { interface ReflectResponseService {
@@ -44,17 +44,14 @@ export function useGrpc(url: string | null, requestId: string | null) {
}, },
}); });
const bidiStreaming = useMutation<void, string, GrpcRequest>({ const streaming = useMutation<void, string, string>({
mutationKey: ['grpc_bidi_streaming', url], mutationKey: ['grpc_streaming', url],
mutationFn: async ({ service, method, message, url }) => { mutationFn: async (requestId) => {
if (url === null) throw new Error('No URL provided'); if (url === null) throw new Error('No URL provided');
const id: string = await invoke('cmd_grpc_bidi_streaming', {
endpoint: url,
service,
method,
message,
});
await messages.set([]); await messages.set([]);
const id: string = await invoke('cmd_grpc_streaming', {
requestId,
});
setActiveConnectionId(id); setActiveConnectionId(id);
}, },
}); });
@@ -67,6 +64,7 @@ export function useGrpc(url: string | null, requestId: string | null) {
// await messages.set((m) => { // await messages.set((m) => {
// return [...m, { type: 'client', message, timestamp: new Date().toISOString() }]; // return [...m, { type: 'client', message, timestamp: new Date().toISOString() }];
// }); // });
console.log('SENDING', activeConnectionId);
await emit(`grpc_client_msg_${activeConnectionId}`, { Message: message }); await emit(`grpc_client_msg_${activeConnectionId}`, { Message: message });
}, },
}); });
@@ -90,7 +88,7 @@ export function useGrpc(url: string | null, requestId: string | null) {
return { return {
unary, unary,
serverStreaming, serverStreaming,
bidiStreaming, streaming,
services: reflect.data, services: reflect.data,
cancel, cancel,
isStreaming: activeConnectionId !== null, isStreaming: activeConnectionId !== null,