From 184b13cc2a5910bb55632072a2434df8dff04191 Mon Sep 17 00:00:00 2001 From: Gregory Schier Date: Thu, 1 Feb 2024 15:36:50 -0800 Subject: [PATCH] gRPC manager mostly working --- src-tauri/grpc/src/lib.rs | 38 +----- src-tauri/grpc/src/manager.rs | 139 ++++++++++++++++++++ src-tauri/src/main.rs | 63 +++++---- src-web/components/GlobalHooks.tsx | 2 +- src-web/components/GrpcConnectionLayout.tsx | 28 ++-- src-web/components/core/Dropdown.tsx | 2 +- 6 files changed, 197 insertions(+), 75 deletions(-) create mode 100644 src-tauri/grpc/src/manager.rs diff --git a/src-tauri/grpc/src/lib.rs b/src-tauri/grpc/src/lib.rs index 9de2b164..fa746cff 100644 --- a/src-tauri/grpc/src/lib.rs +++ b/src-tauri/grpc/src/lib.rs @@ -11,6 +11,7 @@ use crate::proto::{fill_pool, method_desc_to_path}; mod codec; mod json_schema; mod proto; +pub mod manager; pub fn serialize_options() -> SerializeOptions { SerializeOptions::new().skip_default_fields(false) @@ -32,43 +33,6 @@ pub struct MethodDefinition { pub server_streaming: bool, } -pub async fn unary( - uri: &Uri, - service: &str, - method: &str, - message_json: &str, -) -> Result { - let (pool, conn) = fill_pool(uri).await; - - let service = pool.get_service_by_name(service).unwrap(); - let method = &service.methods().find(|m| m.name() == method).unwrap(); - let input_message = method.input(); - - let mut deserializer = Deserializer::from_str(message_json); - let req_message = - DynamicMessage::deserialize(input_message, &mut deserializer).map_err(|e| e.to_string())?; - deserializer.end().unwrap(); - - let mut client = tonic::client::Grpc::new(conn); - - println!( - "\n---------- SENDING -----------------\n{}", - serde_json::to_string_pretty(&req_message).expect("json") - ); - - let req = req_message.into_request(); - let path = method_desc_to_path(method); - let codec = DynamicCodec::new(method.clone()); - client.ready().await.unwrap(); - - let resp = client.unary(req, path, codec).await.unwrap(); - let msg = resp.into_inner(); - let response_json = serde_json::to_string_pretty(&msg).expect("json to string"); - println!("\n---------- RECEIVING ---------------\n{}", response_json,); - - Ok(response_json) -} - struct ClientStream {} impl Stream for ClientStream { diff --git a/src-tauri/grpc/src/manager.rs b/src-tauri/grpc/src/manager.rs new file mode 100644 index 00000000..43cc7edd --- /dev/null +++ b/src-tauri/grpc/src/manager.rs @@ -0,0 +1,139 @@ +use std::collections::HashMap; + +use prost_reflect::{DescriptorPool, DynamicMessage}; +use serde_json::Deserializer; +use tokio::sync::mpsc; +use tokio_stream::StreamExt; +use tonic::transport::{Channel, Uri}; +use tonic::{IntoRequest, Streaming}; + +use crate::codec::DynamicCodec; +use crate::proto::{fill_pool, method_desc_to_path}; + +type Result = std::result::Result; + +#[derive(Clone)] +pub struct GrpcConnection { + pool: DescriptorPool, + conn: Channel, +} + +impl GrpcConnection { + pub async fn unary(&self, service: &str, method: &str, message: &str) -> Result { + let service = self.pool.get_service_by_name(service).unwrap(); + let method = &service.methods().find(|m| m.name() == method).unwrap(); + let input_message = method.input(); + + let mut deserializer = Deserializer::from_str(message); + let req_message = DynamicMessage::deserialize(input_message, &mut deserializer) + .map_err(|e| e.to_string())?; + deserializer.end().unwrap(); + + let mut client = tonic::client::Grpc::new(self.conn.clone()); + + println!( + "\n---------- SENDING -----------------\n{}", + serde_json::to_string_pretty(&req_message).expect("json") + ); + + let req = req_message.into_request(); + let path = method_desc_to_path(method); + let codec = DynamicCodec::new(method.clone()); + client.ready().await.unwrap(); + + let resp = client.unary(req, path, codec).await.unwrap(); + let msg = resp.into_inner(); + let response_json = serde_json::to_string_pretty(&msg).expect("json to string"); + println!("\n---------- RECEIVING ---------------\n{}", response_json,); + + Ok(response_json) + } + pub async fn server_streaming( + &self, + service: &str, + method: &str, + message: &str, + ) -> Result> { + let service = self.pool.get_service_by_name(service).unwrap(); + let method = &service.methods().find(|m| m.name() == method).unwrap(); + let input_message = method.input(); + + let mut deserializer = Deserializer::from_str(message); + let req_message = DynamicMessage::deserialize(input_message, &mut deserializer) + .map_err(|e| e.to_string())?; + deserializer.end().unwrap(); + + let mut client = tonic::client::Grpc::new(self.conn.clone()); + + println!( + "\n---------- SENDING -----------------\n{}", + serde_json::to_string_pretty(&req_message).expect("json") + ); + + let req = req_message.into_request(); + let path = method_desc_to_path(method); + let codec = DynamicCodec::new(method.clone()); + client.ready().await.unwrap(); + + Ok(client + .server_streaming(req, path, codec) + .await + .map_err(|s| s.to_string())? + .into_inner()) + } +} + +pub struct GrpcManager { + connections: HashMap, + pub send: mpsc::Sender, + pub recv: mpsc::Receiver, +} + +impl Default for GrpcManager { + fn default() -> Self { + let (send, recv) = mpsc::channel(100); + let connections = HashMap::new(); + Self { + connections, + send, + recv, + } + } +} + +impl GrpcManager { + pub async fn server_streaming( + &mut self, + id: &str, + uri: Uri, + service: &str, + method: &str, + message: &str, + ) -> Result> { + println!("Server streaming {}", id); + self.connect(id, uri) + .await + .server_streaming(service, method, message) + .await + + // while let Some(item) = stream.next().await { + // match item { + // Ok(item) => { + // let item = serde_json::to_string_pretty(&item).unwrap(); + // println!("Sending message {}", item); + // self.send.send(item).await.unwrap() + // } + // Err(e) => println!("\terror: {}", e), + // } + // } + + // Ok(()) + } + + pub async fn connect(&mut self, id: &str, uri: Uri) -> GrpcConnection { + let (pool, conn) = fill_pool(&uri).await; + let connection = GrpcConnection { pool, conn }; + self.connections.insert(id.to_string(), connection.clone()); + connection + } +} diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index c47bae9e..ec8895cc 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -35,6 +35,7 @@ use tokio::sync::Mutex; use tokio::time::sleep; use window_shadows::set_shadow; +use grpc::manager::GrpcManager; use grpc::ServiceDefinition; use window_ext::TrafficLightWindowExt; @@ -44,7 +45,7 @@ use crate::models::{ cancel_pending_responses, create_response, delete_all_responses, delete_cookie_jar, delete_environment, delete_folder, delete_request, delete_response, delete_workspace, duplicate_request, find_cookie_jars, find_environments, find_folders, find_requests, - find_responses, find_workspaces, get_cookie_jar, get_environment, get_folder, + find_responses, find_workspaces, generate_id, get_cookie_jar, get_environment, get_folder, get_key_value_raw, get_or_create_settings, get_request, get_response, get_workspace, get_workspace_export_resources, set_key_value_raw, update_response_if_id, update_settings, upsert_cookie_jar, upsert_environment, upsert_folder, upsert_request, upsert_workspace, @@ -89,11 +90,7 @@ async fn migrate_db(app_handle: AppHandle, db: &Mutex>) -> Res } #[tauri::command] -async fn cmd_grpc_reflect( - endpoint: &str, - // app_handle: AppHandle, - // db_state: State<'_, Mutex>>, -) -> Result, String> { +async fn cmd_grpc_reflect(endpoint: &str) -> Result, String> { let uri = safe_uri(endpoint).map_err(|e| e.to_string())?; Ok(grpc::callable(&uri).await) } @@ -104,11 +101,16 @@ async fn cmd_grpc_call_unary( service: &str, method: &str, message: &str, - // app_handle: AppHandle, - // db_state: State<'_, Mutex>>, + grpc_handle: State<'_, Mutex>, ) -> Result { let uri = safe_uri(endpoint).map_err(|e| e.to_string())?; - grpc::unary(&uri, service, method, message).await + grpc_handle + .lock() + .await + .connect("default", uri) + .await + .unary(service, method, message) + .await } #[tauri::command] @@ -135,26 +137,30 @@ async fn cmd_grpc_server_streaming( method: &str, message: &str, app_handle: AppHandle, + grpc_handle: State<'_, Mutex>, ) -> Result { let uri = safe_uri(endpoint).map_err(|e| e.to_string())?; - let mut stream = grpc::server_streaming(&uri, service, method, message) + let conn_id = generate_id(Some("grpc")); + + let mut stream = grpc_handle + .lock() .await - .unwrap() - .into_inner(); + .server_streaming(&conn_id, uri, service, method, message) + .await + .unwrap(); + while let Some(item) = stream.next().await { match item { Ok(item) => { - let s = serde_json::to_string(&item).unwrap(); - emit_side_effect(&app_handle, "grpc_message", s.clone()); - println!("GOt item: {}", s); - } - Err(e) => { - println!("\terror: {}", e); + let item = serde_json::to_string_pretty(&item).unwrap(); + println!("Sending message {}", item); + emit_side_effect(&app_handle, "grpc_message", item); } + Err(e) => println!("\terror: {}", e), } } - Ok("foo".to_string()) + Ok(conn_id) } #[tauri::command] @@ -1002,25 +1008,28 @@ fn main() { let url = format!("sqlite://{}?mode=rwc", p_string); info!("Connecting to database at {}", url); + // Add updater + let yaak_updater = YaakUpdater::new(); + app.manage(Mutex::new(yaak_updater)); + + // Add GRPC manager + let grpc_handle = GrpcManager::default(); + app.manage(Mutex::new(grpc_handle)); + + // Add DB handle tauri::async_runtime::block_on(async move { let pool = SqlitePool::connect(p.to_str().unwrap()) .await .expect("Failed to connect to database"); - - // Setup the DB handle let m = Mutex::new(pool.clone()); migrate_db(app.handle(), &m) .await .expect("Failed to migrate database"); app.manage(m); - - let yaak_updater = YaakUpdater::new(); - app.manage(Mutex::new(yaak_updater)); - let _ = cancel_pending_responses(&pool).await; + }); - Ok(()) - }) + Ok(()) }) .invoke_handler(tauri::generate_handler![ cmd_check_for_updates, diff --git a/src-web/components/GlobalHooks.tsx b/src-web/components/GlobalHooks.tsx index aa60a419..d7a6e993 100644 --- a/src-web/components/GlobalHooks.tsx +++ b/src-web/components/GlobalHooks.tsx @@ -12,8 +12,8 @@ import { requestsQueryKey } from '../hooks/useRequests'; import { useRequestUpdateKey } from '../hooks/useRequestUpdateKey'; import { responsesQueryKey } from '../hooks/useResponses'; import { settingsQueryKey } from '../hooks/useSettings'; -import { useSyncWindowTitle } from '../hooks/useSyncWindowTitle'; import { useSyncAppearance } from '../hooks/useSyncAppearance'; +import { useSyncWindowTitle } from '../hooks/useSyncWindowTitle'; import { workspacesQueryKey } from '../hooks/useWorkspaces'; import { NAMESPACE_NO_SYNC } from '../lib/keyValueStore'; import type { HttpRequest, HttpResponse, Model, Workspace } from '../lib/models'; diff --git a/src-web/components/GrpcConnectionLayout.tsx b/src-web/components/GrpcConnectionLayout.tsx index febed260..2c559c5c 100644 --- a/src-web/components/GrpcConnectionLayout.tsx +++ b/src-web/components/GrpcConnectionLayout.tsx @@ -8,11 +8,13 @@ import type { GrpcMessage } from '../hooks/useGrpc'; import { useGrpc } from '../hooks/useGrpc'; import { useKeyValue } from '../hooks/useKeyValue'; import { Banner } from './core/Banner'; +import { Button } from './core/Button'; import { Editor } from './core/Editor'; import { HotKeyList } from './core/HotKeyList'; import { Icon } from './core/Icon'; import { IconButton } from './core/IconButton'; import { JsonAttributeTree } from './core/JsonAttributeTree'; +import { RadioDropdown } from './core/RadioDropdown'; import { Select } from './core/Select'; import { Separator } from './core/Separator'; import { SplitLayout } from './core/SplitLayout'; @@ -149,7 +151,7 @@ export function GrpcConnectionLayout({ style }: Props) { ref={urlContainerEl} className={classNames( 'grid grid-cols-[minmax(0,1fr)_auto_auto] gap-1.5', - paneSize < 350 && '!grid-cols-1', + paneSize < 400 && '!grid-cols-1', )} > -