diff --git a/ingestion-pipeline/src/pipeline.rs b/ingestion-pipeline/src/pipeline.rs index 618d19b..15e26aa 100644 --- a/ingestion-pipeline/src/pipeline.rs +++ b/ingestion-pipeline/src/pipeline.rs @@ -1,8 +1,9 @@ use std::{sync::Arc, time::Instant}; use chrono::Utc; +use futures::future::try_join_all; use text_splitter::TextSplitter; -use tracing::{debug, info}; +use tracing::info; use common::{ error::AppError, @@ -134,20 +135,22 @@ impl IngestionPipeline { entities: Vec, relationships: Vec, ) -> Result<(), AppError> { - for entity in &entities { - debug!("Storing entity: {:?}", entity); - self.db.store_item(entity.clone()).await?; - } + let entity_count = entities.len(); + let relationship_count = relationships.len(); + + let entity_futures = entities + .iter() + .map(|entitity| self.db.store_item(entitity.to_owned())); + + try_join_all(entity_futures).await?; for relationship in &relationships { - debug!("Storing relationship: {:?}", relationship); relationship.store_relationship(&self.db).await?; } info!( "Stored {} entities and {} relationships", - entities.len(), - relationships.len() + entity_count, relationship_count ); Ok(()) }