refactoring: ingress_analyser and cleaning

This commit is contained in:
Per Stark
2024-11-21 20:26:59 +01:00
parent 22abd3d731
commit 94f328e542
13 changed files with 420 additions and 375 deletions

View File

@@ -0,0 +1,147 @@
use crate::{
analysis::ingress::{
prompt::{get_ingress_analysis_schema, INGRESS_ANALYSIS_SYSTEM_MESSAGE},
types::llm_analysis_result::LLMGraphAnalysisResult,
},
error::ProcessingError,
retrieval::vector::find_items_by_vector_similarity,
storage::types::{knowledge_entity::KnowledgeEntity, StoredObject},
};
use async_openai::types::{
ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage,
CreateChatCompletionRequest, CreateChatCompletionRequestArgs, ResponseFormat,
ResponseFormatJsonSchema,
};
use serde_json::json;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tracing::{debug, instrument};
pub struct IngressAnalyzer<'a> {
db_client: &'a Surreal<Client>,
openai_client: &'a async_openai::Client<async_openai::config::OpenAIConfig>,
}
impl<'a> IngressAnalyzer<'a> {
pub fn new(
db_client: &'a Surreal<Client>,
openai_client: &'a async_openai::Client<async_openai::config::OpenAIConfig>,
) -> Self {
Self {
db_client,
openai_client,
}
}
#[instrument(skip(self))]
pub async fn analyze_content(
&self,
category: &str,
instructions: &str,
text: &str,
) -> Result<LLMGraphAnalysisResult, ProcessingError> {
let similar_entities = self
.find_similar_entities(category, instructions, text)
.await?;
let llm_request =
self.prepare_llm_request(category, instructions, text, &similar_entities)?;
self.perform_analysis(llm_request).await
}
#[instrument(skip(self))]
async fn find_similar_entities(
&self,
category: &str,
instructions: &str,
text: &str,
) -> Result<Vec<KnowledgeEntity>, ProcessingError> {
let input_text = format!(
"content: {}, category: {}, user_instructions: {}",
text, category, instructions
);
find_items_by_vector_similarity(
10,
input_text,
self.db_client,
KnowledgeEntity::table_name().to_string(),
self.openai_client,
)
.await
}
#[instrument(skip(self))]
fn prepare_llm_request(
&self,
category: &str,
instructions: &str,
text: &str,
similar_entities: &[KnowledgeEntity],
) -> Result<CreateChatCompletionRequest, ProcessingError> {
let entities_json = json!(similar_entities
.iter()
.map(|entity| {
json!({
"KnowledgeEntity": {
"id": entity.id,
"name": entity.name,
"description": entity.description
}
})
})
.collect::<Vec<_>>());
let user_message = format!(
"Category:\n{}\nInstructions:\n{}\nContent:\n{}\nExisting KnowledgeEntities in database:\n{}",
category, instructions, text, entities_json
);
debug!("Prepared LLM request message: {}", user_message);
let response_format = ResponseFormat::JsonSchema {
json_schema: ResponseFormatJsonSchema {
description: Some("Structured analysis of the submitted content".into()),
name: "content_analysis".into(),
schema: Some(get_ingress_analysis_schema()),
strict: Some(true),
},
};
CreateChatCompletionRequestArgs::default()
.model("gpt-4-mini")
.temperature(0.2)
.max_tokens(2048u32)
.messages([
ChatCompletionRequestSystemMessage::from(INGRESS_ANALYSIS_SYSTEM_MESSAGE).into(),
ChatCompletionRequestUserMessage::from(user_message).into(),
])
.response_format(response_format)
.build()
.map_err(|e| ProcessingError::LLMParsingError(e.to_string()))
}
#[instrument(skip(self, request))]
async fn perform_analysis(
&self,
request: CreateChatCompletionRequest,
) -> Result<LLMGraphAnalysisResult, ProcessingError> {
let response = self.openai_client.chat().create(request).await?;
debug!("Received LLM response: {:?}", response);
response
.choices
.first()
.and_then(|choice| choice.message.content.as_ref())
.ok_or(ProcessingError::LLMParsingError(
"No content found in LLM response".into(),
))
.and_then(|content| {
serde_json::from_str(content).map_err(|e| {
ProcessingError::LLMParsingError(format!(
"Failed to parse LLM response into analysis: {}",
e
))
})
})
}
}

