benchmarks: fin

This commit is contained in:
Per Stark
2025-12-08 21:57:53 +01:00
parent 0cb1abc6db
commit a8d10f265c
39 changed files with 774 additions and 714 deletions
+197
View File
@@ -0,0 +1,197 @@
use std::{
path::PathBuf,
sync::Arc,
time::{Duration, Instant},
};
use async_openai::Client;
use common::{
storage::{
db::SurrealDbClient,
types::{system_settings::SystemSettings, user::User},
},
utils::embedding::EmbeddingProvider,
};
use retrieval_pipeline::{
pipeline::{PipelineStageTimings, RetrievalConfig},
reranking::RerankerPool,
};
use crate::{
args::Config,
cache::EmbeddingCache,
datasets::ConvertedDataset,
eval::{CaseDiagnostics, CaseSummary, EvaluationStageTimings, EvaluationSummary, SeededCase},
corpus, slice, snapshot,
};
pub(super) struct EvaluationContext<'a> {
dataset: &'a ConvertedDataset,
config: &'a Config,
pub stage_timings: EvaluationStageTimings,
pub ledger_limit: Option<usize>,
pub slice_settings: Option<slice::SliceConfig<'a>>,
pub slice: Option<slice::ResolvedSlice<'a>>,
pub window_offset: usize,
pub window_length: usize,
pub window_total_cases: usize,
pub namespace: String,
pub database: String,
pub db: Option<SurrealDbClient>,
pub descriptor: Option<snapshot::Descriptor>,
pub settings: Option<SystemSettings>,
pub settings_missing: bool,
pub must_reapply_settings: bool,
pub embedding_provider: Option<EmbeddingProvider>,
pub embedding_cache: Option<EmbeddingCache>,
pub openai_client: Option<Arc<Client<async_openai::config::OpenAIConfig>>>,
pub openai_base_url: Option<String>,
pub expected_fingerprint: Option<String>,
pub ingestion_duration_ms: u128,
pub namespace_seed_ms: Option<u128>,
pub namespace_reused: bool,
pub evaluation_start: Option<Instant>,
pub eval_user: Option<User>,
pub corpus_handle: Option<corpus::CorpusHandle>,
pub cases: Vec<SeededCase>,
pub filtered_questions: usize,
pub stage_latency_samples: Vec<PipelineStageTimings>,
pub latencies: Vec<u128>,
pub diagnostics_output: Vec<CaseDiagnostics>,
pub query_summaries: Vec<CaseSummary>,
pub rerank_pool: Option<Arc<RerankerPool>>,
pub retrieval_config: Option<Arc<RetrievalConfig>>,
pub summary: Option<EvaluationSummary>,
pub diagnostics_path: Option<PathBuf>,
pub diagnostics_enabled: bool,
}
impl<'a> EvaluationContext<'a> {
pub fn new(dataset: &'a ConvertedDataset, config: &'a Config) -> Self {
Self {
dataset,
config,
stage_timings: EvaluationStageTimings::default(),
ledger_limit: None,
slice_settings: None,
slice: None,
window_offset: 0,
window_length: 0,
window_total_cases: 0,
namespace: String::new(),
database: String::new(),
db: None,
descriptor: None,
settings: None,
settings_missing: false,
must_reapply_settings: false,
embedding_provider: None,
embedding_cache: None,
openai_client: None,
openai_base_url: None,
expected_fingerprint: None,
ingestion_duration_ms: 0,
namespace_seed_ms: None,
namespace_reused: false,
evaluation_start: None,
eval_user: None,
corpus_handle: None,
cases: Vec::new(),
filtered_questions: 0,
stage_latency_samples: Vec::new(),
latencies: Vec::new(),
diagnostics_output: Vec::new(),
query_summaries: Vec::new(),
rerank_pool: None,
retrieval_config: None,
summary: None,
diagnostics_path: config.chunk_diagnostics_path.clone(),
diagnostics_enabled: config.chunk_diagnostics_path.is_some(),
}
}
pub fn dataset(&self) -> &'a ConvertedDataset {
self.dataset
}
pub fn config(&self) -> &'a Config {
self.config
}
pub fn slice(&self) -> &slice::ResolvedSlice<'a> {
self.slice.as_ref().expect("slice has not been prepared")
}
pub fn db(&self) -> &SurrealDbClient {
self.db.as_ref().expect("database connection missing")
}
pub fn descriptor(&self) -> &snapshot::Descriptor {
self.descriptor
.as_ref()
.expect("snapshot descriptor unavailable")
}
pub fn embedding_provider(&self) -> &EmbeddingProvider {
self.embedding_provider
.as_ref()
.expect("embedding provider not initialised")
}
pub fn openai_client(&self) -> Arc<Client<async_openai::config::OpenAIConfig>> {
self.openai_client
.as_ref()
.expect("openai client missing")
.clone()
}
pub fn corpus_handle(&self) -> &corpus::CorpusHandle {
self.corpus_handle.as_ref().expect("corpus handle missing")
}
pub fn evaluation_user(&self) -> &User {
self.eval_user.as_ref().expect("evaluation user missing")
}
pub fn record_stage_duration(&mut self, stage: EvalStage, duration: Duration) {
let elapsed = duration.as_millis() as u128;
match stage {
EvalStage::PrepareSlice => self.stage_timings.prepare_slice_ms += elapsed,
EvalStage::PrepareDb => self.stage_timings.prepare_db_ms += elapsed,
EvalStage::PrepareCorpus => self.stage_timings.prepare_corpus_ms += elapsed,
EvalStage::PrepareNamespace => self.stage_timings.prepare_namespace_ms += elapsed,
EvalStage::RunQueries => self.stage_timings.run_queries_ms += elapsed,
EvalStage::Summarize => self.stage_timings.summarize_ms += elapsed,
EvalStage::Finalize => self.stage_timings.finalize_ms += elapsed,
}
}
pub fn into_summary(self) -> EvaluationSummary {
self.summary.expect("evaluation summary missing")
}
}
#[derive(Copy, Clone)]
pub(super) enum EvalStage {
PrepareSlice,
PrepareDb,
PrepareCorpus,
PrepareNamespace,
RunQueries,
Summarize,
Finalize,
}
impl EvalStage {
pub fn label(&self) -> &'static str {
match self {
EvalStage::PrepareSlice => "prepare-slice",
EvalStage::PrepareDb => "prepare-db",
EvalStage::PrepareCorpus => "prepare-corpus",
EvalStage::PrepareNamespace => "prepare-namespace",
EvalStage::RunQueries => "run-queries",
EvalStage::Summarize => "summarize",
EvalStage::Finalize => "finalize",
}
}
}
+29
View File
@@ -0,0 +1,29 @@
mod context;
mod stages;
mod state;
use anyhow::Result;
use crate::{args::Config, datasets::ConvertedDataset, types::EvaluationSummary};
use context::EvaluationContext;
pub async fn run_evaluation(
dataset: &ConvertedDataset,
config: &Config,
) -> Result<EvaluationSummary> {
let mut ctx = EvaluationContext::new(dataset, config);
let machine = state::ready();
let machine = stages::prepare_slice(machine, &mut ctx).await?;
let machine = stages::prepare_db(machine, &mut ctx).await?;
let machine = stages::prepare_corpus(machine, &mut ctx).await?;
let machine = stages::prepare_namespace(machine, &mut ctx).await?;
let machine = stages::run_queries(machine, &mut ctx).await?;
let machine = stages::summarize(machine, &mut ctx).await?;
let machine = stages::finalize(machine, &mut ctx).await?;
drop(machine);
Ok(ctx.into_summary())
}
@@ -0,0 +1,59 @@
use std::time::Instant;
use anyhow::Context;
use tracing::info;
use crate::eval::write_chunk_diagnostics;
use super::super::{
context::{EvalStage, EvaluationContext},
state::{Completed, EvaluationMachine, Summarized},
};
use super::{map_guard_error, StageResult};
pub(crate) async fn finalize(
machine: EvaluationMachine<(), Summarized>,
ctx: &mut EvaluationContext<'_>,
) -> StageResult<Completed> {
let stage = EvalStage::Finalize;
info!(
evaluation_stage = stage.label(),
"starting evaluation stage"
);
let started = Instant::now();
if let Some(cache) = ctx.embedding_cache.as_ref() {
cache
.persist()
.await
.context("persisting embedding cache")?;
}
if let Some(path) = ctx.diagnostics_path.as_ref() {
if ctx.diagnostics_enabled {
write_chunk_diagnostics(path.as_path(), &ctx.diagnostics_output)
.await
.with_context(|| format!("writing chunk diagnostics to {}", path.display()))?;
}
}
info!(
total_cases = ctx.summary.as_ref().map(|s| s.total_cases).unwrap_or(0),
correct = ctx.summary.as_ref().map(|s| s.correct).unwrap_or(0),
precision = ctx.summary.as_ref().map(|s| s.precision).unwrap_or(0.0),
dataset = ctx.dataset().metadata.id.as_str(),
"Evaluation complete"
);
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
machine
.finalize()
.map_err(|(_, guard)| map_guard_error("finalize", guard))
}
+26
View File
@@ -0,0 +1,26 @@
mod finalize;
mod prepare_corpus;
mod prepare_db;
mod prepare_namespace;
mod prepare_slice;
mod run_queries;
mod summarize;
pub(crate) use finalize::finalize;
pub(crate) use prepare_corpus::prepare_corpus;
pub(crate) use prepare_db::prepare_db;
pub(crate) use prepare_namespace::prepare_namespace;
pub(crate) use prepare_slice::prepare_slice;
pub(crate) use run_queries::run_queries;
pub(crate) use summarize::summarize;
use anyhow::Result;
use state_machines::core::GuardError;
use super::state::EvaluationMachine;
fn map_guard_error(event: &str, guard: GuardError) -> anyhow::Error {
anyhow::anyhow!("invalid evaluation pipeline transition during {event}: {guard:?}")
}
type StageResult<S> = Result<EvaluationMachine<(), S>>;
@@ -0,0 +1,142 @@
use std::time::Instant;
use anyhow::Context;
use tracing::info;
use crate::{eval::can_reuse_namespace, corpus, slice, snapshot};
use super::super::{
context::{EvalStage, EvaluationContext},
state::{CorpusReady, DbReady, EvaluationMachine},
};
use super::{map_guard_error, StageResult};
pub(crate) async fn prepare_corpus(
machine: EvaluationMachine<(), DbReady>,
ctx: &mut EvaluationContext<'_>,
) -> StageResult<CorpusReady> {
let stage = EvalStage::PrepareCorpus;
info!(
evaluation_stage = stage.label(),
"starting evaluation stage"
);
let started = Instant::now();
let config = ctx.config();
let cache_settings = corpus::CorpusCacheConfig::from(config);
let embedding_provider = ctx.embedding_provider().clone();
let openai_client = ctx.openai_client();
let slice = ctx.slice();
let window = slice::select_window(slice, ctx.config().slice_offset, ctx.config().limit)
.context("selecting slice window for corpus preparation")?;
let descriptor = snapshot::Descriptor::new(config, slice, ctx.embedding_provider());
let ingestion_config = corpus::make_ingestion_config(config);
let expected_fingerprint = corpus::compute_ingestion_fingerprint(
ctx.dataset(),
slice,
config.converted_dataset_path.as_path(),
&ingestion_config,
)?;
let base_dir = corpus::cached_corpus_dir(
&cache_settings,
ctx.dataset().metadata.id.as_str(),
slice.manifest.slice_id.as_str(),
);
if !config.reseed_slice {
let requested_cases = window.cases.len();
if can_reuse_namespace(
ctx.db(),
&descriptor,
&ctx.namespace,
&ctx.database,
ctx.dataset().metadata.id.as_str(),
slice.manifest.slice_id.as_str(),
expected_fingerprint.as_str(),
requested_cases,
)
.await?
{
if let Some(manifest) = corpus::load_cached_manifest(&base_dir)? {
info!(
cache = %base_dir.display(),
namespace = ctx.namespace.as_str(),
database = ctx.database.as_str(),
"Namespace already seeded; reusing cached corpus manifest"
);
let corpus_handle = corpus::corpus_handle_from_manifest(manifest, base_dir);
ctx.corpus_handle = Some(corpus_handle);
ctx.expected_fingerprint = Some(expected_fingerprint);
ctx.ingestion_duration_ms = 0;
ctx.descriptor = Some(descriptor);
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
return machine
.prepare_corpus()
.map_err(|(_, guard)| map_guard_error("prepare_corpus", guard));
} else {
info!(
cache = %base_dir.display(),
"Namespace reusable but cached manifest missing; regenerating corpus"
);
}
}
}
let eval_user_id = "eval-user".to_string();
let ingestion_timer = Instant::now();
let corpus_handle = {
corpus::ensure_corpus(
ctx.dataset(),
slice,
&window,
&cache_settings,
embedding_provider.clone().into(),
openai_client,
&eval_user_id,
config.converted_dataset_path.as_path(),
ingestion_config.clone(),
)
.await
.context("ensuring ingestion-backed corpus")?
};
let expected_fingerprint = corpus_handle
.manifest
.metadata
.ingestion_fingerprint
.clone();
let ingestion_duration_ms = ingestion_timer.elapsed().as_millis() as u128;
info!(
cache = %corpus_handle.path.display(),
reused_ingestion = corpus_handle.reused_ingestion,
reused_embeddings = corpus_handle.reused_embeddings,
positive_ingested = corpus_handle.positive_ingested,
negative_ingested = corpus_handle.negative_ingested,
"Ingestion corpus ready"
);
ctx.corpus_handle = Some(corpus_handle);
ctx.expected_fingerprint = Some(expected_fingerprint);
ctx.ingestion_duration_ms = ingestion_duration_ms;
ctx.descriptor = Some(descriptor);
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
machine
.prepare_corpus()
.map_err(|(_, guard)| map_guard_error("prepare_corpus", guard))
}
@@ -0,0 +1,121 @@
use std::{sync::Arc, time::Instant};
use anyhow::{anyhow, Context};
use tracing::info;
use crate::{
args::EmbeddingBackend,
cache::EmbeddingCache,
eval::{
connect_eval_db, enforce_system_settings, load_or_init_system_settings, sanitize_model_code,
},
openai,
};
use common::utils::embedding::EmbeddingProvider;
use super::super::{
context::{EvalStage, EvaluationContext},
state::{DbReady, EvaluationMachine, SlicePrepared},
};
use super::{map_guard_error, StageResult};
pub(crate) async fn prepare_db(
machine: EvaluationMachine<(), SlicePrepared>,
ctx: &mut EvaluationContext<'_>,
) -> StageResult<DbReady> {
let stage = EvalStage::PrepareDb;
info!(
evaluation_stage = stage.label(),
"starting evaluation stage"
);
let started = Instant::now();
let namespace = ctx.namespace.clone();
let database = ctx.database.clone();
let config = ctx.config();
let db = connect_eval_db(config, &namespace, &database).await?;
let (raw_openai_client, openai_base_url) =
openai::build_client_from_env().context("building OpenAI client")?;
let openai_client = Arc::new(raw_openai_client);
// Create embedding provider directly from config (eval only supports FastEmbed and Hashed)
let embedding_provider = match config.embedding_backend {
crate::args::EmbeddingBackend::FastEmbed => {
EmbeddingProvider::new_fastembed(config.embedding_model.clone())
.await
.context("creating FastEmbed provider")?
}
crate::args::EmbeddingBackend::Hashed => {
EmbeddingProvider::new_hashed(1536).context("creating Hashed provider")?
}
};
let provider_dimension = embedding_provider.dimension();
if provider_dimension == 0 {
return Err(anyhow!(
"embedding provider reported zero dimensions; cannot continue"
));
}
info!(
backend = embedding_provider.backend_label(),
model = embedding_provider
.model_code()
.as_deref()
.unwrap_or("<none>"),
dimension = provider_dimension,
"Embedding provider initialised"
);
info!(openai_base_url = %openai_base_url, "OpenAI client configured");
let (mut settings, settings_missing) =
load_or_init_system_settings(&db, provider_dimension).await?;
let embedding_cache = if config.embedding_backend == EmbeddingBackend::FastEmbed {
if let Some(model_code) = embedding_provider.model_code() {
let sanitized = sanitize_model_code(&model_code);
let path = config.cache_dir.join(format!("{sanitized}.json"));
if config.force_convert && path.exists() {
tokio::fs::remove_file(&path)
.await
.with_context(|| format!("removing stale cache {}", path.display()))
.ok();
}
let cache = EmbeddingCache::load(&path).await?;
info!(path = %path.display(), "Embedding cache ready");
Some(cache)
} else {
None
}
} else {
None
};
let must_reapply_settings = settings_missing;
let defer_initial_enforce = settings_missing && !config.reseed_slice;
if !defer_initial_enforce {
settings = enforce_system_settings(&db, settings, provider_dimension, config).await?;
}
ctx.db = Some(db);
ctx.settings_missing = settings_missing;
ctx.must_reapply_settings = must_reapply_settings;
ctx.settings = Some(settings);
ctx.embedding_provider = Some(embedding_provider);
ctx.embedding_cache = embedding_cache;
ctx.openai_client = Some(openai_client);
ctx.openai_base_url = Some(openai_base_url);
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
machine
.prepare_db()
.map_err(|(_, guard)| map_guard_error("prepare_db", guard))
}
@@ -0,0 +1,203 @@
use std::time::Instant;
use anyhow::{anyhow, Context};
use common::storage::types::system_settings::SystemSettings;
use tracing::{info, warn};
use crate::{
db_helpers::{recreate_indexes, remove_all_indexes, reset_namespace},
eval::{
can_reuse_namespace, cases_from_manifest, enforce_system_settings, ensure_eval_user,
record_namespace_state, warm_hnsw_cache,
},
corpus,
};
use super::super::{
context::{EvalStage, EvaluationContext},
state::{CorpusReady, EvaluationMachine, NamespaceReady},
};
use super::{map_guard_error, StageResult};
pub(crate) async fn prepare_namespace(
machine: EvaluationMachine<(), CorpusReady>,
ctx: &mut EvaluationContext<'_>,
) -> StageResult<NamespaceReady> {
let stage = EvalStage::PrepareNamespace;
info!(
evaluation_stage = stage.label(),
"starting evaluation stage"
);
let started = Instant::now();
let config = ctx.config();
let dataset = ctx.dataset();
let expected_fingerprint = ctx
.expected_fingerprint
.as_deref()
.unwrap_or_default()
.to_string();
let namespace = ctx.namespace.clone();
let database = ctx.database.clone();
let embedding_provider = ctx.embedding_provider().clone();
let corpus_handle = ctx.corpus_handle();
let base_manifest = &corpus_handle.manifest;
let manifest_for_seed =
if ctx.window_offset == 0 && ctx.window_length >= base_manifest.questions.len() {
base_manifest.clone()
} else {
corpus::window_manifest(
base_manifest,
ctx.window_offset,
ctx.window_length,
ctx.config().negative_multiplier,
)
.context("selecting manifest window for seeding")?
};
let requested_cases = manifest_for_seed.questions.len();
let mut namespace_reused = false;
if !config.reseed_slice {
namespace_reused = {
let slice = ctx.slice();
can_reuse_namespace(
ctx.db(),
ctx.descriptor(),
&namespace,
&database,
dataset.metadata.id.as_str(),
slice.manifest.slice_id.as_str(),
expected_fingerprint.as_str(),
requested_cases,
)
.await?
};
}
let mut namespace_seed_ms = None;
if !namespace_reused {
ctx.must_reapply_settings = true;
if let Err(err) = reset_namespace(ctx.db(), &namespace, &database).await {
warn!(
error = %err,
namespace,
database = %database,
"Failed to reset namespace before reseeding; continuing with existing data"
);
} else if let Err(err) = ctx.db().apply_migrations().await {
warn!(error = %err, "Failed to reapply migrations after namespace reset");
}
{
let slice = ctx.slice();
info!(
slice = slice.manifest.slice_id.as_str(),
window_offset = ctx.window_offset,
window_length = ctx.window_length,
positives = manifest_for_seed
.questions
.iter()
.map(|q| q.paragraph_id.as_str())
.collect::<std::collections::HashSet<_>>()
.len(),
negatives = manifest_for_seed.paragraphs.len().saturating_sub(
manifest_for_seed
.questions
.iter()
.map(|q| q.paragraph_id.as_str())
.collect::<std::collections::HashSet<_>>()
.len(),
),
total = manifest_for_seed.paragraphs.len(),
"Seeding ingestion corpus into SurrealDB"
);
}
let indexes_disabled = remove_all_indexes(ctx.db()).await.is_ok();
let seed_start = Instant::now();
corpus::seed_manifest_into_db(ctx.db(), &manifest_for_seed)
.await
.context("seeding ingestion corpus from manifest")?;
namespace_seed_ms = Some(seed_start.elapsed().as_millis() as u128);
// Recreate indexes AFTER data is loaded (correct bulk loading pattern)
if indexes_disabled {
info!("Recreating indexes after seeding data");
recreate_indexes(ctx.db(), embedding_provider.dimension())
.await
.context("recreating indexes with correct dimension")?;
warm_hnsw_cache(ctx.db(), embedding_provider.dimension()).await?;
}
{
let slice = ctx.slice();
record_namespace_state(
ctx.descriptor(),
dataset.metadata.id.as_str(),
slice.manifest.slice_id.as_str(),
expected_fingerprint.as_str(),
&namespace,
&database,
requested_cases,
)
.await;
}
}
if ctx.must_reapply_settings {
let mut settings = SystemSettings::get_current(ctx.db())
.await
.context("reloading system settings after namespace reset")?;
settings =
enforce_system_settings(ctx.db(), settings, embedding_provider.dimension(), config)
.await?;
ctx.settings = Some(settings);
ctx.must_reapply_settings = false;
}
let user = ensure_eval_user(ctx.db()).await?;
ctx.eval_user = Some(user);
let total_manifest_questions = manifest_for_seed.questions.len();
let cases = cases_from_manifest(&manifest_for_seed);
let include_impossible = manifest_for_seed.metadata.include_unanswerable;
let require_verified_chunks = manifest_for_seed.metadata.require_verified_chunks;
let filtered = total_manifest_questions.saturating_sub(cases.len());
if filtered > 0 {
info!(
filtered_questions = filtered,
total_questions = total_manifest_questions,
includes_impossible = include_impossible,
require_verified_chunks = require_verified_chunks,
"Filtered questions not eligible for this evaluation mode (impossible or unverifiable)"
);
}
if cases.is_empty() {
return Err(anyhow!(
"no eligible questions found in converted dataset for evaluation (consider --llm-mode or refreshing ingestion data)"
));
}
ctx.cases = cases;
ctx.filtered_questions = filtered;
ctx.namespace_reused = namespace_reused;
ctx.namespace_seed_ms = namespace_seed_ms;
info!(
cases = ctx.cases.len(),
window_offset = ctx.window_offset,
namespace_reused = namespace_reused,
"Dataset ready"
);
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
machine
.prepare_namespace()
.map_err(|(_, guard)| map_guard_error("prepare_namespace", guard))
}
@@ -0,0 +1,70 @@
use std::time::Instant;
use anyhow::Context;
use tracing::info;
use crate::{
eval::{default_database, default_namespace, ledger_target},
slice,
};
use super::super::{
context::{EvalStage, EvaluationContext},
state::{EvaluationMachine, Ready, SlicePrepared},
};
use super::{map_guard_error, StageResult};
pub(crate) async fn prepare_slice(
machine: EvaluationMachine<(), Ready>,
ctx: &mut EvaluationContext<'_>,
) -> StageResult<SlicePrepared> {
let stage = EvalStage::PrepareSlice;
info!(
evaluation_stage = stage.label(),
"starting evaluation stage"
);
let started = Instant::now();
let ledger_limit = ledger_target(ctx.config());
let slice_settings = slice::slice_config_with_limit(ctx.config(), ledger_limit);
let resolved_slice =
slice::resolve_slice(ctx.dataset(), &slice_settings).context("resolving dataset slice")?;
let window = slice::select_window(
&resolved_slice,
ctx.config().slice_offset,
ctx.config().limit,
)
.context("selecting slice window (use --slice-grow to extend the ledger first)")?;
ctx.ledger_limit = ledger_limit;
ctx.slice_settings = Some(slice_settings);
ctx.slice = Some(resolved_slice.clone());
ctx.window_offset = window.offset;
ctx.window_length = window.length;
ctx.window_total_cases = window.total_cases;
ctx.namespace = ctx
.config()
.database
.db_namespace
.clone()
.unwrap_or_else(|| default_namespace(ctx.dataset().metadata.id.as_str(), ctx.config().limit));
ctx.database = ctx
.config()
.database
.db_database
.clone()
.unwrap_or_else(default_database);
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
machine
.prepare_slice()
.map_err(|(_, guard)| map_guard_error("prepare_slice", guard))
}
@@ -0,0 +1,420 @@
use std::{collections::HashSet, sync::Arc, time::Instant};
use anyhow::Context;
use common::storage::types::StoredObject;
use futures::stream::{self, StreamExt};
use tracing::{debug, info};
use crate::eval::{
adapt_strategy_output, build_case_diagnostics, text_contains_answer, CaseDiagnostics,
CaseSummary, RetrievedSummary,
};
use retrieval_pipeline::{
pipeline::{self, PipelineStageTimings, RetrievalConfig},
reranking::RerankerPool,
};
use tokio::sync::Semaphore;
use super::super::{
context::{EvalStage, EvaluationContext},
state::{EvaluationMachine, NamespaceReady, QueriesFinished},
};
use super::{map_guard_error, StageResult};
pub(crate) async fn run_queries(
machine: EvaluationMachine<(), NamespaceReady>,
ctx: &mut EvaluationContext<'_>,
) -> StageResult<QueriesFinished> {
let stage = EvalStage::RunQueries;
info!(
evaluation_stage = stage.label(),
"starting evaluation stage"
);
let started = Instant::now();
let config = ctx.config();
let dataset = ctx.dataset();
let slice_settings = ctx
.slice_settings
.as_ref()
.expect("slice settings missing during query stage");
let total_cases = ctx.cases.len();
let cases_iter = std::mem::take(&mut ctx.cases).into_iter().enumerate();
let rerank_pool = if config.retrieval.rerank {
Some(
RerankerPool::new(config.retrieval.rerank_pool_size)
.context("initialising reranker pool")?,
)
} else {
None
};
let mut retrieval_config = RetrievalConfig::default();
retrieval_config.strategy = config.retrieval.strategy;
retrieval_config.tuning.rerank_keep_top = config.retrieval.rerank_keep_top;
if retrieval_config.tuning.fallback_min_results < config.retrieval.rerank_keep_top {
retrieval_config.tuning.fallback_min_results = config.retrieval.rerank_keep_top;
}
retrieval_config.tuning.chunk_result_cap = config.retrieval.chunk_result_cap.max(1);
if let Some(value) = config.retrieval.chunk_vector_take {
retrieval_config.tuning.chunk_vector_take = value;
}
if let Some(value) = config.retrieval.chunk_fts_take {
retrieval_config.tuning.chunk_fts_take = value;
}
if let Some(value) = config.retrieval.chunk_rrf_k {
retrieval_config.tuning.chunk_rrf_k = value;
}
if let Some(value) = config.retrieval.chunk_rrf_vector_weight {
retrieval_config.tuning.chunk_rrf_vector_weight = value;
}
if let Some(value) = config.retrieval.chunk_rrf_fts_weight {
retrieval_config.tuning.chunk_rrf_fts_weight = value;
}
if let Some(value) = config.retrieval.chunk_rrf_use_vector {
retrieval_config.tuning.chunk_rrf_use_vector = value;
}
if let Some(value) = config.retrieval.chunk_rrf_use_fts {
retrieval_config.tuning.chunk_rrf_use_fts = value;
}
if let Some(value) = config.retrieval.chunk_avg_chars_per_token {
retrieval_config.tuning.avg_chars_per_token = value;
}
if let Some(value) = config.retrieval.max_chunks_per_entity {
retrieval_config.tuning.max_chunks_per_entity = value;
}
let active_tuning = retrieval_config.tuning.clone();
let effective_chunk_vector = config
.retrieval
.chunk_vector_take
.unwrap_or(active_tuning.chunk_vector_take);
let effective_chunk_fts = config
.retrieval
.chunk_fts_take
.unwrap_or(active_tuning.chunk_fts_take);
info!(
dataset = dataset.metadata.id.as_str(),
slice_seed = config.slice_seed,
slice_offset = config.slice_offset,
slice_limit = config
.limit
.unwrap_or(ctx.window_total_cases),
negative_multiplier = %slice_settings.negative_multiplier,
rerank_enabled = config.retrieval.rerank,
rerank_pool_size = config.retrieval.rerank_pool_size,
rerank_keep_top = config.retrieval.rerank_keep_top,
chunk_vector_take = effective_chunk_vector,
chunk_fts_take = effective_chunk_fts,
chunk_rrf_k = active_tuning.chunk_rrf_k,
chunk_rrf_vector_weight = active_tuning.chunk_rrf_vector_weight,
chunk_rrf_fts_weight = active_tuning.chunk_rrf_fts_weight,
chunk_rrf_use_vector = active_tuning.chunk_rrf_use_vector,
chunk_rrf_use_fts = active_tuning.chunk_rrf_use_fts,
embedding_backend = ctx.embedding_provider().backend_label(),
embedding_model = ctx
.embedding_provider()
.model_code()
.as_deref()
.unwrap_or("<default>"),
"Starting evaluation run"
);
let retrieval_config = Arc::new(retrieval_config);
ctx.rerank_pool = rerank_pool.clone();
ctx.retrieval_config = Some(retrieval_config.clone());
ctx.evaluation_start = Some(Instant::now());
let user_id = ctx.evaluation_user().id.clone();
let concurrency = config.concurrency.max(1);
let diagnostics_enabled = ctx.diagnostics_enabled;
let query_semaphore = Arc::new(Semaphore::new(concurrency));
info!(
total_cases = total_cases,
max_concurrent_queries = concurrency,
"Starting evaluation with staged query execution"
);
let embedding_provider_for_queries = ctx.embedding_provider().clone();
let rerank_pool_for_queries = rerank_pool.clone();
let db = ctx.db().clone();
let openai_client = ctx.openai_client();
let raw_results = stream::iter(cases_iter)
.map(move |(idx, case)| {
let db = db.clone();
let openai_client = openai_client.clone();
let user_id = user_id.clone();
let retrieval_config = retrieval_config.clone();
let embedding_provider = embedding_provider_for_queries.clone();
let rerank_pool = rerank_pool_for_queries.clone();
let semaphore = query_semaphore.clone();
let diagnostics_enabled = diagnostics_enabled;
async move {
let _permit = semaphore
.acquire()
.await
.context("acquiring query semaphore permit")?;
let crate::eval::SeededCase {
question_id,
question,
expected_source,
answers,
paragraph_id,
paragraph_title,
expected_chunk_ids,
is_impossible,
has_verified_chunks,
} = case;
let query_start = Instant::now();
debug!(question_id = %question_id, "Evaluating query");
let query_embedding =
embedding_provider.embed(&question).await.with_context(|| {
format!("generating embedding for question {}", question_id)
})?;
let reranker = match &rerank_pool {
Some(pool) => Some(pool.checkout().await),
None => None,
};
let (result_output, pipeline_diagnostics, stage_timings) = if diagnostics_enabled {
let outcome = pipeline::run_pipeline_with_embedding_with_diagnostics(
&db,
&openai_client,
Some(&embedding_provider),
query_embedding,
&question,
&user_id,
(*retrieval_config).clone(),
reranker,
)
.await
.with_context(|| format!("running pipeline for question {}", question_id))?;
(outcome.results, outcome.diagnostics, outcome.stage_timings)
} else {
let outcome = pipeline::run_pipeline_with_embedding_with_metrics(
&db,
&openai_client,
Some(&embedding_provider),
query_embedding,
&question,
&user_id,
(*retrieval_config).clone(),
reranker,
)
.await
.with_context(|| format!("running pipeline for question {}", question_id))?;
(outcome.results, None, outcome.stage_timings)
};
let query_latency = query_start.elapsed().as_millis() as u128;
let candidates = adapt_strategy_output(result_output);
let mut retrieved = Vec::new();
let mut match_rank = None;
let answers_lower: Vec<String> =
answers.iter().map(|ans| ans.to_ascii_lowercase()).collect();
let expected_chunk_ids_set: HashSet<&str> =
expected_chunk_ids.iter().map(|id| id.as_str()).collect();
let chunk_id_required = has_verified_chunks;
let mut entity_hit = false;
let mut chunk_text_hit = false;
let mut chunk_id_hit = !chunk_id_required;
for (idx_entity, candidate) in candidates.iter().enumerate() {
if idx_entity >= config.k {
break;
}
let entity_match = candidate.source_id == expected_source;
if entity_match {
entity_hit = true;
}
let chunk_text_for_entity = candidate
.chunks
.iter()
.any(|chunk| text_contains_answer(&chunk.chunk.chunk, &answers_lower));
if chunk_text_for_entity {
chunk_text_hit = true;
}
let chunk_id_for_entity = if chunk_id_required {
expected_chunk_ids_set.contains(candidate.source_id.as_str())
|| candidate
.chunks
.iter()
.any(|chunk| expected_chunk_ids_set.contains(&chunk.chunk.get_id()))
} else {
true
};
if chunk_id_for_entity {
chunk_id_hit = true;
}
let success = entity_match && chunk_text_for_entity && chunk_id_for_entity;
if success && match_rank.is_none() {
match_rank = Some(idx_entity + 1);
}
let detail_fields = if config.detailed_report {
let description = candidate.entity_description.clone();
let category = candidate.entity_category.clone();
(
description,
category,
Some(chunk_text_for_entity),
Some(chunk_id_for_entity),
)
} else {
(None, None, None, None)
};
retrieved.push(RetrievedSummary {
rank: idx_entity + 1,
entity_id: candidate.entity_id.clone(),
source_id: candidate.source_id.clone(),
entity_name: candidate.entity_name.clone(),
score: candidate.score,
matched: success,
entity_description: detail_fields.0,
entity_category: detail_fields.1,
chunk_text_match: detail_fields.2,
chunk_id_match: detail_fields.3,
});
}
let overall_match = match_rank.is_some();
let reciprocal_rank = calculate_reciprocal_rank(match_rank);
let ndcg = calculate_ndcg(&retrieved, config.k);
let summary = CaseSummary {
question_id,
question,
paragraph_id,
paragraph_title,
expected_source,
answers,
matched: overall_match,
entity_match: entity_hit,
chunk_text_match: chunk_text_hit,
chunk_id_match: chunk_id_hit,
is_impossible,
has_verified_chunks,
match_rank,
reciprocal_rank: Some(reciprocal_rank),
ndcg: Some(ndcg),
latency_ms: query_latency,
retrieved,
};
let diagnostics = if diagnostics_enabled {
Some(build_case_diagnostics(
&summary,
&expected_chunk_ids,
&answers_lower,
&candidates,
pipeline_diagnostics,
))
} else {
None
};
Ok::<
(
usize,
CaseSummary,
Option<CaseDiagnostics>,
PipelineStageTimings,
),
anyhow::Error,
>((idx, summary, diagnostics, stage_timings))
}
})
.buffer_unordered(concurrency)
.collect::<Vec<_>>()
.await;
let mut results = Vec::with_capacity(raw_results.len());
for result in raw_results {
match result {
Ok(val) => results.push(val),
Err(err) => {
tracing::error!(error = ?err, "Query execution failed");
}
}
}
let mut ordered = results;
ordered.sort_by_key(|(idx, ..)| *idx);
let mut summaries = Vec::with_capacity(ordered.len());
let mut latencies = Vec::with_capacity(ordered.len());
let mut diagnostics_output = Vec::new();
let mut stage_latency_samples = Vec::with_capacity(ordered.len());
for (_, summary, diagnostics, stage_timings) in ordered {
latencies.push(summary.latency_ms);
summaries.push(summary);
if let Some(diag) = diagnostics {
diagnostics_output.push(diag);
}
stage_latency_samples.push(stage_timings);
}
ctx.query_summaries = summaries;
ctx.latencies = latencies;
ctx.diagnostics_output = diagnostics_output;
ctx.stage_latency_samples = stage_latency_samples;
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
machine
.run_queries()
.map_err(|(_, guard)| map_guard_error("run_queries", guard))
}
fn calculate_reciprocal_rank(rank: Option<usize>) -> f64 {
match rank {
Some(r) if r > 0 => 1.0 / (r as f64),
_ => 0.0,
}
}
fn calculate_ndcg(retrieved: &[RetrievedSummary], k: usize) -> f64 {
let mut dcg = 0.0;
let mut relevant_count = 0;
for (i, item) in retrieved.iter().enumerate() {
if i >= k {
break;
}
if item.matched {
let rel = 1.0;
dcg += rel / (i as f64 + 2.0).log2();
relevant_count += 1;
}
}
if dcg == 0.0 {
return 0.0;
}
// Calculate IDCG based on the number of relevant items found
// We assume ideal ordering would place all 'relevant_count' items at the top
let mut idcg = 0.0;
for i in 0..relevant_count {
let rel = 1.0;
idcg += rel / (i as f64 + 2.0).log2();
}
if idcg == 0.0 {
0.0
} else {
dcg / idcg
}
}
@@ -0,0 +1,232 @@
use std::time::Instant;
use chrono::Utc;
use tracing::info;
use crate::eval::{
build_stage_latency_breakdown, compute_latency_stats, EvaluationSummary, PerformanceTimings,
};
use super::super::{
context::{EvalStage, EvaluationContext},
state::{EvaluationMachine, QueriesFinished, Summarized},
};
use super::{map_guard_error, StageResult};
pub(crate) async fn summarize(
machine: EvaluationMachine<(), QueriesFinished>,
ctx: &mut EvaluationContext<'_>,
) -> StageResult<Summarized> {
let stage = EvalStage::Summarize;
info!(
evaluation_stage = stage.label(),
"starting evaluation stage"
);
let started = Instant::now();
let summaries = std::mem::take(&mut ctx.query_summaries);
let latencies = std::mem::take(&mut ctx.latencies);
let stage_latency_samples = std::mem::take(&mut ctx.stage_latency_samples);
let duration_ms = ctx
.evaluation_start
.take()
.map(|start| start.elapsed().as_millis())
.unwrap_or_default();
let config = ctx.config();
let dataset = ctx.dataset();
let slice = ctx.slice();
let corpus_handle = ctx.corpus_handle();
let total_cases = summaries.len();
let mut correct = 0usize;
let mut correct_at_1 = 0usize;
let mut correct_at_2 = 0usize;
let mut correct_at_3 = 0usize;
let mut retrieval_cases = 0usize;
let mut llm_cases = 0usize;
let mut llm_answered = 0usize;
let mut sum_reciprocal_rank = 0.0;
let mut sum_ndcg = 0.0;
for summary in &summaries {
if summary.is_impossible {
llm_cases += 1;
if summary.matched {
llm_answered += 1;
}
continue;
}
retrieval_cases += 1;
if let Some(rr) = summary.reciprocal_rank {
sum_reciprocal_rank += rr;
}
if let Some(ndcg) = summary.ndcg {
sum_ndcg += ndcg;
}
if summary.matched {
correct += 1;
if let Some(rank) = summary.match_rank {
if rank <= 1 {
correct_at_1 += 1;
}
if rank <= 2 {
correct_at_2 += 1;
}
if rank <= 3 {
correct_at_3 += 1;
}
}
}
}
let latency_stats = compute_latency_stats(&latencies);
let stage_latency = build_stage_latency_breakdown(&stage_latency_samples);
let retrieval_precision = if retrieval_cases == 0 {
0.0
} else {
(correct as f64) / (retrieval_cases as f64)
};
let llm_precision = if llm_cases == 0 {
0.0
} else {
(llm_answered as f64) / (llm_cases as f64)
};
let precision = retrieval_precision;
let precision_at_1 = if retrieval_cases == 0 {
0.0
} else {
(correct_at_1 as f64) / (retrieval_cases as f64)
};
let precision_at_2 = if retrieval_cases == 0 {
0.0
} else {
(correct_at_2 as f64) / (retrieval_cases as f64)
};
let precision_at_3 = if retrieval_cases == 0 {
0.0
} else {
(correct_at_3 as f64) / (retrieval_cases as f64)
};
let mrr = if retrieval_cases == 0 {
0.0
} else {
sum_reciprocal_rank / (retrieval_cases as f64)
};
let average_ndcg = if retrieval_cases == 0 {
0.0
} else {
sum_ndcg / (retrieval_cases as f64)
};
let active_tuning = ctx
.retrieval_config
.as_ref()
.map(|cfg| cfg.tuning.clone())
.unwrap_or_default();
let perf_timings = PerformanceTimings {
openai_base_url: ctx
.openai_base_url
.clone()
.unwrap_or_else(|| "<unknown>".to_string()),
ingestion_ms: ctx.ingestion_duration_ms,
namespace_seed_ms: ctx.namespace_seed_ms,
evaluation_stage_ms: ctx.stage_timings.clone(),
stage_latency,
};
ctx.summary = Some(EvaluationSummary {
generated_at: Utc::now(),
k: config.k,
limit: config.limit,
run_label: config.label.clone(),
total_cases,
correct,
precision,
correct_at_1,
correct_at_2,
correct_at_3,
precision_at_1,
precision_at_2,
precision_at_3,
mrr,
average_ndcg,
duration_ms,
dataset_id: dataset.metadata.id.clone(),
dataset_label: dataset.metadata.label.clone(),
dataset_includes_unanswerable: dataset.metadata.include_unanswerable,
dataset_source: dataset.source.clone(),
includes_impossible_cases: slice.manifest.includes_unanswerable,
require_verified_chunks: slice.manifest.require_verified_chunks,
filtered_questions: ctx.filtered_questions,
retrieval_cases,
retrieval_correct: correct,
retrieval_precision,
llm_cases,
llm_answered,
llm_precision,
slice_id: slice.manifest.slice_id.clone(),
slice_seed: slice.manifest.seed,
slice_total_cases: slice.manifest.case_count,
slice_window_offset: ctx.window_offset,
slice_window_length: ctx.window_length,
slice_cases: total_cases,
slice_positive_paragraphs: slice.manifest.positive_paragraphs,
slice_negative_paragraphs: slice.manifest.negative_paragraphs,
slice_total_paragraphs: slice.manifest.total_paragraphs,
slice_negative_multiplier: slice.manifest.negative_multiplier,
namespace_reused: ctx.namespace_reused,
corpus_paragraphs: ctx.corpus_handle().manifest.metadata.paragraph_count,
ingestion_cache_path: corpus_handle.path.display().to_string(),
ingestion_reused: corpus_handle.reused_ingestion,
ingestion_embeddings_reused: corpus_handle.reused_embeddings,
ingestion_fingerprint: corpus_handle
.manifest
.metadata
.ingestion_fingerprint
.clone(),
positive_paragraphs_reused: corpus_handle.positive_reused,
negative_paragraphs_reused: corpus_handle.negative_reused,
latency_ms: latency_stats,
perf: perf_timings,
embedding_backend: ctx.embedding_provider().backend_label().to_string(),
embedding_model: ctx.embedding_provider().model_code(),
embedding_dimension: ctx.embedding_provider().dimension(),
rerank_enabled: config.retrieval.rerank,
rerank_pool_size: ctx
.rerank_pool
.as_ref()
.map(|_| config.retrieval.rerank_pool_size),
rerank_keep_top: config.retrieval.rerank_keep_top,
concurrency: config.concurrency.max(1),
detailed_report: config.detailed_report,
retrieval_strategy: config.retrieval.strategy.to_string(),
chunk_result_cap: config.retrieval.chunk_result_cap,
chunk_rrf_k: active_tuning.chunk_rrf_k,
chunk_rrf_vector_weight: active_tuning.chunk_rrf_vector_weight,
chunk_rrf_fts_weight: active_tuning.chunk_rrf_fts_weight,
chunk_rrf_use_vector: active_tuning.chunk_rrf_use_vector,
chunk_rrf_use_fts: active_tuning.chunk_rrf_use_fts,
ingest_chunk_min_tokens: config.ingest.ingest_chunk_min_tokens,
ingest_chunk_max_tokens: config.ingest.ingest_chunk_max_tokens,
ingest_chunks_only: config.ingest.ingest_chunks_only,
ingest_chunk_overlap_tokens: config.ingest.ingest_chunk_overlap_tokens,
chunk_vector_take: active_tuning.chunk_vector_take,
chunk_fts_take: active_tuning.chunk_fts_take,
chunk_avg_chars_per_token: active_tuning.avg_chars_per_token,
max_chunks_per_entity: active_tuning.max_chunks_per_entity,
cases: summaries,
});
let elapsed = started.elapsed();
ctx.record_stage_duration(stage, elapsed);
info!(
evaluation_stage = stage.label(),
duration_ms = elapsed.as_millis(),
"completed evaluation stage"
);
machine
.summarize()
.map_err(|(_, guard)| map_guard_error("summarize", guard))
}
+31
View File
@@ -0,0 +1,31 @@
use state_machines::state_machine;
state_machine! {
name: EvaluationMachine,
state: EvaluationState,
initial: Ready,
states: [Ready, SlicePrepared, DbReady, CorpusReady, NamespaceReady, QueriesFinished, Summarized, Completed, Failed],
events {
prepare_slice { transition: { from: Ready, to: SlicePrepared } }
prepare_db { transition: { from: SlicePrepared, to: DbReady } }
prepare_corpus { transition: { from: DbReady, to: CorpusReady } }
prepare_namespace { transition: { from: CorpusReady, to: NamespaceReady } }
run_queries { transition: { from: NamespaceReady, to: QueriesFinished } }
summarize { transition: { from: QueriesFinished, to: Summarized } }
finalize { transition: { from: Summarized, to: Completed } }
abort {
transition: { from: Ready, to: Failed }
transition: { from: SlicePrepared, to: Failed }
transition: { from: DbReady, to: Failed }
transition: { from: CorpusReady, to: Failed }
transition: { from: NamespaceReady, to: Failed }
transition: { from: QueriesFinished, to: Failed }
transition: { from: Summarized, to: Failed }
transition: { from: Completed, to: Failed }
}
}
}
pub fn ready() -> EvaluationMachine<(), Ready> {
EvaluationMachine::new(())
}