Use req/conn/msg models in unary/server

This commit is contained in:
Gregory Schier
2024-02-04 11:57:12 -08:00
parent 3a340999ec
commit 1abba4980a
27 changed files with 497 additions and 233 deletions

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT id, model, workspace_id, request_id, connection_id, created_at, message\n FROM grpc_messages\n WHERE id = ?\n ",
"query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, message,\n is_server, is_info\n FROM grpc_messages\n WHERE connection_id = ?\n ",
"describe": {
"columns": [
{
@@ -37,6 +37,16 @@
"name": "message",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "is_server",
"ordinal": 7,
"type_info": "Bool"
},
{
"name": "is_info",
"ordinal": 8,
"type_info": "Bool"
}
],
"parameters": {
@@ -49,8 +59,10 @@
false,
false,
false,
false,
false,
false
]
},
"hash": "f055a2b6ab6bc3fea115dbea3f287df833c9bb73cdf2fe38da21fe5f5f6ae639"
"hash": "196ed792c8d96425d428cb9609b0c1b18e8f1ba3c1fdfb38c91ffd7bada97f59"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT id, model, workspace_id, request_id, connection_id, created_at, message\n FROM grpc_messages\n WHERE workspace_id = ?\n ",
"query": "\n SELECT\n id, model, workspace_id, request_id, connection_id, created_at, message,\n is_server, is_info\n FROM grpc_messages\n WHERE id = ?\n ",
"describe": {
"columns": [
{
@@ -37,6 +37,16 @@
"name": "message",
"ordinal": 6,
"type_info": "Text"
},
{
"name": "is_server",
"ordinal": 7,
"type_info": "Bool"
},
{
"name": "is_info",
"ordinal": 8,
"type_info": "Bool"
}
],
"parameters": {
@@ -49,8 +59,10 @@
false,
false,
false,
false,
false,
false
]
},
"hash": "a86d3e86c5638d5f666c0cbe26dd5b3e88c632c56c7c4f3f116a29867e41e6a2"
"hash": "3c52c0fa3372cdd2657a775c3b93fb65f42d3226cec27220469558e14973328c"
}

View File

@@ -1,12 +0,0 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO grpc_messages (\n id, workspace_id, request_id, connection_id, message\n )\n VALUES (?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n message = excluded.message\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 5
},
"nullable": []
},
"hash": "4abfe0884ba046534dc6c255409c41ce287d7b0137726b8ca09870382a8b8300"
}

View File

@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n INSERT INTO grpc_messages (\n id, workspace_id, request_id, connection_id, message, is_server, is_info\n )\n VALUES (?, ?, ?, ?, ?, ?, ?)\n ON CONFLICT (id) DO UPDATE SET\n updated_at = CURRENT_TIMESTAMP,\n message = excluded.message,\n is_server = excluded.is_server,\n is_info = excluded.is_info\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 7
},
"nullable": []
},
"hash": "4b45b681698cbfe8531a7c3ba368a1d8003fa17d5585bc126debb18cae670460"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT id, model, workspace_id, request_id, created_at, updated_at, service, method\n FROM grpc_connections\n WHERE workspace_id = ?\n ",
"query": "\n SELECT id, model, workspace_id, request_id, created_at, updated_at, service, method\n FROM grpc_connections\n WHERE request_id = ?\n ORDER BY created_at DESC\n ",
"describe": {
"columns": [
{
@@ -58,5 +58,5 @@
false
]
},
"hash": "29d3bee45c4fca4c63231e1205edc708818737fe37137dbba6af2d784c3c0221"
"hash": "a7b969f33ed0424188b429227d6e3fac2bef52f2e1b0eb1d3846d1293d41f86c"
}

View File

