chore: git-hooks rustfmt and clippy

This commit is contained in:
Per Stark
2026-06-20 10:10:29 +02:00
parent 01ef1bcb7a
commit 09e545816e
29 changed files with 156 additions and 201 deletions
+3 -4
View File
@@ -2,10 +2,9 @@
## Unreleased
- Refactor: deduplicated test database setup across common/src/storage/ types by routing remaining inline SurrealDbClient::memory() calls through shared setup_test_db(), prepare_text_chunk_test_db(), and prepare_knowledge_entity_test_db() helpers; removed redundant apply_migrations() calls after setup_test_db() and collapsed configure_embedding_dimension + redefine_hnsw_index triplication into prepare_**test_db helpers; extracted generic ensure_fts_index helper for FTS index bootstrap replacing duplicated per-table ensure**_fts_indexes helpers
- Refactor: split knowledge-graph.js monolith into focused functions (loadGraphData, buildSvg, createSimulation, drawLinks/Nodes/Labels, createHighlighting, createZoom, attachResize); fixed dead duplicate zoom instance
- Refactor: extracted rubberbanding scroll logic in design-polish.js into standalone attachRubberbanding helper; removed dead pullDistance state
- Fix: added pre-commit hooks to further maintain code consistency.
- Refactor: deduplicated test database setup across common/src/storage/.
- Refactor: split knowledge-graph.js monolith into focused functions.
- Evaluations: simplified crate layout — linear pipeline, sharded-only converted store, in-memory ingestion, `db/` and `cli/` modules; namespace reuse state in corpus manifest (removed `cache/snapshots/`); no legacy JSON/history compatibility (re-run `--warm` after upgrade)
- Performance: ingestion skips per-task index rebuild; worker runs scheduled `REBUILD INDEX` (default every 24h via `index_rebuild_interval_secs`, `0` disables)
- Performance: ingestion persists all artifacts in a single SurrealDB transaction per task (atomic replace by task id)
+14 -14
View File
@@ -3,10 +3,10 @@
"devenv": {
"locked": {
"dir": "src/modules",
"lastModified": 1771066302,
"lastModified": 1781800860,
"owner": "cachix",
"repo": "devenv",
"rev": "1b355dec9bddbaddbe4966d6fc30d7aa3af8575b",
"rev": "d59d872d80876d9eeb3e214d3b088bc4a14a9c4f",
"type": "github"
},
"original": {
@@ -22,10 +22,10 @@
"rust-analyzer-src": "rust-analyzer-src"
},
"locked": {
"lastModified": 1771052630,
"lastModified": 1781779700,
"owner": "nix-community",
"repo": "fenix",
"rev": "d0555da98576b8611c25df0c208e51e9a182d95f",
"rev": "ad30e585c7a2917325943c2b19511f5a249eff53",
"type": "github"
},
"original": {
@@ -58,10 +58,10 @@
]
},
"locked": {
"lastModified": 1770726378,
"lastModified": 1781733627,
"owner": "cachix",
"repo": "git-hooks.nix",
"rev": "5eaaedde414f6eb1aea8b8525c466dc37bba95ae",
"rev": "3bbec39bc90eadfa031e6f3b77272f3f60803e39",
"type": "github"
},
"original": {
@@ -92,10 +92,10 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1771008912,
"lastModified": 1781577229,
"owner": "nixos",
"repo": "nixpkgs",
"rev": "a82ccc39b39b621151d6732718e3e250109076fa",
"rev": "567a49d1913ce81ac6e9582e3553dd90a955875f",
"type": "github"
},
"original": {
@@ -107,10 +107,10 @@
},
"nixpkgs_2": {
"locked": {
"lastModified": 1770843696,
"lastModified": 1781607440,
"owner": "nixos",
"repo": "nixpkgs",
"rev": "2343bbb58f99267223bc2aac4fc9ea301a155a16",
"rev": "3e41b24abd260e8f71dbe2f5737d24122f972158",
"type": "github"
},
"original": {
@@ -135,10 +135,10 @@
"rust-analyzer-src": {
"flake": false,
"locked": {
"lastModified": 1771007332,
"lastModified": 1781714865,
"owner": "rust-lang",
"repo": "rust-analyzer",
"rev": "bbc84d335fbbd9b3099d3e40c7469ee57dbd1873",
"rev": "abb1301c3c14a40645bb2588b1cc858fe374b527",
"type": "github"
},
"original": {
@@ -155,10 +155,10 @@
]
},
"locked": {
"lastModified": 1771038269,
"lastModified": 1781850613,
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "d7a86c8a4df49002446737603a3e0d7ef91a9637",
"rev": "4baecb43a008cd004e5220a777e1724bd8d43e43",
"type": "github"
},
"original": {
+17 -4
View File
@@ -4,19 +4,32 @@
config,
inputs,
...
}:
let
}: let
ortVersion = lib.removeSuffix "\n" (builtins.readFile "${toString ./.}/ort-version");
_ortVersionCheck =
if pkgs.onnxruntime.version == ortVersion
then null
else
throw "pkgs.onnxruntime.version (${pkgs.onnxruntime.version}) must match ort-version (${ortVersion})";
else throw "pkgs.onnxruntime.version (${pkgs.onnxruntime.version}) must match ort-version (${ortVersion})";
in {
devenv.warnOnNewVersion = false;
cachix.enable = false;
git-hooks.install.enable = true;
git-hooks.hooks = {
rustfmt.enable = true;
clippy = {
enable = true;
settings.allFeatures = true;
};
};
# Use pinned Rust toolchain from languages.rust for git-hooks wrappers
# (git-hooks.nix defaults to nixpkgs's cargo/clippy/rustfmt, ignoring the pin)
git-hooks.tools.cargo = lib.mkDefault config.languages.rust.toolchain.cargo;
git-hooks.tools.clippy = lib.mkDefault config.languages.rust.toolchain.clippy;
git-hooks.tools.rustfmt = lib.mkDefault config.languages.rust.toolchain.rustfmt;
packages = [
pkgs.openssl
pkgs.nodejs
+3
View File
@@ -9,3 +9,6 @@ inputs:
nixpkgs:
follows: nixpkgs
allowUnfree: true
nixpkgs:
permittedInsecurePackages:
- "minio-2025-10-15T17-29-55Z"
+24 -29
View File
@@ -103,10 +103,16 @@ pub async fn collect_status(config: &Config) -> Result<EvalStatus> {
ready: slice_manifest
.as_ref()
.is_some_and(|manifest| slice::manifest_is_complete(manifest, &slice_config)),
path: manifest_path.as_ref().map(|path| path.display().to_string()),
path: manifest_path
.as_ref()
.map(|path| path.display().to_string()),
cases: slice_manifest.as_ref().map(|manifest| manifest.case_count),
positives: slice_manifest.as_ref().map(|manifest| manifest.positive_paragraphs),
negatives: slice_manifest.as_ref().map(|manifest| manifest.negative_paragraphs),
positives: slice_manifest
.as_ref()
.map(|manifest| manifest.positive_paragraphs),
negatives: slice_manifest
.as_ref()
.map(|manifest| manifest.negative_paragraphs),
};
let beir_paragraph_ids = slice_manifest.as_ref().map(|manifest| {
@@ -159,17 +165,9 @@ pub async fn collect_status(config: &Config) -> Result<EvalStatus> {
}
};
let namespace = config
.database
.db_namespace
.clone()
.unwrap_or_else(|| {
default_namespace(
config.dataset.id(),
config.limit,
config.slice.as_deref(),
)
});
let namespace = config.database.db_namespace.clone().unwrap_or_else(|| {
default_namespace(config.dataset.id(), config.limit, config.slice.as_deref())
});
let database = config
.database
.db_database
@@ -183,16 +181,17 @@ pub async fn collect_status(config: &Config) -> Result<EvalStatus> {
.and_then(|manifest| manifest.metadata.namespace_seed)
});
let (seeded, namespace_seed_recorded) = match connect_eval_db(config, &namespace, &database).await {
Ok(db) => {
let has_corpus = namespace_has_corpus(&db).await.unwrap_or(false);
(has_corpus, namespace_seed.is_some())
}
Err(err) => {
notes.push(format!("SurrealDB unavailable: {err}"));
(false, false)
}
};
let (seeded, namespace_seed_recorded) =
match connect_eval_db(config, &namespace, &database).await {
Ok(db) => {
let has_corpus = namespace_has_corpus(&db).await.unwrap_or(false);
(has_corpus, namespace_seed.is_some())
}
Err(err) => {
notes.push(format!("SurrealDB unavailable: {err}"));
(false, false)
}
};
let query_ready = converted_ready
&& slice_ledger.ready
@@ -281,11 +280,7 @@ pub fn print_status(status: &EvalStatus) {
);
println!(
"Query-ready: {}",
if status.query_ready {
"yes"
} else {
"no"
}
if status.query_ready { "yes" } else { "no" }
);
for note in &status.notes {
println!("Note: {note}");
+1 -4
View File
@@ -701,10 +701,7 @@ mod tests {
ConvertedDataset {
generated_at: Utc::now(),
metadata: crate::datasets::DatasetMetadata::for_kind(
DatasetKind::default(),
false,
),
metadata: crate::datasets::DatasetMetadata::for_kind(DatasetKind::default(), false),
source: "src".to_string(),
paragraphs: vec![paragraph],
}
+2 -5
View File
@@ -10,11 +10,8 @@ use chrono::{DateTime, Utc};
use common::storage::{
db::SurrealDbClient,
types::{
knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship,
text_chunk::TextChunk,
text_content::TextContent,
StoredObject,
knowledge_entity::KnowledgeEntity, knowledge_relationship::KnowledgeRelationship,
text_chunk::TextChunk, text_content::TextContent, StoredObject,
},
};
use ingestion_pipeline::{persist_artifacts, IngestionTuning, PipelineArtifacts};
+1 -3
View File
@@ -190,9 +190,7 @@ pub fn convert_beir_documents(
pub fn corpus_doc_id(paragraph_id: &str, dataset: DatasetKind) -> Option<String> {
let prefix = format!("{}-", dataset.source_prefix());
paragraph_id
.strip_prefix(&prefix)
.map(str::to_string)
paragraph_id.strip_prefix(&prefix).map(str::to_string)
}
fn resolve_qrels_path(raw_dir: &Path) -> Result<PathBuf> {
+15 -19
View File
@@ -11,12 +11,9 @@ use super::{
self, build_dataset_from_catalog, paragraph_path, read_meta, store_dir_for,
upsert_sharded_paragraphs, write_sharded,
},
BEIR_DATASETS, ConvertedDataset, DatasetKind, DatasetMetadata,
};
use crate::{
args::Config,
slice,
ConvertedDataset, DatasetKind, DatasetMetadata, BEIR_DATASETS,
};
use crate::{args::Config, slice};
pub fn subset_for_paragraph_id(paragraph_id: &str) -> Option<DatasetKind> {
let mut kinds: Vec<DatasetKind> = BEIR_DATASETS.to_vec();
@@ -53,9 +50,8 @@ pub fn build_beir_mix_qrels_dataset(include_unanswerable: bool) -> Result<Conver
pub fn prepare_beir_mix(config: &Config) -> Result<super::loader::LoadedDataset> {
let virtual_ds = build_beir_mix_qrels_dataset(config.llm_mode)?;
let slice_config = slice::slice_config_with_limit(config, slice::ledger_target(config));
let resolved = slice::resolve_slice(&virtual_ds, &slice_config).context(
"resolving BEIR mix slice ledger (check --slice and --limit match your intent)",
)?;
let resolved = slice::resolve_slice(&virtual_ds, &slice_config)
.context("resolving BEIR mix slice ledger (check --slice and --limit match your intent)")?;
let unique: HashSet<String> = resolved
.manifest
@@ -83,16 +79,16 @@ pub fn prepare_beir_mix(config: &Config) -> Result<super::loader::LoadedDataset>
})
}
pub fn materialize_subset_stores(
paragraph_ids: &HashSet<String>,
force: bool,
) -> Result<()> {
pub fn materialize_subset_stores(paragraph_ids: &HashSet<String>, force: bool) -> Result<()> {
let mut by_subset: HashMap<DatasetKind, Vec<String>> = HashMap::new();
for paragraph_id in paragraph_ids {
let kind = subset_for_paragraph_id(paragraph_id).with_context(|| {
format!("routing BEIR mix paragraph id '{paragraph_id}' to subset store")
})?;
by_subset.entry(kind).or_default().push(paragraph_id.clone());
by_subset
.entry(kind)
.or_default()
.push(paragraph_id.clone());
}
for (kind, ids) in by_subset {
@@ -120,11 +116,7 @@ pub fn materialize_subset_stores(
.iter()
.filter_map(|paragraph_id| beir::corpus_doc_id(paragraph_id, kind))
.collect();
let paragraphs = beir::convert_beir_documents(
&entry.raw_path,
kind,
Some(&corpus_ids),
)?;
let paragraphs = beir::convert_beir_documents(&entry.raw_path, kind, Some(&corpus_ids))?;
if store_dir.join("meta.json").is_file() {
upsert_sharded_paragraphs(&store_dir, &paragraphs)?;
@@ -233,7 +225,11 @@ pub fn beir_subset_store_summary() -> Result<Vec<(String, usize, usize)>> {
let store_dir = store_dir_for(&entry.converted_path);
if store_dir.join("meta.json").is_file() {
let meta = read_meta(&store_dir)?;
summary.push((kind.id().to_string(), meta.paragraph_count, meta.question_count));
summary.push((
kind.id().to_string(),
meta.paragraph_count,
meta.question_count,
));
}
}
Ok(summary)
+6 -3
View File
@@ -56,8 +56,8 @@ impl ChecksumSidecar {
#[allow(clippy::indexing_slicing)]
pub fn hash_file(path: &Path) -> Result<String> {
let mut file =
File::open(path).with_context(|| format!("opening file {} for checksum", path.display()))?;
let mut file = File::open(path)
.with_context(|| format!("opening file {} for checksum", path.display()))?;
let mut hasher = Sha256::new();
let mut buffer = vec![0u8; 65_536];
loop {
@@ -176,7 +176,10 @@ fn collect_store_files(base: &Path, current: &Path, entries: &mut Vec<String>) -
for entry in fs::read_dir(current)? {
let entry = entry?;
let path = entry.path();
if path.file_name().is_some_and(|name| name == "checksum.sha256") {
if path
.file_name()
.is_some_and(|name| name == "checksum.sha256")
{
continue;
}
if path.is_dir() {
+1 -3
View File
@@ -186,9 +186,7 @@ fn slice_config_for_catalog_entry<'a>(
limit: slice_entry.limit,
corpus_limit: slice_entry.corpus_limit,
slice_seed: slice_entry.seed.unwrap_or(config.slice_seed),
llm_mode: slice_entry
.include_unanswerable
.unwrap_or(config.llm_mode),
llm_mode: slice_entry.include_unanswerable.unwrap_or(config.llm_mode),
negative_multiplier: slice_entry
.negative_multiplier
.unwrap_or(config.negative_multiplier),
+1 -3
View File
@@ -222,10 +222,8 @@ fn resolve_path(root: &Path, value: &str) -> PathBuf {
}
}
pub use beir_mix::{beir_subset_store_summary, beir_subset_stores_ready, mix_content_checksum};
pub use checksum::store_aggregate_checksum;
pub use beir_mix::{
beir_subset_store_summary, beir_subset_stores_ready, mix_content_checksum,
};
pub use loader::{prebuild_catalog_slices, prepare_dataset};
pub use store::{
content_checksum_for_layout, detect_layout, store_dir_for, write_sharded, ConvertedLayout,
+14 -12
View File
@@ -11,8 +11,8 @@ use serde::{Deserialize, Serialize};
use tracing::info;
use super::{
checksum::store_aggregate_checksum,
ConvertedDataset, ConvertedParagraph, ConvertedQuestion, DatasetMetadata,
checksum::store_aggregate_checksum, ConvertedDataset, ConvertedParagraph, ConvertedQuestion,
DatasetMetadata,
};
use crate::slice;
@@ -50,11 +50,10 @@ pub fn store_dir_for(converted_path: &Path) -> PathBuf {
converted_path
.parent()
.unwrap_or_else(|| Path::new("."))
.join(
converted_path
.file_stem()
.map_or_else(|| "dataset".to_string(), |stem| stem.to_string_lossy().into()),
)
.join(converted_path.file_stem().map_or_else(
|| "dataset".to_string(),
|stem| stem.to_string_lossy().into(),
))
}
pub fn detect_layout(converted_path: &Path) -> ConvertedLayout {
@@ -167,8 +166,8 @@ pub fn content_checksum_for_layout(converted_path: &Path) -> Result<String> {
fn load_paragraph(store_dir: &Path, paragraph_id: &str) -> Result<ConvertedParagraph> {
let path = paragraph_path(store_dir, paragraph_id);
let raw = fs::read(&path)
.with_context(|| format!("reading sharded paragraph {}", path.display()))?;
let raw =
fs::read(&path).with_context(|| format!("reading sharded paragraph {}", path.display()))?;
serde_json::from_slice(&raw)
.with_context(|| format!("parsing sharded paragraph {}", path.display()))
}
@@ -180,7 +179,10 @@ fn load_paragraphs(store_dir: &Path, paragraph_ids: &[String]) -> Result<Vec<Con
.collect()
}
pub fn load_sharded_partial(store_dir: &Path, paragraph_ids: &[String]) -> Result<ConvertedDataset> {
pub fn load_sharded_partial(
store_dir: &Path,
paragraph_ids: &[String],
) -> Result<ConvertedDataset> {
let meta = read_meta(store_dir)?;
let mut paragraphs = load_paragraphs(store_dir, paragraph_ids)?;
paragraphs.sort_by(|left, right| left.id.cmp(&right.id));
@@ -333,8 +335,8 @@ pub fn load_question_catalog(store_dir: &Path) -> Result<QuestionCatalog> {
if line.trim().is_empty() {
continue;
}
let record: QuestionRecord = serde_json::from_str(&line)
.context("parsing question catalog record")?;
let record: QuestionRecord =
serde_json::from_str(&line).context("parsing question catalog record")?;
entries.push(record);
}
Ok(QuestionCatalog { entries })
+1 -2
View File
@@ -132,8 +132,7 @@ pub(crate) async fn can_reuse_namespace(
if seed.namespace != namespace || seed.database != database {
info!(
namespace,
database,
"Corpus manifest namespace metadata mismatch; reseeding"
database, "Corpus manifest namespace metadata mismatch; reseeding"
);
return Ok(false);
}
+1 -1
View File
@@ -5,5 +5,5 @@ pub(crate) use connect::{
can_reuse_namespace, connect_eval_db, default_database, default_namespace, ensure_eval_user,
namespace_has_corpus, record_namespace_seed, sanitize_model_code,
};
pub use lifecycle::{recreate_indexes, reset_namespace};
pub(crate) use lifecycle::warm_hnsw_cache;
pub use lifecycle::{recreate_indexes, reset_namespace};
+4 -8
View File
@@ -1,8 +1,4 @@
use std::{
collections::HashMap,
fs,
path::Path,
};
use std::{collections::HashMap, fs, path::Path};
use anyhow::{anyhow, Context, Result};
use common::storage::{db::SurrealDbClient, types::text_chunk::TextChunk};
@@ -72,9 +68,9 @@ pub async fn inspect_question(config: &Config) -> Result<()> {
MissingChunks::None => println!(
"All matching_chunk_ids exist in namespace '{ns}', database '{db_name}'"
),
MissingChunks::Missing(list) => println!(
"Missing chunks in namespace '{ns}', database '{db_name}': {list:?}"
),
MissingChunks::Missing(list) => {
println!("Missing chunks in namespace '{ns}', database '{db_name}': {list:?}");
}
},
Err(err) => {
println!(
+5 -9
View File
@@ -1,7 +1,7 @@
mod args;
mod context_stats;
mod cases;
mod cli;
mod context_stats;
mod corpus;
mod datasets;
mod db;
@@ -129,10 +129,7 @@ async fn async_main() -> anyhow::Result<()> {
let store_dir = datasets::store_dir_for(&parsed.config.converted_dataset_path);
datasets::write_sharded(&dataset, &store_dir)?;
datasets::prebuild_catalog_slices(&dataset, &parsed.config)?;
println!(
"Converted dataset written under {}",
store_dir.display()
);
println!("Converted dataset written under {}", store_dir.display());
return Ok(());
}
@@ -141,14 +138,13 @@ async fn async_main() -> anyhow::Result<()> {
}
info!(dataset = dataset_kind.id(), "Preparing converted dataset");
let loaded = crate::datasets::prepare_dataset(dataset_kind, &parsed.config).with_context(
|| {
let loaded =
crate::datasets::prepare_dataset(dataset_kind, &parsed.config).with_context(|| {
format!(
"preparing converted dataset at {}",
parsed.config.converted_dataset_path.display()
)
},
)?;
})?;
info!(
questions = loaded
+1 -4
View File
@@ -14,10 +14,7 @@ pub fn ingestion_openai_client(
)?;
Ok((Arc::new(client), Some(base_url)))
} else {
Ok((
Arc::new(Client::with_config(OpenAIConfig::default())),
None,
))
Ok((Arc::new(Client::with_config(OpenAIConfig::default())), None))
}
}
+4 -1
View File
@@ -91,7 +91,10 @@ fn format_duration(value: Option<u128>) -> String {
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{EvaluationStageTimings, PerformanceTimings, LatencyStats, StageLatency, StageLatencyBreakdown};
use crate::types::{
EvaluationStageTimings, LatencyStats, PerformanceTimings, StageLatency,
StageLatencyBreakdown,
};
use chrono::Utc;
use tempfile::tempdir;
+2 -11
View File
@@ -26,12 +26,7 @@ pub async fn warm_evaluation(
config: &Config,
content_checksum: &str,
) -> Result<()> {
let _ctx = run_through_namespace(
dataset,
config,
Some(content_checksum.to_string()),
)
.await?;
let _ctx = run_through_namespace(dataset, config, Some(content_checksum.to_string())).await?;
Ok(())
}
@@ -40,11 +35,7 @@ pub async fn run_evaluation(
config: &Config,
content_checksum: Option<&str>,
) -> Result<EvaluationSummary> {
let mut ctx = EvaluationContext::new(
dataset,
config,
content_checksum.map(str::to_string),
);
let mut ctx = EvaluationContext::new(dataset, config, content_checksum.map(str::to_string));
stages::prepare_slice(&mut ctx).await?;
stages::prepare_db(&mut ctx).await?;
stages::prepare_corpus(&mut ctx).await?;
@@ -3,7 +3,10 @@ use std::time::Instant;
use anyhow::Context;
use tracing::info;
use crate::{db::{default_database, default_namespace}, slice};
use crate::{
db::{default_database, default_namespace},
slice,
};
use super::super::context::{EvalStage, EvaluationContext};
+7 -12
View File
@@ -14,9 +14,7 @@ use tracing::{info, warn};
use crate::{
args::Config,
datasets::{
ConvertedDataset, ConvertedParagraph, ConvertedQuestion, DatasetKind,
},
datasets::{ConvertedDataset, ConvertedParagraph, ConvertedQuestion, DatasetKind},
};
mod beir;
@@ -244,8 +242,7 @@ pub fn resolve_slice<'a>(
);
return Ok(resolved);
}
let resolved =
materialize_slice_ledger(dataset, config, &index, slice_arg, path)?;
let resolved = materialize_slice_ledger(dataset, config, &index, slice_arg, path)?;
info!(
slice = %resolved.manifest.slice_id,
path = %resolved.path.display(),
@@ -927,10 +924,7 @@ pub fn cached_manifest_path(config: &crate::args::Config) -> Option<PathBuf> {
}
pub fn manifest_is_complete(manifest: &SliceManifest, config: &SliceConfig<'_>) -> bool {
let requested_limit = config
.limit
.unwrap_or(manifest.case_count.max(1))
.max(1);
let requested_limit = config.limit.unwrap_or(manifest.case_count.max(1)).max(1);
if manifest.case_count < requested_limit {
return false;
}
@@ -942,7 +936,9 @@ pub fn manifest_is_complete(manifest: &SliceManifest, config: &SliceConfig<'_>)
let desired_negatives = desired_negative_target(
manifest.positive_paragraphs,
requested_corpus,
manifest.total_paragraphs.max(manifest.positive_paragraphs.max(1)),
manifest
.total_paragraphs
.max(manifest.positive_paragraphs.max(1)),
config.negative_multiplier,
);
manifest.negative_paragraphs >= desired_negatives
@@ -978,8 +974,7 @@ pub fn ledger_target(config: &Config) -> Option<usize> {
pub fn grow_slice(dataset: &ConvertedDataset, config: &Config) -> Result<()> {
let ledger_limit = ledger_target(config);
let slice_settings = slice_config_with_limit(config, ledger_limit);
let slice =
resolve_slice(dataset, &slice_settings).context("resolving dataset slice")?;
let slice = resolve_slice(dataset, &slice_settings).context("resolving dataset slice")?;
info!(
slice = slice.manifest.slice_id.as_str(),
cases = slice.manifest.case_count,
+1 -4
View File
@@ -109,10 +109,7 @@ impl<'a> PipelineContext<'a> {
let content = self.take_text_content()?;
let analysis = self.take_analysis()?;
let (entities, relationships) = self
.services
.convert_analysis(&content, &analysis)
.await?;
let (entities, relationships) = self.services.convert_analysis(&content, &analysis).await?;
let chunk_range = self.chunk_token_range();
let chunk_overlap = self.chunk_overlap_tokens();
+5 -17
View File
@@ -186,11 +186,7 @@ impl IngestionPipeline {
}
async fn artifacts_persisted(&self, task_id: &str) -> Result<bool, AppError> {
Ok(self
.db
.get_item::<TextContent>(task_id)
.await?
.is_some())
Ok(self.db.get_item::<TextContent>(task_id).await?.is_some())
}
async fn finalize_succeeded(&self, task: &IngestionTask) -> Result<(), AppError> {
@@ -379,8 +375,7 @@ mod finalize_tests {
persist_max_backoff_ms: 10,
..IngestionTuning::default()
};
let pipeline =
IngestionPipeline::with_services(Arc::new(db.clone()), config, services)?;
let pipeline = IngestionPipeline::with_services(Arc::new(db.clone()), config, services)?;
let task = reserve_task(
&db,
@@ -397,9 +392,7 @@ mod finalize_tests {
let processing = task.mark_processing(&db).await?;
db.client
.query(
"UPDATE type::thing('ingestion_task', $id) SET worker_id = $wrong_worker;",
)
.query("UPDATE type::thing('ingestion_task', $id) SET worker_id = $wrong_worker;")
.bind(("id", processing.id.clone()))
.bind(("wrong_worker", "wrong-worker"))
.await?;
@@ -410,9 +403,7 @@ mod finalize_tests {
sleep(Duration::from_millis(5)).await;
let _ = db_fix
.client
.query(
"UPDATE type::thing('ingestion_task', $id) SET worker_id = $worker_id;",
)
.query("UPDATE type::thing('ingestion_task', $id) SET worker_id = $worker_id;")
.bind(("id", task_id))
.bind(("worker_id", worker_id))
.await;
@@ -420,10 +411,7 @@ mod finalize_tests {
pipeline.finalize_succeeded(&processing).await?;
let stored: IngestionTask = db
.get_item(&processing.id)
.await?
.context("task stored")?;
let stored: IngestionTask = db.get_item(&processing.id).await?.context("task stored")?;
assert_eq!(stored.state, TaskState::Succeeded);
Ok(())
@@ -133,15 +133,15 @@ pub fn large_artifacts(
}
}
pub async fn persist(
db: &SurrealDbClient,
artifacts: PipelineArtifacts,
) -> Result<(), AppError> {
pub async fn persist(db: &SurrealDbClient, artifacts: PipelineArtifacts) -> Result<(), AppError> {
persist_artifacts(db, &tuning(), TEST_EMBEDDING_DIM, artifacts).await?;
Ok(())
}
pub async fn count_chunks_for_source(db: &SurrealDbClient, source_id: &str) -> anyhow::Result<usize> {
pub async fn count_chunks_for_source(
db: &SurrealDbClient,
source_id: &str,
) -> anyhow::Result<usize> {
let chunks: Vec<TextChunk> = db
.client
.query("SELECT * FROM text_chunk WHERE source_id = $source_id;")
+11 -13
View File
@@ -1,5 +1,15 @@
use std::sync::Arc;
use super::{
config::{IngestionConfig, IngestionTuning},
enrichment_result::LLMEnrichmentResult,
services::PipelineServices,
test_support::{
count_chunks_for_source, count_entities_for_source, count_relationships_for_source,
persist, sample_artifacts, setup_db,
},
IngestionPipeline,
};
use crate::pipeline::context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk};
use anyhow::{self, Context};
use async_trait::async_trait;
@@ -20,16 +30,6 @@ use common::{
};
use retrieval_pipeline::{RetrievedChunk, RetrievedEntity};
use tokio::sync::Mutex;
use super::{
config::{IngestionConfig, IngestionTuning},
enrichment_result::LLMEnrichmentResult,
services::PipelineServices,
test_support::{
count_chunks_for_source, count_entities_for_source, count_relationships_for_source,
persist, sample_artifacts, setup_db,
},
IngestionPipeline,
};
pub(crate) struct MockServices {
text_content: TextContent,
@@ -221,9 +221,7 @@ impl PipelineServices for FailingServices {
content: &TextContent,
analysis: &LLMEnrichmentResult,
) -> Result<(Vec<EmbeddedKnowledgeEntity>, Vec<KnowledgeRelationship>), AppError> {
self.inner
.convert_analysis(content, analysis)
.await
self.inner.convert_analysis(content, analysis).await
}
async fn prepare_chunks(
+1 -1
View File
@@ -219,7 +219,7 @@ fn add_char_into_object(
}
}
(&Value::Bool(true) | &Value::Bool(false), &ObjectStatus::Scalar { .. }, 'e')
| (&Value::Object(_), &ObjectStatus::ValueQuoteClose, '}') => {
| (&Value::Object(_), &ObjectStatus::ValueQuoteClose, '}') => {
*current_status = ObjectStatus::Closed;
}
+1 -6
View File
@@ -60,12 +60,7 @@ async fn main() -> anyhow::Result<()> {
worker_embedding,
)?);
run_worker_loop(
worker_db,
ingestion_pipeline,
index_rebuild_interval_secs,
)
.await
run_worker_loop(worker_db, ingestion_pipeline, index_rebuild_interval_secs).await
});
tokio::select! {
+1 -3
View File
@@ -74,9 +74,7 @@ mod tests {
let db = Arc::clone(&services.db);
let pipeline = Arc::new(pipeline);
let worker =
tokio::spawn(async move {
ingestion_pipeline::run_worker_loop(db, pipeline, 0).await
});
tokio::spawn(async move { ingestion_pipeline::run_worker_loop(db, pipeline, 0).await });
tokio::time::sleep(Duration::from_millis(250)).await;
assert!(