more documentation, wip llm processing

This commit is contained in:
Per Stark
2024-10-02 13:40:48 +02:00
parent 7d13c06a51
commit 43e5d4d629
5 changed files with 125 additions and 71 deletions

View File

@@ -35,6 +35,8 @@
{ {
# https://devenv.sh/reference/options/ # https://devenv.sh/reference/options/
enterShell = '' enterShell = ''
echo "Welcome to zettle_db project"
echo "----------------------------"
echo "run devenv up -d to start and monitor services" echo "run devenv up -d to start and monitor services"
''; '';

View File

@@ -14,14 +14,32 @@ pub struct TextContent {
pub category: String, pub category: String,
} }
#[derive(Debug, Serialize, Deserialize)] /// A struct representing a knowledge source in the graph database.
pub struct LLMAnalysis { #[derive(Deserialize, Debug, Serialize)]
pub json_ld: serde_json::Value, pub struct KnowledgeSource {
pub id: String,
pub title: String,
pub description: String, pub description: String,
pub related_category: String, pub relationships: Vec<Relationship>,
}
/// A struct representing a relationship between knowledge sources.
#[derive(Deserialize, Serialize, Debug)]
pub struct Relationship {
#[serde(rename = "type")]
pub type_: String,
pub target: String,
}
/// A struct representing the result of an LLM analysis.
#[derive(Deserialize, Debug,Serialize)]
pub struct AnalysisResult {
pub knowledge_sources: Vec<KnowledgeSource>,
pub category: String,
pub instructions: String, pub instructions: String,
} }
/// Error types for processing `TextContent`. /// Error types for processing `TextContent`.
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum ProcessingError { pub enum ProcessingError {
@@ -66,66 +84,43 @@ impl TextContent {
} }
/// Sends text to an LLM for analysis. /// Sends text to an LLM for analysis.
async fn send_to_llm(&self) -> Result<LLMAnalysis, ProcessingError> { async fn send_to_llm(&self) -> Result<AnalysisResult, ProcessingError> {
let client = async_openai::Client::new(); let client = async_openai::Client::new();
let schema = json!({
// Define the JSON Schema for the expected response "type": "object",
// let schema = json!({ "properties": {
// "type": "object", "knowledge_sources": {
// "properties": { "type": "array",
// "json_ld": { "items": {
// "type": "object", "type": "object",
// "properties": { "properties": {
// "@context": { "type": "string" }, "id": {"type": "string"},
// "@type": { "type": "string" }, "type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]},
// "name": { "type": "string" } "title": {"type": "string"},
// // Define only the essential properties "description": {"type": "string"},
// }, "relationships": {
// "required": ["@context", "@type", "name"], "type": "array",
// "additionalProperties": false, "items": {
// }, "type": "object",
// "description": { "type": "string" }, "properties": {
// "related_category": { "type": "string" }, "type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]},
// "instructions": { "type": "string" } "target": {"type": "string", "description": "ID of the related knowledge source"}
// }, },
// "required": ["json_ld", "description", "related_category", "instructions"], "required": ["type", "target"],
// "additionalProperties": false "additionalProperties": false,
// }); }
let schema = json!({ }
"type": "object", },
"properties": { "required": ["id", "type", "title", "description", "relationships"],
"knowledge_sources": { "additionalProperties": false,
"type": "array", }
"items": { },
"type": "object", "category": {"type": "string"},
"properties": { "instructions": {"type": "string"}
"id": {"type": "string"}, },
"type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]}, "required": ["knowledge_sources", "category", "instructions"],
"title": {"type": "string"}, "additionalProperties": false
"description": {"type": "string"}, });
"relationships": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]},
"target": {"type": "string", "description": "ID of the related knowledge source"}
},
"required": ["type", "target"],
"additionalProperties": false,
}
}
},
"required": ["id", "type", "title", "description", "relationships"],
"additionalProperties": false,
}
},
"category": {"type": "string"},
"instructions": {"type": "string"}
},
"required": ["knowledge_sources", "category", "instructions"],
"additionalProperties": false
});
let response_format = async_openai::types::ResponseFormat::JsonSchema { let response_format = async_openai::types::ResponseFormat::JsonSchema {
json_schema: async_openai::types::ResponseFormatJsonSchema { json_schema: async_openai::types::ResponseFormatJsonSchema {
@@ -167,7 +162,7 @@ let schema = json!({
// Extract and parse the response // Extract and parse the response
for choice in response.choices { for choice in response.choices {
if let Some(content) = choice.message.content { if let Some(content) = choice.message.content {
let analysis: LLMAnalysis = serde_json::from_str(&content).map_err(|e| { let analysis: AnalysisResult = serde_json::from_str(&content).map_err(|e| {
ProcessingError::LLMError(format!( ProcessingError::LLMError(format!(
"Failed to parse LLM response into LLMAnalysis: {}", "Failed to parse LLM response into LLMAnalysis: {}",
e.to_string() e.to_string()
@@ -183,7 +178,7 @@ let schema = json!({
} }
/// Stores analysis results in a graph database. /// Stores analysis results in a graph database.
async fn store_in_graph_db(&self, _analysis: &LLMAnalysis) -> Result<(), ProcessingError> { async fn store_in_graph_db(&self, _analysis: &AnalysisResult) -> Result<(), ProcessingError> {
// TODO: Implement storage logic for your specific graph database. // TODO: Implement storage logic for your specific graph database.
// Example: // Example:
/* /*

View File

@@ -5,7 +5,7 @@ use futures_lite::stream::StreamExt;
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject }; use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject };
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
use tracing::{info, error}; use tracing::{info, error};
/// Struct to consume messages from RabbitMQ. /// Struct to consume messages from RabbitMQ.
@@ -20,10 +20,10 @@ impl RabbitMQConsumer {
/// declares a exchange if needed, declares and binds a queue and initializes the consumer /// declares a exchange if needed, declares and binds a queue and initializes the consumer
/// ///
/// # Arguments /// # Arguments
/// * 'config' - A initialized RabbitMQConfig containing required configurations /// * `config` - A initialized RabbitMQConfig containing required configurations
/// ///
/// # Returns /// # Returns
/// * 'Result<Self, RabbitMQError>' - The created client or an error. /// * `Result<Self, RabbitMQError>` - The created client or an error.
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> { pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?; let common = RabbitMQCommon::new(config).await?;
@@ -41,6 +41,13 @@ impl RabbitMQConsumer {
} }
/// Sets up the consumer based on the channel and `RabbitMQConfig`. /// Sets up the consumer based on the channel and `RabbitMQConfig`.
///
/// # Arguments
/// * `channel` - Lapin Channel.
/// * `config` - An initialized RabbitMQConfig containing required information
///
/// # Returns
/// * `Result<Consumer, RabbitMQError>` - The initialized consumer or error
async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result<Consumer, RabbitMQError> { async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result<Consumer, RabbitMQError> {
channel channel
.basic_consume( .basic_consume(
@@ -52,6 +59,12 @@ impl RabbitMQConsumer {
.await.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string())) .await.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string()))
} }
/// Declares the queue based on the channel and `RabbitMQConfig`. /// Declares the queue based on the channel and `RabbitMQConfig`.
/// # Arguments
/// * `channel` - Lapin Channel.
/// * `config` - An initialized RabbitMQConfig containing required information
///
/// # Returns
/// * `Result<Queue, RabbitMQError>` - The initialized queue or error
async fn declare_queue(channel: &Channel, config: &RabbitMQConfig) -> Result<Queue, RabbitMQError> { async fn declare_queue(channel: &Channel, config: &RabbitMQConfig) -> Result<Queue, RabbitMQError> {
channel channel
.queue_declare( .queue_declare(
@@ -65,7 +78,16 @@ impl RabbitMQConsumer {
.await .await
.map_err(|e| RabbitMQError::QueueError(e.to_string())) .map_err(|e| RabbitMQError::QueueError(e.to_string()))
} }
/// Binds the queue based on the channel, declared exchange, queue and `RabbitMQConfig`. /// Binds the queue based on the channel, declared exchange, queue and `RabbitMQConfig`.
/// # Arguments
/// * `channel` - Lapin Channel.
/// * `exchange` - String value of the exchange name
/// * `queue` - Lapin queue that has been declared
/// * `config` - An initialized RabbitMQConfig containing required information
///
/// # Returns
/// * `Result<(), RabbitMQError>` - Ok or error
async fn bind_queue(channel: &Channel, exchange: &str, queue: &Queue, config: &RabbitMQConfig) -> Result<(), RabbitMQError> { async fn bind_queue(channel: &Channel, exchange: &str, queue: &Queue, config: &RabbitMQConfig) -> Result<(), RabbitMQError> {
channel channel
.queue_bind( .queue_bind(
@@ -101,6 +123,13 @@ impl RabbitMQConsumer {
} }
/// Acknowledges the message after processing /// Acknowledges the message after processing
///
/// # Arguments
/// * `self` - Reference to self
/// * `delivery` - Delivery receipt
///
/// # Returns
/// * `Result<(), RabbitMQError>` - Ok or error
pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> { pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> {
self.common.channel self.common.channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default()) .basic_ack(delivery.delivery_tag, BasicAckOptions::default())

View File

@@ -1,6 +1,7 @@
pub mod publisher; pub mod publisher;
pub mod consumer; pub mod consumer;
use axum::async_trait;
use lapin::{ use lapin::{
options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind
}; };
@@ -41,6 +42,15 @@ pub struct RabbitMQCommon {
pub channel: Channel, pub channel: Channel,
} }
/// Defines the behavior for RabbitMQCommon client operations.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait RabbitMQCommonTrait: Send + Sync {
async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError>;
async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError>;
}
impl RabbitMQCommon { impl RabbitMQCommon {
/// Sets up a new RabbitMQ client or error /// Sets up a new RabbitMQ client or error
/// ///
@@ -54,7 +64,10 @@ impl RabbitMQCommon {
let channel = connection.create_channel().await?; let channel = connection.create_channel().await?;
Ok(Self { connection, channel }) Ok(Self { connection, channel })
} }
}
#[async_trait]
impl RabbitMQCommonTrait for RabbitMQCommon {
/// Function to set up the connection /// Function to set up the connection
async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError> { async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError> {
debug!("Creating connection"); debug!("Creating connection");
@@ -64,7 +77,7 @@ impl RabbitMQCommon {
} }
/// Function to declare the exchange required /// Function to declare the exchange required
pub async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> { async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> {
debug!("Declaring exchange"); debug!("Declaring exchange");
self.channel self.channel
.exchange_declare( .exchange_declare(
@@ -81,4 +94,3 @@ impl RabbitMQCommon {
.map_err(|e| RabbitMQError::ExchangeError(e.to_string())) .map_err(|e| RabbitMQError::ExchangeError(e.to_string()))
} }
} }

View File

@@ -4,9 +4,10 @@ use lapin::{
use crate::models::ingress_object::IngressObject; use crate::models::ingress_object::IngressObject;
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
use tracing::{info, error}; use tracing::{info, error};
/// Struct to publish messages to RabbitMQ.
pub struct RabbitMQProducer { pub struct RabbitMQProducer {
common: RabbitMQCommon, common: RabbitMQCommon,
exchange_name: String, exchange_name: String,
@@ -14,6 +15,14 @@ pub struct RabbitMQProducer {
} }
impl RabbitMQProducer { impl RabbitMQProducer {
/// Creates a new `RabbitMQProducer` instance which sets up a RabbitMQ client
/// and declares an exchange if needed.
///
/// # Arguments
/// * `config` - An initialized RabbitMQConfig containing required configurations
///
/// # Returns
/// * `Result<Self, RabbitMQError>` - The created client or an error.
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> { pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?; let common = RabbitMQCommon::new(config).await?;
common.declare_exchange(config, false).await?; common.declare_exchange(config, false).await?;
@@ -26,6 +35,13 @@ impl RabbitMQProducer {
} }
/// Publishes an IngressObject to RabbitMQ after serializing it to JSON. /// Publishes an IngressObject to RabbitMQ after serializing it to JSON.
///
/// # Arguments
/// * `self` - Reference to self
/// * `ingress_object` - An initialized IngressObject
///
/// # Returns
/// * `Result<Confirmation, RabbitMQError>` - Confirmation of sent message or error
pub async fn publish(&self, ingress_object: &IngressObject) -> Result<Confirmation, RabbitMQError> { pub async fn publish(&self, ingress_object: &IngressObject) -> Result<Confirmation, RabbitMQError> {
// Serialize IngressObject to JSON // Serialize IngressObject to JSON
let payload = serde_json::to_vec(ingress_object) let payload = serde_json::to_vec(ingress_object)