diff --git a/src-tauri/grpc/src/manager.rs b/src-tauri/grpc/src/manager.rs index 14fd610e..70c38e32 100644 --- a/src-tauri/grpc/src/manager.rs +++ b/src-tauri/grpc/src/manager.rs @@ -39,11 +39,6 @@ impl GrpcConnection { let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone()); - println!( - "\n---------- SENDING -----------------\n{}", - serde_json::to_string_pretty(&req_message).expect("json") - ); - let req = req_message.into_request(); let path = method_desc_to_path(method); let codec = DynamicCodec::new(method.clone()); @@ -52,7 +47,6 @@ impl GrpcConnection { let resp = client.unary(req, path, codec).await.unwrap(); let msg = resp.into_inner(); let response_json = serde_json::to_string_pretty(&msg).expect("json to string"); - println!("\n---------- RECEIVING ---------------\n{}", response_json,); Ok(response_json) } @@ -140,11 +134,6 @@ impl GrpcConnection { let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone()); - println!( - "\n---------- SENDING -----------------\n{}", - serde_json::to_string_pretty(&req_message).expect("json") - ); - let req = req_message.into_request(); let path = method_desc_to_path(method); let codec = DynamicCodec::new(method.clone()); @@ -184,7 +173,6 @@ impl GrpcManager { method: &str, message: &str, ) -> Result> { - println!("Server streaming {}", id); self.connect(id, uri) .await .server_streaming(service, method, message) diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 96f69812..16f0a512 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -182,7 +182,6 @@ async fn cmd_grpc_client_streaming( app_handle: AppHandle, db_state: State<'_, Mutex>>, ) -> Result { - println!("CLIENT STREAMING"); let db = &*db_state.lock().await; let req = get_grpc_request(db, request_id) .await @@ -200,7 +199,6 @@ async fn cmd_grpc_client_streaming( .await .map_err(|e| e.to_string())? }; - println!("CREATED CONN: {}", conn.clone().id); emit_side_effect(app_handle.clone(), "created_model", conn.clone()); { @@ -300,12 +298,9 @@ async fn cmd_grpc_client_streaming( }); } Ok(IncomingMsg::Commit) => { - println!("Received commit"); maybe_in_msg_tx.take(); - // in_msg_stream.close(); } Ok(IncomingMsg::Cancel) => { - println!("Received cancel"); cancelled_tx.send_replace(true); } Err(e) => { @@ -324,7 +319,6 @@ async fn cmd_grpc_client_streaming( async move { let grpc_handle = app_handle.state::>(); let db_state = app_handle.state::>>(); - println!("STARTING CLIENT STREAM"); let msg = grpc_handle .lock() .await @@ -352,18 +346,34 @@ async fn cmd_grpc_client_streaming( } }; - println!("ENDED CLIENT STREAM"); { let conn = conn.clone(); tauri::async_runtime::spawn(async move { tokio::select! { _ = grpc_listen => { - debug!("gRPC listen finished"); + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: "Connection completed".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); }, _ = cancelled_rx.changed() => { - debug!("gRPC connection cancelled"); - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; emit_side_effect( app_handle.clone(), "created_model", @@ -440,7 +450,6 @@ async fn cmd_grpc_streaming( }; let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::(16); - let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone())); let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); let uri = safe_uri(&req.url).map_err(|e| e.to_string())?; @@ -465,7 +474,6 @@ async fn cmd_grpc_streaming( #[derive(serde::Deserialize)] enum IncomingMsg { Message(String), - Commit, Cancel, } @@ -508,12 +516,7 @@ async fn cmd_grpc_streaming( ); }); } - Ok(IncomingMsg::Commit) => { - println!("Received commit"); - // TODO: Commit client streaming stream - } Ok(IncomingMsg::Cancel) => { - println!("Received cancel"); cancelled_tx.send_replace(true); } Err(e) => { @@ -574,12 +577,29 @@ async fn cmd_grpc_streaming( tauri::async_runtime::spawn(async move { tokio::select! { _ = grpc_listen => { - debug!("gRPC listen finished"); + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: "Connection completed".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); }, _ = cancelled_rx.changed() => { - debug!("gRPC connection cancelled"); - let db_state = app_handle.state::>>(); - let db = &*db_state.lock().await; + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; emit_side_effect( app_handle.clone(), "created_model", @@ -617,6 +637,7 @@ async fn cmd_grpc_server_streaming( let req = get_grpc_request(db, request_id) .await .map_err(|e| e.to_string())?; + let conn = { let req = req.clone(); upsert_grpc_connection( @@ -632,6 +653,28 @@ async fn cmd_grpc_server_streaming( }; emit_side_effect(app_handle.clone(), "created_model", conn.clone()); + { + let req = req.clone(); + let conn = conn.clone(); + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: "Initiating connection".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); + } + let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); let (service, method) = match (&req.service, &req.method) { @@ -649,8 +692,6 @@ async fn cmd_grpc_server_streaming( #[derive(serde::Deserialize)] enum IncomingMsg { - Message(String), - Commit, Cancel, } @@ -664,15 +705,7 @@ async fn cmd_grpc_server_streaming( } match serde_json::from_str::(ev.payload().unwrap()) { - Ok(IncomingMsg::Message(msg)) => { - println!("Received message: {}", msg); - } - Ok(IncomingMsg::Commit) => { - println!("Received commit"); - // TODO: Commit client streaming stream - } Ok(IncomingMsg::Cancel) => { - println!("Received cancel"); cancelled_tx.send_replace(true); } Err(e) => { @@ -688,6 +721,7 @@ async fn cmd_grpc_server_streaming( let db = db.clone(); let conn_id = conn.clone().id; let app_handle = app_handle.clone(); + let req = req.clone(); async move { loop { let req = req.clone(); @@ -724,17 +758,57 @@ async fn cmd_grpc_server_streaming( } }; - tauri::async_runtime::spawn(async move { - tokio::select! { - _ = grpc_listen => { - debug!("gRPC listen finished"); - }, - _ = cancelled_rx.changed() => { - debug!("gRPC connection cancelled"); - }, - } - app_handle.unlisten(event_handler); - }); + { + let conn = conn.clone(); + let req = req.clone(); + tauri::async_runtime::spawn(async move { + tokio::select! { + _ = grpc_listen => { + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: "Connection completed".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); + }, + _ = cancelled_rx.changed() => { + let db_state = app_handle.state::>>(); + let db = &*db_state.lock().await; + emit_side_effect( + app_handle.clone(), + "created_model", + upsert_grpc_message( + &db, + &GrpcMessage { + message: "Connection cancelled".to_string(), + workspace_id: req.workspace_id, + request_id: req.id, + connection_id: conn.id, + is_info: true, + ..Default::default() + }, + ) + .await + .expect("Failed to upsert message"), + ); + }, + } + app_handle.unlisten(event_handler); + }); + } Ok(conn) } @@ -1087,7 +1161,6 @@ async fn cmd_update_cookie_jar( db_state: State<'_, Mutex>>, ) -> Result { let db = &*db_state.lock().await; - println!("Updating cookie jar {}", cookie_jar.cookies.len()); let updated = upsert_cookie_jar(db, &cookie_jar) .await diff --git a/src-web/components/GrpcConnectionLayout.tsx b/src-web/components/GrpcConnectionLayout.tsx index 5d9f6664..10aedda7 100644 --- a/src-web/components/GrpcConnectionLayout.tsx +++ b/src-web/components/GrpcConnectionLayout.tsx @@ -44,14 +44,6 @@ export function GrpcConnectionLayout({ style }: Props) { return s.methods.find((m) => m.name === activeRequest.method); }, [activeRequest, grpc.services]); - const handleCancel = useCallback(() => { - grpc.cancel.mutateAsync().catch(console.error); - }, [grpc.cancel]); - - const handleCommit = useCallback(() => { - grpc.commit.mutateAsync().catch(console.error); - }, [grpc.commit]); - const handleConnect = useCallback( async (e: FormEvent) => { e.preventDefault(); @@ -71,7 +63,8 @@ export function GrpcConnectionLayout({ style }: Props) { } else if (activeMethod.clientStreaming && !activeMethod.serverStreaming) { await grpc.clientStreaming.mutateAsync(activeRequest.id); } else { - await grpc.unary.mutateAsync(activeRequest.id); + const msg = await grpc.unary.mutateAsync(activeRequest.id); + setActiveMessageId(msg.id); } }, [ @@ -148,6 +141,15 @@ export function GrpcConnectionLayout({ style }: Props) { [activeMessageId, messages], ); + const messageType: 'unary' | 'server_streaming' | 'client_streaming' | 'streaming' = + useMemo(() => { + if (activeMethod == null) return 'unary'; // Good enough + if (activeMethod.clientStreaming && activeMethod.serverStreaming) return 'streaming'; + if (activeMethod.clientStreaming) return 'client_streaming'; + if (activeMethod.serverStreaming) return 'server_streaming'; + return 'unary'; + }, [activeMethod]); + if (activeRequest == null) { return null; } @@ -201,21 +203,40 @@ export function GrpcConnectionLayout({ style }: Props) { grpc.cancel.mutateAsync() : handleConnect} + disabled={grpc.isStreaming} + spin={grpc.isStreaming || grpc.unary.isLoading} icon={ grpc.isStreaming - ? 'x' - : !activeMethod?.clientStreaming && activeMethod?.serverStreaming - ? 'arrowDownToDot' - : activeMethod?.clientStreaming && !activeMethod?.serverStreaming - ? 'arrowUpFromDot' - : activeMethod?.clientStreaming && activeMethod?.serverStreaming - ? 'arrowUpDown' - : 'sendHorizontal' + ? 'refresh' + : messageType === 'unary' + ? 'sendHorizontal' + : 'arrowUpDown' } /> + {grpc.isStreaming && ( + grpc.cancel.mutateAsync()} + icon="x" + disabled={!grpc.isStreaming} + /> + )} + {activeMethod?.clientStreaming && + !activeMethod.serverStreaming && + grpc.isStreaming && ( + grpc.commit.mutateAsync()} + icon="check" + /> + )} {activeMethod?.clientStreaming && grpc.isStreaming && ( )} - {activeMethod?.clientStreaming && - !activeMethod.serverStreaming && - grpc.isStreaming && ( - - )} ) : messages.length >= 0 ? ( ( -
+
{...messages.map((m) => ( ) => , diff --git a/src-web/hooks/useGrpc.ts b/src-web/hooks/useGrpc.ts index 32132ec2..3cfc51b2 100644 --- a/src-web/hooks/useGrpc.ts +++ b/src-web/hooks/useGrpc.ts @@ -29,7 +29,6 @@ export function useGrpc(url: string | null, requestId: string | null) { requestId: id, })) as GrpcMessage; await messages.set([message]); - console.log('MESSAGE', message); return message; }, }); @@ -40,7 +39,6 @@ export function useGrpc(url: string | null, requestId: string | null) { if (url === null) throw new Error('No URL provided'); await messages.set([]); const c = (await invoke('cmd_grpc_client_streaming', { requestId })) as GrpcConnection; - console.log('GOT CONNECTION', c); setActiveConnectionId(c.id); }, }); @@ -72,10 +70,6 @@ export function useGrpc(url: string | null, requestId: string | null) { mutationFn: async ({ message }: { message: string }) => { if (activeConnectionId == null) throw new Error('No active connection'); await messages.set([]); - // await messages.set((m) => { - // return [...m, { type: 'client', message, timestamp: new Date().toISOString() }]; - // }); - console.log('SENDING', activeConnectionId); await emit(`grpc_client_msg_${activeConnectionId}`, { Message: message }); }, });