From a348950234bcaba465f6e3d8d6b91c077a7b3827 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Mon, 30 Sep 2024 20:51:42 +0200 Subject: [PATCH] llm analysis wip --- .gitignore | 1 + Cargo.lock | 373 +++++++++++++++++- Cargo.toml | 1 + .../flake.nix | 57 +++ src/consumer.rs | 4 +- src/models/ingress_content.rs | 1 + src/models/ingress_object.rs | 6 +- src/models/text_content.rs | 156 +++++--- src/rabbitmq/consumer.rs | 110 +----- src/utils/mod.rs | 1 + 10 files changed, 555 insertions(+), 155 deletions(-) create mode 100644 data/182cb7b4-2286-4f40-9237-30b812d67696/flake.nix diff --git a/.gitignore b/.gitignore index c5e7996..15085da 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ /target .aider .aider.chat.history.md .aider.input.history .aider.tags.cache.v3 .aider.tags.cache.v3/cache.db +data diff --git a/Cargo.lock b/Cargo.lock index 15ae3bc..9a0a385 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-convert" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d416feee97712e43152cd42874de162b8f9b77295b1c85e5d92725cc8310bae" +dependencies = [ + "async-trait", +] + [[package]] name = "async-executor" version = "1.13.1" @@ -261,6 +270,32 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "async-openai" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6db3286b4f52b6556ac5208fb575d035eca61a2bf40d7e75d1db2733ffc599f" +dependencies = [ + "async-convert", + "backoff", + "base64", + "bytes", + "derive_builder", + "eventsource-stream", + "futures", + "rand", + "reqwest", + "reqwest-eventsource", + "secrecy", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + [[package]] name = "async-reactor-trait" version = "1.1.0" @@ -403,6 +438,20 @@ dependencies = [ "ubyte", ] +[[package]] +name = "backoff" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +dependencies = [ + "futures-core", + "getrandom", + "instant", + "pin-project-lite", + "rand", + "tokio", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -718,6 +767,37 @@ dependencies = [ "powerfmt", ] +[[package]] +name = "derive_builder" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd33f37ee6a119146a1781d3356a7c26028f83d779b2e04ecd45fdc75c76877b" +dependencies = [ + "derive_builder_macro", +] + +[[package]] +name = "derive_builder_core" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7431fa049613920234f22c47fdc33e6cf3ee83067091ea4277a3f8c4587aae38" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "derive_builder_macro" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4abae7035bf79b9877b779505d8cf3749285b80c43941eda66604841889451dc" +dependencies = [ + "derive_builder_core", + "syn 2.0.77", +] + [[package]] name = "des" version = "0.8.1" @@ -801,6 +881,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "eventsource-stream" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74fef4569247a5f429d9156b9d0a2599914385dd189c539334c625d8099d90ab" +dependencies = [ + "futures-core", + "nom", + "pin-project-lite", +] + [[package]] name = "executor-trait" version = "2.1.0" @@ -956,6 +1047,12 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +[[package]] +name = "futures-timer" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" + [[package]] name = "futures-util" version = "0.3.30" @@ -1097,6 +1194,25 @@ dependencies = [ "pin-project-lite", "smallvec", "tokio", + "want", +] + +[[package]] +name = "hyper-rustls" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" +dependencies = [ + "futures-util", + "http", + "hyper", + "hyper-util", + "rustls", + "rustls-native-certs 0.8.0", + "rustls-pki-types", + "tokio", + "tokio-rustls", + "tower-service", ] [[package]] @@ -1106,12 +1222,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "da62f120a8a37763efb0cf8fdf264b884c7b8b9ac8660b900c8661030c00e6ba" dependencies = [ "bytes", + "futures-channel", "futures-util", "http", "http-body", "hyper", "pin-project-lite", + "socket2 0.5.7", "tokio", + "tower", + "tower-service", + "tracing", ] [[package]] @@ -1183,6 +1304,12 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "ipnet" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "187674a687eed5fe42285b40c6291f9a01517d415fad1c3cbc6a9f778af7fcd4" + [[package]] name = "itoa" version = "1.0.11" @@ -1672,6 +1799,54 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quinn" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls", + "socket2 0.5.7", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +dependencies = [ + "bytes", + "rand", + "ring", + "rustc-hash", + "rustls", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" +dependencies = [ + "libc", + "once_cell", + "socket2 0.5.7", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "quote" version = "1.0.37" @@ -1807,6 +1982,67 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "reqwest" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" +dependencies = [ + "base64", + "bytes", + "futures-core", + "futures-util", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-util", + "ipnet", + "js-sys", + "log", + "mime", + "mime_guess", + "once_cell", + "percent-encoding", + "pin-project-lite", + "quinn", + "rustls", + "rustls-native-certs 0.8.0", + "rustls-pemfile", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tokio-rustls", + "tokio-util", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "wasm-streams", + "web-sys", + "windows-registry", +] + +[[package]] +name = "reqwest-eventsource" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "632c55746dbb44275691640e7b40c907c16a2dc1a5842aa98aaec90da6ec6bde" +dependencies = [ + "eventsource-stream", + "futures-core", + "futures-timer", + "mime", + "nom", + "pin-project-lite", + "reqwest", + "thiserror", +] + [[package]] name = "ring" version = "0.17.8" @@ -1828,6 +2064,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rusticata-macros" version = "4.1.0" @@ -1886,7 +2128,7 @@ checksum = "2a980454b497c439c274f2feae2523ed8138bbd3d323684e1435fec62f800481" dependencies = [ "log", "rustls", - "rustls-native-certs", + "rustls-native-certs 0.7.3", "rustls-pki-types", "rustls-webpki", ] @@ -1904,6 +2146,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "2.1.3" @@ -1978,6 +2233,16 @@ dependencies = [ "sha2", ] +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "serde", + "zeroize", +] + [[package]] name = "security-framework" version = "2.11.1" @@ -2205,6 +2470,9 @@ name = "sync_wrapper" version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] [[package]] name = "synstructure" @@ -2347,6 +2615,28 @@ dependencies = [ "syn 2.0.77", ] +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls", + "rustls-pki-types", + "tokio", +] + +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "tokio-util" version = "0.7.12" @@ -2450,6 +2740,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "try-lock" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + [[package]] name = "typenum" version = "1.17.0" @@ -2538,6 +2834,15 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "317211a0dc0ceedd78fb2ca9a44aed3d7b9b26f81870d485c07122b4350673b7" +[[package]] +name = "want" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bfa7760aed19e106de2c7c0b581b509f2f25d3dacaf737cb82ac61bc6d760b0e" +dependencies = [ + "try-lock", +] + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -2570,6 +2875,18 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.43" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e9300f63a621e96ed275155c108eb6f843b6a26d053f122ab69724559dc8ed" +dependencies = [ + "cfg-if", + "js-sys", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.93" @@ -2599,6 +2916,29 @@ version = "0.2.93" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" +[[package]] +name = "wasm-streams" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e072d4e72f700fb3443d8fe94a39315df013eef1104903cdb0a2abd322bbecd" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + +[[package]] +name = "web-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26fdeaafd9bd129f65e7c031593c24d62186301e0c72c8978fa1678be7d532c0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "winapi" version = "0.3.9" @@ -2630,6 +2970,36 @@ dependencies = [ "windows-targets 0.52.6", ] +[[package]] +name = "windows-registry" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" +dependencies = [ + "windows-result", + "windows-strings", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-result" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" +dependencies = [ + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-strings" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" +dependencies = [ + "windows-result", + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -2837,6 +3207,7 @@ checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" name = "zettle_db" version = "0.1.0" dependencies = [ + "async-openai", "axum", "axum_typed_multipart", "bytes", diff --git a/Cargo.toml b/Cargo.toml index ab8cd6b..a86c6e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-openai = "0.24.1" axum = { version = "0.7.5", features = ["multipart", "macros"] } axum_typed_multipart = "0.12.1" bytes = { version = "1.7.2", features = ["serde"] } diff --git a/data/182cb7b4-2286-4f40-9237-30b812d67696/flake.nix b/data/182cb7b4-2286-4f40-9237-30b812d67696/flake.nix new file mode 100644 index 0000000..9e220a0 --- /dev/null +++ b/data/182cb7b4-2286-4f40-9237-30b812d67696/flake.nix @@ -0,0 +1,57 @@ +{ + inputs = { + nixpkgs.url = "github:cachix/devenv-nixpkgs/rolling"; + systems.url = "github:nix-systems/default"; + devenv.url = "github:cachix/devenv"; + devenv.inputs.nixpkgs.follows = "nixpkgs"; + }; + + nixConfig = { + extra-trusted-public-keys = "devenv.cachix.org-1:w1cLUi8dv3hnoSPGAuibQv+f9TZLr6cv/Hm9XgU50cw="; + extra-substituters = "https://devenv.cachix.org"; + }; + + outputs = { + self, + nixpkgs, + devenv, + systems, + ... + } @ inputs: let + forEachSystem = nixpkgs.lib.genAttrs (import systems); + in { + packages = forEachSystem (system: { + devenv-up = self.devShells.${system}.default.config.procfileScript; + }); + + devShells = + forEachSystem + (system: let + pkgs = nixpkgs.legacyPackages.${system}; + in { + default = devenv.lib.mkShell { + inherit inputs pkgs; + modules = [ + { + # https://devenv.sh/reference/options/ + enterShell = '' + echo "run devenv up -d to start and monitor services" + ''; + + languages.rust.enable = true; + + services = { + redis = { + enable = true; + }; + rabbitmq = { + enable = true; + plugins = ["tracing"]; + }; + }; + } + ]; + }; + }); + }; +} diff --git a/src/consumer.rs b/src/consumer.rs index da87f92..6c5e864 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1,7 +1,7 @@ use tokio; -use tracing::{info, error}; +use tracing::info; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig, RabbitMQError}; +use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig }; #[tokio::main] async fn main() -> Result<(), Box> { diff --git a/src/models/ingress_content.rs b/src/models/ingress_content.rs index 488834b..1b22675 100644 --- a/src/models/ingress_content.rs +++ b/src/models/ingress_content.rs @@ -45,6 +45,7 @@ pub async fn create_ingress_objects( input: IngressInput, redis_client: &RedisClient, ) -> Result, IngressContentError> { + // Initialize list let mut object_list = Vec::new(); if let Some(input_content) = input.content { diff --git a/src/models/ingress_object.rs b/src/models/ingress_object.rs index bd3e542..0317770 100644 --- a/src/models/ingress_object.rs +++ b/src/models/ingress_object.rs @@ -54,7 +54,7 @@ impl IngressObject { } /// Fetches and extracts text from a URL. - async fn fetch_text_from_url(url: &str) -> Result { + async fn fetch_text_from_url(_url: &str) -> Result { unimplemented!() } @@ -74,6 +74,10 @@ impl IngressObject { // TODO: Implement OCR on image using a crate like `tesseract` Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone())) } + "application/octet-stream" => { + let content = tokio::fs::read_to_string(&file_info.path).await?; + Ok(content) + } // Handle other MIME types as needed _ => Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone())), } diff --git a/src/models/text_content.rs b/src/models/text_content.rs index 02d28ab..115f8e4 100644 --- a/src/models/text_content.rs +++ b/src/models/text_content.rs @@ -1,4 +1,7 @@ +use async_openai::types::{ ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage, CreateChatCompletionRequestArgs}; use serde::{Deserialize, Serialize}; +use serde_json::json; +use tracing::info; use crate::models::file_info::FileInfo; use thiserror::Error; @@ -11,6 +14,31 @@ pub struct TextContent { pub category: String, } +#[derive(Debug, Serialize, Deserialize)] +pub struct LLMAnalysis { + pub json_ld: serde_json::Value, + pub description: String, + pub related_category: String, + pub instructions: String, +} + +/// Error types for processing `TextContent`. +#[derive(Error, Debug)] +pub enum ProcessingError { + #[error("LLM processing error: {0}")] + LLMError(String), + + #[error("Graph DB storage error: {0}")] + GraphDBError(String), + + #[error("Vector DB storage error: {0}")] + VectorDBError(String), + + #[error("Unknown processing error")] + Unknown, +} + + impl TextContent { /// Creates a new `TextContent` instance. pub fn new(text: String, file_info: Option, instructions: String, category: String) -> Self { @@ -26,42 +54,101 @@ impl TextContent { pub async fn process(&self) -> Result<(), ProcessingError> { // Step 1: Send to LLM for analysis let analysis = self.send_to_llm().await?; + info!("{:?}", analysis); // Step 2: Store analysis results in Graph DB - self.store_in_graph_db(&analysis).await?; + // self.store_in_graph_db(&analysis).await?; // Step 3: Split text and store in Vector DB - self.store_in_vector_db().await?; + // self.store_in_vector_db().await?; Ok(()) } /// Sends text to an LLM for analysis. async fn send_to_llm(&self) -> Result { - // TODO: Implement interaction with your specific LLM API. - // Example using reqwest: - /* - let client = reqwest::Client::new(); - let response = client.post("http://llm-api/analyze") - .json(&serde_json::json!({ "text": self.text })) - .send() - .await - .map_err(|e| ProcessingError::LLMError(e.to_string()))?; - - if !response.status().is_success() { - return Err(ProcessingError::LLMError(format!("LLM API returned status: {}", response.status()))); + let client = async_openai::Client::new(); + + // Define the JSON Schema for the expected response + let schema = json!({ + "type": "object", + "properties": { + "json_ld": { + "type": "object", + "properties": { + "@context": { "type": "string" }, + "@type": { "type": "string" }, + "name": { "type": "string" } + // Define only the essential properties + }, + "required": ["@context", "@type", "name"], + "additionalProperties": false + }, + "description": { "type": "string" }, + "related_category": { "type": "string" }, + "instructions": { "type": "string" } + }, + "required": ["json_ld", "description", "related_category", "instructions"], + "additionalProperties": false +}); + + let response_format = async_openai::types::ResponseFormat::JsonSchema { + json_schema: async_openai::types::ResponseFormatJsonSchema { + description: Some("Structured analysis of the submitted content".into()), + name: "content_analysis".into(), + schema: Some(schema), + strict: Some(true), + }, + }; + + // Construct the system and user messages + let system_message = format!( + "You are an expert document analyzer. You will receive a document's text content, along with user instructions and a category. Your task is to provide a structured JSON-LD object representing the content, a short description of the document, how it relates to the submitted category, and any relevant instructions." + ); + + let user_message = format!( + "Category: {}\nInstructions: {}\nContent:\n{}", + self.category, self.instructions, self.text + ); + + // Build the chat completion request + let request = CreateChatCompletionRequestArgs::default() + .model("gpt-4o-mini") + .max_tokens(1024u32) + .messages([ + ChatCompletionRequestSystemMessage::from(system_message).into(), + ChatCompletionRequestUserMessage::from(user_message).into(), + ]) + .response_format(response_format) + .build().map_err(|e| ProcessingError::LLMError(e.to_string()))?; + + // Send the request to OpenAI + let response = client.chat().create(request).await.map_err(|e| { + ProcessingError::LLMError(format!("OpenAI API request failed: {}", e.to_string())) + })?; + + info!("{:?}", response); + + // Extract and parse the response + for choice in response.choices { + if let Some(content) = choice.message.content { + let analysis: LLMAnalysis = serde_json::from_str(&content).map_err(|e| { + ProcessingError::LLMError(format!( + "Failed to parse LLM response into LLMAnalysis: {}", + e.to_string() + )) + })?; + return Ok(analysis); + } } - - let analysis: LLMAnalysis = response.json().await - .map_err(|e| ProcessingError::LLMError(e.to_string()))?; - - Ok(analysis) - */ - unimplemented!() + + Err(ProcessingError::LLMError( + "No content found in LLM response".into(), + )) } /// Stores analysis results in a graph database. - async fn store_in_graph_db(&self, analysis: &LLMAnalysis) -> Result<(), ProcessingError> { + async fn store_in_graph_db(&self, _analysis: &LLMAnalysis) -> Result<(), ProcessingError> { // TODO: Implement storage logic for your specific graph database. // Example: /* @@ -85,28 +172,3 @@ impl TextContent { unimplemented!() } } - -/// Represents the analysis results from the LLM. -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct LLMAnalysis { - pub entities: Vec, - pub summary: String, - // Add other fields based on your LLM's output. -} - -/// Error types for processing `TextContent`. -#[derive(Error, Debug)] -pub enum ProcessingError { - #[error("LLM processing error: {0}")] - LLMError(String), - - #[error("Graph DB storage error: {0}")] - GraphDBError(String), - - #[error("Vector DB storage error: {0}")] - VectorDBError(String), - - #[error("Unknown processing error")] - Unknown, -} - diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index 5404a20..8990fd2 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -3,7 +3,7 @@ use lapin::{ }; use futures_lite::stream::StreamExt; -use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject}; +use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject, text_content::TextContent}; use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; use tracing::{info, error}; @@ -108,7 +108,9 @@ impl RabbitMQConsumer { match self.consume().await { Ok((ingress, delivery)) => { info!("Received IngressObject: {:?}", ingress); - + let text_content = ingress.to_text_content().await.unwrap(); + text_content.process().await.unwrap(); + self.ack_delivery(delivery).await?; // Process the IngressContent // match self.handle_ingress_content(&ingress).await { @@ -143,109 +145,9 @@ impl RabbitMQConsumer { Ok(()) } - // /// Processes messages in a loop - // pub async fn process_messages(&self) -> Result<(), RabbitMQError> { - // loop { - // match self.consume().await { - // Ok((ingress, delivery)) => { - // // info!("Received ingress object: {:?}", ingress); - - // // Process the ingress object - // self.handle_ingress_content(&ingress).await; - - // info!("Processing done, acknowledging message"); - // self.ack_delivery(delivery).await?; - // } - // Err(RabbitMQError::ConsumeError(e)) => { - // error!("Error consuming message: {}", e); - // // Optionally add a delay before trying again - // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - // } - // Err(e) => { - // error!("Unexpected error: {}", e); - // break; - // } - // } - // } - - // Ok(()) - // } - pub async fn handle_ingress_content(&self, ingress: &IngressObject) -> Result<(), IngressContentError> { + pub async fn handle_ingress_content(&self, ingress: &IngressObject) -> Result<(), IngressContentError> { info!("Processing IngressContent: {:?}", ingress); - - - // Convert IngressContent to individual TextContent instances - // let text_contents = ingress.to_text_contents().await?; - // info!("Generated {} TextContent instances", text_contents.len()); - - // // Limit concurrent processing (e.g., 10 at a time) - // let semaphore = Arc::new(Semaphore::new(10)); - // let mut processing_futures = FuturesUnordered::new(); - - // for text_content in text_contents { - // let semaphore_clone = semaphore.clone(); - // processing_futures.push(tokio::spawn(async move { - // let _permit = semaphore_clone.acquire().await; - // match text_content.process().await { - // Ok(_) => { - // info!("Successfully processed TextContent"); - // Ok(()) - // } - // Err(e) => { - // error!("Error processing TextContent: {:?}", e); - // Err(e) - // } - // } - // })); - // } - - // // Await all processing tasks - // while let Some(result) = processing_futures.next().await { - // match result { - // Ok(Ok(_)) => { - // // Successfully processed - // } - // Ok(Err(e)) => { - // // Processing failed, already logged - // } - // Err(e) => { - // // Task join error - // error!("Task join error: {:?}", e); - // } - // } - // } - - // Ok(()) unimplemented!() + } } -} - - // /// Handles the IngressContent based on its type - // async fn handle_ingress_content(&self, ingress: &IngressContent) { - // info!("Processing content: {:?}", ingress); - // // There are three different situations: - // // 1. There is no ingress.content but there are one or more files - // // - We should process the files and act based upon the mime type - // // - All different kinds of content return text - // // 2. There is ingress.content but there are no files - // // - We process ingress.content differently if its a URL or Text enum - // // - Return text - // // 3. There is ingress.content and files - // // - We do both - // // - // // At the end of processing we have one or several text objects with some associated - // // metadata, such as the FileInfo metadata, or the Url associated to the text - // // - // // There will always be ingress.instructions and ingress.category - // // - // // When we have all the text objects and metadata, we can begin the next processing - // // Here we will: - // // 1. Send the text content and metadata to a LLM for analyzing - // // - We want several things, JSON_LD metadata, possibly actions - // // 2. Store the JSON_LD in a graph database - // // 3. Split up the text intelligently and store it in a vector database - // // - // // We return the function if all succeeds. - // } - diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 701e689..be6ff9b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1 +1,2 @@ // pub mod mime; +// pub mod llm;