mirror of
https://github.com/perstarkse/minne.git
synced 2026-07-04 12:01:48 +02:00
fix: schedule nightly index rebuild on worker and skip per-ingest rebuild.
Ingest relies on SurrealDB incremental index maintenance; the worker runs native REBUILD INDEX on a configurable interval with lease state on system_settings.
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
# Changelog
|
# Changelog
|
||||||
## Unreleased
|
## Unreleased
|
||||||
|
- Performance: ingestion skips per-task index rebuild; worker runs scheduled `REBUILD INDEX` (default every 24h via `index_rebuild_interval_secs`, `0` disables)
|
||||||
|
|
||||||
## 1.0.3 (2026-06-12)
|
## 1.0.3 (2026-06-12)
|
||||||
- Search: filter results by type — knowledge entities, ingested content, or both
|
- Search: filter results by type — knowledge entities, ingested content, or both
|
||||||
|
|||||||
@@ -0,0 +1,5 @@
|
|||||||
|
-- Track scheduled runtime index rebuild state on the system_settings singleton.
|
||||||
|
|
||||||
|
DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option<datetime>;
|
||||||
|
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option<string>;
|
||||||
|
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option<datetime>;
|
||||||
@@ -0,0 +1 @@
|
|||||||
|
{"schemas":"--- original\n+++ modified\n@@ -201,6 +201,10 @@\n DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS voice_processing_model ON system_settings TYPE string;\n+DEFINE FIELD IF NOT EXISTS embedding_backend ON system_settings TYPE option<string>;\n+DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option<datetime>;\n+DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option<string>;\n+DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option<datetime>;\n\n # Defines the schema for the 'text_chunk' table.\n\n","events":null}
|
||||||
@@ -14,3 +14,7 @@ DEFINE FIELD IF NOT EXISTS query_system_prompt ON system_settings TYPE string;
|
|||||||
DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;
|
DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;
|
||||||
DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;
|
DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;
|
||||||
DEFINE FIELD IF NOT EXISTS voice_processing_model ON system_settings TYPE string;
|
DEFINE FIELD IF NOT EXISTS voice_processing_model ON system_settings TYPE string;
|
||||||
|
DEFINE FIELD IF NOT EXISTS embedding_backend ON system_settings TYPE option<string>;
|
||||||
|
DEFINE FIELD IF NOT EXISTS last_index_rebuild_at ON system_settings TYPE option<datetime>;
|
||||||
|
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_owner ON system_settings TYPE option<string>;
|
||||||
|
DEFINE FIELD IF NOT EXISTS index_rebuild_lease_expires_at ON system_settings TYPE option<datetime>;
|
||||||
|
|||||||
@@ -1,12 +1,19 @@
|
|||||||
use std::time::Duration;
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
use futures::future::try_join_all;
|
use futures::future::try_join_all;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::{Map, Value};
|
use serde_json::{Map, Value};
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, error, info, warn};
|
||||||
|
|
||||||
use crate::{error::AppError, storage::db::SurrealDbClient};
|
use crate::{
|
||||||
|
error::AppError,
|
||||||
|
storage::{
|
||||||
|
db::SurrealDbClient,
|
||||||
|
types::system_settings::SystemSettings,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
const INDEX_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
const INDEX_POLL_INTERVAL: Duration = Duration::from_millis(50);
|
||||||
const INDEX_BUILD_TIMEOUT: Duration = Duration::from_secs(30 * 60);
|
const INDEX_BUILD_TIMEOUT: Duration = Duration::from_secs(30 * 60);
|
||||||
@@ -204,6 +211,9 @@ pub async fn ensure_runtime(
|
|||||||
|
|
||||||
/// Rebuild known FTS and HNSW indexes, skipping any that are not yet defined.
|
/// Rebuild known FTS and HNSW indexes, skipping any that are not yet defined.
|
||||||
///
|
///
|
||||||
|
/// Uses `DEFINE INDEX OVERWRITE` and is reserved for dimension migrations, re-embed
|
||||||
|
/// flows, and tests. Routine optimization should use [`rebuild_runtime`].
|
||||||
|
///
|
||||||
/// # Errors
|
/// # Errors
|
||||||
///
|
///
|
||||||
/// Returns `AppError::InternalError` if any index rebuild operation fails.
|
/// Returns `AppError::InternalError` if any index rebuild operation fails.
|
||||||
@@ -211,6 +221,117 @@ pub async fn rebuild(db: &SurrealDbClient) -> Result<(), AppError> {
|
|||||||
rebuild_inner(db).await.map_err(AppError::internal)
|
rebuild_inner(db).await.map_err(AppError::internal)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Rebuilds existing runtime FTS and HNSW indexes in place via SurrealQL `REBUILD INDEX`.
|
||||||
|
///
|
||||||
|
/// SurrealDB maintains ready indexes incrementally on writes; this is for periodic
|
||||||
|
/// optimization (for example a nightly maintainer job), not ingest correctness.
|
||||||
|
/// On SurrealDB 2.6 this runs synchronously (`CONCURRENTLY` is not supported on `REBUILD`).
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::InternalError` if any rebuild operation fails.
|
||||||
|
pub async fn rebuild_runtime(db: &SurrealDbClient) -> Result<(), AppError> {
|
||||||
|
rebuild_runtime_inner(db)
|
||||||
|
.await
|
||||||
|
.map_err(AppError::internal)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns whether a scheduled index rebuild is due based on the persisted last-run time.
|
||||||
|
#[must_use]
|
||||||
|
pub fn scheduled_index_rebuild_due(
|
||||||
|
last_run: Option<DateTime<Utc>>,
|
||||||
|
interval_secs: u64,
|
||||||
|
now: DateTime<Utc>,
|
||||||
|
) -> bool {
|
||||||
|
if interval_secs == 0 {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
let Some(last_run) = last_run else {
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
let elapsed = now.signed_duration_since(last_run);
|
||||||
|
elapsed.num_seconds() >= i64::try_from(interval_secs).unwrap_or(i64::MAX)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Runs a scheduled native `REBUILD INDEX` pass when due, using a DB lock so only one
|
||||||
|
/// maintainer rebuilds at a time. Seeds a checkpoint on first run so the initial rebuild
|
||||||
|
/// waits one full interval after worker startup.
|
||||||
|
pub async fn maybe_run_scheduled_index_rebuild(
|
||||||
|
db: &SurrealDbClient,
|
||||||
|
worker_id: &str,
|
||||||
|
interval_secs: u64,
|
||||||
|
) {
|
||||||
|
if interval_secs == 0 {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let now = Utc::now();
|
||||||
|
let settings = match SystemSettings::get_current(db).await {
|
||||||
|
Ok(settings) => settings,
|
||||||
|
Err(err) => {
|
||||||
|
warn!(error = %err, "failed to load system settings for index rebuild schedule");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let last_run = settings.last_index_rebuild_at;
|
||||||
|
|
||||||
|
if last_run.is_none() {
|
||||||
|
match SystemSettings::seed_index_rebuild_checkpoint(db).await {
|
||||||
|
Ok(true) => debug!("seeded index rebuild checkpoint; first rebuild deferred"),
|
||||||
|
Ok(false) => {}
|
||||||
|
Err(err) => warn!(error = %err, "failed to seed index rebuild checkpoint"),
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if !scheduled_index_rebuild_due(last_run, interval_secs, now) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let lock_owner = format!("{worker_id}-index-rebuild");
|
||||||
|
let acquired = match SystemSettings::try_acquire_index_rebuild_lease(db, &lock_owner).await {
|
||||||
|
Ok(value) => value,
|
||||||
|
Err(err) => {
|
||||||
|
warn!(error = %err, "failed to acquire index rebuild lease");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if !acquired {
|
||||||
|
debug!("another maintainer is rebuilding indexes");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let started = Instant::now();
|
||||||
|
info!(interval_secs, "starting scheduled runtime index rebuild");
|
||||||
|
let rebuild_result = rebuild_runtime(db).await;
|
||||||
|
|
||||||
|
match rebuild_result {
|
||||||
|
Ok(()) => {
|
||||||
|
if let Err(err) = SystemSettings::record_index_rebuild_completed(db, &lock_owner).await
|
||||||
|
{
|
||||||
|
warn!(error = %err, "failed to persist index rebuild checkpoint");
|
||||||
|
SystemSettings::release_index_rebuild_lease(db, &lock_owner).await;
|
||||||
|
}
|
||||||
|
info!(
|
||||||
|
elapsed_ms = started.elapsed().as_millis(),
|
||||||
|
"scheduled runtime index rebuild completed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
SystemSettings::release_index_rebuild_lease(db, &lock_owner).await;
|
||||||
|
error!(
|
||||||
|
error = %err,
|
||||||
|
elapsed_ms = started.elapsed().as_millis(),
|
||||||
|
"scheduled runtime index rebuild failed"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Returns the dimension of the currently defined chunk-embedding HNSW index, if any.
|
/// Returns the dimension of the currently defined chunk-embedding HNSW index, if any.
|
||||||
///
|
///
|
||||||
/// Stored embeddings always share this index's dimension because re-embedding rewrites the
|
/// Stored embeddings always share this index's dimension because re-embedding rewrites the
|
||||||
@@ -382,6 +503,46 @@ async fn rebuild_inner(db: &SurrealDbClient) -> Result<()> {
|
|||||||
try_join_all(hnsw_tasks).await.map(|_| ())
|
try_join_all(hnsw_tasks).await.map(|_| ())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn rebuild_runtime_inner(db: &SurrealDbClient) -> Result<()> {
|
||||||
|
debug!("Rebuilding runtime indexes with REBUILD INDEX");
|
||||||
|
|
||||||
|
for spec in fts_index_specs() {
|
||||||
|
rebuild_existing_index_in_place(db, spec.index_name, spec.table).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let hnsw_tasks = hnsw_index_specs().into_iter().map(|spec| async move {
|
||||||
|
rebuild_existing_index_in_place(db, spec.index_name, spec.table).await
|
||||||
|
});
|
||||||
|
|
||||||
|
try_join_all(hnsw_tasks).await.map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn rebuild_existing_index_in_place(
|
||||||
|
db: &SurrealDbClient,
|
||||||
|
index_name: &str,
|
||||||
|
table: &str,
|
||||||
|
) -> Result<()> {
|
||||||
|
if !index_exists(db, table, index_name).await? {
|
||||||
|
debug!(
|
||||||
|
index = index_name,
|
||||||
|
table,
|
||||||
|
"Skipping in-place rebuild because index is missing"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let query = format!("REBUILD INDEX IF EXISTS {index_name} ON {table};");
|
||||||
|
let res = db
|
||||||
|
.client
|
||||||
|
.query(query)
|
||||||
|
.await
|
||||||
|
.with_context(|| format!("rebuilding index {index_name} on table {table}"))?;
|
||||||
|
res.check()
|
||||||
|
.with_context(|| format!("rebuild index {index_name} on table {table} failed"))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
async fn existing_hnsw_dimension(
|
async fn existing_hnsw_dimension(
|
||||||
db: &SurrealDbClient,
|
db: &SurrealDbClient,
|
||||||
spec: &HnswIndexSpec,
|
spec: &HnswIndexSpec,
|
||||||
@@ -906,6 +1067,37 @@ mod tests {
|
|||||||
assert_eq!(extract_dimension(definition), Some(1536));
|
assert_eq!(extract_dimension(definition), Some(1536));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn scheduled_index_rebuild_due_respects_interval_and_disabled() {
|
||||||
|
let now = Utc::now();
|
||||||
|
let last = now - chrono::Duration::hours(25);
|
||||||
|
|
||||||
|
assert!(!scheduled_index_rebuild_due(None, 86_400, now));
|
||||||
|
assert!(!scheduled_index_rebuild_due(Some(last), 0, now));
|
||||||
|
assert!(!scheduled_index_rebuild_due(Some(now - chrono::Duration::hours(1)), 86_400, now));
|
||||||
|
assert!(scheduled_index_rebuild_due(Some(last), 86_400, now));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn rebuild_runtime_is_idempotent() -> anyhow::Result<()> {
|
||||||
|
let namespace = "indexes_in_place_rebuild";
|
||||||
|
let database = &Uuid::new_v4().to_string();
|
||||||
|
let db = SurrealDbClient::memory(namespace, database)
|
||||||
|
.await
|
||||||
|
.context("in-memory db")?;
|
||||||
|
|
||||||
|
db.apply_migrations().await.context("migrations")?;
|
||||||
|
ensure_runtime(&db, 8).await.context("ensure runtime indexes")?;
|
||||||
|
|
||||||
|
rebuild_runtime(&db)
|
||||||
|
.await
|
||||||
|
.context("first in-place rebuild")?;
|
||||||
|
rebuild_runtime(&db)
|
||||||
|
.await
|
||||||
|
.context("second in-place rebuild")?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn ensure_runtime_is_idempotent() -> anyhow::Result<()> {
|
async fn ensure_runtime_is_idempotent() -> anyhow::Result<()> {
|
||||||
let namespace = "indexes_ns";
|
let namespace = "indexes_ns";
|
||||||
|
|||||||
@@ -1,3 +1,6 @@
|
|||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use tracing::warn;
|
||||||
|
|
||||||
use crate::utils::config::EmbeddingBackend;
|
use crate::utils::config::EmbeddingBackend;
|
||||||
use crate::utils::serde_helpers::deserialize_flexible_id;
|
use crate::utils::serde_helpers::deserialize_flexible_id;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -22,6 +25,15 @@ pub struct SystemSettings {
|
|||||||
pub image_processing_model: String,
|
pub image_processing_model: String,
|
||||||
pub image_processing_prompt: String,
|
pub image_processing_prompt: String,
|
||||||
pub voice_processing_model: String,
|
pub voice_processing_model: String,
|
||||||
|
/// When the maintainer last completed a scheduled `REBUILD INDEX` pass.
|
||||||
|
#[serde(default)]
|
||||||
|
pub last_index_rebuild_at: Option<DateTime<Utc>>,
|
||||||
|
/// Worker id holding the index-rebuild lease, if any.
|
||||||
|
#[serde(default)]
|
||||||
|
pub index_rebuild_lease_owner: Option<String>,
|
||||||
|
/// Lease expiry for in-flight scheduled index rebuilds.
|
||||||
|
#[serde(default)]
|
||||||
|
pub index_rebuild_lease_expires_at: Option<DateTime<Utc>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Partial update for singleton system settings without cloning unchanged fields.
|
/// Partial update for singleton system settings without cloning unchanged fields.
|
||||||
@@ -100,6 +112,8 @@ impl SystemSettingsPatch {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const INDEX_REBUILD_LEASE_TTL: &str = "6h";
|
||||||
|
|
||||||
impl SystemSettings {
|
impl SystemSettings {
|
||||||
pub const RECORD_ID: &'static str = "current";
|
pub const RECORD_ID: &'static str = "current";
|
||||||
|
|
||||||
@@ -227,6 +241,89 @@ impl SystemSettings {
|
|||||||
|
|
||||||
Ok((settings, needs_update))
|
Ok((settings, needs_update))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Seeds the first rebuild checkpoint so the initial scheduled rebuild waits one interval.
|
||||||
|
pub async fn seed_index_rebuild_checkpoint(db: &SurrealDbClient) -> Result<bool, AppError> {
|
||||||
|
let mut response = db
|
||||||
|
.client
|
||||||
|
.query(
|
||||||
|
"UPDATE type::thing('system_settings', $id) SET last_index_rebuild_at = time::now()
|
||||||
|
WHERE last_index_rebuild_at IS NONE
|
||||||
|
RETURN AFTER;",
|
||||||
|
)
|
||||||
|
.bind(("id", Self::RECORD_ID))
|
||||||
|
.await
|
||||||
|
.map_err(AppError::from)?;
|
||||||
|
|
||||||
|
let updated: Option<Self> = response.take(0).map_err(AppError::from)?;
|
||||||
|
Ok(updated.is_some())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Claims the singleton index-rebuild lease when it is free or expired.
|
||||||
|
pub async fn try_acquire_index_rebuild_lease(
|
||||||
|
db: &SurrealDbClient,
|
||||||
|
owner: &str,
|
||||||
|
) -> Result<bool, AppError> {
|
||||||
|
let mut response = db
|
||||||
|
.client
|
||||||
|
.query(format!(
|
||||||
|
"UPDATE type::thing('system_settings', $id) SET
|
||||||
|
index_rebuild_lease_owner = $owner,
|
||||||
|
index_rebuild_lease_expires_at = time::now() + {INDEX_REBUILD_LEASE_TTL}
|
||||||
|
WHERE index_rebuild_lease_expires_at IS NONE
|
||||||
|
OR index_rebuild_lease_expires_at < time::now()
|
||||||
|
RETURN AFTER;"
|
||||||
|
))
|
||||||
|
.bind(("id", Self::RECORD_ID))
|
||||||
|
.bind(("owner", owner.to_string()))
|
||||||
|
.await
|
||||||
|
.map_err(AppError::from)?;
|
||||||
|
|
||||||
|
let updated: Option<Self> = response.take(0).map_err(AppError::from)?;
|
||||||
|
Ok(updated.is_some())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Releases the index-rebuild lease when held by `owner`.
|
||||||
|
pub async fn release_index_rebuild_lease(db: &SurrealDbClient, owner: &str) {
|
||||||
|
let released = db
|
||||||
|
.client
|
||||||
|
.query(
|
||||||
|
"UPDATE type::thing('system_settings', $id) SET
|
||||||
|
index_rebuild_lease_owner = NONE,
|
||||||
|
index_rebuild_lease_expires_at = NONE
|
||||||
|
WHERE index_rebuild_lease_owner = $owner;",
|
||||||
|
)
|
||||||
|
.bind(("id", Self::RECORD_ID))
|
||||||
|
.bind(("owner", owner.to_string()))
|
||||||
|
.await
|
||||||
|
.and_then(surrealdb::Response::check);
|
||||||
|
|
||||||
|
if let Err(err) = released {
|
||||||
|
warn!(error = %err, "failed to release index rebuild lease");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Records a completed scheduled index rebuild and clears the lease.
|
||||||
|
pub async fn record_index_rebuild_completed(
|
||||||
|
db: &SurrealDbClient,
|
||||||
|
owner: &str,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
let response = db
|
||||||
|
.client
|
||||||
|
.query(
|
||||||
|
"UPDATE type::thing('system_settings', $id) SET
|
||||||
|
last_index_rebuild_at = time::now(),
|
||||||
|
index_rebuild_lease_owner = NONE,
|
||||||
|
index_rebuild_lease_expires_at = NONE
|
||||||
|
WHERE index_rebuild_lease_owner = $owner;",
|
||||||
|
)
|
||||||
|
.bind(("id", Self::RECORD_ID))
|
||||||
|
.bind(("owner", owner.to_string()))
|
||||||
|
.await
|
||||||
|
.map_err(AppError::from)?;
|
||||||
|
response.check().map_err(AppError::from)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -802,4 +899,35 @@ mod tests {
|
|||||||
);
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn index_rebuild_lease_is_exclusive_on_system_settings() -> anyhow::Result<()> {
|
||||||
|
let namespace = "system_settings_index_rebuild";
|
||||||
|
let database = &Uuid::new_v4().to_string();
|
||||||
|
let db = SurrealDbClient::memory(namespace, database)
|
||||||
|
.await
|
||||||
|
.context("in-memory db")?;
|
||||||
|
db.apply_migrations().await.context("migrations")?;
|
||||||
|
|
||||||
|
assert!(
|
||||||
|
SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-a")
|
||||||
|
.await?,
|
||||||
|
"first lease claim should succeed"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
!SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-b")
|
||||||
|
.await?,
|
||||||
|
"second lease claim should fail while lease is held"
|
||||||
|
);
|
||||||
|
|
||||||
|
SystemSettings::release_index_rebuild_lease(&db, "worker-a").await;
|
||||||
|
|
||||||
|
SystemSettings::try_acquire_index_rebuild_lease(&db, "worker-b").await?;
|
||||||
|
SystemSettings::record_index_rebuild_completed(&db, "worker-b").await?;
|
||||||
|
|
||||||
|
let settings = SystemSettings::get_current(&db).await?;
|
||||||
|
assert!(settings.last_index_rebuild_at.is_some());
|
||||||
|
assert!(settings.index_rebuild_lease_owner.is_none());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -135,6 +135,9 @@ pub struct AppConfig {
|
|||||||
pub ingest_max_context_bytes: usize,
|
pub ingest_max_context_bytes: usize,
|
||||||
#[serde(default = "default_ingest_max_category_bytes")]
|
#[serde(default = "default_ingest_max_category_bytes")]
|
||||||
pub ingest_max_category_bytes: usize,
|
pub ingest_max_category_bytes: usize,
|
||||||
|
/// Seconds between scheduled `REBUILD INDEX` maintainer runs (`0` disables).
|
||||||
|
#[serde(default = "default_index_rebuild_interval_secs")]
|
||||||
|
pub index_rebuild_interval_secs: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Default data directory for persisted assets.
|
/// Default data directory for persisted assets.
|
||||||
@@ -172,6 +175,10 @@ fn default_ingest_max_category_bytes() -> usize {
|
|||||||
128
|
128
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn default_index_rebuild_interval_secs() -> u64 {
|
||||||
|
86_400
|
||||||
|
}
|
||||||
|
|
||||||
static ORT_PATH_INIT: Once = Once::new();
|
static ORT_PATH_INIT: Once = Once::new();
|
||||||
|
|
||||||
/// Sets `ORT_DYLIB_PATH` once per process when a bundled ONNX runtime library is found.
|
/// Sets `ORT_DYLIB_PATH` once per process when a bundled ONNX runtime library is found.
|
||||||
@@ -238,6 +245,7 @@ impl Default for AppConfig {
|
|||||||
ingest_max_content_bytes: default_ingest_max_content_bytes(),
|
ingest_max_content_bytes: default_ingest_max_content_bytes(),
|
||||||
ingest_max_context_bytes: default_ingest_max_context_bytes(),
|
ingest_max_context_bytes: default_ingest_max_context_bytes(),
|
||||||
ingest_max_category_bytes: default_ingest_max_category_bytes(),
|
ingest_max_category_bytes: default_ingest_max_category_bytes(),
|
||||||
|
index_rebuild_interval_secs: default_index_rebuild_interval_secs(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -350,6 +350,9 @@ mod tests {
|
|||||||
image_processing_model: "gpt-4o-mini".into(),
|
image_processing_model: "gpt-4o-mini".into(),
|
||||||
image_processing_prompt: "p".into(),
|
image_processing_prompt: "p".into(),
|
||||||
voice_processing_model: "whisper-1".into(),
|
voice_processing_model: "whisper-1".into(),
|
||||||
|
last_index_rebuild_at: None,
|
||||||
|
index_rebuild_lease_owner: None,
|
||||||
|
index_rebuild_lease_expires_at: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ pub mod utils;
|
|||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
use common::storage::{
|
use common::storage::{
|
||||||
db::SurrealDbClient,
|
db::SurrealDbClient,
|
||||||
|
indexes::maybe_run_scheduled_index_rebuild,
|
||||||
types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS},
|
types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS},
|
||||||
};
|
};
|
||||||
pub use pipeline::{
|
pub use pipeline::{
|
||||||
@@ -25,6 +26,7 @@ const WORKER_CLAIM_ERROR_BACKOFF_MS: u64 = 1_000;
|
|||||||
pub async fn run_worker_loop(
|
pub async fn run_worker_loop(
|
||||||
db: Arc<SurrealDbClient>,
|
db: Arc<SurrealDbClient>,
|
||||||
ingestion_pipeline: Arc<IngestionPipeline>,
|
ingestion_pipeline: Arc<IngestionPipeline>,
|
||||||
|
index_rebuild_interval_secs: u64,
|
||||||
) -> anyhow::Result<()> {
|
) -> anyhow::Result<()> {
|
||||||
let worker_id = format!("ingestion-worker-{}", Uuid::new_v4());
|
let worker_id = format!("ingestion-worker-{}", Uuid::new_v4());
|
||||||
let lease_duration = Duration::from_secs(DEFAULT_LEASE_SECS as u64);
|
let lease_duration = Duration::from_secs(DEFAULT_LEASE_SECS as u64);
|
||||||
@@ -46,6 +48,12 @@ pub async fn run_worker_loop(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
Ok(None) => {
|
||||||
|
maybe_run_scheduled_index_rebuild(
|
||||||
|
db.as_ref(),
|
||||||
|
&worker_id,
|
||||||
|
index_rebuild_interval_secs,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
sleep(idle_backoff).await;
|
sleep(idle_backoff).await;
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|||||||
@@ -6,10 +6,7 @@
|
|||||||
|
|
||||||
use common::{
|
use common::{
|
||||||
error::AppError,
|
error::AppError,
|
||||||
storage::{
|
storage::types::{ingestion_payload::IngestionPayload, system_settings::SystemSettings},
|
||||||
indexes::rebuild,
|
|
||||||
types::{ingestion_payload::IngestionPayload, system_settings::SystemSettings},
|
|
||||||
},
|
|
||||||
};
|
};
|
||||||
use state_machines::core::GuardError;
|
use state_machines::core::GuardError;
|
||||||
use tracing::{debug, instrument};
|
use tracing::{debug, instrument};
|
||||||
@@ -179,7 +176,6 @@ pub async fn persist(
|
|||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
ctx.db.store_item(text_content).await?;
|
ctx.db.store_item(text_content).await?;
|
||||||
rebuild(ctx.db).await?;
|
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
task_id = %ctx.task_id,
|
task_id = %ctx.task_id,
|
||||||
|
|||||||
+7
-1
@@ -43,6 +43,7 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
let worker_openai = Arc::clone(&services.openai_client);
|
let worker_openai = Arc::clone(&services.openai_client);
|
||||||
let worker_embedding = Arc::clone(&services.embedding_provider);
|
let worker_embedding = Arc::clone(&services.embedding_provider);
|
||||||
let worker_config = services.config.clone();
|
let worker_config = services.config.clone();
|
||||||
|
let index_rebuild_interval_secs = worker_config.index_rebuild_interval_secs;
|
||||||
let worker_reranker = services.reranker_pool.clone();
|
let worker_reranker = services.reranker_pool.clone();
|
||||||
let worker_storage = services.storage.clone();
|
let worker_storage = services.storage.clone();
|
||||||
|
|
||||||
@@ -59,7 +60,12 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
worker_embedding,
|
worker_embedding,
|
||||||
)?);
|
)?);
|
||||||
|
|
||||||
run_worker_loop(worker_db, ingestion_pipeline).await
|
run_worker_loop(
|
||||||
|
worker_db,
|
||||||
|
ingestion_pipeline,
|
||||||
|
index_rebuild_interval_secs,
|
||||||
|
)
|
||||||
|
.await
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
|||||||
+9
-2
@@ -26,7 +26,12 @@ async fn main() -> anyhow::Result<()> {
|
|||||||
Arc::clone(&services.embedding_provider),
|
Arc::clone(&services.embedding_provider),
|
||||||
)?);
|
)?);
|
||||||
|
|
||||||
run_worker_loop(services.db, ingestion_pipeline).await
|
run_worker_loop(
|
||||||
|
services.db,
|
||||||
|
ingestion_pipeline,
|
||||||
|
services.config.index_rebuild_interval_secs,
|
||||||
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -69,7 +74,9 @@ mod tests {
|
|||||||
let db = Arc::clone(&services.db);
|
let db = Arc::clone(&services.db);
|
||||||
let pipeline = Arc::new(pipeline);
|
let pipeline = Arc::new(pipeline);
|
||||||
let worker =
|
let worker =
|
||||||
tokio::spawn(async move { ingestion_pipeline::run_worker_loop(db, pipeline).await });
|
tokio::spawn(async move {
|
||||||
|
ingestion_pipeline::run_worker_loop(db, pipeline, 0).await
|
||||||
|
});
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(250)).await;
|
tokio::time::sleep(Duration::from_millis(250)).await;
|
||||||
assert!(
|
assert!(
|
||||||
|
|||||||
Reference in New Issue
Block a user