comments, llm schema

This commit is contained in:
Per Stark
2024-10-01 19:01:29 +02:00
parent dcb82ca454
commit 779b32f807
15 changed files with 442 additions and 72 deletions

178
Cargo.lock generated
View File

@@ -577,10 +577,33 @@ dependencies = [
"iana-time-zone",
"js-sys",
"num-traits",
"serde",
"wasm-bindgen",
"windows-targets 0.52.6",
]
[[package]]
name = "chrono-tz"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e"
dependencies = [
"chrono",
"chrono-tz-build",
"phf",
]
[[package]]
name = "chrono-tz-build"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f"
dependencies = [
"parse-zoneinfo",
"phf",
"phf_codegen",
]
[[package]]
name = "cipher"
version = "0.4.4"
@@ -720,6 +743,36 @@ version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2"
[[package]]
name = "deadpool"
version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e"
dependencies = [
"async-trait",
"deadpool-runtime",
"num_cpus",
"retain_mut",
"tokio",
]
[[package]]
name = "deadpool-runtime"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b"
[[package]]
name = "delegate"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ee5df75c70b95bd3aacc8e2fd098797692fb1d54121019c4de481e42f04c8a1"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "der"
version = "0.7.9"
@@ -1469,6 +1522,45 @@ dependencies = [
"version_check",
]
[[package]]
name = "neo4rs"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43dd99fe7dbc68f754759874d83ec2ca43a61ab7d51c10353d024094805382be"
dependencies = [
"async-trait",
"backoff",
"bytes",
"chrono",
"chrono-tz",
"deadpool",
"delegate",
"futures",
"log",
"neo4rs-macros",
"paste",
"pin-project-lite",
"rustls-native-certs 0.7.3",
"rustls-pemfile",
"serde",
"serde_json",
"thiserror",
"tokio",
"tokio-rustls",
"url",
"webpki-roots",
]
[[package]]
name = "neo4rs-macros"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53a0d57c55d2d1dc62a2b1d16a0a1079eb78d67c36bdf468d582ab4482ec7002"
dependencies = [
"quote",
"syn 2.0.77",
]
[[package]]
name = "nom"
version = "7.1.3"
@@ -1523,6 +1615,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi 0.3.9",
"libc",
]
[[package]]
name = "object"
version = "0.36.4"
@@ -1610,6 +1712,21 @@ dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "parse-zoneinfo"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24"
dependencies = [
"regex",
]
[[package]]
name = "paste"
version = "1.0.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a"
[[package]]
name = "pbkdf2"
version = "0.12.2"
@@ -1635,6 +1752,44 @@ version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e"
[[package]]
name = "phf"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_codegen"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a"
dependencies = [
"phf_generator",
"phf_shared",
]
[[package]]
name = "phf_generator"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0"
dependencies = [
"phf_shared",
"rand",
]
[[package]]
name = "phf_shared"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "1.1.5"
@@ -2043,6 +2198,12 @@ dependencies = [
"thiserror",
]
[[package]]
name = "retain_mut"
version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0"
[[package]]
name = "ring"
version = "0.17.8"
@@ -2372,6 +2533,12 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "0.3.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d"
[[package]]
name = "slab"
version = "0.4.9"
@@ -2445,6 +2612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
@@ -2939,6 +3107,15 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "0.26.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi"
version = "0.3.9"
@@ -3216,6 +3393,7 @@ dependencies = [
"lapin",
"mime",
"mime_guess",
"neo4rs",
"redis",
"serde",
"serde_json",

View File

@@ -15,6 +15,7 @@ futures-lite = "2.3.0"
lapin = { version = "2.5.0", features = ["serde_json"] }
mime = "0.3.17"
mime_guess = "2.0.5"
neo4rs = { version = "0.8.0", features = ["serde_json"] }
redis = { version = "0.27.2", features = ["aio", "tokio-comp"] }
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"

View File

@@ -1 +0,0 @@
Hello World

44
json_schema.md Normal file
View File

@@ -0,0 +1,44 @@
In that case, here's an updated response schema that takes into account the entities being sources of knowledge and the user sending instructions and category with the LLM call:
```json
{
"type": "object",
"properties": {
"knowledge_sources": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]},
"title": {"type": "string"},
"description": {"type": "string"},
"relationships": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]},
"target": {"type": "string", "description": "ID of the related knowledge source"}
},
"required": ["type", "target"]
}
}
},
"required": ["id", "type", "title", "description", "relationships"]
}
},
"category": {"type": "string"},
"instructions": {"type": "string"}
},
"required": ["knowledge_sources", "category", "instructions"],
"additionalProperties": false
}
```
In this schema, the `knowledge_sources` array contains objects that represent individual sources of knowledge, with properties like `id`, `type`, `title`, `description`, and `relationships`. The `relationships` array contains objects that represent relationships between knowledge sources, with properties like `type` and `target`.
The `category` and `instructions` fields are also included as required properties, which will be populated with the user's input.
Note that I've used enums to restrict the values of the `type` fields in the `knowledge_sources` and `relationships` arrays. This ensures that the LLM can only return specific types of knowledge sources and relationships.
Also, as per your requirement, all fields are required, and `additionalProperties` is set to `false` to prevent the LLM from returning any additional fields not specified in the schema.

View File

@@ -30,28 +30,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Start consuming messages
consumer.process_messages().await?;
// loop {
// match consumer.consume().await {
// Ok((message, delivery)) => {
// info!("Received message: {}", message);
// // Process the message here
// // For example, you could insert it into a database
// // process_message(&message).await?;
// info!("Done processing, acking");
// consumer.ack_delivery(delivery).await?
// }
// Err(RabbitMQError::ConsumeError(e)) => {
// error!("Error consuming message: {}", e);
// // Optionally add a delay before trying again
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// }
// Err(e) => {
// error!("Unexpected error: {}", e);
// break;
// }
// }
// }
Ok(())
}

