diff --git a/Cargo.lock b/Cargo.lock index c7588be..07f1700 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1929,13 +1929,22 @@ dependencies = [ "subtle", ] +[[package]] +name = "dirs" +version = "5.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44c45a9d03d6676652bcb5e724c7e988de1acad23a711b5217ab9cbecbec2225" +dependencies = [ + "dirs-sys 0.4.1", +] + [[package]] name = "dirs" version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3e8aa94d75141228480295a7d0e7feb620b1a5ad9f12bc40be62411e38cce4e" dependencies = [ - "dirs-sys", + "dirs-sys 0.5.0", ] [[package]] @@ -1948,6 +1957,18 @@ dependencies = [ "dirs-sys-next", ] +[[package]] +name = "dirs-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "520f05a5cbd335fae5a99ff7a6ab8627577660ee5cfd6a94a6a929b52ff0321c" +dependencies = [ + "libc", + "option-ext", + "redox_users 0.4.6", + "windows-sys 0.48.0", +] + [[package]] name = "dirs-sys" version = "0.5.0" @@ -2159,6 +2180,9 @@ name = "esaxx-rs" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d817e038c30374a4bcb22f94d0a8a0e216958d4c3dcde369b1439fec4bdda6e6" +dependencies = [ + "cc", +] [[package]] name = "euclid" @@ -2198,6 +2222,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "unicode-normalization", "uuid", ] @@ -2278,12 +2303,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2e9bf3ea201e5d338450555088e02cff23b00be92bead3eff7ed341c68f5ac6" dependencies = [ "anyhow", - "hf-hub", + "hf-hub 0.4.3", "image", "ndarray 0.16.1", "ort", "serde_json", - "tokenizers", + "tokenizers 0.22.1", ] [[package]] @@ -2808,13 +2833,30 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "hf-hub" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b780635574b3d92f036890d8373433d6f9fc7abb320ee42a5c25897fc8ed732" +dependencies = [ + "dirs 5.0.1", + "indicatif", + "log", + "native-tls", + "rand 0.8.5", + "serde", + "serde_json", + "thiserror 1.0.69", + "ureq", +] + [[package]] name = "hf-hub" version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "629d8f3bbeda9d148036d6b0de0a3ab947abd08ce90626327fc3547a49d59d97" dependencies = [ - "dirs", + "dirs 6.0.0", "http", "indicatif", "libc", @@ -3337,6 +3379,7 @@ dependencies = [ "surrealdb", "tempfile", "text-splitter", + "tokenizers 0.20.4", "tokio", "tracing", "url", @@ -4986,6 +5029,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "pulldown-cmark" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f86ba2052aebccc42cbbb3ed234b8b13ce76f75c3551a303cb2bcffcff12bb14" +dependencies = [ + "bitflags 2.9.0", + "memchr", + "unicase", +] + [[package]] name = "pxfm" version = "0.1.25" @@ -5251,6 +5305,17 @@ dependencies = [ "rayon-core", ] +[[package]] +name = "rayon-cond" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "059f538b55efd2309c9794130bc149c6a553db90e9d99c2030785c82f0bd7df9" +dependencies = [ + "either", + "itertools 0.11.0", + "rayon", +] + [[package]] name = "rayon-cond" version = "0.4.0" @@ -6746,9 +6811,11 @@ dependencies = [ "either", "itertools 0.13.0", "once_cell", + "pulldown-cmark", "regex", "strum", "thiserror 1.0.69", + "tokenizers 0.20.4", "unicode-segmentation", ] @@ -6908,6 +6975,39 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tokenizers" +version = "0.20.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b08cc37428a476fc9e20ac850132a513a2e1ce32b6a31addf2b74fa7033b905" +dependencies = [ + "aho-corasick", + "derive_builder", + "esaxx-rs", + "getrandom 0.2.16", + "hf-hub 0.3.2", + "indicatif", + "itertools 0.12.1", + "lazy_static", + "log", + "macro_rules_attribute", + "monostate", + "onig", + "paste", + "rand 0.8.5", + "rayon", + "rayon-cond 0.3.0", + "regex", + "regex-syntax 0.8.5", + "serde", + "serde_json", + "spm_precompiled", + "thiserror 1.0.69", + "unicode-normalization-alignments", + "unicode-segmentation", + "unicode_categories", +] + [[package]] name = "tokenizers" version = "0.22.1" @@ -6929,7 +7029,7 @@ dependencies = [ "paste", "rand 0.9.1", "rayon", - "rayon-cond", + "rayon-cond 0.4.0", "regex", "regex-syntax 0.8.5", "serde", @@ -7919,6 +8019,15 @@ dependencies = [ "windows-link 0.1.1", ] +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", +] + [[package]] name = "windows-sys" version = "0.52.0" @@ -7946,6 +8055,21 @@ dependencies = [ "windows-targets 0.53.5", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -7979,6 +8103,12 @@ dependencies = [ "windows_x86_64_msvc 0.53.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -7991,6 +8121,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86b8d5f90ddd19cb4a147a5fa63ca848db3df085e25fee3cc10b39b6eebae764" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -8003,6 +8139,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c7651a1f62a11b8cbd5e0d42526e55f2c99886c77e007179efff86c2b137e66c" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -8027,6 +8169,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ce6ccbdedbf6d6354471319e781c0dfef054c81fbc7cf83f338a4296c0cae11" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -8039,6 +8187,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "581fee95406bb13382d2f65cd4a908ca7b1e4c2f1917f143ba16efe98a589b5d" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -8051,6 +8205,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e55b5ac9ea33f2fc1716d1742db15574fd6fc8dadc51caab1c16a3d3b4190ba" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -8063,6 +8223,12 @@ version = "0.53.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a6e035dd0599267ce1ee132e51c27dd29437f63325753051e71dd9e42406c57" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" diff --git a/Cargo.toml b/Cargo.toml index a4eaec9..a857efa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,9 @@ sha2 = "0.10.8" surrealdb-migrations = "2.2.2" surrealdb = { version = "2", features = ["kv-mem"] } tempfile = "3.12.0" -text-splitter = "0.18.1" +text-splitter = { version = "0.18.1", features = ["markdown", "tokenizers"] } +tokenizers = { version = "0.20.4", features = ["http"] } +unicode-normalization = "0.1.24" thiserror = "1.0.63" tokio-util = { version = "0.7.15", features = ["io"] } tokio = { version = "1", features = ["full"] } diff --git a/common/src/storage/indexes.rs b/common/src/storage/indexes.rs index c4a60b0..0cd8ace 100644 --- a/common/src/storage/indexes.rs +++ b/common/src/storage/indexes.rs @@ -1,6 +1,7 @@ use std::time::Duration; use anyhow::{Context, Result}; +use futures::future::try_join_all; use serde::Deserialize; use serde_json::Value; use tracing::{info, warn}; @@ -83,10 +84,11 @@ async fn ensure_runtime_indexes_inner( ) -> Result<()> { create_fts_analyzer(db).await?; - for spec in fts_index_specs() { + let fts_tasks = fts_index_specs().into_iter().map(|spec| async move { if index_exists(db, spec.table, spec.index_name).await? { - continue; + return Ok(()); } + create_index_with_polling( db, spec.definition(), @@ -94,53 +96,52 @@ async fn ensure_runtime_indexes_inner( spec.table, Some(spec.table), ) - .await?; - } + .await + }); - for spec in hnsw_index_specs() { - ensure_hnsw_index(db, &spec, embedding_dimension).await?; - } + let hnsw_tasks = hnsw_index_specs() + .into_iter() + .map(|spec| async move { + match hnsw_index_state(db, &spec, embedding_dimension).await? { + HnswIndexState::Missing => { + create_index_with_polling( + db, + spec.definition_if_not_exists(embedding_dimension), + spec.index_name, + spec.table, + Some(spec.table), + ) + .await + } + HnswIndexState::Matches => Ok(()), + HnswIndexState::Different(existing) => { + info!( + index = spec.index_name, + table = spec.table, + existing_dimension = existing, + target_dimension = embedding_dimension, + "Overwriting HNSW index to match new embedding dimension" + ); + create_index_with_polling( + db, + spec.definition_overwrite(embedding_dimension), + spec.index_name, + spec.table, + Some(spec.table), + ) + .await + } + } + }); + + futures::try_join!( + async { try_join_all(fts_tasks).await.map(|_| ()) }, + async { try_join_all(hnsw_tasks).await.map(|_| ()) }, + )?; Ok(()) } -async fn ensure_hnsw_index( - db: &SurrealDbClient, - spec: &HnswIndexSpec, - dimension: usize, -) -> Result<()> { - match hnsw_index_state(db, spec, dimension).await? { - HnswIndexState::Missing => { - create_index_with_polling( - db, - spec.definition_if_not_exists(dimension), - spec.index_name, - spec.table, - Some(spec.table), - ) - .await - } - HnswIndexState::Matches => Ok(()), - HnswIndexState::Different(existing) => { - info!( - index = spec.index_name, - table = spec.table, - existing_dimension = existing, - target_dimension = dimension, - "Overwriting HNSW index to match new embedding dimension" - ); - create_index_with_polling( - db, - spec.definition_overwrite(dimension), - spec.index_name, - spec.table, - Some(spec.table), - ) - .await - } - } -} - async fn hnsw_index_state( db: &SurrealDbClient, spec: &HnswIndexSpec, @@ -265,9 +266,10 @@ async fn poll_index_build_status( tokio::time::sleep(poll_every).await; let info_query = format!("INFO FOR INDEX {index_name} ON TABLE {table};"); - let mut info_res = db.client.query(info_query).await.with_context(|| { - format!("checking index build status for {index_name} on {table}") - })?; + let mut info_res = + db.client.query(info_query).await.with_context(|| { + format!("checking index build status for {index_name} on {table}") + })?; let info: Option = info_res .take(0) @@ -461,7 +463,7 @@ const fn hnsw_index_specs() -> [HnswIndexSpec; 2] { ] } -const fn fts_index_specs() -> [FtsIndexSpec; 9] { +const fn fts_index_specs() -> [FtsIndexSpec; 8] { [ FtsIndexSpec { index_name: "text_content_fts_idx", @@ -470,13 +472,6 @@ const fn fts_index_specs() -> [FtsIndexSpec; 9] { analyzer: Some(FTS_ANALYZER_NAME), method: "BM25", }, - FtsIndexSpec { - index_name: "text_content_category_fts_idx", - table: "text_content", - field: "category", - analyzer: Some(FTS_ANALYZER_NAME), - method: "BM25", - }, FtsIndexSpec { index_name: "text_content_context_fts_idx", table: "text_content", diff --git a/eval/Cargo.toml b/eval/Cargo.toml index f5d8ca9..33cf249 100644 --- a/eval/Cargo.toml +++ b/eval/Cargo.toml @@ -18,6 +18,7 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } uuid = { workspace = true } text-splitter = { workspace = true } +unicode-normalization = { workspace = true } rand = "0.8" sha2 = { workspace = true } object_store = { workspace = true } diff --git a/eval/src/args.rs b/eval/src/args.rs index 88e9a0d..73f4d64 100644 --- a/eval/src/args.rs +++ b/eval/src/args.rs @@ -52,14 +52,6 @@ impl std::fmt::Display for EmbeddingBackend { #[derive(Debug, Clone, Args)] pub struct RetrievalSettings { - /// Minimum characters per chunk for text splitting - #[arg(long, default_value_t = 500)] - pub chunk_min_chars: usize, - - /// Maximum characters per chunk for text splitting - #[arg(long, default_value_t = 2000)] - pub chunk_max_chars: usize, - /// Override chunk vector candidate cap #[arg(long)] pub chunk_vector_take: Option, @@ -68,10 +60,6 @@ pub struct RetrievalSettings { #[arg(long)] pub chunk_fts_take: Option, - /// Override chunk token budget estimate for assembly - #[arg(long)] - pub chunk_token_budget: Option, - /// Override average characters per token used for budgeting #[arg(long)] pub chunk_avg_chars_per_token: Option, @@ -80,18 +68,22 @@ pub struct RetrievalSettings { #[arg(long)] pub max_chunks_per_entity: Option, - /// Disable the FastEmbed reranking stage - #[arg(long = "no-rerank", action = clap::ArgAction::SetFalse)] + /// Enable the FastEmbed reranking stage + #[arg(long = "rerank", action = clap::ArgAction::SetTrue, default_value_t = false)] pub rerank: bool, /// Reranking engine pool size / parallelism - #[arg(long, default_value_t = 16)] + #[arg(long, default_value_t = 4)] pub rerank_pool_size: usize, /// Keep top-N entities after reranking #[arg(long, default_value_t = 10)] pub rerank_keep_top: usize, + /// Cap the number of chunks returned by retrieval (revised strategy) + #[arg(long, default_value_t = 5)] + pub chunk_result_cap: usize, + /// Require verified chunks (disable with --llm-mode) #[arg(skip = true)] pub require_verified_chunks: bool, @@ -104,16 +96,14 @@ pub struct RetrievalSettings { impl Default for RetrievalSettings { fn default() -> Self { Self { - chunk_min_chars: 500, - chunk_max_chars: 2_000, chunk_vector_take: None, chunk_fts_take: None, - chunk_token_budget: None, chunk_avg_chars_per_token: None, max_chunks_per_entity: None, - rerank: true, - rerank_pool_size: 16, + rerank: false, + rerank_pool_size: 4, rerank_keep_top: 10, + chunk_result_cap: 5, require_verified_chunks: true, strategy: RetrievalStrategy::Initial, } @@ -175,7 +165,7 @@ pub struct Config { pub retrieval: RetrievalSettings, /// Concurrency level - #[arg(long, default_value_t = 4)] + #[arg(long, default_value_t = 1)] pub concurrency: usize, /// Embedding backend @@ -195,19 +185,23 @@ pub struct Config { pub ingestion_cache_dir: PathBuf, /// Minimum tokens per chunk for ingestion - #[arg(long, default_value_t = 500)] + #[arg(long, default_value_t = 256)] pub ingest_chunk_min_tokens: usize, /// Maximum tokens per chunk for ingestion - #[arg(long, default_value_t = 2_000)] + #[arg(long, default_value_t = 512)] pub ingest_chunk_max_tokens: usize, + /// Overlap between chunks during ingestion (tokens) + #[arg(long, default_value_t = 50)] + pub ingest_chunk_overlap_tokens: usize, + /// Run ingestion in chunk-only mode (skip analyzer/graph generation) #[arg(long)] pub ingest_chunks_only: bool, /// Number of paragraphs to ingest concurrently - #[arg(long, default_value_t = 5)] + #[arg(long, default_value_t = 10)] pub ingestion_batch_size: usize, /// Maximum retries for ingestion failures per paragraph @@ -354,15 +348,9 @@ impl Config { } // Validations - if self.retrieval.chunk_min_chars >= self.retrieval.chunk_max_chars { - return Err(anyhow!( - "--chunk-min must be less than --chunk-max (got {} >= {})", - self.retrieval.chunk_min_chars, - self.retrieval.chunk_max_chars - )); - } - - if self.ingest_chunk_min_tokens == 0 || self.ingest_chunk_min_tokens >= self.ingest_chunk_max_tokens { + if self.ingest_chunk_min_tokens == 0 + || self.ingest_chunk_min_tokens >= self.ingest_chunk_max_tokens + { return Err(anyhow!( "--ingest-chunk-min-tokens must be greater than zero and less than --ingest-chunk-max-tokens (got {} >= {})", self.ingest_chunk_min_tokens, @@ -370,6 +358,14 @@ impl Config { )); } + if self.ingest_chunk_overlap_tokens >= self.ingest_chunk_min_tokens { + return Err(anyhow!( + "--ingest-chunk-overlap-tokens ({}) must be less than --ingest-chunk-min-tokens ({})", + self.ingest_chunk_overlap_tokens, + self.ingest_chunk_min_tokens + )); + } + if self.retrieval.rerank && self.retrieval.rerank_pool_size == 0 { return Err(anyhow!( "--rerank-pool must be greater than zero when reranking is enabled" @@ -444,9 +440,7 @@ pub struct ParsedArgs { pub fn parse() -> Result { let mut config = Config::parse(); config.finalize()?; - Ok(ParsedArgs { - config, - }) + Ok(ParsedArgs { config }) } pub fn ensure_parent(path: &Path) -> Result<()> { diff --git a/eval/src/eval/mod.rs b/eval/src/eval/mod.rs index 1b8070f..0ab7254 100644 --- a/eval/src/eval/mod.rs +++ b/eval/src/eval/mod.rs @@ -15,7 +15,6 @@ use common::{ types::{system_settings::SystemSettings, user::User, StoredObject}, }, }; -use retrieval_pipeline::RetrievalTuning; use serde::Deserialize; use tokio::io::AsyncWriteExt; use tracing::{info, warn}; @@ -120,37 +119,6 @@ pub(crate) fn ledger_target(config: &Config) -> Option { } } -pub(crate) fn apply_dataset_tuning_overrides( - dataset: &ConvertedDataset, - config: &Config, - tuning: &mut RetrievalTuning, -) { - let is_long_form = dataset - .metadata - .id - .to_ascii_lowercase() - .contains("natural-questions"); - if !is_long_form { - return; - } - - if config.retrieval.chunk_vector_take.is_none() { - tuning.chunk_vector_take = tuning.chunk_vector_take.max(80); - } - if config.retrieval.chunk_fts_take.is_none() { - tuning.chunk_fts_take = tuning.chunk_fts_take.max(80); - } - if config.retrieval.chunk_token_budget.is_none() { - tuning.token_budget_estimate = tuning.token_budget_estimate.max(20_000); - } - if config.retrieval.max_chunks_per_entity.is_none() { - tuning.max_chunks_per_entity = tuning.max_chunks_per_entity.max(12); - } - if tuning.lexical_match_weight < 0.25 { - tuning.lexical_match_weight = 0.3; - } -} - pub(crate) async fn write_chunk_diagnostics(path: &Path, cases: &[CaseDiagnostics]) -> Result<()> { args::ensure_parent(path)?; let mut file = tokio::fs::File::create(path) @@ -175,9 +143,10 @@ pub(crate) async fn warm_hnsw_cache(db: &SurrealDbClient, dimension: usize) -> R let _ = db .client .query( - "SELECT chunk_id \ - FROM text_chunk_embedding \ - WHERE embedding <|1,1|> $embedding LIMIT 5", + r#"SELECT chunk_id + FROM text_chunk_embedding + WHERE embedding <|1,1|> $embedding + LIMIT 5"#, ) .bind(("embedding", dummy_embedding.clone())) .await @@ -187,9 +156,10 @@ pub(crate) async fn warm_hnsw_cache(db: &SurrealDbClient, dimension: usize) -> R let _ = db .client .query( - "SELECT entity_id \ - FROM knowledge_entity_embedding \ - WHERE embedding <|1,1|> $embedding LIMIT 5", + r#"SELECT entity_id + FROM knowledge_entity_embedding + WHERE embedding <|1,1|> $embedding + LIMIT 5"#, ) .bind(("embedding", dummy_embedding)) .await @@ -427,12 +397,10 @@ pub(crate) async fn enforce_system_settings( ) -> Result { let mut updated_settings = settings.clone(); let mut needs_settings_update = false; - // let mut embedding_dimension_changed = false; if provider_dimension != settings.embedding_dimensions as usize { updated_settings.embedding_dimensions = provider_dimension as u32; needs_settings_update = true; - // embedding_dimension_changed = true; } if let Some(query_override) = config.query_model.as_deref() { if settings.query_model != query_override { @@ -449,12 +417,6 @@ pub(crate) async fn enforce_system_settings( .await .context("updating system settings overrides")?; } - // We dont need to do this, we've changed from default settings already - // if embedding_dimension_changed { - // change_embedding_length_in_hnsw_indexes(db, provider_dimension) - // .await - // .context("redefining HNSW indexes for new embedding dimension")?; - // } Ok(settings) } diff --git a/eval/src/eval/pipeline/stages/run_queries.rs b/eval/src/eval/pipeline/stages/run_queries.rs index edbcdf0..34e7a41 100644 --- a/eval/src/eval/pipeline/stages/run_queries.rs +++ b/eval/src/eval/pipeline/stages/run_queries.rs @@ -6,7 +6,7 @@ use futures::stream::{self, StreamExt}; use tracing::{debug, info}; use crate::eval::{ - adapt_strategy_output, apply_dataset_tuning_overrides, build_case_diagnostics, + adapt_strategy_output, build_case_diagnostics, text_contains_answer, CaseDiagnostics, CaseSummary, RetrievedSummary, }; use retrieval_pipeline::{ @@ -56,15 +56,13 @@ pub(crate) async fn run_queries( if retrieval_config.tuning.fallback_min_results < config.retrieval.rerank_keep_top { retrieval_config.tuning.fallback_min_results = config.retrieval.rerank_keep_top; } + retrieval_config.tuning.chunk_result_cap = config.retrieval.chunk_result_cap.max(1); if let Some(value) = config.retrieval.chunk_vector_take { retrieval_config.tuning.chunk_vector_take = value; } if let Some(value) = config.retrieval.chunk_fts_take { retrieval_config.tuning.chunk_fts_take = value; } - if let Some(value) = config.retrieval.chunk_token_budget { - retrieval_config.tuning.token_budget_estimate = value; - } if let Some(value) = config.retrieval.chunk_avg_chars_per_token { retrieval_config.tuning.avg_chars_per_token = value; } @@ -72,8 +70,6 @@ pub(crate) async fn run_queries( retrieval_config.tuning.max_chunks_per_entity = value; } - apply_dataset_tuning_overrides(dataset, config, &mut retrieval_config.tuning); - let active_tuning = retrieval_config.tuning.clone(); let effective_chunk_vector = config .retrieval @@ -95,11 +91,8 @@ pub(crate) async fn run_queries( rerank_enabled = config.retrieval.rerank, rerank_pool_size = config.retrieval.rerank_pool_size, rerank_keep_top = config.retrieval.rerank_keep_top, - chunk_min = config.retrieval.chunk_min_chars, - chunk_max = config.retrieval.chunk_max_chars, chunk_vector_take = effective_chunk_vector, chunk_fts_take = effective_chunk_fts, - chunk_token_budget = active_tuning.token_budget_estimate, embedding_backend = ctx.embedding_provider().backend_label(), embedding_model = ctx .embedding_provider() @@ -405,4 +398,3 @@ fn calculate_ndcg(retrieved: &[RetrievedSummary], k: usize) -> f64 { dcg / idcg } } - diff --git a/eval/src/eval/pipeline/stages/summarize.rs b/eval/src/eval/pipeline/stages/summarize.rs index 3d5c851..2da9b9d 100644 --- a/eval/src/eval/pipeline/stages/summarize.rs +++ b/eval/src/eval/pipeline/stages/summarize.rs @@ -201,9 +201,13 @@ pub(crate) async fn summarize( concurrency: config.concurrency.max(1), detailed_report: config.detailed_report, retrieval_strategy: config.retrieval.strategy.to_string(), + chunk_result_cap: config.retrieval.chunk_result_cap, + ingest_chunk_min_tokens: config.ingest_chunk_min_tokens, + ingest_chunk_max_tokens: config.ingest_chunk_max_tokens, + ingest_chunks_only: config.ingest_chunks_only, + ingest_chunk_overlap_tokens: config.ingest_chunk_overlap_tokens, chunk_vector_take: active_tuning.chunk_vector_take, chunk_fts_take: active_tuning.chunk_fts_take, - chunk_token_budget: active_tuning.token_budget_estimate, chunk_avg_chars_per_token: active_tuning.avg_chars_per_token, max_chunks_per_entity: active_tuning.max_chunks_per_entity, cases: summaries, diff --git a/eval/src/eval/types.rs b/eval/src/eval/types.rs index 0395992..f599b27 100644 --- a/eval/src/eval/types.rs +++ b/eval/src/eval/types.rs @@ -6,6 +6,7 @@ use retrieval_pipeline::{ PipelineDiagnostics, PipelineStageTimings, RetrievedChunk, RetrievedEntity, StrategyOutput, }; use serde::{Deserialize, Serialize}; +use unicode_normalization::UnicodeNormalization; #[derive(Debug, Serialize)] pub struct EvaluationSummary { @@ -68,9 +69,13 @@ pub struct EvaluationSummary { pub concurrency: usize, pub detailed_report: bool, pub retrieval_strategy: String, + pub chunk_result_cap: usize, + pub ingest_chunk_min_tokens: usize, + pub ingest_chunk_max_tokens: usize, + pub ingest_chunks_only: bool, + pub ingest_chunk_overlap_tokens: usize, pub chunk_vector_take: usize, pub chunk_fts_take: usize, - pub chunk_token_budget: usize, pub chunk_avg_chars_per_token: usize, pub max_chunks_per_entity: usize, pub cases: Vec, @@ -107,7 +112,17 @@ pub struct LatencyStats { pub p95: u128, } -#[derive(Debug, Clone, Serialize)] +impl Default for LatencyStats { + fn default() -> Self { + Self { + avg: 0.0, + p50: 0, + p95: 0, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct StageLatencyBreakdown { pub embed: LatencyStats, pub collect_candidates: LatencyStats, @@ -117,7 +132,7 @@ pub struct StageLatencyBreakdown { pub assemble: LatencyStats, } -#[derive(Debug, Default, Clone, Serialize)] +#[derive(Debug, Default, Clone, Serialize, Deserialize)] pub struct EvaluationStageTimings { pub prepare_slice_ms: u128, pub prepare_db_ms: u128, @@ -128,7 +143,7 @@ pub struct EvaluationStageTimings { pub finalize_ms: u128, } -#[derive(Debug, Serialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct PerformanceTimings { pub openai_base_url: String, pub ingestion_ms: u128, @@ -254,8 +269,44 @@ pub fn text_contains_answer(text: &str, answers: &[String]) -> bool { if answers.is_empty() { return true; } - let haystack = text.to_ascii_lowercase(); - answers.iter().any(|needle| haystack.contains(needle)) + let haystack = normalize_for_match(text); + answers + .iter() + .map(|needle| normalize_for_match(needle)) + .any(|needle| !needle.is_empty() && haystack.contains(&needle)) +} + +fn normalize_for_match(input: &str) -> String { + // NFKC normalize, lowercase, and collapse whitespace/punctuation to a single space + // to reduce false negatives from formatting or punctuation differences. + let mut out = String::with_capacity(input.len()); + let mut last_space = false; + for ch in input.nfkc().flat_map(|c| c.to_lowercase()) { + let is_space = ch.is_whitespace(); + let is_punct = ch.is_ascii_punctuation() + || matches!( + ch, + '“' | '”' | '‘' | '’' | '«' | '»' | '–' | '—' | '…' | '·' | '•' + ); + if is_space || is_punct { + if !last_space { + out.push(' '); + last_space = true; + } + } else { + out.push(ch); + last_space = false; + } + } + + let trimmed = out.trim(); + if trimmed.is_empty() { + return String::new(); + } + + trimmed + .trim_matches(|c: char| c.is_ascii_punctuation() || c.is_whitespace()) + .to_string() } fn chunk_snippet(text: &str) -> String { diff --git a/eval/src/ingest/mod.rs b/eval/src/ingest/mod.rs index 020bb53..f63d23f 100644 --- a/eval/src/ingest/mod.rs +++ b/eval/src/ingest/mod.rs @@ -17,6 +17,7 @@ pub fn make_ingestion_config(config: &crate::args::Config) -> ingestion_pipeline let mut tuning = ingestion_pipeline::IngestionTuning::default(); tuning.chunk_min_tokens = config.ingest_chunk_min_tokens; tuning.chunk_max_tokens = config.ingest_chunk_max_tokens; + tuning.chunk_overlap_tokens = config.ingest_chunk_overlap_tokens; ingestion_pipeline::IngestionConfig { tuning, diff --git a/eval/src/main.rs b/eval/src/main.rs index 97d3773..c03c9ab 100644 --- a/eval/src/main.rs +++ b/eval/src/main.rs @@ -172,13 +172,14 @@ async fn async_main() -> anyhow::Result<()> { .await .context("running retrieval evaluation")?; - let report_paths = report::write_reports( + let report = report::write_reports( &summary, parsed.config.report_dir.as_path(), parsed.config.summary_sample, ) .with_context(|| format!("writing reports to {}", parsed.config.report_dir.display()))?; - let perf_log_path = perf::write_perf_logs( + let perf_mirrors = perf::mirror_perf_outputs( + &report.record, &summary, parsed.config.report_dir.as_path(), parsed.config.perf_log_json.as_deref(), @@ -186,14 +187,27 @@ async fn async_main() -> anyhow::Result<()> { ) .with_context(|| { format!( - "writing perf logs under {}", + "writing perf mirrors under {}", parsed.config.report_dir.display() ) })?; + let perf_note = if perf_mirrors.is_empty() { + String::new() + } else { + format!( + " | Perf mirrors: {}", + perf_mirrors + .iter() + .map(|path| path.display().to_string()) + .collect::>() + .join(", ") + ) + }; + if summary.llm_cases > 0 { println!( - "[{}] Retrieval Precision@{k}: {precision:.3} ({correct}/{retrieval_total}) + LLM: {llm_answered}/{llm_total} ({llm_precision:.3}) → JSON: {json} | Markdown: {md} | Perf: {perf}", + "[{}] Retrieval Precision@{k}: {precision:.3} ({correct}/{retrieval_total}) + LLM: {llm_answered}/{llm_total} ({llm_precision:.3}) → JSON: {json} | Markdown: {md} | History: {history}{perf_note}", summary.dataset_label, k = summary.k, precision = summary.precision, @@ -202,26 +216,28 @@ async fn async_main() -> anyhow::Result<()> { llm_answered = summary.llm_answered, llm_total = summary.llm_cases, llm_precision = summary.llm_precision, - json = report_paths.json.display(), - md = report_paths.markdown.display(), - perf = perf_log_path.display() + json = report.paths.json.display(), + md = report.paths.markdown.display(), + history = report.history_path.display(), + perf_note = perf_note, ); } else { println!( - "[{}] Retrieval Precision@{k}: {precision:.3} ({correct}/{retrieval_total}) → JSON: {json} | Markdown: {md} | Perf: {perf}", + "[{}] Retrieval Precision@{k}: {precision:.3} ({correct}/{retrieval_total}) → JSON: {json} | Markdown: {md} | History: {history}{perf_note}", summary.dataset_label, k = summary.k, precision = summary.precision, correct = summary.correct, retrieval_total = summary.retrieval_cases, - json = report_paths.json.display(), - md = report_paths.markdown.display(), - perf = perf_log_path.display() + json = report.paths.json.display(), + md = report.paths.markdown.display(), + history = report.history_path.display(), + perf_note = perf_note, ); } if parsed.config.perf_log_console { - perf::print_console_summary(&summary); + perf::print_console_summary(&report.record); } Ok(()) diff --git a/eval/src/perf.rs b/eval/src/perf.rs index 3cb8201..f44b047 100644 --- a/eval/src/perf.rs +++ b/eval/src/perf.rs @@ -1,160 +1,37 @@ use std::{ - fs::{self, OpenOptions}, - io::Write, + fs, path::{Path, PathBuf}, }; use anyhow::{Context, Result}; -use serde::Serialize; use crate::{ args, - eval::{format_timestamp, EvaluationStageTimings, EvaluationSummary}, - report, + eval::EvaluationSummary, + report::{self, EvaluationReport}, }; -#[derive(Debug, Serialize)] -struct PerformanceLogEntry { - generated_at: String, - dataset_id: String, - dataset_label: String, - run_label: Option, - retrieval_strategy: String, - slice_id: String, - slice_seed: u64, - slice_window_offset: usize, - slice_window_length: usize, - limit: Option, - total_cases: usize, - correct: usize, - precision: f64, - retrieval_cases: usize, - llm_cases: usize, - llm_answered: usize, - llm_precision: f64, - k: usize, - openai_base_url: String, - ingestion: IngestionPerf, - namespace: NamespacePerf, - retrieval: RetrievalPerf, - evaluation_stages: EvaluationStageTimings, -} - -#[derive(Debug, Serialize)] -struct IngestionPerf { - duration_ms: u128, - cache_path: String, - reused: bool, - embeddings_reused: bool, - fingerprint: String, - positives_total: usize, - negatives_total: usize, -} - -#[derive(Debug, Serialize)] -struct NamespacePerf { - reused: bool, - seed_ms: Option, -} - -#[derive(Debug, Serialize)] -struct RetrievalPerf { - latency_ms: crate::eval::LatencyStats, - stage_latency: crate::eval::StageLatencyBreakdown, - concurrency: usize, - rerank_enabled: bool, - rerank_pool_size: Option, - rerank_keep_top: usize, - evaluated_cases: usize, -} - -impl PerformanceLogEntry { - fn from_summary(summary: &EvaluationSummary) -> Self { - let ingestion = IngestionPerf { - duration_ms: summary.perf.ingestion_ms, - cache_path: summary.ingestion_cache_path.clone(), - reused: summary.ingestion_reused, - embeddings_reused: summary.ingestion_embeddings_reused, - fingerprint: summary.ingestion_fingerprint.clone(), - positives_total: summary.slice_positive_paragraphs, - negatives_total: summary.slice_negative_paragraphs, - }; - - let namespace = NamespacePerf { - reused: summary.namespace_reused, - seed_ms: summary.perf.namespace_seed_ms, - }; - - let retrieval = RetrievalPerf { - latency_ms: summary.latency_ms.clone(), - stage_latency: summary.perf.stage_latency.clone(), - concurrency: summary.concurrency, - rerank_enabled: summary.rerank_enabled, - rerank_pool_size: summary.rerank_pool_size, - rerank_keep_top: summary.rerank_keep_top, - evaluated_cases: summary.retrieval_cases, - }; - - Self { - generated_at: format_timestamp(&summary.generated_at), - dataset_id: summary.dataset_id.clone(), - dataset_label: summary.dataset_label.clone(), - run_label: summary.run_label.clone(), - retrieval_strategy: summary.retrieval_strategy.clone(), - slice_id: summary.slice_id.clone(), - slice_seed: summary.slice_seed, - slice_window_offset: summary.slice_window_offset, - slice_window_length: summary.slice_window_length, - limit: summary.limit, - total_cases: summary.total_cases, - correct: summary.correct, - precision: summary.precision, - retrieval_cases: summary.retrieval_cases, - llm_cases: summary.llm_cases, - llm_answered: summary.llm_answered, - llm_precision: summary.llm_precision, - k: summary.k, - openai_base_url: summary.perf.openai_base_url.clone(), - ingestion, - namespace, - retrieval, - evaluation_stages: summary.perf.evaluation_stage_ms.clone(), - } - } -} - -pub fn write_perf_logs( +pub fn mirror_perf_outputs( + record: &EvaluationReport, summary: &EvaluationSummary, report_root: &Path, extra_json: Option<&Path>, extra_dir: Option<&Path>, -) -> Result { - let entry = PerformanceLogEntry::from_summary(summary); - let dataset_dir = report::dataset_report_dir(report_root, &summary.dataset_id); - fs::create_dir_all(&dataset_dir) - .with_context(|| format!("creating dataset perf directory {}", dataset_dir.display()))?; - - let log_path = dataset_dir.join("perf-log.jsonl"); - let mut file = OpenOptions::new() - .create(true) - .append(true) - .open(&log_path) - .with_context(|| format!("opening perf log {}", log_path.display()))?; - let line = serde_json::to_vec(&entry).context("serialising perf log entry")?; - file.write_all(&line)?; - file.write_all(b"\n")?; - file.flush()?; +) -> Result> { + let mut written = Vec::new(); if let Some(path) = extra_json { args::ensure_parent(path)?; - let blob = serde_json::to_vec_pretty(&entry).context("serialising perf log JSON")?; + let blob = serde_json::to_vec_pretty(record).context("serialising perf log JSON")?; fs::write(path, blob) .with_context(|| format!("writing perf log copy to {}", path.display()))?; + written.push(path.to_path_buf()); } if let Some(dir) = extra_dir { fs::create_dir_all(dir) .with_context(|| format!("creating perf log directory {}", dir.display()))?; + let dataset_dir = report::dataset_report_dir(report_root, &summary.dataset_id); let dataset_slug = dataset_dir .file_name() .and_then(|os| os.to_str()) @@ -162,22 +39,24 @@ pub fn write_perf_logs( let timestamp = summary.generated_at.format("%Y%m%dT%H%M%S").to_string(); let filename = format!("perf-{}-{}.json", dataset_slug, timestamp); let path = dir.join(filename); - let blob = serde_json::to_vec_pretty(&entry).context("serialising perf log JSON")?; + let blob = serde_json::to_vec_pretty(record).context("serialising perf log JSON")?; fs::write(&path, blob) .with_context(|| format!("writing perf log mirror {}", path.display()))?; + written.push(path); } - Ok(log_path) + Ok(written) } -pub fn print_console_summary(summary: &EvaluationSummary) { - let perf = &summary.perf; +pub fn print_console_summary(record: &EvaluationReport) { + let perf = &record.performance; println!( - "[perf] retrieval strategy={} | rerank={} (pool {:?}, keep {})", - summary.retrieval_strategy, - summary.rerank_enabled, - summary.rerank_pool_size, - summary.rerank_keep_top + "[perf] retrieval strategy={} | concurrency={} | rerank={} (pool {:?}, keep {})", + record.retrieval.strategy, + record.retrieval.concurrency, + record.retrieval.rerank_enabled, + record.retrieval.rerank_pool_size, + record.retrieval.rerank_keep_top ); println!( "[perf] ingestion={}ms | namespace_seed={}", @@ -194,7 +73,7 @@ pub fn print_console_summary(summary: &EvaluationSummary) { stage.rerank.avg, stage.assemble.avg, ); - let eval = &perf.evaluation_stage_ms; + let eval = &perf.evaluation_stages_ms; println!( "[perf] eval stage ms → slice {} | db {} | corpus {} | namespace {} | queries {} | summarize {} | finalize {}", eval.prepare_slice_ms, @@ -315,9 +194,12 @@ mod tests { concurrency: 2, retrieval_strategy: "initial".into(), detailed_report: false, + ingest_chunk_min_tokens: 256, + ingest_chunk_max_tokens: 512, + ingest_chunk_overlap_tokens: 50, + ingest_chunks_only: false, chunk_vector_take: 20, chunk_fts_take: 20, - chunk_token_budget: 10000, chunk_avg_chars_per_token: 4, max_chunks_per_entity: 4, average_ndcg: 0.0, @@ -327,18 +209,34 @@ mod tests { } #[test] - fn writes_perf_log_jsonl() { + fn writes_perf_mirrors_from_record() { let tmp = tempdir().unwrap(); let report_root = tmp.path().join("reports"); let summary = sample_summary(); - let log_path = write_perf_logs(&summary, &report_root, None, None).expect("perf log write"); - assert!(log_path.exists()); - let contents = std::fs::read_to_string(&log_path).expect("reading perf log jsonl"); + let record = report::EvaluationReport::from_summary(&summary, 5); + + let json_path = tmp.path().join("extra.json"); + let dir_path = tmp.path().join("copies"); + let outputs = mirror_perf_outputs( + &record, + &summary, + &report_root, + Some(json_path.as_path()), + Some(dir_path.as_path()), + ) + .expect("perf mirrors"); + + assert!(json_path.exists()); + let content = std::fs::read_to_string(&json_path).expect("reading mirror json"); assert!( - contents.contains("\"openai_base_url\":\"https://example.com\""), - "serialized log should include base URL" + content.contains("\"evaluation_stages_ms\""), + "perf mirror should include evaluation stage timings" ); - let dataset_dir = report::dataset_report_dir(&report_root, &summary.dataset_id); - assert!(dataset_dir.join("perf-log.jsonl").exists()); + assert_eq!(outputs.len(), 2); + let mirrored = outputs + .into_iter() + .filter(|path| path.starts_with(&dir_path)) + .collect::>(); + assert_eq!(mirrored.len(), 1, "expected timestamped mirror in dir"); } } diff --git a/eval/src/report.rs b/eval/src/report.rs index 374811a..b47c7e4 100644 --- a/eval/src/report.rs +++ b/eval/src/report.rs @@ -19,7 +19,7 @@ pub struct ReportPaths { pub markdown: PathBuf, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct EvaluationReport { pub overview: OverviewSection, pub dataset: DatasetSection, @@ -28,14 +28,15 @@ pub struct EvaluationReport { #[serde(skip_serializing_if = "Option::is_none")] pub llm: Option, pub performance: PerformanceSection, - #[serde(skip_serializing_if = "Vec::is_empty")] + #[serde(default, skip_serializing_if = "Vec::is_empty")] pub misses: Vec, - #[serde(skip_serializing_if = "Vec::is_empty")] + #[serde(default, skip_serializing_if = "Vec::is_empty")] pub llm_cases: Vec, + #[serde(default)] pub detailed_report: bool, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct OverviewSection { pub generated_at: String, pub run_label: Option, @@ -43,7 +44,7 @@ pub struct OverviewSection { pub filtered_questions: usize, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct DatasetSection { pub id: String, pub label: String, @@ -55,7 +56,7 @@ pub struct DatasetSection { pub embedding_dimension: usize, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct SliceSection { pub id: String, pub seed: u64, @@ -69,7 +70,7 @@ pub struct SliceSection { pub negative_multiplier: f32, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RetrievalSection { pub k: usize, pub cases: usize, @@ -86,16 +87,21 @@ pub struct RetrievalSection { pub rerank_enabled: bool, pub rerank_pool_size: Option, pub rerank_keep_top: usize, + pub chunk_result_cap: usize, + pub ingest_chunk_min_tokens: usize, + pub ingest_chunk_max_tokens: usize, + pub ingest_chunk_overlap_tokens: usize, + pub ingest_chunks_only: bool, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct LlmSection { pub cases: usize, pub answered: usize, pub precision: f64, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct PerformanceSection { pub openai_base_url: String, pub ingestion_ms: u128, @@ -111,7 +117,7 @@ pub struct PerformanceSection { pub negative_paragraphs_reused: usize, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MissEntry { pub question_id: String, pub paragraph_title: String, @@ -122,7 +128,7 @@ pub struct MissEntry { pub retrieved: Vec, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct LlmCaseEntry { pub question_id: String, pub answered: bool, @@ -131,7 +137,7 @@ pub struct LlmCaseEntry { pub retrieved: Vec, } -#[derive(Debug, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct RetrievedSnippet { pub rank: usize, pub source_id: String, @@ -139,6 +145,13 @@ pub struct RetrievedSnippet { pub matched: bool, } +#[derive(Debug)] +pub struct ReportOutcome { + pub record: EvaluationReport, + pub paths: ReportPaths, + pub history_path: PathBuf, +} + impl EvaluationReport { pub fn from_summary(summary: &EvaluationSummary, sample: usize) -> Self { let overview = OverviewSection { @@ -188,6 +201,11 @@ impl EvaluationReport { rerank_enabled: summary.rerank_enabled, rerank_pool_size: summary.rerank_pool_size, rerank_keep_top: summary.rerank_keep_top, + chunk_result_cap: summary.chunk_result_cap, + ingest_chunk_min_tokens: summary.ingest_chunk_min_tokens, + ingest_chunk_max_tokens: summary.ingest_chunk_max_tokens, + ingest_chunk_overlap_tokens: summary.ingest_chunk_overlap_tokens, + ingest_chunks_only: summary.ingest_chunks_only, }; let llm = if summary.llm_cases > 0 { @@ -215,24 +233,30 @@ impl EvaluationReport { negative_paragraphs_reused: summary.negative_paragraphs_reused, }; - let misses = summary - .cases - .iter() - .filter(|case| !case.matched && !case.is_impossible) - .take(sample) - .map(MissEntry::from_case) - .collect(); - - let llm_cases = if llm.is_some() { - summary + let (misses, llm_cases) = if summary.detailed_report { + let misses = summary .cases .iter() - .filter(|case| case.is_impossible) + .filter(|case| !case.matched && !case.is_impossible) .take(sample) - .map(LlmCaseEntry::from_case) - .collect() + .map(MissEntry::from_case) + .collect(); + + let llm_cases = if llm.is_some() { + summary + .cases + .iter() + .filter(|case| case.is_impossible) + .take(sample) + .map(LlmCaseEntry::from_case) + .collect() + } else { + Vec::new() + }; + + (misses, llm_cases) } else { - Vec::new() + (Vec::new(), Vec::new()) }; Self { @@ -299,7 +323,7 @@ pub fn write_reports( summary: &EvaluationSummary, report_dir: &Path, sample: usize, -) -> Result { +) -> Result { fs::create_dir_all(report_dir) .with_context(|| format!("creating report directory {}", report_dir.display()))?; let dataset_dir = dataset_report_dir(report_dir, &summary.dataset_id); @@ -331,11 +355,15 @@ pub fn write_reports( fs::write(&latest_md, markdown) .with_context(|| format!("writing latest Markdown report to {}", latest_md.display()))?; - record_history(summary, &dataset_dir)?; + let history_path = record_history(&report, &dataset_dir)?; - Ok(ReportPaths { - json: json_path, - markdown: md_path, + Ok(ReportOutcome { + record: report, + paths: ReportPaths { + json: json_path, + markdown: md_path, + }, + history_path, }) } @@ -555,10 +583,14 @@ fn render_markdown(report: &EvaluationReport) -> String { ); if report.misses.is_empty() { - md.push_str("\\n_All evaluated retrieval queries matched within the top-k window._\\n"); if report.detailed_report { md.push_str( - "\\nSuccess measures were captured for each query (entity, chunk text, chunk ID).\\n", + "\\n_All evaluated retrieval queries matched within the top-k window._\\n\ + \\nSuccess measures were captured for each query (entity, chunk text, chunk ID).\\n", + ); + } else { + md.push_str( + "\\n_Misses omitted. Re-run with `--detailed-report` to see sampled failures._\\n", ); } } else { @@ -597,7 +629,11 @@ fn render_markdown(report: &EvaluationReport) -> String { if report.llm.is_some() { md.push_str("\\n## LLM-Only Cases (sample)\\n\\n"); if report.llm_cases.is_empty() { - md.push_str("All LLM-only cases matched within the evaluation window.\\n"); + if report.detailed_report { + md.push_str("All LLM-only cases matched within the evaluation window.\\n"); + } else { + md.push_str("LLM-only cases omitted. Re-run with `--detailed-report` to see samples.\\n"); + } } else { md.push_str("| Question ID | Answered | Match Rank | Top Retrieved |\\n"); md.push_str("| --- | --- | --- | --- |\\n"); @@ -681,7 +717,7 @@ pub fn dataset_report_dir(report_dir: &Path, dataset_id: &str) -> PathBuf { } #[derive(Debug, Serialize, Deserialize)] -struct HistoryEntry { +struct LegacyHistoryEntry { generated_at: String, run_label: Option, dataset_id: String, @@ -719,7 +755,18 @@ struct HistoryEntry { rerank_enabled: bool, rerank_keep_top: usize, rerank_pool_size: Option, - delta: Option, + #[serde(default)] + chunk_result_cap: Option, + #[serde(default)] + ingest_chunk_min_tokens: Option, + #[serde(default)] + ingest_chunk_max_tokens: Option, + #[serde(default)] + ingest_chunk_overlap_tokens: Option, + #[serde(default)] + ingest_chunks_only: Option, + #[serde(default)] + delta: Option, openai_base_url: String, ingestion_ms: u128, #[serde(default)] @@ -727,92 +774,173 @@ struct HistoryEntry { } #[derive(Debug, Serialize, Deserialize)] -struct HistoryDelta { +struct LegacyHistoryDelta { precision: f64, precision_at_1: f64, latency_avg_ms: f64, } -fn record_history(summary: &EvaluationSummary, report_dir: &Path) -> Result<()> { - let path = report_dir.join("evaluations.json"); - let mut entries: Vec = if path.exists() { - let contents = fs::read(&path) - .with_context(|| format!("reading evaluation log {}", path.display()))?; - match serde_json::from_slice(&contents) { - Ok(entries) => entries, - Err(err) => { - let timestamp = Utc::now().format("%Y%m%dT%H%M%S"); - let backup_path = - report_dir.join(format!("evaluations.json.corrupted.{}", timestamp)); +fn default_stage_latency() -> StageLatencyBreakdown { + StageLatencyBreakdown { + embed: LatencyStats::default(), + collect_candidates: LatencyStats::default(), + graph_expansion: LatencyStats::default(), + chunk_attach: LatencyStats::default(), + rerank: LatencyStats::default(), + assemble: LatencyStats::default(), + } +} + +fn convert_legacy_entry(entry: LegacyHistoryEntry) -> EvaluationReport { + let overview = OverviewSection { + generated_at: entry.generated_at, + run_label: entry.run_label, + total_cases: entry.slice_cases, + filtered_questions: 0, + }; + + let dataset = DatasetSection { + id: entry.dataset_id, + label: entry.dataset_label, + source: String::new(), + includes_unanswerable: entry.llm_cases > 0, + require_verified_chunks: true, + embedding_backend: entry.embedding_backend, + embedding_model: entry.embedding_model, + embedding_dimension: 0, + }; + + let slice = SliceSection { + id: entry.slice_id, + seed: entry.slice_seed, + window_offset: entry.slice_window_offset, + window_length: entry.slice_window_length, + slice_cases: entry.slice_cases, + ledger_total_cases: entry.slice_total_cases, + positives: 0, + negatives: 0, + total_paragraphs: 0, + negative_multiplier: 0.0, + }; + + let retrieval_cases = if entry.retrieval_cases > 0 { + entry.retrieval_cases + } else { + entry.slice_cases.saturating_sub(entry.llm_cases) + }; + let retrieval_precision = if entry.retrieval_precision > 0.0 { + entry.retrieval_precision + } else { + entry.precision + }; + + let retrieval = RetrievalSection { + k: entry.k, + cases: retrieval_cases, + correct: 0, + precision: retrieval_precision, + precision_at_1: entry.precision_at_1, + precision_at_2: entry.precision_at_2, + precision_at_3: entry.precision_at_3, + mrr: entry.mrr, + average_ndcg: entry.average_ndcg, + latency: entry.latency_ms, + concurrency: 0, + strategy: "unknown".into(), + rerank_enabled: entry.rerank_enabled, + rerank_pool_size: entry.rerank_pool_size, + rerank_keep_top: entry.rerank_keep_top, + chunk_result_cap: entry.chunk_result_cap.unwrap_or(5), + ingest_chunk_min_tokens: entry.ingest_chunk_min_tokens.unwrap_or(256), + ingest_chunk_max_tokens: entry.ingest_chunk_max_tokens.unwrap_or(512), + ingest_chunk_overlap_tokens: entry.ingest_chunk_overlap_tokens.unwrap_or(50), + ingest_chunks_only: entry.ingest_chunks_only.unwrap_or(false), + }; + + let llm = if entry.llm_cases > 0 { + Some(LlmSection { + cases: entry.llm_cases, + answered: 0, + precision: entry.llm_precision, + }) + } else { + None + }; + + let performance = PerformanceSection { + openai_base_url: entry.openai_base_url, + ingestion_ms: entry.ingestion_ms, + namespace_seed_ms: entry.namespace_seed_ms, + evaluation_stages_ms: EvaluationStageTimings::default(), + stage_latency: default_stage_latency(), + namespace_reused: false, + ingestion_reused: entry.ingestion_reused, + embeddings_reused: entry.ingestion_embeddings_reused, + ingestion_cache_path: String::new(), + corpus_paragraphs: 0, + positive_paragraphs_reused: 0, + negative_paragraphs_reused: 0, + }; + + EvaluationReport { + overview, + dataset, + slice, + retrieval, + llm, + performance, + misses: Vec::new(), + llm_cases: Vec::new(), + detailed_report: false, + } +} + +fn load_history(path: &Path) -> Result> { + if !path.exists() { + return Ok(Vec::new()); + } + + let contents = + fs::read(path).with_context(|| format!("reading evaluation log {}", path.display()))?; + + if let Ok(entries) = serde_json::from_slice::>(&contents) { + return Ok(entries); + } + + match serde_json::from_slice::>(&contents) { + Ok(entries) => Ok(entries.into_iter().map(convert_legacy_entry).collect()), + Err(err) => { + let timestamp = Utc::now().format("%Y%m%dT%H%M%S"); + let backup_path = path + .parent() + .unwrap_or_else(|| Path::new(".")) + .join(format!("evaluations.json.corrupted.{timestamp}")); + warn!( + path = %path.display(), + backup = %backup_path.display(), + error = %err, + "Evaluation history file is corrupted; backing up and starting fresh" + ); + if let Err(e) = fs::rename(path, &backup_path) { warn!( path = %path.display(), - backup = %backup_path.display(), - error = %err, - "Evaluation history file is corrupted; backing up and starting fresh" + error = %e, + "Failed to backup corrupted evaluation history" ); - if let Err(e) = fs::rename(&path, &backup_path) { - warn!( - path = %path.display(), - error = %e, - "Failed to backup corrupted evaluation history" - ); - } - Vec::new() } + Ok(Vec::new()) } - } else { - Vec::new() - }; + } +} - let delta = entries.last().map(|prev| HistoryDelta { - precision: summary.precision - prev.precision, - precision_at_1: summary.precision_at_1 - prev.precision_at_1, - latency_avg_ms: summary.latency_ms.avg - prev.latency_ms.avg, - }); - - let entry = HistoryEntry { - generated_at: format_timestamp(&summary.generated_at), - run_label: summary.run_label.clone(), - dataset_id: summary.dataset_id.clone(), - dataset_label: summary.dataset_label.clone(), - slice_id: summary.slice_id.clone(), - slice_seed: summary.slice_seed, - slice_window_offset: summary.slice_window_offset, - slice_window_length: summary.slice_window_length, - slice_cases: summary.slice_cases, - slice_total_cases: summary.slice_total_cases, - k: summary.k, - limit: summary.limit, - precision: summary.precision, - precision_at_1: summary.precision_at_1, - precision_at_2: summary.precision_at_2, - precision_at_3: summary.precision_at_3, - mrr: summary.mrr, - average_ndcg: summary.average_ndcg, - retrieval_cases: summary.retrieval_cases, - retrieval_precision: summary.retrieval_precision, - llm_cases: summary.llm_cases, - llm_precision: summary.llm_precision, - duration_ms: summary.duration_ms, - latency_ms: summary.latency_ms.clone(), - embedding_backend: summary.embedding_backend.clone(), - embedding_model: summary.embedding_model.clone(), - ingestion_reused: summary.ingestion_reused, - ingestion_embeddings_reused: summary.ingestion_embeddings_reused, - rerank_enabled: summary.rerank_enabled, - rerank_keep_top: summary.rerank_keep_top, - rerank_pool_size: summary.rerank_pool_size, - delta, - openai_base_url: summary.perf.openai_base_url.clone(), - ingestion_ms: summary.perf.ingestion_ms, - namespace_seed_ms: summary.perf.namespace_seed_ms, - }; - - entries.push(entry); +fn record_history(report: &EvaluationReport, report_dir: &Path) -> Result { + let path = report_dir.join("evaluations.json"); + let mut entries = load_history(&path)?; + entries.push(report.clone()); let blob = serde_json::to_vec_pretty(&entries).context("serialising evaluation log")?; fs::write(&path, blob).with_context(|| format!("writing evaluation log {}", path.display()))?; - Ok(()) + Ok(path) } #[cfg(test)] @@ -822,6 +950,7 @@ mod tests { EvaluationStageTimings, PerformanceTimings, RetrievedSummary, StageLatencyBreakdown, }; use chrono::Utc; + use tempfile::tempdir; fn latency(ms: f64) -> LatencyStats { LatencyStats { @@ -961,9 +1090,13 @@ mod tests { concurrency: 2, detailed_report: true, retrieval_strategy: "initial".into(), + chunk_result_cap: 5, + ingest_chunk_min_tokens: 256, + ingest_chunk_max_tokens: 512, + ingest_chunk_overlap_tokens: 50, + ingest_chunks_only: false, chunk_vector_take: 50, chunk_fts_take: 50, - chunk_token_budget: 10_000, chunk_avg_chars_per_token: 4, max_chunks_per_entity: 4, cases, @@ -987,4 +1120,25 @@ mod tests { assert!(!md.contains("LLM Mode Metrics")); assert!(!md.contains("LLM-Only Cases")); } + + #[test] + fn evaluations_history_captures_strategy_and_concurrency() { + let tmp = tempdir().unwrap(); + let summary = sample_summary(false); + + let outcome = + write_reports(&summary, tmp.path(), 5).expect("writing consolidated reports"); + let contents = + std::fs::read_to_string(&outcome.history_path).expect("reading evaluations history"); + let entries: Vec = + serde_json::from_str(&contents).expect("parsing evaluations history"); + assert_eq!(entries.len(), 1); + let stored = &entries[0]; + assert_eq!(stored.retrieval.concurrency, summary.concurrency); + assert_eq!(stored.retrieval.strategy, summary.retrieval_strategy); + assert_eq!( + stored.performance.evaluation_stages_ms.run_queries_ms, + summary.perf.evaluation_stage_ms.run_queries_ms + ); + } } diff --git a/eval/src/snapshot.rs b/eval/src/snapshot.rs index e655939..bd41c35 100644 --- a/eval/src/snapshot.rs +++ b/eval/src/snapshot.rs @@ -16,8 +16,6 @@ pub struct SnapshotMetadata { pub embedding_backend: String, pub embedding_model: Option, pub embedding_dimension: usize, - pub chunk_min_chars: usize, - pub chunk_max_chars: usize, pub rerank_enabled: bool, } @@ -55,8 +53,6 @@ impl Descriptor { embedding_backend: embedding_provider.backend_label().to_string(), embedding_model: embedding_provider.model_code(), embedding_dimension: embedding_provider.dimension(), - chunk_min_chars: config.retrieval.chunk_min_chars, - chunk_max_chars: config.retrieval.chunk_max_chars, rerank_enabled: config.retrieval.rerank, }; @@ -146,8 +142,6 @@ mod tests { embedding_backend: "hashed".into(), embedding_model: None, embedding_dimension: 128, - chunk_min_chars: 10, - chunk_max_chars: 100, rerank_enabled: true, }; let descriptor = Descriptor::from_parts( diff --git a/ingestion-pipeline/Cargo.toml b/ingestion-pipeline/Cargo.toml index 1aea309..2a52040 100644 --- a/ingestion-pipeline/Cargo.toml +++ b/ingestion-pipeline/Cargo.toml @@ -30,10 +30,11 @@ base64 = { workspace = true } pdf-extract = "0.9" lopdf = "0.32" bytes = { workspace = true } - -common = { path = "../common" } -retrieval-pipeline = { path = "../retrieval-pipeline" } async-trait = { workspace = true } state-machines = { workspace = true } +tokenizers = { workspace = true } +common = { path = "../common" } +retrieval-pipeline = { path = "../retrieval-pipeline" } + [features] docker = [] diff --git a/ingestion-pipeline/src/pipeline/config.rs b/ingestion-pipeline/src/pipeline/config.rs index 222a078..b0a9df8 100644 --- a/ingestion-pipeline/src/pipeline/config.rs +++ b/ingestion-pipeline/src/pipeline/config.rs @@ -8,6 +8,7 @@ pub struct IngestionTuning { pub graph_max_backoff_ms: u64, pub chunk_min_tokens: usize, pub chunk_max_tokens: usize, + pub chunk_overlap_tokens: usize, pub chunk_insert_concurrency: usize, pub entity_embedding_concurrency: usize, } @@ -21,8 +22,9 @@ impl Default for IngestionTuning { graph_store_attempts: 3, graph_initial_backoff_ms: 50, graph_max_backoff_ms: 800, - chunk_min_tokens: 500, - chunk_max_tokens: 2_000, + chunk_min_tokens: 256, + chunk_max_tokens: 512, + chunk_overlap_tokens: 50, chunk_insert_concurrency: 8, entity_embedding_concurrency: 4, } diff --git a/ingestion-pipeline/src/pipeline/context.rs b/ingestion-pipeline/src/pipeline/context.rs index aa3a524..d9e8ebf 100644 --- a/ingestion-pipeline/src/pipeline/context.rs +++ b/ingestion-pipeline/src/pipeline/context.rs @@ -118,8 +118,12 @@ impl<'a> PipelineContext<'a> { .await?; let chunk_range = self.chunk_token_range(); + let chunk_overlap = self.chunk_overlap_tokens(); - let chunks = self.services.prepare_chunks(&content, chunk_range).await?; + let chunks = self + .services + .prepare_chunks(&content, chunk_range, chunk_overlap) + .await?; Ok(PipelineArtifacts { text_content: content, @@ -132,8 +136,12 @@ impl<'a> PipelineContext<'a> { pub async fn build_chunk_only_artifacts(&mut self) -> Result { let content = self.take_text_content()?; let chunk_range = self.chunk_token_range(); + let chunk_overlap = self.chunk_overlap_tokens(); - let chunks = self.services.prepare_chunks(&content, chunk_range).await?; + let chunks = self + .services + .prepare_chunks(&content, chunk_range, chunk_overlap) + .await?; Ok(PipelineArtifacts { text_content: content, @@ -146,4 +154,8 @@ impl<'a> PipelineContext<'a> { fn chunk_token_range(&self) -> Range { self.pipeline_config.tuning.chunk_min_tokens..self.pipeline_config.tuning.chunk_max_tokens } + + fn chunk_overlap_tokens(&self) -> usize { + self.pipeline_config.tuning.chunk_overlap_tokens + } } diff --git a/ingestion-pipeline/src/pipeline/services.rs b/ingestion-pipeline/src/pipeline/services.rs index 949d899..8d6ca97 100644 --- a/ingestion-pipeline/src/pipeline/services.rs +++ b/ingestion-pipeline/src/pipeline/services.rs @@ -1,4 +1,7 @@ -use std::{ops::Range, sync::Arc}; +use std::{ + ops::Range, + sync::{Arc, OnceLock}, +}; use anyhow::Context; use async_openai::types::{ @@ -21,6 +24,7 @@ use common::{ utils::{config::AppConfig, embedding::EmbeddingProvider}, }; use retrieval_pipeline::{reranking::RerankerPool, retrieved_entities_to_json, RetrievedEntity}; +use text_splitter::{ChunkCapacity, ChunkConfig, TextSplitter}; use super::{enrichment_result::LLMEnrichmentResult, preparation::to_text_content}; use crate::pipeline::context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk}; @@ -29,7 +33,6 @@ use crate::utils::llm_instructions::{ }; const EMBEDDING_QUERY_CHAR_LIMIT: usize = 12_000; - #[async_trait] pub trait PipelineServices: Send + Sync { async fn prepare_text_content( @@ -59,6 +62,7 @@ pub trait PipelineServices: Send + Sync { &self, content: &TextContent, token_range: Range, + overlap_tokens: usize, ) -> Result, AppError>; } @@ -238,9 +242,14 @@ impl PipelineServices for DefaultPipelineServices { &self, content: &TextContent, token_range: Range, + overlap_tokens: usize, ) -> Result, AppError> { - let chunk_candidates = - split_by_token_bounds(&content.text, token_range.start, token_range.end)?; + let chunk_candidates = prepare_chunks( + &content.text, + token_range.start, + token_range.end, + overlap_tokens, + )?; let mut chunks = Vec::with_capacity(chunk_candidates.len()); for chunk_text in chunk_candidates { @@ -249,8 +258,11 @@ impl PipelineServices for DefaultPipelineServices { .embed(&chunk_text) .await .context("generating FastEmbed embedding for chunk")?; - let chunk_struct = - TextChunk::new(content.get_id().to_string(), chunk_text, content.user_id.clone()); + let chunk_struct = TextChunk::new( + content.get_id().to_string(), + chunk_text, + content.user_id.clone(), + ); chunks.push(EmbeddedTextChunk { chunk: chunk_struct, embedding, @@ -260,10 +272,11 @@ impl PipelineServices for DefaultPipelineServices { } } -fn split_by_token_bounds( +fn prepare_chunks( text: &str, min_tokens: usize, max_tokens: usize, + overlap_tokens: usize, ) -> Result, AppError> { if min_tokens == 0 || max_tokens == 0 || min_tokens > max_tokens { return Err(AppError::Validation( @@ -271,34 +284,44 @@ fn split_by_token_bounds( )); } - let tokens: Vec<&str> = text.split_whitespace().collect(); - if tokens.is_empty() { - return Ok(vec![String::new()]); + if overlap_tokens >= min_tokens { + return Err(AppError::Validation(format!( + "chunk_min_tokens must be greater than the configured overlap of {overlap_tokens}" + ))); } - let mut chunks = Vec::new(); - let mut buffer: Vec<&str> = Vec::new(); - for (idx, token) in tokens.iter().enumerate() { - buffer.push(token); - let remaining = tokens.len().saturating_sub(idx + 1); - let at_max = buffer.len() >= max_tokens; - let at_min_and_boundary = - buffer.len() >= min_tokens && (remaining == 0 || buffer.len() + 1 > max_tokens); - if at_max || at_min_and_boundary { - let chunk_text = buffer.join(" "); - chunks.push(chunk_text); - buffer.clear(); - } - } + let tokenizer = get_tokenizer()?; - if !buffer.is_empty() { - let chunk_text = buffer.join(" "); - chunks.push(chunk_text); + let chunk_capacity = ChunkCapacity::new(min_tokens) + .with_max(max_tokens) + .map_err(|e| AppError::Validation(format!("invalid chunk token bounds: {e}")))?; + let chunk_config = ChunkConfig::new(chunk_capacity) + .with_overlap(overlap_tokens) + .map_err(|e| AppError::Validation(format!("invalid chunk overlap: {e}")))? + .with_sizer(tokenizer); + let splitter = TextSplitter::new(chunk_config); + + let mut chunks: Vec = splitter.chunks(text).map(str::to_owned).collect(); + + if chunks.is_empty() { + chunks.push(String::new()); } Ok(chunks) } +fn get_tokenizer() -> Result<&'static tokenizers::Tokenizer, AppError> { + static TOKENIZER: OnceLock> = OnceLock::new(); + + match TOKENIZER.get_or_init(|| { + tokenizers::Tokenizer::from_pretrained("bert-base-cased", None) + .map_err(|e| format!("failed to initialize tokenizer: {e}")) + }) { + Ok(tokenizer) => Ok(tokenizer), + Err(err) => Err(AppError::InternalError(err.clone())), + } +} + fn truncate_for_embedding(text: &str, max_chars: usize) -> String { if text.chars().count() <= max_chars { return text.to_string(); diff --git a/ingestion-pipeline/src/pipeline/tests.rs b/ingestion-pipeline/src/pipeline/tests.rs index 476924f..5f4bb56 100644 --- a/ingestion-pipeline/src/pipeline/tests.rs +++ b/ingestion-pipeline/src/pipeline/tests.rs @@ -155,6 +155,7 @@ impl PipelineServices for MockServices { &self, content: &TextContent, _range: std::ops::Range, + _overlap_tokens: usize, ) -> Result, AppError> { self.record("chunk").await; Ok(vec![EmbeddedTextChunk { @@ -213,8 +214,11 @@ impl PipelineServices for FailingServices { &self, content: &TextContent, token_range: std::ops::Range, + overlap_tokens: usize, ) -> Result, AppError> { - self.inner.prepare_chunks(content, token_range).await + self.inner + .prepare_chunks(content, token_range, overlap_tokens) + .await } } @@ -255,6 +259,7 @@ impl PipelineServices for ValidationServices { &self, _content: &TextContent, _token_range: std::ops::Range, + _overlap_tokens: usize, ) -> Result, AppError> { unreachable!("prepare_chunks should not be called after validation failure") } diff --git a/retrieval-pipeline/src/pipeline/config.rs b/retrieval-pipeline/src/pipeline/config.rs index 40d2650..3f5e6d4 100644 --- a/retrieval-pipeline/src/pipeline/config.rs +++ b/retrieval-pipeline/src/pipeline/config.rs @@ -63,6 +63,7 @@ pub struct RetrievalTuning { pub rerank_blend_weight: f32, pub rerank_scores_only: bool, pub rerank_keep_top: usize, + pub chunk_result_cap: usize, } impl Default for RetrievalTuning { @@ -86,6 +87,7 @@ impl Default for RetrievalTuning { rerank_blend_weight: 0.65, rerank_scores_only: false, rerank_keep_top: 8, + chunk_result_cap: 5, } } } diff --git a/retrieval-pipeline/src/pipeline/stages/mod.rs b/retrieval-pipeline/src/pipeline/stages/mod.rs index f7f9fed..6f19c04 100644 --- a/retrieval-pipeline/src/pipeline/stages/mod.rs +++ b/retrieval-pipeline/src/pipeline/stages/mod.rs @@ -675,7 +675,13 @@ pub fn assemble_chunks(ctx: &mut PipelineContext<'_>) -> Result<(), AppError> { ctx.config.tuning.lexical_match_weight, ); - let limit = ctx.config.tuning.chunk_vector_take.max(1); + // Limit how many chunks we return to keep context size reasonable. + let limit = ctx + .config + .tuning + .chunk_result_cap + .max(1) + .min(ctx.config.tuning.chunk_vector_take.max(1)); if chunk_values.len() > limit { chunk_values.truncate(limit); } diff --git a/retrieval-pipeline/src/reranking/mod.rs b/retrieval-pipeline/src/reranking/mod.rs index c7ae22b..c919631 100644 --- a/retrieval-pipeline/src/reranking/mod.rs +++ b/retrieval-pipeline/src/reranking/mod.rs @@ -29,7 +29,10 @@ impl RerankerPool { /// Build the pool at startup. /// `pool_size` controls max parallel reranks. pub fn new(pool_size: usize) -> Result, AppError> { - Self::new_with_options(pool_size, RerankInitOptions::default()) + Self::new_with_options( + pool_size, + RerankInitOptions::new(fastembed::RerankerModel::JINARerankerV1TurboEn), + ) } fn new_with_options(