gRPC models and tables

This commit is contained in:
Gregory Schier
2024-02-03 11:14:42 -08:00
parent d148a8384d
commit d8d5344d21
38 changed files with 892 additions and 311 deletions

View File

@@ -17,7 +17,7 @@ use std::str::FromStr;
use ::http::uri::InvalidUri;
use ::http::Uri;
use fern::colors::ColoredLevelConfig;
use futures::{Stream, StreamExt};
use futures::StreamExt;
use log::{debug, error, info, warn};
use rand::random;
use serde::Serialize;
@@ -46,7 +46,7 @@ use crate::models::{
delete_environment, delete_folder, delete_request, delete_response, delete_workspace,
duplicate_request, find_cookie_jars, find_environments, find_folders, find_requests,
find_responses, find_workspaces, generate_id, get_cookie_jar, get_environment, get_folder,
get_key_value_raw, get_or_create_settings, get_request, get_response, get_workspace,
get_http_request, get_key_value_raw, get_or_create_settings, get_response, get_workspace,
get_workspace_export_resources, set_key_value_raw, update_response_if_id, update_settings,
upsert_cookie_jar, upsert_environment, upsert_folder, upsert_request, upsert_workspace,
CookieJar, Environment, EnvironmentVariable, Folder, HttpRequest, HttpResponse, KeyValue,
@@ -138,8 +138,8 @@ async fn cmd_grpc_bidi_streaming(
app_handle: AppHandle<Wry>,
grpc_handle: State<'_, Mutex<GrpcManager>>,
) -> Result<String, String> {
let (in_msg_tx, mut in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone()));
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?;
@@ -522,7 +522,7 @@ async fn cmd_send_request(
) -> Result<HttpResponse, String> {
let db = &*db_state.lock().await;
let request = get_request(db, request_id)
let request = get_http_request(db, request_id)
.await
.expect("Failed to get request");
@@ -977,7 +977,7 @@ async fn cmd_get_request(
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<HttpRequest, String> {
let db = &*db_state.lock().await;
get_request(db, id).await.map_err(|e| e.to_string())
get_http_request(db, id).await.map_err(|e| e.to_string())
}
#[tauri::command]

View File

@@ -4,9 +4,9 @@ use std::fs;
use log::error;
use rand::distributions::{Alphanumeric, DistString};
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Sqlite};
use sqlx::types::{Json, JsonValue};
use sqlx::types::chrono::NaiveDateTime;
use sqlx::types::{Json, JsonValue};
use sqlx::{Pool, Sqlite};
use tauri::AppHandle;
fn default_true() -> bool {
@@ -190,21 +190,19 @@ impl HttpResponse {
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct GrpcEndpoint {
pub struct GrpcRequest {
pub id: String,
pub model: String,
pub workspace_id: String,
pub request_id: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub endpoint: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct GrpcMessage {
pub created_at: NaiveDateTime,
pub content: String,
pub folder_id: Option<String>,
pub name: String,
pub sort_priority: f64,
pub url: String,
pub service: Option<String>,
pub method: Option<String>,
pub message: String,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
@@ -213,24 +211,23 @@ pub struct GrpcConnection {
pub id: String,
pub model: String,
pub workspace_id: String,
pub grpc_endpoint_id: String,
pub request_id: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub messages: Json<Vec<GrpcMessage>>,
pub service: String,
pub method: String,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct GrpcRequest {
pub struct GrpcMessage {
pub id: String,
pub model: String,
pub workspace_id: String,
pub grpc_endpoint_id: String,
pub grpc_connection_id: String,
pub request_id: String,
pub connection_id: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub message: String,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
@@ -363,15 +360,8 @@ pub async fn find_workspaces(db: &Pool<Sqlite>) -> Result<Vec<Workspace>, sqlx::
Workspace,
r#"
SELECT
id,
model,
created_at,
updated_at,
name,
description,
setting_request_timeout,
setting_follow_redirects,
setting_validate_certificates,
id, model, created_at, updated_at, name, description, setting_request_timeout,
setting_follow_redirects, setting_validate_certificates,
variables AS "variables!: sqlx::types::Json<Vec<EnvironmentVariable>>"
FROM workspaces
"#,
@@ -385,15 +375,8 @@ pub async fn get_workspace(db: &Pool<Sqlite>, id: &str) -> Result<Workspace, sql
Workspace,
r#"
SELECT
id,
model,
created_at,
updated_at,
name,
description,
setting_request_timeout,
setting_follow_redirects,
setting_validate_certificates,
id, model, created_at, updated_at, name, description, setting_request_timeout,
setting_follow_redirects, setting_validate_certificates,
variables AS "variables!: sqlx::types::Json<Vec<EnvironmentVariable>>"
FROM workspaces WHERE id = ?
"#,
@@ -427,12 +410,7 @@ pub async fn get_cookie_jar(db: &Pool<Sqlite>, id: &str) -> Result<CookieJar, sq
CookieJar,
r#"
SELECT
id,
model,
created_at,
updated_at,
workspace_id,
name,
id, model, created_at, updated_at, workspace_id, name,
cookies AS "cookies!: sqlx::types::Json<Vec<JsonValue>>"
FROM cookie_jars WHERE id = ?
"#,
@@ -450,12 +428,7 @@ pub async fn find_cookie_jars(
CookieJar,
r#"
SELECT
id,
model,
created_at,
updated_at,
workspace_id,
name,
id, model, created_at, updated_at, workspace_id, name,
cookies AS "cookies!: sqlx::types::Json<Vec<JsonValue>>"
FROM cookie_jars WHERE workspace_id = ?
"#,
@@ -481,6 +454,208 @@ pub async fn delete_cookie_jar(db: &Pool<Sqlite>, id: &str) -> Result<CookieJar,
Ok(cookie_jar)
}
/// Insert a new gRPC request or update an existing one.
///
/// An empty `id` means "create": a fresh `gr`-prefixed id is generated.
/// A non-empty `id` upserts that row; on conflict every mutable column is
/// refreshed from the incoming value and `updated_at` is bumped. The name
/// is stored trimmed. Returns the row re-read from the database so the
/// caller sees generated ids and timestamps.
pub async fn upsert_grpc_request(
    db: &Pool<Sqlite>,
    request: &GrpcRequest,
) -> Result<GrpcRequest, sqlx::Error> {
    // Blank id ⇒ brand-new record; otherwise keep the caller's id.
    let id = if request.id.is_empty() {
        generate_id(Some("gr"))
    } else {
        request.id.to_string()
    };
    let name = request.name.trim();
    sqlx::query!(
        r#"
            INSERT INTO grpc_requests (
                id, name, workspace_id, folder_id, sort_priority, url, service, method, message
            )
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT (id) DO UPDATE SET
               updated_at = CURRENT_TIMESTAMP,
               name = excluded.name,
               folder_id = excluded.folder_id,
               sort_priority = excluded.sort_priority,
               url = excluded.url,
               service = excluded.service,
               method = excluded.method,
               message = excluded.message
        "#,
        id,
        name,
        request.workspace_id,
        request.folder_id,
        request.sort_priority,
        request.url,
        request.service,
        request.method,
        request.message,
    )
    .execute(db)
    .await?;
    get_grpc_request(db, &id).await
}
/// Fetch a single gRPC request by id.
///
/// Uses `fetch_one`, so a missing row surfaces as `sqlx::Error::RowNotFound`
/// rather than an `Option`.
pub async fn get_grpc_request(db: &Pool<Sqlite>, id: &str) -> Result<GrpcRequest, sqlx::Error> {
    sqlx::query_as!(
        GrpcRequest,
        r#"
            SELECT
                id, model, workspace_id, folder_id, created_at, updated_at, name, sort_priority,
                url, service, method, message
            FROM grpc_requests
            WHERE id = ?
        "#,
        id,
    )
    .fetch_one(db)
    .await
}
/// List every gRPC request belonging to a workspace.
///
/// No ordering clause is applied; rows come back in whatever order SQLite
/// returns them (callers that need a stable order must sort, e.g. by
/// `sort_priority`).
pub async fn list_grpc_requests(
    db: &Pool<Sqlite>,
    workspace_id: &str,
) -> Result<Vec<GrpcRequest>, sqlx::Error> {
    sqlx::query_as!(
        GrpcRequest,
        r#"
            SELECT
                id, model, workspace_id, folder_id, created_at, updated_at, name, sort_priority,
                url, service, method, message
            FROM grpc_requests
            WHERE workspace_id = ?
        "#,
        workspace_id,
    )
    .fetch_all(db)
    .await
}
/// Create or update a gRPC connection record.
///
/// A blank `id` inserts a new row under a generated `gc`-prefixed id; a
/// non-blank id upserts that row. On conflict only `service`, `method`,
/// and the `updated_at` timestamp are refreshed — `workspace_id` and
/// `request_id` are immutable after insert. Returns the row as stored.
pub async fn upsert_grpc_connection(
    db: &Pool<Sqlite>,
    connection: &GrpcConnection,
) -> Result<GrpcConnection, sqlx::Error> {
    // Empty id ⇒ new connection; otherwise reuse the existing id.
    let id = if connection.id.is_empty() {
        generate_id(Some("gc"))
    } else {
        connection.id.to_string()
    };
    sqlx::query!(
        r#"
            INSERT INTO grpc_connections (
                id, workspace_id, request_id, service, method
            )
            VALUES ( ?, ?, ?, ?, ?)
            ON CONFLICT (id) DO UPDATE SET
               updated_at = CURRENT_TIMESTAMP,
               service = excluded.service,
               method = excluded.method
        "#,
        id,
        connection.workspace_id,
        connection.request_id,
        connection.service,
        connection.method,
    )
    .execute(db)
    .await?;
    get_grpc_connection(db, &id).await
}
/// Fetch a single gRPC connection by id.
///
/// Uses `fetch_one`, so a missing row surfaces as `sqlx::Error::RowNotFound`.
pub async fn get_grpc_connection(
    db: &Pool<Sqlite>,
    id: &str,
) -> Result<GrpcConnection, sqlx::Error> {
    sqlx::query_as!(
        GrpcConnection,
        r#"
            SELECT id, model, workspace_id, request_id, created_at, updated_at, service, method
            FROM grpc_connections
            WHERE id = ?
        "#,
        id,
    )
    .fetch_one(db)
    .await
}
/// List every gRPC connection in a workspace.
///
/// Unordered; callers that need newest-first (or any stable order) must
/// sort the result themselves.
pub async fn list_grpc_connections(
    db: &Pool<Sqlite>,
    workspace_id: &str,
) -> Result<Vec<GrpcConnection>, sqlx::Error> {
    sqlx::query_as!(
        GrpcConnection,
        r#"
            SELECT id, model, workspace_id, request_id, created_at, updated_at, service, method
            FROM grpc_connections
            WHERE workspace_id = ?
        "#,
        workspace_id,
    )
    .fetch_all(db)
    .await
}
pub async fn upsert_grpc_message(
db: &Pool<Sqlite>,
message: &GrpcMessage,
) -> Result<GrpcMessage, sqlx::Error> {
let id = match message.id.as_str() {
"" => generate_id(Some("gm")),
_ => message.id.to_string(),
};
sqlx::query!(
r#"
INSERT INTO grpc_messages (
id, workspace_id, request_id, connection_id, message
)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
message = excluded.message
"#,
id,
message.workspace_id,
message.request_id,
message.connection_id,
message.message,
)
.execute(db)
.await?;
crate::models::get_grpc_message(db, &id).await
}
/// Fetch a single gRPC message by id.
///
/// Uses `fetch_one`, so a missing row surfaces as `sqlx::Error::RowNotFound`.
/// Note: unlike the request/connection queries, this selects `created_at`
/// but no `updated_at` column.
pub async fn get_grpc_message(db: &Pool<Sqlite>, id: &str) -> Result<GrpcMessage, sqlx::Error> {
    sqlx::query_as!(
        GrpcMessage,
        r#"
            SELECT id, model, workspace_id, request_id, connection_id, created_at, message
            FROM grpc_messages
            WHERE id = ?
        "#,
        id,
    )
    .fetch_one(db)
    .await
}
/// List every gRPC message in a workspace.
///
/// Unordered; callers wanting chronological order should sort by
/// `created_at` themselves. Filters by workspace, not by connection —
/// per-connection filtering would need a different query.
pub async fn list_grpc_messages(
    db: &Pool<Sqlite>,
    workspace_id: &str,
) -> Result<Vec<GrpcMessage>, sqlx::Error> {
    sqlx::query_as!(
        GrpcMessage,
        r#"
            SELECT id, model, workspace_id, request_id, connection_id, created_at, message
            FROM grpc_messages
            WHERE workspace_id = ?
        "#,
        workspace_id,
    )
    .fetch_all(db)
    .await
}
pub async fn upsert_cookie_jar(
db: &Pool<Sqlite>,
cookie_jar: &CookieJar,
@@ -493,10 +668,7 @@ pub async fn upsert_cookie_jar(
sqlx::query!(
r#"
INSERT INTO cookie_jars (
id,
workspace_id,
name,
cookies
id, workspace_id, name, cookies
)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
@@ -553,13 +725,7 @@ async fn get_settings(db: &Pool<Sqlite>) -> Result<Settings, sqlx::Error> {
Settings,
r#"
SELECT
id,
model,
created_at,
updated_at,
theme,
appearance,
update_channel
id, model, created_at, updated_at, theme, appearance, update_channel
FROM settings
WHERE id = 'default'
"#,
@@ -593,9 +759,7 @@ pub async fn update_settings(
sqlx::query!(
r#"
UPDATE settings SET (
theme,
appearance,
update_channel
theme, appearance, update_channel
) = (?, ?, ?) WHERE id = 'default';
"#,
settings.theme,
@@ -619,10 +783,7 @@ pub async fn upsert_environment(
sqlx::query!(
r#"
INSERT INTO environments (
id,
workspace_id,
name,
variables
id, workspace_id, name, variables
)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
@@ -645,12 +806,7 @@ pub async fn get_environment(db: &Pool<Sqlite>, id: &str) -> Result<Environment,
Environment,
r#"
SELECT
id,
model,
workspace_id,
created_at,
updated_at,
name,
id, model, workspace_id, created_at, updated_at, name,
variables AS "variables!: sqlx::types::Json<Vec<EnvironmentVariable>>"
FROM environments
WHERE id = ?
@@ -666,14 +822,7 @@ pub async fn get_folder(db: &Pool<Sqlite>, id: &str) -> Result<Folder, sqlx::Err
Folder,
r#"
SELECT
id,
model,
workspace_id,
created_at,
updated_at,
folder_id,
name,
sort_priority
id, model, workspace_id, created_at, updated_at, folder_id, name, sort_priority
FROM folders
WHERE id = ?
"#,
@@ -691,14 +840,7 @@ pub async fn find_folders(
Folder,
r#"
SELECT
id,
model,
workspace_id,
created_at,
updated_at,
folder_id,
name,
sort_priority
id, model, workspace_id, created_at, updated_at, folder_id, name, sort_priority
FROM folders
WHERE workspace_id = ?
"#,
@@ -733,11 +875,7 @@ pub async fn upsert_folder(db: &Pool<Sqlite>, r: Folder) -> Result<Folder, sqlx:
sqlx::query!(
r#"
INSERT INTO folders (
id,
workspace_id,
folder_id,
name,
sort_priority
id, workspace_id, folder_id, name, sort_priority
)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
@@ -759,7 +897,7 @@ pub async fn upsert_folder(db: &Pool<Sqlite>, r: Folder) -> Result<Folder, sqlx:
}
pub async fn duplicate_request(db: &Pool<Sqlite>, id: &str) -> Result<HttpRequest, sqlx::Error> {
let mut request = get_request(db, id).await?.clone();
let mut request = get_http_request(db, id).await?.clone();
request.id = "".to_string();
upsert_request(db, request).await
}
@@ -776,19 +914,8 @@ pub async fn upsert_request(db: &Pool<Sqlite>, r: HttpRequest) -> Result<HttpReq
sqlx::query!(
r#"
INSERT INTO http_requests (
id,
workspace_id,
folder_id,
name,
url,
url_parameters,
method,
body,
body_type,
authentication,
authentication_type,
headers,
sort_priority
id, workspace_id, folder_id, name, url, url_parameters, method, body, body_type,
authentication, authentication_type, headers, sort_priority
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
@@ -822,7 +949,7 @@ pub async fn upsert_request(db: &Pool<Sqlite>, r: HttpRequest) -> Result<HttpReq
.execute(db)
.await?;
get_request(db, &id).await
get_http_request(db, &id).await
}
pub async fn find_requests(
@@ -833,21 +960,11 @@ pub async fn find_requests(
HttpRequest,
r#"
SELECT
id,
model,
workspace_id,
folder_id,
created_at,
updated_at,
name,
url,
id, model, workspace_id, folder_id, created_at, updated_at, name, url,
url_parameters AS "url_parameters!: sqlx::types::Json<Vec<HttpUrlParameter>>",
method,
method, body_type, authentication_type, sort_priority,
body AS "body!: Json<HashMap<String, JsonValue>>",
body_type,
authentication AS "authentication!: Json<HashMap<String, JsonValue>>",
authentication_type,
sort_priority,
headers AS "headers!: sqlx::types::Json<Vec<HttpRequestHeader>>"
FROM http_requests
WHERE workspace_id = ?
@@ -858,26 +975,16 @@ pub async fn find_requests(
.await
}
pub async fn get_request(db: &Pool<Sqlite>, id: &str) -> Result<HttpRequest, sqlx::Error> {
pub async fn get_http_request(db: &Pool<Sqlite>, id: &str) -> Result<HttpRequest, sqlx::Error> {
sqlx::query_as!(
HttpRequest,
r#"
SELECT
id,
model,
workspace_id,
folder_id,
created_at,
updated_at,
name,
url,
id, model, workspace_id, folder_id, created_at, updated_at, name, url, method,
body_type, authentication_type, sort_priority,
url_parameters AS "url_parameters!: sqlx::types::Json<Vec<HttpUrlParameter>>",
method,
body AS "body!: Json<HashMap<String, JsonValue>>",
body_type,
authentication AS "authentication!: Json<HashMap<String, JsonValue>>",
authentication_type,
sort_priority,
headers AS "headers!: sqlx::types::Json<Vec<HttpRequestHeader>>"
FROM http_requests
WHERE id = ?
@@ -889,7 +996,7 @@ pub async fn get_request(db: &Pool<Sqlite>, id: &str) -> Result<HttpRequest, sql
}
pub async fn delete_request(db: &Pool<Sqlite>, id: &str) -> Result<HttpRequest, sqlx::Error> {
let req = get_request(db, id).await?;
let req = get_http_request(db, id).await?;
// DB deletes will cascade but this will delete the files
delete_all_responses(db, id).await?;
@@ -922,25 +1029,14 @@ pub async fn create_response(
version: Option<&str>,
remote_addr: Option<&str>,
) -> Result<HttpResponse, sqlx::Error> {
let req = get_request(db, request_id).await?;
let req = get_http_request(db, request_id).await?;
let id = generate_id(Some("rp"));
let headers_json = Json(headers);
sqlx::query!(
r#"
INSERT INTO http_responses (
id,
request_id,
workspace_id,
elapsed,
elapsed_headers,
url,
status,
status_reason,
content_length,
body_path,
headers,
version,
remote_addr
id, request_id, workspace_id, elapsed, elapsed_headers, url, status, status_reason,
content_length, body_path, headers, version, remote_addr
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"#,
@@ -1000,13 +1096,8 @@ pub async fn upsert_workspace(
sqlx::query!(
r#"
INSERT INTO workspaces (
id,
name,
description,
variables,
setting_request_timeout,
setting_follow_redirects,
setting_validate_certificates
id, name, description, variables, setting_request_timeout,
setting_follow_redirects, setting_validate_certificates
)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
@@ -1040,18 +1131,8 @@ pub async fn update_response(
sqlx::query!(
r#"
UPDATE http_responses SET (
elapsed,
elapsed_headers,
url,
status,
status_reason,
content_length,
body_path,
error,
headers,
version,
remote_addr,
updated_at
elapsed, elapsed_headers, url, status, status_reason, content_length, body_path,
error, headers, version, remote_addr, updated_at
) = (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) WHERE id = ?;
"#,
response.elapsed,