@@ -1,7 +1,7 @@
use std::collections::HashMap;
use prost_reflect::{DescriptorPool, MessageDescriptor};
use prost_types::field_descriptor_proto;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Default, Serialize, Deserialize)]
#[serde(default, rename_all = "camelCase")]
@@ -50,8 +50,8 @@ impl Default for JsonType {
impl serde::Serialize for JsonType {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
where
S: serde::Serializer,
{
match self {
JsonType::String => serializer.serialize_str("string"),
@@ -67,8 +67,8 @@ impl serde::Serialize for JsonType {
impl<'de> serde::Deserialize<'de> for JsonType {
fn deserialize<D>(deserializer: D) -> Result<JsonType, D::Error>
where
D: serde::Deserializer<'de>,
where
D: serde::Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
match s.as_str() {
@@ -83,7 +83,10 @@ impl<'de> serde::Deserialize<'de> for JsonType {
}
}
pub fn message_to_json_schema(pool: &DescriptorPool, message: MessageDescriptor) -> JsonSchemaEntry {
pub fn message_to_json_schema(
pool: &DescriptorPool,
message: MessageDescriptor,
) -> JsonSchemaEntry {
let mut schema = JsonSchemaEntry {
title: Some(message.name().to_string()),
type_: JsonType::Object, // Messages are objects

View File

@@ -52,5 +52,7 @@ CREATE TABLE grpc_messages
ON DELETE CASCADE,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP NOT NULL,
is_server BOOLEAN NOT NULL,
is_info BOOLEAN NOT NULL,
message TEXT NOT NULL
);

View File

@@ -44,14 +44,15 @@ use crate::http::send_http_request;
use crate::models::{
cancel_pending_responses, create_response, delete_all_responses, delete_cookie_jar,
delete_environment, delete_folder, delete_request, delete_response, delete_workspace,
duplicate_grpc_request, duplicate_http_request, list_cookie_jars, list_folders, list_requests,
list_responses, list_workspaces, generate_id, get_cookie_jar, get_environment, get_folder,
get_grpc_request, get_http_request, get_key_value_raw, get_or_create_settings, get_response,
get_workspace, get_workspace_export_resources, list_environments, list_grpc_requests,
set_key_value_raw, update_response_if_id, update_settings, upsert_cookie_jar,
upsert_environment, upsert_folder, upsert_grpc_request, upsert_http_request, upsert_workspace,
CookieJar, Environment, EnvironmentVariable, Folder, GrpcRequest, HttpRequest, HttpResponse,
KeyValue, Settings, Workspace,
duplicate_grpc_request, duplicate_http_request, generate_id, get_cookie_jar, get_environment,
get_folder, get_grpc_request, get_http_request, get_key_value_raw, get_or_create_settings,
get_response, get_workspace, get_workspace_export_resources, list_cookie_jars,
list_environments, list_folders, list_grpc_connections, list_grpc_messages, list_grpc_requests,
list_requests, list_responses, list_workspaces, set_key_value_raw, update_response_if_id,
update_settings, upsert_cookie_jar, upsert_environment, upsert_folder, upsert_grpc_connection,
upsert_grpc_message, upsert_grpc_request, upsert_http_request, upsert_workspace, CookieJar,
Environment, EnvironmentVariable, Folder, GrpcConnection, GrpcMessage, GrpcRequest,
HttpRequest, HttpResponse, KeyValue, Settings, Workspace,
};
use crate::plugin::{ImportResources, ImportResult};
use crate::updates::{update_mode_from_str, UpdateMode, YaakUpdater};
@@ -98,20 +99,80 @@ async fn cmd_grpc_reflect(endpoint: &str) -> Result<Vec<ServiceDefinition>, Stri
#[tauri::command]
async fn cmd_grpc_call_unary(
endpoint: &str,
service: &str,
method: &str,
message: &str,
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
) -> Result<String, String> {
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?;
grpc_handle
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcMessage, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?
};
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
let req = req.clone();
let conn = conn.clone();
upsert_grpc_message(
db,
&GrpcMessage {
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
message: format!("Initiating connection to {}", req.url),
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?;
};
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
let conn_id = generate_id(Some("grpc"));
let msg = match grpc_handle
.lock()
.await
.connect("default", uri)
.connect(&conn_id, uri)
.await
.unary(service, method, message)
.unary(
&req.service.unwrap_or_default(),
&req.method.unwrap_or_default(),
&req.message,
)
.await
{
Ok(msg) => {
upsert_grpc_message(
db,
&GrpcMessage {
message: msg,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_server: true,
..Default::default()
},
)
.await
}
Err(e) => return Err(e.to_string()),
};
msg.map_err(|e| e.to_string())
}
#[tauri::command]
@@ -235,27 +296,47 @@ async fn cmd_grpc_bidi_streaming(
#[tauri::command]
async fn cmd_grpc_server_streaming(
endpoint: &str,
service: &str,
method: &str,
message: &str,
request_id: &str,
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
) -> Result<String, String> {
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
db,
&GrpcConnection {
workspace_id: req.workspace_id,
request_id: req.id,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())?
};
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?;
let conn_id = generate_id(Some("grpc"));
let (service, method) = match (&req.service, &req.method) {
(Some(service), Some(method)) => (service, method),
_ => return Err("Service and method are required".to_string()),
};
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
let mut stream = grpc_handle
.lock()
.await
.server_streaming(&conn_id, uri, service, method, message)
.server_streaming(&conn.id, uri, &service, &method, &req.message)
.await
.unwrap();
#[derive(serde::Deserialize)]
enum GrpcMessage {
enum IncomingMsg {
Message(String),
Commit,
Cancel,
@@ -270,15 +351,15 @@ async fn cmd_grpc_server_streaming(
return;
}
match serde_json::from_str::<GrpcMessage>(ev.payload().unwrap()) {
Ok(GrpcMessage::Message(msg)) => {
match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(IncomingMsg::Message(msg)) => {
println!("Received message: {}", msg);
}
Ok(GrpcMessage::Commit) => {
Ok(IncomingMsg::Commit) => {
println!("Received commit");
// TODO: Commit client streaming stream
}
Ok(GrpcMessage::Cancel) => {
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
@@ -289,19 +370,34 @@ async fn cmd_grpc_server_streaming(
}
};
let event_handler =
app_handle.listen_global(format!("grpc_client_msg_{}", conn_id).as_str(), cb);
app_handle.listen_global(format!("grpc_client_msg_{}", conn.id).as_str(), cb);
let grpc_listen = {
let db = db.clone();
let conn_id = conn.clone().id;
let app_handle = app_handle.clone();
let conn_id = conn_id.clone();
async move {
loop {
let req = req.clone();
let conn_id = conn_id.clone();
match stream.next().await {
Some(Ok(item)) => {
let item = serde_json::to_string_pretty(&item).unwrap();
app_handle
.emit_all(format!("grpc_server_msg_{}", &conn_id).as_str(), item)
.expect("Failed to emit");
let msg = upsert_grpc_message(
&db,
&GrpcMessage {
message: item,
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn_id,
is_server: true,
..Default::default()
},
)
.await
.map_err(|e| e.to_string())
.expect("Failed to upsert message");
emit_side_effect(app_handle.clone(), "created_model", msg);
}
Some(Err(e)) => {
error!("gRPC stream error: {:?}", e);
@@ -328,7 +424,7 @@ async fn cmd_grpc_server_streaming(
app_handle.unlisten(event_handler);
});
Ok(conn_id)
Ok(conn)
}
#[tauri::command]
@@ -783,7 +879,7 @@ async fn cmd_duplicate_grpc_request(
let request = duplicate_grpc_request(db, id)
.await
.expect("Failed to duplicate grpc request");
emit_and_return(&window, "updated_model", request)
emit_and_return(&window, "created_model", request)
}
#[tauri::command]
@@ -823,7 +919,7 @@ async fn cmd_duplicate_http_request(
let request = duplicate_http_request(db, id)
.await
.expect("Failed to duplicate http request");
emit_and_return(&window, "updated_model", request)
emit_and_return(&window, "created_model", request)
}
#[tauri::command]
@@ -982,6 +1078,28 @@ async fn cmd_delete_environment(
emit_and_return(&window, "deleted_model", req)
}
#[tauri::command]
async fn cmd_list_grpc_connections(
request_id: &str,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<Vec<GrpcConnection>, String> {
let db = &*db_state.lock().await;
list_grpc_connections(db, request_id)
.await
.map_err(|e| e.to_string())
}
#[tauri::command]
async fn cmd_list_grpc_messages(
connection_id: &str,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<Vec<GrpcMessage>, String> {
let db = &*db_state.lock().await;
list_grpc_messages(db, connection_id)
.await
.map_err(|e| e.to_string())
}
#[tauri::command]
async fn cmd_list_grpc_requests(
workspace_id: &str,
@@ -990,8 +1108,7 @@ async fn cmd_list_grpc_requests(
let db = &*db_state.lock().await;
let requests = list_grpc_requests(db, workspace_id)
.await
.expect("Failed to find grpc requests");
// .map_err(|e| e.to_string())
.map_err(|e| e.to_string())?;
Ok(requests)
}
@@ -1123,7 +1240,7 @@ async fn cmd_get_workspace(
}
#[tauri::command]
async fn cmd_list_responses(
async fn cmd_list_http_responses(
request_id: &str,
limit: Option<i64>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
@@ -1303,6 +1420,7 @@ fn main() {
cmd_delete_response,
cmd_delete_workspace,
cmd_duplicate_http_request,
cmd_duplicate_grpc_request,
cmd_export_data,
cmd_filter_response,
cmd_get_cookie_jar,
@@ -1324,7 +1442,9 @@ fn main() {
cmd_list_folders,
cmd_list_http_requests,
cmd_list_grpc_requests,
cmd_list_responses,
cmd_list_grpc_connections,
cmd_list_grpc_messages,
cmd_list_http_responses,
cmd_list_workspaces,
cmd_new_window,
cmd_send_ephemeral_request,

View File

@@ -228,6 +228,8 @@ pub struct GrpcMessage {
pub connection_id: String,
pub created_at: NaiveDateTime,
pub message: String,
pub is_server: bool,
pub is_info: bool,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
@@ -589,16 +591,17 @@ pub async fn get_grpc_connection(
pub async fn list_grpc_connections(
db: &Pool<Sqlite>,
workspace_id: &str,
request_id: &str,
) -> Result<Vec<GrpcConnection>, sqlx::Error> {
sqlx::query_as!(
GrpcConnection,
r#"
SELECT id, model, workspace_id, request_id, created_at, updated_at, service, method
FROM grpc_connections
WHERE workspace_id = ?
WHERE request_id = ?
ORDER BY created_at DESC
"#,
workspace_id,
request_id,
)
.fetch_all(db)
.await
@@ -615,30 +618,36 @@ pub async fn upsert_grpc_message(
sqlx::query!(
r#"
INSERT INTO grpc_messages (
id, workspace_id, request_id, connection_id, message
id, workspace_id, request_id, connection_id, message, is_server, is_info
)
VALUES (?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
message = excluded.message
message = excluded.message,
is_server = excluded.is_server,
is_info = excluded.is_info
"#,
id,
message.workspace_id,
message.request_id,
message.connection_id,
message.message,
message.is_server,
message.is_info,
)
.execute(db)
.await?;
crate::models::get_grpc_message(db, &id).await
get_grpc_message(db, &id).await
}
pub async fn get_grpc_message(db: &Pool<Sqlite>, id: &str) -> Result<GrpcMessage, sqlx::Error> {
sqlx::query_as!(
GrpcMessage,
r#"
SELECT id, model, workspace_id, request_id, connection_id, created_at, message
SELECT
id, model, workspace_id, request_id, connection_id, created_at, message,
is_server, is_info
FROM grpc_messages
WHERE id = ?
"#,
@@ -650,16 +659,18 @@ pub async fn get_grpc_message(db: &Pool<Sqlite>, id: &str) -> Result<GrpcMessage
pub async fn list_grpc_messages(
db: &Pool<Sqlite>,
workspace_id: &str,
connection_id: &str,
) -> Result<Vec<GrpcMessage>, sqlx::Error> {
sqlx::query_as!(
GrpcMessage,
r#"
SELECT id, model, workspace_id, request_id, connection_id, created_at, message
SELECT
id, model, workspace_id, request_id, connection_id, created_at, message,
is_server, is_info
FROM grpc_messages
WHERE workspace_id = ?
WHERE connection_id = ?
"#,
workspace_id,
connection_id,
)
.fetch_all(db)
.await