From b0ed69330d1c9916684870fb494500772c8c55a2 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Sun, 28 Sep 2025 22:08:08 +0200 Subject: [PATCH] fix: improved concurrency --- ingestion-pipeline/src/pipeline.rs | 78 +++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 8 deletions(-) diff --git a/ingestion-pipeline/src/pipeline.rs b/ingestion-pipeline/src/pipeline.rs index 15e26aa..cffaf90 100644 --- a/ingestion-pipeline/src/pipeline.rs +++ b/ingestion-pipeline/src/pipeline.rs @@ -1,9 +1,9 @@ use std::{sync::Arc, time::Instant}; use chrono::Utc; -use futures::future::try_join_all; use text_splitter::TextSplitter; -use tracing::info; +use tokio::time::{sleep, Duration}; +use tracing::{info, warn}; use common::{ error::AppError, @@ -135,17 +135,73 @@ impl IngestionPipeline { entities: Vec, relationships: Vec, ) -> Result<(), AppError> { + let entities = Arc::new(entities); + let relationships = Arc::new(relationships); let entity_count = entities.len(); let relationship_count = relationships.len(); - let entity_futures = entities - .iter() - .map(|entitity| self.db.store_item(entitity.to_owned())); + const STORE_GRAPH_MUTATION: &str = r#" + BEGIN TRANSACTION; + LET $entities = $entities; + LET $relationships = $relationships; - try_join_all(entity_futures).await?; + FOR $entity IN $entities { + CREATE type::thing('knowledge_entity', $entity.id) CONTENT $entity; + }; - for relationship in &relationships { - relationship.store_relationship(&self.db).await?; + FOR $relationship IN $relationships { + LET $in_node = type::thing('knowledge_entity', $relationship.in); + LET $out_node = type::thing('knowledge_entity', $relationship.out); + RELATE $in_node->relates_to->$out_node CONTENT { + id: type::thing('relates_to', $relationship.id), + metadata: $relationship.metadata + }; + }; + + COMMIT TRANSACTION; + "#; + + const MAX_ATTEMPTS: usize = 3; + const INITIAL_BACKOFF_MS: u64 = 50; + const MAX_BACKOFF_MS: u64 = 800; + + let mut backoff_ms = INITIAL_BACKOFF_MS; + let mut success = false; + + for attempt in 0..MAX_ATTEMPTS { + let result = self + .db + .client + .query(STORE_GRAPH_MUTATION) + .bind(("entities", entities.clone())) + .bind(("relationships", relationships.clone())) + .await; + + match result { + Ok(_) => { + success = true; + break; + } + Err(err) => { + if Self::is_retryable_conflict(&err) && attempt + 1 < MAX_ATTEMPTS { + warn!( + attempt = attempt + 1, + "Transient SurrealDB conflict while storing graph data; retrying" + ); + sleep(Duration::from_millis(backoff_ms)).await; + backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS); + continue; + } + + return Err(AppError::from(err)); + } + } + } + + if !success { + return Err(AppError::InternalError( + "Failed to store graph entities after retries".to_string(), + )); } info!( @@ -173,4 +229,10 @@ impl IngestionPipeline { Ok(()) } + + fn is_retryable_conflict(error: &surrealdb::Error) -> bool { + error + .to_string() + .contains("Failed to commit transaction due to a read or write conflict") + } }