diff --git a/CHANGELOG.md b/CHANGELOG.md index f088ea6..5408f44 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,11 @@ # Changelog ## Unreleased - Performance: ingestion skips per-task index rebuild; worker runs scheduled `REBUILD INDEX` (default every 24h via `index_rebuild_interval_secs`, `0` disables) +- Performance: ingestion persists all artifacts in a single SurrealDB transaction per task (atomic replace by task id) +- Fix: ingestion reclaims tasks after a successful persist without re-running the pipeline when `mark_succeeded` failed +- Fix: content deletion clears graph relationships via shared `TextContent::clear_ingested_children` - Fix: regression re suggestion of relationships +- Internal: eval corpus DB seed uses `persist_artifacts` instead of a separate batched insert path ## 1.0.3 (2026-06-12) - Search: filter results by type — knowledge entities, ingested content, or both diff --git a/common/src/storage/db.rs b/common/src/storage/db.rs index 3d84e07..c48ed32 100644 --- a/common/src/storage/db.rs +++ b/common/src/storage/db.rs @@ -150,7 +150,7 @@ impl SurrealDbClient { /// Upsert an object in SurrealDB, replacing any existing record with the same ID. /// - /// Useful for idempotent ingestion flows. + /// Useful when a single record should be replaced by id (admin updates, embedding rows, etc.). /// /// # Errors /// diff --git a/common/src/storage/types/knowledge_entity.rs b/common/src/storage/types/knowledge_entity.rs index 9683b41..76ef2e4 100644 --- a/common/src/storage/types/knowledge_entity.rs +++ b/common/src/storage/types/knowledge_entity.rs @@ -245,8 +245,9 @@ impl KnowledgeEntity { Ok(()) } - /// Atomically store a knowledge entity and its embedding. - /// Writes the entity to `knowledge_entity` and the embedding to `knowledge_entity_embedding`. + /// Atomically store one knowledge entity and its embedding (single-record path). + /// + /// Bulk ingestion uses `ingestion_pipeline::persist_artifacts` instead. 
pub async fn store_with_embedding( entity: KnowledgeEntity, embedding: Vec, diff --git a/common/src/storage/types/text_chunk.rs b/common/src/storage/types/text_chunk.rs index 0096bab..7d1bbdd 100644 --- a/common/src/storage/types/text_chunk.rs +++ b/common/src/storage/types/text_chunk.rs @@ -61,8 +61,9 @@ impl TextChunk { Ok(()) } - /// Atomically store a text chunk and its embedding. - /// Writes the chunk to `text_chunk` and the embedding to `text_chunk_embedding`. + /// Atomically store one text chunk and its embedding (single-record path). + /// + /// Bulk ingestion uses `ingestion_pipeline::persist_artifacts` instead. pub async fn store_with_embedding( chunk: TextChunk, embedding: Vec, diff --git a/common/src/storage/types/text_content.rs b/common/src/storage/types/text_content.rs index f1f1af0..df81484 100644 --- a/common/src/storage/types/text_content.rs +++ b/common/src/storage/types/text_content.rs @@ -96,6 +96,41 @@ impl TextContent { } } + /// SurrealQL deletes for ingested child rows keyed by `source_id` (no transaction wrapper). + /// + /// Used inside larger transactions (e.g. ingestion `persist_artifacts`) and mirrored by + /// [`Self::clear_ingested_children`]. + pub const CLEAR_INGESTED_CHILD_ROWS_SURQL: &'static str = r" +DELETE relates_to WHERE metadata.source_id = $source_id AND metadata.user_id = $user_id; +DELETE text_chunk_embedding WHERE source_id = $source_id; +DELETE text_chunk WHERE source_id = $source_id; +DELETE knowledge_entity_embedding WHERE source_id = $source_id; +DELETE knowledge_entity WHERE source_id = $source_id; +"; + + /// Removes chunks, embeddings, entities, and relationships for one ingested document snapshot. 
+ pub async fn clear_ingested_children( + source_id: &str, + user_id: &str, + db: &SurrealDbClient, + ) -> Result<(), AppError> { + let query = format!( + "BEGIN TRANSACTION;\n{} COMMIT TRANSACTION;", + Self::CLEAR_INGESTED_CHILD_ROWS_SURQL + ); + + db.client + .query(query) + .bind(("source_id", source_id.to_string())) + .bind(("user_id", user_id.to_string())) + .await + .map_err(AppError::from)? + .check() + .map_err(AppError::from)?; + + Ok(()) + } + pub async fn patch( id: &str, context: &str, @@ -364,7 +399,14 @@ mod tests { use anyhow::{self, Context}; use super::*; - use crate::test_utils::setup_test_db_with_runtime_indexes; + use crate::{ + storage::types::{ + knowledge_entity::{KnowledgeEntity, KnowledgeEntityType}, + knowledge_relationship::KnowledgeRelationship, + text_chunk::TextChunk, + }, + test_utils::{setup_test_db, setup_test_db_with_runtime_indexes}, + }; #[tokio::test] async fn test_text_content_creation() -> anyhow::Result<()> { @@ -638,4 +680,81 @@ mod tests { ); Ok(()) } + + #[tokio::test] + async fn clear_ingested_children_removes_chunks_entities_and_relationships( + ) -> anyhow::Result<()> { + let db = setup_test_db().await?; + let user_id = "clear-user"; + let source_id = Uuid::new_v4().to_string(); + + let entity_a = KnowledgeEntity::new( + source_id.clone(), + "entity-a".to_string(), + "desc-a".to_string(), + KnowledgeEntityType::Idea, + None, + user_id.to_string(), + ); + let entity_b = KnowledgeEntity::new( + source_id.clone(), + "entity-b".to_string(), + "desc-b".to_string(), + KnowledgeEntityType::Idea, + None, + user_id.to_string(), + ); + KnowledgeEntity::store_with_embedding(entity_a.clone(), vec![0.1; 3], 3, &db) + .await + .context("store entity a")?; + KnowledgeEntity::store_with_embedding(entity_b.clone(), vec![0.2; 3], 3, &db) + .await + .context("store entity b")?; + + let chunk = TextChunk::new(source_id.clone(), "chunk".to_string(), user_id.to_string()); + TextChunk::store_with_embedding(chunk, vec![0.3; 3], 3, &db) + 
.await + .context("store chunk")?; + + KnowledgeRelationship::new( + entity_a.id.clone(), + entity_b.id, + user_id.to_string(), + source_id.clone(), + "relates_to".to_string(), + ) + .store_relationship(&db) + .await + .context("store relationship")?; + + TextContent::clear_ingested_children(&source_id, user_id, &db) + .await + .context("clear ingested children")?; + + let chunks: Vec = db + .client + .query("SELECT * FROM text_chunk WHERE source_id = $source_id;") + .bind(("source_id", source_id.clone())) + .await? + .take(0)?; + assert!(chunks.is_empty()); + + let entities: Vec = db + .client + .query("SELECT * FROM knowledge_entity WHERE source_id = $source_id;") + .bind(("source_id", source_id.clone())) + .await? + .take(0)?; + assert!(entities.is_empty()); + + let relationships: Vec = db + .client + .query("SELECT * FROM relates_to WHERE metadata.source_id = $source_id;") + .bind(("source_id", source_id)) + .await? + .take(0)?; + assert!(relationships.is_empty()); + + Ok(()) + } } diff --git a/evaluations/src/corpus/store.rs b/evaluations/src/corpus/store.rs index 27fe4ee..f219251 100644 --- a/evaluations/src/corpus/store.rs +++ b/evaluations/src/corpus/store.rs @@ -7,33 +7,24 @@ use std::{ use anyhow::{anyhow, Context, Result}; use chrono::{DateTime, Utc}; -use common::storage::types::StoredObject; use common::storage::{ db::SurrealDbClient, types::{ knowledge_entity::KnowledgeEntity, - knowledge_entity_embedding::KnowledgeEntityEmbedding, - knowledge_relationship::{KnowledgeRelationship, RelationshipMetadata}, + knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk, - text_chunk_embedding::TextChunkEmbedding, text_content::TextContent, + StoredObject, }, }; +use ingestion_pipeline::{persist_artifacts, IngestionTuning, PipelineArtifacts}; use serde::Deserialize; -use serde::Serialize; -use surrealdb::sql::Thing; use tracing::{debug, warn}; use crate::datasets::{ConvertedParagraph, ConvertedQuestion}; pub const MANIFEST_VERSION: u32 = 3; pub 
const PARAGRAPH_SHARD_VERSION: u32 = 3; -const MANIFEST_BATCH_SIZE: usize = 100; -const MANIFEST_MAX_BYTES_PER_BATCH: usize = 300_000; // default cap for non-text batches -const TEXT_CONTENT_MAX_BYTES_PER_BATCH: usize = 250_000; // text bodies can be large; limit aggressively -const MAX_BATCHES_PER_REQUEST: usize = 24; -const REQUEST_MAX_BYTES: usize = 800_000; // total payload cap per Surreal query request - fn current_manifest_version() -> u32 { MANIFEST_VERSION } @@ -251,130 +242,6 @@ pub fn window_manifest( Ok(narrowed) } -#[derive(Debug, Clone, Serialize)] -struct RelationInsert { - #[serde(rename = "in")] - pub in_: Thing, - #[serde(rename = "out")] - pub out: Thing, - pub id: String, - pub metadata: RelationshipMetadata, -} - -#[derive(Debug)] -struct SizedBatch { - approx_bytes: usize, - items: Vec, -} - -struct ManifestBatches { - text_contents: Vec>, - entities: Vec>, - entity_embeddings: Vec>, - relationships: Vec>, - chunks: Vec>, - chunk_embeddings: Vec>, -} - -fn build_manifest_batches(manifest: &CorpusManifest) -> Result { - let mut text_contents = Vec::new(); - let mut entities = Vec::new(); - let mut entity_embeddings = Vec::new(); - let mut relationships = Vec::new(); - let mut chunks = Vec::new(); - let mut chunk_embeddings = Vec::new(); - - let mut seen_text_content = HashSet::new(); - let mut seen_entities = HashSet::new(); - let mut seen_relationships = HashSet::new(); - let mut seen_chunks = HashSet::new(); - - for paragraph in &manifest.paragraphs { - if seen_text_content.insert(paragraph.text_content.id.clone()) { - text_contents.push(paragraph.text_content.clone()); - } - - for embedded_entity in &paragraph.entities { - if seen_entities.insert(embedded_entity.entity.id.clone()) { - let entity = embedded_entity.entity.clone(); - entities.push(entity.clone()); - entity_embeddings.push(KnowledgeEntityEmbedding::new( - &entity.id, - entity.source_id.clone(), - embedded_entity.embedding.clone(), - entity.user_id.clone(), - )); - } - } - - for
relationship in &paragraph.relationships { - if seen_relationships.insert(relationship.id.clone()) { - let table = KnowledgeEntity::table_name(); - let in_id = relationship - .in_ - .strip_prefix(&format!("{table}:")) - .unwrap_or(&relationship.in_); - let out_id = relationship - .out - .strip_prefix(&format!("{table}:")) - .unwrap_or(&relationship.out); - let in_thing = Thing::from((table, in_id)); - let out_thing = Thing::from((table, out_id)); - relationships.push(RelationInsert { - in_: in_thing, - out: out_thing, - id: relationship.id.clone(), - metadata: relationship.metadata.clone(), - }); - } - } - - for embedded_chunk in &paragraph.chunks { - if seen_chunks.insert(embedded_chunk.chunk.id.clone()) { - let chunk = embedded_chunk.chunk.clone(); - chunks.push(chunk.clone()); - chunk_embeddings.push(TextChunkEmbedding::new( - &chunk.id, - chunk.source_id.clone(), - embedded_chunk.embedding.clone(), - chunk.user_id.clone(), - )); - } - } - } - - Ok(ManifestBatches { - text_contents: chunk_items( - &text_contents, - MANIFEST_BATCH_SIZE, - TEXT_CONTENT_MAX_BYTES_PER_BATCH, - ) - .context("chunking text_content payloads")?, - entities: chunk_items(&entities, MANIFEST_BATCH_SIZE, MANIFEST_MAX_BYTES_PER_BATCH) - .context("chunking knowledge_entity payloads")?, - entity_embeddings: chunk_items( - &entity_embeddings, - MANIFEST_BATCH_SIZE, - MANIFEST_MAX_BYTES_PER_BATCH, - ) - .context("chunking knowledge_entity_embedding payloads")?, - relationships: chunk_items( - &relationships, - MANIFEST_BATCH_SIZE, - MANIFEST_MAX_BYTES_PER_BATCH, - ) - .context("chunking relationship payloads")?, - chunks: chunk_items(&chunks, MANIFEST_BATCH_SIZE, MANIFEST_MAX_BYTES_PER_BATCH) - .context("chunking text_chunk payloads")?, - chunk_embeddings: chunk_items( - &chunk_embeddings, - MANIFEST_BATCH_SIZE, - MANIFEST_MAX_BYTES_PER_BATCH, - ) - .context("chunking text_chunk_embedding payloads")?, - }) -} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct ParagraphShard {
#[serde(default = "current_paragraph_shard_version")] @@ -599,157 +466,28 @@ fn normalize_answer_text(text: &str) -> String { .join(" ") } -#[allow(clippy::arithmetic_side_effects, clippy::indexing_slicing)] -fn chunk_items( - items: &[T], - max_items: usize, - max_bytes: usize, -) -> Result>> { - if items.is_empty() { - return Ok(Vec::new()); - } - - let mut batches = Vec::new(); - let mut current = Vec::new(); - let mut current_bytes = 0usize; - - for item in items { - let size = serde_json::to_vec(item) - .map(|buf| buf.len()) - .context("serialising batch item for sizing")?; - - let would_overflow_items = !current.is_empty() && current.len() >= max_items; - let would_overflow_bytes = !current.is_empty() && current_bytes + size > max_bytes; - - if would_overflow_items || would_overflow_bytes { - batches.push(SizedBatch { - approx_bytes: current_bytes.max(1), - items: std::mem::take(&mut current), - }); - current_bytes = 0; - } - - current_bytes += size; - current.push(item.clone()); - } - - if !current.is_empty() { - batches.push(SizedBatch { - approx_bytes: current_bytes.max(1), - items: current, - }); - } - - Ok(batches) -} - -#[allow(clippy::arithmetic_side_effects, clippy::indexing_slicing)] -async fn execute_batched_inserts( - db: &SurrealDbClient, - statement: impl AsRef, - prefix: &str, - batches: &[SizedBatch], -) -> Result<()> { - if batches.is_empty() { - return Ok(()); - } - - let mut start = 0; - while start < batches.len() { - let mut group_bytes = 0usize; - let mut group_end = start; - let mut group_count = 0usize; - - while group_end < batches.len() { - let batch_bytes = batches[group_end].approx_bytes.max(1); - if group_count > 0 - && (group_bytes + batch_bytes > REQUEST_MAX_BYTES - || group_count >= MAX_BATCHES_PER_REQUEST) - { - break; - } - group_bytes += batch_bytes; - group_end += 1; - group_count += 1; - } - - let slice = &batches[start..group_end]; - let mut query = db.client.query("BEGIN TRANSACTION;"); - for (bind_index, batch) in 
slice.iter().enumerate() { - let name = format!("{prefix}{bind_index}"); - query = query - .query(format!("{} ${};", statement.as_ref(), name)) - .bind((name, batch.items.clone())); - } - let response = query - .query("COMMIT TRANSACTION;") - .await - .context("executing batched insert transaction")?; - if let Err(err) = response.check() { - return Err(anyhow!( - "batched insert failed for statement '{}': {err:?}", - statement.as_ref() - )); - } - - start = group_end; - } - - Ok(()) -} - pub async fn seed_manifest_into_db(db: &SurrealDbClient, manifest: &CorpusManifest) -> Result<()> { - let batches = build_manifest_batches(manifest).context("preparing manifest batches")?; + let tuning = IngestionTuning::default(); + let embedding_dimensions = manifest.metadata.embedding_dimension; + let mut seen_text_content = HashSet::new(); let result = async { - execute_batched_inserts( - db, - format!("INSERT INTO {}", TextContent::table_name()), - "tc", - &batches.text_contents, - ) - .await?; + for paragraph in &manifest.paragraphs { + if !seen_text_content.insert(paragraph.text_content.id.clone()) { + continue; + } - execute_batched_inserts( - db, - format!("INSERT INTO {}", KnowledgeEntity::table_name()), - "ke", - &batches.entities, - ) - .await?; - - execute_batched_inserts( - db, - format!("INSERT INTO {}", TextChunk::table_name()), - "ch", - &batches.chunks, - ) - .await?; - - execute_batched_inserts( - db, - "INSERT RELATION INTO relates_to", - "rel", - &batches.relationships, - ) - .await?; - - execute_batched_inserts( - db, - format!("INSERT INTO {}", KnowledgeEntityEmbedding::table_name()), - "kee", - &batches.entity_embeddings, - ) - .await?; - - execute_batched_inserts( - db, - format!("INSERT INTO {}", TextChunkEmbedding::table_name()), - "tce", - &batches.chunk_embeddings, - ) - .await?; + let artifacts = PipelineArtifacts { + text_content: paragraph.text_content.clone(), + entities: paragraph.entities.clone(), + relationships: paragraph.relationships.clone(), 
+ chunks: paragraph.chunks.clone(), + }; + persist_artifacts(db, &tuning, embedding_dimensions, artifacts) + .await + .map_err(|err| anyhow!("persist manifest paragraph: {err}"))?; + } Ok(()) } .await; @@ -778,7 +516,10 @@ pub async fn seed_manifest_into_db(db: &SurrealDbClient, manifest: &CorpusManife mod tests { use super::*; use chrono::Utc; - use common::storage::types::knowledge_entity::KnowledgeEntityType; + use common::storage::types::{ + knowledge_entity::{KnowledgeEntity, KnowledgeEntityType}, + text_chunk::TextChunk, + }; use uuid::Uuid; #[allow(clippy::too_many_lines)] diff --git a/html-router/src/routes/content/handlers.rs b/html-router/src/routes/content/handlers.rs index 4a1d78c..5e48a9c 100644 --- a/html-router/src/routes/content/handlers.rs +++ b/html-router/src/routes/content/handlers.rs @@ -5,10 +5,7 @@ use axum::{ use axum_htmx::{HxBoosted, HxRequest, HxTarget}; use serde::{Deserialize, Serialize}; -use common::storage::types::{ - file_info::FileInfo, knowledge_entity::KnowledgeEntity, text_chunk::TextChunk, - text_content::TextContent, user::User, -}; +use common::storage::types::{file_info::FileInfo, text_content::TextContent, user::User}; use crate::{ html_state::HtmlState, @@ -180,9 +177,7 @@ pub async fn delete_text_content( } } - // Delete related knowledge entities and text chunks - KnowledgeEntity::delete_by_source_id(&id, &state.db).await?; - TextChunk::delete_by_source_id(&id, &state.db).await?; + TextContent::clear_ingested_children(&id, &user.id, &state.db).await?; // Delete the text content state.db.delete_item::(&id).await?; diff --git a/html-router/src/routes/index/handlers.rs b/html-router/src/routes/index/handlers.rs index 2b76a12..ce1b437 100644 --- a/html-router/src/routes/index/handlers.rs +++ b/html-router/src/routes/index/handlers.rs @@ -23,9 +23,7 @@ use common::storage::types::user::DashboardStats; use common::{ error::AppError, storage::types::{ - file_info::FileInfo, ingestion_task::IngestionTask, 
knowledge_entity::KnowledgeEntity, - knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk, - text_content::TextContent, user::User, + file_info::FileInfo, ingestion_task::IngestionTask, text_content::TextContent, user::User, }, }; @@ -81,11 +79,7 @@ pub async fn delete_text_content( } } - // Delete the text content and any related data - TextChunk::delete_by_source_id(&text_content.id, &state.db).await?; - KnowledgeEntity::delete_by_source_id(&text_content.id, &state.db).await?; - KnowledgeRelationship::delete_relationships_by_source_id(&text_content.id, &user.id, &state.db) - .await?; + TextContent::clear_ingested_children(&text_content.id, &user.id, &state.db).await?; state .db .delete_item::(&text_content.id) diff --git a/ingestion-pipeline/src/lib.rs b/ingestion-pipeline/src/lib.rs index 1c0a2ca..b6b723a 100644 --- a/ingestion-pipeline/src/lib.rs +++ b/ingestion-pipeline/src/lib.rs @@ -10,8 +10,8 @@ use common::storage::{ types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS}, }; pub use pipeline::{ - EmbeddedKnowledgeEntity, EmbeddedTextChunk, IngestionConfig, IngestionPipeline, - IngestionTuning, PipelineArtifacts, + persist_artifacts, EmbeddedKnowledgeEntity, EmbeddedTextChunk, IngestionConfig, + IngestionPipeline, IngestionTuning, PipelineArtifacts, }; use std::sync::Arc; use tokio::time::{sleep, Duration}; diff --git a/ingestion-pipeline/src/pipeline/config.rs b/ingestion-pipeline/src/pipeline/config.rs index fc374c1..1820c26 100644 --- a/ingestion-pipeline/src/pipeline/config.rs +++ b/ingestion-pipeline/src/pipeline/config.rs @@ -3,9 +3,9 @@ pub struct IngestionTuning { pub retry_base_delay_secs: u64, pub retry_max_delay_secs: u64, pub retry_backoff_cap_exponent: u32, - pub graph_store_attempts: usize, - pub graph_initial_backoff_ms: u64, - pub graph_max_backoff_ms: u64, + pub persist_attempts: usize, + pub persist_initial_backoff_ms: u64, + pub persist_max_backoff_ms: u64, pub chunk_min_tokens: usize, pub chunk_max_tokens: usize, pub 
chunk_overlap_tokens: usize, @@ -21,9 +21,9 @@ impl Default for IngestionTuning { retry_base_delay_secs: 30, retry_max_delay_secs: 15 * 60, retry_backoff_cap_exponent: 5, - graph_store_attempts: 3, - graph_initial_backoff_ms: 50, - graph_max_backoff_ms: 800, + persist_attempts: 3, + persist_initial_backoff_ms: 50, + persist_max_backoff_ms: 800, chunk_min_tokens: 256, chunk_max_tokens: 512, chunk_overlap_tokens: 50, diff --git a/ingestion-pipeline/src/pipeline/mod.rs b/ingestion-pipeline/src/pipeline/mod.rs index 4dbac92..0c9a604 100644 --- a/ingestion-pipeline/src/pipeline/mod.rs +++ b/ingestion-pipeline/src/pipeline/mod.rs @@ -12,6 +12,8 @@ pub use config::{IngestionConfig, IngestionTuning}; pub use context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk, PipelineArtifacts}; pub use enrichment_result::{LLMEnrichmentResult, LLMKnowledgeEntity, LLMRelationship}; #[allow(clippy::module_name_repetitions)] +pub use persistence::persist_artifacts; +#[allow(clippy::module_name_repetitions)] pub use services::{DefaultPipelineServices, PipelineServices}; use std::{ @@ -28,11 +30,13 @@ use common::{ types::{ ingestion_payload::IngestionPayload, ingestion_task::{IngestionTask, TaskErrorInfo}, + text_content::TextContent, }, }, utils::config::AppConfig, }; use retrieval_pipeline::reranking::RerankerPool; +use tokio::time::sleep; use tracing::{debug, info, warn}; use self::{ @@ -120,29 +124,31 @@ impl IngestionPipeline { )] pub async fn process_task(&self, task: IngestionTask) -> Result<(), AppError> { let mut processing_task = task.mark_processing(&self.db).await?; - let payload = processing_task.take_content(); - match self - .drive_pipeline(&processing_task, payload) - .await - .map_err(|err| { - debug!( - task_id = %processing_task.id, - attempt = processing_task.attempts, - error = %err, - "ingestion pipeline failed" - ); - err - }) { - Ok(()) => { - processing_task.mark_succeeded(&self.db).await?; - tracing::info!( - task_id = %processing_task.id, - attempt = 
processing_task.attempts, - "ingestion task succeeded" - ); - Ok(()) - } + let pipeline_result = if self.artifacts_persisted(&processing_task.id).await? { + info!( + task_id = %processing_task.id, + attempt = processing_task.attempts, + "ingestion artifacts already persisted; skipping pipeline" + ); + Ok(()) + } else { + let payload = processing_task.take_content(); + self.drive_pipeline(&processing_task, payload) + .await + .map_err(|err| { + debug!( + task_id = %processing_task.id, + attempt = processing_task.attempts, + error = %err, + "ingestion pipeline failed" + ); + err + }) + }; + + match pipeline_result { + Ok(()) => self.finalize_succeeded(&processing_task).await, Err(err) => { let reason = err.to_string(); let retryable = !matches!(err, AppError::Validation(_)); @@ -179,6 +185,51 @@ impl IngestionPipeline { } } + async fn artifacts_persisted(&self, task_id: &str) -> Result { + Ok(self + .db + .get_item::(task_id) + .await? + .is_some()) + } + + async fn finalize_succeeded(&self, task: &IngestionTask) -> Result<(), AppError> { + let tuning = &self.pipeline_config.tuning; + let mut backoff_ms = tuning.persist_initial_backoff_ms; + let last_attempt = tuning.persist_attempts.saturating_sub(1); + + for attempt in 0..tuning.persist_attempts { + match task.mark_succeeded(&self.db).await { + Ok(_) => { + info!( + task_id = %task.id, + attempt = task.attempts, + "ingestion task succeeded" + ); + return Ok(()); + } + Err(err) if attempt < last_attempt => { + let next_attempt = attempt.saturating_add(1); + warn!( + task_id = %task.id, + attempt = next_attempt, + error = %err, + "failed to mark ingestion task succeeded; retrying" + ); + sleep(Duration::from_millis(backoff_ms)).await; + backoff_ms = backoff_ms + .saturating_mul(2) + .min(tuning.persist_max_backoff_ms); + } + Err(err) => return Err(err), + } + } + + Err(AppError::InternalError( + "failed to mark ingestion task succeeded after retries".into(), + )) + } + fn retry_delay(&self, attempt: u32) -> Duration 
{ let tuning = &self.pipeline_config.tuning; let capped_attempt = attempt @@ -291,5 +342,8 @@ impl IngestionPipeline { } } +#[cfg(test)] +mod test_support; + #[cfg(test)] mod tests; diff --git a/ingestion-pipeline/src/pipeline/persistence.rs b/ingestion-pipeline/src/pipeline/persistence.rs index 9a6e689..a4483f3 100644 --- a/ingestion-pipeline/src/pipeline/persistence.rs +++ b/ingestion-pipeline/src/pipeline/persistence.rs @@ -1,8 +1,9 @@ -//! Low-level database write mechanics for the persist stage. +//! Atomic persistence for ingested artifacts. //! -//! This module owns *how* ingested artifacts reach `SurrealDB` (per-item store loops, -//! the relationship transaction, and conflict retry/backoff). The persist stage in -//! [`super::stages`] owns *what* gets written and in which order. +//! All rows for one ingestion task are written inside a single `SurrealDB` transaction: +//! clear any prior rows for the task's `source_id`, then insert the new snapshot. +//! `SurrealDB` does not cap transaction row count; request payload size is the practical +//! limit (~4 MiB gRPC on `TiKV`). Typical single-document ingests fit comfortably. 
use std::sync::Arc; @@ -11,131 +12,332 @@ use common::{ storage::{ db::SurrealDbClient, types::{ - knowledge_entity::KnowledgeEntity, knowledge_relationship::KnowledgeRelationship, + knowledge_entity::KnowledgeEntity, + knowledge_entity_embedding::KnowledgeEntityEmbedding, text_chunk::TextChunk, + text_chunk_embedding::TextChunkEmbedding, + text_content::TextContent, }, }, }; use tokio::time::{sleep, Duration}; -use tracing::{debug, warn}; +use tracing::warn; use super::{ config::IngestionTuning, - context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk}, + context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk, PipelineArtifacts}, }; -const STORE_RELATIONSHIPS: &str = r" - BEGIN TRANSACTION; - LET $relationships = $relationships; - - FOR $relationship IN $relationships { - LET $in_node = type::thing('knowledge_entity', $relationship.in); - LET $out_node = type::thing('knowledge_entity', $relationship.out); - RELATE $in_node->relates_to->$out_node CONTENT { - id: type::thing('relates_to', $relationship.id), - metadata: $relationship.metadata - }; - }; - - COMMIT TRANSACTION; -"; - -/// Persists chunk embeddings to the vector store. -/// -/// Chunks are written serially on purpose. Concurrent/batched inserts were -/// trialed and did not reliably improve throughput; see `ingestion-pipeline/AGENTS.md` -/// for the rationale and as a candidate for future refactoring/benchmarking. 
-pub(super) async fn store_vector_chunks( - db: &SurrealDbClient, - task_id: &str, - embedding_dimensions: usize, - chunks: Vec, -) -> Result { - let chunk_count = chunks.len(); - for embedded in chunks { - debug!( - task_id = %task_id, - chunk_id = %embedded.chunk.id, - chunk_len = embedded.chunk.chunk.chars().count(), - "chunk persisted" - ); - TextChunk::store_with_embedding( - embedded.chunk, - embedded.embedding, - embedding_dimensions, - db, - ) - .await?; - } - - Ok(chunk_count) +#[derive(Debug, Clone, Copy)] +#[allow(clippy::struct_field_names)] +pub struct PersistCounts { + pub chunk_count: usize, + pub entity_count: usize, + pub relationship_count: usize, } -/// Persists knowledge entities and their relationships. -/// -/// Entities are stored serially (see `store_vector_chunks` and AGENTS.md for why). -/// Relationships are written via a single transaction with bounded conflict retry. -pub(super) async fn store_graph_entities( +/// Persists all pipeline artifacts in one database transaction. 
+pub async fn persist_artifacts( db: &SurrealDbClient, tuning: &IngestionTuning, embedding_dimensions: usize, - entities: Vec, - relationships: Vec, -) -> Result<(), AppError> { - for embedded in entities { - KnowledgeEntity::store_with_embedding( - embedded.entity, - embedded.embedding, - embedding_dimensions, - db, - ) - .await?; - } + artifacts: PipelineArtifacts, +) -> Result { + let PipelineArtifacts { + text_content, + entities, + relationships, + chunks, + } = artifacts; - if relationships.is_empty() { - return Ok(()); - } + let source_id = text_content.id.clone(); + let user_id = text_content.user_id.clone(); + let chunk_count = chunks.len(); + let entity_count = entities.len(); + let relationship_count = relationships.len(); - let relationships = Arc::new(relationships); + let (entities, entity_embeddings) = prepare_entity_rows(entities, embedding_dimensions)?; + let (chunks, chunk_embeddings) = prepare_chunk_rows(chunks, embedding_dimensions)?; - let mut backoff_ms = tuning.graph_initial_backoff_ms; - let last_attempt = tuning.graph_store_attempts.saturating_sub(1); + let payload = PersistPayload { + source_id: Arc::from(source_id), + user_id: Arc::from(user_id), + text_content: Arc::new(text_content), + entities: Arc::from(entities.into_boxed_slice()), + entity_embeddings: Arc::from(entity_embeddings.into_boxed_slice()), + chunks: Arc::from(chunks.into_boxed_slice()), + chunk_embeddings: Arc::from(chunk_embeddings.into_boxed_slice()), + relationships: relationships.into(), + }; - for attempt in 0..tuning.graph_store_attempts { - let result = db - .client - .query(STORE_RELATIONSHIPS) - .bind(("relationships", Arc::clone(&relationships))) - .await; + let mut backoff_ms = tuning.persist_initial_backoff_ms; + let last_attempt = tuning.persist_attempts.saturating_sub(1); + + for attempt in 0..tuning.persist_attempts { + let result = execute_persist_transaction(db, &payload).await; match result { - Ok(_) => return Ok(()), + Ok(()) => { + return 
Ok(PersistCounts { + chunk_count, + entity_count, + relationship_count, + }); + } Err(err) => { if is_retryable_conflict(&err) && attempt < last_attempt { let next_attempt = attempt.saturating_add(1); warn!( attempt = next_attempt, - "Transient SurrealDB conflict while storing graph data; retrying" + "Transient SurrealDB conflict while persisting ingestion artifacts; retrying" ); sleep(Duration::from_millis(backoff_ms)).await; backoff_ms = backoff_ms .saturating_mul(2) - .min(tuning.graph_max_backoff_ms); + .min(tuning.persist_max_backoff_ms); continue; } - return Err(AppError::from(err)); + return Err(err); } } } Err(AppError::InternalError( - "Failed to store graph entities after retries".to_string(), + "Failed to persist ingestion artifacts after retries".to_string(), )) } -fn is_retryable_conflict(error: &surrealdb::Error) -> bool { +struct PersistPayload { + source_id: Arc, + user_id: Arc, + text_content: Arc, + entities: Arc<[KnowledgeEntity]>, + entity_embeddings: Arc<[KnowledgeEntityEmbedding]>, + chunks: Arc<[TextChunk]>, + chunk_embeddings: Arc<[TextChunkEmbedding]>, + relationships: + Arc<[common::storage::types::knowledge_relationship::KnowledgeRelationship]>, +} + +async fn execute_persist_transaction( + db: &SurrealDbClient, + payload: &PersistPayload, +) -> Result<(), AppError> { + let mut query = String::from("BEGIN TRANSACTION;\n"); + query.push_str(TextContent::CLEAR_INGESTED_CHILD_ROWS_SURQL); + query.push_str( + "DELETE type::thing('text_content', $source_id); + UPSERT type::thing('text_content', $source_id) CONTENT $text_content;", + ); + + if !payload.entities.is_empty() { + query.push_str("\nINSERT INTO knowledge_entity $entities;"); + query.push_str("\nINSERT INTO knowledge_entity_embedding $entity_embeddings;"); + } + if !payload.chunks.is_empty() { + query.push_str("\nINSERT INTO text_chunk $chunks;"); + query.push_str("\nINSERT INTO text_chunk_embedding $chunk_embeddings;"); + } + if !payload.relationships.is_empty() { + query.push_str( + 
r#" +LET $relationships = $relationships; +FOR $relationship IN $relationships { + LET $in_node = type::thing('knowledge_entity', $relationship.`in`); + LET $out_node = type::thing('knowledge_entity', $relationship.out); + RELATE $in_node->relates_to->$out_node CONTENT { + id: type::thing('relates_to', $relationship.id), + metadata: $relationship.metadata + }; +};"#, + ); + } + + query.push_str("\nCOMMIT TRANSACTION;"); + + let mut request = db + .client + .query(query) + .bind(("source_id", Arc::clone(&payload.source_id))) + .bind(("user_id", Arc::clone(&payload.user_id))) + .bind(("text_content", Arc::clone(&payload.text_content))); + + if !payload.entities.is_empty() { + request = request + .bind(("entities", Arc::clone(&payload.entities))) + .bind(("entity_embeddings", Arc::clone(&payload.entity_embeddings))); + } + if !payload.chunks.is_empty() { + request = request + .bind(("chunks", Arc::clone(&payload.chunks))) + .bind(("chunk_embeddings", Arc::clone(&payload.chunk_embeddings))); + } + if !payload.relationships.is_empty() { + request = request.bind(("relationships", Arc::clone(&payload.relationships))); + } + + request + .await + .map_err(AppError::from)? 
+ .check() + .map_err(AppError::from)?; + + Ok(()) +} + +fn prepare_entity_rows( + embedded: Vec, + embedding_dimensions: usize, +) -> Result<(Vec, Vec), AppError> { + let mut entities = Vec::with_capacity(embedded.len()); + let mut entity_embeddings = Vec::with_capacity(embedded.len()); + + for item in embedded { + KnowledgeEntityEmbedding::validate_dimension(&item.embedding, embedding_dimensions)?; + let entity = item.entity; + entity_embeddings.push(KnowledgeEntityEmbedding::new( + &entity.id, + entity.source_id.clone(), + item.embedding, + entity.user_id.clone(), + )); + entities.push(entity); + } + + Ok((entities, entity_embeddings)) +} + +fn prepare_chunk_rows( + embedded: Vec, + embedding_dimensions: usize, +) -> Result<(Vec, Vec), AppError> { + let mut chunks = Vec::with_capacity(embedded.len()); + let mut chunk_embeddings = Vec::with_capacity(embedded.len()); + + for item in embedded { + TextChunkEmbedding::validate_dimension(&item.embedding, embedding_dimensions)?; + let chunk = item.chunk; + chunk_embeddings.push(TextChunkEmbedding::new( + &chunk.id, + chunk.source_id.clone(), + item.embedding, + chunk.user_id.clone(), + )); + chunks.push(chunk); + } + + Ok((chunks, chunk_embeddings)) +} + +fn is_retryable_conflict(error: &AppError) -> bool { error .to_string() .contains("Failed to commit transaction due to a read or write conflict") } + +#[cfg(test)] +mod tests { + use common::storage::types::text_content::TextContent; + + use super::*; + use crate::pipeline::test_support::{ + self, count_chunks_for_source, count_entities_for_source, count_relationships_for_source, + large_artifacts, persist, sample_artifacts, setup_db, TEST_EMBEDDING_DIM, + }; + + #[tokio::test] + async fn persist_artifacts_is_idempotent_for_same_source() -> anyhow::Result<()> { + let db = setup_db().await?; + let source_id = uuid::Uuid::new_v4().to_string(); + let user_id = "persist-idempotent"; + + persist(&db, sample_artifacts(&source_id, user_id)).await?; + persist(&db, 
sample_artifacts(&source_id, user_id)).await?; + + assert_eq!(count_chunks_for_source(&db, &source_id).await?, 1); + assert_eq!(count_entities_for_source(&db, &source_id).await?, 1); + + Ok(()) + } + + #[tokio::test] + async fn persist_artifacts_rejects_invalid_embedding_before_write() -> anyhow::Result<()> { + let db = setup_db().await?; + let source_id = uuid::Uuid::new_v4().to_string(); + let user_id = "persist-validate"; + let mut artifacts = sample_artifacts(&source_id, user_id); + if let Some(chunk) = artifacts.chunks.first_mut() { + chunk.embedding = vec![0.1; 2]; + } + + let result = + persist_artifacts(&db, &test_support::tuning(), TEST_EMBEDDING_DIM, artifacts).await; + assert!(result.is_err()); + + let text: Option = db.get_item(&source_id).await?; + assert!(text.is_none()); + + Ok(()) + } + + #[tokio::test] + async fn persist_large_snapshot() -> anyhow::Result<()> { + let db = setup_db().await?; + let source_id = uuid::Uuid::new_v4().to_string(); + let user_id = "persist-large"; + let chunk_count = 100; + let entity_count = 20; + let relationship_count = 30; + + persist( + &db, + large_artifacts( + &source_id, + user_id, + chunk_count, + entity_count, + relationship_count, + TEST_EMBEDDING_DIM, + ), + ) + .await?; + + assert_eq!(count_chunks_for_source(&db, &source_id).await?, chunk_count); + assert_eq!( + count_entities_for_source(&db, &source_id).await?, + entity_count + ); + assert_eq!( + count_relationships_for_source(&db, &source_id).await?, + relationship_count + ); + + Ok(()) + } + + #[tokio::test] + async fn persist_does_not_touch_other_source_ids() -> anyhow::Result<()> { + let db = setup_db().await?; + let source_a = uuid::Uuid::new_v4().to_string(); + let source_b = uuid::Uuid::new_v4().to_string(); + let user_id = "persist-isolation"; + + persist(&db, large_artifacts(&source_a, user_id, 5, 3, 4, TEST_EMBEDDING_DIM)).await?; + persist(&db, large_artifacts(&source_b, user_id, 2, 1, 1, TEST_EMBEDDING_DIM)).await?; + persist( + &db, + 
large_artifacts(&source_a, user_id, 7, 4, 6, TEST_EMBEDDING_DIM), + ) + .await?; + + assert_eq!(count_chunks_for_source(&db, &source_a).await?, 7); + assert_eq!(count_entities_for_source(&db, &source_a).await?, 4); + assert_eq!(count_relationships_for_source(&db, &source_a).await?, 6); + assert_eq!(count_chunks_for_source(&db, &source_b).await?, 2); + assert_eq!(count_entities_for_source(&db, &source_b).await?, 1); + assert_eq!(count_relationships_for_source(&db, &source_b).await?, 1); + + Ok(()) + } +} diff --git a/ingestion-pipeline/src/pipeline/stages.rs b/ingestion-pipeline/src/pipeline/stages.rs index 330f92a..0d1b48e 100644 --- a/ingestion-pipeline/src/pipeline/stages.rs +++ b/ingestion-pipeline/src/pipeline/stages.rs @@ -12,9 +12,9 @@ use state_machines::core::GuardError; use tracing::{debug, instrument}; use super::{ - context::{PipelineArtifacts, PipelineContext}, + context::PipelineContext, enrichment_result::LLMEnrichmentResult, - persistence::{store_graph_entities, store_vector_chunks}, + persistence::persist_artifacts, state::{ContentPrepared, Enriched, IngestionMachine, Persisted, Ready, Retrieved}, }; @@ -28,7 +28,8 @@ pub async fn prepare_content( ctx: &mut PipelineContext<'_>, payload: IngestionPayload, ) -> Result, AppError> { - let text_content = ctx.services.prepare_text_content(payload).await?; + let mut text_content = ctx.services.prepare_text_content(payload).await?; + text_content.id.clone_from(&ctx.task_id); let text_len = text_content.text.chars().count(); let preview: String = text_content.text.chars().take(120).collect(); @@ -146,43 +147,26 @@ pub async fn persist( machine: IngestionMachine<(), Enriched>, ctx: &mut PipelineContext<'_>, ) -> Result, AppError> { - let PipelineArtifacts { - text_content, - entities, - relationships, - chunks, - } = ctx.build_artifacts().await?; - let entity_count = entities.len(); - let relationship_count = relationships.len(); - + let artifacts = ctx.build_artifacts().await?; let settings = 
SystemSettings::get_current(ctx.db).await?; let embedding_dimensions = usize::try_from(settings.embedding_dimensions).map_err(|_| { AppError::InternalError("system_settings.embedding_dimensions exceeds usize::MAX".into()) })?; - let chunk_count = store_vector_chunks( - ctx.db, - ctx.task_id.as_str(), - embedding_dimensions, - chunks, - ) - .await?; - store_graph_entities( + let counts = persist_artifacts( ctx.db, &ctx.pipeline_config.tuning, embedding_dimensions, - entities, - relationships, + artifacts, ) .await?; - ctx.db.store_item(text_content).await?; debug!( task_id = %ctx.task_id, attempt = ctx.attempt, - entity_count, - relationship_count, - chunk_count, + entity_count = counts.entity_count, + relationship_count = counts.relationship_count, + chunk_count = counts.chunk_count, "ingestion persistence flushed to database" ); diff --git a/ingestion-pipeline/src/pipeline/test_support.rs b/ingestion-pipeline/src/pipeline/test_support.rs new file mode 100644 index 0000000..805f00f --- /dev/null +++ b/ingestion-pipeline/src/pipeline/test_support.rs @@ -0,0 +1,178 @@ +//! Shared helpers for ingestion pipeline integration and persistence tests. 
+ +use chrono::Utc; +use common::{ + error::AppError, + storage::{ + db::SurrealDbClient, + types::{ + knowledge_entity::{KnowledgeEntity, KnowledgeEntityType}, + knowledge_relationship::KnowledgeRelationship, + text_chunk::TextChunk, + text_content::TextContent, + }, + }, +}; +use uuid::Uuid; + +use super::{ + config::IngestionTuning, + context::{EmbeddedKnowledgeEntity, EmbeddedTextChunk, PipelineArtifacts}, + persistence::persist_artifacts, +}; + +pub const TEST_EMBEDDING_DIM: usize = 3; + +pub async fn setup_db() -> anyhow::Result { + let namespace = "ingestion_pipeline_test"; + let database = Uuid::new_v4().to_string(); + let db = SurrealDbClient::memory(namespace, &database).await?; + db.apply_migrations().await?; + Ok(db) +} + +pub fn tuning() -> IngestionTuning { + IngestionTuning::default() +} + +pub fn sample_artifacts(source_id: &str, user_id: &str) -> PipelineArtifacts { + large_artifacts(source_id, user_id, 1, 1, 1, TEST_EMBEDDING_DIM) +} + +#[allow( + clippy::too_many_arguments, + clippy::arithmetic_side_effects, + clippy::expect_used +)] +pub fn large_artifacts( + source_id: &str, + user_id: &str, + chunk_count: usize, + entity_count: usize, + relationship_count: usize, + embedding_dim: usize, +) -> PipelineArtifacts { + let now = Utc::now(); + let embedding = vec![0.1_f32; embedding_dim]; + + let mut entities = Vec::with_capacity(entity_count); + let mut entity_ids = Vec::with_capacity(entity_count); + + for index in 0..entity_count { + let entity_id = Uuid::new_v4().to_string(); + entity_ids.push(entity_id.clone()); + entities.push(EmbeddedKnowledgeEntity { + entity: KnowledgeEntity { + id: entity_id, + created_at: now, + updated_at: now, + name: format!("entity-{index}"), + description: format!("description-{index}"), + entity_type: KnowledgeEntityType::Idea, + source_id: source_id.to_string(), + metadata: None, + user_id: user_id.to_string(), + }, + embedding: embedding.clone(), + }); + } + + let mut relationships = 
Vec::with_capacity(relationship_count); + assert!( + entity_count > 0 || relationship_count == 0, + "large_artifacts requires entity_count > 0 when relationship_count > 0" + ); + for index in 0..relationship_count { + let in_id = entity_ids + .get(index % entity_count) + .expect("entity_count > 0 when relationship_count > 0") + .clone(); + let out_id = entity_ids + .get((index + 1) % entity_count) + .expect("entity_count > 0 when relationship_count > 0") + .clone(); + relationships.push(KnowledgeRelationship::new( + in_id, + out_id, + user_id.to_string(), + source_id.to_string(), + "relates_to".to_string(), + )); + } + + let mut chunks = Vec::with_capacity(chunk_count); + for index in 0..chunk_count { + chunks.push(EmbeddedTextChunk { + chunk: TextChunk { + id: Uuid::new_v4().to_string(), + created_at: now, + updated_at: now, + source_id: source_id.to_string(), + chunk: format!("chunk body {index}"), + user_id: user_id.to_string(), + }, + embedding: embedding.clone(), + }); + } + + PipelineArtifacts { + text_content: TextContent { + id: source_id.to_string(), + created_at: now, + updated_at: now, + text: format!("document with {chunk_count} chunks"), + file_info: None, + url_info: None, + context: None, + category: "notes".to_string(), + user_id: user_id.to_string(), + }, + entities, + relationships, + chunks, + } +} + +pub async fn persist( + db: &SurrealDbClient, + artifacts: PipelineArtifacts, +) -> Result<(), AppError> { + persist_artifacts(db, &tuning(), TEST_EMBEDDING_DIM, artifacts).await?; + Ok(()) +} + +pub async fn count_chunks_for_source(db: &SurrealDbClient, source_id: &str) -> anyhow::Result { + let chunks: Vec = db + .client + .query("SELECT * FROM text_chunk WHERE source_id = $source_id;") + .bind(("source_id", source_id.to_string())) + .await? 
+ .take(0)?; + Ok(chunks.len()) +} + +pub async fn count_entities_for_source( + db: &SurrealDbClient, + source_id: &str, +) -> anyhow::Result { + let entities: Vec = db + .client + .query("SELECT * FROM knowledge_entity WHERE source_id = $source_id;") + .bind(("source_id", source_id.to_string())) + .await? + .take(0)?; + Ok(entities.len()) +} + +pub async fn count_relationships_for_source( + db: &SurrealDbClient, + source_id: &str, +) -> anyhow::Result { + let relationships: Vec = db + .client + .query("SELECT * FROM relates_to WHERE metadata.source_id = $source_id;") + .bind(("source_id", source_id.to_string())) + .await? + .take(0)?; + Ok(relationships.len()) +} diff --git a/ingestion-pipeline/src/pipeline/tests.rs b/ingestion-pipeline/src/pipeline/tests.rs index 61e0c1a..867e9d3 100644 --- a/ingestion-pipeline/src/pipeline/tests.rs +++ b/ingestion-pipeline/src/pipeline/tests.rs @@ -20,12 +20,14 @@ use common::{ }; use retrieval_pipeline::{RetrievedChunk, RetrievedEntity}; use tokio::sync::Mutex; -use uuid::Uuid; - use super::{ config::{IngestionConfig, IngestionTuning}, enrichment_result::LLMEnrichmentResult, services::PipelineServices, + test_support::{ + count_chunks_for_source, count_entities_for_source, count_relationships_for_source, + persist, sample_artifacts, setup_db, + }, IngestionPipeline, }; @@ -141,15 +143,30 @@ impl PipelineServices for MockServices { async fn convert_analysis( &self, - _content: &TextContent, + content: &TextContent, _analysis: &LLMEnrichmentResult, _entity_concurrency: usize, ) -> Result<(Vec, Vec), AppError> { self.record("convert").await; - Ok(( - self.graph_entities.clone(), - self.graph_relationships.clone(), - )) + let entities = self + .graph_entities + .iter() + .map(|embedded| { + let mut embedded = embedded.clone(); + embedded.entity.source_id = content.id.clone(); + embedded + }) + .collect(); + let relationships = self + .graph_relationships + .iter() + .map(|relationship| { + let mut relationship = 
relationship.clone(); + relationship.metadata.source_id = content.id.clone(); + relationship + }) + .collect(); + Ok((entities, relationships)) } async fn prepare_chunks( @@ -266,14 +283,6 @@ impl PipelineServices for ValidationServices { } } -async fn setup_db() -> anyhow::Result { - let namespace = "pipeline_test"; - let database = Uuid::new_v4().to_string(); - let db = SurrealDbClient::memory(namespace, &database).await?; - db.apply_migrations().await?; - Ok(db) -} - fn pipeline_config() -> IngestionConfig { IngestionConfig { tuning: IngestionTuning { @@ -319,7 +328,41 @@ async fn retry_delay_grows_exponentially_and_caps() -> anyhow::Result<()> { } #[tokio::test] -async fn ingestion_pipeline_happy_path_persists_entities() -> anyhow::Result<()> { +async fn process_task_skips_pipeline_when_artifacts_already_persisted() -> anyhow::Result<()> { + let db = setup_db().await?; + let worker_id = "worker-persisted-skip"; + let user_id = "user-skip"; + let services = Arc::new(FailingServices { + inner: MockServices::new(user_id), + }); + let pipeline = + IngestionPipeline::with_services(Arc::new(db.clone()), pipeline_config(), services)?; + + let task = reserve_task( + &db, + worker_id, + IngestionPayload::Text { + text: "Already persisted payload".into(), + context: "Context".into(), + category: "notes".into(), + user_id: user_id.into(), + }, + user_id, + ) + .await?; + + persist(&db, sample_artifacts(&task.id, user_id)).await?; + + pipeline.process_task(task.clone()).await?; + + let stored_task: IngestionTask = db.get_item(&task.id).await?.context("task present")?; + assert_eq!(stored_task.state, TaskState::Succeeded); + + Ok(()) +} + +#[tokio::test] +async fn ingestion_pipeline_happy_path_persists_artifacts() -> anyhow::Result<()> { let db = setup_db().await?; let worker_id = "worker-happy"; let user_id = "user-123"; @@ -346,14 +389,17 @@ async fn ingestion_pipeline_happy_path_persists_entities() -> anyhow::Result<()> let stored_task: IngestionTask = 
db.get_item(&task.id).await?.context("task present")?; assert_eq!(stored_task.state, TaskState::Succeeded); - let stored_entities: Vec = - db.get_all_stored_items::().await?; - assert!(!stored_entities.is_empty(), "entities should be stored"); - - let stored_chunks: Vec = db.get_all_stored_items::().await?; - assert!( - !stored_chunks.is_empty(), - "chunks should be stored for ingestion text" + let text_content: TextContent = db.get_item(&task.id).await?.context("text content")?; + assert_eq!( + text_content.id, task.id, + "ingested text_content id should equal the ingestion task id" + ); + assert_eq!(count_chunks_for_source(&db, &task.id).await?, 1); + assert_eq!(count_entities_for_source(&db, &task.id).await?, 1); + assert_eq!( + count_relationships_for_source(&db, &task.id).await?, + 1, + "graph relationships should be persisted" ); let call_log = services.calls.lock().await.clone(); @@ -397,26 +443,14 @@ async fn ingestion_pipeline_chunk_only_skips_analysis() -> anyhow::Result<()> { pipeline.process_task(task.clone()).await?; - let stored_entities: Vec = - db.get_all_stored_items::().await?; - assert!( - stored_entities.is_empty(), - "chunk-only ingestion should not persist entities" - ); - let relationship_count: Option = db - .client - .query("SELECT count() as count FROM relates_to;") - .await? - .take::>(0) - .unwrap_or_default(); assert_eq!( - relationship_count.unwrap_or(0), + count_relationships_for_source(&db, &task.id).await?, 0, "chunk-only ingestion should not persist relationships" ); - let stored_chunks: Vec = db.get_all_stored_items::().await?; - assert!( - !stored_chunks.is_empty(), + assert_eq!( + count_chunks_for_source(&db, &task.id).await?, + 1, "chunk-only ingestion should still persist chunks" );