From 1dfdadde988ae138e747f8680f9f5f0c656ab491 Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Sun, 4 Feb 2024 14:10:38 -0800 Subject: [PATCH] Bidirectional working --- src-tauri/grpc/src/manager.rs | 7 +- src-tauri/src/main.rs | 134 +++++++++++++++++--- src-web/components/GrpcConnectionLayout.tsx | 4 +- src-web/hooks/useGrpc.ts | 20 ++- 4 files changed, 127 insertions(+), 38 deletions(-) diff --git a/src-tauri/grpc/src/manager.rs b/src-tauri/grpc/src/manager.rs index 611dc450..4e91543f 100644 --- a/src-tauri/grpc/src/manager.rs +++ b/src-tauri/grpc/src/manager.rs @@ -56,7 +56,7 @@ impl GrpcConnection { Ok(response_json) } - pub async fn bidi_streaming( + pub async fn streaming( &self, service: &str, method: &str, @@ -156,7 +156,7 @@ impl GrpcManager { .await } - pub async fn bidi_streaming( + pub async fn streaming( &mut self, id: &str, uri: Uri, @@ -164,10 +164,9 @@ impl GrpcManager { method: &str, stream: ReceiverStream, ) -> Result> { - println!("Bidi streaming {}", id); self.connect(id, uri) .await - .bidi_streaming(service, method, stream) + .streaming(service, method, stream) .await } pub async fn connect(&mut self, id: &str, uri: Uri) -> GrpcConnection { diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 4370c2a3..253cf592 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -193,31 +193,79 @@ async fn cmd_grpc_client_streaming( } #[tauri::command] -async fn cmd_grpc_bidi_streaming( - endpoint: &str, - service: &str, - method: &str, +async fn cmd_grpc_streaming( + request_id: &str, app_handle: AppHandle, grpc_handle: State<'_, Mutex>, + db_state: State<'_, Mutex>>, ) -> Result { + let db = &*db_state.lock().await; + let req = get_grpc_request(db, request_id) + .await + .map_err(|e| e.to_string())?; + let conn = { + let req = req.clone(); + upsert_grpc_connection( + db, + &GrpcConnection { + workspace_id: req.workspace_id, + request_id: req.id, + ..Default::default() + }, + ) + .await + .map_err(|e| e.to_string())? + }; + emit_side_effect(app_handle.clone(), "created_model", conn.clone()); + + { + let conn = conn.clone(); + let req = req.clone(); + let db = db.clone(); + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: "Initiating connection".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); + }; + let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::(16); let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone())); let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); - let uri = safe_uri(endpoint).map_err(|e| e.to_string())?; - let conn_id = generate_id(Some("grpc")); + let uri = safe_uri(&req.url).map_err(|e| e.to_string())?; let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx); + let (service, method) = { + let req = req.clone(); + match (req.service, req.method) { + (Some(service), Some(method)) => (service, method), + _ => return Err("Service and method are required".to_string()), + } + }; + let mut stream = grpc_handle .lock() .await - .bidi_streaming(&conn_id, uri, service, method, in_msg_stream) + .streaming(&conn.id, uri, &service, &method, in_msg_stream) .await .unwrap(); #[derive(serde::Deserialize)] - enum GrpcMessage { + enum IncomingMsg { Message(String), Commit, Cancel, @@ -225,6 +273,9 @@ async fn cmd_grpc_bidi_streaming( let cb = { let cancelled_rx = cancelled_rx.clone(); + let app_handle = app_handle.clone(); + let conn = conn.clone(); + let req = req.clone(); move |ev: tauri::Event| { if *cancelled_rx.borrow() { @@ -232,16 +283,38 @@ async fn cmd_grpc_bidi_streaming( return; } - match serde_json::from_str::(ev.payload().unwrap()) { - Ok(GrpcMessage::Message(msg)) => { - println!("Received message: {}", msg); - in_msg_tx.try_send(msg).unwrap(); + match serde_json::from_str::(ev.payload().unwrap()) { + Ok(IncomingMsg::Message(msg)) => { + in_msg_tx.try_send(msg.clone()).unwrap(); + let app_handle = app_handle.clone(); + let req = req.clone(); + let conn = conn.clone(); + tauri::async_runtime::spawn(async move { + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: msg, + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); + }); } - Ok(GrpcMessage::Commit) => { + Ok(IncomingMsg::Commit) => { println!("Received commit"); // TODO: Commit client streaming stream } - Ok(GrpcMessage::Cancel) => { + Ok(IncomingMsg::Cancel) => { println!("Received cancel"); cancelled_tx.send_replace(true); } @@ -252,19 +325,38 @@ async fn cmd_grpc_bidi_streaming( } }; let event_handler = - app_handle.listen_global(format!("grpc_client_msg_{}", conn_id).as_str(), cb); + app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb); let grpc_listen = { let app_handle = app_handle.clone(); - let conn_id = conn_id.clone(); + let conn = conn.clone(); + let req = req.clone(); async move { loop { match stream.next().await { Some(Ok(item)) => { let item = serde_json::to_string_pretty(&item).unwrap(); - app_handle - .emit_all(format!("grpc_server_msg_{}", &conn_id).as_str(), item) - .expect("Failed to emit"); + let req = req.clone(); + let conn = conn.clone(); + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: item, + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_server: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); } Some(Err(e)) => { error!("gRPC stream error: {:?}", e); @@ -291,7 +383,7 @@ async fn cmd_grpc_bidi_streaming( app_handle.unlisten(event_handler); }); - Ok(conn_id) + Ok(conn.id) } #[tauri::command] @@ -1434,7 +1526,7 @@ fn main() { cmd_grpc_call_unary, cmd_grpc_client_streaming, cmd_grpc_server_streaming, - cmd_grpc_bidi_streaming, + cmd_grpc_streaming, cmd_grpc_reflect, cmd_import_data, cmd_list_cookie_jars, diff --git a/src-web/components/GrpcConnectionLayout.tsx b/src-web/components/GrpcConnectionLayout.tsx index 1885d51e..199a1672 100644 --- a/src-web/components/GrpcConnectionLayout.tsx +++ b/src-web/components/GrpcConnectionLayout.tsx @@ -61,14 +61,14 @@ export function GrpcConnectionLayout({ style }: Props) { }); } if (activeMethod.clientStreaming && activeMethod.serverStreaming) { - await grpc.bidiStreaming.mutateAsync(activeRequest); + await grpc.streaming.mutateAsync(activeRequest.id); } else if (activeMethod.serverStreaming && !activeMethod.clientStreaming) { await grpc.serverStreaming.mutateAsync(activeRequest.id); } else { await grpc.unary.mutateAsync(activeRequest.id); } }, - [activeMethod, activeRequest, alert, grpc.bidiStreaming, grpc.serverStreaming, grpc.unary], + [activeMethod, activeRequest, alert, grpc.streaming, grpc.serverStreaming, grpc.unary], ); useEffect(() => { diff --git a/src-web/hooks/useGrpc.ts b/src-web/hooks/useGrpc.ts index f599e84f..22cb4937 100644 --- a/src-web/hooks/useGrpc.ts +++ b/src-web/hooks/useGrpc.ts @@ -2,7 +2,7 @@ import { useMutation, useQuery } from '@tanstack/react-query'; import { invoke } from '@tauri-apps/api'; import { emit } from '@tauri-apps/api/event'; import { useEffect, useState } from 'react'; -import type { GrpcConnection, GrpcMessage, GrpcRequest } from '../lib/models'; +import type { GrpcConnection, GrpcMessage } from '../lib/models'; import { useKeyValue } from './useKeyValue'; interface ReflectResponseService { @@ -44,17 +44,14 @@ export function useGrpc(url: string | null, requestId: string | null) { }, }); - const bidiStreaming = useMutation({ - mutationKey: ['grpc_bidi_streaming', url], - mutationFn: async ({ service, method, message, url }) => { + const streaming = useMutation({ + mutationKey: ['grpc_streaming', url], + mutationFn: async (requestId) => { if (url === null) throw new Error('No URL provided'); - const id: string = await invoke('cmd_grpc_bidi_streaming', { - endpoint: url, - service, - method, - message, - }); await messages.set([]); + const id: string = await invoke('cmd_grpc_streaming', { + requestId, + }); setActiveConnectionId(id); }, }); @@ -67,6 +64,7 @@ export function useGrpc(url: string | null, requestId: string | null) { // await messages.set((m) => { // return [...m, { type: 'client', message, timestamp: new Date().toISOString() }]; // }); + console.log('SENDING', activeConnectionId); await emit(`grpc_client_msg_${activeConnectionId}`, { Message: message }); }, }); @@ -90,7 +88,7 @@ export function useGrpc(url: string | null, requestId: string | null) { return { unary, serverStreaming, - bidiStreaming, + streaming, services: reflect.data, cancel, isStreaming: activeConnectionId !== null,