From 5c2ab58566f83766491a611a4ab92c5f3d83ac5d Mon Sep 17 00:00:00 2001 From: Per Stark Date: Thu, 28 Nov 2024 11:25:00 +0100 Subject: [PATCH] chore: clean & refactor --- .../analysis/types/llm_analysis_result.rs | 25 +++++++------- src/ingress/content_processor.rs | 5 +-- src/rabbitmq/consumer.rs | 14 +------- src/retrieval/graph.rs | 33 +++++++------------ 4 files changed, 27 insertions(+), 50 deletions(-) diff --git a/src/ingress/analysis/types/llm_analysis_result.rs b/src/ingress/analysis/types/llm_analysis_result.rs index fb1bc80..a54f5da 100644 --- a/src/ingress/analysis/types/llm_analysis_result.rs +++ b/src/ingress/analysis/types/llm_analysis_result.rs @@ -39,20 +39,17 @@ pub struct LLMGraphAnalysisResult { pub relationships: Vec, } -/// Converts the LLM graph analysis result into database entities and relationships. -/// Processes embeddings sequentially for simplicity. -/// -/// # Arguments -/// -/// * `source_id` - A UUID representing the source identifier. -/// * `openai_client` - OpenAI client for LLM calls. -/// -/// # Returns -/// -/// * `Result<(Vec, Vec), ProcessingError>` - A tuple containing vectors of `KnowledgeEntity` and `KnowledgeRelationship`. - impl LLMGraphAnalysisResult { - // Split the main function into smaller, focused functions + /// Converts the LLM graph analysis result into database entities and relationships. + /// + /// # Arguments + /// + /// * `source_id` - A UUID representing the source identifier. + /// * `openai_client` - OpenAI client for LLM calls. + /// + /// # Returns + /// + /// * `Result<(Vec, Vec), ProcessingError>` - A tuple containing vectors of `KnowledgeEntity` and `KnowledgeRelationship`. pub async fn to_database_entities( &self, source_id: &str, @@ -61,7 +58,7 @@ impl LLMGraphAnalysisResult { // Create mapper and pre-assign IDs let mapper = Arc::new(Mutex::new(self.create_mapper()?)); - // Process entities (prepared for future parallelization) + // Process entities let entities = self .process_entities(source_id, Arc::clone(&mapper), openai_client) .await?; diff --git a/src/ingress/content_processor.rs b/src/ingress/content_processor.rs index 4a236ec..0c3e308 100644 --- a/src/ingress/content_processor.rs +++ b/src/ingress/content_processor.rs @@ -37,7 +37,8 @@ impl ContentProcessor { store_item(&self.db_client, content.clone()).await?; let now = Instant::now(); - // Process in parallel where possible + + // Perform analyis, this step also includes retrieval let analysis = self.perform_semantic_analysis(content).await?; let end = now.elapsed(); @@ -46,7 +47,7 @@ impl ContentProcessor { end ); - // Convert and store entities + // Convert analysis to objects let (entities, relationships) = analysis .to_database_entities(&content.id, &self.openai_client) .await?; diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index 30d749f..dd3e29c 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -3,10 +3,7 @@ use lapin::{message::Delivery, options::*, types::FieldTable, Channel, Consumer, use crate::{ error::IngressConsumerError, - ingress::{ - content_processor::ContentProcessor, - types::{ingress_input::IngressContentError, ingress_object::IngressObject}, - }, + ingress::{content_processor::ContentProcessor, types::ingress_object::IngressObject}, }; use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError}; @@ -195,13 +192,4 @@ impl RabbitMQConsumer { Ok(()) } - - pub async fn handle_ingress_content( - &self, - ingress: &IngressObject, - ) -> Result<(), IngressContentError> { - info!("Processing IngressContent: {:?}", ingress); - - unimplemented!() - } } diff --git a/src/retrieval/graph.rs b/src/retrieval/graph.rs index 8157ef4..86d0a34 100644 --- a/src/retrieval/graph.rs +++ b/src/retrieval/graph.rs @@ -1,10 +1,7 @@ -use surrealdb::{engine::remote::ws::Client, Surreal}; +use surrealdb::{engine::remote::ws::Client, Error, Surreal}; use tracing::debug; -use crate::{ - error::ProcessingError, - storage::types::{knowledge_entity::KnowledgeEntity, StoredObject}, -}; +use crate::storage::types::{knowledge_entity::KnowledgeEntity, StoredObject}; /// Retrieves database entries that match a specific source identifier. /// @@ -26,11 +23,11 @@ use crate::{ /// /// Returns a `Result` containing either: /// * `Ok(Vec)` - A vector of matching records deserialized into type `T` -/// * `Err(ProcessingError)` - An error if the database query fails +/// * `Err(Error)` - An error if the database query fails /// /// # Errors /// -/// This function will return a `ProcessingError` if: +/// This function will return a `Error` if: /// * The database query fails to execute /// * The results cannot be deserialized into type `T` /// @@ -54,27 +51,25 @@ pub async fn find_entities_by_source_ids( source_id: Vec, table_name: String, db_client: &Surreal, -) -> Result, ProcessingError> +) -> Result, Error> where T: for<'de> serde::Deserialize<'de>, { let query = "SELECT * FROM type::table($table) WHERE source_id IN $source_ids"; - let matching_entities: Vec = db_client + db_client .query(query) .bind(("table", table_name)) .bind(("source_ids", source_id)) .await? - .take(0)?; - - Ok(matching_entities) + .take(0) } /// Find entities by their relationship to the id pub async fn find_entities_by_relationship_by_id( db_client: &Surreal, entity_id: String, -) -> Result, ProcessingError> { +) -> Result, Error> { let query = format!( "SELECT *, <-> relates_to <-> knowledge_entity AS related FROM knowledge_entity:`{}`", entity_id @@ -82,19 +77,15 @@ pub async fn find_entities_by_relationship_by_id( debug!("{}", query); - let result: Vec = db_client.query(query).await?.take(0)?; - - Ok(result) + db_client.query(query).await?.take(0) } /// Get a specific KnowledgeEntity by its id pub async fn get_entity_by_id( db_client: &Surreal, entity_id: &str, -) -> Result, ProcessingError> { - let response: Option = db_client +) -> Result, Error> { + db_client .select((KnowledgeEntity::table_name(), entity_id)) - .await?; - - Ok(response) + .await }