diff --git a/Cargo.lock b/Cargo.lock index 3811ec3..cc9a986 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3810,6 +3810,7 @@ dependencies = [ "api-router", "async-openai", "axum", + "chrono", "common", "futures", "html-router", diff --git a/common/.surrealdb b/common/db/.surrealdb similarity index 100% rename from common/.surrealdb rename to common/db/.surrealdb diff --git a/common/migrations/20250503_215025_initial_setup.surql b/common/db/migrations/20250503_215025_initial_setup.surql similarity index 100% rename from common/migrations/20250503_215025_initial_setup.surql rename to common/db/migrations/20250503_215025_initial_setup.surql diff --git a/common/migrations/20250509_152033_rename_instructions_to_context.surql b/common/db/migrations/20250509_152033_rename_instructions_to_context.surql similarity index 100% rename from common/migrations/20250509_152033_rename_instructions_to_context.surql rename to common/db/migrations/20250509_152033_rename_instructions_to_context.surql diff --git a/common/migrations/20250514_142342_add_full_text_search_text_content.surql b/common/db/migrations/20250514_142342_add_full_text_search_text_content.surql similarity index 100% rename from common/migrations/20250514_142342_add_full_text_search_text_content.surql rename to common/db/migrations/20250514_142342_add_full_text_search_text_content.surql diff --git a/common/migrations/20250606_234535_add_embedding_model_and_dimensions_to_system_settings.surql b/common/db/migrations/20250606_234535_add_embedding_model_and_dimensions_to_system_settings.surql similarity index 100% rename from common/migrations/20250606_234535_add_embedding_model_and_dimensions_to_system_settings.surql rename to common/db/migrations/20250606_234535_add_embedding_model_and_dimensions_to_system_settings.surql diff --git a/common/migrations/20250627_120000_add_image_processing_settings.surql b/common/db/migrations/20250627_120000_add_image_processing_settings.surql similarity index 100% rename from common/migrations/20250627_120000_add_image_processing_settings.surql rename to common/db/migrations/20250627_120000_add_image_processing_settings.surql diff --git a/common/migrations/20250627_231035_remove_job_table.surql b/common/db/migrations/20250627_231035_remove_job_table.surql similarity index 100% rename from common/migrations/20250627_231035_remove_job_table.surql rename to common/db/migrations/20250627_231035_remove_job_table.surql diff --git a/common/migrations/20250701_000000_add_voice_processing_model_to_system_settings.surql b/common/db/migrations/20250701_000000_add_voice_processing_model_to_system_settings.surql similarity index 100% rename from common/migrations/20250701_000000_add_voice_processing_model_to_system_settings.surql rename to common/db/migrations/20250701_000000_add_voice_processing_model_to_system_settings.surql diff --git a/common/migrations/20250921_120004_fix_datetime_fields.surql b/common/db/migrations/20250921_120004_fix_datetime_fields.surql similarity index 100% rename from common/migrations/20250921_120004_fix_datetime_fields.surql rename to common/db/migrations/20250921_120004_fix_datetime_fields.surql diff --git a/common/migrations/20251003_000001_add_fts_for_entities_and_chunks.surql b/common/db/migrations/20251003_000001_add_fts_for_entities_and_chunks.surql similarity index 100% rename from common/migrations/20251003_000001_add_fts_for_entities_and_chunks.surql rename to common/db/migrations/20251003_000001_add_fts_for_entities_and_chunks.surql diff --git a/common/migrations/20251012_205900_state_machine_migration.surql b/common/db/migrations/20251012_205900_state_machine_migration.surql similarity index 100% rename from common/migrations/20251012_205900_state_machine_migration.surql rename to common/db/migrations/20251012_205900_state_machine_migration.surql diff --git a/common/migrations/20251022_120302_add_scratchpad_table.surql b/common/db/migrations/20251022_120302_add_scratchpad_table.surql similarity index 100% rename from common/migrations/20251022_120302_add_scratchpad_table.surql rename to common/db/migrations/20251022_120302_add_scratchpad_table.surql diff --git a/common/migrations/20251120_000001_remove_legacy_hnsw_indexes.surql b/common/db/migrations/20251120_000001_remove_legacy_hnsw_indexes.surql similarity index 100% rename from common/migrations/20251120_000001_remove_legacy_hnsw_indexes.surql rename to common/db/migrations/20251120_000001_remove_legacy_hnsw_indexes.surql diff --git a/common/migrations/20251121_113121_separate_embeddings_to_own_table.surql b/common/db/migrations/20251121_113121_separate_embeddings_to_own_table.surql similarity index 100% rename from common/migrations/20251121_113121_separate_embeddings_to_own_table.surql rename to common/db/migrations/20251121_113121_separate_embeddings_to_own_table.surql diff --git a/common/migrations/20251121_113122_migrate_embedding_data.surql b/common/db/migrations/20251121_113122_migrate_embedding_data.surql similarity index 100% rename from common/migrations/20251121_113122_migrate_embedding_data.surql rename to common/db/migrations/20251121_113122_migrate_embedding_data.surql diff --git a/common/migrations/20251122_151002_remove_legacy_embedding_fields.surql b/common/db/migrations/20251122_151002_remove_legacy_embedding_fields.surql similarity index 100% rename from common/migrations/20251122_151002_remove_legacy_embedding_fields.surql rename to common/db/migrations/20251122_151002_remove_legacy_embedding_fields.surql diff --git a/common/migrations/20251210_add_embedding_backend_to_system_settings.surql b/common/db/migrations/20251210_add_embedding_backend_to_system_settings.surql similarity index 100% rename from common/migrations/20251210_add_embedding_backend_to_system_settings.surql rename to common/db/migrations/20251210_add_embedding_backend_to_system_settings.surql diff --git a/common/migrations/20251231_enforce_schemafull.surql b/common/db/migrations/20251231_enforce_schemafull.surql similarity index 100% rename from common/migrations/20251231_enforce_schemafull.surql rename to common/db/migrations/20251231_enforce_schemafull.surql diff --git a/common/migrations/20260116_000000_add_user_theme_preference.surql b/common/db/migrations/20260116_000000_add_user_theme_preference.surql similarity index 100% rename from common/migrations/20260116_000000_add_user_theme_preference.surql rename to common/db/migrations/20260116_000000_add_user_theme_preference.surql diff --git a/common/migrations/20260528_000001_file_user_sha256_index.surql b/common/db/migrations/20260528_000001_file_user_sha256_index.surql similarity index 100% rename from common/migrations/20260528_000001_file_user_sha256_index.surql rename to common/db/migrations/20260528_000001_file_user_sha256_index.surql diff --git a/common/migrations/20260528_000002_knowledge_graph_storage_hardening.surql b/common/db/migrations/20260528_000002_knowledge_graph_storage_hardening.surql similarity index 100% rename from common/migrations/20260528_000002_knowledge_graph_storage_hardening.surql rename to common/db/migrations/20260528_000002_knowledge_graph_storage_hardening.surql diff --git a/common/migrations/20260528_000003_text_chunk_embedding_storage_hardening.surql b/common/db/migrations/20260528_000003_text_chunk_embedding_storage_hardening.surql similarity index 100% rename from common/migrations/20260528_000003_text_chunk_embedding_storage_hardening.surql rename to common/db/migrations/20260528_000003_text_chunk_embedding_storage_hardening.surql diff --git a/common/migrations/definitions/20251210_add_embedding_backend_to_system_settings.json b/common/db/migrations/definitions/20251210_add_embedding_backend_to_system_settings.json similarity index 100% rename from common/migrations/definitions/20251210_add_embedding_backend_to_system_settings.json rename to common/db/migrations/definitions/20251210_add_embedding_backend_to_system_settings.json diff --git a/common/migrations/definitions/20251231_enforce_schemafull.json b/common/db/migrations/definitions/20251231_enforce_schemafull.json similarity index 100% rename from common/migrations/definitions/20251231_enforce_schemafull.json rename to common/db/migrations/definitions/20251231_enforce_schemafull.json diff --git a/common/migrations/definitions/20260116_000000_add_user_theme_preference.json b/common/db/migrations/definitions/20260116_000000_add_user_theme_preference.json similarity index 100% rename from common/migrations/definitions/20260116_000000_add_user_theme_preference.json rename to common/db/migrations/definitions/20260116_000000_add_user_theme_preference.json diff --git a/common/migrations/definitions/20260528_000001_file_user_sha256_index.json b/common/db/migrations/definitions/20260528_000001_file_user_sha256_index.json similarity index 100% rename from common/migrations/definitions/20260528_000001_file_user_sha256_index.json rename to common/db/migrations/definitions/20260528_000001_file_user_sha256_index.json diff --git a/common/migrations/definitions/20260528_000002_knowledge_graph_storage_hardening.json b/common/db/migrations/definitions/20260528_000002_knowledge_graph_storage_hardening.json similarity index 100% rename from common/migrations/definitions/20260528_000002_knowledge_graph_storage_hardening.json rename to common/db/migrations/definitions/20260528_000002_knowledge_graph_storage_hardening.json diff --git a/common/migrations/definitions/20260528_000003_text_chunk_embedding_storage_hardening.json b/common/db/migrations/definitions/20260528_000003_text_chunk_embedding_storage_hardening.json similarity index 100% rename from common/migrations/definitions/20260528_000003_text_chunk_embedding_storage_hardening.json rename to common/db/migrations/definitions/20260528_000003_text_chunk_embedding_storage_hardening.json diff --git a/common/migrations/definitions/_initial.json b/common/db/migrations/definitions/_initial.json similarity index 100% rename from common/migrations/definitions/_initial.json rename to common/db/migrations/definitions/_initial.json diff --git a/common/schemas/analytics.surql b/common/db/schemas/analytics.surql similarity index 100% rename from common/schemas/analytics.surql rename to common/db/schemas/analytics.surql diff --git a/common/schemas/auth.surql b/common/db/schemas/auth.surql similarity index 100% rename from common/schemas/auth.surql rename to common/db/schemas/auth.surql diff --git a/common/schemas/conversation.surql b/common/db/schemas/conversation.surql similarity index 100% rename from common/schemas/conversation.surql rename to common/db/schemas/conversation.surql diff --git a/common/schemas/file.surql b/common/db/schemas/file.surql similarity index 100% rename from common/schemas/file.surql rename to common/db/schemas/file.surql diff --git a/common/schemas/ingestion_task.surql b/common/db/schemas/ingestion_task.surql similarity index 100% rename from common/schemas/ingestion_task.surql rename to common/db/schemas/ingestion_task.surql diff --git a/common/schemas/knowledge_entity.surql b/common/db/schemas/knowledge_entity.surql similarity index 100% rename from common/schemas/knowledge_entity.surql rename to common/db/schemas/knowledge_entity.surql diff --git a/common/schemas/knowledge_entity_embedding.surql b/common/db/schemas/knowledge_entity_embedding.surql similarity index 100% rename from common/schemas/knowledge_entity_embedding.surql rename to common/db/schemas/knowledge_entity_embedding.surql diff --git a/common/schemas/message.surql b/common/db/schemas/message.surql similarity index 100% rename from common/schemas/message.surql rename to common/db/schemas/message.surql diff --git a/common/schemas/relates_to.surql b/common/db/schemas/relates_to.surql similarity index 100% rename from common/schemas/relates_to.surql rename to common/db/schemas/relates_to.surql diff --git a/common/schemas/scratchpad.surql b/common/db/schemas/scratchpad.surql similarity index 100% rename from common/schemas/scratchpad.surql rename to common/db/schemas/scratchpad.surql diff --git a/common/schemas/script_migration.surql b/common/db/schemas/script_migration.surql similarity index 100% rename from common/schemas/script_migration.surql rename to common/db/schemas/script_migration.surql diff --git a/common/schemas/system_settings.surql b/common/db/schemas/system_settings.surql similarity index 100% rename from common/schemas/system_settings.surql rename to common/db/schemas/system_settings.surql diff --git a/common/schemas/text_chunk.surql b/common/db/schemas/text_chunk.surql similarity index 100% rename from common/schemas/text_chunk.surql rename to common/db/schemas/text_chunk.surql diff --git a/common/schemas/text_chunk_embedding.surql b/common/db/schemas/text_chunk_embedding.surql similarity index 100% rename from common/schemas/text_chunk_embedding.surql rename to common/db/schemas/text_chunk_embedding.surql diff --git a/common/schemas/text_content.surql b/common/db/schemas/text_content.surql similarity index 100% rename from common/schemas/text_content.surql rename to common/db/schemas/text_content.surql diff --git a/common/schemas/user.surql b/common/db/schemas/user.surql similarity index 100% rename from common/schemas/user.surql rename to common/db/schemas/user.surql diff --git a/common/src/storage/db.rs b/common/src/storage/db.rs index e8499e0..3d84e07 100644 --- a/common/src/storage/db.rs +++ b/common/src/storage/db.rs @@ -13,8 +13,8 @@ use surrealdb::{ use surrealdb_migrations::MigrationRunner; use tracing::debug; -/// Embedded SurrealDB migration directory packaged with the crate. -static MIGRATIONS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/"); +/// Embedded SurrealDB project root (`migrations/`, `schemas/`, `.surrealdb`). +static MIGRATIONS_DIR: Dir<'_> = include_dir!("$CARGO_MANIFEST_DIR/db"); #[derive(Clone)] pub struct SurrealDbClient { diff --git a/common/src/storage/indexes.rs b/common/src/storage/indexes.rs index 40a0e4d..a3d413d 100644 --- a/common/src/storage/indexes.rs +++ b/common/src/storage/indexes.rs @@ -9,6 +9,7 @@ use tracing::{debug, info, warn}; use crate::{error::AppError, storage::db::SurrealDbClient}; const INDEX_POLL_INTERVAL: Duration = Duration::from_millis(50); +const INDEX_BUILD_TIMEOUT: Duration = Duration::from_secs(30 * 60); const FTS_ANALYZER_NAME: &str = "app_en_fts_analyzer"; /// HNSW index options used by runtime index creation (includes CONCURRENTLY). @@ -296,14 +297,10 @@ async fn get_index_status(db: &SurrealDbClient, index_name: &str, table: &str) - return Ok("unknown".to_string()); }; - let building = info.get("building"); - let status = building - .and_then(|b| b.get("status")) - .and_then(|s| s.as_str()) - .unwrap_or("ready") - .to_string(); + let parsed: IndexInfoForIndex = serde_json::from_value(info) + .context("deserializing INFO FOR INDEX response")?; - Ok(status) + Ok(parsed.building_status()) } async fn rebuild_inner(db: &SurrealDbClient) -> Result<()> { @@ -531,8 +528,21 @@ async fn poll_index_build_status( poll_every: Duration, ) -> Result<()> { let started_at = std::time::Instant::now(); + let mut last_snapshot: Option = None; loop { + if started_at.elapsed() >= INDEX_BUILD_TIMEOUT { + return Err(anyhow::anyhow!( + "index build timed out after {:?} for {index_name} on {table} (last status: {})", + INDEX_BUILD_TIMEOUT, + last_snapshot + .as_ref() + .map(|snapshot| snapshot.status.as_str()) + .unwrap_or("unknown") + )) + .with_context(|| format!("index {index_name} on table {table} did not become ready")); + } + tokio::time::sleep(poll_every).await; let info_query = format!("INFO FOR INDEX {index_name} ON TABLE {table};"); @@ -546,14 +556,13 @@ async fn poll_index_build_status( .context("failed to deserialize INFO FOR INDEX result")?; let Some(snapshot) = parse_index_build_info(info, total_rows) else { - warn!( - index = %index_name, - table = %table, - "INFO FOR INDEX returned no data; assuming index definition might be missing" - ); - break; + return Err(anyhow::anyhow!( + "INFO FOR INDEX returned no data for {index_name} on {table}" + )); }; + last_snapshot = Some(snapshot.clone()); + if let Some(pct) = snapshot.progress_pct { debug!( index = %index_name, @@ -589,25 +598,87 @@ async fn poll_index_build_status( total = snapshot.total_rows, "Index is ready" ); - break; + return Ok(()); } if snapshot.status.eq_ignore_ascii_case("error") { - warn!( - index = %index_name, - table = %table, - status = snapshot.status, - "Index build reported error status; stopping polling" - ); - break; + return Err(anyhow::anyhow!( + "index build failed for {index_name} on {table}: status=error, processed={}, total={:?}", + snapshot.processed, + snapshot.total_rows + )); + } + } +} + +/// `building` block from SurrealDB `INFO FOR INDEX` (concurrent index builds). +#[derive(Debug, Clone, Deserialize, PartialEq, Eq)] +struct IndexBuildingProgress { + #[serde(default)] + initial: u64, + #[serde(default)] + pending: u64, + #[serde(default)] + updated: u64, + #[serde(default)] + status: String, +} + +/// Top-level `INFO FOR INDEX` payload shape (SurrealDB v2.x). +#[derive(Debug, Clone, Deserialize, PartialEq, Eq, Default)] +struct IndexInfoForIndex { + #[serde(default)] + building: Option, +} + +impl IndexInfoForIndex { + fn building_status(&self) -> String { + match &self.building { + None => "ready".to_string(), + Some(progress) if progress.status.is_empty() => "ready".to_string(), + Some(progress) => progress.status.clone(), } } - Ok(()) + fn into_build_snapshot(self, total_rows: Option) -> IndexBuildSnapshot { + let (initial, pending, updated, status) = match self.building { + None => (0, 0, 0, "ready".to_string()), + Some(progress) => { + let status = if progress.status.is_empty() { + "ready".to_string() + } else { + progress.status + }; + (progress.initial, progress.pending, progress.updated, status) + } + }; + + let processed = initial.saturating_add(updated); + let progress_pct = total_rows.map(|total| { + if total == 0 { + 0.0 + } else { + ((f64::from(u32::try_from(processed).unwrap_or(u32::MAX)) + / f64::from(u32::try_from(total).unwrap_or(1))) + .min(1.0)) + * 100.0 + } + }); + + IndexBuildSnapshot { + status, + initial, + pending, + updated, + processed, + total_rows, + progress_pct, + } + } } /// Snapshot of an index build progress as reported by SurrealDB's `INFO FOR INDEX`. -#[derive(Debug, PartialEq)] +#[derive(Debug, Clone, PartialEq)] struct IndexBuildSnapshot { /// Current build status string (e.g., `"indexing"`, `"ready"`, `"error"`). status: String, @@ -636,53 +707,8 @@ fn parse_index_build_info( total_rows: Option, ) -> Option { let info = info?; - let building = info.get("building"); - - let status = building - .and_then(|b| b.get("status")) - .and_then(|s| s.as_str()) - // If there's no `building` block at all, treat as "ready" (index not building anymore) - .unwrap_or("ready") - .to_string(); - - let initial = building - .and_then(|b| b.get("initial")) - .and_then(serde_json::Value::as_u64) - .unwrap_or(0); - - let pending = building - .and_then(|b| b.get("pending")) - .and_then(serde_json::Value::as_u64) - .unwrap_or(0); - - let updated = building - .and_then(|b| b.get("updated")) - .and_then(serde_json::Value::as_u64) - .unwrap_or(0); - - // `initial` is the number of rows seen when the build started; `updated` accounts for later writes. - let processed = initial.saturating_add(updated); - - let progress_pct = total_rows.map(|total| { - if total == 0 { - 0.0 - } else { - ((f64::from(u32::try_from(processed).unwrap_or(u32::MAX)) - / f64::from(u32::try_from(total).unwrap_or(1))) - .min(1.0)) - * 100.0 - } - }); - - Some(IndexBuildSnapshot { - status, - initial, - pending, - updated, - processed, - total_rows, - progress_pct, - }) + let parsed: IndexInfoForIndex = serde_json::from_value(info).ok()?; + Some(parsed.into_build_snapshot(total_rows)) } #[derive(Debug, Deserialize)] @@ -786,6 +812,72 @@ mod tests { Ok(()) } + #[test] + fn index_info_for_index_deserializes_ready_status_shape() -> anyhow::Result<()> { + let info = json!({ + "building": { + "status": "ready" + } + }); + + let parsed: IndexInfoForIndex = + serde_json::from_value(info).context("deserialize ready shape")?; + assert_eq!(parsed.building_status(), "ready"); + + let snapshot = parse_index_build_info( + Some(json!({ + "building": { "status": "ready" } + })), + None, + ) + .context("snapshot")?; + assert!(snapshot.is_ready()); + assert_eq!(snapshot.initial, 0); + Ok(()) + } + + #[test] + fn index_info_for_index_deserializes_indexing_shape_from_surreal_docs() -> anyhow::Result<()> { + let info = json!({ + "building": { + "initial": 8143, + "pending": 19, + "status": "indexing", + "updated": 80 + } + }); + + let parsed: IndexInfoForIndex = + serde_json::from_value(info.clone()).context("deserialize indexing shape")?; + assert_eq!(parsed.building_status(), "indexing"); + + let snapshot = parse_index_build_info(Some(info), None).context("snapshot")?; + assert_eq!(snapshot.status, "indexing"); + assert_eq!(snapshot.initial, 8143); + assert_eq!(snapshot.pending, 19); + assert_eq!(snapshot.updated, 80); + assert_eq!(snapshot.processed, 8223); + assert!(!snapshot.is_ready()); + Ok(()) + } + + #[test] + fn parse_index_build_info_reports_error_status() -> anyhow::Result<()> { + let info = json!({ + "building": { + "initial": 100, + "pending": 5, + "status": "error", + "updated": 10 + } + }); + + let snapshot = parse_index_build_info(Some(info), Some(200)).context("snapshot")?; + assert_eq!(snapshot.status, "error"); + assert!(!snapshot.is_ready()); + Ok(()) + } + #[test] fn extract_dimension_parses_value() { let definition = "DEFINE INDEX idx_embedding_text_chunk_embedding ON TABLE text_chunk_embedding FIELDS embedding HNSW DIMENSION 1536 DIST COSINE TYPE F32 EFC 100 M 8;"; diff --git a/common/src/storage/store.rs b/common/src/storage/store.rs index 759a551..b40389a 100644 --- a/common/src/storage/store.rs +++ b/common/src/storage/store.rs @@ -2,7 +2,7 @@ use std::io::ErrorKind; use std::path::{Component, Path, PathBuf}; use std::sync::Arc; -use anyhow::{anyhow, Result as AnyResult}; +use anyhow::{anyhow, Context, Result as AnyResult}; use bytes::Bytes; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; @@ -107,15 +107,18 @@ impl StorageManager { /// Retrieve bytes from the specified location. /// - /// Returns the full contents buffered in memory. + /// Reads via [`Self::get_stream`] and buffers the full object in memory. /// /// # Errors /// /// Returns `Err` if the location does not exist or the underlying backend fails. pub async fn get(&self, location: &str) -> object_store::Result { - let path = ObjPath::from(location); - let result = self.store.get(&path).await?; - result.bytes().await + let mut stream = self.get_stream(location).await?; + let mut collected = Vec::new(); + while let Some(chunk) = stream.next().await { + collected.extend_from_slice(&chunk?); + } + Ok(Bytes::from(collected)) } /// Get a streaming handle for large objects. @@ -252,7 +255,10 @@ async fn create_storage_backend( ) -> object_store::Result<(DynStorage, Option)> { match cfg.storage { StorageKind::Local => { - let base = resolve_base_dir(cfg); + let base = resolve_base_dir(cfg).map_err(|err| object_store::Error::Generic { + store: "LocalFileSystem", + source: err.into(), + })?; if !base.exists() { tokio::fs::create_dir_all(&base).await.map_err(|e| { object_store::Error::Generic { @@ -576,15 +582,22 @@ pub mod testing { /// Resolve the absolute base directory used for local storage from config. /// -/// If `data_dir` is relative, it is resolved against the current working directory. -#[must_use] -pub fn resolve_base_dir(cfg: &AppConfig) -> PathBuf { +/// If `data_dir` is relative, it is resolved against the process current working directory. +/// +/// # Errors +/// +/// Returns `Err` when `data_dir` is relative and the current working directory cannot be read. +pub fn resolve_base_dir(cfg: &AppConfig) -> AnyResult { if cfg.data_dir.starts_with('/') { - PathBuf::from(&cfg.data_dir) + Ok(PathBuf::from(&cfg.data_dir)) } else { - std::env::current_dir() - .unwrap_or_else(|_| PathBuf::from(".")) - .join(&cfg.data_dir) + let cwd = std::env::current_dir().with_context(|| { + format!( + "failed to resolve relative data_dir '{}' against the current working directory", + cfg.data_dir + ) + })?; + Ok(cwd.join(&cfg.data_dir)) } } diff --git a/common/src/storage/types/system_settings.rs b/common/src/storage/types/system_settings.rs index ba5faf6..9fd3519 100644 --- a/common/src/storage/types/system_settings.rs +++ b/common/src/storage/types/system_settings.rs @@ -1,4 +1,4 @@ -use crate::utils::embedding::EmbeddingBackend; +use crate::utils::config::EmbeddingBackend; use crate::utils::serde_helpers::deserialize_flexible_id; use serde::{Deserialize, Serialize}; diff --git a/common/src/utils/config.rs b/common/src/utils/config.rs index 04d0eb2..ef45320 100644 --- a/common/src/utils/config.rs +++ b/common/src/utils/config.rs @@ -1,9 +1,18 @@ use config::{Config, ConfigError, Environment, File}; -use serde::Deserialize; -use std::env; +use serde::{Deserialize, Serialize}; +use std::{env, sync::Once, str::FromStr}; +use thiserror::Error; + +/// Error returned when parsing an embedding backend name. +#[derive(Debug, Error, PartialEq, Eq)] +#[error("unknown embedding backend '{input}': expected 'openai', 'hashed', or 'fastembed'")] +pub struct ParseEmbeddingBackendError { + /// The unrecognized input string. + pub input: String, +} /// Selects the embedding backend for vector generation. -#[derive(Clone, Copy, Deserialize, Debug, Default, PartialEq)] +#[derive(Clone, Copy, Deserialize, Serialize, Debug, Default, PartialEq, Eq)] #[serde(rename_all = "lowercase")] pub enum EmbeddingBackend { /// Use OpenAI-compatible API for embeddings. @@ -15,6 +24,32 @@ pub enum EmbeddingBackend { Hashed, } +impl EmbeddingBackend { + #[must_use] + pub fn as_str(self) -> &'static str { + match self { + Self::OpenAI => "openai", + Self::FastEmbed => "fastembed", + Self::Hashed => "hashed", + } + } +} + +impl FromStr for EmbeddingBackend { + type Err = ParseEmbeddingBackendError; + + fn from_str(s: &str) -> Result { + match s.to_ascii_lowercase().as_str() { + "openai" => Ok(Self::OpenAI), + "hashed" => Ok(Self::Hashed), + "fastembed" | "fast-embed" | "fast" => Ok(Self::FastEmbed), + other => Err(ParseEmbeddingBackendError { + input: other.to_string(), + }), + } + } +} + #[derive(Clone, Copy, Deserialize, Debug, PartialEq)] #[serde(rename_all = "lowercase")] pub enum StorageKind { @@ -133,11 +168,17 @@ fn default_ingest_max_category_bytes() -> usize { 128 } +static ORT_PATH_INIT: Once = Once::new(); + +/// Sets `ORT_DYLIB_PATH` once per process when a bundled ONNX runtime library is found. pub fn ensure_ort_path() { - if env::var_os("ORT_DYLIB_PATH").is_some() { - return; - } - if let Ok(mut exe) = env::current_exe() { + ORT_PATH_INIT.call_once(|| { + if env::var_os("ORT_DYLIB_PATH").is_some() { + return; + } + let Ok(mut exe) = env::current_exe() else { + return; + }; exe.pop(); if cfg!(target_os = "windows") { @@ -160,7 +201,7 @@ pub fn ensure_ort_path() { if p.exists() { env::set_var("ORT_DYLIB_PATH", p); } - } + }); } impl Default for AppConfig { diff --git a/common/src/utils/embedding.rs b/common/src/utils/embedding.rs index f67ab4c..a1847b0 100644 --- a/common/src/utils/embedding.rs +++ b/common/src/utils/embedding.rs @@ -8,59 +8,15 @@ use std::{ use anyhow::{anyhow, Context, Result}; use async_openai::{types::CreateEmbeddingRequestArgs, Client}; use fastembed::{EmbeddingModel, ModelTrait, TextEmbedding, TextInitOptions}; -use serde::{Deserialize, Serialize}; -use thiserror::Error; use tracing::debug; use crate::{ error::AppError, storage::{db::SurrealDbClient, types::system_settings::SystemSettings}, + utils::config::AppConfig, }; -/// Error returned when parsing an embedding backend name. -#[derive(Debug, Error, PartialEq, Eq)] -#[error("unknown embedding backend '{input}': expected 'openai', 'hashed', or 'fastembed'")] -pub struct ParseEmbeddingBackendError { - /// The unrecognized input string. - pub input: String, -} - -/// Supported embedding backends. -#[allow(clippy::module_name_repetitions)] -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum EmbeddingBackend { - #[default] - OpenAI, - FastEmbed, - Hashed, -} - -impl EmbeddingBackend { - #[must_use] - pub fn as_str(self) -> &'static str { - match self { - Self::OpenAI => "openai", - Self::FastEmbed => "fastembed", - Self::Hashed => "hashed", - } - } -} - -impl std::str::FromStr for EmbeddingBackend { - type Err = ParseEmbeddingBackendError; - - fn from_str(s: &str) -> Result { - match s.to_ascii_lowercase().as_str() { - "openai" => Ok(Self::OpenAI), - "hashed" => Ok(Self::Hashed), - "fastembed" | "fast-embed" | "fast" => Ok(Self::FastEmbed), - other => Err(ParseEmbeddingBackendError { - input: other.to_string(), - }), - } - } -} +pub use crate::utils::config::{EmbeddingBackend, ParseEmbeddingBackendError}; /// Wrapper around the chosen embedding backend. #[allow(clippy::module_name_repetitions)] @@ -281,30 +237,31 @@ impl EmbeddingProvider { }) } - /// Creates an embedding provider based on application configuration. + /// Creates an embedding provider from persisted settings and bootstrap config. /// - /// Dispatches to the appropriate constructor based on `config.embedding_backend`: - /// - `OpenAI`: Requires a valid OpenAI client - /// - `FastEmbed`: Uses local embedding model - /// - `Hashed`: Uses deterministic hashed embeddings (for testing) - pub async fn from_config( - config: &crate::utils::config::AppConfig, + /// Model name and dimensions come from [`SystemSettings`]. The active backend is taken + /// from `config.embedding_backend` at startup; [`SystemSettings::sync_from_embedding_provider`] + /// persists the resolved backend to the database. + pub async fn from_system_settings( + settings: &SystemSettings, + config: &AppConfig, openai_client: Option>>, ) -> Result { - use crate::utils::config::EmbeddingBackend; - + let dimensions = settings.embedding_dimensions; match config.embedding_backend { EmbeddingBackend::OpenAI => { let client = openai_client .ok_or_else(|| anyhow!("OpenAI embedding backend requires an OpenAI client"))?; - // Use defaults that match SystemSettings initial values - Self::new_openai(client, "text-embedding-3-small".to_string(), 1536) + Self::new_openai(client, settings.embedding_model.clone(), dimensions) } EmbeddingBackend::FastEmbed => { - // Use nomic-embed-text-v1.5 as the default FastEmbed model - Self::new_fastembed(Some("nomic-ai/nomic-embed-text-v1.5".to_string())).await + Self::new_fastembed(Some(settings.embedding_model.clone())).await + } + EmbeddingBackend::Hashed => { + let dimension = usize::try_from(dimensions) + .map_err(|_| anyhow!("embedding_dimensions exceeds usize::MAX"))?; + Self::new_hashed(dimension) } - EmbeddingBackend::Hashed => Self::new_hashed(384), } } } @@ -460,6 +417,14 @@ mod tests { use crate::storage::types::system_settings::SystemSettings; use serde_json::json; + #[test] + fn embedding_backend_defaults_to_fastembed() { + assert_eq!( + EmbeddingBackend::default(), + EmbeddingBackend::FastEmbed + ); + } + #[test] fn embedding_backend_as_str_matches_serde_names() { assert_eq!(EmbeddingBackend::OpenAI.as_str(), "openai"); diff --git a/common/src/utils/ingest_limits.rs b/common/src/utils/ingest_limits.rs index 7692ef2..868fc4b 100644 --- a/common/src/utils/ingest_limits.rs +++ b/common/src/utils/ingest_limits.rs @@ -29,6 +29,14 @@ pub fn validate_ingest_input( category: &str, file_count: usize, ) -> Result<(), IngestValidationError> { + let text_field_bytes = content.map(str::len).unwrap_or(0) + ctx.len() + category.len(); + if text_field_bytes > config.ingest_max_body_bytes { + return Err(IngestValidationError::PayloadTooLarge(format!( + "request text fields exceed maximum allowed body size of {} bytes", + config.ingest_max_body_bytes + ))); + } + if file_count > config.ingest_max_files { return Err(IngestValidationError::BadRequest(format!( "too many files: maximum allowed is {}", @@ -127,4 +135,18 @@ mod tests { assert!(result.is_ok()); } + + #[test] + fn validate_ingest_input_rejects_oversized_text_fields() { + let config = AppConfig { + ingest_max_body_bytes: 10, + ..Default::default() + }; + let result = validate_ingest_input(&config, Some("123456"), "ctx", "cat", 0); + + assert!(matches!( + result, + Err(IngestValidationError::PayloadTooLarge(_)) + )); + } } diff --git a/flake.nix b/flake.nix index 4747963..3486d32 100644 --- a/flake.nix +++ b/flake.nix @@ -27,8 +27,7 @@ filter = let extraPaths = [ (toString ./Cargo.lock) - (toString ./common/migrations) - (toString ./common/schemas) + (toString ./common/db) (toString ./html-router/templates) (toString ./html-router/assets) ]; diff --git a/main/Cargo.toml b/main/Cargo.toml index 2314c88..9dcf6f1 100644 --- a/main/Cargo.toml +++ b/main/Cargo.toml @@ -30,6 +30,7 @@ retrieval-pipeline = { path = "../retrieval-pipeline" } [dev-dependencies] tower = "0.5" uuid = { workspace = true } +chrono = { workspace = true } common = { path = "../common", features = ["test-utils"] } [[bin]] diff --git a/main/src/bootstrap.rs b/main/src/bootstrap.rs deleted file mode 100644 index 4272ed6..0000000 --- a/main/src/bootstrap.rs +++ /dev/null @@ -1,73 +0,0 @@ -use std::sync::Arc; - -use async_openai::Client; -use common::{ - storage::{ - db::SurrealDbClient, - store::StorageManager, - }, - utils::{ - config::{get_config, AppConfig}, - embedding::EmbeddingProvider, - }, -}; -use retrieval_pipeline::reranking::RerankerPool; -use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - -pub struct SharedServices { - pub db: Arc, - pub openai_client: Arc>, - pub embedding_provider: Arc, - pub storage: StorageManager, - pub reranker_pool: Option>, - pub config: AppConfig, -} - -pub async fn init() -> anyhow::Result { - tracing_subscriber::registry() - .with(fmt::layer().with_writer(std::io::stderr)) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); - - let config = get_config()?; - init_with_config(config).await -} - -pub(crate) async fn init_with_config(config: AppConfig) -> anyhow::Result { - let db = Arc::new( - SurrealDbClient::new( - &config.surrealdb_address, - &config.surrealdb_username, - &config.surrealdb_password, - &config.surrealdb_namespace, - &config.surrealdb_database, - ) - .await?, - ); - - db.apply_migrations().await?; - - let openai_client = Arc::new(Client::with_config( - async_openai::config::OpenAIConfig::new() - .with_api_key(&config.openai_api_key) - .with_api_base(&config.openai_base_url), - )); - - let embedding_provider = Arc::new( - EmbeddingProvider::from_config(&config, Some(Arc::clone(&openai_client))).await?, - ); - - let reranker_pool = RerankerPool::maybe_from_config(&config)?; - - let storage = StorageManager::new(&config).await?; - - Ok(SharedServices { - db, - openai_client, - embedding_provider, - storage, - reranker_pool, - config, - }) -} diff --git a/main/src/bootstrap/mod.rs b/main/src/bootstrap/mod.rs new file mode 100644 index 0000000..13ee0b5 --- /dev/null +++ b/main/src/bootstrap/mod.rs @@ -0,0 +1,136 @@ +mod startup; +pub mod wiring; + +pub use startup::prepare_embedding_runtime; + +use std::sync::Arc; + +use anyhow::Context; +use async_openai::Client; +use common::{ + storage::{ + db::SurrealDbClient, + store::StorageManager, + types::system_settings::SystemSettings, + }, + utils::{ + config::{get_config, AppConfig}, + embedding::EmbeddingProvider, + }, +}; +use retrieval_pipeline::reranking::RerankerPool; +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + +pub struct SharedServices { + pub db: Arc, + pub openai_client: Arc>, + pub embedding_provider: Arc, + pub storage: StorageManager, + pub reranker_pool: Option>, + pub config: AppConfig, +} + +pub async fn init() -> anyhow::Result { + tracing_subscriber::registry() + .with(fmt::layer().with_writer(std::io::stderr)) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); + + let config = get_config()?; + init_with_config(config).await +} + +pub(crate) async fn init_with_config(config: AppConfig) -> anyhow::Result { + let db = Arc::new( + SurrealDbClient::new( + &config.surrealdb_address, + &config.surrealdb_username, + &config.surrealdb_password, + &config.surrealdb_namespace, + &config.surrealdb_database, + ) + .await + .context("connect to surrealdb")?, + ); + + db.apply_migrations() + .await + .context("apply database migrations")?; + + let settings = SystemSettings::get_current(&db) + .await + .context("load system settings")?; + + let openai_client = Arc::new(Client::with_config( + async_openai::config::OpenAIConfig::new() + .with_api_key(&config.openai_api_key) + .with_api_base(&config.openai_base_url), + )); + + let embedding_provider = Arc::new( + EmbeddingProvider::from_system_settings( + &settings, + &config, + Some(Arc::clone(&openai_client)), + ) + .await + .context("initialize embedding provider")?, + ); + + let reranker_pool = RerankerPool::maybe_from_config(&config)?; + + let storage = StorageManager::new(&config) + .await + .context("initialize storage manager")?; + + Ok(SharedServices { + db, + openai_client, + embedding_provider, + storage, + reranker_pool, + config, + }) +} + +#[cfg(test)] +pub(crate) mod tests { + use std::path::Path; + + use anyhow::Context; + use common::utils::config::{AppConfig, EmbeddingBackend, PdfIngestMode, StorageKind}; + use uuid::Uuid; + + pub fn smoke_test_config(namespace: &str, database: &str, data_dir: &Path) -> AppConfig { + AppConfig { + openai_api_key: "test-key".into(), + surrealdb_address: "mem://".into(), + surrealdb_username: "root".into(), + surrealdb_password: "root".into(), + surrealdb_namespace: namespace.into(), + surrealdb_database: database.into(), + data_dir: data_dir.to_string_lossy().into_owned(), + http_port: 0, + openai_base_url: "https://example.com".into(), + storage: StorageKind::Local, + pdf_ingest_mode: PdfIngestMode::LlmFirst, + embedding_backend: EmbeddingBackend::Hashed, + ..Default::default() + } + } + + pub async fn init_smoke_services() -> anyhow::Result<(super::SharedServices, std::path::PathBuf)> + { + let namespace = "test_ns"; + let database = format!("test_db_{}", Uuid::new_v4()); + let data_dir = std::env::temp_dir().join(format!("minne_smoke_{}", Uuid::new_v4())); + tokio::fs::create_dir_all(&data_dir) + .await + .context("create temp data directory")?; + + let config = smoke_test_config(namespace, &database, &data_dir); + let services = super::init_with_config(config).await?; + Ok((services, data_dir)) + } +} diff --git a/main/src/bootstrap/startup.rs b/main/src/bootstrap/startup.rs new file mode 100644 index 0000000..056199a --- /dev/null +++ b/main/src/bootstrap/startup.rs @@ -0,0 +1,66 @@ +use anyhow::Context; +use common::{ + storage::{ + db::SurrealDbClient, + indexes::ensure_runtime, + types::{ + knowledge_entity::KnowledgeEntity, system_settings::SystemSettings, + text_chunk::TextChunk, + }, + }, + utils::embedding::EmbeddingProvider, +}; +use tracing::{info, warn}; + +use super::SharedServices; + +/// Syncs embedding settings, re-embeds stored vectors when dimensions change, and +/// ensures runtime indexes match the active embedding dimension. +pub async fn prepare_embedding_runtime(services: &SharedServices) -> anyhow::Result { + let (settings, dimensions_changed) = + SystemSettings::sync_from_embedding_provider(&services.db, &services.embedding_provider) + .await + .context("sync system settings from embedding provider")?; + + if dimensions_changed { + re_embed_all( + &services.db, + &services.embedding_provider, + settings.embedding_dimensions, + ) + .await?; + } + + ensure_runtime( + &services.db, + settings.embedding_dimensions as usize, + ) + .await + .context("ensure runtime indexes")?; + + Ok(settings) +} + +async fn re_embed_all( + db: &SurrealDbClient, + embedding_provider: &EmbeddingProvider, + embedding_dimensions: u32, +) -> anyhow::Result<()> { + warn!( + embedding_dimensions, + "Embedding configuration changed; re-embedding existing data" + ); + + info!("Re-embedding TextChunks"); + TextChunk::update_all_embeddings_with_provider(db, embedding_provider) + .await + .context("re-embed text chunks after embedding dimension change")?; + + info!("Re-embedding KnowledgeEntities"); + KnowledgeEntity::update_all_embeddings_with_provider(db, embedding_provider) + .await + .context("re-embed knowledge entities after embedding dimension change")?; + + info!("Re-embedding complete"); + Ok(()) +} diff --git a/main/src/bootstrap/wiring.rs b/main/src/bootstrap/wiring.rs new file mode 100644 index 0000000..27a3410 --- /dev/null +++ b/main/src/bootstrap/wiring.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use anyhow::Context; +use api_router::{api_routes_v1, api_state::ApiState}; +use axum::{extract::FromRef, Router}; +use html_router::{ + html_routes, + html_state::{HtmlState, StateResources}, +}; + +use super::SharedServices; + +/// Builds the Minne API and HTML route subtrees without fixing the outer Axum state +/// type. SaaS consumers can merge additional routers and attach their own `AppState` +/// as long as it implements `FromRef` for `ApiState` and `HtmlState`. +pub fn minne_routes(api_state: &ApiState, html_state: &HtmlState) -> Router +where + S: Clone + Send + Sync + 'static, + ApiState: FromRef, + HtmlState: FromRef, +{ + Router::new() + .nest("/api/v1", api_routes_v1(api_state)) + .merge(html_routes(html_state)) +} + +pub fn build_api_state(services: &SharedServices) -> ApiState { + ApiState { + db: Arc::clone(&services.db), + config: services.config.clone(), + storage: services.storage.clone(), + } +} + +pub async fn build_html_state(services: &SharedServices) -> anyhow::Result { + let session_store = Arc::new( + services + .db + .create_session_store() + .await + .context("create session store")?, + ); + + Ok(HtmlState::new_with_resources(StateResources { + db: Arc::clone(&services.db), + openai_client: Arc::clone(&services.openai_client), + session_store, + storage: services.storage.clone(), + config: services.config.clone(), + reranker_pool: services.reranker_pool.clone(), + embedding_provider: Arc::clone(&services.embedding_provider), + template_engine: None, + })) +} diff --git a/main/src/main.rs b/main/src/main.rs index 2b634fa..6ec9582 100644 --- a/main/src/main.rs +++ b/main/src/main.rs @@ -2,50 +2,17 @@ mod bootstrap; use std::sync::Arc; -use api_router::{api_routes_v1, api_state::ApiState}; -use axum::{extract::FromRef, Router}; -use common::{ - storage::{ - indexes::ensure_runtime, - types::{ - knowledge_entity::KnowledgeEntity, system_settings::SystemSettings, - text_chunk::TextChunk, - }, - }, -}; -use html_router::{ - html_routes, - html_state::{HtmlState, StateResources}, +use axum::extract::FromRef; +use bootstrap::{ + init, prepare_embedding_runtime, + wiring::{build_api_state, build_html_state, minne_routes}, }; use ingestion_pipeline::{pipeline::IngestionPipeline, run_worker_loop}; -use tracing::{error, info, warn}; -use tokio::task::LocalSet; - -fn spawn_server_thread( - listener: tokio::net::TcpListener, - app: Router, -) -> std::thread::JoinHandle<()> { - std::thread::spawn(move || { - let rt = match tokio::runtime::Runtime::new() { - Ok(rt) => rt, - Err(e) => { - error!("Failed to create server runtime: {e}"); - return; - } - }; - rt.block_on(async { - if let Err(e) = axum::serve(listener, app).await { - error!("Server error: {}", e); - } - }); - }) -} +use tracing::info; #[tokio::main] async fn main() -> anyhow::Result<()> { - let services = bootstrap::init().await?; - - let session_store = Arc::new(services.db.create_session_store().await?); + let services = init().await?; info!( embedding_backend = ?services.config.embedding_backend, @@ -53,64 +20,16 @@ async fn main() -> anyhow::Result<()> { "Embedding provider initialized" ); - let (settings, dimensions_changed) = - SystemSettings::sync_from_embedding_provider(&services.db, &services.embedding_provider) - .await?; + prepare_embedding_runtime(&services).await?; - if dimensions_changed { - warn!( - new_dimensions = settings.embedding_dimensions, - "Embedding configuration changed; re-embedding existing data" - ); + let html_state = build_html_state(&services).await?; + let api_state = build_api_state(&services); - info!("Re-embedding TextChunks"); - if let Err(e) = - TextChunk::update_all_embeddings_with_provider(&services.db, &services.embedding_provider) - .await - { - error!( - "Failed to re-embed TextChunks: {}. Search results may be stale.", - e - ); - } - - info!("Re-embedding KnowledgeEntities"); - if let Err(e) = - KnowledgeEntity::update_all_embeddings_with_provider(&services.db, &services.embedding_provider) - .await - { - error!( - "Failed to re-embed KnowledgeEntities: {}. Search results may be stale.", - e - ); - } - - info!("Re-embedding complete."); - } - - ensure_runtime(&services.db, settings.embedding_dimensions as usize).await?; - - let html_state = HtmlState::new_with_resources(StateResources { - db: Arc::clone(&services.db), - openai_client: Arc::clone(&services.openai_client), - session_store, - storage: services.storage.clone(), - config: services.config.clone(), - reranker_pool: services.reranker_pool.clone(), - embedding_provider: Arc::clone(&services.embedding_provider), - template_engine: None, + let app = minne_routes(&api_state, &html_state).with_state(AppState { + api_state, + html_state, }); - let api_state = ApiState::new(&services.config, services.storage.clone()).await?; - - let app = Router::new() - .nest("/api/v1", api_routes_v1(&api_state)) - .merge(html_routes(&html_state)) - .with_state(AppState { - api_state, - html_state, - }); - info!( "Starting server listening on 0.0.0.0:{}", services.config.http_port @@ -118,28 +37,32 @@ async fn main() -> anyhow::Result<()> { let serve_address = format!("0.0.0.0:{}", services.config.http_port); let listener = tokio::net::TcpListener::bind(serve_address).await?; - let server_handle = spawn_server_thread(listener, app); + let worker_db = Arc::clone(&services.db); + let worker_openai = Arc::clone(&services.openai_client); + let worker_embedding = Arc::clone(&services.embedding_provider); + let worker_config = services.config.clone(); + let worker_reranker = services.reranker_pool.clone(); + let worker_storage = services.storage.clone(); - let ingestion_pipeline = Arc::new(IngestionPipeline::new( - Arc::clone(&services.db), - Arc::clone(&services.openai_client), - services.config.clone(), - services.reranker_pool.clone(), - services.storage, - Arc::clone(&services.embedding_provider), - )?); - - let local = LocalSet::new(); - local.spawn_local(async move { + let server = tokio::spawn(async move { axum::serve(listener, app).await }); + let worker = tokio::spawn(async move { info!("Starting worker process"); - if let Err(e) = run_worker_loop(services.db, ingestion_pipeline).await { - error!("Worker error: {}", e); - } - }); - local.await; - if let Err(e) = server_handle.join() { - error!("Server thread panicked: {:?}", e); + let ingestion_pipeline = Arc::new(IngestionPipeline::new( + Arc::clone(&worker_db), + worker_openai, + worker_config, + worker_reranker, + worker_storage, + worker_embedding, + )?); + + run_worker_loop(worker_db, ingestion_pipeline).await + }); + + tokio::select! { + result = server => result??, + result = worker => result??, } Ok(()) @@ -147,8 +70,8 @@ async fn main() -> anyhow::Result<()> { #[derive(Clone, FromRef)] struct AppState { - api_state: ApiState, - html_state: HtmlState, + api_state: api_router::api_state::ApiState, + html_state: html_router::html_state::HtmlState, } #[cfg(test)] @@ -160,79 +83,33 @@ mod tests { response::Response, Router, }; - use common::storage::{ - db::SurrealDbClient, - store::StorageManager, - types::{system_settings::SystemSettings, user::User}, + use bootstrap::{ + prepare_embedding_runtime, + tests::init_smoke_services, + wiring::{build_api_state, build_html_state, minne_routes}, }; - use common::utils::config::{AppConfig, EmbeddingBackend, PdfIngestMode, StorageKind}; - use std::{path::Path, sync::Arc}; + use common::storage::types::{system_settings::SystemSettings, user::User}; use tower::ServiceExt; - use uuid::Uuid; - fn smoke_test_config(namespace: &str, database: &str, data_dir: &Path) -> AppConfig { - AppConfig { - openai_api_key: "test-key".into(), - surrealdb_address: "mem://".into(), - surrealdb_username: "root".into(), - surrealdb_password: "root".into(), - surrealdb_namespace: namespace.into(), - surrealdb_database: database.into(), - data_dir: data_dir.to_string_lossy().into_owned(), - http_port: 0, - openai_base_url: "https://example.com".into(), - storage: StorageKind::Local, - pdf_ingest_mode: PdfIngestMode::LlmFirst, - embedding_backend: EmbeddingBackend::Hashed, - ..Default::default() - } - } - - async fn build_test_app() -> (Router, Arc, std::path::PathBuf) { - let namespace = "test_ns"; - let database = format!("test_db_{}", Uuid::new_v4()); - let data_dir = std::env::temp_dir().join(format!("minne_smoke_{}", Uuid::new_v4())); - tokio::fs::create_dir_all(&data_dir).await - .expect("failed to create temp data directory"); - - let config = smoke_test_config(namespace, &database, &data_dir); - let services = crate::bootstrap::init_with_config(config.clone()) + async fn build_test_app() -> (Router, Arc, std::path::PathBuf) { + let (services, data_dir) = init_smoke_services() .await .expect("failed to init services"); - let session_store = Arc::new( - services - .db - .create_session_store() - .await - .expect("failed to create session store"), - ); + prepare_embedding_runtime(&services) + .await + .expect("failed to prepare embedding runtime"); - let html_state = HtmlState::new_with_resources(StateResources { - db: Arc::clone(&services.db), - openai_client: Arc::clone(&services.openai_client), - session_store, - storage: services.storage.clone(), - config: services.config.clone(), - reranker_pool: services.reranker_pool.clone(), - embedding_provider: Arc::clone(&services.embedding_provider), - template_engine: None, + let html_state = build_html_state(&services) + .await + .expect("failed to build html state"); + let api_state = build_api_state(&services); + + let app = minne_routes(&api_state, &html_state).with_state(AppState { + api_state, + html_state, }); - let api_state = ApiState { - db: Arc::clone(&services.db), - config: services.config.clone(), - storage: services.storage, - }; - - let app = Router::new() - .nest("/api/v1", api_routes_v1(&api_state)) - .merge(html_routes(&html_state)) - .with_state(AppState { - api_state, - html_state, - }); - (app, services.db, data_dir) } diff --git a/main/src/server.rs b/main/src/server.rs index 462fea9..1b79391 100644 --- a/main/src/server.rs +++ b/main/src/server.rs @@ -1,47 +1,25 @@ mod bootstrap; -use std::sync::Arc; - -use api_router::{api_routes_v1, api_state::ApiState}; -use axum::{extract::FromRef, Router}; -use common::storage::types::system_settings::SystemSettings; -use html_router::{ - html_routes, - html_state::{HtmlState, StateResources}, +use axum::extract::FromRef; +use bootstrap::{ + init, prepare_embedding_runtime, + wiring::{build_api_state, build_html_state, minne_routes}, }; use tracing::info; #[tokio::main(flavor = "multi_thread", worker_threads = 2)] async fn main() -> anyhow::Result<()> { - let services = bootstrap::init().await?; + let services = init().await?; + prepare_embedding_runtime(&services).await?; - let session_store = Arc::new(services.db.create_session_store().await?); + let html_state = build_html_state(&services).await?; + let api_state = build_api_state(&services); - let (_settings, _dimensions_changed) = - SystemSettings::sync_from_embedding_provider(&services.db, &services.embedding_provider) - .await?; - - let html_state = HtmlState::new_with_resources(StateResources { - db: Arc::clone(&services.db), - openai_client: Arc::clone(&services.openai_client), - session_store, - storage: services.storage.clone(), - config: services.config.clone(), - reranker_pool: services.reranker_pool.clone(), - embedding_provider: Arc::clone(&services.embedding_provider), - template_engine: None, + let app = minne_routes(&api_state, &html_state).with_state(AppState { + api_state, + html_state, }); - let api_state = ApiState::new(&services.config, services.storage).await?; - - let app = Router::new() - .nest("/api/v1", api_routes_v1(&api_state)) - .merge(html_routes(&html_state)) - .with_state(AppState { - api_state, - html_state, - }); - info!( "Starting server listening on 0.0.0.0:{}", services.config.http_port @@ -55,6 +33,6 @@ async fn main() -> anyhow::Result<()> { #[derive(Clone, FromRef)] struct AppState { - api_state: ApiState, - html_state: HtmlState, + api_state: api_router::api_state::ApiState, + html_state: html_router::html_state::HtmlState, } diff --git a/main/src/worker.rs b/main/src/worker.rs index 4e34332..52eb8a5 100644 --- a/main/src/worker.rs +++ b/main/src/worker.rs @@ -2,12 +2,14 @@ mod bootstrap; use std::sync::Arc; +use bootstrap::{init, prepare_embedding_runtime}; use ingestion_pipeline::{pipeline::IngestionPipeline, run_worker_loop}; use tracing::info; #[tokio::main] async fn main() -> anyhow::Result<()> { - let services = bootstrap::init().await?; + let services = init().await?; + prepare_embedding_runtime(&services).await?; info!( embedding_backend = ?services.config.embedding_backend, @@ -25,3 +27,58 @@ async fn main() -> anyhow::Result<()> { run_worker_loop(services.db, ingestion_pipeline).await } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + use std::time::Duration; + + use chrono::Utc; + use common::storage::types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS}; + use ingestion_pipeline::pipeline::IngestionPipeline; + + use crate::bootstrap::tests::init_smoke_services; + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn worker_smoke_initializes_and_claims_idle() -> anyhow::Result<()> { + let (services, data_dir) = init_smoke_services().await?; + + let pipeline = IngestionPipeline::new( + Arc::clone(&services.db), + Arc::clone(&services.openai_client), + services.config.clone(), + services.reranker_pool.clone(), + services.storage, + Arc::clone(&services.embedding_provider), + )?; + + let worker_id = "worker-smoke"; + let claimed = IngestionTask::claim_next_ready( + &services.db, + worker_id, + Utc::now(), + Duration::from_secs(DEFAULT_LEASE_SECS as u64), + ) + .await?; + assert!( + claimed.is_none(), + "worker smoke test should find no pending tasks" + ); + + let db = Arc::clone(&services.db); + let pipeline = Arc::new(pipeline); + let worker = tokio::spawn(async move { + ingestion_pipeline::run_worker_loop(db, pipeline).await + }); + + tokio::time::sleep(Duration::from_millis(250)).await; + assert!( + !worker.is_finished(), + "worker loop should keep running while idle" + ); + worker.abort(); + + tokio::fs::remove_dir_all(&data_dir).await.ok(); + Ok(()) + } +}