Refactor into grpc events

This commit is contained in:
Gregory Schier
2024-02-22 00:49:22 -08:00
parent 6f389b0010
commit 766da4327c
31 changed files with 851 additions and 595 deletions

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, message,\n is_server, is_info\n FROM grpc_messages\n WHERE connection_id = ?\n ",
"query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, content,\n event_type AS \"event_type!: GrpcEventType\",\n metadata AS \"metadata!: sqlx::types::Json<HashMap<String, String>>\"\n FROM grpc_events\n WHERE id = ?\n ",
"describe": {
"columns": [
{
@@ -34,19 +34,19 @@
"type_info": "Datetime"
},
{
"name": "message",
"name": "content",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "is_server",
"name": "event_type!: GrpcEventType",
"ordinal": 7,
"type_info": "Bool"
"type_info": "Text"
},
{
"name": "is_info",
"name": "metadata!: sqlx::types::Json<HashMap<String, String>>",
"ordinal": 8,
"type_info": "Bool"
"type_info": "Text"
}
],
"parameters": {
@@ -64,5 +64,5 @@
false
]
},
"hash": "196ed792c8d96425d428cb9609b0c1b18e8f1ba3c1fdfb38c91ffd7bada97f59"
"hash": "20d6b878bb8d16bde3e78e22cf801b5b191905d867091bb54a210256a0145a17"
}

View File

@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO grpc_events (\n id, workspace_id, request_id, connection_id, content, event_type, metadata\n )\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n content = excluded.content,\n event_type = excluded.event_type,\n metadata = excluded.metadata\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 7
},
"nullable": []
},
"hash": "3dce053aef78e831db2369f3c49e891cb8a9e1ba6e7a60fe9e24292a3f97dca3"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed\n FROM grpc_connections\n WHERE request_id = ?\n ORDER BY created_at DESC\n ",
"query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed, status, error, url,\n trailers AS \"trailers!: sqlx::types::Json<HashMap<String, String>>\"\n FROM grpc_connections\n WHERE request_id = ?\n ORDER BY created_at DESC\n ",
"describe": {
"columns": [
{
@@ -47,6 +47,26 @@
"name": "elapsed",
"ordinal": 8,
"type_info": "Int64"
},
{
"name": "status",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "error",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "url",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "trailers!: sqlx::types::Json<HashMap<String, String>>",
"ordinal": 12,
"type_info": "Text"
}
],
"parameters": {
@@ -61,8 +81,12 @@
false,
false,
false,
false,
false,
true,
false,
false
]
},
"hash": "80a85f83d0946d532a60f0add87aa0ade7e35a6b56cb058e2caf9ca005ce6407"
"hash": "3e8651cca7feecc208a676dfd24c7d8775040d5287c16890056dcb474674edfb"
}

View File

@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO grpc_messages (\n id, workspace_id, request_id, connection_id, message, is_server, is_info\n )\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n message = excluded.message,\n is_server = excluded.is_server,\n is_info = excluded.is_info\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 7
},
"nullable": []
},
"hash": "4b45b681698cbfe8531a7c3ba368a1d8003fa17d5585bc126debb18cae670460"
}

View File

@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO grpc_connections (\n id, workspace_id, request_id, service, method, elapsed,\n status, error, trailers, url\n )\n VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n service = excluded.service,\n method = excluded.method,\n elapsed = excluded.elapsed,\n status = excluded.status,\n error = excluded.error,\n trailers = excluded.trailers,\n url = excluded.url\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 10
},
"nullable": []
},
"hash": "66deed028199c78ed15ea2f837907887c2a2cb564d1d076dd4ebf0ecbc82e098"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, message,\n is_server, is_info\n FROM grpc_messages\n WHERE id = ?\n ",
"query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, content,\n event_type AS \"event_type!: GrpcEventType\",\n metadata AS \"metadata!: sqlx::types::Json<HashMap<String, String>>\"\n FROM grpc_events\n WHERE connection_id = ?\n ",
"describe": {
"columns": [
{
@@ -34,19 +34,19 @@
"type_info": "Datetime"
},
{
"name": "message",
"name": "content",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "is_server",
"name": "event_type!: GrpcEventType",
"ordinal": 7,
"type_info": "Bool"
"type_info": "Text"
},
{
"name": "is_info",
"name": "metadata!: sqlx::types::Json<HashMap<String, String>>",
"ordinal": 8,
"type_info": "Bool"
"type_info": "Text"
}
],
"parameters": {
@@ -64,5 +64,5 @@
false
]
},
"hash": "3c52c0fa3372cdd2657a775c3b93fb65f42d3226cec27220469558e14973328c"
"hash": "737045ddd5f8ba3454425e82b9d3943f93649742d8f78613e01d322745e47ebd"
}

View File