View File

@@ -0,0 +1,3 @@
pub mod ingress_analyser;
pub mod prompt;
pub mod types;

View File

@@ -0,0 +1,81 @@
use serde_json::{json, Value};
pub static INGRESS_ANALYSIS_SYSTEM_MESSAGE: &str = r#"
You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON object representing the content in a graph format suitable for a graph database. You will also be presented with some existing knowledge_entities from the database, do not replicate these!
The JSON should have the following structure:
{
"knowledge_entities": [
{
"key": "unique-key-1",
"name": "Entity Name",
"description": "A detailed description of the entity.",
"entity_type": "TypeOfEntity"
},
// More entities...
],
"relationships": [
{
"type": "RelationshipType",
"source": "unique-key-1 or UUID from existing database",
"target": "unique-key-1 or UUID from existing database"
},
// More relationships...
]
}
Guidelines:
1. Do NOT generate any IDs or UUIDs. Use a unique `key` for each knowledge entity.
2. Each KnowledgeEntity should have a unique `key`, a meaningful `name`, and a descriptive `description`.
3. Define the type of each KnowledgeEntity using the following categories: Idea, Project, Document, Page, TextSnippet.
4. Establish relationships between entities using types like RelatedTo, RelevantTo, SimilarTo.
5. Use the `source` key to indicate the originating entity and the `target` key to indicate the related entity"
6. You will be presented with a few existing KnowledgeEntities that are similar to the current ones. They will have an existing UUID. When creating relationships to these entities, use their UUID.
7. Only create relationships between existing KnowledgeEntities.
8. Entities that exist already in the database should NOT be created again. If there is only a minor overlap, skip creating a new entity.
9. A new relationship MUST include a newly created KnowledgeEntity.
"#;
pub fn get_ingress_analysis_schema() -> Value {
json!({
"type": "object",
"properties": {
"knowledge_entities": {
"type": "array",
"items": {
"type": "object",
"properties": {
"key": { "type": "string" },
"name": { "type": "string" },
"description": { "type": "string" },
"entity_type": {
"type": "string",
"enum": ["idea", "project", "document", "page", "textsnippet"]
}
},
"required": ["key", "name", "description", "entity_type"],
"additionalProperties": false
}
},
"relationships": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["RelatedTo", "RelevantTo", "SimilarTo"]
},
"source": { "type": "string" },
"target": { "type": "string" }
},
"required": ["type", "source", "target"],
"additionalProperties": false
}
}
},
"required": ["knowledge_entities", "relationships"],
"additionalProperties": false
})
}

View File

