fix: atomic ingestion persist with task reclaim and shared cleanup.

One transaction per task replaces prior artifact rows; workers skip the pipeline when content already exists, eval seeding reuses persist_artifacts, and deletes clear graph children via shared SQL.
This commit is contained in:
Per Stark
2026-06-12 16:27:07 +02:00
parent cf69cb7b05
commit 1013035731
15 changed files with 794 additions and 487 deletions
+24 -283
View File
@@ -7,33 +7,24 @@ use std::{
use anyhow::{anyhow, Context, Result};
use chrono::{DateTime, Utc};
use common::storage::types::StoredObject;
use common::storage::{
db::SurrealDbClient,
types::{
knowledge_entity::KnowledgeEntity,
knowledge_entity_embedding::KnowledgeEntityEmbedding,
knowledge_relationship::{KnowledgeRelationship, RelationshipMetadata},
knowledge_relationship::KnowledgeRelationship,
text_chunk::TextChunk,
text_chunk_embedding::TextChunkEmbedding,
text_content::TextContent,
StoredObject,
},
};
use ingestion_pipeline::{persist_artifacts, IngestionTuning, PipelineArtifacts};
use serde::Deserialize;
use serde::Serialize;
use surrealdb::sql::Thing;
use tracing::{debug, warn};
use crate::datasets::{ConvertedParagraph, ConvertedQuestion};
pub const MANIFEST_VERSION: u32 = 3;
pub const PARAGRAPH_SHARD_VERSION: u32 = 3;
const MANIFEST_BATCH_SIZE: usize = 100;
const MANIFEST_MAX_BYTES_PER_BATCH: usize = 300_000; // default cap for non-text batches
const TEXT_CONTENT_MAX_BYTES_PER_BATCH: usize = 250_000; // text bodies can be large; limit aggressively
const MAX_BATCHES_PER_REQUEST: usize = 24;
const REQUEST_MAX_BYTES: usize = 800_000; // total payload cap per Surreal query request
fn current_manifest_version() -> u32 {
MANIFEST_VERSION
}
@@ -251,130 +242,6 @@ pub fn window_manifest(
Ok(narrowed)
}
#[derive(Debug, Clone, Serialize)]
struct RelationInsert {
#[serde(rename = "in")]
pub in_: Thing,
#[serde(rename = "out")]
pub out: Thing,
pub id: String,
pub metadata: RelationshipMetadata,
}
#[derive(Debug)]
struct SizedBatch<T> {
approx_bytes: usize,
items: Vec<T>,
}
struct ManifestBatches {
text_contents: Vec<SizedBatch<TextContent>>,
entities: Vec<SizedBatch<KnowledgeEntity>>,
entity_embeddings: Vec<SizedBatch<KnowledgeEntityEmbedding>>,
relationships: Vec<SizedBatch<RelationInsert>>,
chunks: Vec<SizedBatch<TextChunk>>,
chunk_embeddings: Vec<SizedBatch<TextChunkEmbedding>>,
}
fn build_manifest_batches(manifest: &CorpusManifest) -> Result<ManifestBatches> {
let mut text_contents = Vec::new();
let mut entities = Vec::new();
let mut entity_embeddings = Vec::new();
let mut relationships = Vec::new();
let mut chunks = Vec::new();
let mut chunk_embeddings = Vec::new();
let mut seen_text_content = HashSet::new();
let mut seen_entities = HashSet::new();
let mut seen_relationships = HashSet::new();
let mut seen_chunks = HashSet::new();
for paragraph in &manifest.paragraphs {
if seen_text_content.insert(paragraph.text_content.id.clone()) {
text_contents.push(paragraph.text_content.clone());
}
for embedded_entity in &paragraph.entities {
if seen_entities.insert(embedded_entity.entity.id.clone()) {
let entity = embedded_entity.entity.clone();
entities.push(entity.clone());
entity_embeddings.push(KnowledgeEntityEmbedding::new(
&entity.id,
entity.source_id.clone(),
embedded_entity.embedding.clone(),
entity.user_id.clone(),
));
}
}
for relationship in &paragraph.relationships {
if seen_relationships.insert(relationship.id.clone()) {
let table = KnowledgeEntity::table_name();
let in_id = relationship
.in_
.strip_prefix(&format!("{table}:"))
.unwrap_or(&relationship.in_);
let out_id = relationship
.out
.strip_prefix(&format!("{table}:"))
.unwrap_or(&relationship.out);
let in_thing = Thing::from((table, in_id));
let out_thing = Thing::from((table, out_id));
relationships.push(RelationInsert {
in_: in_thing,
out: out_thing,
id: relationship.id.clone(),
metadata: relationship.metadata.clone(),
});
}
}
for embedded_chunk in &paragraph.chunks {
if seen_chunks.insert(embedded_chunk.chunk.id.clone()) {
let chunk = embedded_chunk.chunk.clone();
chunks.push(chunk.clone());
chunk_embeddings.push(TextChunkEmbedding::new(
&chunk.id,
chunk.source_id.clone(),
embedded_chunk.embedding.clone(),
chunk.user_id.clone(),
));
}
}
}
Ok(ManifestBatches {
text_contents: chunk_items(
&text_contents,
MANIFEST_BATCH_SIZE,
TEXT_CONTENT_MAX_BYTES_PER_BATCH,
)
.context("chunking text_content payloads")?,
entities: chunk_items(&entities, MANIFEST_BATCH_SIZE, MANIFEST_MAX_BYTES_PER_BATCH)
.context("chunking knowledge_entity payloads")?,
entity_embeddings: chunk_items(
&entity_embeddings,
MANIFEST_BATCH_SIZE,
MANIFEST_MAX_BYTES_PER_BATCH,
)
.context("chunking knowledge_entity_embedding payloads")?,
relationships: chunk_items(
&relationships,
MANIFEST_BATCH_SIZE,
MANIFEST_MAX_BYTES_PER_BATCH,
)
.context("chunking relationship payloads")?,
chunks: chunk_items(&chunks, MANIFEST_BATCH_SIZE, MANIFEST_MAX_BYTES_PER_BATCH)
.context("chunking text_chunk payloads")?,
chunk_embeddings: chunk_items(
&chunk_embeddings,
MANIFEST_BATCH_SIZE,
MANIFEST_MAX_BYTES_PER_BATCH,
)
.context("chunking text_chunk_embedding payloads")?,
})
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ParagraphShard {
#[serde(default = "current_paragraph_shard_version")]
@@ -599,157 +466,28 @@ fn normalize_answer_text(text: &str) -> String {
.join(" ")
}
#[allow(clippy::arithmetic_side_effects, clippy::indexing_slicing)]
fn chunk_items<T: Clone + Serialize>(
items: &[T],
max_items: usize,
max_bytes: usize,
) -> Result<Vec<SizedBatch<T>>> {
if items.is_empty() {
return Ok(Vec::new());
}
let mut batches = Vec::new();
let mut current = Vec::new();
let mut current_bytes = 0usize;
for item in items {
let size = serde_json::to_vec(item)
.map(|buf| buf.len())
.context("serialising batch item for sizing")?;
let would_overflow_items = !current.is_empty() && current.len() >= max_items;
let would_overflow_bytes = !current.is_empty() && current_bytes + size > max_bytes;
if would_overflow_items || would_overflow_bytes {
batches.push(SizedBatch {
approx_bytes: current_bytes.max(1),
items: std::mem::take(&mut current),
});
current_bytes = 0;
}
current_bytes += size;
current.push(item.clone());
}
if !current.is_empty() {
batches.push(SizedBatch {
approx_bytes: current_bytes.max(1),
items: current,
});
}
Ok(batches)
}
#[allow(clippy::arithmetic_side_effects, clippy::indexing_slicing)]
async fn execute_batched_inserts<T: Clone + Serialize + 'static>(
db: &SurrealDbClient,
statement: impl AsRef<str>,
prefix: &str,
batches: &[SizedBatch<T>],
) -> Result<()> {
if batches.is_empty() {
return Ok(());
}
let mut start = 0;
while start < batches.len() {
let mut group_bytes = 0usize;
let mut group_end = start;
let mut group_count = 0usize;
while group_end < batches.len() {
let batch_bytes = batches[group_end].approx_bytes.max(1);
if group_count > 0
&& (group_bytes + batch_bytes > REQUEST_MAX_BYTES
|| group_count >= MAX_BATCHES_PER_REQUEST)
{
break;
}
group_bytes += batch_bytes;
group_end += 1;
group_count += 1;
}
let slice = &batches[start..group_end];
let mut query = db.client.query("BEGIN TRANSACTION;");
for (bind_index, batch) in slice.iter().enumerate() {
let name = format!("{prefix}{bind_index}");
query = query
.query(format!("{} ${};", statement.as_ref(), name))
.bind((name, batch.items.clone()));
}
let response = query
.query("COMMIT TRANSACTION;")
.await
.context("executing batched insert transaction")?;
if let Err(err) = response.check() {
return Err(anyhow!(
"batched insert failed for statement '{}': {err:?}",
statement.as_ref()
));
}
start = group_end;
}
Ok(())
}
pub async fn seed_manifest_into_db(db: &SurrealDbClient, manifest: &CorpusManifest) -> Result<()> {
let batches = build_manifest_batches(manifest).context("preparing manifest batches")?;
let tuning = IngestionTuning::default();
let embedding_dimensions = manifest.metadata.embedding_dimension;
let mut seen_text_content = HashSet::new();
let result = async {
execute_batched_inserts(
db,
format!("INSERT INTO {}", TextContent::table_name()),
"tc",
&batches.text_contents,
)
.await?;
for paragraph in &manifest.paragraphs {
if !seen_text_content.insert(paragraph.text_content.id.clone()) {
continue;
}
execute_batched_inserts(
db,
format!("INSERT INTO {}", KnowledgeEntity::table_name()),
"ke",
&batches.entities,
)
.await?;
execute_batched_inserts(
db,
format!("INSERT INTO {}", TextChunk::table_name()),
"ch",
&batches.chunks,
)
.await?;
execute_batched_inserts(
db,
"INSERT RELATION INTO relates_to",
"rel",
&batches.relationships,
)
.await?;
execute_batched_inserts(
db,
format!("INSERT INTO {}", KnowledgeEntityEmbedding::table_name()),
"kee",
&batches.entity_embeddings,
)
.await?;
execute_batched_inserts(
db,
format!("INSERT INTO {}", TextChunkEmbedding::table_name()),
"tce",
&batches.chunk_embeddings,
)
.await?;
let artifacts = PipelineArtifacts {
text_content: paragraph.text_content.clone(),
entities: paragraph.entities.clone(),
relationships: paragraph.relationships.clone(),
chunks: paragraph.chunks.clone(),
};
persist_artifacts(db, &tuning, embedding_dimensions, artifacts)
.await
.map_err(|err| anyhow!("persist manifest paragraph: {err}"))?;
}
Ok(())
}
.await;
@@ -778,7 +516,10 @@ pub async fn seed_manifest_into_db(db: &SurrealDbClient, manifest: &CorpusManife
mod tests {
use super::*;
use chrono::Utc;
use common::storage::types::knowledge_entity::KnowledgeEntityType;
use common::storage::types::{
knowledge_entity::{KnowledgeEntity, KnowledgeEntityType},
text_chunk::TextChunk,
};
use uuid::Uuid;
#[allow(clippy::too_many_lines)]