From 6c7b586fc5f8b85dd2cbed66affe20cdb20e7490 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Tue, 26 May 2026 15:30:03 +0200 Subject: [PATCH] perf: offload blocking calls to spawn_blocking - Move headless_chrome PDF rasterization from async context to spawn_blocking, keeping tokio worker threads responsive. - Switch RerankerPool from tokio::sync::Mutex to std::sync::Mutex and run TextRerank::rerank inside spawn_blocking, since the rerank call is CPU-bound with no .await points. --- ingestion-pipeline/src/utils/pdf_ingestion.rs | 60 +++++++++++-------- retrieval-pipeline/src/reranking/mod.rs | 18 +++--- 2 files changed, 47 insertions(+), 31 deletions(-) diff --git a/ingestion-pipeline/src/utils/pdf_ingestion.rs b/ingestion-pipeline/src/utils/pdf_ingestion.rs index bc39b89..4f84130 100644 --- a/ingestion-pipeline/src/utils/pdf_ingestion.rs +++ b/ingestion-pipeline/src/utils/pdf_ingestion.rs @@ -15,7 +15,7 @@ use headless_chrome::{ }; use lopdf::Document; use serde_json::Value; -use tokio::time::sleep; + use tracing::{debug, warn}; use common::{ @@ -116,8 +116,29 @@ async fn load_page_numbers(pdf_bytes: Vec) -> Result, AppError> { } /// Uses the existing headless Chrome dependency to rasterize the requested PDF pages into PNGs. -#[allow(clippy::too_many_lines)] async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result>, AppError> { + let file_path = file_path.to_path_buf(); + let pages = pages.to_vec(); + let page_numbers = pages.clone(); + let captures = tokio::task::spawn_blocking(move || { + render_pdf_pages_inner(&file_path, &pages) + }) + .await??; + + for (idx, png) in captures.iter().enumerate() { + if let Err(err) = maybe_dump_debug_image(page_numbers[idx], png).await { + warn!( + page = page_numbers[idx], + error = %err, + "Failed to write debug screenshot to disk" + ); + } + } + + Ok(captures) +} + +fn render_pdf_pages_inner(file_path: &Path, pages: &[u32]) -> Result>, AppError> { let file_url = url::Url::from_file_path(file_path) .map_err(|()| AppError::Processing("Unable to construct PDF file URL".into()))?; @@ -132,7 +153,7 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result let mut captures = Vec::with_capacity(pages.len()); - for (idx, page) in pages.iter().enumerate() { + for page in pages.iter().copied() { let target = format!("{file_url}#page={page}&toolbar=0&statusbar=0&zoom=page-fit"); tab.navigate_to(&target) .map_err(|err| AppError::Processing(format!("Failed to navigate to PDF page: {err}")))? @@ -150,7 +171,7 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result break; } if attempt < NAVIGATION_RETRY_ATTEMPTS.saturating_sub(1) { - sleep(Duration::from_millis(NAVIGATION_RETRY_INTERVAL_MS)).await; + std::thread::sleep(Duration::from_millis(NAVIGATION_RETRY_INTERVAL_MS)); } } @@ -160,25 +181,25 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result )); } - wait_for_pdf_ready(&tab, *page)?; - tokio::time::sleep(Duration::from_millis(350)).await; + wait_for_pdf_ready(&tab, page)?; + std::thread::sleep(Duration::from_millis(350)); - prepare_pdf_viewer(&tab, *page); + prepare_pdf_viewer(&tab, page); let mut viewport: Option = None; for attempt in 0..CANVAS_VIEWPORT_ATTEMPTS { - match canvas_viewport_for_page(&tab, *page) { + match canvas_viewport_for_page(&tab, page) { Ok(Some(vp)) => { viewport = Some(vp); break; } Ok(None) => { if attempt < CANVAS_VIEWPORT_ATTEMPTS.saturating_sub(1) { - tokio::time::sleep(Duration::from_millis(CANVAS_VIEWPORT_WAIT_MS)).await; + std::thread::sleep(Duration::from_millis(CANVAS_VIEWPORT_WAIT_MS)); } } Err(err) => { - warn!(page = *page, error = %err, "Failed to derive canvas viewport"); + warn!(page, error = %err, "Failed to derive canvas viewport"); break; } } @@ -196,46 +217,37 @@ async fn render_pdf_pages(file_path: &Path, pages: &[u32]) -> Result Ok(data) => match STANDARD.decode(data.data) { Ok(bytes) => bytes, Err(err) => { - warn!(error = %err, page = *page, "Failed to decode clipped screenshot; falling back to full page capture"); + warn!(error = %err, page, "Failed to decode clipped screenshot; falling back to full page capture"); capture_full_page_png(&tab)? } }, Err(err) => { - warn!(error = %err, page = *page, "Clipped screenshot failed; falling back to full page capture"); + warn!(error = %err, page, "Clipped screenshot failed; falling back to full page capture"); capture_full_page_png(&tab)? } } } else { warn!( - page = *page, + page, "Unable to determine canvas viewport; capturing full page" ); capture_full_page_png(&tab)? }; debug!( - page = *page, + page, bytes = png.len(), - page_index = idx, "Captured PDF page screenshot" ); if is_suspicious_image(png.len()) { warn!( - page = *page, + page, bytes = png.len(), "Screenshot size below threshold; check rendering output" ); } - if let Err(err) = maybe_dump_debug_image(*page, &png).await { - warn!( - page = *page, - error = %err, - "Failed to write debug screenshot to disk" - ); - } - captures.push(png); } diff --git a/retrieval-pipeline/src/reranking/mod.rs b/retrieval-pipeline/src/reranking/mod.rs index 705af40..b706774 100644 --- a/retrieval-pipeline/src/reranking/mod.rs +++ b/retrieval-pipeline/src/reranking/mod.rs @@ -3,14 +3,14 @@ use std::{ path::{Path, PathBuf}, sync::{ atomic::{AtomicUsize, Ordering}, - Arc, + Arc, Mutex, }, thread::available_parallelism, }; use common::{error::AppError, utils::config::AppConfig}; use fastembed::{RerankInitOptions, RerankResult, TextRerank}; -use tokio::sync::{Mutex, OwnedSemaphorePermit, Semaphore}; +use tokio::sync::{OwnedSemaphorePermit, Semaphore}; use tracing::debug; static NEXT_ENGINE: AtomicUsize = AtomicUsize::new(0); @@ -161,11 +161,15 @@ impl RerankerLease { query: &str, documents: Vec, ) -> Result, AppError> { - // Lock this specific engine so we get &mut TextRerank - let mut guard = self.engine.lock().await; + let query = query.to_owned(); + let engine = Arc::clone(&self.engine); - guard - .rerank(query.to_owned(), documents, false, None) - .map_err(|e| AppError::InternalError(e.to_string())) + tokio::task::spawn_blocking(move || { + let mut guard = engine.lock().expect("reranker engine mutex poisoned"); + guard + .rerank(query, documents, false, None) + .map_err(|e| AppError::InternalError(e.to_string())) + }) + .await? } }