View File

@@ -1,4 +1,5 @@
pub mod models;
pub mod neo4j;
pub mod rabbitmq;
pub mod redis;
pub mod routes;

View File

@@ -24,6 +24,7 @@ pub struct FileInfo {
pub mime_type: String,
}
/// Errors that can occur during FileInfo operations
#[derive(Error, Debug)]
pub enum FileError {
#[error("IO error occurred: {0}")]
@@ -98,11 +99,9 @@ impl FileInfo {
/// Creates a new `FileInfo` instance from uploaded field data.
///
/// # Arguments
///
/// * `field_data` - The uploaded file data.
///
/// # Returns
///
/// * `Result<FileInfo, FileError>` - The created `FileInfo` or an error.
pub async fn new(field_data: FieldData<NamedTempFile>, redis_client: &RedisClient) -> Result<FileInfo, FileError> {
let file = field_data.contents; // NamedTempFile
@@ -157,11 +156,9 @@ impl FileInfo {
/// Retrieves `FileInfo` based on UUID.
///
/// # Arguments
///
/// * `uuid` - The UUID of the file.
///
/// # Returns
///
/// * `Result<FileInfo, FileError>` - The `FileInfo` or an error.
pub async fn get(uuid: Uuid, redis_client: &RedisClient) -> Result<FileInfo, FileError> {
// Fetch SHA256 from UUID mapping
@@ -178,13 +175,11 @@ impl FileInfo {
/// Updates an existing file identified by UUID with new file data.
///
/// # Arguments
///
/// * `uuid` - The UUID of the file to update.
/// * `new_field_data` - The new file data.
/// * `redis_client` - Reference to the RedisClient.
///
/// # Returns
///
/// * `Result<FileInfo, FileError>` - The updated `FileInfo` or an error.
pub async fn update(uuid: Uuid, new_field_data: FieldData<NamedTempFile>, redis_client: &RedisClient) -> Result<FileInfo, FileError> {
let new_file = new_field_data.contents;
@@ -237,12 +232,10 @@ impl FileInfo {
/// Deletes a file and its corresponding metadata based on UUID.
///
/// # Arguments
///
/// * `uuid` - The UUID of the file to delete.
/// * `redis_client` - Reference to the RedisClient.
///
/// # Returns
///
/// * `Result<(), FileError>` - Empty result or an error.
pub async fn delete(uuid: Uuid, redis_client: &RedisClient) -> Result<(), FileError> {
// Retrieve FileInfo to get SHA256 and path
@@ -279,13 +272,11 @@ impl FileInfo {
/// Persists the file to the filesystem under `./data/{uuid}/{file_name}`.
///
/// # Arguments
///
/// * `uuid` - The UUID of the file.
/// * `file` - The temporary file to persist.
/// * `file_name` - The sanitized file name.
///
/// # Returns
///
/// * `Result<PathBuf, FileError>` - The persisted file path or an error.
async fn persist_file(uuid: &Uuid, file: NamedTempFile, file_name: &str) -> Result<PathBuf, FileError> {
let base_dir = Path::new("./data");
@@ -309,11 +300,9 @@ impl FileInfo {
/// Calculates the SHA256 hash of the given file.
///
/// # Arguments
///
/// * `file` - The file to hash.
///
/// # Returns
///
/// * `Result<String, FileError>` - The SHA256 hash as a hex string or an error.
async fn get_sha(file: &NamedTempFile) -> Result<String, FileError> {
let mut reader = BufReader::new(file.as_file());
@@ -335,11 +324,9 @@ impl FileInfo {
/// Guesses the MIME type based on the file extension.
///
/// # Arguments
///
/// * `path` - The path to the file.
///
/// # Returns
///
/// * `String` - The guessed MIME type as a string.
fn guess_mime_type(path: &Path) -> String {
from_path(path)

View File

@@ -7,6 +7,7 @@ use crate::redis::client::RedisClient;
use super::{file_info::FileInfo, ingress_object::IngressObject };
/// Struct defining the expected body when ingressing content.
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressInput {
pub content: Option<String>,
@@ -41,6 +42,13 @@ pub enum IngressContentError {
}
/// Function to create ingress objects from input.
///
/// # Arguments
/// * `input` - IngressInput containing information needed to ingress content.
/// * `redis_client` - Initialized redis client needed to retrieve file information
///
/// # Returns
/// * `Vec<IngressObject>` - An array containing the ingressed objects, one file/contenttype per object.
pub async fn create_ingress_objects(
input: IngressInput,
redis_client: &RedisClient,
@@ -48,6 +56,7 @@ pub async fn create_ingress_objects(
// Initialize list
let mut object_list = Vec::new();
// Create a IngressObject from input.content if it exists, checking for URL or text
if let Some(input_content) = input.content {
match Url::parse(&input_content) {
Ok(url) => {
@@ -69,6 +78,7 @@ pub async fn create_ingress_objects(
}
}
// Look up FileInfo objects using the redis db and the submitted uuids in input.files
if let Some(file_uuids) = input.files {
for uuid_str in file_uuids {
let uuid = Uuid::parse_str(&uuid_str)?;
@@ -88,6 +98,7 @@ pub async fn create_ingress_objects(
}
}
// If no objects are constructed, we return Err
if object_list.is_empty() {
return Err(IngressContentError::MimeDetection(
"No valid content or files provided".into(),

View File

@@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize};
use super::{ingress_content::IngressContentError, text_content::TextContent};
/// Knowledge object type, containing the content or reference to it, as well as metadata
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum IngressObject {
Url {
@@ -21,7 +22,15 @@ pub enum IngressObject {
category: String,
},
}
impl IngressObject {
/// Creates a new `TextContent` instance from a `IngressObject`.
///
/// # Arguments
/// `&self` - A reference to the `IngressObject`.
///
/// # Returns
/// `TextContent` - An object containing a text representation of the object, could be a scraped URL, parsed PDF, etc.
pub async fn to_text_content(&self) -> Result<TextContent, IngressContentError> {
match self {
IngressObject::Url { url, instructions, category } => {
@@ -66,6 +75,11 @@ impl IngressObject {
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"text/markdown" => {
// Read the file and return its content
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"application/pdf" => {
// TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf`
Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone()))

View File

@@ -70,26 +70,61 @@ impl TextContent {
let client = async_openai::Client::new();
// Define the JSON Schema for the expected response
let schema = json!({
"type": "object",
"properties": {
"json_ld": {
"type": "object",
"properties": {
"@context": { "type": "string" },
"@type": { "type": "string" },
"name": { "type": "string" }
// Define only the essential properties
},
"required": ["@context", "@type", "name"],
"additionalProperties": false
// let schema = json!({
// "type": "object",
// "properties": {
// "json_ld": {
// "type": "object",
// "properties": {
// "@context": { "type": "string" },
// "@type": { "type": "string" },
// "name": { "type": "string" }
// // Define only the essential properties
// },
// "required": ["@context", "@type", "name"],
// "additionalProperties": false,
// },
// "description": { "type": "string" },
// "related_category": { "type": "string" },
// "instructions": { "type": "string" }
// },
// "required": ["json_ld", "description", "related_category", "instructions"],
// "additionalProperties": false
// });
let schema = json!({
"type": "object",
"properties": {
"knowledge_sources": {
"type": "array",
"items": {
"type": "object",
"properties": {
"id": {"type": "string"},
"type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]},
"title": {"type": "string"},
"description": {"type": "string"},
"relationships": {
"type": "array",
"items": {
"type": "object",
"properties": {
"type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]},
"target": {"type": "string", "description": "ID of the related knowledge source"}
},
"required": ["type", "target"],
"additionalProperties": false,
}
}
},
"description": { "type": "string" },
"related_category": { "type": "string" },
"instructions": { "type": "string" }
"required": ["id", "type", "title", "description", "relationships"],
"additionalProperties": false,
}
},
"required": ["json_ld", "description", "related_category", "instructions"],
"additionalProperties": false
"category": {"type": "string"},
"instructions": {"type": "string"}
},
"required": ["knowledge_sources", "category", "instructions"],
"additionalProperties": false
});
let response_format = async_openai::types::ResponseFormat::JsonSchema {
@@ -114,7 +149,7 @@ impl TextContent {
// Build the chat completion request
let request = CreateChatCompletionRequestArgs::default()
.model("gpt-4o-mini")
.max_tokens(1024u32)
.max_tokens(2048u32)
.messages([
ChatCompletionRequestSystemMessage::from(system_message).into(),
ChatCompletionRequestUserMessage::from(user_message).into(),

103
src/neo4j/client.rs Normal file
View File

@@ -0,0 +1,103 @@
// use neo4rs::*;
// use serde::{Deserialize, Serialize};
// /// A struct representing a knowledge source in the graph database.
// #[derive(Deserialize, Serialize)]
// pub struct KnowledgeSource {
// pub id: String,
// pub title: String,
// pub description: String,
// pub relationships: Vec<Relationship>,
// }
// /// A struct representing a relationship between knowledge sources.
// #[derive(Deserialize, Serialize)]
// pub struct Relationship {
// pub type_: String,
// pub target: String,
// }
// /// A struct representing the result of an LLM analysis.
// #[derive(Deserialize, Serialize)]
// pub struct AnalysisResult {
// pub knowledge_sources: Vec<KnowledgeSource>,
// pub category: String,
// pub instructions: String,
// }
// /// A trait for interacting with the Neo4j database.
// pub trait Neo4jClient {
// /// Create a new knowledge source in the graph database.
// fn create_knowledge_source(
// &self,
// knowledge_source: KnowledgeSource,
// ) -> Result<(), neo4rs::Error>;
// /// Get a knowledge source by its ID.
// fn get_knowledge_source(&self, id: &str) -> Result<KnowledgeSource, neo4rs::Error>;
// /// Create a new relationship between knowledge sources.
// fn create_relationship(&self, relationship: Relationship) -> Result<(), neo4rs::Error>;
// /// Get all relationships for a given knowledge source.
// fn get_relationships(&self, id: &str) -> Result<Vec<Relationship>, neo4rs::Error>;
// }
// /// A concrete implementation of the Neo4jClient trait.
// pub struct Neo4jClientImpl {
// client: Graph,
// }
// impl Neo4jClientImpl {
// /// Create a new Neo4j client.
// pub async fn new(uri: &str, auth: &str, pass: &str) -> Result<Self, neo4rs::Error> {
// let client = Graph::new(uri, auth, pass).await?;
// Ok(Neo4jClientImpl { client })
// }
// }
// impl Neo4jClient for Neo4jClientImpl {
// fn create_knowledge_source(
// &self,
// knowledge_source: KnowledgeSource,
// ) -> Result<(), neo4rs::Error> {
// let node = Node::new(
// knowledge_source.id,
// knowledge_source.title,
// knowledge_source.description,
// )?;
// self.client.create_node(node)?;
// Ok(())
// }
// fn get_knowledge_source(&self, id: &str) -> Result<KnowledgeSource, neo4rs::Error> {
// let node = self.client.get_node(id)?;
// let knowledge_source = KnowledgeSource {
// id: node.id(),
// title: node.get_property("title")?,
// description: node.get_property("description")?,
// relationships: vec![],
// };
// Ok(knowledge_source)
// }
// fn create_relationship(&self, relationship: Relationship) -> Result<(), neo4rs::Error> {
// let rel = Relationship::new(relationship.type_, relationship.target)?;
// self.client.create_relationship(rel)?;
// Ok(())
// }
// fn get_relationships(&self, id: &str) -> Result<Vec<Relationship>, neo4rs::Error> {
// let node = self.client.get_node(id)?;
// let relationships = node.get_relationships()?;
// let mut result = vec![];
// for rel in relationships {
// let relationship = Relationship {
// type_: rel.type_(),
// target: rel.target(),
// };
// result.push(relationship);
// }
// Ok(result)
// }
// }

1
src/neo4j/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod client;

View File

@@ -3,7 +3,7 @@ use lapin::{
};
use futures_lite::stream::StreamExt;
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject, text_content::TextContent};
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject };
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
use tracing::{info, error};
@@ -16,16 +16,14 @@ pub struct RabbitMQConsumer {
}
impl RabbitMQConsumer {
//// Creates a new 'RabbitMQConsumer' instance which sets up a rabbitmq client,
//// declares a exchange if needed, declares and binds a queue and initializes the consumer
////
//// # Arguments
////
//// * 'config' - RabbitMQConfig
////
//// # Returns
////
//// * 'Result<Self, RabbitMQError>' - The created client or an error.
/// Creates a new 'RabbitMQConsumer' instance which sets up a rabbitmq client,
/// declares a exchange if needed, declares and binds a queue and initializes the consumer
///
/// # Arguments
/// * 'config' - A initialized RabbitMQConfig containing required configurations
///
/// # Returns
/// * 'Result<Self, RabbitMQError>' - The created client or an error.
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?;
@@ -42,6 +40,7 @@ impl RabbitMQConsumer {
Ok(Self { common, queue, consumer })
}
/// Sets up the consumer based on the channel and `RabbitMQConfig`.
async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result<Consumer, RabbitMQError> {
channel
.basic_consume(
@@ -52,7 +51,7 @@ impl RabbitMQConsumer {
)
.await.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string()))
}
/// Declares the queue based on the channel and `RabbitMQConfig`.
async fn declare_queue(channel: &Channel, config: &RabbitMQConfig) -> Result<Queue, RabbitMQError> {
channel
.queue_declare(
@@ -66,7 +65,7 @@ impl RabbitMQConsumer {
.await
.map_err(|e| RabbitMQError::QueueError(e.to_string()))
}
/// Binds the queue based on the channel, declared exchange, queue and `RabbitMQConfig`.
async fn bind_queue(channel: &Channel, exchange: &str, queue: &Queue, config: &RabbitMQConfig) -> Result<(), RabbitMQError> {
channel
.queue_bind(
@@ -80,7 +79,14 @@ impl RabbitMQConsumer {
.map_err(|e| RabbitMQError::QueueError(e.to_string()))
}
/// Consumes a message and returns the deserialized IngressContent along with the Delivery
/// Consumes a message and returns the message along with delivery details.
///
/// # Arguments
/// * `&self` - A reference to self
///
/// # Returns
/// `IngressObject` - The object containing content and metadata.
/// `Delivery` - A delivery reciept, required to ack or nack the delivery.
pub async fn consume(&self) -> Result<(IngressObject, Delivery), RabbitMQError> {
// Receive the next message
let delivery = self.consumer.clone().next().await
@@ -103,6 +109,8 @@ impl RabbitMQConsumer {
Ok(())
}
/// Function to continually consume messages as they come in
/// WIP
pub async fn process_messages(&self) -> Result<(), RabbitMQError> {
loop {
match self.consume().await {

View File

@@ -3,11 +3,11 @@ pub mod consumer;
use lapin::{
options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind
};
use thiserror::Error;
use tracing::debug;
/// Possible errors related to RabbitMQ operations.
#[derive(Error, Debug)]
pub enum RabbitMQError {
#[error("Failed to connect to RabbitMQ: {0}")]
@@ -26,6 +26,7 @@ pub enum RabbitMQError {
QueueError(String),
}
/// Struct containing the information required to set up a client and connection.
#[derive(Clone)]
pub struct RabbitMQConfig {
pub amqp_addr: String,
@@ -34,18 +35,27 @@ pub struct RabbitMQConfig {
pub routing_key: String,
}
/// Struct containing the connection and channel of a client
pub struct RabbitMQCommon {
pub connection: Connection,
pub channel: Channel,
}
impl RabbitMQCommon {
/// Sets up a new RabbitMQ client or error
///
/// # Arguments
/// * `RabbitMQConfig` - Configuration object with required information
///
/// # Returns
/// * `self` - A initialized instance of the client
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let connection = Self::create_connection(config).await?;
let channel = connection.create_channel().await?;
Ok(Self { connection, channel })
}
/// Function to set up the connection
async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError> {
debug!("Creating connection");
Connection::connect(&config.amqp_addr, ConnectionProperties::default())
@@ -53,6 +63,7 @@ impl RabbitMQCommon {
.map_err(RabbitMQError::ConnectionError)
}
/// Function to declare the exchange required
pub async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> {
debug!("Declaring exchange");
self.channel

View File

@@ -1 +0,0 @@
test