@@ -0,0 +1,174 @@
use std::sync::{Arc, Mutex};
use serde::{Deserialize, Serialize};
use tokio::task;
use crate::{
error::ProcessingError,
models::graph_entities::GraphMapper,
storage::types::{
knowledge_entity::{KnowledgeEntity, KnowledgeEntityType},
knowledge_relationship::KnowledgeRelationship,
},
utils::embedding::generate_embedding,
};
use futures::future::try_join_all; // For future parallelization
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMKnowledgeEntity {
pub key: String, // Temporary identifier
pub name: String,
pub description: String,
pub entity_type: String, // Should match KnowledgeEntityType variants
}
/// Represents a single relationship from the LLM.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMRelationship {
#[serde(rename = "type")]
pub type_: String, // e.g., RelatedTo, RelevantTo
pub source: String, // Key of the source entity
pub target: String, // Key of the target entity
}
/// Represents the entire graph analysis result from the LLM.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMGraphAnalysisResult {
pub knowledge_entities: Vec<LLMKnowledgeEntity>,
pub relationships: Vec<LLMRelationship>,
}
/// Converts the LLM graph analysis result into database entities and relationships.
/// Processes embeddings sequentially for simplicity.
///
/// # Arguments
///
/// * `source_id` - A UUID representing the source identifier.
/// * `openai_client` - OpenAI client for LLM calls.
///
/// # Returns
///
/// * `Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), ProcessingError>` - A tuple containing vectors of `KnowledgeEntity` and `KnowledgeRelationship`.
impl LLMGraphAnalysisResult {
// Split the main function into smaller, focused functions
pub async fn to_database_entities(
&self,
source_id: &str,
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
) -> Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), ProcessingError> {
// Create mapper and pre-assign IDs
let mapper = Arc::new(Mutex::new(self.create_mapper()?));
// Process entities (prepared for future parallelization)
let entities = self
.process_entities(source_id, Arc::clone(&mapper), openai_client)
.await?;
// Process relationships
let relationships = self.process_relationships(Arc::clone(&mapper))?;
Ok((entities, relationships))
}
fn create_mapper(&self) -> Result<GraphMapper, ProcessingError> {
let mut mapper = GraphMapper::new();
// Pre-assign all IDs
for entity in &self.knowledge_entities {
mapper.assign_id(&entity.key);
}
Ok(mapper)
}
async fn process_entities(
&self,
source_id: &str,
mapper: Arc<Mutex<GraphMapper>>,
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
) -> Result<Vec<KnowledgeEntity>, ProcessingError> {
let futures: Vec<_> = self
.knowledge_entities
.iter()
.map(|entity| {
let mapper = Arc::clone(&mapper);
let openai_client = openai_client.clone();
let source_id = source_id.to_string();
let entity = entity.clone();
task::spawn(async move {
create_single_entity(&entity, &source_id, mapper, &openai_client).await
})
})
.collect();
let results = try_join_all(futures)
.await?
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
Ok(results)
}
fn process_relationships(
&self,
mapper: Arc<Mutex<GraphMapper>>,
) -> Result<Vec<KnowledgeRelationship>, ProcessingError> {
let mut mapper_guard = mapper
.lock()
.map_err(|_| ProcessingError::GraphProcessingError("Failed to lock mapper".into()))?;
self.relationships
.iter()
.map(|rel| {
let source_db_id = mapper_guard.get_or_parse_id(&rel.source);
let target_db_id = mapper_guard.get_or_parse_id(&rel.target);
Ok(KnowledgeRelationship::new(
source_db_id.to_string(),
target_db_id.to_string(),
rel.type_.clone(),
None,
))
})
.collect()
}
}
async fn create_single_entity(
llm_entity: &LLMKnowledgeEntity,
source_id: &str,
mapper: Arc<Mutex<GraphMapper>>,
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
) -> Result<KnowledgeEntity, ProcessingError> {
let assigned_id = {
let mapper = mapper
.lock()
.map_err(|_| ProcessingError::GraphProcessingError("Failed to lock mapper".into()))?;
mapper
.get_id(&llm_entity.key)
.ok_or_else(|| {
ProcessingError::GraphProcessingError(format!(
"ID not found for key: {}",
llm_entity.key
))
})?
.to_string()
};
let embedding_input = format!(
"name: {}, description: {}, type: {}",
llm_entity.name, llm_entity.description, llm_entity.entity_type
);
let embedding = generate_embedding(openai_client, embedding_input).await?;
Ok(KnowledgeEntity {
id: assigned_id,
name: llm_entity.name.to_string(),
description: llm_entity.description.to_string(),
entity_type: KnowledgeEntityType::from(llm_entity.entity_type.to_string()),
source_id: source_id.to_string(),
metadata: None,
embedding,
})
}

View File

@@ -0,0 +1 @@
pub mod llm_analysis_result;

