perf: offload blocking calls to spawn_blocking

- Move headless_chrome PDF rasterization from async context to
  spawn_blocking, keeping tokio worker threads responsive.
- Switch RerankerPool from tokio::sync::Mutex to std::sync::Mutex
  and run TextRerank::rerank inside spawn_blocking, since the
  rerank call is CPU-bound with no .await points.
This commit is contained in:
Per Stark
2026-05-26 15:30:03 +02:00
parent 1927149ce9
commit 6c7b586fc5
2 changed files with 47 additions and 31 deletions
+36 -24
View File
@@ -15,7 +15,7 @@ use headless_chrome::{
}; };
use lopdf::Document; use lopdf::Document;
use serde_json::Value; use serde_json::Value;
use tokio::time::sleep;
use tracing::{debug, warn}; use tracing::{debug, warn};
use common::{ use common::{
@@ -116,8 +116,29 @@ async fn load_page_numbers(pdf_bytes: Vec<u8>) -> Result<Vec<u32>, AppError> {
} }
/// Uses the existing headless Chrome dependency to rasterize the requested PDF pages into PNGs. /// Uses the existing headless Chrome dependency to rasterize the requested PDF pages into PNGs.
#[allow(clippy::too_many_lines)]
async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result<Vec<Vec<u8>>, AppError> { async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result<Vec<Vec<u8>>, AppError> {
let file_path = file_path.to_path_buf();
let pages = pages.to_vec();
let page_numbers = pages.clone();
let captures = tokio::task::spawn_blocking(move || {
render_pdf_pages_inner(&file_path, &pages)
})
.await??;
for (idx, png) in captures.iter().enumerate() {
if let Err(err) = maybe_dump_debug_image(page_numbers[idx], png).await {
warn!(
page = page_numbers[idx],
error = %err,
"Failed to write debug screenshot to disk"
);
}
}
Ok(captures)
}
fn render_pdf_pages_inner(file_path: &Path, pages: &[u32]) -> Result<Vec<Vec<u8>>, AppError> {
let file_url = url::Url::from_file_path(file_path) let file_url = url::Url::from_file_path(file_path)
.map_err(|()| AppError::Processing("Unable to construct PDF file URL".into()))?; .map_err(|()| AppError::Processing("Unable to construct PDF file URL".into()))?;
@@ -132,7 +153,7 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result<Vec<Vec<u8>
let mut captures = Vec::with_capacity(pages.len()); let mut captures = Vec::with_capacity(pages.len());
for (idx, page) in pages.iter().enumerate() { for page in pages.iter().copied() {
let target = format!("{file_url}#page={page}&toolbar=0&statusbar=0&zoom=page-fit"); let target = format!("{file_url}#page={page}&toolbar=0&statusbar=0&zoom=page-fit");
tab.navigate_to(&target) tab.navigate_to(&target)
.map_err(|err| AppError::Processing(format!("Failed to navigate to PDF page: {err}")))? .map_err(|err| AppError::Processing(format!("Failed to navigate to PDF page: {err}")))?
@@ -150,7 +171,7 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result<Vec<Vec<u8>
break; break;
} }
if attempt < NAVIGATION_RETRY_ATTEMPTS.saturating_sub(1) { if attempt < NAVIGATION_RETRY_ATTEMPTS.saturating_sub(1) {
sleep(Duration::from_millis(NAVIGATION_RETRY_INTERVAL_MS)).await; std::thread::sleep(Duration::from_millis(NAVIGATION_RETRY_INTERVAL_MS));
} }
} }
@@ -160,25 +181,25 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result<Vec<Vec<u8>
)); ));
} }
wait_for_pdf_ready(&tab, *page)?; wait_for_pdf_ready(&tab, page)?;
tokio::time::sleep(Duration::from_millis(350)).await; std::thread::sleep(Duration::from_millis(350));
prepare_pdf_viewer(&tab, *page); prepare_pdf_viewer(&tab, page);
let mut viewport: Option<Page::Viewport> = None; let mut viewport: Option<Page::Viewport> = None;
for attempt in 0..CANVAS_VIEWPORT_ATTEMPTS { for attempt in 0..CANVAS_VIEWPORT_ATTEMPTS {
match canvas_viewport_for_page(&tab, *page) { match canvas_viewport_for_page(&tab, page) {
Ok(Some(vp)) => { Ok(Some(vp)) => {
viewport = Some(vp); viewport = Some(vp);
break; break;
} }
Ok(None) => { Ok(None) => {
if attempt < CANVAS_VIEWPORT_ATTEMPTS.saturating_sub(1) { if attempt < CANVAS_VIEWPORT_ATTEMPTS.saturating_sub(1) {
tokio::time::sleep(Duration::from_millis(CANVAS_VIEWPORT_WAIT_MS)).await; std::thread::sleep(Duration::from_millis(CANVAS_VIEWPORT_WAIT_MS));
} }
} }
Err(err) => { Err(err) => {
warn!(page = *page, error = %err, "Failed to derive canvas viewport"); warn!(page, error = %err, "Failed to derive canvas viewport");
break; break;
} }
} }
@@ -196,46 +217,37 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result<Vec<Vec<u8>
Ok(data) => match STANDARD.decode(data.data) { Ok(data) => match STANDARD.decode(data.data) {
Ok(bytes) => bytes, Ok(bytes) => bytes,
Err(err) => { Err(err) => {
warn!(error = %err, page = *page, "Failed to decode clipped screenshot; falling back to full page capture"); warn!(error = %err, page, "Failed to decode clipped screenshot; falling back to full page capture");
capture_full_page_png(&tab)? capture_full_page_png(&tab)?
} }
}, },
Err(err) => { Err(err) => {
warn!(error = %err, page = *page, "Clipped screenshot failed; falling back to full page capture"); warn!(error = %err, page, "Clipped screenshot failed; falling back to full page capture");
capture_full_page_png(&tab)? capture_full_page_png(&tab)?
} }
} }
} else { } else {
warn!( warn!(
page = *page, page,
"Unable to determine canvas viewport; capturing full page" "Unable to determine canvas viewport; capturing full page"
); );
capture_full_page_png(&tab)? capture_full_page_png(&tab)?
}; };
debug!( debug!(
page = *page, page,
bytes = png.len(), bytes = png.len(),
page_index = idx,
"Captured PDF page screenshot" "Captured PDF page screenshot"
); );
if is_suspicious_image(png.len()) { if is_suspicious_image(png.len()) {
warn!( warn!(
page = *page, page,
bytes = png.len(), bytes = png.len(),
"Screenshot size below threshold; check rendering output" "Screenshot size below threshold; check rendering output"
); );
} }
if let Err(err) = maybe_dump_debug_image(*page, &png).await {
warn!(
page = *page,
error = %err,
"Failed to write debug screenshot to disk"
);
}
captures.push(png); captures.push(png);
} }
+9 -5
View File
@@ -3,14 +3,14 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{ sync::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Arc, Mutex,
}, },
thread::available_parallelism, thread::available_parallelism,
}; };
use common::{error::AppError, utils::config::AppConfig}; use common::{error::AppError, utils::config::AppConfig};
use fastembed::{RerankInitOptions, RerankResult, TextRerank}; use fastembed::{RerankInitOptions, RerankResult, TextRerank};
use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore}; use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::debug; use tracing::debug;
static NEXT_ENGINE: AtomicUsize = AtomicUsize::new(0); static NEXT_ENGINE: AtomicUsize = AtomicUsize::new(0);
@@ -161,11 +161,15 @@ impl RerankerLease {
query: &str, query: &str,
documents: Vec<String>, documents: Vec<String>,
) -> Result<Vec<RerankResult>, AppError> { ) -> Result<Vec<RerankResult>, AppError> {
// Lock this specific engine so we get &mut TextRerank let query = query.to_owned();
let mut guard = self.engine.lock().await; let engine = Arc::clone(&self.engine);
tokio::task::spawn_blocking(move || {
let mut guard = engine.lock().expect("reranker engine mutex poisoned");
guard guard
.rerank(query.to_owned(), documents, false, None) .rerank(query, documents, false, None)
.map_err(|e| AppError::InternalError(e.to_string())) .map_err(|e| AppError::InternalError(e.to_string()))
})
.await?
} }
} }