mirror of
https://github.com/perstarkse/minne.git
synced 2026-06-29 05:16:26 +02:00
evals: eval crate overhaul, simplification and performance improvements
This commit is contained in:
@@ -20,11 +20,11 @@ use retrieval_pipeline::{
|
||||
|
||||
use crate::{
|
||||
args::Config,
|
||||
cache::EmbeddingCache,
|
||||
cases::SeededCase,
|
||||
corpus,
|
||||
datasets::ConvertedDataset,
|
||||
eval::{CaseDiagnostics, CaseSummary, EvaluationStageTimings, EvaluationSummary, SeededCase},
|
||||
slice, snapshot,
|
||||
slice,
|
||||
types::{CaseDiagnostics, CaseSummary, EvaluationStageTimings, EvaluationSummary},
|
||||
};
|
||||
|
||||
#[allow(clippy::struct_excessive_bools)]
|
||||
@@ -41,12 +41,10 @@ pub(super) struct EvaluationContext<'a> {
|
||||
pub namespace: String,
|
||||
pub database: String,
|
||||
pub db: Option<SurrealDbClient>,
|
||||
pub descriptor: Option<snapshot::Descriptor>,
|
||||
pub settings: Option<SystemSettings>,
|
||||
pub settings_missing: bool,
|
||||
pub must_reapply_settings: bool,
|
||||
pub embedding_provider: Option<EmbeddingProvider>,
|
||||
pub embedding_cache: Option<EmbeddingCache>,
|
||||
pub openai_client: Option<Arc<Client<async_openai::config::OpenAIConfig>>>,
|
||||
pub openai_base_url: Option<String>,
|
||||
pub expected_fingerprint: Option<String>,
|
||||
@@ -67,13 +65,19 @@ pub(super) struct EvaluationContext<'a> {
|
||||
pub summary: Option<EvaluationSummary>,
|
||||
pub diagnostics_path: Option<PathBuf>,
|
||||
pub diagnostics_enabled: bool,
|
||||
pub content_checksum: Option<String>,
|
||||
}
|
||||
|
||||
impl<'a> EvaluationContext<'a> {
|
||||
pub fn new(dataset: &'a ConvertedDataset, config: &'a Config) -> Self {
|
||||
pub fn new(
|
||||
dataset: &'a ConvertedDataset,
|
||||
config: &'a Config,
|
||||
content_checksum: Option<String>,
|
||||
) -> Self {
|
||||
Self {
|
||||
dataset,
|
||||
config,
|
||||
content_checksum,
|
||||
stage_timings: EvaluationStageTimings::default(),
|
||||
ledger_limit: None,
|
||||
slice_settings: None,
|
||||
@@ -84,12 +88,10 @@ impl<'a> EvaluationContext<'a> {
|
||||
namespace: String::new(),
|
||||
database: String::new(),
|
||||
db: None,
|
||||
descriptor: None,
|
||||
settings: None,
|
||||
settings_missing: false,
|
||||
must_reapply_settings: false,
|
||||
embedding_provider: None,
|
||||
embedding_cache: None,
|
||||
openai_client: None,
|
||||
openai_base_url: None,
|
||||
expected_fingerprint: None,
|
||||
@@ -133,12 +135,6 @@ impl<'a> EvaluationContext<'a> {
|
||||
.ok_or_else(|| anyhow!("database connection missing"))
|
||||
}
|
||||
|
||||
pub fn descriptor(&self) -> Result<&snapshot::Descriptor> {
|
||||
self.descriptor
|
||||
.as_ref()
|
||||
.ok_or_else(|| anyhow!("snapshot descriptor unavailable"))
|
||||
}
|
||||
|
||||
pub fn embedding_provider(&self) -> Result<&EmbeddingProvider> {
|
||||
self.embedding_provider
|
||||
.as_ref()
|
||||
@@ -159,6 +155,10 @@ impl<'a> EvaluationContext<'a> {
|
||||
.ok_or_else(|| anyhow!("corpus handle missing"))
|
||||
}
|
||||
|
||||
pub fn content_checksum(&self) -> Option<&str> {
|
||||
self.content_checksum.as_deref()
|
||||
}
|
||||
|
||||
pub fn evaluation_user(&self) -> Result<&User> {
|
||||
self.eval_user
|
||||
.as_ref()
|
||||
|
||||
@@ -0,0 +1,20 @@
|
||||
use std::path::Path;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use tokio::io::AsyncWriteExt;
|
||||
|
||||
use crate::{args, types::CaseDiagnostics};
|
||||
|
||||
/// Writes the given diagnostics entries to `path`, one JSON document per line.
///
/// `args::ensure_parent(path)` is called before the file is created. Each
/// entry in `cases` is serialised with `serde_json::to_vec` and written,
/// followed by a single `\n` byte.
///
/// # Errors
///
/// Returns an error if `ensure_parent` fails, if the file cannot be created,
/// if an entry cannot be serialised, or if a write or the final flush fails.
pub(crate) async fn write_chunk_diagnostics(path: &Path, cases: &[CaseDiagnostics]) -> Result<()> {
|
||||
args::ensure_parent(path)?;
|
||||
let mut file = tokio::fs::File::create(path)
|
||||
.await
|
||||
.with_context(|| format!("creating diagnostics file {}", path.display()))?;
|
||||
for case in cases {
|
||||
let line = serde_json::to_vec(case).context("serialising chunk diagnostics entry")?;
|
||||
file.write_all(&line).await?;
|
||||
|
||||
// Newline terminator so every entry sits on its own line.
file.write_all(b"\n").await?;
|
||||
}
|
||||
// Flush explicitly before the handle is dropped so a failure surfaces here as an error.
file.flush().await?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
mod context;
|
||||
mod diagnostics;
|
||||
mod stages;
|
||||
mod state;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
@@ -8,20 +8,49 @@ use crate::{args::Config, datasets::ConvertedDataset, types::EvaluationSummary};
|
||||
|
||||
use context::EvaluationContext;
|
||||
|
||||
/// Builds an `EvaluationContext` and runs the preparation stages on it.
///
/// The stages run in this order: `prepare_slice`, `prepare_db`,
/// `prepare_corpus`, `prepare_namespace`. The populated context is returned
/// so the caller can continue with later stages or simply drop it.
///
/// # Errors
///
/// Returns the first error produced by any of the stages; later stages are
/// not run once one fails.
async fn run_through_namespace<'a>(
|
||||
dataset: &'a ConvertedDataset,
|
||||
config: &'a Config,
|
||||
content_checksum: Option<String>,
|
||||
) -> Result<EvaluationContext<'a>> {
|
||||
let mut ctx = EvaluationContext::new(dataset, config, content_checksum);
|
||||
stages::prepare_slice(&mut ctx).await?;
|
||||
stages::prepare_db(&mut ctx).await?;
|
||||
stages::prepare_corpus(&mut ctx).await?;
|
||||
stages::prepare_namespace(&mut ctx).await?;
|
||||
Ok(ctx)
|
||||
}
|
||||
|
||||
/// Runs only the preparation stages (via `run_through_namespace`) and
/// discards the resulting context.
///
/// No query, summarize, or finalize stage is invoked here. The
/// `content_checksum` is copied into an owned `String` and passed on as
/// `Some(..)`.
///
/// # Errors
///
/// Propagates any error returned by `run_through_namespace`.
pub async fn warm_evaluation(
|
||||
dataset: &ConvertedDataset,
|
||||
config: &Config,
|
||||
content_checksum: &str,
|
||||
) -> Result<()> {
|
||||
// The context is intentionally unused; it is only built for the work the stages perform.
let _ctx = run_through_namespace(
|
||||
dataset,
|
||||
config,
|
||||
Some(content_checksum.to_string()),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn run_evaluation(
|
||||
dataset: &ConvertedDataset,
|
||||
config: &Config,
|
||||
content_checksum: Option<&str>,
|
||||
) -> Result<EvaluationSummary> {
|
||||
let mut ctx = EvaluationContext::new(dataset, config);
|
||||
let machine = state::ready();
|
||||
|
||||
let machine = stages::prepare_slice(machine, &mut ctx).await?;
|
||||
let machine = stages::prepare_db(machine, &mut ctx).await?;
|
||||
let machine = stages::prepare_corpus(machine, &mut ctx).await?;
|
||||
let machine = stages::prepare_namespace(machine, &mut ctx).await?;
|
||||
let machine = stages::run_queries(machine, &mut ctx).await?;
|
||||
let machine = stages::summarize(machine, &mut ctx).await?;
|
||||
let _ = stages::finalize(machine, &mut ctx).await?;
|
||||
|
||||
let mut ctx = EvaluationContext::new(
|
||||
dataset,
|
||||
config,
|
||||
content_checksum.map(str::to_string),
|
||||
);
|
||||
stages::prepare_slice(&mut ctx).await?;
|
||||
stages::prepare_db(&mut ctx).await?;
|
||||
stages::prepare_corpus(&mut ctx).await?;
|
||||
stages::prepare_namespace(&mut ctx).await?;
|
||||
stages::run_queries(&mut ctx).await?;
|
||||
stages::summarize(&mut ctx).await?;
|
||||
stages::finalize(&mut ctx).await?;
|
||||
ctx.into_summary()
|
||||
}
|
||||
|
||||
@@ -3,18 +3,12 @@ use std::time::Instant;
|
||||
use anyhow::Context;
|
||||
use tracing::info;
|
||||
|
||||
use crate::eval::write_chunk_diagnostics;
|
||||
|
||||
use super::super::{
|
||||
context::{EvalStage, EvaluationContext},
|
||||
state::{Completed, EvaluationMachine, Summarized},
|
||||
diagnostics::write_chunk_diagnostics,
|
||||
};
|
||||
use super::{map_guard_error, StageResult};
|
||||
|
||||
pub(crate) async fn finalize(
|
||||
machine: EvaluationMachine<(), Summarized>,
|
||||
ctx: &mut EvaluationContext<'_>,
|
||||
) -> StageResult<Completed> {
|
||||
pub(crate) async fn finalize(ctx: &mut EvaluationContext<'_>) -> anyhow::Result<()> {
|
||||
let stage = EvalStage::Finalize;
|
||||
info!(
|
||||
evaluation_stage = stage.label(),
|
||||
@@ -22,13 +16,6 @@ pub(crate) async fn finalize(
|
||||
);
|
||||
let started = Instant::now();
|
||||
|
||||
if let Some(cache) = ctx.embedding_cache.as_ref() {
|
||||
cache
|
||||
.persist()
|
||||
.await
|
||||
.context("persisting embedding cache")?;
|
||||
}
|
||||
|
||||
if let Some(path) = ctx.diagnostics_path.as_ref() {
|
||||
if ctx.diagnostics_enabled {
|
||||
write_chunk_diagnostics(path.as_path(), &ctx.diagnostics_output)
|
||||
@@ -53,7 +40,5 @@ pub(crate) async fn finalize(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
machine
|
||||
.finalize()
|
||||
.map_err(|(_, guard)| map_guard_error("finalize", &guard))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -13,14 +13,3 @@ pub(crate) use prepare_namespace::prepare_namespace;
|
||||
pub(crate) use prepare_slice::prepare_slice;
|
||||
pub(crate) use run_queries::run_queries;
|
||||
pub(crate) use summarize::summarize;
|
||||
|
||||
use anyhow::Result;
|
||||
use state_machines::core::GuardError;
|
||||
|
||||
use super::state::EvaluationMachine;
|
||||
|
||||
fn map_guard_error(event: &str, guard: &GuardError) -> anyhow::Error {
|
||||
anyhow::anyhow!("invalid evaluation pipeline transition during {event}: {guard:?}")
|
||||
}
|
||||
|
||||
type StageResult<S> = Result<EvaluationMachine<(), S>>;
|
||||
|
||||
@@ -3,19 +3,12 @@ use std::time::Instant;
|
||||
use anyhow::Context;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{corpus, eval::can_reuse_namespace, slice, snapshot};
|
||||
use crate::{corpus, db::can_reuse_namespace, slice};
|
||||
|
||||
use super::super::{
|
||||
context::{EvalStage, EvaluationContext},
|
||||
state::{CorpusReady, DbReady, EvaluationMachine},
|
||||
};
|
||||
use super::{map_guard_error, StageResult};
|
||||
use super::super::context::{EvalStage, EvaluationContext};
|
||||
|
||||
#[allow(clippy::too_many_lines)]
|
||||
pub(crate) async fn prepare_corpus(
|
||||
machine: EvaluationMachine<(), DbReady>,
|
||||
ctx: &mut EvaluationContext<'_>,
|
||||
) -> StageResult<CorpusReady> {
|
||||
pub(crate) async fn prepare_corpus(ctx: &mut EvaluationContext<'_>) -> anyhow::Result<()> {
|
||||
let stage = EvalStage::PrepareCorpus;
|
||||
info!(
|
||||
evaluation_stage = stage.label(),
|
||||
@@ -31,13 +24,13 @@ pub(crate) async fn prepare_corpus(
|
||||
let window = slice::select_window(slice, ctx.config().slice_offset, ctx.config().limit)
|
||||
.context("selecting slice window for corpus preparation")?;
|
||||
|
||||
let descriptor = snapshot::Descriptor::new(config, slice, ctx.embedding_provider()?);
|
||||
let ingestion_config = corpus::make_ingestion_config(config);
|
||||
let expected_fingerprint = corpus::compute_ingestion_fingerprint(
|
||||
ctx.dataset(),
|
||||
slice,
|
||||
config.converted_dataset_path.as_path(),
|
||||
&ingestion_config,
|
||||
ctx.content_checksum(),
|
||||
)?;
|
||||
let base_dir = corpus::cached_corpus_dir(
|
||||
&cache_settings,
|
||||
@@ -47,19 +40,18 @@ pub(crate) async fn prepare_corpus(
|
||||
|
||||
if !config.reseed_slice {
|
||||
let requested_cases = window.cases.len();
|
||||
if can_reuse_namespace(
|
||||
ctx.db()?,
|
||||
&descriptor,
|
||||
&ctx.namespace,
|
||||
&ctx.database,
|
||||
ctx.dataset().metadata.id.as_str(),
|
||||
slice.manifest.slice_id.as_str(),
|
||||
expected_fingerprint.as_str(),
|
||||
requested_cases,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
if let Some(manifest) = corpus::load_cached_manifest(&base_dir)? {
|
||||
if let Some(manifest) = corpus::load_cached_manifest(&base_dir)? {
|
||||
if can_reuse_namespace(
|
||||
ctx.db()?,
|
||||
&manifest,
|
||||
&embedding_provider,
|
||||
&ctx.namespace,
|
||||
&ctx.database,
|
||||
expected_fingerprint.as_str(),
|
||||
requested_cases,
|
||||
)
|
||||
.await?
|
||||
{
|
||||
info!(
|
||||
cache = %base_dir.display(),
|
||||
namespace = ctx.namespace.as_str(),
|
||||
@@ -70,7 +62,6 @@ pub(crate) async fn prepare_corpus(
|
||||
ctx.corpus_handle = Some(corpus_handle);
|
||||
ctx.expected_fingerprint = Some(expected_fingerprint);
|
||||
ctx.ingestion_duration_ms = 0;
|
||||
ctx.descriptor = Some(descriptor);
|
||||
|
||||
let elapsed = started.elapsed();
|
||||
ctx.record_stage_duration(stage, elapsed);
|
||||
@@ -80,14 +71,8 @@ pub(crate) async fn prepare_corpus(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
return machine
|
||||
.prepare_corpus()
|
||||
.map_err(|(_, guard)| map_guard_error("prepare_corpus", &guard));
|
||||
return Ok(());
|
||||
}
|
||||
info!(
|
||||
cache = %base_dir.display(),
|
||||
"Namespace reusable but cached manifest missing; regenerating corpus"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -103,6 +88,7 @@ pub(crate) async fn prepare_corpus(
|
||||
openai_client,
|
||||
&eval_user_id,
|
||||
config.converted_dataset_path.as_path(),
|
||||
ctx.content_checksum(),
|
||||
ingestion_config.clone(),
|
||||
)
|
||||
.await
|
||||
@@ -126,7 +112,6 @@ pub(crate) async fn prepare_corpus(
|
||||
ctx.corpus_handle = Some(corpus_handle);
|
||||
ctx.expected_fingerprint = Some(expected_fingerprint);
|
||||
ctx.ingestion_duration_ms = ingestion_duration_ms;
|
||||
ctx.descriptor = Some(descriptor);
|
||||
|
||||
let elapsed = started.elapsed();
|
||||
ctx.record_stage_duration(stage, elapsed);
|
||||
@@ -136,7 +121,5 @@ pub(crate) async fn prepare_corpus(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
machine
|
||||
.prepare_corpus()
|
||||
.map_err(|(_, guard)| map_guard_error("prepare_corpus", &guard))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,28 +1,19 @@
|
||||
use std::{sync::Arc, time::Instant};
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
args::EmbeddingBackend,
|
||||
cache::EmbeddingCache,
|
||||
eval::{
|
||||
connect_eval_db, enforce_system_settings, load_or_init_system_settings, sanitize_model_code,
|
||||
},
|
||||
db::{connect_eval_db, sanitize_model_code},
|
||||
openai,
|
||||
settings::{enforce_system_settings, load_or_init_system_settings},
|
||||
};
|
||||
use common::utils::embedding::{default_embedding_pool_size, EmbeddingProvider};
|
||||
|
||||
use super::super::{
|
||||
context::{EvalStage, EvaluationContext},
|
||||
state::{DbReady, EvaluationMachine, SlicePrepared},
|
||||
};
|
||||
use super::{map_guard_error, StageResult};
|
||||
use super::super::context::{EvalStage, EvaluationContext};
|
||||
|
||||
pub(crate) async fn prepare_db(
|
||||
machine: EvaluationMachine<(), SlicePrepared>,
|
||||
ctx: &mut EvaluationContext<'_>,
|
||||
) -> StageResult<DbReady> {
|
||||
pub(crate) async fn prepare_db(ctx: &mut EvaluationContext<'_>) -> anyhow::Result<()> {
|
||||
let stage = EvalStage::PrepareDb;
|
||||
info!(
|
||||
evaluation_stage = stage.label(),
|
||||
@@ -36,19 +27,18 @@ pub(crate) async fn prepare_db(
|
||||
|
||||
let db = connect_eval_db(config, &namespace, &database).await?;
|
||||
|
||||
let (raw_openai_client, openai_base_url) =
|
||||
openai::build_client_from_env().context("building OpenAI client")?;
|
||||
let openai_client = Arc::new(raw_openai_client);
|
||||
let (openai_client, openai_base_url) =
|
||||
openai::ingestion_openai_client(config.ingest.include_entities)
|
||||
.context("building OpenAI client for ingestion")?;
|
||||
|
||||
// Create embedding provider directly from config (eval only supports FastEmbed and Hashed)
|
||||
let embedding_provider = match config.embedding_backend {
|
||||
crate::args::EmbeddingBackend::FastEmbed => EmbeddingProvider::new_fastembed(
|
||||
EmbeddingBackend::FastEmbed => EmbeddingProvider::new_fastembed(
|
||||
config.embedding_model.clone(),
|
||||
default_embedding_pool_size(),
|
||||
)
|
||||
.await
|
||||
.context("creating FastEmbed provider")?,
|
||||
crate::args::EmbeddingBackend::Hashed => {
|
||||
EmbeddingBackend::Hashed => {
|
||||
EmbeddingProvider::new_hashed(1536).context("creating Hashed provider")?
|
||||
}
|
||||
};
|
||||
@@ -68,12 +58,14 @@ pub(crate) async fn prepare_db(
|
||||
dimension = provider_dimension,
|
||||
"Embedding provider initialised"
|
||||
);
|
||||
info!(openai_base_url = %openai_base_url, "OpenAI client configured");
|
||||
if let Some(base_url) = &openai_base_url {
|
||||
info!(openai_base_url = %base_url, "OpenAI client configured for entity ingestion");
|
||||
}
|
||||
|
||||
let (mut settings, settings_missing) =
|
||||
load_or_init_system_settings(&db, provider_dimension).await?;
|
||||
|
||||
let embedding_cache = if config.embedding_backend == EmbeddingBackend::FastEmbed {
|
||||
if config.embedding_backend == EmbeddingBackend::FastEmbed {
|
||||
if let Some(model_code) = embedding_provider.model_code() {
|
||||
let sanitized = sanitize_model_code(&model_code);
|
||||
let path = config.cache_dir.join(format!("{sanitized}.json"));
|
||||
@@ -83,15 +75,8 @@ pub(crate) async fn prepare_db(
|
||||
.with_context(|| format!("removing stale cache {}", path.display()))
|
||||
.ok();
|
||||
}
|
||||
let cache = EmbeddingCache::load(&path).await?;
|
||||
info!(path = %path.display(), "Embedding cache ready");
|
||||
Some(cache)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
} else {
|
||||
None
|
||||
};
|
||||
}
|
||||
|
||||
let must_reapply_settings = settings_missing;
|
||||
let defer_initial_enforce = settings_missing && !config.reseed_slice;
|
||||
@@ -104,9 +89,8 @@ pub(crate) async fn prepare_db(
|
||||
ctx.must_reapply_settings = must_reapply_settings;
|
||||
ctx.settings = Some(settings);
|
||||
ctx.embedding_provider = Some(embedding_provider);
|
||||
ctx.embedding_cache = embedding_cache;
|
||||
ctx.openai_client = Some(openai_client);
|
||||
ctx.openai_base_url = Some(openai_base_url);
|
||||
ctx.openai_base_url = openai_base_url;
|
||||
|
||||
let elapsed = started.elapsed();
|
||||
ctx.record_stage_duration(stage, elapsed);
|
||||
@@ -116,7 +100,5 @@ pub(crate) async fn prepare_db(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
machine
|
||||
.prepare_db()
|
||||
.map_err(|(_, guard)| map_guard_error("prepare_db", &guard))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,25 +5,19 @@ use common::storage::types::system_settings::SystemSettings;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::{
|
||||
cases::cases_from_manifest,
|
||||
corpus,
|
||||
db_helpers::{recreate_indexes, remove_all_indexes, reset_namespace},
|
||||
eval::{
|
||||
can_reuse_namespace, cases_from_manifest, enforce_system_settings, ensure_eval_user,
|
||||
record_namespace_state, warm_hnsw_cache,
|
||||
db::{
|
||||
can_reuse_namespace, ensure_eval_user, record_namespace_seed, recreate_indexes,
|
||||
reset_namespace, warm_hnsw_cache,
|
||||
},
|
||||
settings::enforce_system_settings,
|
||||
};
|
||||
|
||||
use super::super::{
|
||||
context::{EvalStage, EvaluationContext},
|
||||
state::{CorpusReady, EvaluationMachine, NamespaceReady},
|
||||
};
|
||||
use super::{map_guard_error, StageResult};
|
||||
use super::super::context::{EvalStage, EvaluationContext};
|
||||
|
||||
#[allow(clippy::too_many_lines)]
|
||||
pub(crate) async fn prepare_namespace(
|
||||
machine: EvaluationMachine<(), CorpusReady>,
|
||||
ctx: &mut EvaluationContext<'_>,
|
||||
) -> StageResult<NamespaceReady> {
|
||||
pub(crate) async fn prepare_namespace(ctx: &mut EvaluationContext<'_>) -> anyhow::Result<()> {
|
||||
let stage = EvalStage::PrepareNamespace;
|
||||
info!(
|
||||
evaluation_stage = stage.label(),
|
||||
@@ -32,7 +26,6 @@ pub(crate) async fn prepare_namespace(
|
||||
let started = Instant::now();
|
||||
|
||||
let config = ctx.config();
|
||||
let dataset = ctx.dataset();
|
||||
let expected_fingerprint = ctx
|
||||
.expected_fingerprint
|
||||
.as_deref()
|
||||
@@ -60,20 +53,16 @@ pub(crate) async fn prepare_namespace(
|
||||
|
||||
let mut namespace_reused = false;
|
||||
if !config.reseed_slice {
|
||||
namespace_reused = {
|
||||
let slice = ctx.slice()?;
|
||||
can_reuse_namespace(
|
||||
ctx.db()?,
|
||||
ctx.descriptor()?,
|
||||
&namespace,
|
||||
&database,
|
||||
dataset.metadata.id.as_str(),
|
||||
slice.manifest.slice_id.as_str(),
|
||||
expected_fingerprint.as_str(),
|
||||
requested_cases,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
namespace_reused = can_reuse_namespace(
|
||||
ctx.db()?,
|
||||
base_manifest,
|
||||
&embedding_provider,
|
||||
&namespace,
|
||||
&database,
|
||||
expected_fingerprint.as_str(),
|
||||
requested_cases,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
let mut namespace_seed_ms = None;
|
||||
@@ -114,34 +103,20 @@ pub(crate) async fn prepare_namespace(
|
||||
"Seeding ingestion corpus into SurrealDB"
|
||||
);
|
||||
}
|
||||
let indexes_disabled = remove_all_indexes(ctx.db()?).await.is_ok();
|
||||
|
||||
let seed_start = Instant::now();
|
||||
corpus::seed_manifest_into_db(ctx.db()?, &manifest_for_seed)
|
||||
.await
|
||||
.context("seeding ingestion corpus from manifest")?;
|
||||
namespace_seed_ms = Some(seed_start.elapsed().as_millis());
|
||||
|
||||
// Recreate indexes AFTER data is loaded (correct bulk loading pattern)
|
||||
if indexes_disabled {
|
||||
info!("Recreating indexes after seeding data");
|
||||
recreate_indexes(ctx.db()?, embedding_provider.dimension())
|
||||
.await
|
||||
.context("recreating indexes with correct dimension")?;
|
||||
warm_hnsw_cache(ctx.db()?, embedding_provider.dimension()).await?;
|
||||
}
|
||||
{
|
||||
let slice = ctx.slice()?;
|
||||
record_namespace_state(
|
||||
ctx.descriptor()?,
|
||||
dataset.metadata.id.as_str(),
|
||||
slice.manifest.slice_id.as_str(),
|
||||
expected_fingerprint.as_str(),
|
||||
&namespace,
|
||||
&database,
|
||||
requested_cases,
|
||||
)
|
||||
.await;
|
||||
info!("Recreating indexes after seeding data");
|
||||
recreate_indexes(ctx.db()?, embedding_provider.dimension())
|
||||
.await
|
||||
.context("recreating indexes with correct dimension")?;
|
||||
warm_hnsw_cache(ctx.db()?, embedding_provider.dimension()).await?;
|
||||
|
||||
if let Some(handle) = ctx.corpus_handle.as_mut() {
|
||||
record_namespace_seed(handle, &namespace, &database, requested_cases).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -198,7 +173,5 @@ pub(crate) async fn prepare_namespace(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
machine
|
||||
.prepare_namespace()
|
||||
.map_err(|(_, guard)| map_guard_error("prepare_namespace", &guard))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -3,21 +3,11 @@ use std::time::Instant;
|
||||
use anyhow::Context;
|
||||
use tracing::info;
|
||||
|
||||
use crate::{
|
||||
eval::{default_database, default_namespace, ledger_target},
|
||||
slice,
|
||||
};
|
||||
use crate::{db::{default_database, default_namespace}, slice};
|
||||
|
||||
use super::super::{
|
||||
context::{EvalStage, EvaluationContext},
|
||||
state::{EvaluationMachine, Ready, SlicePrepared},
|
||||
};
|
||||
use super::{map_guard_error, StageResult};
|
||||
use super::super::context::{EvalStage, EvaluationContext};
|
||||
|
||||
pub(crate) async fn prepare_slice(
|
||||
machine: EvaluationMachine<(), Ready>,
|
||||
ctx: &mut EvaluationContext<'_>,
|
||||
) -> StageResult<SlicePrepared> {
|
||||
pub(crate) async fn prepare_slice(ctx: &mut EvaluationContext<'_>) -> anyhow::Result<()> {
|
||||
let stage = EvalStage::PrepareSlice;
|
||||
info!(
|
||||
evaluation_stage = stage.label(),
|
||||
@@ -25,7 +15,7 @@ pub(crate) async fn prepare_slice(
|
||||
);
|
||||
let started = Instant::now();
|
||||
|
||||
let ledger_limit = ledger_target(ctx.config());
|
||||
let ledger_limit = slice::ledger_target(ctx.config());
|
||||
let slice_settings = slice::slice_config_with_limit(ctx.config(), ledger_limit);
|
||||
let resolved_slice =
|
||||
slice::resolve_slice(ctx.dataset(), &slice_settings).context("resolving dataset slice")?;
|
||||
@@ -49,7 +39,11 @@ pub(crate) async fn prepare_slice(
|
||||
.db_namespace
|
||||
.clone()
|
||||
.unwrap_or_else(|| {
|
||||
default_namespace(ctx.dataset().metadata.id.as_str(), ctx.config().limit)
|
||||
default_namespace(
|
||||
ctx.dataset().metadata.id.as_str(),
|
||||
ctx.config().limit,
|
||||
ctx.config().slice.as_deref(),
|
||||
)
|
||||
});
|
||||
ctx.database = ctx
|
||||
.config()
|
||||
@@ -66,7 +60,5 @@ pub(crate) async fn prepare_slice(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
machine
|
||||
.prepare_slice()
|
||||
.map_err(|(_, guard)| map_guard_error("prepare_slice", &guard))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -5,9 +5,13 @@ use common::storage::types::StoredObject;
|
||||
use futures::stream::{self, StreamExt};
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::eval::{
|
||||
adapt_retrieval_output, build_case_diagnostics, text_contains_answer, CaseDiagnostics,
|
||||
CaseSummary, RetrievedSummary,
|
||||
use crate::{
|
||||
cases::SeededCase,
|
||||
context_stats,
|
||||
types::{
|
||||
adapt_retrieval_output, build_case_diagnostics, text_contains_answer, CaseDiagnostics,
|
||||
CaseSummary, RetrievedSummary,
|
||||
},
|
||||
};
|
||||
use retrieval_pipeline::{
|
||||
pipeline::{self, RetrievalConfig, StageTimings},
|
||||
@@ -15,17 +19,10 @@ use retrieval_pipeline::{
|
||||
};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use super::super::{
|
||||
context::{EvalStage, EvaluationContext},
|
||||
state::{EvaluationMachine, NamespaceReady, QueriesFinished},
|
||||
};
|
||||
use super::{map_guard_error, StageResult};
|
||||
use super::super::context::{EvalStage, EvaluationContext};
|
||||
|
||||
#[allow(clippy::too_many_lines, clippy::arithmetic_side_effects)]
|
||||
pub(crate) async fn run_queries(
|
||||
machine: EvaluationMachine<(), NamespaceReady>,
|
||||
ctx: &mut EvaluationContext<'_>,
|
||||
) -> StageResult<QueriesFinished> {
|
||||
pub(crate) async fn run_queries(ctx: &mut EvaluationContext<'_>) -> anyhow::Result<()> {
|
||||
let stage = EvalStage::RunQueries;
|
||||
info!(
|
||||
evaluation_stage = stage.label(),
|
||||
@@ -153,7 +150,7 @@ pub(crate) async fn run_queries(
|
||||
.await
|
||||
.context("acquiring query semaphore permit")?;
|
||||
|
||||
let crate::eval::SeededCase {
|
||||
let SeededCase {
|
||||
question_id,
|
||||
question,
|
||||
expected_source,
|
||||
@@ -197,6 +194,7 @@ pub(crate) async fn run_queries(
|
||||
let query_latency = query_start.elapsed().as_millis();
|
||||
|
||||
let candidates = adapt_retrieval_output(result_output);
|
||||
let retrieved_context = context_stats::stats_for_candidates(&candidates);
|
||||
let mut retrieved = Vec::new();
|
||||
let mut match_rank = None;
|
||||
let answers_lower: Vec<String> =
|
||||
@@ -288,6 +286,7 @@ pub(crate) async fn run_queries(
|
||||
reciprocal_rank: Some(reciprocal_rank),
|
||||
ndcg: Some(ndcg),
|
||||
latency_ms: query_latency,
|
||||
retrieved_context,
|
||||
retrieved,
|
||||
};
|
||||
|
||||
@@ -353,9 +352,7 @@ pub(crate) async fn run_queries(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
machine
|
||||
.run_queries()
|
||||
.map_err(|(_, guard)| map_guard_error("run_queries", &guard))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[allow(clippy::arithmetic_side_effects, clippy::cast_precision_loss)]
|
||||
|
||||
@@ -3,25 +3,19 @@ use std::time::Instant;
|
||||
use chrono::Utc;
|
||||
use tracing::info;
|
||||
|
||||
use crate::eval::{
|
||||
use crate::types::{
|
||||
build_stage_latency_breakdown, compute_latency_stats, EvaluationSummary, PerformanceTimings,
|
||||
RetrievedContextStats,
|
||||
};
|
||||
|
||||
use super::super::{
|
||||
context::{EvalStage, EvaluationContext},
|
||||
state::{EvaluationMachine, QueriesFinished, Summarized},
|
||||
};
|
||||
use super::{map_guard_error, StageResult};
|
||||
use super::super::context::{EvalStage, EvaluationContext};
|
||||
|
||||
#[allow(
|
||||
clippy::too_many_lines,
|
||||
clippy::arithmetic_side_effects,
|
||||
clippy::cast_precision_loss
|
||||
)]
|
||||
pub(crate) async fn summarize(
|
||||
machine: EvaluationMachine<(), QueriesFinished>,
|
||||
ctx: &mut EvaluationContext<'_>,
|
||||
) -> StageResult<Summarized> {
|
||||
pub(crate) async fn summarize(ctx: &mut EvaluationContext<'_>) -> anyhow::Result<()> {
|
||||
let stage = EvalStage::Summarize;
|
||||
info!(
|
||||
evaluation_stage = stage.label(),
|
||||
@@ -123,6 +117,12 @@ pub(crate) async fn summarize(
|
||||
sum_ndcg / (retrieval_cases as f64)
|
||||
};
|
||||
|
||||
let per_query_context: Vec<RetrievedContextStats> = summaries
|
||||
.iter()
|
||||
.map(|summary| summary.retrieved_context)
|
||||
.collect();
|
||||
let retrieved_context = crate::context_stats::aggregate_context_stats(&per_query_context);
|
||||
|
||||
let active_tuning = ctx
|
||||
.retrieval_config
|
||||
.as_ref()
|
||||
@@ -133,7 +133,7 @@ pub(crate) async fn summarize(
|
||||
openai_base_url: ctx
|
||||
.openai_base_url
|
||||
.clone()
|
||||
.unwrap_or_else(|| "<unknown>".to_string()),
|
||||
.unwrap_or_else(|| "n/a (chunk-only ingestion)".to_string()),
|
||||
ingestion_ms: ctx.ingestion_duration_ms,
|
||||
namespace_seed_ms: ctx.namespace_seed_ms,
|
||||
evaluation_stage_ms: ctx.stage_timings.clone(),
|
||||
@@ -217,11 +217,12 @@ pub(crate) async fn summarize(
|
||||
chunk_rrf_use_fts: active_tuning.flags.chunk_rrf_use_fts.as_bool(),
|
||||
ingest_chunk_min_tokens: config.ingest.ingest_chunk_min_tokens,
|
||||
ingest_chunk_max_tokens: config.ingest.ingest_chunk_max_tokens,
|
||||
ingest_chunks_only: config.ingest.ingest_chunks_only,
|
||||
ingest_chunks_only: !config.ingest.include_entities,
|
||||
ingest_chunk_overlap_tokens: config.ingest.ingest_chunk_overlap_tokens,
|
||||
chunk_vector_take: active_tuning.chunk_vector_take,
|
||||
chunk_fts_take: active_tuning.chunk_fts_take,
|
||||
max_chunks_per_entity: active_tuning.max_chunks_per_entity,
|
||||
retrieved_context,
|
||||
cases: summaries,
|
||||
});
|
||||
|
||||
@@ -233,7 +234,5 @@ pub(crate) async fn summarize(
|
||||
"completed evaluation stage"
|
||||
);
|
||||
|
||||
machine
|
||||
.summarize()
|
||||
.map_err(|(_, guard)| map_guard_error("summarize", &guard))
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,31 +0,0 @@
|
||||
use state_machines::state_machine;
|
||||
|
||||
// Declares the `EvaluationMachine` state machine (state type `EvaluationState`),
// starting in `Ready`.
//
// The non-abort events form a single linear chain:
//   Ready -> SlicePrepared -> DbReady -> CorpusReady -> NamespaceReady
//         -> QueriesFinished -> Summarized -> Completed
// Each event has exactly one source state, so stages can only fire in that order.
//
// `abort` is the only event with several transitions: it moves every state
// other than `Failed` (including `Completed`) to `Failed`.
state_machine! {
|
||||
name: EvaluationMachine,
|
||||
state: EvaluationState,
|
||||
initial: Ready,
|
||||
states: [Ready, SlicePrepared, DbReady, CorpusReady, NamespaceReady, QueriesFinished, Summarized, Completed, Failed],
|
||||
events {
|
||||
prepare_slice { transition: { from: Ready, to: SlicePrepared } }
|
||||
prepare_db { transition: { from: SlicePrepared, to: DbReady } }
|
||||
prepare_corpus { transition: { from: DbReady, to: CorpusReady } }
|
||||
prepare_namespace { transition: { from: CorpusReady, to: NamespaceReady } }
|
||||
run_queries { transition: { from: NamespaceReady, to: QueriesFinished } }
|
||||
summarize { transition: { from: QueriesFinished, to: Summarized } }
|
||||
finalize { transition: { from: Summarized, to: Completed } }
|
||||
abort {
|
||||
transition: { from: Ready, to: Failed }
|
||||
transition: { from: SlicePrepared, to: Failed }
|
||||
transition: { from: DbReady, to: Failed }
|
||||
transition: { from: CorpusReady, to: Failed }
|
||||
transition: { from: NamespaceReady, to: Failed }
|
||||
transition: { from: QueriesFinished, to: Failed }
|
||||
transition: { from: Summarized, to: Failed }
|
||||
transition: { from: Completed, to: Failed }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new `EvaluationMachine` typed as being in the `Ready` state,
/// constructed with the unit value `()`.
pub fn ready() -> EvaluationMachine<(), Ready> {
|
||||
EvaluationMachine::new(())
|
||||
}
|
||||
Reference in New Issue
Block a user