Response streaming

This commit is contained in:
Gregory Schier
2023-03-01 09:05:00 -08:00
parent 5040d73a8b
commit 569a9454ad
11 changed files with 378 additions and 262 deletions

View File

@@ -16,12 +16,14 @@ use http::{HeaderMap, HeaderValue, Method};
use reqwest::redirect::Policy;
use sqlx::migrate::Migrator;
use sqlx::sqlite::SqlitePoolOptions;
use sqlx::types::Json;
use sqlx::{Pool, Sqlite};
use tauri::regex::Regex;
use tauri::{AppHandle, State, Wry};
use tauri::{CustomMenuItem, Manager, SystemTray, SystemTrayEvent, SystemTrayMenu, WindowEvent};
use tokio::sync::Mutex;
use crate::models::{update_response, HttpResponse};
use window_ext::WindowExt;
mod models;
@@ -55,11 +57,18 @@ async fn send_request(
app_handle: AppHandle<Wry>,
db_instance: State<'_, Mutex<Pool<Sqlite>>>,
request_id: &str,
) -> Result<models::HttpResponse, String> {
) -> Result<String, String> {
let pool = &*db_instance.lock().await;
let req = models::get_request(request_id, pool)
.await
.expect("Failed to get request");
let mut response = models::create_response(&req.id, 0, "", 0, None, "", vec![], pool)
.await
.expect("Failed to create response");
app_handle.emit_all("updated_response", &response).unwrap();
let start = std::time::Instant::now();
let mut url_string = req.url.to_string();
@@ -110,7 +119,7 @@ async fn send_request(
let sendable_req = match sendable_req_result {
Ok(r) => r,
Err(e) => {
return Err(e.to_string());
return response_err(response, e.to_string(), app_handle, pool).await;
}
};
@@ -125,41 +134,44 @@ async fn send_request(
match resp {
Ok(v) => {
let status = v.status().as_u16() as i64;
let status_reason = v.status().canonical_reason();
let headers = v
.headers()
.iter()
.map(|(k, v)| models::HttpResponseHeader {
name: k.as_str().to_string(),
value: v.to_str().unwrap().to_string(),
})
.collect();
let url = v.url().clone();
let body = v.text().await.expect("Failed to get body");
let elapsed = start.elapsed().as_millis() as i64;
let response = models::create_response(
&req.id,
elapsed,
url.as_str(),
status,
status_reason,
body.as_str(),
headers,
pool,
)
.await
.expect("Failed to create response");
Ok(response)
}
Err(e) => {
println!("Error: {}", e);
Err(e.to_string())
response.status = v.status().as_u16() as i64;
response.status_reason = v.status().canonical_reason().map(|s| s.to_string());
response.headers = Json(
v.headers()
.iter()
.map(|(k, v)| models::HttpResponseHeader {
name: k.as_str().to_string(),
value: v.to_str().unwrap().to_string(),
})
.collect(),
);
response.url = v.url().to_string();
response.body = v.text().await.expect("Failed to get body");
response.elapsed = start.elapsed().as_millis() as i64;
response = update_response(response, pool)
.await
.expect("Failed to update response");
app_handle.emit_all("updated_response", &response).unwrap();
Ok(response.id)
}
Err(e) => response_err(response, e.to_string(), app_handle, pool).await,
}
}
async fn response_err(
mut response: HttpResponse,
error: String,
app_handle: AppHandle<Wry>,
pool: &Pool<Sqlite>,
) -> Result<String, String> {
response.error = Some(error.clone());
response = update_response(response, pool)
.await
.expect("Failed to update response");
app_handle.emit_all("updated_response", &response).unwrap();
Ok(response.id)
}
#[tauri::command]
async fn create_request(
workspace_id: &str,
@@ -175,7 +187,7 @@ async fn create_request(
.expect("Failed to create request");
app_handle
.emit_all("created_request", &created_request)
.emit_all("updated_request", &created_request)
.unwrap();
Ok(created_request.id)

View File

@@ -53,6 +53,7 @@ pub struct HttpResponse {
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub deleted_at: Option<NaiveDateTime>,
pub error: Option<String>,
pub url: String,
pub elapsed: i64,
pub status: i64,
@@ -241,11 +242,37 @@ pub async fn create_response(
get_response(&id, pool).await
}
pub async fn update_response(
response: HttpResponse,
pool: &Pool<Sqlite>,
) -> Result<HttpResponse, sqlx::Error> {
let headers_json = Json(response.headers);
sqlx::query!(
r#"
UPDATE http_responses SET (elapsed, url, status, status_reason, body, error, headers, updated_at) =
(?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP) WHERE id = ?;
"#,
response.elapsed,
response.url,
response.status,
response.status_reason,
response.body,
response.error,
headers_json,
response.id,
)
.execute(pool)
.await
.expect("Failed to update response");
get_response(&response.id, pool).await
}
pub async fn get_response(id: &str, pool: &Pool<Sqlite>) -> Result<HttpResponse, sqlx::Error> {
sqlx::query_as!(
sqlx::query_as_unchecked!(
HttpResponse,
r#"
SELECT id, workspace_id, request_id, updated_at, deleted_at, created_at, status, status_reason, body, elapsed, url,
SELECT id, workspace_id, request_id, updated_at, deleted_at, created_at,
status, status_reason, body, elapsed, url, error,
headers AS "headers!: sqlx::types::Json<Vec<HttpResponseHeader>>"
FROM http_responses
WHERE id = ?
@@ -263,16 +290,17 @@ pub async fn find_responses(
sqlx::query_as!(
HttpResponse,
r#"
SELECT id, workspace_id, request_id, updated_at, deleted_at, created_at, status, status_reason, body, elapsed, url,
SELECT id, workspace_id, request_id, updated_at, deleted_at,
created_at, status, status_reason, body, elapsed, url, error,
headers AS "headers!: sqlx::types::Json<Vec<HttpResponseHeader>>"
FROM http_responses
WHERE request_id = ?
ORDER BY created_at DESC
ORDER BY created_at ASC
"#,
request_id,
)
.fetch_all(pool)
.await
.fetch_all(pool)
.await
}
pub async fn delete_response(id: &str, pool: &Pool<Sqlite>) -> Result<(), sqlx::Error> {