mirror of
https://github.com/perstarkse/minne.git
synced 2026-04-24 09:48:32 +02:00
fix: somewhat improved concurrency
limiting edge creation to sequential due to surrealdb
This commit is contained in:
@@ -1,8 +1,9 @@
|
|||||||
use std::{sync::Arc, time::Instant};
|
use std::{sync::Arc, time::Instant};
|
||||||
|
|
||||||
use chrono::Utc;
|
use chrono::Utc;
|
||||||
|
use futures::future::try_join_all;
|
||||||
use text_splitter::TextSplitter;
|
use text_splitter::TextSplitter;
|
||||||
use tracing::{debug, info};
|
use tracing::info;
|
||||||
|
|
||||||
use common::{
|
use common::{
|
||||||
error::AppError,
|
error::AppError,
|
||||||
@@ -134,20 +135,22 @@ impl IngestionPipeline {
|
|||||||
entities: Vec<KnowledgeEntity>,
|
entities: Vec<KnowledgeEntity>,
|
||||||
relationships: Vec<KnowledgeRelationship>,
|
relationships: Vec<KnowledgeRelationship>,
|
||||||
) -> Result<(), AppError> {
|
) -> Result<(), AppError> {
|
||||||
for entity in &entities {
|
let entity_count = entities.len();
|
||||||
debug!("Storing entity: {:?}", entity);
|
let relationship_count = relationships.len();
|
||||||
self.db.store_item(entity.clone()).await?;
|
|
||||||
}
|
let entity_futures = entities
|
||||||
|
.iter()
|
||||||
|
.map(|entitity| self.db.store_item(entitity.to_owned()));
|
||||||
|
|
||||||
|
try_join_all(entity_futures).await?;
|
||||||
|
|
||||||
for relationship in &relationships {
|
for relationship in &relationships {
|
||||||
debug!("Storing relationship: {:?}", relationship);
|
|
||||||
relationship.store_relationship(&self.db).await?;
|
relationship.store_relationship(&self.db).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"Stored {} entities and {} relationships",
|
"Stored {} entities and {} relationships",
|
||||||
entities.len(),
|
entity_count, relationship_count
|
||||||
relationships.len()
|
|
||||||
);
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user