From ead17530bdc28bbd16174b562615baf6c8c9f5a1 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Fri, 12 Jun 2026 15:01:53 +0200 Subject: [PATCH] fix: schedule nightly index rebuild on worker and skip per-ingest rebuild. Ingest relies on SurrealDB incremental index maintenance; the worker runs native REBUILD INDEX on a configurable interval with lease state on system_settings. --- CHANGELOG.md | 1 + ...000001_system_settings_index_rebuild.surql | 5 + ..._000001_system_settings_index_rebuild.json | 1 + common/db/schemas/system_settings.surql | 4 + common/src/storage/indexes.rs | 198 +++++++++++++++++- common/src/storage/types/system_settings.rs | 128 +++++++++++ common/src/utils/config.rs | 8 + html-router/src/routes/admin/handlers.rs | 3 + ingestion-pipeline/src/lib.rs | 8 + ingestion-pipeline/src/pipeline/stages.rs | 6 +- main/src/main.rs | 8 +- main/src/worker.rs | 11 +- 12 files changed, 370 insertions(+), 11 deletions(-) create mode 100644 common/db/migrations/20260612_000001_system_settings_index_rebuild.surql create mode 100644 common/db/migrations/definitions/20260612_000001_system_settings_index_rebuild.json diff --git a/CHANGELOG.md b/CHANGELOG.md index 2aafdff..18e1520 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ # Changelog ## Unreleased +- Performance: ingestion skips per-task index rebuild; worker runs scheduled `REBUILD INDEX` (default every 24h via `index_rebuild_interval_secs`, `0` disables) ## 1.0.3 (2026-06-12) - Search: filter results by type — knowledge entities, ingested content, or both diff --git a/common/db/migrations/20260612_000001_system_settings_index_rebuild.surql b/common/db/migrations/20260612_000001_system_settings_index_rebuild.surql new file mode 100644 index 0000000..88ea5ca --- /dev/null +++ b/common/db/migrations/20260612_000001_system_settings_index_rebuild.surql @@ -0,0 +1,5 @@ +-- Track scheduled runtime index rebuild state on the system_settings singleton. + +DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option; +DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option; +DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option; diff --git a/common/db/migrations/definitions/20260612_000001_system_settings_index_rebuild.json b/common/db/migrations/definitions/20260612_000001_system_settings_index_rebuild.json new file mode 100644 index 0000000..202513e --- /dev/null +++ b/common/db/migrations/definitions/20260612_000001_system_settings_index_rebuild.json @@ -0,0 +1 @@ +{"schemas":"--- original\n+++ modified\n@@ -201,6 +201,10 @@\n DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS voice_processing_model ON system_settings TYPE string;\n+DEFINE FIELD IF NOT EXISTS embedding_backend ON system_settings TYPE option;\n+DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option;\n+DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option;\n+DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option;\n\n # Defines the schema for the 'text_chunk' table.\n\n","events":null} \ No newline at end of file diff --git a/common/db/schemas/system_settings.surql b/common/db/schemas/system_settings.surql index e53a75e..18dbdeb 100644 --- a/common/db/schemas/system_settings.surql +++ b/common/db/schemas/system_settings.surql @@ -14,3 +14,7 @@ DEFINE FIELD IF NOT EXISTS query_system_prompt ON system_settings TYPE string; DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string; DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string; DEFINE FIELD IF NOT EXISTS voice_processing_model ON system_settings TYPE string; +DEFINE FIELD IF NOT EXISTS embedding_backend ON system_settings TYPE option; +DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option; +DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option; +DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option; diff --git a/common/src/storage/indexes.rs b/common/src/storage/indexes.rs index 33bd90b..1aac6bb 100644 --- a/common/src/storage/indexes.rs +++ b/common/src/storage/indexes.rs @@ -1,12 +1,19 @@ -use std::time::Duration; +use std::time::{Duration, Instant}; use anyhow::{Context, Result}; +use chrono::{DateTime, Utc}; use futures::future::try_join_all; use serde::Deserialize; use serde_json::{Map, Value}; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; -use crate::{error::AppError, storage::db::SurrealDbClient}; +use crate::{ + error::AppError, + storage::{ + db::SurrealDbClient, + types::system_settings::SystemSettings, + }, +}; const INDEX_POLL_INTERVAL: Duration = Duration::from_millis(50); const INDEX_BUILD_TIMEOUT: Duration = Duration::from_secs(30 * 60); @@ -204,6 +211,9 @@ pub async fn ensure_runtime( /// Rebuild known FTS and HNSW indexes, skipping any that are not yet defined. /// +/// Uses `DEFINE INDEX OVERWRITE` and is reserved for dimension migrations, re-embed +/// flows, and tests. Routine optimization should use [`rebuild_runtime`]. +/// /// # Errors /// /// Returns `AppError::InternalError` if any index rebuild operation fails. @@ -211,6 +221,117 @@ pub async fn rebuild(db: &SurrealDbClient) -> Result<(), AppError> { rebuild_inner(db).await.map_err(AppError::internal) } +/// Rebuilds existing runtime FTS and HNSW indexes in place via SurrealQL `REBUILD INDEX`. +/// +/// SurrealDB maintains ready indexes incrementally on writes; this is for periodic +/// optimization (for example a nightly maintainer job), not ingest correctness. +/// On SurrealDB 2.6 this runs synchronously (`CONCURRENTLY` is not supported on `REBUILD`). +/// +/// # Errors +/// +/// Returns `AppError::InternalError` if any rebuild operation fails. +pub async fn rebuild_runtime(db: &SurrealDbClient) -> Result<(), AppError> { + rebuild_runtime_inner(db) + .await + .map_err(AppError::internal) +} + +/// Returns whether a scheduled index rebuild is due based on the persisted last-run time. +#[must_use] +pub fn scheduled_index_rebuild_due( + last_run: Option>, + interval_secs: u64, + now: DateTime, +) -> bool { + if interval_secs == 0 { + return false; + } + + let Some(last_run) = last_run else { + return false; + }; + + let elapsed = now.signed_duration_since(last_run); + elapsed.num_seconds() >= i64::try_from(interval_secs).unwrap_or(i64::MAX) +} + +/// Runs a scheduled native `REBUILD INDEX` pass when due, using a DB lock so only one +/// maintainer rebuilds at a time. Seeds a checkpoint on first run so the initial rebuild +/// waits one full interval after worker startup. +pub async fn maybe_run_scheduled_index_rebuild( + db: &SurrealDbClient, + worker_id: &str, + interval_secs: u64, +) { + if interval_secs == 0 { + return; + } + + let now = Utc::now(); + let settings = match SystemSettings::get_current(db).await { + Ok(settings) => settings, + Err(err) => { + warn!(error = %err, "failed to load system settings for index rebuild schedule"); + return; + } + }; + + let last_run = settings.last_index_rebuild_at; + + if last_run.is_none() { + match SystemSettings::seed_index_rebuild_checkpoint(db).await { + Ok(true) => debug!("seeded index rebuild checkpoint; first rebuild deferred"), + Ok(false) => {} + Err(err) => warn!(error = %err, "failed to seed index rebuild checkpoint"), + } + return; + } + + if !scheduled_index_rebuild_due(last_run, interval_secs, now) { + return; + } + + let lock_owner = format!("{worker_id}-index-rebuild"); + let acquired = match SystemSettings::try_acquire_index_rebuild_lease(db, &lock_owner).await { + Ok(value) => value, + Err(err) => { + warn!(error = %err, "failed to acquire index rebuild lease"); + return; + } + }; + + if !acquired { + debug!("another maintainer is rebuilding indexes"); + return; + } + + let started = Instant::now(); + info!(interval_secs, "starting scheduled runtime index rebuild"); + let rebuild_result = rebuild_runtime(db).await; + + match rebuild_result { + Ok(()) => { + if let Err(err) = SystemSettings::record_index_rebuild_completed(db, &lock_owner).await + { + warn!(error = %err, "failed to persist index rebuild checkpoint"); + SystemSettings::release_index_rebuild_lease(db, &lock_owner).await; + } + info!( + elapsed_ms = started.elapsed().as_millis(), + "scheduled runtime index rebuild completed" + ); + } + Err(err) => { + SystemSettings::release_index_rebuild_lease(db, &lock_owner).await; + error!( + error = %err, + elapsed_ms = started.elapsed().as_millis(), + "scheduled runtime index rebuild failed" + ); + } + } +} + /// Returns the dimension of the currently defined chunk-embedding HNSW index, if any. /// /// Stored embeddings always share this index's dimension because re-embedding rewrites the @@ -382,6 +503,46 @@ async fn rebuild_inner(db: &SurrealDbClient) -> Result<()> { try_join_all(hnsw_tasks).await.map(|_| ()) } +async fn rebuild_runtime_inner(db: &SurrealDbClient) -> Result<()> { + debug!("Rebuilding runtime indexes with REBUILD INDEX"); + + for spec in fts_index_specs() { + rebuild_existing_index_in_place(db, spec.index_name, spec.table).await?; + } + + let hnsw_tasks = hnsw_index_specs().into_iter().map(|spec| async move { + rebuild_existing_index_in_place(db, spec.index_name, spec.table).await + }); + + try_join_all(hnsw_tasks).await.map(|_| ()) +} + +async fn rebuild_existing_index_in_place( + db: &SurrealDbClient, + index_name: &str, + table: &str, +) -> Result<()> { + if !index_exists(db, table, index_name).await? { + debug!( + index = index_name, + table, + "Skipping in-place rebuild because index is missing" + ); + return Ok(()); + } + + let query = format!("REBUILD INDEX IF EXISTS {index_name} ON {table};"); + let res = db + .client + .query(query) + .await + .with_context(|| format!("rebuilding index {index_name} on table {table}"))?; + res.check() + .with_context(|| format!("rebuild index {index_name} on table {table} failed"))?; + + Ok(()) +} + async fn existing_hnsw_dimension( db: &SurrealDbClient, spec: &HnswIndexSpec, @@ -906,6 +1067,37 @@ mod tests { assert_eq!(extract_dimension(definition), Some(1536)); } + #[test] + fn scheduled_index_rebuild_due_respects_interval_and_disabled() { + let now = Utc::now(); + let last = now - chrono::Duration::hours(25); + + assert!(!scheduled_index_rebuild_due(None, 86_400, now)); + assert!(!scheduled_index_rebuild_due(Some(last), 0, now)); + assert!(!scheduled_index_rebuild_due(Some(now - chrono::Duration::hours(1)), 86_400, now)); + assert!(scheduled_index_rebuild_due(Some(last), 86_400, now)); + } + + #[tokio::test] + async fn rebuild_runtime_is_idempotent() -> anyhow::Result<()> { + let namespace = "indexes_in_place_rebuild"; + let database = &Uuid::new_v4().to_string(); + let db = SurrealDbClient::memory(namespace, database) + .await + .context("in-memory db")?; + + db.apply_migrations().await.context("migrations")?; + ensure_runtime(&db, 8).await.context("ensure runtime indexes")?; + + rebuild_runtime(&db) + .await + .context("first in-place rebuild")?; + rebuild_runtime(&db) + .await + .context("second in-place rebuild")?; + Ok(()) + } + #[tokio::test] async fn ensure_runtime_is_idempotent() -> anyhow::Result<()> { let namespace = "indexes_ns"; diff --git a/common/src/storage/types/system_settings.rs b/common/src/storage/types/system_settings.rs index 80b5333..a9687ff 100644 --- a/common/src/storage/types/system_settings.rs +++ b/common/src/storage/types/system_settings.rs @@ -1,3 +1,6 @@ +use chrono::{DateTime, Utc}; +use tracing::warn; + use crate::utils::config::EmbeddingBackend; use crate::utils::serde_helpers::deserialize_flexible_id; use serde::{Deserialize, Serialize}; @@ -22,6 +25,15 @@ pub struct SystemSettings { pub image_processing_model: String, pub image_processing_prompt: String, pub voice_processing_model: String, + /// When the maintainer last completed a scheduled `REBUILD INDEX` pass. + #[serde(default)] + pub last_index_rebuild_at: Option>, + /// Worker id holding the index-rebuild lease, if any. + #[serde(default)] + pub index_rebuild_lease_owner: Option, + /// Lease expiry for in-flight scheduled index rebuilds. + #[serde(default)] + pub index_rebuild_lease_expires_at: Option>, } /// Partial update for singleton system settings without cloning unchanged fields. @@ -100,6 +112,8 @@ impl SystemSettingsPatch { } } +const INDEX_REBUILD_LEASE_TTL: &str = "6h"; + impl SystemSettings { pub const RECORD_ID: &'static str = "current"; @@ -227,6 +241,89 @@ impl SystemSettings { Ok((settings, needs_update)) } + + /// Seeds the first rebuild checkpoint so the initial scheduled rebuild waits one interval. + pub async fn seed_index_rebuild_checkpoint(db: &SurrealDbClient) -> Result { + let mut response = db + .client + .query( + "UPDATE type::thing('system_settings', $id) SET last_index_rebuild_at = time::now() + WHERE last_index_rebuild_at IS NONE + RETURN AFTER;", + ) + .bind(("id", Self::RECORD_ID)) + .await + .map_err(AppError::from)?; + + let updated: Option = response.take(0).map_err(AppError::from)?; + Ok(updated.is_some()) + } + + /// Claims the singleton index-rebuild lease when it is free or expired. + pub async fn try_acquire_index_rebuild_lease( + db: &SurrealDbClient, + owner: &str, + ) -> Result { + let mut response = db + .client + .query(format!( + "UPDATE type::thing('system_settings', $id) SET + index_rebuild_lease_owner = $owner, + index_rebuild_lease_expires_at = time::now() + {INDEX_REBUILD_LEASE_TTL} + WHERE index_rebuild_lease_expires_at IS NONE + OR index_rebuild_lease_expires_at < time::now() + RETURN AFTER;" + )) + .bind(("id", Self::RECORD_ID)) + .bind(("owner", owner.to_string())) + .await + .map_err(AppError::from)?; + + let updated: Option = response.take(0).map_err(AppError::from)?; + Ok(updated.is_some()) + } + + /// Releases the index-rebuild lease when held by `owner`. + pub async fn release_index_rebuild_lease(db: &SurrealDbClient, owner: &str) { + let released = db + .client + .query( + "UPDATE type::thing('system_settings', $id) SET + index_rebuild_lease_owner = NONE, + index_rebuild_lease_expires_at = NONE + WHERE index_rebuild_lease_owner = $owner;", + ) + .bind(("id", Self::RECORD_ID)) + .bind(("owner", owner.to_string())) + .await + .and_then(surrealdb::Response::check); + + if let Err(err) = released { + warn!(error = %err, "failed to release index rebuild lease"); + } + } + + /// Records a completed scheduled index rebuild and clears the lease. + pub async fn record_index_rebuild_completed( + db: &SurrealDbClient, + owner: &str, + ) -> Result<(), AppError> { + let response = db + .client + .query( + "UPDATE type::thing('system_settings', $id) SET + last_index_rebuild_at = time::now(), + index_rebuild_lease_owner = NONE, + index_rebuild_lease_expires_at = NONE + WHERE index_rebuild_lease_owner = $owner;", + ) + .bind(("id", Self::RECORD_ID)) + .bind(("owner", owner.to_string())) + .await + .map_err(AppError::from)?; + response.check().map_err(AppError::from)?; + Ok(()) + } } #[cfg(test)] @@ -802,4 +899,35 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn index_rebuild_lease_is_exclusive_on_system_settings() -> anyhow::Result<()> { + let namespace = "system_settings_index_rebuild"; + let database = &Uuid::new_v4().to_string(); + let db = SurrealDbClient::memory(namespace, database) + .await + .context("in-memory db")?; + db.apply_migrations().await.context("migrations")?; + + assert!( + SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-a") + .await?, + "first lease claim should succeed" + ); + assert!( + !SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-b") + .await?, + "second lease claim should fail while lease is held" + ); + + SystemSettings::release_index_rebuild_lease(&db, "worker-a").await; + + SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-b").await?; + SystemSettings::record_index_rebuild_completed(&db, "worker-b").await?; + + let settings = SystemSettings::get_current(&db).await?; + assert!(settings.last_index_rebuild_at.is_some()); + assert!(settings.index_rebuild_lease_owner.is_none()); + Ok(()) + } } diff --git a/common/src/utils/config.rs b/common/src/utils/config.rs index 540e7ec..454f000 100644 --- a/common/src/utils/config.rs +++ b/common/src/utils/config.rs @@ -135,6 +135,9 @@ pub struct AppConfig { pub ingest_max_context_bytes: usize, #[serde(default = "default_ingest_max_category_bytes")] pub ingest_max_category_bytes: usize, + /// Seconds between scheduled `REBUILD INDEX` maintainer runs (`0` disables). + #[serde(default = "default_index_rebuild_interval_secs")] + pub index_rebuild_interval_secs: u64, } /// Default data directory for persisted assets. @@ -172,6 +175,10 @@ fn default_ingest_max_category_bytes() -> usize { 128 } +fn default_index_rebuild_interval_secs() -> u64 { + 86_400 +} + static ORT_PATH_INIT: Once = Once::new(); /// Sets `ORT_DYLIB_PATH` once per process when a bundled ONNX runtime library is found. @@ -238,6 +245,7 @@ impl Default for AppConfig { ingest_max_content_bytes: default_ingest_max_content_bytes(), ingest_max_context_bytes: default_ingest_max_context_bytes(), ingest_max_category_bytes: default_ingest_max_category_bytes(), + index_rebuild_interval_secs: default_index_rebuild_interval_secs(), } } } diff --git a/html-router/src/routes/admin/handlers.rs b/html-router/src/routes/admin/handlers.rs index b80a970..bb36324 100644 --- a/html-router/src/routes/admin/handlers.rs +++ b/html-router/src/routes/admin/handlers.rs @@ -350,6 +350,9 @@ mod tests { image_processing_model: "gpt-4o-mini".into(), image_processing_prompt: "p".into(), voice_processing_model: "whisper-1".into(), + last_index_rebuild_at: None, + index_rebuild_lease_owner: None, + index_rebuild_lease_expires_at: None, } } diff --git a/ingestion-pipeline/src/lib.rs b/ingestion-pipeline/src/lib.rs index 83b2ae9..1c0a2ca 100644 --- a/ingestion-pipeline/src/lib.rs +++ b/ingestion-pipeline/src/lib.rs @@ -6,6 +6,7 @@ pub mod utils; use chrono::Utc; use common::storage::{ db::SurrealDbClient, + indexes::maybe_run_scheduled_index_rebuild, types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS}, }; pub use pipeline::{ @@ -25,6 +26,7 @@ const WORKER_CLAIM_ERROR_BACKOFF_MS: u64 = 1_000; pub async fn run_worker_loop( db: Arc, ingestion_pipeline: Arc, + index_rebuild_interval_secs: u64, ) -> anyhow::Result<()> { let worker_id = format!("ingestion-worker-{}", Uuid::new_v4()); let lease_duration = Duration::from_secs(DEFAULT_LEASE_SECS as u64); @@ -46,6 +48,12 @@ pub async fn run_worker_loop( } } Ok(None) => { + maybe_run_scheduled_index_rebuild( + db.as_ref(), + &worker_id, + index_rebuild_interval_secs, + ) + .await; sleep(idle_backoff).await; } Err(err) => { diff --git a/ingestion-pipeline/src/pipeline/stages.rs b/ingestion-pipeline/src/pipeline/stages.rs index bfe2063..330f92a 100644 --- a/ingestion-pipeline/src/pipeline/stages.rs +++ b/ingestion-pipeline/src/pipeline/stages.rs @@ -6,10 +6,7 @@ use common::{ error::AppError, - storage::{ - indexes::rebuild, - types::{ingestion_payload::IngestionPayload, system_settings::SystemSettings}, - }, + storage::types::{ingestion_payload::IngestionPayload, system_settings::SystemSettings}, }; use state_machines::core::GuardError; use tracing::{debug, instrument}; @@ -179,7 +176,6 @@ pub async fn persist( ) .await?; ctx.db.store_item(text_content).await?; - rebuild(ctx.db).await?; debug!( task_id = %ctx.task_id, diff --git a/main/src/main.rs b/main/src/main.rs index 700effd..844314e 100644 --- a/main/src/main.rs +++ b/main/src/main.rs @@ -43,6 +43,7 @@ async fn main() -> anyhow::Result<()> { let worker_openai = Arc::clone(&services.openai_client); let worker_embedding = Arc::clone(&services.embedding_provider); let worker_config = services.config.clone(); + let index_rebuild_interval_secs = worker_config.index_rebuild_interval_secs; let worker_reranker = services.reranker_pool.clone(); let worker_storage = services.storage.clone(); @@ -59,7 +60,12 @@ async fn main() -> anyhow::Result<()> { worker_embedding, )?); - run_worker_loop(worker_db, ingestion_pipeline).await + run_worker_loop( + worker_db, + ingestion_pipeline, + index_rebuild_interval_secs, + ) + .await }); tokio::select! { diff --git a/main/src/worker.rs b/main/src/worker.rs index 79c52eb..dac6dfb 100644 --- a/main/src/worker.rs +++ b/main/src/worker.rs @@ -26,7 +26,12 @@ async fn main() -> anyhow::Result<()> { Arc::clone(&services.embedding_provider), )?); - run_worker_loop(services.db, ingestion_pipeline).await + run_worker_loop( + services.db, + ingestion_pipeline, + services.config.index_rebuild_interval_secs, + ) + .await } #[cfg(test)] @@ -69,7 +74,9 @@ mod tests { let db = Arc::clone(&services.db); let pipeline = Arc::new(pipeline); let worker = - tokio::spawn(async move { ingestion_pipeline::run_worker_loop(db, pipeline).await }); + tokio::spawn(async move { + ingestion_pipeline::run_worker_loop(db, pipeline, 0).await + }); tokio::time::sleep(Duration::from_millis(250)).await; assert!(