diff --git a/flake.nix b/flake.nix index 9e220a0..53c151a 100644 --- a/flake.nix +++ b/flake.nix @@ -35,6 +35,8 @@ { # https://devenv.sh/reference/options/ enterShell = '' + echo "Welcome to zettle_db project" + echo "----------------------------" echo "run devenv up -d to start and monitor services" ''; diff --git a/src/models/text_content.rs b/src/models/text_content.rs index 3c6bc27..56a3a9f 100644 --- a/src/models/text_content.rs +++ b/src/models/text_content.rs @@ -14,14 +14,32 @@ pub struct TextContent { pub category: String, } -#[derive(Debug, Serialize, Deserialize)] -pub struct LLMAnalysis { - pub json_ld: serde_json::Value, +/// A struct representing a knowledge source in the graph database. +#[derive(Deserialize, Debug, Serialize)] +pub struct KnowledgeSource { + pub id: String, + pub title: String, pub description: String, - pub related_category: String, + pub relationships: Vec, +} + +/// A struct representing a relationship between knowledge sources. +#[derive(Deserialize, Serialize, Debug)] +pub struct Relationship { + #[serde(rename = "type")] + pub type_: String, + pub target: String, +} + +/// A struct representing the result of an LLM analysis. +#[derive(Deserialize, Debug,Serialize)] +pub struct AnalysisResult { + pub knowledge_sources: Vec, + pub category: String, pub instructions: String, } + /// Error types for processing `TextContent`. #[derive(Error, Debug)] pub enum ProcessingError { @@ -66,66 +84,43 @@ impl TextContent { } /// Sends text to an LLM for analysis. - async fn send_to_llm(&self) -> Result { + async fn send_to_llm(&self) -> Result { let client = async_openai::Client::new(); - - // Define the JSON Schema for the expected response -// let schema = json!({ -// "type": "object", -// "properties": { -// "json_ld": { -// "type": "object", -// "properties": { -// "@context": { "type": "string" }, -// "@type": { "type": "string" }, -// "name": { "type": "string" } -// // Define only the essential properties -// }, -// "required": ["@context", "@type", "name"], -// "additionalProperties": false, -// }, -// "description": { "type": "string" }, -// "related_category": { "type": "string" }, -// "instructions": { "type": "string" } -// }, -// "required": ["json_ld", "description", "related_category", "instructions"], -// "additionalProperties": false -// }); -let schema = json!({ - "type": "object", - "properties": { - "knowledge_sources": { - "type": "array", - "items": { - "type": "object", - "properties": { - "id": {"type": "string"}, - "type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]}, - "title": {"type": "string"}, - "description": {"type": "string"}, - "relationships": { - "type": "array", - "items": { - "type": "object", - "properties": { - "type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]}, - "target": {"type": "string", "description": "ID of the related knowledge source"} - }, - "required": ["type", "target"], - "additionalProperties": false, - } - } - }, - "required": ["id", "type", "title", "description", "relationships"], - "additionalProperties": false, - } - }, - "category": {"type": "string"}, - "instructions": {"type": "string"} - }, - "required": ["knowledge_sources", "category", "instructions"], - "additionalProperties": false -}); + let schema = json!({ + "type": "object", + "properties": { + "knowledge_sources": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]}, + "title": {"type": "string"}, + "description": {"type": "string"}, + "relationships": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]}, + "target": {"type": "string", "description": "ID of the related knowledge source"} + }, + "required": ["type", "target"], + "additionalProperties": false, + } + } + }, + "required": ["id", "type", "title", "description", "relationships"], + "additionalProperties": false, + } + }, + "category": {"type": "string"}, + "instructions": {"type": "string"} + }, + "required": ["knowledge_sources", "category", "instructions"], + "additionalProperties": false + }); let response_format = async_openai::types::ResponseFormat::JsonSchema { json_schema: async_openai::types::ResponseFormatJsonSchema { @@ -167,7 +162,7 @@ let schema = json!({ // Extract and parse the response for choice in response.choices { if let Some(content) = choice.message.content { - let analysis: LLMAnalysis = serde_json::from_str(&content).map_err(|e| { + let analysis: AnalysisResult = serde_json::from_str(&content).map_err(|e| { ProcessingError::LLMError(format!( "Failed to parse LLM response into LLMAnalysis: {}", e.to_string() @@ -183,7 +178,7 @@ let schema = json!({ } /// Stores analysis results in a graph database. - async fn store_in_graph_db(&self, _analysis: &LLMAnalysis) -> Result<(), ProcessingError> { + async fn store_in_graph_db(&self, _analysis: &AnalysisResult) -> Result<(), ProcessingError> { // TODO: Implement storage logic for your specific graph database. // Example: /* diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index f36f382..8d33106 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -5,7 +5,7 @@ use futures_lite::stream::StreamExt; use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject }; -use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; +use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError}; use tracing::{info, error}; /// Struct to consume messages from RabbitMQ. @@ -20,10 +20,10 @@ impl RabbitMQConsumer { /// declares a exchange if needed, declares and binds a queue and initializes the consumer /// /// # Arguments - /// * 'config' - A initialized RabbitMQConfig containing required configurations + /// * `config` - A initialized RabbitMQConfig containing required configurations /// /// # Returns - /// * 'Result' - The created client or an error. + /// * `Result` - The created client or an error. pub async fn new(config: &RabbitMQConfig) -> Result { let common = RabbitMQCommon::new(config).await?; @@ -41,6 +41,13 @@ impl RabbitMQConsumer { } /// Sets up the consumer based on the channel and `RabbitMQConfig`. + /// + /// # Arguments + /// * `channel` - Lapin Channel. + /// * `config` - A initialized RabbitMQConfig containing required information + /// + /// # Returns + /// * `Result` - The initialized consumer or error async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result { channel .basic_consume( @@ -52,6 +59,12 @@ impl RabbitMQConsumer { .await.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string())) } /// Declares the queue based on the channel and `RabbitMQConfig`. + /// # Arguments + /// * `channel` - Lapin Channel. + /// * `config` - A initialized RabbitMQConfig containing required information + /// + /// # Returns + /// * `Result` - The initialized queue or error async fn declare_queue(channel: &Channel, config: &RabbitMQConfig) -> Result { channel .queue_declare( @@ -65,7 +78,16 @@ impl RabbitMQConsumer { .await .map_err(|e| RabbitMQError::QueueError(e.to_string())) } + /// Binds the queue based on the channel, declared exchange, queue and `RabbitMQConfig`. + /// # Arguments + /// * `channel` - Lapin Channel. + /// * `exchange` - String value of the exchange name + /// * `queue` - Lapin queue thats declared + /// * `config` - A initialized RabbitMQConfig containing required information + /// + /// # Returns + /// * `Result<(), RabbitMQError>` - Ok or error async fn bind_queue(channel: &Channel, exchange: &str, queue: &Queue, config: &RabbitMQConfig) -> Result<(), RabbitMQError> { channel .queue_bind( @@ -101,6 +123,13 @@ impl RabbitMQConsumer { } /// Acknowledges the message after processing + /// + /// # Arguments + /// * `self` - Reference to self + /// * `delivery` - Delivery reciept + /// + /// # Returns + /// * `Result<(), RabbitMQError>` - Ok or error pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> { self.common.channel .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index ad1615d..f4c2e78 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -1,6 +1,7 @@ pub mod publisher; pub mod consumer; +use axum::async_trait; use lapin::{ options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind }; @@ -41,6 +42,15 @@ pub struct RabbitMQCommon { pub channel: Channel, } + +/// Defines the behavior for RabbitMQCommon client operations. +#[cfg_attr(test, mockall::automock)] +#[async_trait] +pub trait RabbitMQCommonTrait: Send + Sync { + async fn create_connection(config: &RabbitMQConfig) -> Result; + async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError>; +} + impl RabbitMQCommon { /// Sets up a new RabbitMQ client or error /// @@ -54,7 +64,10 @@ impl RabbitMQCommon { let channel = connection.create_channel().await?; Ok(Self { connection, channel }) } +} +#[async_trait] +impl RabbitMQCommonTrait for RabbitMQCommon { /// Function to set up the connection async fn create_connection(config: &RabbitMQConfig) -> Result { debug!("Creating connection"); @@ -64,7 +77,7 @@ impl RabbitMQCommon { } /// Function to declare the exchange required - pub async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> { + async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> { debug!("Declaring exchange"); self.channel .exchange_declare( @@ -81,4 +94,3 @@ impl RabbitMQCommon { .map_err(|e| RabbitMQError::ExchangeError(e.to_string())) } } - diff --git a/src/rabbitmq/publisher.rs b/src/rabbitmq/publisher.rs index d7dec5c..2d9f125 100644 --- a/src/rabbitmq/publisher.rs +++ b/src/rabbitmq/publisher.rs @@ -4,9 +4,10 @@ use lapin::{ use crate::models::ingress_object::IngressObject; -use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; +use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError}; use tracing::{info, error}; +/// Struct to publish messages to RabbitMQ. pub struct RabbitMQProducer { common: RabbitMQCommon, exchange_name: String, @@ -14,6 +15,14 @@ pub struct RabbitMQProducer { } impl RabbitMQProducer { + /// Creates a new `RabbitMQProducer` instance which sets up a RabbitMQ client, + /// declares a exchange if needed. + /// + /// # Arguments + /// * `config` - A initialized RabbitMQConfig containing required configurations + /// + /// # Returns + /// * `Result` - The created client or an error. pub async fn new(config: &RabbitMQConfig) -> Result { let common = RabbitMQCommon::new(config).await?; common.declare_exchange(config, false).await?; @@ -26,6 +35,13 @@ impl RabbitMQProducer { } /// Publishes an IngressObject to RabbitMQ after serializing it to JSON. + /// + /// # Arguments + /// * `self` - Reference to self + /// * `ingress_object` - A initialized IngressObject + /// + /// # Returns + /// * `Result` - Confirmation of sent message or error pub async fn publish(&self, ingress_object: &IngressObject) -> Result { // Serialize IngressObject to JSON let payload = serde_json::to_vec(ingress_object)