@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO grpc_connections (\n id, workspace_id, request_id, service, method, elapsed\n )\n VALUES (?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n service = excluded.service,\n method = excluded.method,\n elapsed = excluded.elapsed\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 6
},
"nullable": []
},
"hash": "9d7bc2b0eb0c09652d9826db4a7ae47591405e1b5bec1229f2e2734c73e66163"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed\n FROM grpc_connections\n WHERE id = ?\n ",
"query": "\n SELECT\n id, model, workspace_id, request_id, created_at, updated_at, service,\n method, elapsed, status, error, url,\n trailers AS \"trailers!: sqlx::types::Json<HashMap<String, String>>\"\n FROM grpc_connections\n WHERE id = ?\n ",
"describe": {
"columns": [
{
@@ -47,6 +47,26 @@
"name": "elapsed",
"ordinal": 8,
"type_info": "Int64"
},
{
"name": "status",
"ordinal": 9,
"type_info": "Int64"
},
{
"name": "error",
"ordinal": 10,
"type_info": "Text"
},
{
"name": "url",
"ordinal": 11,
"type_info": "Text"
},
{
"name": "trailers!: sqlx::types::Json<HashMap<String, String>>",
"ordinal": 12,
"type_info": "Text"
}
],
"parameters": {
@@ -61,8 +81,12 @@
false,
false,
false,
false,
false,
true,
false,
false
]
},
"hash": "3330be44d8851f8e3456c403b5d1067f4e70e85ef8829b7aaad5b1993c3d01e8"
"hash": "d4b64c466624eb75e0f5bd201ebfb6a73d25eb7c9e09cb9690afdb7fef5fca8b"
}

View File

@@ -1,11 +1,15 @@
use prost_reflect::{DynamicMessage, SerializeOptions};
use prost_reflect::{DynamicMessage, MethodDescriptor, SerializeOptions};
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;
mod codec;
mod json_schema;
pub mod manager;
mod proto;
pub use tonic::metadata::*;
pub use tonic::Code;
/// Options used when rendering a dynamic protobuf message as JSON.
/// Default-valued fields are emitted explicitly rather than omitted, so the
/// serialized output always shows the full message shape.
pub fn serialize_options() -> SerializeOptions {
    let options = SerializeOptions::new();
    options.skip_default_fields(false)
}
@@ -38,3 +42,11 @@ pub fn serialize_message(msg: &DynamicMessage) -> Result<String, String> {
let s = String::from_utf8(buf).expect("serde_json to emit valid utf8");
Ok(s)
}
/// Parse a JSON-encoded request body into a [`DynamicMessage`] matching the
/// input type of the given gRPC `method`.
///
/// Returns a human-readable error string when the JSON is malformed, does not
/// conform to the method's input descriptor, or has trailing content after
/// the first JSON value (e.g. `{"a":1} extra`).
pub fn deserialize_message(msg: &str, method: MethodDescriptor) -> Result<DynamicMessage, String> {
    // `from_str` already takes `&str`; no extra borrow of `msg` is needed.
    let mut deserializer = Deserializer::from_str(msg);
    let req_message = DynamicMessage::deserialize(method.input(), &mut deserializer)
        .map_err(|e| e.to_string())?;
    // Ensure the entire input was consumed, rejecting trailing garbage.
    deserializer.end().map_err(|e| e.to_string())?;
    Ok(req_message)
}

View File

