diff --git a/src-tauri/.sqlx/query-196ed792c8d96425d428cb9609b0c1b18e8f1ba3c1fdfb38c91ffd7bada97f59.json b/src-tauri/.sqlx/query-20d6b878bb8d16bde3e78e22cf801b5b191905d867091bb54a210256a0145a17.json similarity index 65% rename from src-tauri/.sqlx/query-196ed792c8d96425d428cb9609b0c1b18e8f1ba3c1fdfb38c91ffd7bada97f59.json rename to src-tauri/.sqlx/query-20d6b878bb8d16bde3e78e22cf801b5b191905d867091bb54a210256a0145a17.json index 572209d2..e6aaae04 100644 --- a/src-tauri/.sqlx/query-196ed792c8d96425d428cb9609b0c1b18e8f1ba3c1fdfb38c91ffd7bada97f59.json +++ b/src-tauri/.sqlx/query-20d6b878bb8d16bde3e78e22cf801b5b191905d867091bb54a210256a0145a17.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, message,\n is_server, is_info\n FROM grpc_messages\n WHERE connection_id = ?\n ", + "query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, content,\n event_type AS \"event_type!: GrpcEventType\",\n metadata AS \"metadata!: sqlx::types::Json>\"\n FROM grpc_events\n WHERE id = ?\n ", "describe": { "columns": [ { @@ -34,19 +34,19 @@ "type_info": "Datetime" }, { - "name": "message", + "name": "content", "ordinal": 6, "type_info": "Text" }, { - "name": "is_server", + "name": "event_type!: GrpcEventType", "ordinal": 7, - "type_info": "Bool" + "type_info": "Text" }, { - "name": "is_info", + "name": "metadata!: sqlx::types::Json>", "ordinal": 8, - "type_info": "Bool" + "type_info": "Text" } ], "parameters": { @@ -64,5 +64,5 @@ false ] }, - "hash": "196ed792c8d96425d428cb9609b0c1b18e8f1ba3c1fdfb38c91ffd7bada97f59" + "hash": "20d6b878bb8d16bde3e78e22cf801b5b191905d867091bb54a210256a0145a17" } diff --git a/src-tauri/.sqlx/query-3dce053aef78e831db2369f3c49e891cb8a9e1ba6e7a60fe9e24292a3f97dca3.json b/src-tauri/.sqlx/query-3dce053aef78e831db2369f3c49e891cb8a9e1ba6e7a60fe9e24292a3f97dca3.json new file mode 100644 index 00000000..009959d0 --- /dev/null +++ b/src-tauri/.sqlx/query-3dce053aef78e831db2369f3c49e891cb8a9e1ba6e7a60fe9e24292a3f97dca3.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO grpc_events (\n id, workspace_id, request_id, connection_id, content, event_type, metadata\n )\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n content = excluded.content,\n event_type = excluded.event_type,\n metadata = excluded.metadata\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 7 + }, + "nullable": [] + }, + "hash": "3dce053aef78e831db2369f3c49e891cb8a9e1ba6e7a60fe9e24292a3f97dca3" +} diff --git a/src-tauri/.sqlx/query-80a85f83d0946d532a60f0add87aa0ade7e35a6b56cb058e2caf9ca005ce6407.json b/src-tauri/.sqlx/query-3e8651cca7feecc208a676dfd24c7d8775040d5287c16890056dcb474674edfb.json similarity index 60% rename from src-tauri/.sqlx/query-80a85f83d0946d532a60f0add87aa0ade7e35a6b56cb058e2caf9ca005ce6407.json rename to src-tauri/.sqlx/query-3e8651cca7feecc208a676dfd24c7d8775040d5287c16890056dcb474674edfb.json index 9cacb528..8b2e3954 100644 --- a/src-tauri/.sqlx/query-80a85f83d0946d532a60f0add87aa0ade7e35a6b56cb058e2caf9ca005ce6407.json +++ b/src-tauri/.sqlx/query-3e8651cca7feecc208a676dfd24c7d8775040d5287c16890056dcb474674edfb.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed\n FROM grpc_connections\n WHERE request_id = ?\n ORDER BY created_at DESC\n ", + "query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed, status, error, url,\n trailers AS \"trailers!: sqlx::types::Json>\"\n FROM grpc_connections\n WHERE request_id = ?\n ORDER BY created_at DESC\n ", "describe": { "columns": [ { @@ -47,6 +47,26 @@ "name": "elapsed", "ordinal": 8, "type_info": "Int64" + }, + { + "name": "status", + "ordinal": 9, + "type_info": "Int64" + }, + { + "name": "error", + "ordinal": 10, + "type_info": "Text" + }, + { + "name": "url", + "ordinal": 11, + "type_info": "Text" + }, + { + "name": "trailers!: sqlx::types::Json>", + "ordinal": 12, + "type_info": "Text" } ], "parameters": { @@ -61,8 +81,12 @@ false, false, false, + false, + false, + true, + false, false ] }, - "hash": "80a85f83d0946d532a60f0add87aa0ade7e35a6b56cb058e2caf9ca005ce6407" + "hash": "3e8651cca7feecc208a676dfd24c7d8775040d5287c16890056dcb474674edfb" } diff --git a/src-tauri/.sqlx/query-4b45b681698cbfe8531a7c3ba368a1d8003fa17d5585bc126debb18cae670460.json b/src-tauri/.sqlx/query-4b45b681698cbfe8531a7c3ba368a1d8003fa17d5585bc126debb18cae670460.json deleted file mode 100644 index 0329c901..00000000 --- a/src-tauri/.sqlx/query-4b45b681698cbfe8531a7c3ba368a1d8003fa17d5585bc126debb18cae670460.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO grpc_messages (\n id, workspace_id, request_id, connection_id, message, is_server, is_info\n )\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n message = excluded.message,\n is_server = excluded.is_server,\n is_info = excluded.is_info\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 7 - }, - "nullable": [] - }, - "hash": "4b45b681698cbfe8531a7c3ba368a1d8003fa17d5585bc126debb18cae670460" -} diff --git a/src-tauri/.sqlx/query-66deed028199c78ed15ea2f837907887c2a2cb564d1d076dd4ebf0ecbc82e098.json b/src-tauri/.sqlx/query-66deed028199c78ed15ea2f837907887c2a2cb564d1d076dd4ebf0ecbc82e098.json new file mode 100644 index 00000000..a06e03b3 --- /dev/null +++ b/src-tauri/.sqlx/query-66deed028199c78ed15ea2f837907887c2a2cb564d1d076dd4ebf0ecbc82e098.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO grpc_connections (\n id, workspace_id, request_id, service, method, elapsed,\n status, error, trailers, url\n )\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n service = excluded.service,\n method = excluded.method,\n elapsed = excluded.elapsed,\n status = excluded.status,\n error = excluded.error,\n trailers = excluded.trailers,\n url = excluded.url\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 10 + }, + "nullable": [] + }, + "hash": "66deed028199c78ed15ea2f837907887c2a2cb564d1d076dd4ebf0ecbc82e098" +} diff --git a/src-tauri/.sqlx/query-3c52c0fa3372cdd2657a775c3b93fb65f42d3226cec27220469558e14973328c.json b/src-tauri/.sqlx/query-737045ddd5f8ba3454425e82b9d3943f93649742d8f78613e01d322745e47ebd.json similarity index 64% rename from src-tauri/.sqlx/query-3c52c0fa3372cdd2657a775c3b93fb65f42d3226cec27220469558e14973328c.json rename to src-tauri/.sqlx/query-737045ddd5f8ba3454425e82b9d3943f93649742d8f78613e01d322745e47ebd.json index c7bf9ae3..d20b2dc2 100644 --- a/src-tauri/.sqlx/query-3c52c0fa3372cdd2657a775c3b93fb65f42d3226cec27220469558e14973328c.json +++ b/src-tauri/.sqlx/query-737045ddd5f8ba3454425e82b9d3943f93649742d8f78613e01d322745e47ebd.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, message,\n is_server, is_info\n FROM grpc_messages\n WHERE id = ?\n ", + "query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, content,\n event_type AS \"event_type!: GrpcEventType\",\n metadata AS \"metadata!: sqlx::types::Json>\"\n FROM grpc_events\n WHERE connection_id = ?\n ", "describe": { "columns": [ { @@ -34,19 +34,19 @@ "type_info": "Datetime" }, { - "name": "message", + "name": "content", "ordinal": 6, "type_info": "Text" }, { - "name": "is_server", + "name": "event_type!: GrpcEventType", "ordinal": 7, - "type_info": "Bool" + "type_info": "Text" }, { - "name": "is_info", + "name": "metadata!: sqlx::types::Json>", "ordinal": 8, - "type_info": "Bool" + "type_info": "Text" } ], "parameters": { @@ -64,5 +64,5 @@ false ] }, - "hash": "3c52c0fa3372cdd2657a775c3b93fb65f42d3226cec27220469558e14973328c" + "hash": "737045ddd5f8ba3454425e82b9d3943f93649742d8f78613e01d322745e47ebd" } diff --git a/src-tauri/.sqlx/query-9d7bc2b0eb0c09652d9826db4a7ae47591405e1b5bec1229f2e2734c73e66163.json b/src-tauri/.sqlx/query-9d7bc2b0eb0c09652d9826db4a7ae47591405e1b5bec1229f2e2734c73e66163.json deleted file mode 100644 index e6d6e8bb..00000000 --- a/src-tauri/.sqlx/query-9d7bc2b0eb0c09652d9826db4a7ae47591405e1b5bec1229f2e2734c73e66163.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n INSERT INTO grpc_connections (\n id, workspace_id, request_id, service, method, elapsed\n )\n VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n service = excluded.service,\n method = excluded.method,\n elapsed = excluded.elapsed\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 6 - }, - "nullable": [] - }, - "hash": "9d7bc2b0eb0c09652d9826db4a7ae47591405e1b5bec1229f2e2734c73e66163" -} diff --git a/src-tauri/.sqlx/query-3330be44d8851f8e3456c403b5d1067f4e70e85ef8829b7aaad5b1993c3d01e8.json b/src-tauri/.sqlx/query-d4b64c466624eb75e0f5bd201ebfb6a73d25eb7c9e09cb9690afdb7fef5fca8b.json similarity index 62% rename from src-tauri/.sqlx/query-3330be44d8851f8e3456c403b5d1067f4e70e85ef8829b7aaad5b1993c3d01e8.json rename to src-tauri/.sqlx/query-d4b64c466624eb75e0f5bd201ebfb6a73d25eb7c9e09cb9690afdb7fef5fca8b.json index ea56c49f..9bb6f1b1 100644 --- a/src-tauri/.sqlx/query-3330be44d8851f8e3456c403b5d1067f4e70e85ef8829b7aaad5b1993c3d01e8.json +++ b/src-tauri/.sqlx/query-d4b64c466624eb75e0f5bd201ebfb6a73d25eb7c9e09cb9690afdb7fef5fca8b.json @@ -1,6 +1,6 @@ { "db_name": "SQLite", - "query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed\n FROM grpc_connections\n WHERE id = ?\n ", + "query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed, status, error, url,\n trailers AS \"trailers!: sqlx::types::Json>\"\n FROM grpc_connections\n WHERE id = ?\n ", "describe": { "columns": [ { @@ -47,6 +47,26 @@ "name": "elapsed", "ordinal": 8, "type_info": "Int64" + }, + { + "name": "status", + "ordinal": 9, + "type_info": "Int64" + }, + { + "name": "error", + "ordinal": 10, + "type_info": "Text" + }, + { + "name": "url", + "ordinal": 11, + "type_info": "Text" + }, + { + "name": "trailers!: sqlx::types::Json>", + "ordinal": 12, + "type_info": "Text" } ], "parameters": { @@ -61,8 +81,12 @@ false, false, false, + false, + false, + true, + false, false ] }, - "hash": "3330be44d8851f8e3456c403b5d1067f4e70e85ef8829b7aaad5b1993c3d01e8" + "hash": "d4b64c466624eb75e0f5bd201ebfb6a73d25eb7c9e09cb9690afdb7fef5fca8b" } diff --git a/src-tauri/grpc/src/lib.rs b/src-tauri/grpc/src/lib.rs index 8bb10da9..828c1627 100644 --- a/src-tauri/grpc/src/lib.rs +++ b/src-tauri/grpc/src/lib.rs @@ -1,11 +1,15 @@ -use prost_reflect::{DynamicMessage, SerializeOptions}; +use prost_reflect::{DynamicMessage, MethodDescriptor, SerializeOptions}; use serde::{Deserialize, Serialize}; +use serde_json::Deserializer; mod codec; mod json_schema; pub mod manager; mod proto; +pub use tonic::metadata::*; +pub use tonic::Code; + pub fn serialize_options() -> SerializeOptions { SerializeOptions::new().skip_default_fields(false) } @@ -38,3 +42,11 @@ pub fn serialize_message(msg: &DynamicMessage) -> Result { let s = String::from_utf8(buf).expect("serde_json to emit valid utf8"); Ok(s) } + +pub fn deserialize_message(msg: &str, method: MethodDescriptor) -> Result { + let mut deserializer = Deserializer::from_str(&msg); + let req_message = DynamicMessage::deserialize(method.input(), &mut deserializer) + .map_err(|e| e.to_string())?; + deserializer.end().map_err(|e| e.to_string())?; + Ok(req_message) +} diff --git a/src-tauri/grpc/src/manager.rs b/src-tauri/grpc/src/manager.rs index 85eecc30..55f56baf 100644 --- a/src-tauri/grpc/src/manager.rs +++ b/src-tauri/grpc/src/manager.rs @@ -9,7 +9,6 @@ pub use prost_reflect::DynamicMessage; use prost_reflect::{DescriptorPool, MethodDescriptor, ServiceDescriptor}; use serde_json::Deserializer; use tokio_stream::wrappers::ReceiverStream; -use tokio_stream::StreamExt; use tonic::body::BoxBody; use tonic::metadata::{MetadataKey, MetadataValue}; use tonic::transport::Uri; @@ -50,7 +49,7 @@ impl GrpcConnection { method: &str, message: &str, metadata: HashMap, - ) -> Result { + ) -> Result, String> { let method = &self.method(&service, &method)?; let input_message = method.input(); @@ -68,34 +67,23 @@ impl GrpcConnection { let codec = DynamicCodec::new(method.clone()); client.ready().await.unwrap(); - Ok(client + client .unary(req, path, codec) .await - .map_err(|e| e.to_string())? - .into_inner()) + .map_err(|e| e.to_string()) } pub async fn streaming( &self, service: &str, method: &str, - stream: ReceiverStream, + stream: ReceiverStream, metadata: HashMap, ) -> Result>, Status>, String> { let method = &self.method(&service, &method)?; let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone()); - let method2 = method.clone(); - let mut req = stream - .map(move |s| { - let mut deserializer = Deserializer::from_str(&s); - let req_message = DynamicMessage::deserialize(method2.input(), &mut deserializer) - .map_err(|e| e.to_string()) - .unwrap(); - deserializer.end().unwrap(); - req_message - }) - .into_streaming_request(); + let mut req = stream.into_streaming_request(); decorate_req(metadata, &mut req).map_err(|e| e.to_string())?; @@ -109,37 +97,21 @@ impl GrpcConnection { &self, service: &str, method: &str, - stream: ReceiverStream, + stream: ReceiverStream, metadata: HashMap, - ) -> Result { + ) -> Result, String> { let method = &self.method(&service, &method)?; let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone()); - - let mut req = { - let method = method.clone(); - stream - .map(move |s| { - let mut deserializer = Deserializer::from_str(&s); - let req_message = - DynamicMessage::deserialize(method.input(), &mut deserializer) - .map_err(|e| e.to_string()) - .unwrap(); - deserializer.end().unwrap(); - req_message - }) - .into_streaming_request() - }; - + let mut req = stream.into_streaming_request(); decorate_req(metadata, &mut req).map_err(|e| e.to_string())?; let path = method_desc_to_path(method); let codec = DynamicCodec::new(method.clone()); client.ready().await.unwrap(); - Ok(client + client .client_streaming(req, path, codec) .await - .map_err(|s| s.to_string())? - .into_inner()) + .map_err(|s| s.to_string()) } pub async fn server_streaming( @@ -230,54 +202,6 @@ impl GrpcHandle { .collect::>() } - pub async fn server_streaming( - &mut self, - id: &str, - uri: Uri, - proto_files: Vec, - service: &str, - method: &str, - message: &str, - metadata: HashMap, - ) -> Result>, Status>, String> { - self.connect(id, uri, proto_files) - .await? - .server_streaming(service, method, message, metadata) - .await - } - - pub async fn client_streaming( - &mut self, - id: &str, - uri: Uri, - proto_files: Vec, - service: &str, - method: &str, - stream: ReceiverStream, - metadata: HashMap, - ) -> Result { - self.connect(id, uri, proto_files) - .await? - .client_streaming(service, method, stream, metadata) - .await - } - - pub async fn streaming( - &mut self, - id: &str, - uri: Uri, - proto_files: Vec, - service: &str, - method: &str, - stream: ReceiverStream, - metadata: HashMap, - ) -> Result>, Status>, String> { - self.connect(id, uri, proto_files) - .await? - .streaming(service, method, stream, metadata) - .await - } - pub async fn connect( &mut self, id: &str, diff --git a/src-tauri/migrations/20240203164833_grpc.sql b/src-tauri/migrations/20240203164833_grpc.sql index db14a1ca..ce0fae90 100644 --- a/src-tauri/migrations/20240203164833_grpc.sql +++ b/src-tauri/migrations/20240203164833_grpc.sql @@ -1,63 +1,67 @@ CREATE TABLE grpc_requests ( - id TEXT NOT NULL + id TEXT NOT NULL PRIMARY KEY, - model TEXT DEFAULT 'grpc_request' NOT NULL, - workspace_id TEXT NOT NULL + model TEXT DEFAULT 'grpc_request' NOT NULL, + workspace_id TEXT NOT NULL REFERENCES workspaces ON DELETE CASCADE, - folder_id TEXT NULL + folder_id TEXT NULL REFERENCES folders ON DELETE CASCADE, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, - name TEXT NOT NULL, - sort_priority REAL NOT NULL, - url TEXT NOT NULL, - service TEXT NULL, - method TEXT NULL, - message TEXT NOT NULL, - proto_files TEXT DEFAULT '[]' NOT NULL, - authentication TEXT DEFAULT '{}' NOT NULL, - authentication_type TEXT NULL, - metadata TEXT DEFAULT '[]' NOT NULL + created_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL, + updated_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL, + name TEXT NOT NULL, + sort_priority REAL NOT NULL, + url TEXT NOT NULL, + service TEXT NULL, + method TEXT NULL, + message TEXT NOT NULL, + proto_files TEXT DEFAULT '[]' NOT NULL, + authentication TEXT DEFAULT '{}' NOT NULL, + authentication_type TEXT NULL, + metadata TEXT DEFAULT '[]' NOT NULL ); CREATE TABLE grpc_connections ( - id TEXT NOT NULL + id TEXT NOT NULL PRIMARY KEY, - model TEXT DEFAULT 'grpc_connection' NOT NULL, - workspace_id TEXT NOT NULL + model TEXT DEFAULT 'grpc_connection' NOT NULL, + workspace_id TEXT NOT NULL REFERENCES workspaces ON DELETE CASCADE, - request_id TEXT NOT NULL + request_id TEXT NOT NULL REFERENCES grpc_requests ON DELETE CASCADE, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, - service TEXT NOT NULL, - method TEXT NOT NULL, - elapsed INTEGER NOT NULL + created_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL, + updated_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL, + url TEXT NOT NULL, + service TEXT NOT NULL, + method TEXT NOT NULL, + status INTEGER DEFAULT -1 NOT NULL, + error TEXT NULL, + elapsed INTEGER DEFAULT 0 NOT NULL, + trailers TEXT DEFAULT '{}' NOT NULL ); -CREATE TABLE grpc_messages +CREATE TABLE grpc_events ( - id TEXT NOT NULL + id TEXT NOT NULL PRIMARY KEY, - model TEXT DEFAULT 'grpc_message' NOT NULL, - workspace_id TEXT NOT NULL + model TEXT DEFAULT 'grpc_event' NOT NULL, + workspace_id TEXT NOT NULL REFERENCES workspaces ON DELETE CASCADE, - request_id TEXT NOT NULL + request_id TEXT NOT NULL REFERENCES grpc_requests ON DELETE CASCADE, - connection_id TEXT NOT NULL + connection_id TEXT NOT NULL REFERENCES grpc_connections ON DELETE CASCADE, - created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, - updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL, - is_server BOOLEAN NOT NULL, - is_info BOOLEAN NOT NULL, - message TEXT NOT NULL + created_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL, + updated_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL, + metadata TEXT DEFAULT '{}' NOT NULL, + event_type TEXT NOT NULL, + content TEXT NOT NULL ); diff --git a/src-tauri/src/analytics.rs b/src-tauri/src/analytics.rs index 5388c003..0b05d26f 100644 --- a/src-tauri/src/analytics.rs +++ b/src-tauri/src/analytics.rs @@ -15,7 +15,7 @@ pub enum AnalyticsResource { Environment, Folder, GrpcConnection, - GrpcMessage, + GrpcEvent, GrpcRequest, HttpRequest, HttpResponse, @@ -33,7 +33,7 @@ impl AnalyticsResource { "Environment" => Some(AnalyticsResource::Environment), "Folder" => Some(AnalyticsResource::Folder), "GrpcConnection" => Some(AnalyticsResource::GrpcConnection), - "GrpcMessage" => Some(AnalyticsResource::GrpcMessage), + "GrpcEvent" => Some(AnalyticsResource::GrpcEvent), "GrpcRequest" => Some(AnalyticsResource::GrpcRequest), "HttpRequest" => Some(AnalyticsResource::HttpRequest), "HttpResponse" => Some(AnalyticsResource::HttpResponse), @@ -96,7 +96,7 @@ fn resource_name(resource: AnalyticsResource) -> &'static str { AnalyticsResource::Folder => "folder", AnalyticsResource::GrpcRequest => "grpc_request", AnalyticsResource::GrpcConnection => "grpc_connection", - AnalyticsResource::GrpcMessage => "grpc_message", + AnalyticsResource::GrpcEvent => "grpc_event", AnalyticsResource::HttpRequest => "http_request", AnalyticsResource::HttpResponse => "http_response", AnalyticsResource::KeyValue => "key_value", diff --git a/src-tauri/src/grpc.rs b/src-tauri/src/grpc.rs new file mode 100644 index 00000000..28996b8e --- /dev/null +++ b/src-tauri/src/grpc.rs @@ -0,0 +1,16 @@ +use std::collections::HashMap; + +use KeyAndValueRef::{Ascii, Binary}; + +use grpc::{KeyAndValueRef, MetadataMap}; + +pub fn metadata_to_map(metadata: MetadataMap) -> HashMap { + let mut entries = HashMap::new(); + for r in metadata.iter() { + match r { + Ascii(k, v) => entries.insert(k.to_string(), v.to_str().unwrap().to_string()), + Binary(k, v) => entries.insert(k.to_string(), format!("{:?}", v)), + }; + } + entries +} diff --git a/src-tauri/src/main.rs b/src-tauri/src/main.rs index 16c40731..50c13869 100644 --- a/src-tauri/src/main.rs +++ b/src-tauri/src/main.rs @@ -35,31 +35,33 @@ use tokio::sync::Mutex; use tokio::time::sleep; use window_shadows::set_shadow; -use grpc::manager::GrpcHandle; -use grpc::{serialize_message, ServiceDefinition}; +use ::grpc::manager::{DynamicMessage, GrpcHandle}; +use ::grpc::{deserialize_message, serialize_message, Code, ServiceDefinition}; use window_ext::TrafficLightWindowExt; use crate::analytics::{AnalyticsAction, AnalyticsResource}; +use crate::grpc::metadata_to_map; use crate::http::send_http_request; use crate::models::{ cancel_pending_grpc_connections, cancel_pending_responses, create_http_response, delete_all_grpc_connections, delete_all_http_responses, delete_cookie_jar, delete_environment, delete_folder, delete_grpc_connection, delete_grpc_request, delete_http_request, delete_http_response, delete_workspace, duplicate_grpc_request, duplicate_http_request, - get_cookie_jar, get_environment, get_folder, get_grpc_request, get_http_request, - get_http_response, get_key_value_raw, get_or_create_settings, get_workspace, + get_cookie_jar, get_environment, get_folder, get_grpc_connection, get_grpc_request, + get_http_request, get_http_response, get_key_value_raw, get_or_create_settings, get_workspace, get_workspace_export_resources, list_cookie_jars, list_environments, list_folders, - list_grpc_connections, list_grpc_messages, list_grpc_requests, list_requests, list_responses, + list_grpc_connections, list_grpc_events, list_grpc_requests, list_requests, list_responses, list_workspaces, set_key_value_raw, update_response_if_id, update_settings, upsert_cookie_jar, - upsert_environment, upsert_folder, upsert_grpc_connection, upsert_grpc_message, + upsert_environment, upsert_folder, upsert_grpc_connection, upsert_grpc_event, upsert_grpc_request, upsert_http_request, upsert_workspace, CookieJar, Environment, - EnvironmentVariable, Folder, GrpcConnection, GrpcMessage, GrpcRequest, HttpRequest, - HttpResponse, KeyValue, Settings, Workspace, + EnvironmentVariable, Folder, GrpcConnection, GrpcEvent, GrpcEventType, GrpcRequest, + HttpRequest, HttpResponse, KeyValue, Settings, Workspace, }; use crate::plugin::{ImportResources, ImportResult}; use crate::updates::{update_mode_from_str, UpdateMode, YaakUpdater}; mod analytics; +mod grpc; mod http; mod models; mod plugin; @@ -143,137 +145,6 @@ async fn cmd_grpc_go( let workspace = get_workspace(&w, &req.workspace_id) .await .map_err(|e| e.to_string())?; - let conn = { - let req = req.clone(); - upsert_grpc_connection( - &w, - &GrpcConnection { - workspace_id: req.workspace_id, - request_id: req.id, - ..Default::default() - }, - ) - .await - .map_err(|e| e.to_string())? - }; - - let base_msg = GrpcMessage { - workspace_id: req.clone().workspace_id, - request_id: req.clone().id, - connection_id: conn.clone().id, - ..Default::default() - }; - - upsert_grpc_message( - &w, - &GrpcMessage { - message: "Initiating connection".to_string(), - is_info: true, - ..base_msg.clone() - }, - ) - .await - .expect("Failed to upsert message"); - - let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::(16); - let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone())); - let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); - - let uri = safe_uri(&req.url).map_err(|e| e.to_string())?; - - let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx); - - let (service, method) = { - let req = req.clone(); - match (req.service, req.method) { - (Some(service), Some(method)) => (service, method), - _ => return Err("Service and method are required".to_string()), - } - }; - - let start = std::time::Instant::now(); - let connection = grpc_handle - .lock() - .await - .connect( - &req.clone().id, - uri, - req.proto_files - .0 - .iter() - .map(|p| PathBuf::from_str(p).unwrap()) - .collect(), - ) - .await?; - - let method_desc = connection - .method(&service, &method) - .expect("Service not found"); - - #[derive(serde::Deserialize)] - enum IncomingMsg { - Message(String), - Cancel, - Commit, - } - - let cb = { - let cancelled_rx = cancelled_rx.clone(); - let environment = environment.clone(); - let workspace = workspace.clone(); - let w = w.clone(); - let base_msg = base_msg.clone(); - - move |ev: tauri::Event| { - if *cancelled_rx.borrow() { - // Stream is cancelled - return; - } - - let mut maybe_in_msg_tx = maybe_in_msg_tx - .lock() - .expect("previous holder not to panic"); - let in_msg_tx = if let Some(in_msg_tx) = maybe_in_msg_tx.as_ref() { - in_msg_tx - } else { - // This would mean that the stream is already committed because - // we have already dropped the sending half - return; - }; - - match serde_json::from_str::(ev.payload().unwrap()) { - Ok(IncomingMsg::Message(raw_msg)) => { - in_msg_tx.try_send(raw_msg.clone()).unwrap(); - let w = w.clone(); - let base_msg = base_msg.clone(); - let environment_ref = environment.as_ref(); - let msg = render::render(raw_msg.as_str(), &workspace, environment_ref); - tauri::async_runtime::spawn(async move { - upsert_grpc_message( - &w, - &GrpcMessage { - message: msg, - ..base_msg.clone() - }, - ) - .await - .map_err(|e| e.to_string()) - .unwrap(); - }); - } - Ok(IncomingMsg::Commit) => { - maybe_in_msg_tx.take(); - } - Ok(IncomingMsg::Cancel) => { - cancelled_tx.send_replace(true); - } - Err(e) => { - error!("Failed to parse gRPC message: {:?}", e); - } - } - } - }; - let event_handler = w.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb); let mut metadata = HashMap::new(); // Add rest of metadata @@ -322,7 +193,150 @@ async fn cmd_grpc_go( } } - println!("METADATA: {:?}", metadata); + let conn = { + let req = req.clone(); + upsert_grpc_connection( + &w, + &GrpcConnection { + workspace_id: req.workspace_id, + request_id: req.id, + status: -1, + url: req.url.clone(), + ..Default::default() + }, + ) + .await + .map_err(|e| e.to_string())? + }; + let conn_id = conn.id.clone(); + + let base_msg = GrpcEvent { + workspace_id: req.clone().workspace_id, + request_id: req.clone().id, + connection_id: conn.clone().id, + ..Default::default() + }; + + let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::(16); + let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone())); + let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false); + + let uri = safe_uri(&req.url).map_err(|e| e.to_string())?; + + let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx); + + let (service, method) = { + let req = req.clone(); + match (req.service, req.method) { + (Some(service), Some(method)) => (service, method), + _ => return Err("Service and method are required".to_string()), + } + }; + + let start = std::time::Instant::now(); + let connection = grpc_handle + .lock() + .await + .connect( + &req.clone().id, + uri, + req.proto_files + .0 + .iter() + .map(|p| PathBuf::from_str(p).unwrap()) + .collect(), + ) + .await?; + + let method_desc = connection + .method(&service, &method) + .expect("Service not found"); + + #[derive(serde::Deserialize)] + enum IncomingMsg { + Message(String), + Cancel, + Commit, + } + + let cb = { + let cancelled_rx = cancelled_rx.clone(); + let environment = environment.clone(); + let workspace = workspace.clone(); + let w = w.clone(); + let base_msg = base_msg.clone(); + let method_desc = method_desc.clone(); + + move |ev: tauri::Event| { + if *cancelled_rx.borrow() { + // Stream is cancelled + return; + } + + let mut maybe_in_msg_tx = maybe_in_msg_tx + .lock() + .expect("previous holder not to panic"); + let in_msg_tx = if let Some(in_msg_tx) = maybe_in_msg_tx.as_ref() { + in_msg_tx + } else { + // This would mean that the stream is already committed because + // we have already dropped the sending half + return; + }; + + match serde_json::from_str::(ev.payload().unwrap()) { + Ok(IncomingMsg::Message(raw_msg)) => { + let w = w.clone(); + let base_msg = base_msg.clone(); + let environment_ref = environment.as_ref(); + let method_desc = method_desc.clone(); + let msg = render::render(raw_msg.as_str(), &workspace, environment_ref); + let d_msg: DynamicMessage = match deserialize_message(msg.as_str(), method_desc) + { + Ok(d_msg) => d_msg, + Err(e) => { + tauri::async_runtime::spawn(async move { + upsert_grpc_event( + &w, + &GrpcEvent { + event_type: GrpcEventType::Error, + content: e.to_string(), + ..base_msg.clone() + }, + ) + .await + .unwrap(); + }); + return; + } + }; + in_msg_tx.try_send(d_msg).unwrap(); + tauri::async_runtime::spawn(async move { + upsert_grpc_event( + &w, + &GrpcEvent { + content: msg, + event_type: GrpcEventType::ClientMessage, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + }); + } + Ok(IncomingMsg::Commit) => { + maybe_in_msg_tx.take(); + } + Ok(IncomingMsg::Cancel) => { + cancelled_tx.send_replace(true); + } + Err(e) => { + error!("Failed to parse gRPC message: {:?}", e); + } + } + } + }; + let event_handler = w.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb); let grpc_listen = { let w = w.clone(); @@ -330,7 +344,26 @@ async fn cmd_grpc_go( let req = req.clone(); let workspace = workspace.clone(); let environment = environment.clone(); - let msg = render::render(&req.message, &workspace, environment.as_ref()); + let raw_msg = if req.message.is_empty() { + "{}".to_string() + } else { + req.message + }; + let msg = render::render(&raw_msg, &workspace, environment.as_ref()); + let conn_id = conn_id.clone(); + + upsert_grpc_event( + &w, + &GrpcEvent { + content: format!("Connecting to {}", req.url), + event_type: GrpcEventType::Info, + metadata: Json(metadata.clone()), + ..base_msg.clone() + }, + ) + .await + .unwrap(); + async move { let (maybe_stream, maybe_msg) = match ( method_desc.is_client_streaming(), @@ -366,54 +399,104 @@ async fn cmd_grpc_go( ), }; + if !method_desc.is_client_streaming() { + upsert_grpc_event( + &w, + &GrpcEvent { + event_type: GrpcEventType::ClientMessage, + content: msg, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + } + match maybe_msg { Some(Ok(msg)) => { - println!("Message: {:?}", msg); - upsert_grpc_message( + upsert_grpc_event( &w, - &GrpcMessage { - message: serialize_message(&msg).unwrap(), - is_server: true, + &GrpcEvent { + metadata: Json(metadata_to_map(msg.metadata().clone())), + content: if msg.metadata().len() == 0 { + "Connection established" + } else { + "Received metadata" + } + .to_string(), + event_type: GrpcEventType::Info, ..base_msg.clone() }, ) .await .unwrap(); + upsert_grpc_event( + &w, + &GrpcEvent { + content: serialize_message(&msg.into_inner()).unwrap(), + event_type: GrpcEventType::ServerMessage, + ..base_msg.clone() + }, + ) + .await + .unwrap(); + upsert_grpc_connection( + &w, + &GrpcConnection { + elapsed: start.elapsed().as_millis() as i64, + status: Code::Ok as i64, + ..get_grpc_connection(&w, &conn_id).await.unwrap().clone() + }, + ) + .await + .unwrap(); } Some(Err(e)) => { - // TODO: Make into error - println!("Error connecting: {:?}", e); - upsert_grpc_message( + upsert_grpc_connection( &w, - &GrpcMessage { - message: e.to_string(), - is_server: true, - is_info: true, - ..base_msg.clone() + &GrpcConnection { + error: Some(e.to_string()), + elapsed: start.elapsed().as_millis() as i64, + status: Code::Unknown as i64, + ..get_grpc_connection(&w, &conn_id).await.unwrap().clone() }, ) .await .unwrap(); } - None => {} + None => { + // Server streaming doesn't return initial message + } } let mut stream = match maybe_stream { Some(Ok(Ok(s))) => { - // TODO: Store metadata on... connection? Or in a message - println!("METADATA: {:?}", s.metadata()); + upsert_grpc_event( + &w, + &GrpcEvent { + metadata: Json(metadata_to_map(s.metadata().clone())), + content: if s.metadata().len() == 0 { + "Connection established" + } else { + "Received metadata" + } + .to_string(), + event_type: GrpcEventType::Info, + ..base_msg.clone() + }, + ) + .await + .unwrap(); s.into_inner() } Some(Ok(Err(e))) => { - // TODO: Make into error, and use status - println!("Connection status error: {:?}", e); - upsert_grpc_message( + upsert_grpc_connection( &w, - &GrpcMessage { - message: e.message().to_string(), - is_server: true, - is_info: true, - ..base_msg.clone() + &GrpcConnection { + error: Some(e.message().to_string()), + status: e.code() as i64, + elapsed: start.elapsed().as_millis() as i64, + ..get_grpc_connection(&w, &conn_id).await.unwrap().clone() }, ) .await @@ -421,15 +504,13 @@ async fn cmd_grpc_go( return; } Some(Err(e)) => { - // TODO: Make into error - println!("Generic error: {:?}", e); - upsert_grpc_message( + upsert_grpc_connection( &w, - &GrpcMessage { - message: e.to_string(), - is_server: true, - is_info: true, - ..base_msg.clone() + &GrpcConnection { + error: Some(e), + status: Code::Unknown as i64, + elapsed: start.elapsed().as_millis() as i64, + ..get_grpc_connection(&w, &conn_id).await.unwrap().clone() }, ) .await @@ -443,11 +524,11 @@ async fn cmd_grpc_go( match stream.message().await { Ok(Some(msg)) => { let message = serialize_message(&msg).unwrap(); - upsert_grpc_message( + upsert_grpc_event( &w, - &GrpcMessage { - message, - is_server: true, + &GrpcEvent { + content: message, + event_type: GrpcEventType::ServerMessage, ..base_msg.clone() }, ) @@ -455,16 +536,18 @@ async fn cmd_grpc_go( .unwrap(); } Ok(None) => { - // TODO: Store trailers on connection - let trailers = stream.trailers().await.unwrap_or_default(); - info!("gRPC stream closed by sender {:?}", trailers,); - // TODO: Mark this on connection instead - upsert_grpc_message( + let trailers = stream + .trailers() + .await + .unwrap_or_default() + .unwrap_or_default(); + upsert_grpc_connection( &w, - &GrpcMessage { - message: "Connection closed".to_string(), - is_info: true, - ..base_msg.clone() + &GrpcConnection { + elapsed: start.elapsed().as_millis() as i64, + status: Code::Unavailable as i64, + trailers: Json(metadata_to_map(trailers)), + ..get_grpc_connection(&w, &conn_id).await.unwrap().clone() }, ) .await @@ -472,15 +555,13 @@ async fn cmd_grpc_go( break; } Err(status) => { - // TODO: Make into error - println!("Error status: {:?}", status); - upsert_grpc_message( + upsert_grpc_connection( &w, - &GrpcMessage { - message: status.message().to_string(), - is_server: true, - is_info: true, - ..base_msg.clone() + &GrpcConnection { + elapsed: start.elapsed().as_millis() as i64, + status: Code::Unavailable as i64, + trailers: Json(metadata_to_map(status.metadata().clone())), + ..get_grpc_connection(&w, &conn_id).await.unwrap().clone() }, ) .await @@ -492,36 +573,31 @@ async fn cmd_grpc_go( }; { - let conn = conn.clone(); + let conn_id = conn_id.clone(); tauri::async_runtime::spawn(async move { let w = w.clone(); tokio::select! { _ = grpc_listen => { - upsert_grpc_connection( - &w, - &GrpcConnection{ - elapsed: start.elapsed().as_millis() as i64, - ..conn - }, - ).await.unwrap(); + // upsert_grpc_connection( + // &w, + // &GrpcConnection{ + // elapsed: start.elapsed().as_millis() as i64, + // status: Code::Ok as i64, + // ..conn + // }, + // ).await.unwrap(); }, _ = cancelled_rx.changed() => { - upsert_grpc_message( - &w, - &GrpcMessage { - message: "Connection cancelled".to_string(), - is_info: true, - ..base_msg.clone() - }, - ) - .await.unwrap(); upsert_grpc_connection( &w, - &GrpcConnection{ + &GrpcConnection { elapsed: start.elapsed().as_millis() as i64, - ..conn + status: Code::Cancelled as i64, + ..get_grpc_connection(&w, &conn_id).await.unwrap().clone() }, - ).await.unwrap(); + ) + .await + .unwrap(); }, } w.unlisten(event_handler); @@ -1037,11 +1113,8 @@ async fn cmd_list_grpc_connections( } #[tauri::command] -async fn cmd_list_grpc_messages( - connection_id: &str, - w: Window, -) -> Result, String> { - list_grpc_messages(&w, connection_id) +async fn cmd_list_grpc_events(connection_id: &str, w: Window) -> Result, String> { + list_grpc_events(&w, connection_id) .await .map_err(|e| e.to_string()) } @@ -1334,7 +1407,7 @@ fn main() { cmd_list_http_requests, cmd_list_grpc_requests, cmd_list_grpc_connections, - cmd_list_grpc_messages, + cmd_list_grpc_events, cmd_list_http_responses, cmd_list_workspaces, cmd_new_window, diff --git a/src-tauri/src/models.rs b/src-tauri/src/models.rs index 78b7aad9..e6ac0a35 100644 --- a/src-tauri/src/models.rs +++ b/src-tauri/src/models.rs @@ -231,33 +231,41 @@ pub struct GrpcConnection { pub service: String, pub method: String, pub elapsed: i64, + pub status: i64, + pub url: String, + pub error: Option, + pub trailers: Json>, +} + +#[derive(sqlx::Type, Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "snake_case")] +#[sqlx(rename_all = "snake_case")] +pub enum GrpcEventType { + Info, + Error, + ClientMessage, + ServerMessage, + ConnectionResponse, +} + +impl Default for GrpcEventType { + fn default() -> Self { + GrpcEventType::Info + } } #[derive(Debug, Clone, Serialize, Deserialize, Default)] #[serde(default, rename_all = "camelCase")] -pub struct GrpcMessage { +pub struct GrpcEvent { pub id: String, pub model: String, pub workspace_id: String, pub request_id: String, pub connection_id: String, pub created_at: NaiveDateTime, - pub message: String, - pub is_server: bool, - pub is_info: bool, -} - -#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)] -#[serde(default, rename_all = "camelCase")] -pub struct GrpcResponse { - pub id: String, - pub model: String, - pub workspace_id: String, - pub grpc_endpoint_id: String, - pub grpc_connection_id: String, - pub request_id: String, - pub created_at: NaiveDateTime, - pub updated_at: NaiveDateTime, + pub content: String, + pub event_type: GrpcEventType, + pub metadata: Json>, } #[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)] @@ -612,14 +620,19 @@ pub async fn upsert_grpc_connection( sqlx::query!( r#" INSERT INTO grpc_connections ( - id, workspace_id, request_id, service, method, elapsed + id, workspace_id, request_id, service, method, elapsed, + status, error, trailers, url ) - VALUES (?, ?, ?, ?, ?, ?) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (id) DO UPDATE SET updated_at = CURRENT_TIMESTAMP, service = excluded.service, method = excluded.method, - elapsed = excluded.elapsed + elapsed = excluded.elapsed, + status = excluded.status, + error = excluded.error, + trailers = excluded.trailers, + url = excluded.url "#, id, connection.workspace_id, @@ -627,6 +640,10 @@ pub async fn upsert_grpc_connection( connection.service, connection.method, connection.elapsed, + connection.status, + connection.error, + connection.trailers, + connection.url, ) .execute(&db) .await?; @@ -647,7 +664,8 @@ pub async fn get_grpc_connection( r#" SELECT id, model, workspace_id, request_id, created_at, updated_at, service, - method, elapsed + method, elapsed, status, error, url, + trailers AS "trailers!: sqlx::types::Json>" FROM grpc_connections WHERE id = ? "#, @@ -667,7 +685,8 @@ pub async fn list_grpc_connections( r#" SELECT id, model, workspace_id, request_id, created_at, updated_at, service, - method, elapsed + method, elapsed, status, error, url, + trailers AS "trailers!: sqlx::types::Json>" FROM grpc_connections WHERE request_id = ? ORDER BY created_at DESC @@ -678,56 +697,57 @@ pub async fn list_grpc_connections( .await } -pub async fn upsert_grpc_message( +pub async fn upsert_grpc_event( mgr: &impl Manager, - message: &GrpcMessage, -) -> Result { + message: &GrpcEvent, +) -> Result { let db = get_db(mgr).await; let id = match message.id.as_str() { - "" => generate_id(Some("gm")), + "" => generate_id(Some("ge")), _ => message.id.to_string(), }; sqlx::query!( r#" - INSERT INTO grpc_messages ( - id, workspace_id, request_id, connection_id, message, is_server, is_info + INSERT INTO grpc_events ( + id, workspace_id, request_id, connection_id, content, event_type, metadata ) VALUES (?, ?, ?, ?, ?, ?, ?) ON CONFLICT (id) DO UPDATE SET updated_at = CURRENT_TIMESTAMP, - message = excluded.message, - is_server = excluded.is_server, - is_info = excluded.is_info + content = excluded.content, + event_type = excluded.event_type, + metadata = excluded.metadata "#, id, message.workspace_id, message.request_id, message.connection_id, - message.message, - message.is_server, - message.is_info, + message.content, + message.event_type, + message.metadata, ) .execute(&db) .await?; - match get_grpc_message(mgr, &id).await { + match get_grpc_event(mgr, &id).await { Ok(m) => Ok(emit_upserted_model(mgr, m)), Err(e) => Err(e), } } -pub async fn get_grpc_message( +pub async fn get_grpc_event( mgr: &impl Manager, id: &str, -) -> Result { +) -> Result { let db = get_db(mgr).await; sqlx::query_as!( - GrpcMessage, + GrpcEvent, r#" SELECT - id, model, workspace_id, request_id, connection_id, created_at, message, - is_server, is_info - FROM grpc_messages + id, model, workspace_id, request_id, connection_id, created_at, content, + event_type AS "event_type!: GrpcEventType", + metadata AS "metadata!: sqlx::types::Json>" + FROM grpc_events WHERE id = ? "#, id, @@ -736,18 +756,19 @@ pub async fn get_grpc_message( .await } -pub async fn list_grpc_messages( +pub async fn list_grpc_events( mgr: &impl Manager, connection_id: &str, -) -> Result, sqlx::Error> { +) -> Result, sqlx::Error> { let db = get_db(mgr).await; sqlx::query_as!( - GrpcMessage, + GrpcEvent, r#" SELECT - id, model, workspace_id, request_id, connection_id, created_at, message, - is_server, is_info - FROM grpc_messages + id, model, workspace_id, request_id, connection_id, created_at, content, + event_type AS "event_type!: GrpcEventType", + metadata AS "metadata!: sqlx::types::Json>" + FROM grpc_events WHERE connection_id = ? "#, connection_id, diff --git a/src-web/components/EmptyStateText.tsx b/src-web/components/EmptyStateText.tsx index d059611c..2ad80c11 100644 --- a/src-web/components/EmptyStateText.tsx +++ b/src-web/components/EmptyStateText.tsx @@ -6,7 +6,7 @@ interface Props { export function EmptyStateText({ children }: Props) { return ( -
+
{children}
); diff --git a/src-web/components/GlobalHooks.tsx b/src-web/components/GlobalHooks.tsx index 63833273..5a25861c 100644 --- a/src-web/components/GlobalHooks.tsx +++ b/src-web/components/GlobalHooks.tsx @@ -4,7 +4,7 @@ import { useEffect } from 'react'; import { useLocation } from 'react-router-dom'; import { cookieJarsQueryKey } from '../hooks/useCookieJars'; import { grpcConnectionsQueryKey } from '../hooks/useGrpcConnections'; -import { grpcMessagesQueryKey } from '../hooks/useGrpcMessages'; +import { grpcEventsQueryKey } from '../hooks/useGrpcEvents'; import { grpcRequestsQueryKey } from '../hooks/useGrpcRequests'; import { httpRequestsQueryKey } from '../hooks/useHttpRequests'; import { httpResponsesQueryKey } from '../hooks/useHttpResponses'; @@ -53,8 +53,8 @@ export function GlobalHooks() { ? httpResponsesQueryKey(payload) : payload.model === 'grpc_connection' ? grpcConnectionsQueryKey(payload) - : payload.model === 'grpc_message' - ? grpcMessagesQueryKey(payload) + : payload.model === 'grpc_event' + ? grpcEventsQueryKey(payload) : payload.model === 'grpc_request' ? grpcRequestsQueryKey(payload) : payload.model === 'workspace' @@ -107,8 +107,8 @@ export function GlobalHooks() { queryClient.setQueryData(grpcRequestsQueryKey(payload), removeById(payload)); } else if (payload.model === 'grpc_connection') { queryClient.setQueryData(grpcConnectionsQueryKey(payload), removeById(payload)); - } else if (payload.model === 'grpc_message') { - queryClient.setQueryData(grpcMessagesQueryKey(payload), removeById(payload)); + } else if (payload.model === 'grpc_event') { + queryClient.setQueryData(grpcEventsQueryKey(payload), removeById(payload)); } else if (payload.model === 'key_value') { queryClient.setQueryData(keyValueQueryKey(payload), undefined); } else if (payload.model === 'cookie_jar') { diff --git a/src-web/components/GrpcConnectionLayout.tsx b/src-web/components/GrpcConnectionLayout.tsx index 9703d492..11aee200 100644 --- a/src-web/components/GrpcConnectionLayout.tsx +++ b/src-web/components/GrpcConnectionLayout.tsx @@ -4,7 +4,7 @@ import React, { useEffect, useMemo } from 'react'; import { useActiveRequest } from '../hooks/useActiveRequest'; import { useGrpc } from '../hooks/useGrpc'; import { useGrpcConnections } from '../hooks/useGrpcConnections'; -import { useGrpcMessages } from '../hooks/useGrpcMessages'; +import { useGrpcEvents } from '../hooks/useGrpcEvents'; import { useUpdateGrpcRequest } from '../hooks/useUpdateGrpcRequest'; import { Banner } from './core/Banner'; import { HotKeyList } from './core/HotKeyList'; @@ -21,7 +21,7 @@ export function GrpcConnectionLayout({ style }: Props) { const updateRequest = useUpdateGrpcRequest(activeRequest?.id ?? null); const connections = useGrpcConnections(activeRequest?.id ?? null); const activeConnection = connections[0] ?? null; - const messages = useGrpcMessages(activeConnection?.id ?? null); + const messages = useGrpcEvents(activeConnection?.id ?? null); const grpc = useGrpc(activeRequest, activeConnection); const services = grpc.reflect.data ?? null; diff --git a/src-web/components/GrpcConnectionMessagesPane.tsx b/src-web/components/GrpcConnectionMessagesPane.tsx index deb9e7d6..141273d2 100644 --- a/src-web/components/GrpcConnectionMessagesPane.tsx +++ b/src-web/components/GrpcConnectionMessagesPane.tsx @@ -1,15 +1,17 @@ import classNames from 'classnames'; -import { format } from 'date-fns'; -import type { CSSProperties } from 'react'; -import React, { useMemo, useState } from 'react'; +import { format, addMilliseconds } from 'date-fns'; +import type { CSSProperties, ReactNode } from 'react'; +import React, { useEffect, useMemo, useState } from 'react'; import { useGrpcConnections } from '../hooks/useGrpcConnections'; -import { useGrpcMessages } from '../hooks/useGrpcMessages'; -import type { GrpcRequest } from '../lib/models'; +import { useGrpcEvents } from '../hooks/useGrpcEvents'; +import type { GrpcEvent, GrpcRequest } from '../lib/models'; import { Icon } from './core/Icon'; import { JsonAttributeTree } from './core/JsonAttributeTree'; +import { KeyValueRow, KeyValueRows } from './core/KeyValueRow'; import { Separator } from './core/Separator'; import { SplitLayout } from './core/SplitLayout'; import { HStack } from './core/Stacks'; +import { EmptyStateText } from './EmptyStateText'; import { RecentConnectionsDropdown } from './RecentConnectionsDropdown'; interface Props { @@ -25,34 +27,80 @@ interface Props { | 'no-method'; } +const CONNECTION_RESPONSE_EVENT_ID = 'connection_response'; + export function GrpcConnectionMessagesPane({ style, methodType, activeRequest }: Props) { - const [activeMessageId, setActiveMessageId] = useState(null); + const [activeEventId, setActiveEventId] = useState(null); const connections = useGrpcConnections(activeRequest.id ?? null); const activeConnection = connections[0] ?? null; - const messages = useGrpcMessages(activeConnection?.id ?? null); + const ogEvents = useGrpcEvents(activeConnection?.id ?? null); - const activeMessage = useMemo( - () => messages.find((m) => m.id === activeMessageId) ?? null, - [activeMessageId, messages], + const events = useMemo(() => { + const createdAt = + activeConnection != null && + addMilliseconds(activeConnection.createdAt, activeConnection.elapsed) + .toISOString() + .replace('Z', ''); + if (activeConnection == null || activeConnection.elapsed === 0) { + return ogEvents; + } else if (activeConnection.error != null) { + return [ + ...ogEvents, + { + id: CONNECTION_RESPONSE_EVENT_ID, + eventType: 'error', + content: activeConnection.error, + metadata: activeConnection.trailers, + createdAt, + updatedAt: createdAt, + } as GrpcEvent, + ]; + } else { + return [ + ...ogEvents, + { + id: CONNECTION_RESPONSE_EVENT_ID, + eventType: activeConnection.status === 0 ? 'connection_response' : 'error', + content: `Connection ${GRPC_CODES[activeConnection.status] ?? 'closed'}`, + metadata: activeConnection.trailers, + createdAt, + updatedAt: createdAt, + } as GrpcEvent, + ]; + } + }, [activeConnection, ogEvents]); + + const activeEvent = useMemo( + () => events.find((m) => m.id === activeEventId) ?? null, + [activeEventId, events], ); + // Set active message to the first message received if unary + useEffect(() => { + if (events.length === 0 || activeEvent != null || methodType !== 'unary') { + return; + } + setActiveEventId(events.find((m) => m.eventType === 'server_message')?.id ?? null); + // eslint-disable-next-line react-hooks/exhaustive-deps + }, [events.length]); + return ( ( -
- - - {messages.filter((m) => !m.isInfo).length} messages - {activeConnection?.elapsed === 0 && ( - - )} - - {activeConnection && ( + firstSlot={() => + activeConnection && ( +
+ + + {events.length} messages + {activeConnection.elapsed === 0 && ( + + )} + - )} - -
- {...messages.map((m) => ( - { - if (m.id === activeMessageId) setActiveMessageId(null); - else setActiveMessageId(m.id); - }} - alignItems="center" - className={classNames( - 'px-2 py-1 font-mono cursor-default group', - m === activeMessage && '!bg-highlight', - )} - > - -
+
+ {...events.map((m) => ( + { + if (m.id === activeEventId) setActiveEventId(null); + else setActiveEventId(m.id); + }} > - {m.message} -
-
- {format(m.createdAt, 'HH:mm:ss')} -
- - ))} + {m.content} + + ))} +
-
- )} + ) + } secondSlot={ - activeMessage && + activeEvent && (() => (
- {activeMessage.isInfo ? ( - {activeMessage.message} + {activeEvent.eventType === 'client_message' || + activeEvent.eventType === 'server_message' ? ( + <> +
+ Message {activeEvent.eventType === 'client_message' ? 'Sent' : 'Received'} +
+ + ) : ( - +
+
+ {activeEvent.content} +
+ {Object.keys(activeEvent.metadata).length === 0 ? ( + + No {activeEvent.eventType === 'connection_response' ? 'trailers' : 'metadata'} + + ) : ( + + {Object.entries(activeEvent.metadata).map(([key, value]) => ( + + ))} + + )} +
)}
@@ -125,3 +169,89 @@ export function GrpcConnectionMessagesPane({ style, methodType, activeRequest }: /> ); } + +function MessageRow({ + onClick, + isActive, + eventType, + children, + timestamp, +}: { + onClick?: () => void; + isActive?: boolean; + eventType: GrpcEvent['eventType']; + children: ReactNode; + timestamp: string; +}) { + return ( + + ); +} + +const GRPC_CODES: Record = { + 0: 'Ok', + 1: 'Cancelled', + 2: 'Unknown', + 3: 'Invalid argument', + 4: 'Deadline exceeded', + 5: 'Not found', + 6: 'Already exists', + 7: 'Permission denied', + 8: 'Resource exhausted', + 9: 'Failed precondition', + 10: 'Aborted', + 11: 'Out of range', + 12: 'Unimplemented', + 13: 'Internal', + 14: 'Unavailable', + 15: 'Data loss', + 16: 'Unauthenticated', +}; diff --git a/src-web/components/GrpcConnectionSetupPane.tsx b/src-web/components/GrpcConnectionSetupPane.tsx index 09534ac1..bdd46e8d 100644 --- a/src-web/components/GrpcConnectionSetupPane.tsx +++ b/src-web/components/GrpcConnectionSetupPane.tsx @@ -218,7 +218,7 @@ export function GrpcConnectionSetupPane({ className="border border-highlight" size="sm" title={methodType === 'unary' ? 'Send' : 'Connect'} - hotkeyAction={isStreaming ? undefined : 'http_request.send'} + hotkeyAction="grpc_request.send" onClick={handleConnect} disabled={methodType === 'no-schema' || methodType === 'no-method'} icon={ @@ -240,24 +240,24 @@ export function GrpcConnectionSetupPane({ disabled={!isStreaming} /> )} - {methodType === 'client_streaming' && isStreaming && ( - - )} {(methodType === 'client_streaming' || methodType === 'streaming') && isStreaming && ( - onSend({ message: activeRequest.message ?? '' })} - icon="sendHorizontal" - /> + <> + + onSend({ message: activeRequest.message ?? '' })} + icon="sendHorizontal" + /> + )}
diff --git a/src-web/components/ResponseHeaders.tsx b/src-web/components/ResponseHeaders.tsx index 68b0157f..92b9e849 100644 --- a/src-web/components/ResponseHeaders.tsx +++ b/src-web/components/ResponseHeaders.tsx @@ -1,10 +1,8 @@ import { shell } from '@tauri-apps/api'; -import classNames from 'classnames'; -import type { ReactNode } from 'react'; import type { HttpResponse } from '../lib/models'; import { IconButton } from './core/IconButton'; +import { KeyValueRow, KeyValueRows } from './core/KeyValueRow'; import { Separator } from './core/Separator'; -import { HStack } from './core/Stacks'; interface Props { response: HttpResponse; @@ -13,16 +11,16 @@ interface Props { export function ResponseHeaders({ response }: Props) { return (
-
+ {response.headers.map((h, i) => ( - + ))} -
+ Other Info -
- - - + + + URL @@ -41,26 +39,7 @@ export function ResponseHeaders({ response }: Props) {
} /> - +
); } - -function Row({ - label, - value, - labelClassName, -}: { - label: ReactNode; - value: ReactNode; - labelClassName?: string; -}) { - return ( - -
- {label} -
-
{value}
-
- ); -} diff --git a/src-web/components/SidebarActions.tsx b/src-web/components/SidebarActions.tsx index 3616f464..57b9e794 100644 --- a/src-web/components/SidebarActions.tsx +++ b/src-web/components/SidebarActions.tsx @@ -48,7 +48,7 @@ export const SidebarActions = memo(function SidebarActions() { }, { key: 'create-grpc-request', - label: 'GRPC Call', + label: 'gRPC Call', onSelect: () => createGrpcRequest.mutate({}), }, { diff --git a/src-web/components/UrlBar.tsx b/src-web/components/UrlBar.tsx index 6c351769..1865f785 100644 --- a/src-web/components/UrlBar.tsx +++ b/src-web/components/UrlBar.tsx @@ -69,7 +69,7 @@ export const UrlBar = memo(function UrlBar({ ) } diff --git a/src-web/components/Workspace.tsx b/src-web/components/Workspace.tsx index 76a940a7..88069d5e 100644 --- a/src-web/components/Workspace.tsx +++ b/src-web/components/Workspace.tsx @@ -15,6 +15,7 @@ import { useSidebarHidden } from '../hooks/useSidebarHidden'; import { useSidebarWidth } from '../hooks/useSidebarWidth'; import { Button } from './core/Button'; import { HotKeyList } from './core/HotKeyList'; +import { HStack } from './core/Stacks'; import { GrpcConnectionLayout } from './GrpcConnectionLayout'; import { HttpRequestLayout } from './HttpRequestLayout'; import { Overlay } from './Overlay'; @@ -160,7 +161,19 @@ export default function Workspace() { {activeRequest == null ? ( - + + + + + } + /> ) : activeRequest.model === 'grpc_request' ? ( ) : ( diff --git a/src-web/components/core/HotKeyList.tsx b/src-web/components/core/HotKeyList.tsx index 20bd6f8f..1c3a47f2 100644 --- a/src-web/components/core/HotKeyList.tsx +++ b/src-web/components/core/HotKeyList.tsx @@ -6,9 +6,10 @@ import { HStack, VStack } from './Stacks'; interface Props { hotkeys: HotkeyAction[]; + bottomSlot?: React.ReactNode; } -export const HotKeyList = ({ hotkeys }: Props) => { +export const HotKeyList = ({ hotkeys, bottomSlot }: Props) => { return (
@@ -18,6 +19,7 @@ export const HotKeyList = ({ hotkeys }: Props) => { ))} + {bottomSlot}
); diff --git a/src-web/components/core/Icon.tsx b/src-web/components/core/Icon.tsx index 020e0cf1..6406f799 100644 --- a/src-web/components/core/Icon.tsx +++ b/src-web/components/core/Icon.tsx @@ -4,8 +4,11 @@ import type { HTMLAttributes } from 'react'; import { memo } from 'react'; const icons = { + alert: lucide.AlertTriangleIcon, archive: lucide.ArchiveIcon, arrowBigDownDash: lucide.ArrowBigDownDashIcon, + arrowBigLeftDash: lucide.ArrowBigLeftDashIcon, + arrowBigRightDash: lucide.ArrowBigRightDashIcon, arrowBigUpDash: lucide.ArrowBigUpDashIcon, arrowDown: lucide.ArrowDownIcon, arrowDownToDot: lucide.ArrowDownToDotIcon, @@ -60,12 +63,14 @@ export interface IconProps { className?: string; size?: 'xs' | 'sm' | 'md' | 'lg'; spin?: boolean; + title?: string; } -export const Icon = memo(function Icon({ icon, spin, size = 'md', className }: IconProps) { +export const Icon = memo(function Icon({ icon, spin, size = 'md', className, title }: IconProps) { const Component = icons[icon] ?? icons.question; return ( {children}; +} + +export function KeyValueRow({ label, value, labelClassName }: Props) { + return ( + +
+ {label} +
+
{value}
+
+ ); +} diff --git a/src-web/hooks/useGrpcEvents.ts b/src-web/hooks/useGrpcEvents.ts new file mode 100644 index 00000000..2aac56b9 --- /dev/null +++ b/src-web/hooks/useGrpcEvents.ts @@ -0,0 +1,23 @@ +import { useQuery } from '@tanstack/react-query'; +import { invoke } from '@tauri-apps/api'; +import type { GrpcEvent } from '../lib/models'; + +export function grpcEventsQueryKey({ connectionId }: { connectionId: string }) { + return ['grpc_events', { connectionId }]; +} + +export function useGrpcEvents(connectionId: string | null) { + return ( + useQuery({ + enabled: connectionId !== null, + initialData: [], + queryKey: grpcEventsQueryKey({ connectionId: connectionId ?? 'n/a' }), + queryFn: async () => { + return (await invoke('cmd_list_grpc_events', { + connectionId, + limit: 200, + })) as GrpcEvent[]; + }, + }).data ?? [] + ); +} diff --git a/src-web/hooks/useGrpcMessages.ts b/src-web/hooks/useGrpcMessages.ts deleted file mode 100644 index d7c8f100..00000000 --- a/src-web/hooks/useGrpcMessages.ts +++ /dev/null @@ -1,23 +0,0 @@ -import { useQuery } from '@tanstack/react-query'; -import { invoke } from '@tauri-apps/api'; -import type { GrpcMessage } from '../lib/models'; - -export function grpcMessagesQueryKey({ connectionId }: { connectionId: string }) { - return ['grpc_messages', { connectionId }]; -} - -export function useGrpcMessages(connectionId: string | null) { - return ( - useQuery({ - enabled: connectionId !== null, - initialData: [], - queryKey: grpcMessagesQueryKey({ connectionId: connectionId ?? 'n/a' }), - queryFn: async () => { - return (await invoke('cmd_list_grpc_messages', { - connectionId, - limit: 200, - })) as GrpcMessage[]; - }, - }).data ?? [] - ); -} diff --git a/src-web/lib/analytics.ts b/src-web/lib/analytics.ts index 0865a0cf..91688280 100644 --- a/src-web/lib/analytics.ts +++ b/src-web/lib/analytics.ts @@ -9,7 +9,7 @@ export function trackEvent( | 'Workspace' | 'Environment' | 'Folder' - | 'GrpcMessage' + | 'GrpcEvent' | 'GrpcConnection' | 'GrpcRequest' | 'HttpRequest' diff --git a/src-web/lib/models.ts b/src-web/lib/models.ts index f8d5f1f5..ff4251e9 100644 --- a/src-web/lib/models.ts +++ b/src-web/lib/models.ts @@ -15,7 +15,7 @@ export type Model = | Workspace | GrpcConnection | GrpcRequest - | GrpcMessage + | GrpcEvent | HttpRequest | HttpResponse | KeyValue @@ -127,14 +127,14 @@ export interface GrpcRequest extends BaseModel { metadata: GrpcMetadataEntry[]; } -export interface GrpcMessage extends BaseModel { +export interface GrpcEvent extends BaseModel { readonly workspaceId: string; readonly requestId: string; readonly connectionId: string; - readonly model: 'grpc_message'; - message: string; - isServer: boolean; - isInfo: boolean; + readonly model: 'grpc_event'; + content: string; + eventType: 'info' | 'error' | 'client_message' | 'server_message' | 'connection_response'; + metadata: Record; } export interface GrpcConnection extends BaseModel { @@ -144,6 +144,11 @@ export interface GrpcConnection extends BaseModel { service: string; method: string; elapsed: number; + elapsedConnection: number; + status: number; + url: string; + error: string | null; + trailers: Record; } export interface HttpRequest extends BaseModel {