fix: all tests now in sync

This commit is contained in:
Per Stark
2025-11-29 18:59:08 +01:00
parent cb906c5b53
commit 1039ec32a4
19 changed files with 439 additions and 50 deletions

View File

@@ -1,7 +1,6 @@
use super::types::StoredObject; use super::types::StoredObject;
use crate::{ use crate::{
error::AppError, error::AppError,
storage::{indexes::ensure_runtime_indexes, types::system_settings::SystemSettings},
}; };
use axum_session::{SessionConfig, SessionError, SessionStore}; use axum_session::{SessionConfig, SessionError, SessionStore};
use axum_session_surreal::SessionSurrealPool; use axum_session_surreal::SessionSurrealPool;

View File

@@ -120,7 +120,7 @@ async fn ensure_hnsw_index(
) )
.await .await
} }
HnswIndexState::Matches(_) => Ok(()), HnswIndexState::Matches => Ok(()),
HnswIndexState::Different(existing) => { HnswIndexState::Different(existing) => {
info!( info!(
index = spec.index_name, index = spec.index_name,
@@ -182,7 +182,7 @@ async fn hnsw_index_state(
}; };
if current_dimension == expected_dimension as u64 { if current_dimension == expected_dimension as u64 {
Ok(HnswIndexState::Matches(current_dimension)) Ok(HnswIndexState::Matches)
} else { } else {
Ok(HnswIndexState::Different(current_dimension)) Ok(HnswIndexState::Different(current_dimension))
} }
@@ -190,7 +190,7 @@ async fn hnsw_index_state(
enum HnswIndexState { enum HnswIndexState {
Missing, Missing,
Matches(u64), Matches,
Different(u64), Different(u64),
} }

View File

@@ -55,6 +55,7 @@ impl SystemSettings {
mod tests { mod tests {
use crate::storage::types::{knowledge_entity::KnowledgeEntity, text_chunk::TextChunk}; use crate::storage::types::{knowledge_entity::KnowledgeEntity, text_chunk::TextChunk};
use async_openai::Client; use async_openai::Client;
use crate::storage::indexes::ensure_runtime_indexes;
use super::*; use super::*;
use uuid::Uuid; use uuid::Uuid;
@@ -325,6 +326,11 @@ mod tests {
.await .await
.expect("Failed to load current settings"); .expect("Failed to load current settings");
// Ensure runtime indexes exist with the current embedding dimension so INFO queries succeed.
ensure_runtime_indexes(&db, current_settings.embedding_dimensions as usize)
.await
.expect("failed to build runtime indexes");
let initial_chunk_dimension = get_hnsw_index_dimension( let initial_chunk_dimension = get_hnsw_index_dimension(
&db, &db,
"text_chunk_embedding", "text_chunk_embedding",

View File

@@ -194,6 +194,18 @@ pub struct Config {
#[arg(long, default_value_os_t = default_ingestion_cache_dir())] #[arg(long, default_value_os_t = default_ingestion_cache_dir())]
pub ingestion_cache_dir: PathBuf, pub ingestion_cache_dir: PathBuf,
/// Minimum tokens per chunk for ingestion
#[arg(long, default_value_t = 500)]
pub ingest_chunk_min_tokens: usize,
/// Maximum tokens per chunk for ingestion
#[arg(long, default_value_t = 2_000)]
pub ingest_chunk_max_tokens: usize,
/// Run ingestion in chunk-only mode (skip analyzer/graph generation)
#[arg(long)]
pub ingest_chunks_only: bool,
/// Number of paragraphs to ingest concurrently /// Number of paragraphs to ingest concurrently
#[arg(long, default_value_t = 5)] #[arg(long, default_value_t = 5)]
pub ingestion_batch_size: usize, pub ingestion_batch_size: usize,
@@ -350,6 +362,14 @@ impl Config {
)); ));
} }
// Chunk token bounds must satisfy 0 < min < max.
if self.ingest_chunk_min_tokens == 0
    || self.ingest_chunk_min_tokens >= self.ingest_chunk_max_tokens
{
    // Report both values by name: this guard also fires for `min == 0`, where a
    // "min >= max" rendering of the two numbers would be a false statement.
    return Err(anyhow!(
        "--ingest-chunk-min-tokens must be greater than zero and less than --ingest-chunk-max-tokens (got min={}, max={})",
        self.ingest_chunk_min_tokens,
        self.ingest_chunk_max_tokens
    ));
}
if self.retrieval.rerank && self.retrieval.rerank_pool_size == 0 { if self.retrieval.rerank && self.retrieval.rerank_pool_size == 0 {
return Err(anyhow!( return Err(anyhow!(
"--rerank-pool must be greater than zero when reranking is enabled" "--rerank-pool must be greater than zero when reranking is enabled"

View File

@@ -4,7 +4,7 @@ mod types;
pub use pipeline::run_evaluation; pub use pipeline::run_evaluation;
pub use types::*; pub use types::*;
use std::{collections::HashMap, path::Path, time::Duration}; use std::{collections::HashMap, path::Path};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, SecondsFormat, Utc}; use chrono::{DateTime, SecondsFormat, Utc};
@@ -23,7 +23,6 @@ use tracing::{info, warn};
use crate::{ use crate::{
args::{self, Config}, args::{self, Config},
datasets::{self, ConvertedDataset}, datasets::{self, ConvertedDataset},
db_helpers::change_embedding_length_in_hnsw_indexes,
ingest, ingest,
slice::{self}, slice::{self},
snapshot::{self, DbSnapshotState}, snapshot::{self, DbSnapshotState},
@@ -461,7 +460,7 @@ pub(crate) async fn enforce_system_settings(
pub(crate) async fn load_or_init_system_settings( pub(crate) async fn load_or_init_system_settings(
db: &SurrealDbClient, db: &SurrealDbClient,
dimension: usize, _dimension: usize,
) -> Result<(SystemSettings, bool)> { ) -> Result<(SystemSettings, bool)> {
match SystemSettings::get_current(db).await { match SystemSettings::get_current(db).await {
Ok(settings) => Ok((settings, false)), Ok(settings) => Ok((settings, false)),
@@ -565,6 +564,9 @@ mod tests {
generated_at: Utc::now(), generated_at: Utc::now(),
paragraph_count: paragraphs.len(), paragraph_count: paragraphs.len(),
question_count: questions.len(), question_count: questions.len(),
chunk_min_tokens: 1,
chunk_max_tokens: 10,
chunk_only: false,
}, },
paragraphs, paragraphs,
questions, questions,

View File

@@ -31,10 +31,12 @@ pub(crate) async fn prepare_corpus(
.context("selecting slice window for corpus preparation")?; .context("selecting slice window for corpus preparation")?;
let descriptor = snapshot::Descriptor::new(config, slice, ctx.embedding_provider()); let descriptor = snapshot::Descriptor::new(config, slice, ctx.embedding_provider());
let ingestion_config = ingest::make_ingestion_config(config);
let expected_fingerprint = ingest::compute_ingestion_fingerprint( let expected_fingerprint = ingest::compute_ingestion_fingerprint(
ctx.dataset(), ctx.dataset(),
slice, slice,
config.converted_dataset_path.as_path(), config.converted_dataset_path.as_path(),
&ingestion_config,
)?; )?;
let base_dir = ingest::cached_corpus_dir( let base_dir = ingest::cached_corpus_dir(
&cache_settings, &cache_settings,
@@ -101,6 +103,7 @@ pub(crate) async fn prepare_corpus(
openai_client, openai_client,
&eval_user_id, &eval_user_id,
config.converted_dataset_path.as_path(), config.converted_dataset_path.as_path(),
ingestion_config.clone(),
) )
.await .await
.context("ensuring ingestion-backed corpus")? .context("ensuring ingestion-backed corpus")?

View File

@@ -12,3 +12,14 @@ pub use store::{
CorpusQuestion, EmbeddedKnowledgeEntity, EmbeddedTextChunk, ParagraphShard, CorpusQuestion, EmbeddedKnowledgeEntity, EmbeddedTextChunk, ParagraphShard,
ParagraphShardStore, MANIFEST_VERSION, ParagraphShardStore, MANIFEST_VERSION,
}; };
/// Derives the ingestion pipeline configuration from the parsed CLI arguments.
///
/// Every tuning knob keeps its `IngestionTuning::default()` value except the
/// chunk token bounds, which are taken from `config.ingest_chunk_min_tokens`
/// and `config.ingest_chunk_max_tokens`. `chunk_only` is copied from
/// `config.ingest_chunks_only`.
pub fn make_ingestion_config(config: &crate::args::Config) -> ingestion_pipeline::IngestionConfig {
    let tuning = {
        let mut overrides = ingestion_pipeline::IngestionTuning::default();
        overrides.chunk_min_tokens = config.ingest_chunk_min_tokens;
        overrides.chunk_max_tokens = config.ingest_chunk_max_tokens;
        overrides
    };

    ingestion_pipeline::IngestionConfig {
        tuning,
        chunk_only: config.ingest_chunks_only,
    }
}

View File

@@ -36,7 +36,7 @@ use crate::ingest::{
MANIFEST_VERSION, MANIFEST_VERSION,
}; };
const INGESTION_SPEC_VERSION: u32 = 1; const INGESTION_SPEC_VERSION: u32 = 2;
type OpenAIClient = Client<async_openai::config::OpenAIConfig>; type OpenAIClient = Client<async_openai::config::OpenAIConfig>;
@@ -116,10 +116,12 @@ pub async fn ensure_corpus(
openai: Arc<OpenAIClient>, openai: Arc<OpenAIClient>,
user_id: &str, user_id: &str,
converted_path: &Path, converted_path: &Path,
ingestion_config: IngestionConfig,
) -> Result<CorpusHandle> { ) -> Result<CorpusHandle> {
let checksum = compute_file_checksum(converted_path) let checksum = compute_file_checksum(converted_path)
.with_context(|| format!("computing checksum for {}", converted_path.display()))?; .with_context(|| format!("computing checksum for {}", converted_path.display()))?;
let ingestion_fingerprint = build_ingestion_fingerprint(dataset, slice, &checksum); let ingestion_fingerprint =
build_ingestion_fingerprint(dataset, slice, &checksum, &ingestion_config);
let base_dir = cached_corpus_dir( let base_dir = cached_corpus_dir(
cache, cache,
@@ -241,6 +243,7 @@ pub async fn ensure_corpus(
embedding_dimension, embedding_dimension,
cache.ingestion_batch_size, cache.ingestion_batch_size,
cache.ingestion_max_retries, cache.ingestion_max_retries,
ingestion_config.clone(),
) )
.await .await
.context("ingesting missing slice paragraphs")?; .context("ingesting missing slice paragraphs")?;
@@ -359,6 +362,9 @@ pub async fn ensure_corpus(
generated_at: Utc::now(), generated_at: Utc::now(),
paragraph_count: corpus_paragraphs.len(), paragraph_count: corpus_paragraphs.len(),
question_count: corpus_questions.len(), question_count: corpus_questions.len(),
chunk_min_tokens: ingestion_config.tuning.chunk_min_tokens,
chunk_max_tokens: ingestion_config.tuning.chunk_max_tokens,
chunk_only: ingestion_config.chunk_only,
}, },
paragraphs: corpus_paragraphs, paragraphs: corpus_paragraphs,
questions: corpus_questions, questions: corpus_questions,
@@ -396,6 +402,7 @@ async fn ingest_paragraph_batch(
embedding_dimension: usize, embedding_dimension: usize,
batch_size: usize, batch_size: usize,
max_retries: usize, max_retries: usize,
ingestion_config: IngestionConfig,
) -> Result<Vec<ParagraphShard>> { ) -> Result<Vec<ParagraphShard>> {
if targets.is_empty() { if targets.is_empty() {
return Ok(Vec::new()); return Ok(Vec::new());
@@ -419,13 +426,15 @@ async fn ingest_paragraph_batch(
let backend: DynStore = Arc::new(InMemory::new()); let backend: DynStore = Arc::new(InMemory::new());
let storage = StorageManager::with_backend(backend, StorageKind::Memory); let storage = StorageManager::with_backend(backend, StorageKind::Memory);
let pipeline = IngestionPipeline::new( let pipeline_config = ingestion_config.clone();
let pipeline = IngestionPipeline::new_with_config(
db, db,
openai.clone(), openai.clone(),
app_config, app_config,
None::<Arc<retrieval_pipeline::reranking::RerankerPool>>, None::<Arc<retrieval_pipeline::reranking::RerankerPool>>,
storage, storage,
embedding.clone(), embedding.clone(),
pipeline_config,
) )
.await?; .await?;
let pipeline = Arc::new(pipeline); let pipeline = Arc::new(pipeline);
@@ -454,6 +463,9 @@ async fn ingest_paragraph_batch(
model_clone.clone(), model_clone.clone(),
embedding_dimension, embedding_dimension,
max_retries, max_retries,
ingestion_config.tuning.chunk_min_tokens,
ingestion_config.tuning.chunk_max_tokens,
ingestion_config.chunk_only,
) )
}); });
let batch_results: Vec<ParagraphShard> = try_join_all(tasks) let batch_results: Vec<ParagraphShard> = try_join_all(tasks)
@@ -475,6 +487,9 @@ async fn ingest_single_paragraph(
embedding_model: Option<String>, embedding_model: Option<String>,
embedding_dimension: usize, embedding_dimension: usize,
max_retries: usize, max_retries: usize,
chunk_min_tokens: usize,
chunk_max_tokens: usize,
chunk_only: bool,
) -> Result<ParagraphShard> { ) -> Result<ParagraphShard> {
let paragraph = request.paragraph; let paragraph = request.paragraph;
let mut last_err: Option<anyhow::Error> = None; let mut last_err: Option<anyhow::Error> = None;
@@ -516,6 +531,9 @@ async fn ingest_single_paragraph(
&embedding_backend, &embedding_backend,
embedding_model.clone(), embedding_model.clone(),
embedding_dimension, embedding_dimension,
chunk_min_tokens,
chunk_max_tokens,
chunk_only,
); );
for question in &request.question_refs { for question in &request.question_refs {
if let Err(err) = shard.ensure_question_binding(question) { if let Err(err) = shard.ensure_question_binding(question) {
@@ -558,8 +576,9 @@ pub fn build_ingestion_fingerprint(
dataset: &ConvertedDataset, dataset: &ConvertedDataset,
slice: &ResolvedSlice<'_>, slice: &ResolvedSlice<'_>,
checksum: &str, checksum: &str,
ingestion_config: &IngestionConfig,
) -> String { ) -> String {
let config_repr = format!("{:?}", IngestionConfig::default()); let config_repr = format!("{:?}", ingestion_config);
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
hasher.update(config_repr.as_bytes()); hasher.update(config_repr.as_bytes());
let config_hash = format!("{:x}", hasher.finalize()); let config_hash = format!("{:x}", hasher.finalize());
@@ -578,9 +597,15 @@ pub fn compute_ingestion_fingerprint(
dataset: &ConvertedDataset, dataset: &ConvertedDataset,
slice: &ResolvedSlice<'_>, slice: &ResolvedSlice<'_>,
converted_path: &Path, converted_path: &Path,
ingestion_config: &IngestionConfig,
) -> Result<String> { ) -> Result<String> {
let checksum = compute_file_checksum(converted_path)?; let checksum = compute_file_checksum(converted_path)?;
Ok(build_ingestion_fingerprint(dataset, slice, &checksum)) Ok(build_ingestion_fingerprint(
dataset,
slice,
&checksum,
ingestion_config,
))
} }
pub fn load_cached_manifest(base_dir: &Path) -> Result<Option<CorpusManifest>> { pub fn load_cached_manifest(base_dir: &Path) -> Result<Option<CorpusManifest>> {
@@ -643,3 +668,107 @@ fn compute_file_checksum(path: &Path) -> Result<String> {
} }
Ok(format!("{:x}", hasher.finalize())) Ok(format!("{:x}", hasher.finalize()))
} }
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        datasets::{ConvertedDataset, ConvertedParagraph, ConvertedQuestion, DatasetKind},
        slices::{CaseRef, SliceCaseEntry, SliceManifest, SliceParagraphEntry, SliceParagraphKind},
    };
    use chrono::Utc;

    /// Smallest useful dataset fixture: one paragraph (`p1`) carrying one
    /// answerable question (`q1`).
    fn dummy_dataset() -> ConvertedDataset {
        let only_question = ConvertedQuestion {
            id: String::from("q1"),
            question: String::from("What?"),
            answers: vec![String::from("A")],
            is_impossible: false,
        };
        let only_paragraph = ConvertedParagraph {
            id: String::from("p1"),
            title: String::from("title"),
            context: String::from("context"),
            questions: vec![only_question],
        };

        ConvertedDataset {
            generated_at: Utc::now(),
            metadata: crate::datasets::DatasetMetadata::for_kind(
                DatasetKind::default(),
                false,
                None,
            ),
            source: String::from("src"),
            paragraphs: vec![only_paragraph],
        }
    }

    /// Resolved slice over `dataset` with a single positive case pointing at
    /// the dataset's first paragraph and that paragraph's first question.
    fn dummy_slice<'a>(dataset: &'a ConvertedDataset) -> ResolvedSlice<'a> {
        let paragraph = &dataset.paragraphs[0];
        let question = &paragraph.questions[0];

        let case_entries = vec![SliceCaseEntry {
            question_id: question.id.clone(),
            paragraph_id: paragraph.id.clone(),
        }];
        let paragraph_entries = vec![SliceParagraphEntry {
            id: paragraph.id.clone(),
            kind: SliceParagraphKind::Positive {
                question_ids: vec![question.id.clone()],
            },
            shard_path: None,
        }];

        let manifest = SliceManifest {
            version: 1,
            slice_id: String::from("slice-1"),
            dataset_id: dataset.metadata.id.clone(),
            dataset_label: dataset.metadata.label.clone(),
            dataset_source: dataset.source.clone(),
            includes_unanswerable: false,
            require_verified_chunks: false,
            seed: 1,
            requested_limit: Some(1),
            requested_corpus: 1,
            generated_at: Utc::now(),
            case_count: 1,
            positive_paragraphs: 1,
            negative_paragraphs: 0,
            total_paragraphs: 1,
            negative_multiplier: 1.0,
            cases: case_entries,
            paragraphs: paragraph_entries,
        };

        ResolvedSlice {
            manifest,
            path: PathBuf::from("cache"),
            paragraphs: dataset.paragraphs.iter().collect(),
            cases: vec![CaseRef {
                paragraph,
                question,
            }],
        }
    }

    /// The fingerprint must differ from the default-config fingerprint when
    /// either the chunk token bounds or the chunk-only flag changes.
    #[test]
    fn fingerprint_changes_with_chunk_settings() {
        let dataset = dummy_dataset();
        let slice = dummy_slice(&dataset);
        let checksum = "deadbeef";

        // Same dataset, slice and checksum every time; only the config varies.
        let fingerprint_for = |config: &IngestionConfig| {
            build_ingestion_fingerprint(&dataset, &slice, checksum, config)
        };

        let defaults = IngestionConfig::default();
        let baseline = fingerprint_for(&defaults);

        let mut bumped_min = defaults.clone();
        bumped_min.tuning.chunk_min_tokens += 1;
        let with_bumped_min = fingerprint_for(&bumped_min);
        assert_ne!(
            baseline, with_bumped_min,
            "token bounds should affect fingerprint"
        );

        let mut chunk_only = defaults;
        chunk_only.chunk_only = true;
        let with_chunk_only = fingerprint_for(&chunk_only);
        assert_ne!(
            baseline, with_chunk_only,
            "chunk-only mode should affect fingerprint"
        );
    }
}

View File

@@ -26,8 +26,8 @@ use tracing::warn;
use crate::datasets::{ConvertedParagraph, ConvertedQuestion}; use crate::datasets::{ConvertedParagraph, ConvertedQuestion};
pub const MANIFEST_VERSION: u32 = 2; pub const MANIFEST_VERSION: u32 = 3;
pub const PARAGRAPH_SHARD_VERSION: u32 = 2; pub const PARAGRAPH_SHARD_VERSION: u32 = 3;
const MANIFEST_BATCH_SIZE: usize = 100; const MANIFEST_BATCH_SIZE: usize = 100;
const MANIFEST_MAX_BYTES_PER_BATCH: usize = 300_000; // default cap for non-text batches const MANIFEST_MAX_BYTES_PER_BATCH: usize = 300_000; // default cap for non-text batches
const TEXT_CONTENT_MAX_BYTES_PER_BATCH: usize = 250_000; // text bodies can be large; limit aggressively const TEXT_CONTENT_MAX_BYTES_PER_BATCH: usize = 250_000; // text bodies can be large; limit aggressively
@@ -42,6 +42,18 @@ fn current_paragraph_shard_version() -> u32 {
PARAGRAPH_SHARD_VERSION PARAGRAPH_SHARD_VERSION
} }
/// Serde fallback for `chunk_min_tokens`, referenced via
/// `#[serde(default = "default_chunk_min_tokens")]`.
fn default_chunk_min_tokens() -> usize {
    const FALLBACK_MIN_TOKENS: usize = 500;
    FALLBACK_MIN_TOKENS
}
/// Serde fallback for `chunk_max_tokens`, referenced via
/// `#[serde(default = "default_chunk_max_tokens")]`.
fn default_chunk_max_tokens() -> usize {
    const FALLBACK_MAX_TOKENS: usize = 2_000;
    FALLBACK_MAX_TOKENS
}
/// Serde fallback for `chunk_only`, referenced via
/// `#[serde(default = "default_chunk_only")]`.
fn default_chunk_only() -> bool {
    const FALLBACK_CHUNK_ONLY: bool = false;
    FALLBACK_CHUNK_ONLY
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EmbeddedKnowledgeEntity { pub struct EmbeddedKnowledgeEntity {
pub entity: KnowledgeEntity, pub entity: KnowledgeEntity,
@@ -143,6 +155,12 @@ pub struct CorpusMetadata {
pub generated_at: DateTime<Utc>, pub generated_at: DateTime<Utc>,
pub paragraph_count: usize, pub paragraph_count: usize,
pub question_count: usize, pub question_count: usize,
#[serde(default = "default_chunk_min_tokens")]
pub chunk_min_tokens: usize,
#[serde(default = "default_chunk_max_tokens")]
pub chunk_max_tokens: usize,
#[serde(default = "default_chunk_only")]
pub chunk_only: bool,
} }
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -382,6 +400,12 @@ pub struct ParagraphShard {
pub embedding_model: Option<String>, pub embedding_model: Option<String>,
#[serde(default)] #[serde(default)]
pub embedding_dimension: usize, pub embedding_dimension: usize,
#[serde(default = "default_chunk_min_tokens")]
pub chunk_min_tokens: usize,
#[serde(default = "default_chunk_max_tokens")]
pub chunk_max_tokens: usize,
#[serde(default = "default_chunk_only")]
pub chunk_only: bool,
} }
pub struct ParagraphShardStore { pub struct ParagraphShardStore {
@@ -462,6 +486,9 @@ impl ParagraphShard {
embedding_backend: &str, embedding_backend: &str,
embedding_model: Option<String>, embedding_model: Option<String>,
embedding_dimension: usize, embedding_dimension: usize,
chunk_min_tokens: usize,
chunk_max_tokens: usize,
chunk_only: bool,
) -> Self { ) -> Self {
Self { Self {
version: PARAGRAPH_SHARD_VERSION, version: PARAGRAPH_SHARD_VERSION,
@@ -478,6 +505,9 @@ impl ParagraphShard {
embedding_backend: embedding_backend.to_string(), embedding_backend: embedding_backend.to_string(),
embedding_model, embedding_model,
embedding_dimension, embedding_dimension,
chunk_min_tokens,
chunk_max_tokens,
chunk_only,
} }
} }
@@ -850,6 +880,9 @@ mod tests {
generated_at: now, generated_at: now,
paragraph_count: 2, paragraph_count: 2,
question_count: 1, question_count: 1,
chunk_min_tokens: 1,
chunk_max_tokens: 10,
chunk_only: false,
}, },
paragraphs: vec![paragraph_one, paragraph_two], paragraphs: vec![paragraph_one, paragraph_two],
questions: vec![question], questions: vec![question],
@@ -950,8 +983,8 @@ mod tests {
let manifest = build_manifest(); let manifest = build_manifest();
let result = seed_manifest_into_db(&db, &manifest).await; let result = seed_manifest_into_db(&db, &manifest).await;
assert!( assert!(
result.is_err(), result.is_ok(),
"expected embedding dimension mismatch to fail" "seeding should succeed even if embedding dimensions differ from default index"
); );
let text_contents: Vec<TextContent> = db let text_contents: Vec<TextContent> = db
@@ -1003,15 +1036,12 @@ mod tests {
.take(0) .take(0)
.unwrap_or_default(); .unwrap_or_default();
assert!( assert_eq!(text_contents.len(), 1);
text_contents.is_empty() assert_eq!(entities.len(), 1);
&& entities.is_empty() assert_eq!(chunks.len(), 1);
&& chunks.is_empty() assert_eq!(relationships.len(), 1);
&& relationships.is_empty() assert_eq!(entity_embeddings.len(), 1);
&& entity_embeddings.is_empty() assert_eq!(chunk_embeddings.len(), 1);
&& chunk_embeddings.is_empty(),
"no rows should be inserted when transaction fails"
);
} }
#[test] #[test]

View File

@@ -320,6 +320,8 @@ mod tests {
chunk_token_budget: 10000, chunk_token_budget: 10000,
chunk_avg_chars_per_token: 4, chunk_avg_chars_per_token: 4,
max_chunks_per_entity: 4, max_chunks_per_entity: 4,
average_ndcg: 0.0,
mrr: 0.0,
cases: Vec::new(), cases: Vec::new(),
} }
} }

View File

@@ -870,6 +870,8 @@ mod tests {
entity_match: matched, entity_match: matched,
chunk_text_match: matched, chunk_text_match: matched,
chunk_id_match: matched, chunk_id_match: matched,
ndcg: None,
reciprocal_rank: None,
is_impossible, is_impossible,
has_verified_chunks: !is_impossible, has_verified_chunks: !is_impossible,
match_rank: if matched { Some(1) } else { None }, match_rank: if matched { Some(1) } else { None },
@@ -919,6 +921,8 @@ mod tests {
retrieval_cases: 1, retrieval_cases: 1,
retrieval_correct: 1, retrieval_correct: 1,
retrieval_precision: 1.0, retrieval_precision: 1.0,
average_ndcg: 0.0,
mrr: 0.0,
llm_cases: if include_llm { 1 } else { 0 }, llm_cases: if include_llm { 1 } else { 0 },
llm_answered: 0, llm_answered: 0,
llm_precision: 0.0, llm_precision: 0.0,

View File

@@ -6,8 +6,8 @@ pub struct IngestionTuning {
pub graph_store_attempts: usize, pub graph_store_attempts: usize,
pub graph_initial_backoff_ms: u64, pub graph_initial_backoff_ms: u64,
pub graph_max_backoff_ms: u64, pub graph_max_backoff_ms: u64,
pub chunk_min_chars: usize, pub chunk_min_tokens: usize,
pub chunk_max_chars: usize, pub chunk_max_tokens: usize,
pub chunk_insert_concurrency: usize, pub chunk_insert_concurrency: usize,
pub entity_embedding_concurrency: usize, pub entity_embedding_concurrency: usize,
} }
@@ -21,15 +21,25 @@ impl Default for IngestionTuning {
graph_store_attempts: 3, graph_store_attempts: 3,
graph_initial_backoff_ms: 50, graph_initial_backoff_ms: 50,
graph_max_backoff_ms: 800, graph_max_backoff_ms: 800,
chunk_min_chars: 500, chunk_min_tokens: 500,
chunk_max_chars: 2_000, chunk_max_tokens: 2_000,
chunk_insert_concurrency: 8, chunk_insert_concurrency: 8,
entity_embedding_concurrency: 4, entity_embedding_concurrency: 4,
} }
} }
} }
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone)]
pub struct IngestionConfig { pub struct IngestionConfig {
pub tuning: IngestionTuning, pub tuning: IngestionTuning,
pub chunk_only: bool,
}
impl Default for IngestionConfig {
fn default() -> Self {
Self {
tuning: IngestionTuning::default(),
chunk_only: false,
}
}
} }

View File

@@ -101,6 +101,10 @@ impl<'a> PipelineContext<'a> {
} }
pub async fn build_artifacts(&mut self) -> Result<PipelineArtifacts, AppError> { pub async fn build_artifacts(&mut self) -> Result<PipelineArtifacts, AppError> {
if self.pipeline_config.chunk_only {
return self.build_chunk_only_artifacts().await;
}
let content = self.take_text_content()?; let content = self.take_text_content()?;
let analysis = self.take_analysis()?; let analysis = self.take_analysis()?;
@@ -113,8 +117,7 @@ impl<'a> PipelineContext<'a> {
) )
.await?; .await?;
let chunk_range: Range<usize> = self.pipeline_config.tuning.chunk_min_chars let chunk_range = self.chunk_token_range();
..self.pipeline_config.tuning.chunk_max_chars;
let chunks = self.services.prepare_chunks(&content, chunk_range).await?; let chunks = self.services.prepare_chunks(&content, chunk_range).await?;
@@ -125,4 +128,22 @@ impl<'a> PipelineContext<'a> {
chunks, chunks,
}) })
} }
/// Builds pipeline artifacts that contain only the text content and its chunks.
///
/// Takes the prepared text content out of the context, chunks it using the
/// configured token range, and returns artifacts whose `entities` and
/// `relationships` are empty.
///
/// # Errors
/// Propagates the error from `take_text_content` or from `prepare_chunks`.
pub async fn build_chunk_only_artifacts(&mut self) -> Result<PipelineArtifacts, AppError> {
    let text_content = self.take_text_content()?;
    let token_bounds = self.chunk_token_range();
    let chunks = self
        .services
        .prepare_chunks(&text_content, token_bounds)
        .await?;

    let artifacts = PipelineArtifacts {
        text_content,
        entities: Vec::new(),
        relationships: Vec::new(),
        chunks,
    };
    Ok(artifacts)
}
/// Returns the configured chunk bounds as `chunk_min_tokens..chunk_max_tokens`.
fn chunk_token_range(&self) -> Range<usize> {
    let tuning = &self.pipeline_config.tuning;
    Range {
        start: tuning.chunk_min_tokens,
        end: tuning.chunk_max_tokens,
    }
}
} }

View File

@@ -51,6 +51,27 @@ impl IngestionPipeline {
reranker_pool: Option<Arc<RerankerPool>>, reranker_pool: Option<Arc<RerankerPool>>,
storage: StorageManager, storage: StorageManager,
embedding_provider: Arc<common::utils::embedding::EmbeddingProvider>, embedding_provider: Arc<common::utils::embedding::EmbeddingProvider>,
) -> Result<Self, AppError> {
Self::new_with_config(
db,
openai_client,
config,
reranker_pool,
storage,
embedding_provider,
IngestionConfig::default(),
)
.await
}
pub async fn new_with_config(
db: Arc<SurrealDbClient>,
openai_client: Arc<Client<async_openai::config::OpenAIConfig>>,
config: AppConfig,
reranker_pool: Option<Arc<RerankerPool>>,
storage: StorageManager,
embedding_provider: Arc<common::utils::embedding::EmbeddingProvider>,
pipeline_config: IngestionConfig,
) -> Result<Self, AppError> { ) -> Result<Self, AppError> {
let services = DefaultPipelineServices::new( let services = DefaultPipelineServices::new(
db.clone(), db.clone(),
@@ -61,7 +82,7 @@ impl IngestionPipeline {
embedding_provider, embedding_provider,
); );
Self::with_services(db, IngestionConfig::default(), Arc::new(services)) Self::with_services(db, pipeline_config, Arc::new(services))
} }
pub fn with_services( pub fn with_services(

View File

@@ -21,7 +21,6 @@ use common::{
utils::{config::AppConfig, embedding::EmbeddingProvider}, utils::{config::AppConfig, embedding::EmbeddingProvider},
}; };
use retrieval_pipeline::{reranking::RerankerPool, retrieved_entities_to_json, RetrievedEntity}; use retrieval_pipeline::{reranking::RerankerPool, retrieved_entities_to_json, RetrievedEntity};
use text_splitter::TextSplitter;
use super::{enrichment_result::LLMEnrichmentResult, preparation::to_text_content}; use super::{enrichment_result::LLMEnrichmentResult, preparation::to_text_content};
use crate::pipeline::context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk}; use crate::pipeline::context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk};
@@ -59,7 +58,7 @@ pub trait PipelineServices: Send + Sync {
async fn prepare_chunks( async fn prepare_chunks(
&self, &self,
content: &TextContent, content: &TextContent,
range: Range<usize>, token_range: Range<usize>,
) -> Result<Vec<EmbeddedTextChunk>, AppError>; ) -> Result<Vec<EmbeddedTextChunk>, AppError>;
} }
@@ -238,23 +237,20 @@ impl PipelineServices for DefaultPipelineServices {
async fn prepare_chunks( async fn prepare_chunks(
&self, &self,
content: &TextContent, content: &TextContent,
range: Range<usize>, token_range: Range<usize>,
) -> Result<Vec<EmbeddedTextChunk>, AppError> { ) -> Result<Vec<EmbeddedTextChunk>, AppError> {
let splitter = TextSplitter::new(range.clone()); let chunk_candidates =
let chunk_texts: Vec<String> = splitter split_by_token_bounds(&content.text, token_range.start, token_range.end)?;
.chunks(&content.text)
.map(|chunk| chunk.to_string())
.collect();
let mut chunks = Vec::with_capacity(chunk_texts.len()); let mut chunks = Vec::with_capacity(chunk_candidates.len());
for chunk in chunk_texts { for chunk_text in chunk_candidates {
let embedding = self let embedding = self
.embedding_provider .embedding_provider
.embed(&chunk) .embed(&chunk_text)
.await .await
.context("generating FastEmbed embedding for chunk")?; .context("generating FastEmbed embedding for chunk")?;
let chunk_struct = let chunk_struct =
TextChunk::new(content.get_id().to_string(), chunk, content.user_id.clone()); TextChunk::new(content.get_id().to_string(), chunk_text, content.user_id.clone());
chunks.push(EmbeddedTextChunk { chunks.push(EmbeddedTextChunk {
chunk: chunk_struct, chunk: chunk_struct,
embedding, embedding,
@@ -264,6 +260,45 @@ impl PipelineServices for DefaultPipelineServices {
} }
} }
/// Splits `text` into chunks of at most `max_tokens` whitespace-delimited tokens.
///
/// A "token" here is a word produced by `str::split_whitespace`. Tokens are
/// re-joined with a single space, so the original whitespace (newlines, tabs,
/// runs of spaces) is not preserved in the returned chunks.
///
/// Every chunk except possibly the last holds exactly `max_tokens` tokens.
/// `min_tokens` is validated but does not influence where chunks are cut, so
/// the final chunk may hold fewer than `min_tokens` tokens.
///
/// Text that contains no tokens (empty or whitespace-only) yields a single
/// empty chunk.
///
/// # Errors
/// Returns `AppError::Validation` when either bound is zero or when
/// `min_tokens > max_tokens`.
fn split_by_token_bounds(
    text: &str,
    min_tokens: usize,
    max_tokens: usize,
) -> Result<Vec<String>, AppError> {
    if min_tokens == 0 || max_tokens == 0 || min_tokens > max_tokens {
        return Err(AppError::Validation(
            "invalid chunk token bounds; ensure 0 < min <= max".into(),
        ));
    }

    let tokens: Vec<&str> = text.split_whitespace().collect();
    if tokens.is_empty() {
        return Ok(vec![String::new()]);
    }

    // `max_tokens` is non-zero past the validation above, so `chunks` cannot panic.
    Ok(tokens
        .chunks(max_tokens)
        .map(|window| window.join(" "))
        .collect())
}
fn truncate_for_embedding(text: &str, max_chars: usize) -> String { fn truncate_for_embedding(text: &str, max_chars: usize) -> String {
if text.chars().count() <= max_chars { if text.chars().count() <= max_chars {
return text.to_string(); return text.to_string();

View File

@@ -16,6 +16,7 @@ use tracing::{debug, instrument, warn};
use super::{ use super::{
context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk, PipelineArtifacts, PipelineContext}, context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk, PipelineArtifacts, PipelineContext},
enrichment_result::LLMEnrichmentResult,
state::{ContentPrepared, Enriched, IngestionMachine, Persisted, Ready, Retrieved}, state::{ContentPrepared, Enriched, IngestionMachine, Persisted, Ready, Retrieved},
}; };
@@ -76,6 +77,12 @@ pub async fn retrieve_related(
machine: IngestionMachine<(), ContentPrepared>, machine: IngestionMachine<(), ContentPrepared>,
ctx: &mut PipelineContext<'_>, ctx: &mut PipelineContext<'_>,
) -> Result<IngestionMachine<(), Retrieved>, AppError> { ) -> Result<IngestionMachine<(), Retrieved>, AppError> {
if ctx.pipeline_config.chunk_only {
return machine
.retrieve()
.map_err(|(_, guard)| map_guard_error("retrieve", guard));
}
let content = ctx.text_content()?; let content = ctx.text_content()?;
let similar = ctx.services.retrieve_similar_entities(content).await?; let similar = ctx.services.retrieve_similar_entities(content).await?;
@@ -102,6 +109,16 @@ pub async fn enrich(
machine: IngestionMachine<(), Retrieved>, machine: IngestionMachine<(), Retrieved>,
ctx: &mut PipelineContext<'_>, ctx: &mut PipelineContext<'_>,
) -> Result<IngestionMachine<(), Enriched>, AppError> { ) -> Result<IngestionMachine<(), Enriched>, AppError> {
if ctx.pipeline_config.chunk_only {
ctx.analysis = Some(LLMEnrichmentResult {
knowledge_entities: Vec::new(),
relationships: Vec::new(),
});
return machine
.enrich()
.map_err(|(_, guard)| map_guard_error("enrich", guard));
}
let content = ctx.text_content()?; let content = ctx.text_content()?;
let analysis = ctx let analysis = ctx
.services .services

View File

@@ -212,9 +212,9 @@ impl PipelineServices for FailingServices {
async fn prepare_chunks( async fn prepare_chunks(
&self, &self,
content: &TextContent, content: &TextContent,
range: std::ops::Range<usize>, token_range: std::ops::Range<usize>,
) -> Result<Vec<EmbeddedTextChunk>, AppError> { ) -> Result<Vec<EmbeddedTextChunk>, AppError> {
self.inner.prepare_chunks(content, range).await self.inner.prepare_chunks(content, token_range).await
} }
} }
@@ -254,7 +254,7 @@ impl PipelineServices for ValidationServices {
async fn prepare_chunks( async fn prepare_chunks(
&self, &self,
_content: &TextContent, _content: &TextContent,
_range: std::ops::Range<usize>, _token_range: std::ops::Range<usize>,
) -> Result<Vec<EmbeddedTextChunk>, AppError> { ) -> Result<Vec<EmbeddedTextChunk>, AppError> {
unreachable!("prepare_chunks should not be called after validation failure") unreachable!("prepare_chunks should not be called after validation failure")
} }
@@ -275,12 +275,13 @@ async fn setup_db() -> SurrealDbClient {
/// Builds the `IngestionConfig` used by the tests here: chunk size bounds of
/// 4 (min) and 64 (max), chunk insert concurrency 4, entity embedding
/// concurrency 2, every other tuning field taken from
/// `IngestionTuning::default()`, and `chunk_only` switched off so individual
/// tests opt in explicitly.
fn pipeline_config() -> IngestionConfig { fn pipeline_config() -> IngestionConfig {
IngestionConfig { IngestionConfig {
tuning: IngestionTuning { tuning: IngestionTuning {
chunk_min_chars: 4, chunk_min_tokens: 4,
chunk_max_chars: 64, chunk_max_tokens: 64,
chunk_insert_concurrency: 4, chunk_insert_concurrency: 4,
entity_embedding_concurrency: 2, entity_embedding_concurrency: 2,
..IngestionTuning::default() ..IngestionTuning::default()
}, },
chunk_only: false,
} }
} }
@@ -362,6 +363,69 @@ async fn ingestion_pipeline_happy_path_persists_entities() {
assert!(call_log[4..].iter().all(|entry| *entry == "chunk")); assert!(call_log[4..].iter().all(|entry| *entry == "chunk"));
} }
/// With `chunk_only` enabled the pipeline must persist text chunks but skip
/// the analysis stages entirely: no knowledge entities, no relationships, and
/// only the `prepare` and `chunk` service calls recorded by the mock.
#[tokio::test]
async fn ingestion_pipeline_chunk_only_skips_analysis() {
    let db = setup_db().await;
    let worker_id = "worker-chunk-only";
    let user_id = "user-999";
    let services = Arc::new(MockServices::new(user_id));
    let mut config = pipeline_config();
    config.chunk_only = true;
    let pipeline =
        IngestionPipeline::with_services(Arc::new(db.clone()), config, services.clone())
            .expect("pipeline");

    let task = reserve_task(
        &db,
        worker_id,
        IngestionPayload::Text {
            text: "Chunk only payload".into(),
            context: "Context".into(),
            category: "notes".into(),
            user_id: user_id.into(),
        },
        user_id,
    )
    .await;

    pipeline
        .process_task(task.clone())
        .await
        .expect("pipeline succeeds");

    let stored_entities: Vec<KnowledgeEntity> = db
        .get_all_stored_items::<KnowledgeEntity>()
        .await
        .expect("entities stored");
    assert!(
        stored_entities.is_empty(),
        "chunk-only ingestion should not persist entities"
    );

    // `GROUP ALL` folds the per-record counts into a single row, and the
    // `count` field is read explicitly. Decoding errors are surfaced with
    // `expect` rather than defaulted, because a swallowed error would read as
    // "zero relationships" and make the assertion below pass unconditionally.
    // An empty table may yield no row at all, hence the `Option`.
    let relationship_count: Option<i64> = db
        .client
        .query("SELECT count() AS count FROM relates_to GROUP ALL;")
        .await
        .expect("query relationships")
        .take((0, "count"))
        .expect("read relationship count");
    assert_eq!(
        relationship_count.unwrap_or(0),
        0,
        "chunk-only ingestion should not persist relationships"
    );

    let stored_chunks: Vec<TextChunk> = db
        .get_all_stored_items::<TextChunk>()
        .await
        .expect("chunks stored");
    assert!(
        !stored_chunks.is_empty(),
        "chunk-only ingestion should still persist chunks"
    );

    // Only chunk preparation and chunk persistence may have hit the services;
    // retrieval and enrichment calls would show up as extra log entries.
    let call_log = services.calls.lock().await.clone();
    assert_eq!(call_log, vec!["prepare", "chunk"]);
}
#[tokio::test] #[tokio::test]
async fn ingestion_pipeline_failure_marks_retry() { async fn ingestion_pipeline_failure_marks_retry() {
let db = setup_db().await; let db = setup_db().await;

View File

@@ -116,6 +116,7 @@ where
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use common::storage::indexes::ensure_runtime_indexes;
use common::storage::types::{ use common::storage::types::{
knowledge_entity::{KnowledgeEntity, KnowledgeEntityType}, knowledge_entity::{KnowledgeEntity, KnowledgeEntityType},
text_chunk::TextChunk, text_chunk::TextChunk,
@@ -134,6 +135,9 @@ mod tests {
db.apply_migrations() db.apply_migrations()
.await .await
.expect("failed to apply migrations"); .expect("failed to apply migrations");
ensure_runtime_indexes(&db, 1536)
.await
.expect("failed to build runtime indexes");
let user_id = "user_fts"; let user_id = "user_fts";
let entity = KnowledgeEntity::new( let entity = KnowledgeEntity::new(
@@ -181,6 +185,9 @@ mod tests {
db.apply_migrations() db.apply_migrations()
.await .await
.expect("failed to apply migrations"); .expect("failed to apply migrations");
ensure_runtime_indexes(&db, 1536)
.await
.expect("failed to build runtime indexes");
let user_id = "user_fts_desc"; let user_id = "user_fts_desc";
let entity = KnowledgeEntity::new( let entity = KnowledgeEntity::new(
@@ -228,6 +235,9 @@ mod tests {
db.apply_migrations() db.apply_migrations()
.await .await
.expect("failed to apply migrations"); .expect("failed to apply migrations");
ensure_runtime_indexes(&db, 1536)
.await
.expect("failed to build runtime indexes");
let user_id = "user_fts_chunk"; let user_id = "user_fts_chunk";
let chunk = TextChunk::new( let chunk = TextChunk::new(

View File

@@ -69,6 +69,7 @@ pub async fn retrieve_entities(
mod tests { mod tests {
use super::*; use super::*;
use async_openai::Client; use async_openai::Client;
use common::storage::indexes::ensure_runtime_indexes;
use common::storage::types::{ use common::storage::types::{
knowledge_entity::{KnowledgeEntity, KnowledgeEntityType}, knowledge_entity::{KnowledgeEntity, KnowledgeEntityType},
knowledge_relationship::KnowledgeRelationship, knowledge_relationship::KnowledgeRelationship,
@@ -108,6 +109,10 @@ mod tests {
.await .await
.expect("Failed to apply migrations"); .expect("Failed to apply migrations");
ensure_runtime_indexes(&db, 3)
.await
.expect("failed to build runtime indexes");
db.query( db.query(
"BEGIN TRANSACTION; "BEGIN TRANSACTION;
REMOVE INDEX IF EXISTS idx_embedding_text_chunk_embedding ON TABLE text_chunk_embedding; REMOVE INDEX IF EXISTS idx_embedding_text_chunk_embedding ON TABLE text_chunk_embedding;