Response Streaming (#124)

This commit is contained in:
Gregory Schier
2024-10-09 16:27:37 -07:00
committed by GitHub
parent 2ca30bcb31
commit da6baf72f5
20 changed files with 425 additions and 301 deletions

View File

@@ -1,7 +1,4 @@
use std::collections::BTreeMap;
use std::fs;
use std::fs::{create_dir_all, File};
use std::io::Write;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
@@ -14,17 +11,21 @@ use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use http::header::{ACCEPT, USER_AGENT};
use http::{HeaderMap, HeaderName, HeaderValue};
use log::{error, warn};
use log::{debug, error, warn};
use mime_guess::Mime;
use reqwest::redirect::Policy;
use reqwest::Method;
use reqwest::{multipart, Url};
use reqwest::{Method, Response};
use serde_json::Value;
use tauri::{Manager, Runtime, WebviewWindow};
use tokio::fs;
use tokio::fs::{create_dir_all, File};
use tokio::io::AsyncWriteExt;
use tokio::sync::oneshot;
use tokio::sync::watch::Receiver;
use yaak_models::models::{
Cookie, CookieJar, Environment, HttpRequest, HttpResponse, HttpResponseHeader,
HttpResponseState,
};
use yaak_models::queries::{get_workspace, update_response_if_id, upsert_cookie_jar};
use yaak_plugin_runtime::events::{RenderPurpose, WindowContext};
@@ -35,7 +36,7 @@ pub async fn send_http_request<R: Runtime>(
response: &HttpResponse,
environment: Option<Environment>,
cookie_jar: Option<CookieJar>,
cancel_rx: &mut Receiver<bool>,
cancelled_rx: &mut Receiver<bool>,
) -> Result<HttpResponse, String> {
let workspace = get_workspace(window, &request.workspace_id)
.await
@@ -45,7 +46,7 @@ pub async fn send_http_request<R: Runtime>(
&WindowContext::from_window(window),
RenderPurpose::Send,
);
let rendered_request =
render_http_request(&request, &workspace, environment.as_ref(), &cb).await;
@@ -114,24 +115,24 @@ pub async fn send_http_request<R: Runtime>(
let uri = match http::Uri::from_str(url_string.as_str()) {
Ok(u) => u,
Err(e) => {
return response_err(
return Ok(response_err(
response,
format!("Failed to parse URL \"{}\": {}", url_string, e.to_string()),
window,
)
.await;
.await);
}
};
// Yes, we're parsing both URI and URL because they could return different errors
let url = match Url::from_str(uri.to_string().as_str()) {
Ok(u) => u,
Err(e) => {
return response_err(
return Ok(response_err(
response,
format!("Failed to parse URL \"{}\": {}", url_string, e.to_string()),
window,
)
.await;
.await);
}
};
@@ -269,12 +270,12 @@ pub async fn send_http_request<R: Runtime>(
.as_str()
.unwrap_or_default();
match fs::read(file_path).map_err(|e| e.to_string()) {
match fs::read(file_path).await.map_err(|e| e.to_string()) {
Ok(f) => {
request_builder = request_builder.body(f);
}
Err(e) => {
return response_err(response, e, window).await;
return Ok(response_err(response, e, window).await);
}
}
} else if body_type == "multipart/form-data" && request_body.contains_key("form") {
@@ -297,10 +298,12 @@ pub async fn send_http_request<R: Runtime>(
let mut part = if file_path.is_empty() {
multipart::Part::text(value.clone())
} else {
match fs::read(file_path.clone()) {
match fs::read(file_path.clone()).await {
Ok(f) => multipart::Part::bytes(f),
Err(e) => {
return response_err(response, e.to_string(), window).await;
return Ok(
response_err(response, e.to_string(), window).await
);
}
}
};
@@ -348,118 +351,167 @@ pub async fn send_http_request<R: Runtime>(
let sendable_req = match request_builder.build() {
Ok(r) => r,
Err(e) => {
return response_err(response, e.to_string(), window).await;
return Ok(response_err(response, e.to_string(), window).await);
}
};
let start = std::time::Instant::now();
let (resp_tx, resp_rx) = oneshot::channel::<Result<Response, reqwest::Error>>();
let (done_tx, done_rx) = oneshot::channel::<HttpResponse>();
let (resp_tx, resp_rx) = oneshot::channel();
let start = std::time::Instant::now();
tokio::spawn(async move {
let _ = resp_tx.send(client.execute(sendable_req).await);
});
let raw_response = tokio::select! {
Ok(r) = resp_rx => {r}
_ = cancel_rx.changed() => {
return response_err(response, "Request was cancelled".to_string(), window).await;
Ok(r) = resp_rx => r,
_ = cancelled_rx.changed() => {
debug!("Request cancelled");
return Ok(response_err(response, "Request was cancelled".to_string(), window).await);
}
};
match raw_response {
Ok(v) => {
let mut response = response.clone();
response.elapsed_headers = start.elapsed().as_millis() as i32;
let response_headers = v.headers().clone();
response.status = v.status().as_u16() as i32;
response.status_reason = v.status().canonical_reason().map(|s| s.to_string());
response.headers = response_headers
.iter()
.map(|(k, v)| HttpResponseHeader {
name: k.as_str().to_string(),
value: v.to_str().unwrap_or_default().to_string(),
})
.collect();
response.url = v.url().to_string();
response.remote_addr = v.remote_addr().map(|a| a.to_string());
response.version = match v.version() {
reqwest::Version::HTTP_09 => Some("HTTP/0.9".to_string()),
reqwest::Version::HTTP_10 => Some("HTTP/1.0".to_string()),
reqwest::Version::HTTP_11 => Some("HTTP/1.1".to_string()),
reqwest::Version::HTTP_2 => Some("HTTP/2".to_string()),
reqwest::Version::HTTP_3 => Some("HTTP/3".to_string()),
_ => None,
{
let window = window.clone();
let response = response.clone();
let cancelled_rx = cancelled_rx.clone();
tokio::spawn(async move {
let result = match raw_response {
Ok(mut v) => {
let mut response = response.clone();
let response_headers = v.headers().clone();
response.elapsed_headers = start.elapsed().as_millis() as i32;
response.status = v.status().as_u16() as i32;
response.status_reason = v.status().canonical_reason().map(|s| s.to_string());
response.headers = response_headers
.iter()
.map(|(k, v)| HttpResponseHeader {
name: k.as_str().to_string(),
value: v.to_str().unwrap_or_default().to_string(),
})
.collect();
response.url = v.url().to_string();
response.remote_addr = v.remote_addr().map(|a| a.to_string());
response.version = match v.version() {
reqwest::Version::HTTP_09 => Some("HTTP/0.9".to_string()),
reqwest::Version::HTTP_10 => Some("HTTP/1.0".to_string()),
reqwest::Version::HTTP_11 => Some("HTTP/1.1".to_string()),
reqwest::Version::HTTP_2 => Some("HTTP/2".to_string()),
reqwest::Version::HTTP_3 => Some("HTTP/3".to_string()),
_ => None,
};
let dir = window.app_handle().path().app_data_dir().unwrap();
let base_dir = dir.join("responses");
create_dir_all(base_dir.clone())
.await
.expect("Failed to create responses dir");
let body_path = if response.id.is_empty() {
base_dir.join(response.id.clone())
} else {
base_dir.join(uuid::Uuid::new_v4().to_string())
};
response.body_path = Some(
body_path
.to_str()
.expect("Failed to get body path")
.to_string(),
);
let content_length = v.content_length();
response.state = HttpResponseState::Connected;
response = update_response_if_id(&window, &response)
.await
.expect("Failed to update response after connected");
// Write body to FS
let mut f = File::options()
.create(true)
.truncate(true)
.write(true)
.open(&body_path)
.await
.expect("Failed to open file");
let mut written_bytes: usize = 0;
loop {
let chunk = v.chunk().await;
if *cancelled_rx.borrow() {
// Request was canceled
return;
}
match chunk {
Ok(Some(bytes)) => {
f.write_all(&bytes).await.expect("Failed to write to file");
f.flush().await.expect("Failed to flush file");
written_bytes += bytes.len();
response.elapsed = start.elapsed().as_millis() as i32;
response.content_length = Some(written_bytes as i32);
response = update_response_if_id(&window, &response)
.await
.expect("Failed to update response");
}
Ok(None) => {
break;
}
Err(e) => {
response = response_err(&response, e.to_string(), &window).await;
break;
}
}
}
// Set final content length
response.content_length = match content_length {
Some(l) => Some(l as i32),
None => Some(written_bytes as i32),
};
response.state = HttpResponseState::Closed;
response = update_response_if_id(&window, &response)
.await
.expect("Failed to update response");
// Add cookie store if specified
if let Some((cookie_store, mut cookie_jar)) = maybe_cookie_manager {
// let cookies = response_headers.get_all(SET_COOKIE).iter().map(|h| {
// println!("RESPONSE COOKIE: {}", h.to_str().unwrap());
// cookie_store::RawCookie::from_str(h.to_str().unwrap())
// .expect("Failed to parse cookie")
// });
// store.store_response_cookies(cookies, &url);
let json_cookies: Vec<Cookie> = cookie_store
.lock()
.unwrap()
.iter_any()
.map(|c| {
let json_cookie =
serde_json::to_value(&c).expect("Failed to serialize cookie");
serde_json::from_value(json_cookie)
.expect("Failed to deserialize cookie")
})
.collect::<Vec<_>>();
cookie_jar.cookies = json_cookies;
if let Err(e) = upsert_cookie_jar(&window, &cookie_jar).await {
error!("Failed to update cookie jar: {}", e);
};
}
response
}
Err(e) => response_err(&response, e.to_string(), &window).await,
};
let content_length = v.content_length();
let body_bytes = v.bytes().await.expect("Failed to get body").to_vec();
response.elapsed = start.elapsed().as_millis() as i32;
done_tx.send(result.clone()).unwrap();
});
};
// Use content length if available, otherwise use body length
response.content_length = match content_length {
Some(l) => Some(l as i32),
None => Some(body_bytes.len() as i32),
};
{
// Write body to FS
let dir = window.app_handle().path().app_data_dir().unwrap();
let base_dir = dir.join("responses");
create_dir_all(base_dir.clone()).expect("Failed to create responses dir");
let body_path = match response.id.is_empty() {
false => base_dir.join(response.id.clone()),
true => base_dir.join(uuid::Uuid::new_v4().to_string()),
};
let mut f = File::options()
.create(true)
.truncate(true)
.write(true)
.open(&body_path)
.expect("Failed to open file");
f.write_all(body_bytes.as_slice())
.expect("Failed to write to file");
response.body_path = Some(
body_path
.to_str()
.expect("Failed to get body path")
.to_string(),
);
}
response = update_response_if_id(window, &response)
.await
.expect("Failed to update response");
// Add cookie store if specified
if let Some((cookie_store, mut cookie_jar)) = maybe_cookie_manager {
// let cookies = response_headers.get_all(SET_COOKIE).iter().map(|h| {
// println!("RESPONSE COOKIE: {}", h.to_str().unwrap());
// cookie_store::RawCookie::from_str(h.to_str().unwrap())
// .expect("Failed to parse cookie")
// });
// store.store_response_cookies(cookies, &url);
let json_cookies: Vec<Cookie> = cookie_store
.lock()
.unwrap()
.iter_any()
.map(|c| {
let json_cookie =
serde_json::to_value(&c).expect("Failed to serialize cookie");
serde_json::from_value(json_cookie).expect("Failed to deserialize cookie")
})
.collect::<Vec<_>>();
cookie_jar.cookies = json_cookies;
if let Err(e) = upsert_cookie_jar(window, &cookie_jar).await {
error!("Failed to update cookie jar: {}", e);
};
}
Ok(response)
Ok(tokio::select! {
Ok(r) = done_rx => r,
_ = cancelled_rx.changed() => {
response_err(&response, "Request was cancelled".to_string(), &window).await
}
Err(e) => response_err(response, e.to_string(), window).await,
}
})
}
fn ensure_proto(url_str: &str) -> String {

View File

@@ -43,8 +43,9 @@ use crate::template_callback::PluginTemplateCallback;
use crate::updates::{UpdateMode, YaakUpdater};
use crate::window_menu::app_menu;
use yaak_models::models::{
CookieJar, Environment, EnvironmentVariable, Folder, GrpcConnection, GrpcEvent, GrpcEventType,
GrpcRequest, HttpRequest, HttpResponse, KeyValue, ModelType, Plugin, Settings, Workspace,
CookieJar, Environment, EnvironmentVariable, Folder, GrpcConnection, GrpcConnectionState,
GrpcEvent, GrpcEventType, GrpcRequest, HttpRequest, HttpResponse, HttpResponseState, KeyValue,
ModelType, Plugin, Settings, Workspace,
};
use yaak_models::queries::{
cancel_pending_grpc_connections, cancel_pending_responses, create_default_http_response,
@@ -280,6 +281,7 @@ async fn cmd_grpc_go<R: Runtime>(
request_id: req.id,
status: -1,
elapsed: 0,
state: GrpcConnectionState::Initialized,
url: req.url.clone(),
..Default::default()
},
@@ -335,6 +337,7 @@ async fn cmd_grpc_go<R: Runtime>(
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i32,
error: Some(err.clone()),
state: GrpcConnectionState::Initialized,
..conn.clone()
},
)
@@ -689,6 +692,7 @@ async fn cmd_grpc_go<R: Runtime>(
&GrpcConnection{
elapsed: start.elapsed().as_millis() as i32,
status: closed_status,
state: GrpcConnectionState::Closed,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
).await.unwrap();
@@ -708,6 +712,7 @@ async fn cmd_grpc_go<R: Runtime>(
&GrpcConnection {
elapsed: start.elapsed().as_millis() as i32,
status: Code::Cancelled as i32,
state: GrpcConnectionState::Closed,
..get_grpc_connection(&w, &conn_id).await.unwrap().clone()
},
)
@@ -752,7 +757,9 @@ async fn cmd_send_ephemeral_request(
window.listen_any(
format!("cancel_http_response_{}", response.id),
move |_event| {
let _ = cancel_tx.send(true);
if let Err(e) = cancel_tx.send(true) {
warn!("Failed to send cancel event for ephemeral request {e:?}");
}
},
);
@@ -1090,7 +1097,9 @@ async fn cmd_send_http_request(
window.listen_any(
format!("cancel_http_response_{}", response.id),
move |_event| {
let _ = cancel_tx.send(true);
if let Err(e) = cancel_tx.send(true) {
warn!("Failed to send cancel event for request {e:?}");
}
},
);
@@ -1129,15 +1138,15 @@ async fn response_err<R: Runtime>(
response: &HttpResponse,
error: String,
w: &WebviewWindow<R>,
) -> Result<HttpResponse, String> {
) -> HttpResponse {
warn!("Failed to send request: {}", error);
let mut response = response.clone();
response.elapsed = -1;
response.state = HttpResponseState::Closed;
response.error = Some(error.clone());
response = update_response_if_id(w, &response)
.await
.expect("Failed to update response");
Ok(response)
response
}
#[tauri::command]
@@ -2182,13 +2191,16 @@ async fn call_frontend<T: Serialize + Clone, R: Runtime>(
let (tx, mut rx) = tokio::sync::watch::channel(PromptTextResponse::default());
let event_id = window.clone().listen(reply_id, move |ev| {
println!("GOT REPLY {ev:?}");
let resp: PromptTextResponse = serde_json::from_str(ev.payload()).unwrap();
_ = tx.send(resp);
if let Err(e) = tx.send(resp) {
warn!("Failed to prompt for text {e:?}");
}
});
// When reply shows up, unlisten to events and return
_ = rx.changed().await;
if let Err(e) = rx.changed().await {
warn!("Failed to check channel changed {e:?}");
}
window.unlisten(event_id);
let foo = rx.borrow();