fix: improved concurrency

This commit is contained in:
Per Stark
2025-09-28 22:08:08 +02:00
parent 5cb15dab45
commit b0ed69330d

View File

@@ -1,9 +1,9 @@
use std::{sync::Arc, time::Instant};
use chrono::Utc;
use futures::future::try_join_all;
use text_splitter::TextSplitter;
use tracing::info;
use tokio::time::{sleep, Duration};
use tracing::{info, warn};
use common::{
error::AppError,
@@ -135,17 +135,73 @@ impl IngestionPipeline {
entities: Vec<KnowledgeEntity>,
relationships: Vec<KnowledgeRelationship>,
) -> Result<(), AppError> {
let entities = Arc::new(entities);
let relationships = Arc::new(relationships);
let entity_count = entities.len();
let relationship_count = relationships.len();
let entity_futures = entities
.iter()
.map(|entitity| self.db.store_item(entitity.to_owned()));
const STORE_GRAPH_MUTATION: &str = r#"
BEGIN TRANSACTION;
LET $entities = $entities;
LET $relationships = $relationships;
try_join_all(entity_futures).await?;
FOR $entity IN $entities {
CREATE type::thing('knowledge_entity', $entity.id) CONTENT $entity;
};
for relationship in &relationships {
relationship.store_relationship(&self.db).await?;
FOR $relationship IN $relationships {
LET $in_node = type::thing('knowledge_entity', $relationship.in);
LET $out_node = type::thing('knowledge_entity', $relationship.out);
RELATE $in_node->relates_to->$out_node CONTENT {
id: type::thing('relates_to', $relationship.id),
metadata: $relationship.metadata
};
};
COMMIT TRANSACTION;
"#;
const MAX_ATTEMPTS: usize = 3;
const INITIAL_BACKOFF_MS: u64 = 50;
const MAX_BACKOFF_MS: u64 = 800;
let mut backoff_ms = INITIAL_BACKOFF_MS;
let mut success = false;
for attempt in 0..MAX_ATTEMPTS {
let result = self
.db
.client
.query(STORE_GRAPH_MUTATION)
.bind(("entities", entities.clone()))
.bind(("relationships", relationships.clone()))
.await;
match result {
Ok(_) => {
success = true;
break;
}
Err(err) => {
if Self::is_retryable_conflict(&err) && attempt + 1 < MAX_ATTEMPTS {
warn!(
attempt = attempt + 1,
"Transient SurrealDB conflict while storing graph data; retrying"
);
sleep(Duration::from_millis(backoff_ms)).await;
backoff_ms = (backoff_ms * 2).min(MAX_BACKOFF_MS);
continue;
}
return Err(AppError::from(err));
}
}
}
if !success {
return Err(AppError::InternalError(
"Failed to store graph entities after retries".to_string(),
));
}
info!(
@@ -173,4 +229,10 @@ impl IngestionPipeline {
Ok(())
}
fn is_retryable_conflict(error: &surrealdb::Error) -> bool {
error
.to_string()
.contains("Failed to commit transaction due to a read or write conflict")
}
}