1
src/analysis/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod ingress;

View File

@@ -1,5 +1,6 @@
use async_openai::error::OpenAIError;
use thiserror::Error;
use tokio::task::JoinError;
/// Error types for processing `TextContent`.
#[derive(Error, Debug)]
@@ -18,4 +19,7 @@ pub enum ProcessingError {
#[error("LLM parsing error: {0}")]
LLMParsingError(String),
#[error("Task join error: {0}")]
JoinError(#[from] JoinError),
}

View File

@@ -1,3 +1,4 @@
pub mod analysis;
pub mod error;
pub mod models;
pub mod rabbitmq;

View File

@@ -2,7 +2,6 @@ use super::ingress_content::IngressContentError;
use crate::models::file_info::FileInfo;
use crate::storage::types::text_content::TextContent;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
/// Knowledge object type, containing the content or reference to it, as well as metadata
#[derive(Debug, Serialize, Deserialize, Clone)]

View File

@@ -1,3 +1,4 @@
use crate::analysis::ingress::ingress_analyser::IngressAnalyzer;
use crate::retrieval::graph::find_entities_by_source_id;
use crate::retrieval::vector::find_items_by_vector_similarity;
use crate::storage::db::store_item;
@@ -7,7 +8,7 @@ use crate::storage::types::text_chunk::TextChunk;
use crate::storage::types::text_content::TextContent;
use crate::storage::types::StoredObject;
use crate::utils::embedding::generate_embedding;
use crate::{error::ProcessingError, surrealdb::SurrealDbClient, utils::llm::create_json_ld};
use crate::{error::ProcessingError, surrealdb::SurrealDbClient};
use surrealdb::{engine::remote::ws::Client, Surreal};
use text_splitter::TextSplitter;
use tracing::{debug, info};
@@ -19,8 +20,7 @@ impl TextContent {
let openai_client = async_openai::Client::new();
// Store TextContent
let create_operation = store_item(&db_client, self.clone()).await?;
info!("{:?}", create_operation);
store_item(&db_client, self.clone()).await?;
// Get related nodes
let closest_text_content: Vec<TextChunk> = find_items_by_vector_similarity(
@@ -48,15 +48,10 @@ impl TextContent {
db_client.rebuild_indexes().await?;
// Step 1: Send to LLM for analysis
let analysis = create_json_ld(
&self.category,
&self.instructions,
&self.text,
&db_client,
&openai_client,
)
.await?;
// info!("{:#?}", &analysis);
let analyser = IngressAnalyzer::new(&db_client, &openai_client);
let analysis = analyser
.analyze_content(&self.category, &self.instructions, &self.text)
.await?;
// Step 2: Convert LLM analysis to database entities
let (entities, relationships) = analysis
@@ -116,7 +111,7 @@ impl TextContent {
for chunk in chunks {
info!("Chunk: {}", chunk);
let embedding = generate_embedding(&openai_client, chunk.to_string()).await?;
let embedding = generate_embedding(openai_client, chunk.to_string()).await?;
let text_chunk = TextChunk::new(self.id.to_string(), chunk.to_string(), embedding);
store_item(db_client, text_chunk).await?;

View File

@@ -1,299 +0,0 @@
use crate::{
error::ProcessingError,
models::graph_entities::GraphMapper,
retrieval::vector::find_items_by_vector_similarity,
storage::types::{
knowledge_entity::{KnowledgeEntity, KnowledgeEntityType},
knowledge_relationship::KnowledgeRelationship,
StoredObject,
},
};
use async_openai::types::{
ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage,
CreateChatCompletionRequestArgs,
};
use serde::{Deserialize, Serialize};
use serde_json::json;
use surrealdb::engine::remote::ws::Client;
use surrealdb::Surreal;
use tracing::debug;
use uuid::Uuid;
use super::embedding::generate_embedding;
/// Represents a single knowledge entity from the LLM.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMKnowledgeEntity {
pub key: String, // Temporary identifier
pub name: String,
pub description: String,
pub entity_type: String, // Should match KnowledgeEntityType variants
}
/// Represents a single relationship from the LLM.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMRelationship {
#[serde(rename = "type")]
pub type_: String, // e.g., RelatedTo, RelevantTo
pub source: String, // Key of the source entity
pub target: String, // Key of the target entity
}
/// Represents the entire graph analysis result from the LLM.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct LLMGraphAnalysisResult {
pub knowledge_entities: Vec<LLMKnowledgeEntity>,
pub relationships: Vec<LLMRelationship>,
}
impl LLMGraphAnalysisResult {
/// Converts the LLM graph analysis result into database entities and relationships.
/// Processes embeddings sequentially for simplicity.
///
/// # Arguments
///
/// * `source_id` - A UUID representing the source identifier.
/// * `openai_client` - OpenAI client for LLM calls.
///
/// # Returns
///
/// * `Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), ProcessingError>` - A tuple containing vectors of `KnowledgeEntity` and `KnowledgeRelationship`.
pub async fn to_database_entities(
&self,
source_id: &String,
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
) -> Result<(Vec<KnowledgeEntity>, Vec<KnowledgeRelationship>), ProcessingError> {
let mut mapper = GraphMapper::new();
// Step 1: Assign unique IDs to all knowledge entities upfront
for llm_entity in &self.knowledge_entities {
mapper.assign_id(&llm_entity.key);
}
let mut entities = vec![];
// Step 2: Process each knowledge entity sequentially
for llm_entity in &self.knowledge_entities {
// Retrieve the assigned ID for the current entity
let assigned_id = mapper
.get_id(&llm_entity.key)
.ok_or_else(|| {
ProcessingError::GraphProcessingError(format!(
"ID not found for key: {}",
llm_entity.key
))
})?
.clone();
// Prepare the embedding input
let embedding_input = format!(
"name: {}, description: {}, type: {}",
llm_entity.name, llm_entity.description, llm_entity.entity_type
);
// Generate embedding
let embedding = generate_embedding(&openai_client, embedding_input).await?;
// Construct the KnowledgeEntity with embedding
let knowledge_entity = KnowledgeEntity {
id: assigned_id.to_string(),
name: llm_entity.name.clone(),
description: llm_entity.description.clone(),
entity_type: KnowledgeEntityType::from(llm_entity.entity_type.clone()),
source_id: source_id.to_string(),
metadata: None,
embedding,
};
entities.push(knowledge_entity);
}
// Step 3: Process relationships using the pre-assigned IDs
let relationships: Vec<KnowledgeRelationship> = self
.relationships
.iter()
.filter_map(|llm_rel| {
let source_db_id = mapper.get_or_parse_id(&llm_rel.source);
let target_db_id = mapper.get_or_parse_id(&llm_rel.target);
debug!("IN: {}, OUT: {}", &source_db_id, &target_db_id);
Some(KnowledgeRelationship::new(
source_db_id.to_string(),
target_db_id.to_string(),
llm_rel.type_.to_owned(),
None,
))
})
.collect();
Ok((entities, relationships))
}
}
/// Sends text to an LLM for analysis.
pub async fn create_json_ld(
category: &str,
instructions: &str,
text: &str,
db_client: &Surreal<Client>,
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
) -> Result<LLMGraphAnalysisResult, ProcessingError> {
// Format the input for more cohesive comparison
let input_text = format!(
"content: {}, category: {}, user_instructions: {}",
text, category, instructions
);
let closest_entities: Vec<KnowledgeEntity> = find_items_by_vector_similarity(
10,
input_text,
db_client,
KnowledgeEntity::table_name().to_string(),
openai_client,
)
.await?;
// Format the KnowledgeEntity, remove redudant fields
let closest_entities_to_llm = json!(closest_entities
.iter()
.map(|entity| {
json!({
"KnowledgeEntity": {
"id": entity.id,
"name": entity.name,
"description": entity.description
}
})
})
.collect::<Vec<_>>());
// let db_context = serde_json::to_string_pretty(&closest_entities_to_llm).unwrap();
let schema = json!({
"type": "object",
"properties": {
"knowledge_entities": {
"type": "array",
"items": {
"type": "object",
"properties": {
"key": { "type": "string" },
"name": { "type": "string" },
"description": { "type": "string" },
"entity_type": {
"type": "string",
"enum": ["idea", "project", "document", "page", "textsnippet"]
}
},
"required": ["key", "name", "description", "entity_type"],
"additionalProperties": false
}
},
"relationships": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {
"type": "string",
"enum": ["RelatedTo", "RelevantTo", "SimilarTo"]
},
"source": { "type": "string" },
"target": { "type": "string" }
},
"required": ["type", "source", "target"],
"additionalProperties": false
}
}
},
"required": ["knowledge_entities", "relationships"],
"additionalProperties": false
});
let response_format = async_openai::types::ResponseFormat::JsonSchema {
json_schema: async_openai::types::ResponseFormatJsonSchema {
description: Some("Structured analysis of the submitted content".into()),
name: "content_analysis".into(),
schema: Some(schema),
strict: Some(true),
},
};
// Construct the system and user messages
let system_message = r#"
You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON object representing the content in a graph format suitable for a graph database. You will also be presented with some existing knowledge_entities from the database, do not replicate these!
The JSON should have the following structure:
{
"knowledge_entities": [
{
"key": "unique-key-1",
"name": "Entity Name",
"description": "A detailed description of the entity.",
"entity_type": "TypeOfEntity"
},
// More entities...
],
"relationships": [
{
"type": "RelationshipType",
"source": "unique-key-1 or UUID from existing database",
"target": "unique-key-1 or UUID from existing database"
},
// More relationships...
]
}
Guidelines:
1. Do NOT generate any IDs or UUIDs. Use a unique `key` for each knowledge entity.
2. Each KnowledgeEntity should have a unique `key`, a meaningful `name`, and a descriptive `description`.
3. Define the type of each KnowledgeEntity using the following categories: Idea, Project, Document, Page, TextSnippet.
4. Establish relationships between entities using types like RelatedTo, RelevantTo, SimilarTo.
5. Use the `source` key to indicate the originating entity and the `target` key to indicate the related entity"
6. You will be presented with a few existing KnowledgeEntities that are similar to the current ones. They will have an existing UUID. When creating relationships to these entities, use their UUID.
7. Only create relationships between existing KnowledgeEntities.
8. Entities that exist already in the database should NOT be created again. If there is only a minor overlap, skip creating a new entity.
9. A new relationship MUST include a newly created KnowledgeEntity.
"#;
let user_message = format!(
"Category:\n{}\nInstructions:\n{}\nContent:\n{}\nExisting KnowledgeEntities in database:\n{}",
category, instructions, text, closest_entities_to_llm
);
debug!("{}", user_message);
// Build the chat completion request
let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4o-mini")
.temperature(0.2)
.max_tokens(2048u32)
.messages([
ChatCompletionRequestSystemMessage::from(system_message).into(),
ChatCompletionRequestUserMessage::from(user_message).into(),
])
.response_format(response_format)
.build()?;
// Send the request to OpenAI
let response = openai_client.chat().create(request).await?;
debug!("{:?}", response);
response
.choices
.first()
.and_then(|choice| choice.message.content.as_ref())
.ok_or(ProcessingError::LLMParsingError(
"No content found in LLM response".into(),
))
.and_then(|content| {
serde_json::from_str(content).map_err(|e| {
ProcessingError::LLMParsingError(format!(
"Failed to parse LLM response into analysis: {}",
e
))
})
})
}

View File

@@ -1,2 +1 @@
pub mod embedding;
pub mod llm;