mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-07-04 12:01:52 +02:00
Improve SQLite response persistence performance (#496)
This commit is contained in:
@@ -7,7 +7,7 @@ use log::info;
|
||||
use r2d2::Pool;
|
||||
use r2d2_sqlite::SqliteConnectionManager;
|
||||
use std::fs::create_dir_all;
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::mpsc;
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -22,6 +22,19 @@ pub mod query_manager;
|
||||
pub mod render;
|
||||
pub mod util;
|
||||
|
||||
fn sqlite_file_manager(path: impl Into<PathBuf>) -> SqliteConnectionManager {
|
||||
SqliteConnectionManager::file(path.into()).with_init(|conn| {
|
||||
conn.pragma_update(None, "journal_mode", "WAL")?;
|
||||
conn.pragma_update(None, "synchronous", "NORMAL")?;
|
||||
conn.busy_timeout(Duration::from_millis(5000))
|
||||
})
|
||||
}
|
||||
|
||||
fn sqlite_memory_manager() -> SqliteConnectionManager {
|
||||
SqliteConnectionManager::memory()
|
||||
.with_init(|conn| conn.busy_timeout(Duration::from_millis(5000)))
|
||||
}
|
||||
|
||||
/// Initialize the database managers for standalone (non-Tauri) usage.
|
||||
///
|
||||
/// Returns a tuple of (QueryManager, BlobManager, event_receiver).
|
||||
@@ -43,7 +56,7 @@ pub fn init_standalone(
|
||||
|
||||
// Main database pool
|
||||
info!("Initializing app database {db_path:?}");
|
||||
let manager = SqliteConnectionManager::file(db_path);
|
||||
let manager = sqlite_file_manager(db_path);
|
||||
let pool = Pool::builder()
|
||||
.max_size(100)
|
||||
.connection_timeout(Duration::from_secs(10))
|
||||
@@ -55,7 +68,7 @@ pub fn init_standalone(
|
||||
info!("Initializing blobs database {blob_path:?}");
|
||||
|
||||
// Blob database pool
|
||||
let blob_manager = SqliteConnectionManager::file(blob_path);
|
||||
let blob_manager = sqlite_file_manager(blob_path);
|
||||
let blob_pool = Pool::builder()
|
||||
.max_size(50)
|
||||
.connection_timeout(Duration::from_secs(10))
|
||||
@@ -75,7 +88,7 @@ pub fn init_standalone(
|
||||
/// Useful for testing and CI environments.
|
||||
pub fn init_in_memory() -> Result<(QueryManager, BlobManager, mpsc::Receiver<ModelPayload>)> {
|
||||
// Main database pool
|
||||
let manager = SqliteConnectionManager::memory();
|
||||
let manager = sqlite_memory_manager();
|
||||
let pool = Pool::builder()
|
||||
.max_size(1) // In-memory DB doesn't support multiple connections
|
||||
.build(manager)
|
||||
@@ -84,7 +97,7 @@ pub fn init_in_memory() -> Result<(QueryManager, BlobManager, mpsc::Receiver<Mod
|
||||
migrate_db(&pool)?;
|
||||
|
||||
// Blob database pool
|
||||
let blob_manager = SqliteConnectionManager::memory();
|
||||
let blob_manager = sqlite_memory_manager();
|
||||
let blob_pool = Pool::builder()
|
||||
.max_size(1)
|
||||
.build(blob_manager)
|
||||
|
||||
+10
-10
@@ -684,6 +684,7 @@ pub async fn send_http_request<T: TemplateCallback>(
|
||||
}
|
||||
})?;
|
||||
let body_path = params.response_dir.join(&response.id);
|
||||
let response_body_path = body_path.to_string_lossy().to_string();
|
||||
let connected_response = HttpResponse {
|
||||
state: HttpResponseState::Connected,
|
||||
elapsed_headers: headers_elapsed,
|
||||
@@ -693,7 +694,7 @@ pub async fn send_http_request<T: TemplateCallback>(
|
||||
remote_addr: http_response.remote_addr.clone(),
|
||||
version: http_response.version.clone(),
|
||||
elapsed_dns: dns_elapsed.load(Ordering::Relaxed),
|
||||
body_path: Some(body_path.to_string_lossy().to_string()),
|
||||
body_path: Some(response_body_path.clone()),
|
||||
content_length: http_response.content_length.map(u64_to_i32),
|
||||
headers: http_response
|
||||
.headers
|
||||
@@ -724,6 +725,8 @@ pub async fn send_http_request<T: TemplateCallback>(
|
||||
let mut body_stream =
|
||||
http_response.into_body_stream().map_err(SendHttpRequestError::ReadResponseBody)?;
|
||||
let mut response_body = Vec::new();
|
||||
let mut read_buf = vec![0; 64 * 1024];
|
||||
let collect_response_body = !persist_response && params.emit_response_body_chunks_to.is_none();
|
||||
let mut body_read_error = None;
|
||||
let mut written_bytes: usize = 0;
|
||||
let mut last_progress_update = started_at;
|
||||
@@ -740,12 +743,12 @@ pub async fn send_http_request<T: TemplateCallback>(
|
||||
_ = cancelled_rx.changed() => {
|
||||
None
|
||||
}
|
||||
result = body_stream.read_buf(&mut response_body) => {
|
||||
result = body_stream.read(&mut read_buf) => {
|
||||
Some(result)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Some(body_stream.read_buf(&mut response_body).await)
|
||||
Some(body_stream.read(&mut read_buf).await)
|
||||
};
|
||||
|
||||
let Some(read_result) = read_result else {
|
||||
@@ -756,17 +759,14 @@ pub async fn send_http_request<T: TemplateCallback>(
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
written_bytes += n;
|
||||
let start_idx = response_body.len() - n;
|
||||
let chunk = &response_body[start_idx..];
|
||||
let chunk = &read_buf[..n];
|
||||
file.write_all(chunk).await.map_err(|source| {
|
||||
SendHttpRequestError::WriteResponseBody { path: body_path.clone(), source }
|
||||
})?;
|
||||
file.flush().await.map_err(|source| SendHttpRequestError::WriteResponseBody {
|
||||
path: body_path.clone(),
|
||||
source,
|
||||
})?;
|
||||
if let Some(tx) = params.emit_response_body_chunks_to.as_ref() {
|
||||
let _ = tx.send(chunk.to_vec());
|
||||
} else if collect_response_body {
|
||||
response_body.extend_from_slice(chunk);
|
||||
}
|
||||
|
||||
let now = Instant::now();
|
||||
@@ -854,7 +854,7 @@ pub async fn send_http_request<T: TemplateCallback>(
|
||||
|
||||
let compressed_length = http_response.content_length.unwrap_or(written_bytes as u64);
|
||||
let final_response = HttpResponse {
|
||||
body_path: Some(body_path.to_string_lossy().to_string()),
|
||||
body_path: Some(response_body_path),
|
||||
content_length: Some(usize_to_i32(written_bytes)),
|
||||
content_length_compressed: Some(u64_to_i32(compressed_length)),
|
||||
elapsed: duration_to_i32(started_at.elapsed()),
|
||||
|
||||
Reference in New Issue
Block a user