Everything in messages now

This commit is contained in:
Gregory Schier
2024-02-22 19:51:30 -08:00
parent e6c0317b37
commit f2955c26c1
10 changed files with 231 additions and 205 deletions

View File

@@ -340,7 +340,7 @@ async fn cmd_grpc_go(
let grpc_listen = {
let w = w.clone();
let base_msg = base_msg.clone();
let base_event = base_msg.clone();
let req = req.clone();
let workspace = workspace.clone();
let environment = environment.clone();
@@ -350,15 +350,14 @@ async fn cmd_grpc_go(
req.message
};
let msg = render::render(&raw_msg, &workspace, environment.as_ref());
let conn_id = conn_id.clone();
upsert_grpc_event(
&w,
&GrpcEvent {
content: format!("Connecting to {}", req.url),
event_type: GrpcEventType::Info,
event_type: GrpcEventType::ConnectionStart,
metadata: Json(metadata.clone()),
..base_msg.clone()
..base_event.clone()
},
)
.await
@@ -405,7 +404,7 @@ async fn cmd_grpc_go(
&GrpcEvent {
event_type: GrpcEventType::ClientMessage,
content: msg,
..base_msg.clone()
..base_event.clone()
},
)
.await
@@ -419,13 +418,13 @@ async fn cmd_grpc_go(
&GrpcEvent {
metadata: Json(metadata_to_map(msg.metadata().clone())),
content: if msg.metadata().len() == 0 {
"Connection established"
"Received response"
} else {
"Received metadata"
"Received response with metadata"
}
.to_string(),
event_type: GrpcEventType::Info,
..base_msg.clone()
..base_event.clone()
},
)
.await
@@ -435,30 +434,32 @@ async fn cmd_grpc_go(
&GrpcEvent {
content: serialize_message(&msg.into_inner()).unwrap(),
event_type: GrpcEventType::ServerMessage,
..base_msg.clone()
..base_event.clone()
},
)
.await
.unwrap();
upsert_grpc_connection(
upsert_grpc_event(
&w,
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i64,
status: Code::Ok as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
&GrpcEvent {
content: "Connection complete".to_string(),
event_type: GrpcEventType::ConnectionEnd,
status: Some(Code::Ok as i64),
..base_event.clone()
},
)
.await
.unwrap();
}
Some(Err(e)) => {
upsert_grpc_connection(
upsert_grpc_event(
&w,
&GrpcConnection {
&GrpcEvent {
content: "Failed to connect".to_string(),
event_type: GrpcEventType::ConnectionEnd,
error: Some(e.to_string()),
elapsed: start.elapsed().as_millis() as i64,
status: Code::Unknown as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
status: Some(Code::Unknown as i64),
..base_event.clone()
},
)
.await
@@ -470,33 +471,34 @@ async fn cmd_grpc_go(
}
let mut stream = match maybe_stream {
Some(Ok(Ok(s))) => {
Some(Ok(Ok(stream))) => {
upsert_grpc_event(
&w,
&GrpcEvent {
metadata: Json(metadata_to_map(s.metadata().clone())),
content: if s.metadata().len() == 0 {
"Connection established"
metadata: Json(metadata_to_map(stream.metadata().clone())),
content: if stream.metadata().len() == 0 {
"Received response"
} else {
"Received metadata"
"Received response with metadata"
}
.to_string(),
.to_string(),
event_type: GrpcEventType::Info,
..base_msg.clone()
..base_event.clone()
},
)
.await
.unwrap();
s.into_inner()
.await
.unwrap();
stream.into_inner()
}
Some(Ok(Err(e))) => {
upsert_grpc_connection(
upsert_grpc_event(
&w,
&GrpcConnection {
&GrpcEvent {
error: Some(e.message().to_string()),
status: e.code() as i64,
elapsed: start.elapsed().as_millis() as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
status: Some(e.code() as i64),
content: e.code().description().to_string(),
event_type: GrpcEventType::ConnectionEnd,
..base_event.clone()
},
)
.await
@@ -504,13 +506,14 @@ async fn cmd_grpc_go(
return;
}
Some(Err(e)) => {
upsert_grpc_connection(
upsert_grpc_event(
&w,
&GrpcConnection {
&GrpcEvent {
error: Some(e),
status: Code::Unknown as i64,
elapsed: start.elapsed().as_millis() as i64,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
status: Some(Code::Unknown as i64),
content: "Unknown error".to_string(),
event_type: GrpcEventType::ConnectionEnd,
..base_event.clone()
},
)
.await
@@ -529,7 +532,7 @@ async fn cmd_grpc_go(
&GrpcEvent {
content: message,
event_type: GrpcEventType::ServerMessage,
..base_msg.clone()
..base_event.clone()
},
)
.await
@@ -541,13 +544,14 @@ async fn cmd_grpc_go(
.await
.unwrap_or_default()
.unwrap_or_default();
upsert_grpc_connection(
upsert_grpc_event(
&w,
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i64,
status: Code::Unavailable as i64,
trailers: Json(metadata_to_map(trailers)),
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
&GrpcEvent {
content: "Connection complete".to_string(),
status: Some(Code::Unavailable as i64),
metadata: Json(metadata_to_map(trailers)),
event_type: GrpcEventType::ConnectionEnd,
..base_event.clone()
},
)
.await
@@ -555,13 +559,14 @@ async fn cmd_grpc_go(
break;
}
Err(status) => {
upsert_grpc_connection(
upsert_grpc_event(
&w,
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i64,
status: Code::Unavailable as i64,
trailers: Json(metadata_to_map(status.metadata().clone())),
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
&GrpcEvent {
content: status.to_string(),
status: Some(status.code() as i64),
metadata: Json(metadata_to_map(status.metadata().clone())),
event_type: GrpcEventType::ConnectionEnd,
..base_event.clone()
},
)
.await
@@ -578,16 +583,32 @@ async fn cmd_grpc_go(
let w = w.clone();
tokio::select! {
_ = grpc_listen => {
// upsert_grpc_connection(
// &w,
// &GrpcConnection{
// elapsed: start.elapsed().as_millis() as i64,
// status: Code::Ok as i64,
// ..conn
// },
// ).await.unwrap();
let events = list_grpc_events(&w, &conn_id)
.await
.unwrap();
let closed_event = events
.iter()
.find(|e| GrpcEventType::ConnectionEnd == e.event_type);
let closed_status = closed_event.and_then(|e| e.status).unwrap_or(Code::Unavailable as i64);
upsert_grpc_connection(
&w,
&GrpcConnection{
elapsed: start.elapsed().as_millis() as i64,
status: closed_status,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
).await.unwrap();
},
_ = cancelled_rx.changed() => {
upsert_grpc_event(
&w,
&GrpcEvent {
content: "Cancelled".to_string(),
event_type: GrpcEventType::ConnectionEnd,
status: Some(Code::Cancelled as i64),
..base_msg.clone()
},
).await.unwrap();
upsert_grpc_connection(
&w,
&GrpcConnection {

View File

@@ -237,7 +237,7 @@ pub struct GrpcConnection {
pub trailers: Json<HashMap<String, String>>,
}
#[derive(sqlx::Type, Debug, Clone, Serialize, Deserialize)]
#[derive(sqlx::Type, Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
#[sqlx(rename_all = "snake_case")]
pub enum GrpcEventType {
@@ -245,7 +245,8 @@ pub enum GrpcEventType {
Error,
ClientMessage,
ServerMessage,
ConnectionResponse,
ConnectionStart,
ConnectionEnd,
}
impl Default for GrpcEventType {
@@ -266,6 +267,8 @@ pub struct GrpcEvent {
pub content: String,
pub event_type: GrpcEventType,
pub metadata: Json<HashMap<String, String>>,
pub status: Option<i64>,
pub error: Option<String>,
}
#[derive(sqlx::FromRow, Debug, Clone, Serialize, Deserialize, Default)]
@@ -699,32 +702,37 @@ pub async fn list_grpc_connections(
pub async fn upsert_grpc_event(
mgr: &impl Manager<Wry>,
message: &GrpcEvent,
event: &GrpcEvent,
) -> Result<GrpcEvent, sqlx::Error> {
let db = get_db(mgr).await;
let id = match message.id.as_str() {
let id = match event.id.as_str() {
"" => generate_id(Some("ge")),
_ => message.id.to_string(),
_ => event.id.to_string(),
};
sqlx::query!(
r#"
INSERT INTO grpc_events (
id, workspace_id, request_id, connection_id, content, event_type, metadata
id, workspace_id, request_id, connection_id, content, event_type, metadata,
status, error
)
VALUES (?, ?, ?, ?, ?, ?, ?)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
updated_at = CURRENT_TIMESTAMP,
content = excluded.content,
event_type = excluded.event_type,
metadata = excluded.metadata
metadata = excluded.metadata,
status = excluded.status,
error = excluded.error
"#,
id,
message.workspace_id,
message.request_id,
message.connection_id,
message.content,
message.event_type,
message.metadata,
event.workspace_id,
event.request_id,
event.connection_id,
event.content,
event.event_type,
event.metadata,
event.status,
event.error,
)
.execute(&db)
.await?;
@@ -744,7 +752,7 @@ pub async fn get_grpc_event(
GrpcEvent,
r#"
SELECT
id, model, workspace_id, request_id, connection_id, created_at, content,
id, model, workspace_id, request_id, connection_id, created_at, content, status, error,
event_type AS "event_type!: GrpcEventType",
metadata AS "metadata!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_events
@@ -765,7 +773,7 @@ pub async fn list_grpc_events(
GrpcEvent,
r#"
SELECT
id, model, workspace_id, request_id, connection_id, created_at, content,
id, model, workspace_id, request_id, connection_id, created_at, content, status, error,
event_type AS "event_type!: GrpcEventType",
metadata AS "metadata!: sqlx::types::Json<HashMap<String, String>>"
FROM grpc_events