Refactor into grpc events

This commit is contained in:
Gregory Schier
2024-02-22 00:49:22 -08:00
parent 8ef103fbde
commit 0cfb218b07
31 changed files with 851 additions and 595 deletions

View File

@@ -231,33 +231,41 @@ pub struct GrpcConnection {
pub service: String,
pub method: String,
pub elapsed: i64,
pub status: i64,
pub url: String,
pub error: Option<String>,
pub trailers: Json<HashMap<String, String>>,
}
#[derive(sqlx::Type, Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[sqlx(rename_all = "snake_case")]
pub enum GrpcEventType {
Info,
Error,
ClientMessage,
ServerMessage,
ConnectionResponse,
}
impl Default for GrpcEventType {
fn default() -> Self {
GrpcEventType::Info
}
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct GrpcMessage {
pub struct GrpcEvent {
pub id: String,
pub model: String,
pub workspace_id: String,
pub request_id: String,
pub connection_id: String,
pub created_at: NaiveDateTime,
pub message: String,
pub is_server: bool,
pub is_info: bool,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
#[serde(default, rename_all = "camelCase")]
pub struct GrpcResponse {
pub id: String,
pub model: String,
pub workspace_id: String,
pub grpc_endpoint_id: String,
pub grpc_connection_id: String,
pub request_id: String,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub content: String,
pub event_type: GrpcEventType,
pub metadata: Json<HashMap<String, String>>,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
@@ -612,14 +620,19 @@ pub async fn upsert_grpc_connection(
sqlx::query!(
r#"
INSERT INTO grpc_connections (
id, workspace_id, request_id, service, method, elapsed
id, workspace_id, request_id, service, method, elapsed,
status, error, trailers, url
)
VALUES (?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
service = excluded.service,
method = excluded.method,
elapsed = excluded.elapsed
elapsed = excluded.elapsed,
status = excluded.status,
error = excluded.error,
trailers = excluded.trailers,
url = excluded.url
"#,
id,
connection.workspace_id,
@@ -627,6 +640,10 @@ pub async fn upsert_grpc_connection(
connection.service,
connection.method,
connection.elapsed,
connection.status,
connection.error,
connection.trailers,
connection.url,
)
.execute(&db)
.await?;
@@ -647,7 +664,8 @@ pub async fn get_grpc_connection(
r#"
SELECT
id, model, workspace_id, request_id, created_at, updated_at, service,
method, elapsed
method, elapsed, status, error, url,
trailers AS "trailers!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_connections
WHERE id = ?
"#,
@@ -667,7 +685,8 @@ pub async fn list_grpc_connections(
r#"
SELECT
id, model, workspace_id, request_id, created_at, updated_at, service,
method, elapsed
method, elapsed, status, error, url,
trailers AS "trailers!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_connections
WHERE request_id = ?
ORDER BY created_at DESC
@@ -678,56 +697,57 @@ pub async fn list_grpc_connections(
.await
}
pub async fn upsert_grpc_message(
pub async fn upsert_grpc_event(
mgr: &impl Manager<Wry>,
message: &GrpcMessage,
) -> Result<GrpcMessage, sqlx::Error> {
message: &GrpcEvent,
) -> Result<GrpcEvent, sqlx::Error> {
let db = get_db(mgr).await;
let id = match message.id.as_str() {
"" => generate_id(Some("gm")),
"" => generate_id(Some("ge")),
_ => message.id.to_string(),
};
sqlx::query!(
r#"
INSERT INTO grpc_messages (
id, workspace_id, request_id, connection_id, message, is_server, is_info
INSERT INTO grpc_events (
id, workspace_id, request_id, connection_id, content, event_type, metadata
)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
message = excluded.message,
is_server = excluded.is_server,
is_info = excluded.is_info
content = excluded.content,
event_type = excluded.event_type,
metadata = excluded.metadata
"#,
id,
message.workspace_id,
message.request_id,
message.connection_id,
message.message,
message.is_server,
message.is_info,
message.content,
message.event_type,
message.metadata,
)
.execute(&db)
.await?;
match get_grpc_message(mgr, &id).await {
match get_grpc_event(mgr, &id).await {
Ok(m) => Ok(emit_upserted_model(mgr, m)),
Err(e) => Err(e),
}
}
pub async fn get_grpc_message(
pub async fn get_grpc_event(
mgr: &impl Manager<Wry>,
id: &str,
) -> Result<GrpcMessage, sqlx::Error> {
) -> Result<GrpcEvent, sqlx::Error> {
let db = get_db(mgr).await;
sqlx::query_as!(
GrpcMessage,
GrpcEvent,
r#"
SELECT
id, model, workspace_id, request_id, connection_id, created_at, message,
is_server, is_info
FROM grpc_messages
id, model, workspace_id, request_id, connection_id, created_at, content,
event_type AS "event_type!: GrpcEventType",
metadata AS "metadata!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_events
WHERE id = ?
"#,
id,
@@ -736,18 +756,19 @@ pub async fn get_grpc_message(
.await
}
pub async fn list_grpc_messages(
pub async fn list_grpc_events(
mgr: &impl Manager<Wry>,
connection_id: &str,
) -> Result<Vec<GrpcMessage>, sqlx::Error> {
) -> Result<Vec<GrpcEvent>, sqlx::Error> {
let db = get_db(mgr).await;
sqlx::query_as!(
GrpcMessage,
GrpcEvent,
r#"
SELECT
id, model, workspace_id, request_id, connection_id, created_at, message,
is_server, is_info
FROM grpc_messages
id, model, workspace_id, request_id, connection_id, created_at, content,
event_type AS "event_type!: GrpcEventType",
metadata AS "metadata!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_events
WHERE connection_id = ?
"#,
connection_id,