Server sent event response viewer (#126)

This commit is contained in:
Gregory Schier
2024-10-11 06:52:32 -07:00
committed by GitHub
parent f974a66086
commit d754e7233d
24 changed files with 513 additions and 104 deletions

42
src-tauri/Cargo.lock generated
View File

@@ -1623,6 +1623,21 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "eventsource-client"
version = "0.13.0"
source = "git+https://github.com/yaakapp/rust-eventsource-client#e9e1e52421f11f0409179389b997aa49275a8461"
dependencies = [
"futures",
"hyper 0.14.30",
"hyper-rustls 0.24.2",
"hyper-timeout 0.4.1",
"log",
"pin-project",
"rand 0.8.5",
"tokio",
]
[[package]]
name = "exr"
version = "1.72.0"
@@ -1807,6 +1822,21 @@ dependencies = [
"new_debug_unreachable",
]
[[package]]
name = "futures"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.30"
@@ -1893,6 +1923,7 @@ version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
@@ -7823,6 +7854,7 @@ dependencies = [
"chrono",
"cocoa 0.26.0",
"datetime",
"eventsource-client",
"hex_color",
"http 1.1.0",
"log",
@@ -7854,6 +7886,7 @@ dependencies = [
"yaak_grpc",
"yaak_models",
"yaak_plugin_runtime",
"yaak_sse",
"yaak_templates",
]
@@ -7926,6 +7959,15 @@ dependencies = [
"yaak_models",
]
[[package]]
name = "yaak_sse"
version = "0.1.0"
dependencies = [
"serde",
"serde_json",
"ts-rs",
]
[[package]]
name = "yaak_templates"
version = "0.1.0"

View File

@@ -1,5 +1,5 @@
[workspace]
members = ["yaak_grpc", "yaak_templates", "yaak_plugin_runtime", "yaak_models"]
members = ["yaak_grpc", "yaak_templates", "yaak_plugin_runtime", "yaak_models", "yaak_sse"]
[package]
name = "yaak-app"
@@ -30,6 +30,7 @@ yaak_grpc = { path = "yaak_grpc" }
yaak_templates = { path = "yaak_templates" }
yaak_plugin_runtime = { workspace = true }
yaak_models = { workspace = true }
yaak_sse = { path = "yaak_sse" }
anyhow = "1.0.86"
base64 = "0.22.0"
chrono = { version = "0.4.31", features = ["serde"] }
@@ -59,6 +60,7 @@ uuid = "1.7.0"
thiserror = "1.0.61"
mime_guess = "2.0.5"
urlencoding = "2.1.3"
eventsource-client = { git = "https://github.com/yaakapp/rust-eventsource-client", version = "0.13.0" }
[workspace.dependencies]
yaak_models = { path = "yaak_models" }

View File

@@ -1 +1 @@
{"main":{"identifier":"main","description":"Main permissions","local":true,"windows":["*"],"permissions":["core:event:allow-emit","core:event:allow-listen","core:event:allow-unlisten","os:allow-os-type","clipboard-manager:allow-clear","clipboard-manager:allow-write-text","clipboard-manager:allow-read-text","dialog:allow-open","dialog:allow-save","fs:allow-read-file","fs:allow-read-text-file",{"identifier":"fs:scope","allow":[{"path":"$APPDATA"},{"path":"$APPDATA/**"}]},"shell:allow-open","core:webview:allow-set-webview-zoom","core:window:allow-close","core:window:allow-is-fullscreen","core:window:allow-maximize","core:window:allow-minimize","core:window:allow-set-decorations","core:window:allow-set-title","core:window:allow-show","core:window:allow-start-dragging","core:window:allow-theme","core:window:allow-toggle-maximize","core:window:allow-internal-toggle-maximize","core:window:allow-unmaximize","clipboard-manager:allow-read-text","clipboard-manager:allow-write-text"]}}
{"main":{"identifier":"main","description":"Main permissions","local":true,"windows":["*"],"permissions":["core:event:allow-emit","core:event:allow-listen","core:event:allow-unlisten","os:allow-os-type","clipboard-manager:allow-clear","clipboard-manager:allow-write-text","clipboard-manager:allow-read-text","dialog:allow-open","dialog:allow-save","fs:allow-read-file","fs:allow-read-text-file",{"identifier":"fs:scope","allow":[{"path":"$APPDATA"},{"path":"$APPDATA/**"}]},"shell:allow-open","core:webview:allow-set-webview-zoom","core:window:allow-close","core:window:allow-internal-toggle-maximize","core:window:allow-is-fullscreen","core:window:allow-maximize","core:window:allow-minimize","core:window:allow-set-decorations","core:window:allow-set-title","core:window:allow-show","core:window:allow-start-dragging","core:window:allow-theme","core:window:allow-toggle-maximize","core:window:allow-unmaximize","clipboard-manager:allow-read-text","clipboard-manager:allow-write-text"]}}

View File

@@ -21,19 +21,21 @@ use tauri::{Manager, Runtime, WebviewWindow};
use tokio::fs;
use tokio::fs::{create_dir_all, File};
use tokio::io::AsyncWriteExt;
use tokio::sync::oneshot;
use tokio::sync::watch::Receiver;
use tokio::sync::{oneshot, Mutex};
use yaak_models::models::{
Cookie, CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseHeader,
HttpResponseState,
};
use yaak_models::queries::{get_workspace, update_response_if_id, upsert_cookie_jar};
use yaak_models::queries::{
get_http_response, get_workspace, update_response_if_id, upsert_cookie_jar,
};
use yaak_plugin_runtime::events::{RenderPurpose, WindowContext};
pub async fn send_http_request<R: Runtime>(
window: &WebviewWindow<R>,
request: &HttpRequest,
response: &HttpResponse,
og_response: &HttpResponse,
environment: Option<Environment>,
cookie_jar: Option<CookieJar>,
cancelled_rx: &mut Receiver<bool>,
@@ -47,6 +49,9 @@ pub async fn send_http_request<R: Runtime>(
RenderPurpose::Send,
);
let response_id = og_response.id.clone();
let response = Arc::new(Mutex::new(og_response.clone()));
let rendered_request =
render_http_request(&request, &workspace, environment.as_ref(), &cb).await;
@@ -116,7 +121,7 @@ pub async fn send_http_request<R: Runtime>(
Ok(u) => u,
Err(e) => {
return Ok(response_err(
response,
&*response.lock().await,
format!("Failed to parse URL \"{}\": {}", url_string, e.to_string()),
window,
)
@@ -128,7 +133,7 @@ pub async fn send_http_request<R: Runtime>(
Ok(u) => u,
Err(e) => {
return Ok(response_err(
response,
&*response.lock().await,
format!("Failed to parse URL \"{}\": {}", url_string, e.to_string()),
window,
)
@@ -275,7 +280,7 @@ pub async fn send_http_request<R: Runtime>(
request_builder = request_builder.body(f);
}
Err(e) => {
return Ok(response_err(response, e, window).await);
return Ok(response_err(&*response.lock().await, e, window).await);
}
}
} else if body_type == "multipart/form-data" && request_body.contains_key("form") {
@@ -301,9 +306,12 @@ pub async fn send_http_request<R: Runtime>(
match fs::read(file_path.clone()).await {
Ok(f) => multipart::Part::bytes(f),
Err(e) => {
return Ok(
response_err(response, e.to_string(), window).await
);
return Ok(response_err(
&*response.lock().await,
e.to_string(),
window,
)
.await);
}
}
};
@@ -351,7 +359,7 @@ pub async fn send_http_request<R: Runtime>(
let sendable_req = match request_builder.build() {
Ok(r) => r,
Err(e) => {
return Ok(response_err(response, e.to_string(), window).await);
return Ok(response_err(&*response.lock().await, e.to_string(), window).await);
}
};
@@ -368,62 +376,60 @@ pub async fn send_http_request<R: Runtime>(
Ok(r) = resp_rx => r,
_ = cancelled_rx.changed() => {
debug!("Request cancelled");
return Ok(response_err(response, "Request was cancelled".to_string(), window).await);
return Ok(response_err(&*response.lock().await, "Request was cancelled".to_string(), window).await);
}
};
{
let window = window.clone();
let response = response.clone();
let cancelled_rx = cancelled_rx.clone();
let response_id = response_id.clone();
let response = response.clone();
tokio::spawn(async move {
let result = match raw_response {
match raw_response {
Ok(mut v) => {
let mut response = response.clone();
let content_length = v.content_length();
let response_headers = v.headers().clone();
response.elapsed_headers = start.elapsed().as_millis() as i32;
response.status = v.status().as_u16() as i32;
response.status_reason = v.status().canonical_reason().map(|s| s.to_string());
response.headers = response_headers
.iter()
.map(|(k, v)| HttpResponseHeader {
name: k.as_str().to_string(),
value: v.to_str().unwrap_or_default().to_string(),
})
.collect();
response.url = v.url().to_string();
response.remote_addr = v.remote_addr().map(|a| a.to_string());
response.version = match v.version() {
reqwest::Version::HTTP_09 => Some("HTTP/0.9".to_string()),
reqwest::Version::HTTP_10 => Some("HTTP/1.0".to_string()),
reqwest::Version::HTTP_11 => Some("HTTP/1.1".to_string()),
reqwest::Version::HTTP_2 => Some("HTTP/2".to_string()),
reqwest::Version::HTTP_3 => Some("HTTP/3".to_string()),
_ => None,
};
let dir = window.app_handle().path().app_data_dir().unwrap();
let base_dir = dir.join("responses");
create_dir_all(base_dir.clone())
.await
.expect("Failed to create responses dir");
let body_path = if response.id.is_empty() {
base_dir.join(response.id.clone())
let body_path = if response_id.is_empty() {
base_dir.join(response_id.clone())
} else {
base_dir.join(uuid::Uuid::new_v4().to_string())
};
response.body_path = Some(
body_path
.to_str()
.expect("Failed to get body path")
.to_string(),
);
{
let mut r = response.lock().await;
r.body_path = Some(body_path.to_str().unwrap().to_string());
r.elapsed_headers = start.elapsed().as_millis() as i32;
r.status = v.status().as_u16() as i32;
r.status_reason = v.status().canonical_reason().map(|s| s.to_string());
r.headers = response_headers
.iter()
.map(|(k, v)| HttpResponseHeader {
name: k.as_str().to_string(),
value: v.to_str().unwrap_or_default().to_string(),
})
.collect();
r.url = v.url().to_string();
r.remote_addr = v.remote_addr().map(|a| a.to_string());
r.version = match v.version() {
reqwest::Version::HTTP_09 => Some("HTTP/0.9".to_string()),
reqwest::Version::HTTP_10 => Some("HTTP/1.0".to_string()),
reqwest::Version::HTTP_11 => Some("HTTP/1.1".to_string()),
reqwest::Version::HTTP_2 => Some("HTTP/2".to_string()),
reqwest::Version::HTTP_3 => Some("HTTP/3".to_string()),
_ => None,
};
let content_length = v.content_length();
response.state = HttpResponseState::Connected;
response = update_response_if_id(&window, &response)
.await
.expect("Failed to update response after connected");
r.state = HttpResponseState::Connected;
update_response_if_id(&window, &r)
.await
.expect("Failed to update response after connected");
}
// Write body to FS
let mut f = File::options()
@@ -446,9 +452,10 @@ pub async fn send_http_request<R: Runtime>(
f.write_all(&bytes).await.expect("Failed to write to file");
f.flush().await.expect("Failed to flush file");
written_bytes += bytes.len();
response.elapsed = start.elapsed().as_millis() as i32;
response.content_length = Some(written_bytes as i32);
response = update_response_if_id(&window, &response)
let mut r = response.lock().await;
r.elapsed = start.elapsed().as_millis() as i32;
r.content_length = Some(written_bytes as i32);
update_response_if_id(&window, &r)
.await
.expect("Failed to update response");
}
@@ -456,21 +463,24 @@ pub async fn send_http_request<R: Runtime>(
break;
}
Err(e) => {
response = response_err(&response, e.to_string(), &window).await;
response_err(&*response.lock().await, e.to_string(), &window).await;
break;
}
}
}
// Set final content length
response.content_length = match content_length {
Some(l) => Some(l as i32),
None => Some(written_bytes as i32),
{
let mut r = response.lock().await;
r.content_length = match content_length {
Some(l) => Some(l as i32),
None => Some(written_bytes as i32),
};
r.state = HttpResponseState::Closed;
update_response_if_id(&window, &r)
.await
.expect("Failed to update response");
};
response.state = HttpResponseState::Closed;
response = update_response_if_id(&window, &response)
.await
.expect("Failed to update response");
// Add cookie store if specified
if let Some((cookie_store, mut cookie_jar)) = maybe_cookie_manager {
@@ -497,19 +507,29 @@ pub async fn send_http_request<R: Runtime>(
error!("Failed to update cookie jar: {}", e);
};
}
response
}
Err(e) => response_err(&response, e.to_string(), &window).await,
Err(e) => {
response_err(&*response.lock().await, e.to_string(), &window).await;
}
};
done_tx.send(result.clone()).unwrap();
let r = response.lock().await.clone();
done_tx.send(r).unwrap();
});
};
Ok(tokio::select! {
Ok(r) = done_rx => r,
_ = cancelled_rx.changed() => {
response_err(&response, "Request was cancelled".to_string(), &window).await
match get_http_response(window, response_id.as_str()).await {
Ok(mut r) => {
r.state = HttpResponseState::Closed;
update_response_if_id(&window, &r).await.expect("Failed to update response")
},
_ => {
response_err(&*response.lock().await, "Ephemeral request was cancelled".to_string(), &window).await
}.clone(),
}
}
})
}

View File

@@ -3,7 +3,7 @@ extern crate core;
extern crate objc;
use std::collections::BTreeMap;
use std::fs::{create_dir_all, read_to_string, File};
use std::fs::{create_dir_all, File};
use std::path::PathBuf;
use std::process::exit;
use std::str::FromStr;
@@ -13,6 +13,7 @@ use std::{fs, panic};
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use chrono::Utc;
use eventsource_client::{EventParser, SSE};
use fern::colors::ColoredLevelConfig;
use log::{debug, error, info, warn};
use rand::random;
@@ -27,6 +28,7 @@ use tauri::{Manager, WindowEvent};
use tauri_plugin_clipboard_manager::ClipboardExt;
use tauri_plugin_log::{fern, Target, TargetKind};
use tauri_plugin_shell::ShellExt;
use tokio::fs::read_to_string;
use tokio::sync::Mutex;
use yaak_grpc::manager::{DynamicMessage, GrpcHandle};
@@ -69,6 +71,7 @@ use yaak_plugin_runtime::events::{
WindowContext,
};
use yaak_plugin_runtime::plugin_handle::PluginHandle;
use yaak_sse::sse::ServerSentEvent;
use yaak_templates::{Parser, Tokens};
mod analytics;
@@ -337,7 +340,7 @@ async fn cmd_grpc_go<R: Runtime>(
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i32,
error: Some(err.clone()),
state: GrpcConnectionState::Initialized,
state: GrpcConnectionState::Closed,
..conn.clone()
},
)
@@ -797,7 +800,7 @@ async fn cmd_filter_response<R: Runtime>(
}
}
let body = read_to_string(response.body_path.unwrap()).unwrap();
let body = read_to_string(response.body_path.unwrap()).await.unwrap();
// TODO: Have plugins register their own content type (regex?)
plugin_manager
@@ -806,14 +809,36 @@ async fn cmd_filter_response<R: Runtime>(
.map_err(|e| e.to_string())
}
#[tauri::command]
async fn cmd_get_sse_events(file_path: &str) -> Result<Vec<ServerSentEvent>, String> {
let body = fs::read(file_path).map_err(|e| e.to_string())?;
let mut p = EventParser::new();
p.process_bytes(body.into()).map_err(|e| e.to_string())?;
let mut events = Vec::new();
while let Some(e) = p.get_event() {
if let SSE::Event(e) = e {
events.push(ServerSentEvent {
event_type: e.event_type,
data: e.data,
id: e.id,
retry: e.retry,
});
}
}
Ok(events)
}
#[tauri::command]
async fn cmd_import_data<R: Runtime>(
window: WebviewWindow<R>,
plugin_manager: State<'_, PluginManager>,
file_path: &str,
) -> Result<WorkspaceExportResources, String> {
let file =
read_to_string(file_path).unwrap_or_else(|_| panic!("Unable to read file {}", file_path));
let file = read_to_string(file_path)
.await
.unwrap_or_else(|_| panic!("Unable to read file {}", file_path));
let file_contents = file.as_str();
let (import_result, plugin_name) = plugin_manager
.import_data(&window, file_contents)
@@ -1801,6 +1826,7 @@ pub fn run() {
])
.level_for("plugin_runtime", log::LevelFilter::Info)
.level_for("cookie_store", log::LevelFilter::Info)
.level_for("eventsource_client::event_parser", log::LevelFilter::Info)
.level_for("h2", log::LevelFilter::Info)
.level_for("hyper", log::LevelFilter::Info)
.level_for("hyper_util", log::LevelFilter::Info)
@@ -1901,6 +1927,7 @@ pub fn run() {
cmd_get_folder,
cmd_get_grpc_request,
cmd_get_http_request,
cmd_get_sse_events,
cmd_get_key_value,
cmd_get_settings,
cmd_get_workspace,

View File

@@ -488,6 +488,7 @@ pub async fn upsert_grpc_connection<R: Runtime>(
GrpcConnectionIden::Method,
GrpcConnectionIden::Elapsed,
GrpcConnectionIden::Status,
GrpcConnectionIden::State,
GrpcConnectionIden::Error,
GrpcConnectionIden::Trailers,
GrpcConnectionIden::Url,

View File

@@ -0,0 +1,9 @@
[package]
name = "yaak_sse"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1.0.204", features = ["derive"] }
serde_json = "1.0.122"
ts-rs = { version = "10.0.0", features = ["serde-json-impl"] }

View File

@@ -0,0 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type ServerSentEvent = { eventType: string, data: string, id: string | null, retry: bigint | null, };

View File

@@ -0,0 +1 @@
export * from './bindings/sse';

View File

@@ -0,0 +1,6 @@
{
"name": "@yaakapp-internal/sse",
"private": true,
"version": "1.0.0",
"main": "index.ts"
}

View File

@@ -0,0 +1 @@
pub mod sse;

View File

@@ -0,0 +1,12 @@
use serde::{Deserialize, Serialize};
use ts_rs::TS;
#[derive(Debug, Clone, Serialize, Deserialize, Default, TS)]
#[serde(default, rename_all = "camelCase")]
#[ts(export, export_to = "sse.ts")]
pub struct ServerSentEvent {
pub event_type: String,
pub data: String,
pub id: Option<String>,
pub retry: Option<u64>,
}