@@ -9,7 +9,6 @@ pub use prost_reflect::DynamicMessage;
use prost_reflect::{DescriptorPool, MethodDescriptor, ServiceDescriptor};
use serde_json::Deserializer;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tonic::body::BoxBody;
use tonic::metadata::{MetadataKey, MetadataValue};
use tonic::transport::Uri;
@@ -50,7 +49,7 @@ impl GrpcConnection {
method: &str,
message: &str,
metadata: HashMap<String, String>,
) -> Result<DynamicMessage, String> {
) -> Result<Response<DynamicMessage>, String> {
let method = &self.method(&service, &method)?;
let input_message = method.input();
@@ -68,34 +67,23 @@ impl GrpcConnection {
let codec = DynamicCodec::new(method.clone());
client.ready().await.unwrap();
Ok(client
client
.unary(req, path, codec)
.await
.map_err(|e| e.to_string())?
.into_inner())
.map_err(|e| e.to_string())
}
pub async fn streaming(
&self,
service: &str,
method: &str,
stream: ReceiverStream<String>,
stream: ReceiverStream<DynamicMessage>,
metadata: HashMap<String, String>,
) -> Result<Result<Response<Streaming<DynamicMessage>>, Status>, String> {
let method = &self.method(&service, &method)?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
let method2 = method.clone();
let mut req = stream
.map(move |s| {
let mut deserializer = Deserializer::from_str(&s);
let req_message = DynamicMessage::deserialize(method2.input(), &mut deserializer)
.map_err(|e| e.to_string())
.unwrap();
deserializer.end().unwrap();
req_message
})
.into_streaming_request();
let mut req = stream.into_streaming_request();
decorate_req(metadata, &mut req).map_err(|e| e.to_string())?;
@@ -109,37 +97,21 @@ impl GrpcConnection {
&self,
service: &str,
method: &str,
stream: ReceiverStream<String>,
stream: ReceiverStream<DynamicMessage>,
metadata: HashMap<String, String>,
) -> Result<DynamicMessage, String> {
) -> Result<Response<DynamicMessage>, String> {
let method = &self.method(&service, &method)?;
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
let mut req = {
let method = method.clone();
stream
.map(move |s| {
let mut deserializer = Deserializer::from_str(&s);
let req_message =
DynamicMessage::deserialize(method.input(), &mut deserializer)
.map_err(|e| e.to_string())
.unwrap();
deserializer.end().unwrap();
req_message
})
.into_streaming_request()
};
let mut req = stream.into_streaming_request();
decorate_req(metadata, &mut req).map_err(|e| e.to_string())?;
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
client.ready().await.unwrap();
Ok(client
client
.client_streaming(req, path, codec)
.await
.map_err(|s| s.to_string())?
.into_inner())
.map_err(|s| s.to_string())
}
pub async fn server_streaming(
@@ -230,54 +202,6 @@ impl GrpcHandle {
.collect::<Vec<_>>()
}
pub async fn server_streaming(
&mut self,
id: &str,
uri: Uri,
proto_files: Vec<PathBuf>,
service: &str,
method: &str,
message: &str,
metadata: HashMap<String, String>,
) -> Result<Result<Response<Streaming<DynamicMessage>>, Status>, String> {
self.connect(id, uri, proto_files)
.await?
.server_streaming(service, method, message, metadata)
.await
}
pub async fn client_streaming(
&mut self,
id: &str,
uri: Uri,
proto_files: Vec<PathBuf>,
service: &str,
method: &str,
stream: ReceiverStream<String>,
metadata: HashMap<String, String>,
) -> Result<DynamicMessage, String> {
self.connect(id, uri, proto_files)
.await?
.client_streaming(service, method, stream, metadata)
.await
}
pub async fn streaming(
&mut self,
id: &str,
uri: Uri,
proto_files: Vec<PathBuf>,
service: &str,
method: &str,
stream: ReceiverStream<String>,
metadata: HashMap<String, String>,
) -> Result<Result<Response<Streaming<DynamicMessage>>, Status>, String> {
self.connect(id, uri, proto_files)
.await?
.streaming(service, method, stream, metadata)
.await
}
pub async fn connect(
&mut self,
id: &str,

View File

@@ -1,63 +1,67 @@
CREATE TABLE grpc_requests
(
id TEXT NOT NULL
id TEXT NOT NULL
PRIMARY KEY,
model TEXT DEFAULT 'grpc_request' NOT NULL,
workspace_id TEXT NOT NULL
model TEXT DEFAULT 'grpc_request' NOT NULL,
workspace_id TEXT NOT NULL
REFERENCES workspaces
ON DELETE CASCADE,
folder_id TEXT NULL
folder_id TEXT NULL
REFERENCES folders
ON DELETE CASCADE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
name TEXT NOT NULL,
sort_priority REAL NOT NULL,
url TEXT NOT NULL,
service TEXT NULL,
method TEXT NULL,
message TEXT NOT NULL,
proto_files TEXT DEFAULT '[]' NOT NULL,
authentication TEXT DEFAULT '{}' NOT NULL,
authentication_type TEXT NULL,
metadata TEXT DEFAULT '[]' NOT NULL
created_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL,
updated_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL,
name TEXT NOT NULL,
sort_priority REAL NOT NULL,
url TEXT NOT NULL,
service TEXT NULL,
method TEXT NULL,
message TEXT NOT NULL,
proto_files TEXT DEFAULT '[]' NOT NULL,
authentication TEXT DEFAULT '{}' NOT NULL,
authentication_type TEXT NULL,
metadata TEXT DEFAULT '[]' NOT NULL
);
CREATE TABLE grpc_connections
(
id TEXT NOT NULL
id TEXT NOT NULL
PRIMARY KEY,
model TEXT DEFAULT 'grpc_connection' NOT NULL,
workspace_id TEXT NOT NULL
model TEXT DEFAULT 'grpc_connection' NOT NULL,
workspace_id TEXT NOT NULL
REFERENCES workspaces
ON DELETE CASCADE,
request_id TEXT NOT NULL
request_id TEXT NOT NULL
REFERENCES grpc_requests
ON DELETE CASCADE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
service TEXT NOT NULL,
method TEXT NOT NULL,
elapsed INTEGER NOT NULL
created_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL,
updated_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL,
url TEXT NOT NULL,
service TEXT NOT NULL,
method TEXT NOT NULL,
status INTEGER DEFAULT -1 NOT NULL,
error TEXT NULL,
elapsed INTEGER DEFAULT 0 NOT NULL,
trailers TEXT DEFAULT '{}' NOT NULL
);
CREATE TABLE grpc_messages
CREATE TABLE grpc_events
(
id TEXT NOT NULL
id TEXT NOT NULL
PRIMARY KEY,
model TEXT DEFAULT 'grpc_message' NOT NULL,
workspace_id TEXT NOT NULL
model TEXT DEFAULT 'grpc_event' NOT NULL,
workspace_id TEXT NOT NULL
REFERENCES workspaces
ON DELETE CASCADE,
request_id TEXT NOT NULL
request_id TEXT NOT NULL
REFERENCES grpc_requests
ON DELETE CASCADE,
connection_id TEXT NOT NULL
connection_id TEXT NOT NULL
REFERENCES grpc_connections
ON DELETE CASCADE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
is_server BOOLEAN NOT NULL,
is_info BOOLEAN NOT NULL,
message TEXT NOT NULL
created_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL,
updated_at DATETIME DEFAULT(STRFTIME('%Y-%m-%d %H:%M:%f', 'NOW')) NOT NULL,
metadata TEXT DEFAULT '{}' NOT NULL,
event_type TEXT NOT NULL,
content TEXT NOT NULL
);

View File

@@ -15,7 +15,7 @@ pub enum AnalyticsResource {
Environment,
Folder,
GrpcConnection,
GrpcMessage,
GrpcEvent,
GrpcRequest,
HttpRequest,
HttpResponse,
@@ -33,7 +33,7 @@ impl AnalyticsResource {
"Environment" => Some(AnalyticsResource::Environment),
"Folder" => Some(AnalyticsResource::Folder),
"GrpcConnection" => Some(AnalyticsResource::GrpcConnection),
"GrpcMessage" => Some(AnalyticsResource::GrpcMessage),
"GrpcEvent" => Some(AnalyticsResource::GrpcEvent),
"GrpcRequest" => Some(AnalyticsResource::GrpcRequest),
"HttpRequest" => Some(AnalyticsResource::HttpRequest),
"HttpResponse" => Some(AnalyticsResource::HttpResponse),
@@ -96,7 +96,7 @@ fn resource_name(resource: AnalyticsResource) -> &'static str {
AnalyticsResource::Folder => "folder",
AnalyticsResource::GrpcRequest => "grpc_request",
AnalyticsResource::GrpcConnection => "grpc_connection",
AnalyticsResource::GrpcMessage => "grpc_message",
AnalyticsResource::GrpcEvent => "grpc_event",
AnalyticsResource::HttpRequest => "http_request",
AnalyticsResource::HttpResponse => "http_response",
AnalyticsResource::KeyValue => "key_value",

16
src-tauri/src/grpc.rs Normal file
View File

@@ -0,0 +1,16 @@
use std::collections::HashMap;
use KeyAndValueRef::{Ascii, Binary};
use grpc::{KeyAndValueRef, MetadataMap};
/// Flatten a tonic `MetadataMap` into a plain `String -> String` map for
/// storage/display (e.g. on a `GrpcConnection` or `GrpcEvent` record).
///
/// ASCII entries are copied as text; binary entries (`-bin` keys) have no
/// canonical text form and are rendered via their `Debug` representation.
/// Duplicate keys keep only the last value seen during iteration.
pub fn metadata_to_map(metadata: MetadataMap) -> HashMap<String, String> {
    let mut entries = HashMap::new();
    for r in metadata.iter() {
        match r {
            // `to_str` fails for values containing non-visible-ASCII bytes;
            // metadata can arrive off the wire, so fall back to the Debug
            // rendering instead of panicking.
            Ascii(k, v) => {
                let value = v
                    .to_str()
                    .map(str::to_string)
                    .unwrap_or_else(|_| format!("{:?}", v));
                entries.insert(k.to_string(), value)
            }
            Binary(k, v) => entries.insert(k.to_string(), format!("{:?}", v)),
        };
    }
    entries
}

View File

@@ -35,31 +35,33 @@ use tokio::sync::Mutex;
use tokio::time::sleep;
use window_shadows::set_shadow;
use grpc::manager::GrpcHandle;
use grpc::{serialize_message, ServiceDefinition};
use ::grpc::manager::{DynamicMessage, GrpcHandle};
use ::grpc::{deserialize_message, serialize_message, Code, ServiceDefinition};
use window_ext::TrafficLightWindowExt;
use crate::analytics::{AnalyticsAction, AnalyticsResource};
use crate::grpc::metadata_to_map;
use crate::http::send_http_request;
use crate::models::{
cancel_pending_grpc_connections, cancel_pending_responses, create_http_response,
delete_all_grpc_connections, delete_all_http_responses, delete_cookie_jar, delete_environment,
delete_folder, delete_grpc_connection, delete_grpc_request, delete_http_request,
delete_http_response, delete_workspace, duplicate_grpc_request, duplicate_http_request,
get_cookie_jar, get_environment, get_folder, get_grpc_request, get_http_request,
get_http_response, get_key_value_raw, get_or_create_settings, get_workspace,
get_cookie_jar, get_environment, get_folder, get_grpc_connection, get_grpc_request,
get_http_request, get_http_response, get_key_value_raw, get_or_create_settings, get_workspace,
get_workspace_export_resources, list_cookie_jars, list_environments, list_folders,
list_grpc_connections, list_grpc_messages, list_grpc_requests, list_requests, list_responses,
list_grpc_connections, list_grpc_events, list_grpc_requests, list_requests, list_responses,
list_workspaces, set_key_value_raw, update_response_if_id, update_settings, upsert_cookie_jar,
upsert_environment, upsert_folder, upsert_grpc_connection, upsert_grpc_message,
upsert_environment, upsert_folder, upsert_grpc_connection, upsert_grpc_event,
upsert_grpc_request, upsert_http_request, upsert_workspace, CookieJar, Environment,
EnvironmentVariable, Folder, GrpcConnection, GrpcMessage, GrpcRequest, HttpRequest,
HttpResponse, KeyValue, Settings, Workspace,
EnvironmentVariable, Folder, GrpcConnection, GrpcEvent, GrpcEventType, GrpcRequest,
HttpRequest, HttpResponse, KeyValue, Settings, Workspace,
};
use crate::plugin::{ImportResources, ImportResult};
use crate::updates::{update_mode_from_str, UpdateMode, YaakUpdater};
mod analytics;
mod grpc;
mod http;
mod models;
mod plugin;
@@ -143,137 +145,6 @@ async fn cmd_grpc_go(
let workspace = get_workspace(&w, &req.workspace_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
&w,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?
};
let base_msg = GrpcMessage {
workspace_id: req.clone().workspace_id,
request_id: req.clone().id,
connection_id: conn.clone().id,
..Default::default()
};
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Initiating connection".to_string(),
is_info: true,
..base_msg.clone()
},
)
.await
.expect("Failed to upsert message");
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx);
let (service, method) = {
let req = req.clone();
match (req.service, req.method) {
(Some(service), Some(method)) => (service, method),
_ => return Err("Service and method are required".to_string()),
}
};
let start = std::time::Instant::now();
let connection = grpc_handle
.lock()
.await
.connect(
&req.clone().id,
uri,
req.proto_files
.0
.iter()
.map(|p| PathBuf::from_str(p).unwrap())
.collect(),
)
.await?;
let method_desc = connection
.method(&service, &method)
.expect("Service not found");
#[derive(serde::Deserialize)]
enum IncomingMsg {
Message(String),
Cancel,
Commit,
}
let cb = {
let cancelled_rx = cancelled_rx.clone();
let environment = environment.clone();
let workspace = workspace.clone();
let w = w.clone();
let base_msg = base_msg.clone();
move |ev: tauri::Event| {
if *cancelled_rx.borrow() {
// Stream is cancelled
return;
}
let mut maybe_in_msg_tx = maybe_in_msg_tx
.lock()
.expect("previous holder not to panic");
let in_msg_tx = if let Some(in_msg_tx) = maybe_in_msg_tx.as_ref() {
in_msg_tx
} else {
// This would mean that the stream is already committed because
// we have already dropped the sending half
return;
};
match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(IncomingMsg::Message(raw_msg)) => {
in_msg_tx.try_send(raw_msg.clone()).unwrap();
let w = w.clone();
let base_msg = base_msg.clone();
let environment_ref = environment.as_ref();
let msg = render::render(raw_msg.as_str(), &workspace, environment_ref);
tauri::async_runtime::spawn(async move {
upsert_grpc_message(
&w,
&GrpcMessage {
message: msg,
..base_msg.clone()
},
)
.await
.map_err(|e| e.to_string())
.unwrap();
});
}
Ok(IncomingMsg::Commit) => {
maybe_in_msg_tx.take();
}
Ok(IncomingMsg::Cancel) => {
cancelled_tx.send_replace(true);
}
Err(e) => {
error!("Failed to parse gRPC message: {:?}", e);
}
}
}
};
let event_handler = w.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let mut metadata = HashMap::new();
// Add rest of metadata
@@ -322,7 +193,150 @@ async fn cmd_grpc_go(
}
}
println!("METADATA: {:?}", metadata);
let conn = {
let req = req.clone();
upsert_grpc_connection(
&w,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
status: -1,
url: req.url.clone(),
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?
};
let conn_id = conn.id.clone();
let base_msg = GrpcEvent {
workspace_id: req.clone().workspace_id,
request_id: req.clone().id,
connection_id: conn.clone().id,
..Default::default()
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<DynamicMessage>(16);
let maybe_in_msg_tx = std::sync::Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
let in_msg_stream = tokio_stream::wrappers::ReceiverStream::new(in_msg_rx);
let (service, method) = {
let req = req.clone();
match (req.service, req.method) {
(Some(service), Some(method)) => (service, method),
_ => return Err("Service and method are required".to_string()),
}
};
let start = std::time::Instant::now();
let connection = grpc_handle
.lock()
.await
.connect(
&req.clone().id,
uri,
req.proto_files
.0
.iter()
.map(|p| PathBuf::from_str(p).unwrap())
.collect(),
)
.await?;
let method_desc = connection
.method(&service, &method)
.expect("Service not found");
#[derive(serde::Deserialize)]
enum IncomingMsg {
Message(String),
Cancel,
Commit,
}
let cb = {
let cancelled_rx = cancelled_rx.clone();
let environment = environment.clone();
let workspace = workspace.clone();
let w = w.clone();
let base_msg = base_msg.clone();
let method_desc = method_desc.clone();
move |ev: tauri::Event| {
if *cancelled_rx.borrow() {
// Stream is cancelled
return;
}
let mut maybe_in_msg_tx = maybe_in_msg_tx
.lock()
.expect("previous holder not to panic");
let in_msg_tx = if let Some(in_msg_tx) = maybe_in_msg_tx.as_ref() {
in_msg_tx
} else {
// This would mean that the stream is already committed because
// we have already dropped the sending half
return;
};
match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(IncomingMsg::Message(raw_msg)) => {
let w = w.clone();
let base_msg = base_msg.clone();
let environment_ref = environment.as_ref();
let method_desc = method_desc.clone();
let msg = render::render(raw_msg.as_str(), &workspace, environment_ref);
let d_msg: DynamicMessage = match deserialize_message(msg.as_str(), method_desc)
{
Ok(d_msg) => d_msg,
Err(e) => {
tauri::async_runtime::spawn(async move {
upsert_grpc_event(
&w,
&GrpcEvent {
event_type: GrpcEventType::Error,
content: e.to_string(),
..base_msg.clone()
},
)
.await
.unwrap();
});
return;
}
};
in_msg_tx.try_send(d_msg).unwrap();
tauri::async_runtime::spawn(async move {
upsert_grpc_event(
&w,
&GrpcEvent {
content: msg,
event_type: GrpcEventType::ClientMessage,
..base_msg.clone()
},
)
.await
.unwrap();
});
}
Ok(IncomingMsg::Commit) => {
maybe_in_msg_tx.take();
}
Ok(IncomingMsg::Cancel) => {
cancelled_tx.send_replace(true);
}
Err(e) => {
error!("Failed to parse gRPC message: {:?}", e);
}
}
}
};
let event_handler = w.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let grpc_listen = {
let w = w.clone();
@@ -330,7 +344,26 @@ async fn cmd_grpc_go(
let req = req.clone();
let workspace = workspace.clone();
let environment = environment.clone();
let msg = render::render(&req.message, &workspace, environment.as_ref());
let raw_msg = if req.message.is_empty() {
"{}".to_string()
} else {
req.message
};
let msg = render::render(&raw_msg, &workspace, environment.as_ref());
let conn_id = conn_id.clone();
upsert_grpc_event(
&w,
&GrpcEvent {
content: format!("Connecting to {}", req.url),
event_type: GrpcEventType::Info,
metadata: Json(metadata.clone()),
..base_msg.clone()
},
)
.await
.unwrap();
async move {
let (maybe_stream, maybe_msg) = match (
method_desc.is_client_streaming(),
@@ -366,54 +399,104 @@ async fn cmd_grpc_go(
),
};
if !method_desc.is_client_streaming() {
upsert_grpc_event(
&w,
&GrpcEvent {
event_type: GrpcEventType::ClientMessage,
content: msg,
..base_msg.clone()
},
)
.await
.unwrap();
}
match maybe_msg {
Some(Ok(msg)) => {
println!("Message: {:?}", msg);
upsert_grpc_message(
upsert_grpc_event(
&w,
&GrpcMessage {
message: serialize_message(&msg).unwrap(),
is_server: true,
&GrpcEvent {
metadata: Json(metadata_to_map(msg.metadata().clone())),
content: if msg.metadata().len() == 0 {
"Connection established"
} else {
"Received metadata"
}
.to_string(),
event_type: GrpcEventType::Info,
..base_msg.clone()
},
)
.await
.unwrap();
upsert_grpc_event(
&w,
&GrpcEvent {
content: serialize_message(&msg.into_inner()).unwrap(),
event_type: GrpcEventType::ServerMessage,
..base_msg.clone()
},
)
.await
.unwrap();
upsert_grpc_connection(
&w,
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i64,
status: Code::Ok as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
)
.await
.unwrap();
}
Some(Err(e)) => {
// TODO: Make into error
println!("Error connecting: {:?}", e);
upsert_grpc_message(
upsert_grpc_connection(
&w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
&GrpcConnection {
error: Some(e.to_string()),
elapsed: start.elapsed().as_millis() as i64,
status: Code::Unknown as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
)
.await
.unwrap();
}
None => {}
None => {
// Server streaming doesn't return initial message
}
}
let mut stream = match maybe_stream {
Some(Ok(Ok(s))) => {
// TODO: Store metadata on... connection? Or in a message
println!("METADATA: {:?}", s.metadata());
upsert_grpc_event(
&w,
&GrpcEvent {
metadata: Json(metadata_to_map(s.metadata().clone())),
content: if s.metadata().len() == 0 {
"Connection established"
} else {
"Received metadata"
}
.to_string(),
event_type: GrpcEventType::Info,
..base_msg.clone()
},
)
.await
.unwrap();
s.into_inner()
}
Some(Ok(Err(e))) => {
// TODO: Make into error, and use status
println!("Connection status error: {:?}", e);
upsert_grpc_message(
upsert_grpc_connection(
&w,
&GrpcMessage {
message: e.message().to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
&GrpcConnection {
error: Some(e.message().to_string()),
status: e.code() as i64,
elapsed: start.elapsed().as_millis() as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
)
.await
@@ -421,15 +504,13 @@ async fn cmd_grpc_go(
return;
}
Some(Err(e)) => {
// TODO: Make into error
println!("Generic error: {:?}", e);
upsert_grpc_message(
upsert_grpc_connection(
&w,
&GrpcMessage {
message: e.to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
&GrpcConnection {
error: Some(e),
status: Code::Unknown as i64,
elapsed: start.elapsed().as_millis() as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
)
.await
@@ -443,11 +524,11 @@ async fn cmd_grpc_go(
match stream.message().await {
Ok(Some(msg)) => {
let message = serialize_message(&msg).unwrap();
upsert_grpc_message(
upsert_grpc_event(
&w,
&GrpcMessage {
message,
is_server: true,
&GrpcEvent {
content: message,
event_type: GrpcEventType::ServerMessage,
..base_msg.clone()
},
)
@@ -455,16 +536,18 @@ async fn cmd_grpc_go(
.unwrap();
}
Ok(None) => {
// TODO: Store trailers on connection
let trailers = stream.trailers().await.unwrap_or_default();
info!("gRPC stream closed by sender {:?}", trailers,);
// TODO: Mark this on connection instead
upsert_grpc_message(
let trailers = stream
.trailers()
.await
.unwrap_or_default()
.unwrap_or_default();
upsert_grpc_connection(
&w,
&GrpcMessage {
message: "Connection closed".to_string(),
is_info: true,
..base_msg.clone()
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i64,
status: Code::Unavailable as i64,
trailers: Json(metadata_to_map(trailers)),
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
)
.await
@@ -472,15 +555,13 @@ async fn cmd_grpc_go(
break;
}
Err(status) => {
// TODO: Make into error
println!("Error status: {:?}", status);
upsert_grpc_message(
upsert_grpc_connection(
&w,
&GrpcMessage {
message: status.message().to_string(),
is_server: true,
is_info: true,
..base_msg.clone()
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i64,
status: Code::Unavailable as i64,
trailers: Json(metadata_to_map(status.metadata().clone())),
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
)
.await
@@ -492,36 +573,31 @@ async fn cmd_grpc_go(
};
{
let conn = conn.clone();
let conn_id = conn_id.clone();
tauri::async_runtime::spawn(async move {
let w = w.clone();
tokio::select! {
_ = grpc_listen => {
upsert_grpc_connection(
&w,
&GrpcConnection{
elapsed: start.elapsed().as_millis() as i64,
..conn
},
).await.unwrap();
// upsert_grpc_connection(
// &w,
// &GrpcConnection{
// elapsed: start.elapsed().as_millis() as i64,
// status: Code::Ok as i64,
// ..conn
// },
// ).await.unwrap();
},
_ = cancelled_rx.changed() => {
upsert_grpc_message(
&w,
&GrpcMessage {
message: "Connection cancelled".to_string(),
is_info: true,
..base_msg.clone()
},
)
.await.unwrap();
upsert_grpc_connection(
&w,
&GrpcConnection{
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i64,
..conn
status: Code::Cancelled as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
).await.unwrap();
)
.await
.unwrap();
},
}
w.unlisten(event_handler);
@@ -1037,11 +1113,8 @@ async fn cmd_list_grpc_connections(
}
#[tauri::command]
async fn cmd_list_grpc_messages(
connection_id: &str,
w: Window,
) -> Result<Vec<GrpcMessage>, String> {
list_grpc_messages(&w, connection_id)
async fn cmd_list_grpc_events(connection_id: &str, w: Window) -> Result<Vec<GrpcEvent>, String> {
list_grpc_events(&w, connection_id)
.await
.map_err(|e| e.to_string())
}
@@ -1334,7 +1407,7 @@ fn main() {
cmd_list_http_requests,
cmd_list_grpc_requests,
cmd_list_grpc_connections,
cmd_list_grpc_messages,
cmd_list_grpc_events,
cmd_list_http_responses,
cmd_list_workspaces,
cmd_new_window,

View File

@@ -231,33 +231,41 @@ pub struct GrpcConnection {
pub service: String,
pub method: String,
pub elapsed: i64,
pub status: i64,
pub url: String,
pub error: Option<String>,
pub trailers: Json<HashMap<String, String>>,
}
/// Kind of entry stored in the `grpc_events` table.
///
/// Serialized in snake_case both for serde (JSON sent to the frontend) and
/// for sqlx (the TEXT `event_type` column), so the two representations stay
/// in sync.
#[derive(sqlx::Type, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[sqlx(rename_all = "snake_case")]
pub enum GrpcEventType {
    // Informational status line (e.g. connection lifecycle notes).
    Info,
    // An error surfaced to the user (e.g. message deserialization failure).
    Error,
    // A message sent by the client to the server.
    ClientMessage,
    // A message received from the server.
    ServerMessage,
    // NOTE(review): no constructor for this variant is visible in this view —
    // confirm it is produced elsewhere before relying on it.
    ConnectionResponse,
}
// Manual impl rather than `#[derive(Default)]` + `#[default]`; keeps `Info`
// as the default event type used by `GrpcEvent::default()`.
impl Default for GrpcEventType {
    fn default() -> Self {
        GrpcEventType::Info
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct GrpcMessage {
pub struct GrpcEvent {
pub id: String,
pub model: String,
pub workspace_id: String,
pub request_id: String,
pub connection_id: String,
pub created_at: NaiveDateTime,
pub message: String,
pub is_server: bool,
pub is_info: bool,
}
/// A persisted gRPC response row, hydrated directly from the database
/// (`sqlx::FromRow`) and serialized to the frontend in camelCase.
///
/// NOTE(review): this field set comes from a diff rendering and may mix
/// pre-/post-refactor columns — confirm against the actual model and the
/// corresponding migration before relying on it.
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct GrpcResponse {
    // Primary key; empty string means "not yet persisted" in sibling
    // upsert helpers — presumably the same convention applies here.
    pub id: String,
    // Model discriminator string — presumably set by the DB layer; confirm.
    pub model: String,
    pub workspace_id: String,
    pub grpc_endpoint_id: String,
    pub grpc_connection_id: String,
    pub request_id: String,
    pub created_at: NaiveDateTime,
    pub updated_at: NaiveDateTime,
    // Payload text of the response/event.
    pub content: String,
    // Stored as a TEXT column via `GrpcEventType`'s sqlx snake_case mapping.
    pub event_type: GrpcEventType,
    // Key/value metadata stored as a JSON TEXT column (sqlx `Json` wrapper).
    pub metadata: Json<HashMap<String, String>>,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
@@ -612,14 +620,19 @@ pub async fn upsert_grpc_connection(
sqlx::query!(
r#"
INSERT INTO grpc_connections (
id, workspace_id, request_id, service, method, elapsed
id, workspace_id, request_id, service, method, elapsed,
status, error, trailers, url
)
VALUES (?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
service = excluded.service,
method = excluded.method,
elapsed = excluded.elapsed
elapsed = excluded.elapsed,
status = excluded.status,
error = excluded.error,
trailers = excluded.trailers,
url = excluded.url
"#,
id,
connection.workspace_id,
@@ -627,6 +640,10 @@ pub async fn upsert_grpc_connection(
connection.service,
connection.method,
connection.elapsed,
connection.status,
connection.error,
connection.trailers,
connection.url,
)
.execute(&db)
.await?;
@@ -647,7 +664,8 @@ pub async fn get_grpc_connection(
r#"
SELECT
id, model, workspace_id, request_id, created_at, updated_at, service,
method, elapsed
method, elapsed, status, error, url,
trailers AS "trailers!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_connections
WHERE id = ?
"#,
@@ -667,7 +685,8 @@ pub async fn list_grpc_connections(
r#"
SELECT
id, model, workspace_id, request_id, created_at, updated_at, service,
method, elapsed
method, elapsed, status, error, url,
trailers AS "trailers!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_connections
WHERE request_id = ?
ORDER BY created_at DESC
@@ -678,56 +697,57 @@ pub async fn list_grpc_connections(
.await
}
pub async fn upsert_grpc_message(
pub async fn upsert_grpc_event(
mgr: &impl Manager<Wry>,
message: &GrpcMessage,
) -> Result<GrpcMessage, sqlx::Error> {
message: &GrpcEvent,
) -> Result<GrpcEvent, sqlx::Error> {
let db = get_db(mgr).await;
let id = match message.id.as_str() {
"" => generate_id(Some("gm")),
"" => generate_id(Some("ge")),
_ => message.id.to_string(),
};
sqlx::query!(
r#"
INSERT INTO grpc_messages (
id, workspace_id, request_id, connection_id, message, is_server, is_info
INSERT INTO grpc_events (
id, workspace_id, request_id, connection_id, content, event_type, metadata
)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
message = excluded.message,
is_server = excluded.is_server,
is_info = excluded.is_info
content = excluded.content,
event_type = excluded.event_type,
metadata = excluded.metadata
"#,
id,
message.workspace_id,
message.request_id,
message.connection_id,
message.message,
message.is_server,
message.is_info,
message.content,
message.event_type,
message.metadata,
)
.execute(&db)
.await?;
match get_grpc_message(mgr, &id).await {
match get_grpc_event(mgr, &id).await {
Ok(m) => Ok(emit_upserted_model(mgr, m)),
Err(e) => Err(e),
}
}
pub async fn get_grpc_message(
pub async fn get_grpc_event(
mgr: &impl Manager<Wry>,
id: &str,
) -> Result<GrpcMessage, sqlx::Error> {
) -> Result<GrpcEvent, sqlx::Error> {
let db = get_db(mgr).await;
sqlx::query_as!(
GrpcMessage,
GrpcEvent,
r#"
SELECT
id, model, workspace_id, request_id, connection_id, created_at, message,
is_server, is_info
FROM grpc_messages
id, model, workspace_id, request_id, connection_id, created_at, content,
event_type AS "event_type!: GrpcEventType",
metadata AS "metadata!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_events
WHERE id = ?
"#,
id,
@@ -736,18 +756,19 @@ pub async fn get_grpc_message(
.await
}
pub async fn list_grpc_messages(
pub async fn list_grpc_events(
mgr: &impl Manager<Wry>,
connection_id: &str,
) -> Result<Vec<GrpcMessage>, sqlx::Error> {
) -> Result<Vec<GrpcEvent>, sqlx::Error> {
let db = get_db(mgr).await;
sqlx::query_as!(
GrpcMessage,
GrpcEvent,
r#"
SELECT
id, model, workspace_id, request_id, connection_id, created_at, message,
is_server, is_info
FROM grpc_messages
id, model, workspace_id, request_id, connection_id, created_at, content,
event_type AS "event_type!: GrpcEventType",
metadata AS "metadata!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_events
WHERE connection_id = ?
"#,
connection_id,