diff --git a/flake.nix b/flake.nix index 53c151a..73de9bb 100644 --- a/flake.nix +++ b/flake.nix @@ -40,8 +40,16 @@ echo "run devenv up -d to start and monitor services" ''; + packages = [ + pkgs.neo4j + ]; + languages.rust.enable = true; + processes = { + start-neo4j.exec = "NEO4J_HOME=$(mktemp -d) neo4j console"; + }; + services = { redis = { enable = true; diff --git a/src/models/ingress_object.rs b/src/models/ingress_object.rs index a473d39..a2b354c 100644 --- a/src/models/ingress_object.rs +++ b/src/models/ingress_object.rs @@ -1,5 +1,6 @@ use crate::models::file_info::FileInfo; use serde::{Deserialize, Serialize}; +use uuid::Uuid; use super::{ingress_content::IngressContentError, text_content::TextContent}; @@ -35,7 +36,9 @@ impl IngressObject { match self { IngressObject::Url { url, instructions, category } => { let text = Self::fetch_text_from_url(url).await?; + let id = Uuid::new_v4(); Ok(TextContent { + id, text, instructions: instructions.clone(), category: category.clone(), @@ -43,7 +46,9 @@ impl IngressObject { }) }, IngressObject::Text { text, instructions, category } => { + let id = Uuid::new_v4(); Ok(TextContent { + id, text: text.clone(), instructions: instructions.clone(), category: category.clone(), @@ -51,8 +56,10 @@ impl IngressObject { }) }, IngressObject::File { file_info, instructions, category } => { + let id = Uuid::new_v4(); let text = Self::extract_text_from_file(file_info).await?; Ok(TextContent { + id, text, instructions: instructions.clone(), category: category.clone(), diff --git a/src/models/text_content.rs b/src/models/text_content.rs index e4708f7..98b80e6 100644 --- a/src/models/text_content.rs +++ b/src/models/text_content.rs @@ -1,13 +1,13 @@ -use async_openai::types::{ ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage, CreateChatCompletionRequestArgs}; use serde::{Deserialize, Serialize}; -use serde_json::json; use tracing::info; -use crate::{models::file_info::FileInfo, utils::llm::create_json_ld}; +use 
uuid::Uuid; +use crate::{models::file_info::FileInfo, neo4j::client::Neo4jClient, utils::llm::create_json_ld}; use thiserror::Error; /// Represents a single piece of text content extracted from various sources. #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TextContent { + pub id: Uuid, pub text: String, pub file_info: Option, pub instructions: String, @@ -15,7 +15,7 @@ pub struct TextContent { } /// A struct representing a knowledge source in the graph database. -#[derive(Deserialize, Debug, Serialize)] +#[derive(Deserialize, Debug, Serialize, Clone )] pub struct KnowledgeSource { pub id: String, pub title: String, @@ -24,7 +24,7 @@ pub struct KnowledgeSource { } /// A struct representing a relationship between knowledge sources. -#[derive(Deserialize, Serialize, Debug)] +#[derive(Deserialize, Clone, Serialize, Debug)] pub struct Relationship { #[serde(rename = "type")] pub type_: String, @@ -34,7 +34,7 @@ pub struct Relationship { /// A struct representing the result of an LLM analysis. #[derive(Deserialize, Debug,Serialize)] pub struct AnalysisResult { - pub knowledge_sources: Vec, + pub knowledge_source: KnowledgeSource, pub category: String, pub instructions: String, } @@ -58,24 +58,24 @@ pub enum ProcessingError { impl TextContent { - /// Creates a new `TextContent` instance. - pub fn new(text: String, file_info: Option, instructions: String, category: String) -> Self { - Self { - text, - file_info, - instructions, - category, - } - } - /// Processes the `TextContent` by sending it to an LLM, storing in a graph DB, and vector DB. 
pub async fn process(&self) -> Result<(), ProcessingError> { + let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo4j").await.expect("Failed to create Neo4j client"); + // Step 1: Send to LLM for analysis let analysis = create_json_ld(&self.category, &self.instructions, &self.text).await?; - info!("{:?}", analysis); + info!("{:?}", &analysis); // Step 2: Store analysis results in Graph DB - // self.store_in_graph_db(&analysis).await?; + client.store_knowledge_source(&analysis.knowledge_source).await?; + + // Step 3: Store relationships in Graph DB + for relationship in analysis.knowledge_source.relationships.iter() { + client + .store_relationship(&analysis.knowledge_source.id, relationship) + .await?; + } + // Step 3: Split text and store in Vector DB // self.store_in_vector_db().await?; diff --git a/src/neo4j/client.rs b/src/neo4j/client.rs index 00f86ca..7ebbb26 100644 --- a/src/neo4j/client.rs +++ b/src/neo4j/client.rs @@ -1,103 +1,183 @@ -// use neo4rs::*; -// use serde::{Deserialize, Serialize}; +use neo4rs::*; +use tracing::info; +use std::error::Error; -// /// A struct representing a knowledge source in the graph database. -// #[derive(Deserialize, Serialize)] -// pub struct KnowledgeSource { -// pub id: String, -// pub title: String, -// pub description: String, -// pub relationships: Vec, -// } +use crate::models::text_content::{KnowledgeSource, ProcessingError, Relationship}; -// /// A struct representing a relationship between knowledge sources. -// #[derive(Deserialize, Serialize)] -// pub struct Relationship { -// pub type_: String, -// pub target: String, -// } +/// A client for interacting with the Neo4j graph database. +pub struct Neo4jClient { + /// The Neo4j graph instance. + graph: Graph, +} -// /// A struct representing the result of an LLM analysis. 
-// #[derive(Deserialize, Serialize)] -// pub struct AnalysisResult { -// pub knowledge_sources: Vec, -// pub category: String, -// pub instructions: String, -// } +impl Neo4jClient { + /// Creates a new `Neo4jClient` instance by connecting to the Neo4j database. + /// + /// # Arguments + /// + /// * `uri` - The URI of the Neo4j database (e.g., "127.0.0.1:7687"). + /// * `user` - The username for authentication. + /// * `pass` - The password for authentication. + /// + /// # Errors + /// + /// Returns an `Err` variant if the connection to the database fails. + pub async fn new(uri: &str, user: &str, pass: &str) -> Result<Self, Box<dyn Error>> { + // Initialize the Neo4j graph connection. + let graph = Graph::new(uri, user, pass).await?; + Ok(Neo4jClient { graph }) + } -// /// A trait for interacting with the Neo4j database. -// pub trait Neo4jClient { -// /// Create a new knowledge source in the graph database. -// fn create_knowledge_source( -// &self, -// knowledge_source: KnowledgeSource, -// ) -> Result<(), neo4rs::Error>; + /// Stores a knowledge source in the Neo4j database. + /// + /// # Arguments + /// + /// * `source` - A reference to the `KnowledgeSource` to be stored. + /// + /// # Errors + /// + /// Returns an `Err` variant if the database operation fails. + pub async fn store_knowledge_source( + &self, + source: &KnowledgeSource, + ) -> Result<(), ProcessingError> { + // Cypher query to create a knowledge source node with properties. + // Only bound parameters may appear here; an unbound $type would fail at runtime. + let cypher_query = " + CREATE (ks:KnowledgeSource { + id: $id, + title: $title, + description: $description + }) + RETURN ks + "; -// /// Get a knowledge source by its ID. -// fn get_knowledge_source(&self, id: &str) -> Result; + // Execute the query with parameters.
+ self.graph + .run( + query(cypher_query) + .param("id", source.id.to_string()) + .param("title", source.title.to_string()) + .param("description", source.description.to_string()), + ) + .await.map_err(|e| ProcessingError::GraphDBError(e.to_string()))?; -// /// Create a new relationship between knowledge sources. -// fn create_relationship(&self, relationship: Relationship) -> Result<(), neo4rs::Error>; + info!("Stored knowledge source"); -// /// Get all relationships for a given knowledge source. -// fn get_relationships(&self, id: &str) -> Result, neo4rs::Error>; -// } + Ok(()) + } -// /// A concrete implementation of the Neo4jClient trait. -// pub struct Neo4jClientImpl { -// client: Graph, -// } + /// Stores a relationship between two knowledge sources in the Neo4j database. + /// + /// # Arguments + /// + /// * `source_id` - The ID of the source knowledge source. + /// * `relationship` - A reference to the `Relationship` defining the connection. + /// + /// # Errors + /// + /// Returns an `Err` variant if the database operation fails. + pub async fn store_relationship( + &self, + source_id: &str, + relationship: &Relationship, + ) -> Result<(), ProcessingError> { + // Cypher query to create a relationship between two knowledge source nodes. + let cypher_query = format!( + " + MATCH (a:KnowledgeSource {{id: $source_id}}) + MATCH (b:KnowledgeSource {{id: $target_id}}) + CREATE (a)-[:{}]->(b) + RETURN a, b + ", + relationship.type_ + ); -// impl Neo4jClientImpl { -// /// Create a new Neo4j client. -// pub async fn new(uri: &str, auth: &str, pass: &str) -> Result { -// let client = Graph::new(uri, auth, pass).await?; -// Ok(Neo4jClientImpl { client }) -// } -// } + // Execute the query with parameters.
+ self.graph + .run( + query(&cypher_query) + .param("source_id", source_id) + .param("target_id", relationship.target.to_string()), + ) + .await.map_err(|e| ProcessingError::GraphDBError(e.to_string()))?; -// impl Neo4jClient for Neo4jClientImpl { -// fn create_knowledge_source( -// &self, -// knowledge_source: KnowledgeSource, -// ) -> Result<(), neo4rs::Error> { -// let node = Node::new( -// knowledge_source.id, -// knowledge_source.title, -// knowledge_source.description, -// )?; -// self.client.create_node(node)?; -// Ok(()) -// } + info!("Stored knowledge relationship"); -// fn get_knowledge_source(&self, id: &str) -> Result { -// let node = self.client.get_node(id)?; -// let knowledge_source = KnowledgeSource { -// id: node.id(), -// title: node.get_property("title")?, -// description: node.get_property("description")?, -// relationships: vec![], + Ok(()) + } + + //// Closes the connection to the Neo4j database. + //// + //// This function ensures that all pending operations are completed before shutting down. + // pub async fn close(self) -> Result<(), Box<dyn Error>> { + // self.graph.close().await?; + // Ok(()) + // } +} + +// #[cfg(test)] +// mod tests { +// use super::*; +// use uuid::Uuid; + +// /// Tests the `store_knowledge_source` and `store_relationship` functions. +// /// +// /// # Note +// /// +// /// Ensure that a Neo4j database is running and accessible at the specified URI +// /// before running these tests. +// #[tokio::test] +// async fn test_store_functions() { +// // Initialize the Neo4j client. +// let client = Neo4jClient::new("127.0.0.1:7687", "neo4j", "neo") +// .await +// .expect("Failed to create Neo4j client"); + +// // Create a first knowledge source. +// let source1 = KnowledgeSource { +// id: Uuid::new_v4().to_string(), +// source_type: "Document".into(), +// title: "Understanding Neural Networks".into(), +// description: +// "An in-depth analysis of neural networks and their applications in machine learning."
+// .into(), // }; -// Ok(knowledge_source) -// } -// fn create_relationship(&self, relationship: Relationship) -> Result<(), neo4rs::Error> { -// let rel = Relationship::new(relationship.type_, relationship.target)?; -// self.client.create_relationship(rel)?; -// Ok(()) -// } +// // Store the first knowledge source. +// client +// .store_knowledge_source(&source1) +// .await +// .expect("Failed to store knowledge source 1"); -// fn get_relationships(&self, id: &str) -> Result, neo4rs::Error> { -// let node = self.client.get_node(id)?; -// let relationships = node.get_relationships()?; -// let mut result = vec![]; -// for rel in relationships { -// let relationship = Relationship { -// type_: rel.type_(), -// target: rel.target(), -// }; -// result.push(relationship); -// } -// Ok(result) +// // Create a second knowledge source. +// let source2 = KnowledgeSource { +// id: Uuid::new_v4().to_string(), +// source_type: "Document".into(), +// title: "Machine Learning Basics".into(), +// description: "A foundational text on machine learning principles and techniques." +// .into(), +// }; + +// // Store the second knowledge source. +// client +// .store_knowledge_source(&source2) +// .await +// .expect("Failed to store knowledge source 2"); + +// // Define a relationship between the two sources. +// let relationship = Relationship { +// relationship_type: "RelatedTo".into(), +// target_id: source2.id.clone(), +// }; + +// // Store the relationship from source1 to source2. +// client +// .store_relationship(&source1.id, &relationship) +// .await +// .expect("Failed to store relationship"); + +// // Clean up by closing the client. 
+// client.close().await.expect("Failed to close Neo4j client"); // } // } diff --git a/src/utils/llm.rs b/src/utils/llm.rs index 2617741..119ed53 100644 --- a/src/utils/llm.rs +++ b/src/utils/llm.rs @@ -1,165 +1,98 @@ use async_openai::types::ChatCompletionRequestSystemMessage; use async_openai::types::ChatCompletionRequestUserMessage; use async_openai::types::CreateChatCompletionRequestArgs; -use tracing::debug; use tracing::info; use crate::models::text_content::ProcessingError; use serde_json::json; use crate::models::text_content::AnalysisResult; -/// Placeholder for your actual graph summary retrieval logic -async fn get_graph_summary() -> Result { - // Implement your logic to fetch and summarize the graph database - Ok("Current graph contains documents related to AI, Rust programming, and asynchronous systems.".into()) -} - -/// Sends text to an LLM for analysis with enhanced functionality. +/// Sends text to an LLM for analysis. pub async fn create_json_ld(category: &str, instructions: &str, text: &str) -> Result { - let client = async_openai::Client::new(); - - // Fetch the graph summary - let graph_summary = get_graph_summary().await?; - - let schema = json!({ - "type": "object", - "properties": { - "knowledge_sources": { - "type": "array", - "items": { - "type": "object", - "properties": { - "id": {"type": "string"}, - "type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]}, - "title": {"type": "string"}, - "description": {"type": "string"}, - "relationships": { - "type": "array", - "items": { - "type": "object", - "properties": { - "type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]}, - "target": {"type": "string", "description": "ID of the related knowledge source"} - }, - "required": ["type", "target"], - "additionalProperties": false, - } - } - }, - "required": ["id", "type", "title", "description", "relationships"], - "additionalProperties": false, - } + let client = async_openai::Client::new(); + let schema = 
json!({ + "type": "object", + "properties": { + "knowledge_source": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]}, + "title": {"type": "string"}, + "description": {"type": "string"}, + "relationships": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]}, + "target": {"type": "string", "description": "ID of the related knowledge source"} + }, + "required": ["type", "target"], + "additionalProperties": false, + } + } + }, + "required": ["id", "type", "title", "description", "relationships"], + "additionalProperties": false, }, "category": {"type": "string"}, "instructions": {"type": "string"} - }, - "required": ["knowledge_sources", "category", "instructions"], - "additionalProperties": false - }); + }, + "required": ["knowledge_source", "category", "instructions"], + "additionalProperties": false + }); - let response_format = async_openai::types::ResponseFormat::JsonSchema { - json_schema: async_openai::types::ResponseFormatJsonSchema { - description: Some("Structured analysis of the submitted content".into()), - name: "content_analysis".into(), - schema: Some(schema), - strict: Some(true), - }, - }; + let response_format = async_openai::types::ResponseFormat::JsonSchema { + json_schema: async_openai::types::ResponseFormatJsonSchema { + description: Some("Structured analysis of the submitted content".into()), + name: "content_analysis".into(), + schema: Some(schema), + strict: Some(true), + }, + }; - // Construct examples to guide the LLM - let system_message = format!( - "You are an expert document analyzer. You will receive a document's text content, user instructions, a category, and a summary of the current graph database. 
Your task is to provide a structured JSON-LD object representing the content, a moderately short description of the document, how it relates to the submitted category and any relevant instructions. You shall also include related objects. The goal is to insert your output into a graph database. + // Construct the system and user messages + let system_message = format!( + "You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON-LD object representing the content, a moderately short description of the document, how it relates to the submitted category and any relevant instructions. You shall also include related objects. The goal is to insert your output into a graph database." + ); - Here are examples of the desired output: + let user_message = format!( + "Category: {}\nInstructions: {}\nContent:\n{}", + category, instructions, text + ); - Example 1: - {{ - \"knowledge_sources\": [ - {{ - \"id\": \"ai_neural_networks\", - \"type\": \"Document\", - \"title\": \"Understanding Neural Networks\", - \"description\": \"An in-depth analysis of neural networks and their applications in machine learning.\", - \"relationships\": [ - {{ - \"type\": \"RelatedTo\", - \"target\": \"ai_machine_learning\" - }}, - {{ - \"type\": \"SimilarTo\", - \"target\": \"ai_deep_learning\" - }} - ] - }} - ], - \"category\": \"ai\", - \"instructions\": \"Analyze the document and relate it to existing AI knowledge.\" - }} + // Build the chat completion request + let request = CreateChatCompletionRequestArgs::default() + .model("gpt-4o-mini") + .max_tokens(2048u32) + .messages([ + ChatCompletionRequestSystemMessage::from(system_message).into(), + ChatCompletionRequestUserMessage::from(user_message).into(), + ]) + .response_format(response_format) + .build().map_err(|e| ProcessingError::LLMError(e.to_string()))?; - Example 2: - {{ - \"knowledge_sources\": [ - {{ - \"id\": 
\"rust_async_programming\", - \"type\": \"Document\", - \"title\": \"Asynchronous Programming in Rust\", - \"description\": \"A comprehensive guide to writing asynchronous code in Rust using async/await syntax.\", - \"relationships\": [ - {{ - \"type\": \"RelatedTo\", - \"target\": \"rust_concurrency\" - }}, - {{ - \"type\": \"SimilarTo\", - \"target\": \"rust_multithreading\" - }} - ] - }} - ], - \"category\": \"rust\", - \"instructions\": \"Incorporate the document into the Rust programming knowledge base.\" - }} + // Send the request to OpenAI + let response = client.chat().create(request).await.map_err(|e| { + ProcessingError::LLMError(format!("OpenAI API request failed: {}", e.to_string())) + })?; - Please ensure the IDs follow the format _ using snake_case." - ); + info!("{:?}", response); - let user_message = format!( - "Graph Summary: {}\nCategory: {}\nInstructions: {}\nContent:\n{}", - graph_summary, category, instructions, text - ); - - // Build the chat completion request - let request = CreateChatCompletionRequestArgs::default() - .model("gpt-4o-mini") // Ensure this is the correct model identifier - .max_tokens(2048u32) - .messages([ - ChatCompletionRequestSystemMessage::from(system_message).into(), - ChatCompletionRequestUserMessage::from(user_message).into(), - ]) - .response_format(response_format) - .build() - .map_err(|e| ProcessingError::LLMError(e.to_string()))?; - - // Send the request to OpenAI - let response = client.chat().create(request).await.map_err(|e| { - ProcessingError::LLMError(format!("OpenAI API request failed: {}", e.to_string())) - })?; - - debug!("{:?}", response); - - // Extract and parse the response - for choice in response.choices { - if let Some(content) = choice.message.content { - let analysis: AnalysisResult = serde_json::from_str(&content).map_err(|e| { - ProcessingError::LLMError(format!( - "Failed to parse LLM response into LLMAnalysis: {}", - e.to_string() - )) - })?; - return Ok(analysis); + // Extract and parse the 
response + for choice in response.choices { + if let Some(content) = choice.message.content { + let analysis: AnalysisResult = serde_json::from_str(&content).map_err(|e| { + ProcessingError::LLMError(format!( + "Failed to parse LLM response into LLMAnalysis: {}", + e.to_string() + )) + })?; + return Ok(analysis); + } } - } - Err(ProcessingError::LLMError( - "No content found in LLM response".into(), - )) -} + Err(ProcessingError::LLMError( + "No content found in LLM response".into(), + )) + }