From 779b32f807b5de09d55e366bcf0f1206e180277d Mon Sep 17 00:00:00 2001 From: Per Stark Date: Tue, 1 Oct 2024 19:01:29 +0200 Subject: [PATCH] comments, llm schema --- Cargo.lock | 178 ++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + hello_world.txt | 1 - json_schema.md | 44 +++++++++ src/consumer.rs | 22 ----- src/lib.rs | 1 + src/models/file_info.rs | 15 +-- src/models/ingress_content.rs | 11 +++ src/models/ingress_object.rs | 14 +++ src/models/text_content.rs | 73 ++++++++++---- src/neo4j/client.rs | 103 ++++++++++++++++++++ src/neo4j/mod.rs | 1 + src/rabbitmq/consumer.rs | 36 ++++--- src/rabbitmq/mod.rs | 13 ++- test.txt | 1 - 15 files changed, 442 insertions(+), 72 deletions(-) delete mode 100644 hello_world.txt create mode 100644 json_schema.md create mode 100644 src/neo4j/client.rs create mode 100644 src/neo4j/mod.rs delete mode 100644 test.txt diff --git a/Cargo.lock b/Cargo.lock index 9a0a385..1aceb16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -577,10 +577,33 @@ dependencies = [ "iana-time-zone", "js-sys", "num-traits", + "serde", "wasm-bindgen", "windows-targets 0.52.6", ] +[[package]] +name = "chrono-tz" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59ae0466b83e838b81a54256c39d5d7c20b9d7daa10510a242d9b75abd5936e" +dependencies = [ + "chrono", + "chrono-tz-build", + "phf", +] + +[[package]] +name = "chrono-tz-build" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "433e39f13c9a060046954e0592a8d0a4bcb1040125cbf91cb8ee58964cfb350f" +dependencies = [ + "parse-zoneinfo", + "phf", + "phf_codegen", +] + [[package]] name = "cipher" version = "0.4.4" @@ -720,6 +743,36 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8566979429cf69b49a5c740c60791108e86440e8be149bbea4fe54d2c32d6e2" +[[package]] +name = "deadpool" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"421fe0f90f2ab22016f32a9881be5134fdd71c65298917084b0c7477cbc3856e" +dependencies = [ + "async-trait", + "deadpool-runtime", + "num_cpus", + "retain_mut", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + +[[package]] +name = "delegate" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee5df75c70b95bd3aacc8e2fd098797692fb1d54121019c4de481e42f04c8a1" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "der" version = "0.7.9" @@ -1469,6 +1522,45 @@ dependencies = [ "version_check", ] +[[package]] +name = "neo4rs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43dd99fe7dbc68f754759874d83ec2ca43a61ab7d51c10353d024094805382be" +dependencies = [ + "async-trait", + "backoff", + "bytes", + "chrono", + "chrono-tz", + "deadpool", + "delegate", + "futures", + "log", + "neo4rs-macros", + "paste", + "pin-project-lite", + "rustls-native-certs 0.7.3", + "rustls-pemfile", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-rustls", + "url", + "webpki-roots", +] + +[[package]] +name = "neo4rs-macros" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53a0d57c55d2d1dc62a2b1d16a0a1079eb78d67c36bdf468d582ab4482ec7002" +dependencies = [ + "quote", + "syn 2.0.77", +] + [[package]] name = "nom" version = "7.1.3" @@ -1523,6 +1615,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.9", + "libc", +] + [[package]] name = "object" version = "0.36.4" @@ -1610,6 +1712,21 @@ dependencies = [ "windows-targets 0.52.6", ] 
+[[package]] +name = "parse-zoneinfo" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1f2a05b18d44e2957b88f96ba460715e295bc1d7510468a2f3d3b44535d26c24" +dependencies = [ + "regex", +] + +[[package]] +name = "paste" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57c0d7b74b563b49d38dae00a0c37d4d6de9b432382b2892f0574ddcae73fd0a" + [[package]] name = "pbkdf2" version = "0.12.2" @@ -1635,6 +1752,44 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "phf" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ade2d8b8f33c7333b51bcf0428d37e217e9f32192ae4772156f65063b8ce03dc" +dependencies = [ + "phf_shared", +] + +[[package]] +name = "phf_codegen" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8d39688d359e6b34654d328e262234662d16cc0f60ec8dcbe5e718709342a5a" +dependencies = [ + "phf_generator", + "phf_shared", +] + +[[package]] +name = "phf_generator" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48e4cc64c2ad9ebe670cb8fd69dd50ae301650392e81c05f9bfcb2d5bdbc24b0" +dependencies = [ + "phf_shared", + "rand", +] + +[[package]] +name = "phf_shared" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90fcb95eef784c2ac79119d1dd819e162b5da872ce6f3c3abe1e8ca1c082f72b" +dependencies = [ + "siphasher", +] + [[package]] name = "pin-project" version = "1.1.5" @@ -2043,6 +2198,12 @@ dependencies = [ "thiserror", ] +[[package]] +name = "retain_mut" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4389f1d5789befaf6029ebd9f7dac4af7f7e3d61b69d4f30e2ac02b57e7712b0" + [[package]] name = "ring" version = "0.17.8" @@ 
-2372,6 +2533,12 @@ dependencies = [ "libc", ] +[[package]] +name = "siphasher" +version = "0.3.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" + [[package]] name = "slab" version = "0.4.9" @@ -2445,6 +2612,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" dependencies = [ "proc-macro2", + "quote", "unicode-ident", ] @@ -2939,6 +3107,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "webpki-roots" +version = "0.26.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "841c67bff177718f1d4dfefde8d8f0e78f9b6589319ba88312f567fc5841a958" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "winapi" version = "0.3.9" @@ -3216,6 +3393,7 @@ dependencies = [ "lapin", "mime", "mime_guess", + "neo4rs", "redis", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index a86c6e2..b7fe4eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ futures-lite = "2.3.0" lapin = { version = "2.5.0", features = ["serde_json"] } mime = "0.3.17" mime_guess = "2.0.5" +neo4rs = { version = "0.8.0", features = ["serde_json"] } redis = { version = "0.27.2", features = ["aio", "tokio-comp"] } serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" diff --git a/hello_world.txt b/hello_world.txt deleted file mode 100644 index 557db03..0000000 --- a/hello_world.txt +++ /dev/null @@ -1 +0,0 @@ -Hello World diff --git a/json_schema.md b/json_schema.md new file mode 100644 index 0000000..6d75db0 --- /dev/null +++ b/json_schema.md @@ -0,0 +1,44 @@ +In that case, here's an updated response schema that takes into account the entities being sources of knowledge and the user sending instructions and category with the LLM call: +```json +{ + "type": "object", + "properties": { + "knowledge_sources": { + "type": "array", + 
"items": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]}, + "title": {"type": "string"}, + "description": {"type": "string"}, + "relationships": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]}, + "target": {"type": "string", "description": "ID of the related knowledge source"} + }, + "required": ["type", "target"] + } + } + }, + "required": ["id", "type", "title", "description", "relationships"] + } + }, + "category": {"type": "string"}, + "instructions": {"type": "string"} + }, + "required": ["knowledge_sources", "category", "instructions"], + "additionalProperties": false +} +``` +In this schema, the `knowledge_sources` array contains objects that represent individual sources of knowledge, with properties like `id`, `type`, `title`, `description`, and `relationships`. The `relationships` array contains objects that represent relationships between knowledge sources, with properties like `type` and `target`. + +The `category` and `instructions` fields are also included as required properties, which will be populated with the user's input. + +Note that I've used enums to restrict the values of the `type` fields in the `knowledge_sources` and `relationships` arrays. This ensures that the LLM can only return specific types of knowledge sources and relationships. + +Also, as per your requirement, all fields are required, and `additionalProperties` is set to `false` to prevent the LLM from returning any additional fields not specified in the schema. 
+ diff --git a/src/consumer.rs b/src/consumer.rs index 6c5e864..332ef10 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -30,28 +30,6 @@ async fn main() -> Result<(), Box> { // Start consuming messages consumer.process_messages().await?; - // loop { - // match consumer.consume().await { - // Ok((message, delivery)) => { - // info!("Received message: {}", message); - // // Process the message here - // // For example, you could insert it into a database - // // process_message(&message).await?; - - // info!("Done processing, acking"); - // consumer.ack_delivery(delivery).await? - // } - // Err(RabbitMQError::ConsumeError(e)) => { - // error!("Error consuming message: {}", e); - // // Optionally add a delay before trying again - // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - // } - // Err(e) => { - // error!("Unexpected error: {}", e); - // break; - // } - // } - // } Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index 7545dec..23aa5bc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ pub mod models; +pub mod neo4j; pub mod rabbitmq; pub mod redis; pub mod routes; diff --git a/src/models/file_info.rs b/src/models/file_info.rs index 56c6be9..d29df96 100644 --- a/src/models/file_info.rs +++ b/src/models/file_info.rs @@ -24,6 +24,7 @@ pub struct FileInfo { pub mime_type: String, } +/// Errors that can occur during FileInfo operations #[derive(Error, Debug)] pub enum FileError { #[error("IO error occurred: {0}")] @@ -98,11 +99,9 @@ impl FileInfo { /// Creates a new `FileInfo` instance from uploaded field data. /// /// # Arguments - /// /// * `field_data` - The uploaded file data. /// /// # Returns - /// /// * `Result` - The created `FileInfo` or an error. pub async fn new(field_data: FieldData, redis_client: &RedisClient) -> Result { let file = field_data.contents; // NamedTempFile @@ -157,11 +156,9 @@ impl FileInfo { /// Retrieves `FileInfo` based on UUID. /// /// # Arguments - /// /// * `uuid` - The UUID of the file. 
/// /// # Returns - /// /// * `Result` - The `FileInfo` or an error. pub async fn get(uuid: Uuid, redis_client: &RedisClient) -> Result { // Fetch SHA256 from UUID mapping @@ -178,13 +175,11 @@ impl FileInfo { /// Updates an existing file identified by UUID with new file data. /// /// # Arguments - /// /// * `uuid` - The UUID of the file to update. /// * `new_field_data` - The new file data. /// * `redis_client` - Reference to the RedisClient. /// /// # Returns - /// /// * `Result` - The updated `FileInfo` or an error. pub async fn update(uuid: Uuid, new_field_data: FieldData, redis_client: &RedisClient) -> Result { let new_file = new_field_data.contents; @@ -237,12 +232,10 @@ impl FileInfo { /// Deletes a file and its corresponding metadata based on UUID. /// /// # Arguments - /// /// * `uuid` - The UUID of the file to delete. /// * `redis_client` - Reference to the RedisClient. /// /// # Returns - /// /// * `Result<(), FileError>` - Empty result or an error. pub async fn delete(uuid: Uuid, redis_client: &RedisClient) -> Result<(), FileError> { // Retrieve FileInfo to get SHA256 and path @@ -279,13 +272,11 @@ impl FileInfo { /// Persists the file to the filesystem under `./data/{uuid}/{file_name}`. /// /// # Arguments - /// /// * `uuid` - The UUID of the file. /// * `file` - The temporary file to persist. /// * `file_name` - The sanitized file name. /// /// # Returns - /// /// * `Result` - The persisted file path or an error. async fn persist_file(uuid: &Uuid, file: NamedTempFile, file_name: &str) -> Result { let base_dir = Path::new("./data"); @@ -309,11 +300,9 @@ impl FileInfo { /// Calculates the SHA256 hash of the given file. /// /// # Arguments - /// /// * `file` - The file to hash. /// /// # Returns - /// /// * `Result` - The SHA256 hash as a hex string or an error. async fn get_sha(file: &NamedTempFile) -> Result { let mut reader = BufReader::new(file.as_file()); @@ -335,11 +324,9 @@ impl FileInfo { /// Guesses the MIME type based on the file extension. 
/// /// # Arguments - /// /// * `path` - The path to the file. /// /// # Returns - /// /// * `String` - The guessed MIME type as a string. fn guess_mime_type(path: &Path) -> String { from_path(path) diff --git a/src/models/ingress_content.rs b/src/models/ingress_content.rs index 1b22675..e6ebf20 100644 --- a/src/models/ingress_content.rs +++ b/src/models/ingress_content.rs @@ -7,6 +7,7 @@ use crate::redis::client::RedisClient; use super::{file_info::FileInfo, ingress_object::IngressObject }; +/// Struct defining the expected body when ingressing content. #[derive(Serialize, Deserialize, Debug)] pub struct IngressInput { pub content: Option, @@ -41,6 +42,13 @@ pub enum IngressContentError { } /// Function to create ingress objects from input. +/// +/// # Arguments +/// * `input` - IngressInput containing information needed to ingress content. +/// * `redis_client` - Initialized redis client needed to retrieve file information +/// +/// # Returns +/// * `Vec` - An array containing the ingressed objects, one file/contenttype per object. 
pub async fn create_ingress_objects( input: IngressInput, redis_client: &RedisClient, @@ -48,6 +56,7 @@ pub async fn create_ingress_objects( // Initialize list let mut object_list = Vec::new(); + // Create an IngressObject from input.content if it exists, checking for URL or text if let Some(input_content) = input.content { match Url::parse(&input_content) { Ok(url) => { @@ -69,6 +78,7 @@ pub async fn create_ingress_objects( } } + // Look up FileInfo objects using the redis db and the submitted uuids in input.files if let Some(file_uuids) = input.files { for uuid_str in file_uuids { let uuid = Uuid::parse_str(&uuid_str)?; @@ -88,6 +98,7 @@ pub async fn create_ingress_objects( } } + // If no objects are constructed, we return Err if object_list.is_empty() { return Err(IngressContentError::MimeDetection( "No valid content or files provided".into(), diff --git a/src/models/ingress_object.rs b/src/models/ingress_object.rs index 0317770..e35166e 100644 --- a/src/models/ingress_object.rs +++ b/src/models/ingress_object.rs @@ -3,6 +3,7 @@ use serde::{Deserialize, Serialize}; use super::{ingress_content::IngressContentError, text_content::TextContent}; +/// Knowledge object type, containing the content or reference to it, as well as metadata #[derive(Debug, Serialize, Deserialize, Clone)] pub enum IngressObject { Url { @@ -21,7 +22,15 @@ pub enum IngressObject { category: String, }, } + impl IngressObject { + /// Creates a new `TextContent` instance from an `IngressObject`. + /// + /// # Arguments + /// `&self` - A reference to the `IngressObject`. + /// + /// # Returns + /// `TextContent` - An object containing a text representation of the object, could be a scraped URL, parsed PDF, etc. 
pub async fn to_text_content(&self) -> Result { match self { IngressObject::Url { url, instructions, category } => { @@ -66,6 +75,11 @@ impl IngressObject { let content = tokio::fs::read_to_string(&file_info.path).await?; Ok(content) } + "text/markdown" => { + // Read the file and return its content + let content = tokio::fs::read_to_string(&file_info.path).await?; + Ok(content) + } "application/pdf" => { // TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf` Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone())) diff --git a/src/models/text_content.rs b/src/models/text_content.rs index 115f8e4..3c6bc27 100644 --- a/src/models/text_content.rs +++ b/src/models/text_content.rs @@ -70,26 +70,61 @@ impl TextContent { let client = async_openai::Client::new(); // Define the JSON Schema for the expected response - let schema = json!({ - "type": "object", - "properties": { - "json_ld": { - "type": "object", - "properties": { - "@context": { "type": "string" }, - "@type": { "type": "string" }, - "name": { "type": "string" } - // Define only the essential properties - }, - "required": ["@context", "@type", "name"], - "additionalProperties": false +// let schema = json!({ +// "type": "object", +// "properties": { +// "json_ld": { +// "type": "object", +// "properties": { +// "@context": { "type": "string" }, +// "@type": { "type": "string" }, +// "name": { "type": "string" } +// // Define only the essential properties +// }, +// "required": ["@context", "@type", "name"], +// "additionalProperties": false, +// }, +// "description": { "type": "string" }, +// "related_category": { "type": "string" }, +// "instructions": { "type": "string" } +// }, +// "required": ["json_ld", "description", "related_category", "instructions"], +// "additionalProperties": false +// }); +let schema = json!({ + "type": "object", + "properties": { + "knowledge_sources": { + "type": "array", + "items": { + "type": "object", + "properties": { + "id": {"type": 
"string"}, + "type": {"type": "string", "enum": ["Document", "Page", "TextSnippet"]}, + "title": {"type": "string"}, + "description": {"type": "string"}, + "relationships": { + "type": "array", + "items": { + "type": "object", + "properties": { + "type": {"type": "string", "enum": ["RelatedTo", "RelevantTo", "SimilarTo"]}, + "target": {"type": "string", "description": "ID of the related knowledge source"} + }, + "required": ["type", "target"], + "additionalProperties": false, + } + } }, - "description": { "type": "string" }, - "related_category": { "type": "string" }, - "instructions": { "type": "string" } + "required": ["id", "type", "title", "description", "relationships"], + "additionalProperties": false, + } }, - "required": ["json_ld", "description", "related_category", "instructions"], - "additionalProperties": false + "category": {"type": "string"}, + "instructions": {"type": "string"} + }, + "required": ["knowledge_sources", "category", "instructions"], + "additionalProperties": false }); let response_format = async_openai::types::ResponseFormat::JsonSchema { @@ -114,7 +149,7 @@ impl TextContent { // Build the chat completion request let request = CreateChatCompletionRequestArgs::default() .model("gpt-4o-mini") - .max_tokens(1024u32) + .max_tokens(2048u32) .messages([ ChatCompletionRequestSystemMessage::from(system_message).into(), ChatCompletionRequestUserMessage::from(user_message).into(), diff --git a/src/neo4j/client.rs b/src/neo4j/client.rs new file mode 100644 index 0000000..00f86ca --- /dev/null +++ b/src/neo4j/client.rs @@ -0,0 +1,103 @@ +// use neo4rs::*; +// use serde::{Deserialize, Serialize}; + +// /// A struct representing a knowledge source in the graph database. +// #[derive(Deserialize, Serialize)] +// pub struct KnowledgeSource { +// pub id: String, +// pub title: String, +// pub description: String, +// pub relationships: Vec, +// } + +// /// A struct representing a relationship between knowledge sources. 
+// #[derive(Deserialize, Serialize)] +// pub struct Relationship { +// pub type_: String, +// pub target: String, +// } + +// /// A struct representing the result of an LLM analysis. +// #[derive(Deserialize, Serialize)] +// pub struct AnalysisResult { +// pub knowledge_sources: Vec, +// pub category: String, +// pub instructions: String, +// } + +// /// A trait for interacting with the Neo4j database. +// pub trait Neo4jClient { +// /// Create a new knowledge source in the graph database. +// fn create_knowledge_source( +// &self, +// knowledge_source: KnowledgeSource, +// ) -> Result<(), neo4rs::Error>; + +// /// Get a knowledge source by its ID. +// fn get_knowledge_source(&self, id: &str) -> Result; + +// /// Create a new relationship between knowledge sources. +// fn create_relationship(&self, relationship: Relationship) -> Result<(), neo4rs::Error>; + +// /// Get all relationships for a given knowledge source. +// fn get_relationships(&self, id: &str) -> Result, neo4rs::Error>; +// } + +// /// A concrete implementation of the Neo4jClient trait. +// pub struct Neo4jClientImpl { +// client: Graph, +// } + +// impl Neo4jClientImpl { +// /// Create a new Neo4j client. 
+// pub async fn new(uri: &str, auth: &str, pass: &str) -> Result { +// let client = Graph::new(uri, auth, pass).await?; +// Ok(Neo4jClientImpl { client }) +// } +// } + +// impl Neo4jClient for Neo4jClientImpl { +// fn create_knowledge_source( +// &self, +// knowledge_source: KnowledgeSource, +// ) -> Result<(), neo4rs::Error> { +// let node = Node::new( +// knowledge_source.id, +// knowledge_source.title, +// knowledge_source.description, +// )?; +// self.client.create_node(node)?; +// Ok(()) +// } + +// fn get_knowledge_source(&self, id: &str) -> Result { +// let node = self.client.get_node(id)?; +// let knowledge_source = KnowledgeSource { +// id: node.id(), +// title: node.get_property("title")?, +// description: node.get_property("description")?, +// relationships: vec![], +// }; +// Ok(knowledge_source) +// } + +// fn create_relationship(&self, relationship: Relationship) -> Result<(), neo4rs::Error> { +// let rel = Relationship::new(relationship.type_, relationship.target)?; +// self.client.create_relationship(rel)?; +// Ok(()) +// } + +// fn get_relationships(&self, id: &str) -> Result, neo4rs::Error> { +// let node = self.client.get_node(id)?; +// let relationships = node.get_relationships()?; +// let mut result = vec![]; +// for rel in relationships { +// let relationship = Relationship { +// type_: rel.type_(), +// target: rel.target(), +// }; +// result.push(relationship); +// } +// Ok(result) +// } +// } diff --git a/src/neo4j/mod.rs b/src/neo4j/mod.rs new file mode 100644 index 0000000..b9babe5 --- /dev/null +++ b/src/neo4j/mod.rs @@ -0,0 +1 @@ +pub mod client; diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index 8990fd2..f36f382 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -3,7 +3,7 @@ use lapin::{ }; use futures_lite::stream::StreamExt; -use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject, text_content::TextContent}; +use 
crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject }; use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; use tracing::{info, error}; @@ -16,16 +16,14 @@ pub struct RabbitMQConsumer { } impl RabbitMQConsumer { - //// Creates a new 'RabbitMQConsumer' instance which sets up a rabbitmq client, - //// declares a exchange if needed, declares and binds a queue and initializes the consumer - //// - //// # Arguments - //// - //// * 'config' - RabbitMQConfig - //// - //// # Returns - //// - //// * 'Result' - The created client or an error. + /// Creates a new 'RabbitMQConsumer' instance which sets up a rabbitmq client, + /// declares an exchange if needed, declares and binds a queue and initializes the consumer + /// + /// # Arguments + /// * 'config' - An initialized RabbitMQConfig containing required configurations + /// + /// # Returns + /// * 'Result' - The created client or an error. pub async fn new(config: &RabbitMQConfig) -> Result { let common = RabbitMQCommon::new(config).await?; @@ -42,6 +40,7 @@ impl RabbitMQConsumer { Ok(Self { common, queue, consumer }) } + /// Sets up the consumer based on the channel and `RabbitMQConfig`. async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result { channel .basic_consume( @@ -52,7 +51,7 @@ impl RabbitMQConsumer { ) .await.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string())) } - + /// Declares the queue based on the channel and `RabbitMQConfig`. async fn declare_queue(channel: &Channel, config: &RabbitMQConfig) -> Result { channel .queue_declare( @@ -66,7 +65,7 @@ impl RabbitMQConsumer { .await .map_err(|e| RabbitMQError::QueueError(e.to_string())) } - + /// Binds the queue based on the channel, declared exchange, queue and `RabbitMQConfig`. 
async fn bind_queue(channel: &Channel, exchange: &str, queue: &Queue, config: &RabbitMQConfig) -> Result<(), RabbitMQError> { channel .queue_bind( @@ -80,7 +79,14 @@ impl RabbitMQConsumer { .map_err(|e| RabbitMQError::QueueError(e.to_string())) } - /// Consumes a message and returns the deserialized IngressContent along with the Delivery + /// Consumes a message and returns the message along with delivery details. + /// + /// # Arguments + /// * `&self` - A reference to self + /// + /// # Returns + /// `IngressObject` - The object containing content and metadata. + /// `Delivery` - A delivery receipt, required to ack or nack the delivery. pub async fn consume(&self) -> Result<(IngressObject, Delivery), RabbitMQError> { // Receive the next message let delivery = self.consumer.clone().next().await @@ -103,6 +109,8 @@ impl RabbitMQConsumer { Ok(()) } + /// Function to continually consume messages as they come in + /// WIP pub async fn process_messages(&self) -> Result<(), RabbitMQError> { loop { match self.consume().await { diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index 8a8f1ea..ad1615d 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -3,11 +3,11 @@ pub mod consumer; use lapin::{ options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind - }; use thiserror::Error; use tracing::debug; +/// Possible errors related to RabbitMQ operations. #[derive(Error, Debug)] pub enum RabbitMQError { #[error("Failed to connect to RabbitMQ: {0}")] @@ -26,6 +26,7 @@ pub enum RabbitMQError { QueueError(String), } +/// Struct containing the information required to set up a client and connection. 
#[derive(Clone)] pub struct RabbitMQConfig { pub amqp_addr: String, @@ -34,18 +35,27 @@ pub struct RabbitMQConfig { pub routing_key: String, } +/// Struct containing the connection and channel of a client pub struct RabbitMQCommon { pub connection: Connection, pub channel: Channel, } impl RabbitMQCommon { + /// Sets up a new RabbitMQ client or returns an error + /// + /// # Arguments + /// * `RabbitMQConfig` - Configuration object with required information + /// + /// # Returns + /// * `self` - An initialized instance of the client pub async fn new(config: &RabbitMQConfig) -> Result { let connection = Self::create_connection(config).await?; let channel = connection.create_channel().await?; Ok(Self { connection, channel }) } + /// Function to set up the connection async fn create_connection(config: &RabbitMQConfig) -> Result { debug!("Creating connection"); Connection::connect(&config.amqp_addr, ConnectionProperties::default()) @@ -53,6 +63,7 @@ impl RabbitMQCommon { .map_err(RabbitMQError::ConnectionError) } + /// Function to declare the exchange required pub async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> { debug!("Declaring exchange"); self.channel diff --git a/test.txt b/test.txt deleted file mode 100644 index 9daeafb..0000000 --- a/test.txt +++ /dev/null @@ -1 +0,0 @@ -test