chore: clean & refactor

This commit is contained in:
Per Stark
2024-11-28 11:25:00 +01:00
parent 1cd26061d7
commit 5c2ab58566
4 changed files with 27 additions and 50 deletions

View File

@@ -39,20 +39,17 @@ pub struct LLMGraphAnalysisResult {
pub relationships: Vec<LLMRelationship>,
}
/// Converts the LLM graph analysis result into database entities and relationships.
/// Processes embeddings sequentially for simplicity.
///
/// # Arguments
///
/// * `source_id` - A UUID representing the source identifier.
/// * `openai_client` - OpenAI client for LLM calls.
///
/// # Returns
///
/// * `Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), ProcessingError>` - A tuple containing vectors of `KnowledgeEntity` and `KnowledgeRelationship`.
impl LLMGraphAnalysisResult {
// Split the main function into smaller, focused functions
/// Converts the LLM graph analysis result into database entities and relationships.
///
/// # Arguments
///
/// * `source_id` - A UUID representing the source identifier.
/// * `openai_client` - OpenAI client for LLM calls.
///
/// # Returns
///
/// * `Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), ProcessingError>` - A tuple containing vectors of `KnowledgeEntity` and `KnowledgeRelationship`.
pub async fn to_database_entities(
&self,
source_id: &str,
@@ -61,7 +58,7 @@ impl LLMGraphAnalysisResult {
// Create mapper and pre-assign IDs
let mapper = Arc::new(Mutex::new(self.create_mapper()?));
// Process entities (prepared for future parallelization)
// Process entities
let entities = self
.process_entities(source_id, Arc::clone(&mapper), openai_client)
.await?;

View File

@@ -37,7 +37,8 @@ impl ContentProcessor {
store_item(&self.db_client, content.clone()).await?;
let now = Instant::now();
// Process in parallel where possible
// Perform analysis; this step also includes retrieval
let analysis = self.perform_semantic_analysis(content).await?;
let end = now.elapsed();
@@ -46,7 +47,7 @@ impl ContentProcessor {
end
);
// Convert and store entities
// Convert analysis to objects
let (entities, relationships) = analysis
.to_database_entities(&content.id, &self.openai_client)
.await?;

View File

@@ -3,10 +3,7 @@ use lapin::{message::Delivery, options::*, types::FieldTable, Channel, Consumer,
use crate::{
error::IngressConsumerError,
ingress::{
content_processor::ContentProcessor,
types::{ingress_input::IngressContentError, ingress_object::IngressObject},
},
ingress::{content_processor::ContentProcessor, types::ingress_object::IngressObject},
};
use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
@@ -195,13 +192,4 @@ impl RabbitMQConsumer {
Ok(())
}
pub async fn handle_ingress_content(
&self,
ingress: &IngressObject,
) -> Result<(), IngressContentError> {
info!("Processing IngressContent: {:?}", ingress);
unimplemented!()
}
}

View File

@@ -1,10 +1,7 @@
use surrealdb::{engine::remote::ws::Client, Surreal};
use surrealdb::{engine::remote::ws::Client, Error, Surreal};
use tracing::debug;
use crate::{
error::ProcessingError,
storage::types::{knowledge_entity::KnowledgeEntity, StoredObject},
};
use crate::storage::types::{knowledge_entity::KnowledgeEntity, StoredObject};
/// Retrieves database entries that match a specific source identifier.
///
@@ -26,11 +23,11 @@ use crate::{
///
/// Returns a `Result` containing either:
/// * `Ok(Vec<T>)` - A vector of matching records deserialized into type `T`
/// * `Err(ProcessingError)` - An error if the database query fails
/// * `Err(Error)` - An error if the database query fails
///
/// # Errors
///
/// This function will return a `ProcessingError` if:
/// This function will return an `Error` if:
/// * The database query fails to execute
/// * The results cannot be deserialized into type `T`
///
@@ -54,27 +51,25 @@ pub async fn find_entities_by_source_ids<T>(
source_id: Vec<String>,
table_name: String,
db_client: &Surreal<Client>,
) -> Result<Vec<T>, ProcessingError>
) -> Result<Vec<T>, Error>
where
T: for<'de> serde::Deserialize<'de>,
{
let query = "SELECT * FROM type::table($table) WHERE source_id IN $source_ids";
let matching_entities: Vec<T> = db_client
db_client
.query(query)
.bind(("table", table_name))
.bind(("source_ids", source_id))
.await?
.take(0)?;
Ok(matching_entities)
.take(0)
}
/// Find entities by their relationship to the id
pub async fn find_entities_by_relationship_by_id(
db_client: &Surreal<Client>,
entity_id: String,
) -> Result<Vec<KnowledgeEntity>, ProcessingError> {
) -> Result<Vec<KnowledgeEntity>, Error> {
let query = format!(
"SELECT *, <-> relates_to <-> knowledge_entity AS related FROM knowledge_entity:`{}`",
entity_id
@@ -82,19 +77,15 @@ pub async fn find_entities_by_relationship_by_id(
debug!("{}", query);
let result: Vec<KnowledgeEntity> = db_client.query(query).await?.take(0)?;
Ok(result)
db_client.query(query).await?.take(0)
}
/// Get a specific KnowledgeEntity by its id
pub async fn get_entity_by_id(
db_client: &Surreal<Client>,
entity_id: &str,
) -> Result<Option<KnowledgeEntity>, ProcessingError> {
let response: Option<KnowledgeEntity> = db_client
) -> Result<Option<KnowledgeEntity>, Error> {
db_client
.select((KnowledgeEntity::table_name(), entity_id))
.await?;
Ok(response)
.await
}