fix: schedule nightly index rebuild on worker and skip per-ingest rebuild.

Ingest relies on SurrealDB incremental index maintenance; the worker runs native REBUILD INDEX on a configurable interval with lease state on system_settings.
This commit is contained in:
Per Stark
2026-06-12 15:01:53 +02:00
parent 4e8a58fff1
commit ead17530bd
12 changed files with 370 additions and 11 deletions
+1
View File
@@ -1,5 +1,6 @@
# Changelog
## Unreleased
- Performance: ingestion skips per-task index rebuild; worker runs scheduled `REBUILD INDEX` (default every 24h via `index_rebuild_interval_secs`, `0` disables)
## 1.0.3 (2026-06-12)
- Search: filter results by type — knowledge entities, ingested content, or both
@@ -0,0 +1,5 @@
-- Track scheduled runtime index rebuild state on the system_settings singleton.
DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option<datetime>;
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option<string>;
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option<datetime>;
@@ -0,0 +1 @@
{"schemas":"--- original\n+++ modified\n@@ -201,6 +201,10 @@\n DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS voice_processing_model ON system_settings TYPE string;\n+DEFINE FIELD IF NOT EXISTS embedding_backend ON system_settings TYPE option<string>;\n+DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option<datetime>;\n+DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option<string>;\n+DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option<datetime>;\n\n # Defines the schema for the 'text_chunk' table.\n\n","events":null}
+4
View File
@@ -14,3 +14,7 @@ DEFINE FIELD IF NOT EXISTS query_system_prompt ON system_settings TYPE string;
DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;
DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;
DEFINE FIELD IF NOT EXISTS voice_processing_model ON system_settings TYPE string;
DEFINE FIELD IF NOT EXISTS embedding_backend ON system_settings TYPE option<string>;
DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option<datetime>;
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option<string>;
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option<datetime>;
+195 -3
View File
@@ -1,12 +1,19 @@
use std::time::Duration;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use futures::future::try_join_all;
use serde::Deserialize;
use serde_json::{Map, Value};
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};
use crate::{error::AppError, storage::db::SurrealDbClient};
use crate::{
error::AppError,
storage::{
db::SurrealDbClient,
types::system_settings::SystemSettings,
},
};
const INDEX_POLL_INTERVAL: Duration = Duration::from_millis(50);
const INDEX_BUILD_TIMEOUT: Duration = Duration::from_secs(30 * 60);
@@ -204,6 +211,9 @@ pub async fn ensure_runtime(
/// Rebuild known FTS and HNSW indexes, skipping any that are not yet defined.
///
/// Uses `DEFINE INDEX OVERWRITE` and is reserved for dimension migrations, re-embed
/// flows, and tests. Routine optimization should use [`rebuild_runtime`].
///
/// # Errors
///
/// Returns `AppError::InternalError` if any index rebuild operation fails.
@@ -211,6 +221,117 @@ pub async fn rebuild(db: &SurrealDbClient) -> Result<(), AppError> {
rebuild_inner(db).await.map_err(AppError::internal)
}
/// Rebuilds existing runtime FTS and HNSW indexes in place via SurrealQL `REBUILD INDEX`.
///
/// SurrealDB maintains ready indexes incrementally on writes; this is for periodic
/// optimization (for example a nightly maintainer job), not ingest correctness.
/// On SurrealDB 2.6 this runs synchronously (`CONCURRENTLY` is not supported on `REBUILD`).
///
/// # Errors
///
/// Returns `AppError::InternalError` if any rebuild operation fails.
pub async fn rebuild_runtime(db: &SurrealDbClient) -> Result<(), AppError> {
rebuild_runtime_inner(db)
.await
.map_err(AppError::internal)
}
/// Returns whether a scheduled index rebuild is due based on the persisted last-run time.
#[must_use]
pub fn scheduled_index_rebuild_due(
last_run: Option<DateTime<Utc>>,
interval_secs: u64,
now: DateTime<Utc>,
) -> bool {
if interval_secs == 0 {
return false;
}
let Some(last_run) = last_run else {
return false;
};
let elapsed = now.signed_duration_since(last_run);
elapsed.num_seconds() >= i64::try_from(interval_secs).unwrap_or(i64::MAX)
}
/// Runs a scheduled native `REBUILD INDEX` pass when due, using a DB lock so only one
/// maintainer rebuilds at a time. Seeds a checkpoint on first run so the initial rebuild
/// waits one full interval after worker startup.
pub async fn maybe_run_scheduled_index_rebuild(
db: &SurrealDbClient,
worker_id: &str,
interval_secs: u64,
) {
if interval_secs == 0 {
return;
}
let now = Utc::now();
let settings = match SystemSettings::get_current(db).await {
Ok(settings) => settings,
Err(err) => {
warn!(error = %err, "failed to load system settings for index rebuild schedule");
return;
}
};
let last_run = settings.last_index_rebuild_at;
if last_run.is_none() {
match SystemSettings::seed_index_rebuild_checkpoint(db).await {
Ok(true) => debug!("seeded index rebuild checkpoint; first rebuild deferred"),
Ok(false) => {}
Err(err) => warn!(error = %err, "failed to seed index rebuild checkpoint"),
}
return;
}
if !scheduled_index_rebuild_due(last_run, interval_secs, now) {
return;
}
let lock_owner = format!("{worker_id}-index-rebuild");
let acquired = match SystemSettings::try_acquire_index_rebuild_lease(db, &lock_owner).await {
Ok(value) => value,
Err(err) => {
warn!(error = %err, "failed to acquire index rebuild lease");
return;
}
};
if !acquired {
debug!("another maintainer is rebuilding indexes");
return;
}
let started = Instant::now();
info!(interval_secs, "starting scheduled runtime index rebuild");
let rebuild_result = rebuild_runtime(db).await;
match rebuild_result {
Ok(()) => {
if let Err(err) = SystemSettings::record_index_rebuild_completed(db, &lock_owner).await
{
warn!(error = %err, "failed to persist index rebuild checkpoint");
SystemSettings::release_index_rebuild_lease(db, &lock_owner).await;
}
info!(
elapsed_ms = started.elapsed().as_millis(),
"scheduled runtime index rebuild completed"
);
}
Err(err) => {
SystemSettings::release_index_rebuild_lease(db, &lock_owner).await;
error!(
error = %err,
elapsed_ms = started.elapsed().as_millis(),
"scheduled runtime index rebuild failed"
);
}
}
}
/// Returns the dimension of the currently defined chunk-embedding HNSW index, if any.
///
/// Stored embeddings always share this index's dimension because re-embedding rewrites the
@@ -382,6 +503,46 @@ async fn rebuild_inner(db: &SurrealDbClient) -> Result<()> {
try_join_all(hnsw_tasks).await.map(|_| ())
}
async fn rebuild_runtime_inner(db: &SurrealDbClient) -> Result<()> {
debug!("Rebuilding runtime indexes with REBUILD INDEX");
for spec in fts_index_specs() {
rebuild_existing_index_in_place(db, spec.index_name, spec.table).await?;
}
let hnsw_tasks = hnsw_index_specs().into_iter().map(|spec| async move {
rebuild_existing_index_in_place(db, spec.index_name, spec.table).await
});
try_join_all(hnsw_tasks).await.map(|_| ())
}
async fn rebuild_existing_index_in_place(
db: &SurrealDbClient,
index_name: &str,
table: &str,
) -> Result<()> {
if !index_exists(db, table, index_name).await? {
debug!(
index = index_name,
table,
"Skipping in-place rebuild because index is missing"
);
return Ok(());
}
let query = format!("REBUILD INDEX IF EXISTS {index_name} ON {table};");
let res = db
.client
.query(query)
.await
.with_context(|| format!("rebuilding index {index_name} on table {table}"))?;
res.check()
.with_context(|| format!("rebuild index {index_name} on table {table} failed"))?;
Ok(())
}
async fn existing_hnsw_dimension(
db: &SurrealDbClient,
spec: &HnswIndexSpec,
@@ -906,6 +1067,37 @@ mod tests {
assert_eq!(extract_dimension(definition), Some(1536));
}
#[test]
fn scheduled_index_rebuild_due_respects_interval_and_disabled() {
let now = Utc::now();
let last = now - chrono::Duration::hours(25);
assert!(!scheduled_index_rebuild_due(None, 86_400, now));
assert!(!scheduled_index_rebuild_due(Some(last), 0, now));
assert!(!scheduled_index_rebuild_due(Some(now - chrono::Duration::hours(1)), 86_400, now));
assert!(scheduled_index_rebuild_due(Some(last), 86_400, now));
}
#[tokio::test]
async fn rebuild_runtime_is_idempotent() -> anyhow::Result<()> {
let namespace = "indexes_in_place_rebuild";
let database = &Uuid::new_v4().to_string();
let db = SurrealDbClient::memory(namespace, database)
.await
.context("in-memory db")?;
db.apply_migrations().await.context("migrations")?;
ensure_runtime(&db, 8).await.context("ensure runtime indexes")?;
rebuild_runtime(&db)
.await
.context("first in-place rebuild")?;
rebuild_runtime(&db)
.await
.context("second in-place rebuild")?;
Ok(())
}
#[tokio::test]
async fn ensure_runtime_is_idempotent() -> anyhow::Result<()> {
let namespace = "indexes_ns";
+128
View File
@@ -1,3 +1,6 @@
use chrono::{DateTime, Utc};
use tracing::warn;
use crate::utils::config::EmbeddingBackend;
use crate::utils::serde_helpers::deserialize_flexible_id;
use serde::{Deserialize, Serialize};
@@ -22,6 +25,15 @@ pub struct SystemSettings {
pub image_processing_model: String,
pub image_processing_prompt: String,
pub voice_processing_model: String,
/// When the maintainer last completed a scheduled `REBUILD INDEX` pass.
#[serde(default)]
pub last_index_rebuild_at: Option<DateTime<Utc>>,
/// Worker id holding the index-rebuild lease, if any.
#[serde(default)]
pub index_rebuild_lease_owner: Option<String>,
/// Lease expiry for in-flight scheduled index rebuilds.
#[serde(default)]
pub index_rebuild_lease_expires_at: Option<DateTime<Utc>>,
}
/// Partial update for singleton system settings without cloning unchanged fields.
@@ -100,6 +112,8 @@ impl SystemSettingsPatch {
}
}
const INDEX_REBUILD_LEASE_TTL: &str = "6h";
impl SystemSettings {
pub const RECORD_ID: &'static str = "current";
@@ -227,6 +241,89 @@ impl SystemSettings {
Ok((settings, needs_update))
}
/// Seeds the first rebuild checkpoint so the initial scheduled rebuild waits one interval.
pub async fn seed_index_rebuild_checkpoint(db: &SurrealDbClient) -> Result<bool, AppError> {
let mut response = db
.client
.query(
"UPDATE type::thing('system_settings', $id) SET last_index_rebuild_at = time::now()
WHERE last_index_rebuild_at IS NONE
RETURN AFTER;",
)
.bind(("id", Self::RECORD_ID))
.await
.map_err(AppError::from)?;
let updated: Option<Self> = response.take(0).map_err(AppError::from)?;
Ok(updated.is_some())
}
/// Claims the singleton index-rebuild lease when it is free or expired.
pub async fn try_acquire_index_rebuild_lease(
db: &SurrealDbClient,
owner: &str,
) -> Result<bool, AppError> {
let mut response = db
.client
.query(format!(
"UPDATE type::thing('system_settings', $id) SET
index_rebuild_lease_owner = $owner,
index_rebuild_lease_expires_at = time::now() + {INDEX_REBUILD_LEASE_TTL}
WHERE index_rebuild_lease_expires_at IS NONE
OR index_rebuild_lease_expires_at < time::now()
RETURN AFTER;"
))
.bind(("id", Self::RECORD_ID))
.bind(("owner", owner.to_string()))
.await
.map_err(AppError::from)?;
let updated: Option<Self> = response.take(0).map_err(AppError::from)?;
Ok(updated.is_some())
}
/// Releases the index-rebuild lease when held by `owner`.
pub async fn release_index_rebuild_lease(db: &SurrealDbClient, owner: &str) {
let released = db
.client
.query(
"UPDATE type::thing('system_settings', $id) SET
index_rebuild_lease_owner = NONE,
index_rebuild_lease_expires_at = NONE
WHERE index_rebuild_lease_owner = $owner;",
)
.bind(("id", Self::RECORD_ID))
.bind(("owner", owner.to_string()))
.await
.and_then(surrealdb::Response::check);
if let Err(err) = released {
warn!(error = %err, "failed to release index rebuild lease");
}
}
/// Records a completed scheduled index rebuild and clears the lease.
pub async fn record_index_rebuild_completed(
db: &SurrealDbClient,
owner: &str,
) -> Result<(), AppError> {
let response = db
.client
.query(
"UPDATE type::thing('system_settings', $id) SET
last_index_rebuild_at = time::now(),
index_rebuild_lease_owner = NONE,
index_rebuild_lease_expires_at = NONE
WHERE index_rebuild_lease_owner = $owner;",
)
.bind(("id", Self::RECORD_ID))
.bind(("owner", owner.to_string()))
.await
.map_err(AppError::from)?;
response.check().map_err(AppError::from)?;
Ok(())
}
}
#[cfg(test)]
@@ -802,4 +899,35 @@ mod tests {
);
Ok(())
}
#[tokio::test]
async fn index_rebuild_lease_is_exclusive_on_system_settings() -> anyhow::Result<()> {
let namespace = "system_settings_index_rebuild";
let database = &Uuid::new_v4().to_string();
let db = SurrealDbClient::memory(namespace, database)
.await
.context("in-memory db")?;
db.apply_migrations().await.context("migrations")?;
assert!(
SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-a")
.await?,
"first lease claim should succeed"
);
assert!(
!SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-b")
.await?,
"second lease claim should fail while lease is held"
);
SystemSettings::release_index_rebuild_lease(&db, "worker-a").await;
SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-b").await?;
SystemSettings::record_index_rebuild_completed(&db, "worker-b").await?;
let settings = SystemSettings::get_current(&db).await?;
assert!(settings.last_index_rebuild_at.is_some());
assert!(settings.index_rebuild_lease_owner.is_none());
Ok(())
}
}
+8
View File
@@ -135,6 +135,9 @@ pub struct AppConfig {
pub ingest_max_context_bytes: usize,
#[serde(default = "default_ingest_max_category_bytes")]
pub ingest_max_category_bytes: usize,
/// Seconds between scheduled `REBUILD INDEX` maintainer runs (`0` disables).
#[serde(default = "default_index_rebuild_interval_secs")]
pub index_rebuild_interval_secs: u64,
}
/// Default data directory for persisted assets.
@@ -172,6 +175,10 @@ fn default_ingest_max_category_bytes() -> usize {
128
}
fn default_index_rebuild_interval_secs() -> u64 {
86_400
}
static ORT_PATH_INIT: Once = Once::new();
/// Sets `ORT_DYLIB_PATH` once per process when a bundled ONNX runtime library is found.
@@ -238,6 +245,7 @@ impl Default for AppConfig {
ingest_max_content_bytes: default_ingest_max_content_bytes(),
ingest_max_context_bytes: default_ingest_max_context_bytes(),
ingest_max_category_bytes: default_ingest_max_category_bytes(),
index_rebuild_interval_secs: default_index_rebuild_interval_secs(),
}
}
}
+3
View File
@@ -350,6 +350,9 @@ mod tests {
image_processing_model: "gpt-4o-mini".into(),
image_processing_prompt: "p".into(),
voice_processing_model: "whisper-1".into(),
last_index_rebuild_at: None,
index_rebuild_lease_owner: None,
index_rebuild_lease_expires_at: None,
}
}
+8
View File
@@ -6,6 +6,7 @@ pub mod utils;
use chrono::Utc;
use common::storage::{
db::SurrealDbClient,
indexes::maybe_run_scheduled_index_rebuild,
types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS},
};
pub use pipeline::{
@@ -25,6 +26,7 @@ const WORKER_CLAIM_ERROR_BACKOFF_MS: u64 = 1_000;
pub async fn run_worker_loop(
db: Arc<SurrealDbClient>,
ingestion_pipeline: Arc<IngestionPipeline>,
index_rebuild_interval_secs: u64,
) -> anyhow::Result<()> {
let worker_id = format!("ingestion-worker-{}", Uuid::new_v4());
let lease_duration = Duration::from_secs(DEFAULT_LEASE_SECS as u64);
@@ -46,6 +48,12 @@ pub async fn run_worker_loop(
}
}
Ok(None) => {
maybe_run_scheduled_index_rebuild(
db.as_ref(),
&worker_id,
index_rebuild_interval_secs,
)
.await;
sleep(idle_backoff).await;
}
Err(err) => {
+1 -5
View File
@@ -6,10 +6,7 @@
use common::{
error::AppError,
storage::{
indexes::rebuild,
types::{ingestion_payload::IngestionPayload, system_settings::SystemSettings},
},
storage::types::{ingestion_payload::IngestionPayload, system_settings::SystemSettings},
};
use state_machines::core::GuardError;
use tracing::{debug, instrument};
@@ -179,7 +176,6 @@ pub async fn persist(
)
.await?;
ctx.db.store_item(text_content).await?;
rebuild(ctx.db).await?;
debug!(
task_id = %ctx.task_id,
+7 -1
View File
@@ -43,6 +43,7 @@ async fn main() -> anyhow::Result<()> {
let worker_openai = Arc::clone(&services.openai_client);
let worker_embedding = Arc::clone(&services.embedding_provider);
let worker_config = services.config.clone();
let index_rebuild_interval_secs = worker_config.index_rebuild_interval_secs;
let worker_reranker = services.reranker_pool.clone();
let worker_storage = services.storage.clone();
@@ -59,7 +60,12 @@ async fn main() -> anyhow::Result<()> {
worker_embedding,
)?);
run_worker_loop(worker_db, ingestion_pipeline).await
run_worker_loop(
worker_db,
ingestion_pipeline,
index_rebuild_interval_secs,
)
.await
});
tokio::select! {
+9 -2
View File
@@ -26,7 +26,12 @@ async fn main() -> anyhow::Result<()> {
Arc::clone(&services.embedding_provider),
)?);
run_worker_loop(services.db, ingestion_pipeline).await
run_worker_loop(
services.db,
ingestion_pipeline,
services.config.index_rebuild_interval_secs,
)
.await
}
#[cfg(test)]
@@ -69,7 +74,9 @@ mod tests {
let db = Arc::clone(&services.db);
let pipeline = Arc::new(pipeline);
let worker =
tokio::spawn(async move { ingestion_pipeline::run_worker_loop(db, pipeline).await });
tokio::spawn(async move {
ingestion_pipeline::run_worker_loop(db, pipeline, 0).await
});
tokio::time::sleep(Duration::from_millis(250)).await;
assert!(