diff --git a/Cargo.lock b/Cargo.lock index 0296e88..15ae3bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -857,6 +857,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.30" @@ -864,6 +879,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -872,6 +888,17 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +[[package]] +name = "futures-executor" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.30" @@ -935,10 +962,13 @@ version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ + "futures-channel", "futures-core", + "futures-io", "futures-macro", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -2810,6 +2840,7 @@ dependencies = [ "axum", "axum_typed_multipart", "bytes", + "futures", "futures-lite 2.3.0", "lapin", "mime", diff --git a/Cargo.toml b/Cargo.toml index 941e29d..ab8cd6b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" axum = { version = "0.7.5", features = ["multipart", "macros"] } 
axum_typed_multipart = "0.12.1" bytes = { version = "1.7.2", features = ["serde"] } +futures = "0.3.30" futures-lite = "2.3.0" lapin = { version = "2.5.0", features = ["serde_json"] } mime = "0.3.17" diff --git a/src/models/ingress.rs b/src/models/ingress.rs deleted file mode 100644 index d0c125c..0000000 --- a/src/models/ingress.rs +++ /dev/null @@ -1,94 +0,0 @@ -use std::hash::Hash; - -use serde::{Deserialize, Serialize}; -use thiserror::Error; -use tracing::info; -use url::Url; -use uuid::Uuid; -use crate::redis::client::RedisClient; - -use super::file_info::FileInfo; - -#[derive(Debug, Deserialize, Serialize)] -pub enum Content { - Url(String), - Text(String), -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct IngressInput { - pub content: Option, - pub instructions: String, - pub category: String, - pub files: Option>, -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct IngressContent { - pub content: Option, - pub instructions: String, - pub category: String, - pub files: Option>, -} - -/// Error types for file and content handling. -#[derive(Error, Debug)] -pub enum IngressContentError { - #[error("IO error occurred: {0}")] - Io(#[from] std::io::Error), - - #[error("UTF-8 conversion error: {0}")] - Utf8(#[from] std::string::FromUtf8Error), - - #[error("MIME type detection failed for input: {0}")] - MimeDetection(String), - - #[error("Unsupported MIME type: {0}")] - UnsupportedMime(String), - - #[error("URL parse error: {0}")] - UrlParse(#[from] url::ParseError), -} - -impl IngressContent { - /// Create a new `IngressContent` from `IngressInput`. 
- pub async fn new( - input: IngressInput, - redis_client: &RedisClient, // Add RedisClient as a parameter - ) -> Result { - let content = if let Some(input_content) = input.content { - // Check if the content is a URL - if let Ok(url) = Url::parse(&input_content) { - info!("Detected URL: {}", url); - Some(Content::Url(url.to_string())) - } else { - info!("Treating input as plain text"); - Some(Content::Text(input_content)) - } - } else { - None - }; - - // Fetch file information if file UUIDs are provided - let files = if let Some(file_uuids) = input.files { - let mut file_info_list = Vec::new(); - for uuid_str in file_uuids { - let uuid = Uuid::parse_str(&uuid_str).map_err(|_| IngressContentError::UnsupportedMime("Invalid UUID".into()))?; - match FileInfo::get(uuid, redis_client).await { - Ok(file_info) => file_info_list.push(file_info), - Err(_) => info!("No file with that uuid"), - } - } - Some(file_info_list) - } else { - None - }; - - Ok(IngressContent { - content, - instructions: input.instructions, - category: input.category, - files, - }) - } -} diff --git a/src/models/ingress_content.rs b/src/models/ingress_content.rs new file mode 100644 index 0000000..488834b --- /dev/null +++ b/src/models/ingress_content.rs @@ -0,0 +1,97 @@ +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tracing::info; +use url::Url; +use uuid::Uuid; +use crate::redis::client::RedisClient; + +use super::{file_info::FileInfo, ingress_object::IngressObject }; + +#[derive(Serialize, Deserialize, Debug)] +pub struct IngressInput { + pub content: Option, + pub instructions: String, + pub category: String, + pub files: Option>, +} + +/// Error types for processing ingress content. 
+#[derive(Error, Debug)] +pub enum IngressContentError { + #[error("IO error occurred: {0}")] + Io(#[from] std::io::Error), + + #[error("UTF-8 conversion error: {0}")] + Utf8(#[from] std::string::FromUtf8Error), + + #[error("MIME type detection failed for input: {0}")] + MimeDetection(String), + + #[error("Unsupported MIME type: {0}")] + UnsupportedMime(String), + + #[error("URL parse error: {0}")] + UrlParse(#[from] url::ParseError), + + #[error("UUID parse error: {0}")] + UuidParse(#[from] uuid::Error), + + #[error("Redis error: {0}")] + RedisError(String), +} + +/// Function to create ingress objects from input. +pub async fn create_ingress_objects( + input: IngressInput, + redis_client: &RedisClient, +) -> Result, IngressContentError> { + let mut object_list = Vec::new(); + + if let Some(input_content) = input.content { + match Url::parse(&input_content) { + Ok(url) => { + info!("Detected URL: {}", url); + object_list.push(IngressObject::Url { + url: url.to_string(), + instructions: input.instructions.clone(), + category: input.category.clone(), + }); + } + Err(_) => { + info!("Treating input as plain text"); + object_list.push(IngressObject::Text { + text: input_content.to_string(), + instructions: input.instructions.clone(), + category: input.category.clone(), + }); + } + } + } + + if let Some(file_uuids) = input.files { + for uuid_str in file_uuids { + let uuid = Uuid::parse_str(&uuid_str)?; + match FileInfo::get(uuid, redis_client).await { + Ok(file_info) => { + object_list.push(IngressObject::File { + file_info, + instructions: input.instructions.clone(), + category: input.category.clone(), + }); + } + Err(_) => { + info!("No file with UUID: {}", uuid); + // Optionally, you can collect errors or continue silently + } + } + } + } + + if object_list.is_empty() { + return Err(IngressContentError::MimeDetection( + "No valid content or files provided".into(), + )); + } + + Ok(object_list) +} diff --git a/src/models/ingress_object.rs 
b/src/models/ingress_object.rs new file mode 100644 index 0000000..bd3e542 --- /dev/null +++ b/src/models/ingress_object.rs @@ -0,0 +1,82 @@ +use crate::models::file_info::FileInfo; +use serde::{Deserialize, Serialize}; + +use super::{ingress_content::IngressContentError, text_content::TextContent}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum IngressObject { + Url { + url: String, + instructions: String, + category: String, + }, + Text { + text: String, + instructions: String, + category: String, + }, + File { + file_info: FileInfo, + instructions: String, + category: String, + }, +} +impl IngressObject { + pub async fn to_text_content(&self) -> Result { + match self { + IngressObject::Url { url, instructions, category } => { + let text = Self::fetch_text_from_url(url).await?; + Ok(TextContent { + text, + instructions: instructions.clone(), + category: category.clone(), + file_info: None, + }) + }, + IngressObject::Text { text, instructions, category } => { + Ok(TextContent { + text: text.clone(), + instructions: instructions.clone(), + category: category.clone(), + file_info: None, + }) + }, + IngressObject::File { file_info, instructions, category } => { + let text = Self::extract_text_from_file(file_info).await?; + Ok(TextContent { + text, + instructions: instructions.clone(), + category: category.clone(), + file_info: Some(file_info.clone()), + }) + }, + } + } + + /// Fetches and extracts text from a URL. + async fn fetch_text_from_url(url: &str) -> Result { + unimplemented!() + } + + /// Extracts text from a file based on its MIME type. 
+ async fn extract_text_from_file(file_info: &FileInfo) -> Result { + match file_info.mime_type.as_str() { + "text/plain" => { + // Read the file and return its content + let content = tokio::fs::read_to_string(&file_info.path).await?; + Ok(content) + } + "application/pdf" => { + // TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf` + Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone())) + } + "image/png" | "image/jpeg" => { + // TODO: Implement OCR on image using a crate like `tesseract` + Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone())) + } + // Handle other MIME types as needed + _ => Err(IngressContentError::UnsupportedMime(file_info.mime_type.clone())), + } + } +} + diff --git a/src/models/mod.rs b/src/models/mod.rs index f83e89b..5fbb9bc 100644 --- a/src/models/mod.rs +++ b/src/models/mod.rs @@ -1,2 +1,4 @@ pub mod file_info; -pub mod ingress; +pub mod ingress_content; +pub mod ingress_object; +pub mod text_content; diff --git a/src/models/text_content.rs b/src/models/text_content.rs new file mode 100644 index 0000000..02d28ab --- /dev/null +++ b/src/models/text_content.rs @@ -0,0 +1,112 @@ +use serde::{Deserialize, Serialize}; +use crate::models::file_info::FileInfo; +use thiserror::Error; + +/// Represents a single piece of text content extracted from various sources. +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct TextContent { + pub text: String, + pub file_info: Option, + pub instructions: String, + pub category: String, +} + +impl TextContent { + /// Creates a new `TextContent` instance. + pub fn new(text: String, file_info: Option, instructions: String, category: String) -> Self { + Self { + text, + file_info, + instructions, + category, + } + } + + /// Processes the `TextContent` by sending it to an LLM, storing the analysis in a graph DB, and storing the split text in a vector DB. 
+ pub async fn process(&self) -> Result<(), ProcessingError> { + // Step 1: Send to LLM for analysis + let analysis = self.send_to_llm().await?; + + // Step 2: Store analysis results in Graph DB + self.store_in_graph_db(&analysis).await?; + + // Step 3: Split text and store in Vector DB + self.store_in_vector_db().await?; + + Ok(()) + } + + /// Sends text to an LLM for analysis. + async fn send_to_llm(&self) -> Result { + // TODO: Implement interaction with your specific LLM API. + // Example using reqwest: + /* + let client = reqwest::Client::new(); + let response = client.post("http://llm-api/analyze") + .json(&serde_json::json!({ "text": self.text })) + .send() + .await + .map_err(|e| ProcessingError::LLMError(e.to_string()))?; + + if !response.status().is_success() { + return Err(ProcessingError::LLMError(format!("LLM API returned status: {}", response.status()))); + } + + let analysis: LLMAnalysis = response.json().await + .map_err(|e| ProcessingError::LLMError(e.to_string()))?; + + Ok(analysis) + */ + unimplemented!() + } + + /// Stores analysis results in a graph database. + async fn store_in_graph_db(&self, analysis: &LLMAnalysis) -> Result<(), ProcessingError> { + // TODO: Implement storage logic for your specific graph database. + // Example: + /* + let graph_db = GraphDB::new("http://graph-db:8080"); + graph_db.insert_analysis(analysis).await.map_err(|e| ProcessingError::GraphDBError(e.to_string()))?; + */ + unimplemented!() + } + + /// Splits text and stores it in a vector database. + async fn store_in_vector_db(&self) -> Result<(), ProcessingError> { + // TODO: Implement text splitting and vector storage logic. + // Example: + /* + let chunks = text_splitter::split(&self.text); + let vector_db = VectorDB::new("http://vector-db:5000"); + for chunk in chunks { + vector_db.insert(chunk).await.map_err(|e| ProcessingError::VectorDBError(e.to_string()))?; + } + */ + unimplemented!() + } +} + +/// Represents the analysis results from the LLM. 
+#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct LLMAnalysis { + pub entities: Vec, + pub summary: String, + // Add other fields based on your LLM's output. +} + +/// Error types for processing `TextContent`. +#[derive(Error, Debug)] +pub enum ProcessingError { + #[error("LLM processing error: {0}")] + LLMError(String), + + #[error("Graph DB storage error: {0}")] + GraphDBError(String), + + #[error("Vector DB storage error: {0}")] + VectorDBError(String), + + #[error("Unknown processing error")] + Unknown, +} + diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index 2a7bd60..5404a20 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -2,7 +2,8 @@ use lapin::{ message::Delivery, options::*, types::FieldTable, Channel, Consumer, Queue }; use futures_lite::stream::StreamExt; -use crate::models::ingress::IngressContent; + +use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject}; use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; use tracing::{info, error}; @@ -80,14 +81,14 @@ impl RabbitMQConsumer { } - /// Consumes a message and returns the deserialized IngressContent along with the Delivery - pub async fn consume(&self) -> Result<(IngressContent, Delivery), RabbitMQError> { + /// Consumes a message and returns the deserialized IngressObject along with the Delivery + pub async fn consume(&self) -> Result<(IngressObject, Delivery), RabbitMQError> { // Receive the next message let delivery = self.consumer.clone().next().await .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? 
.map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - // Deserialize the message payload into IngressContent - let ingress: IngressContent = serde_json::from_slice(&delivery.data) + // Deserialize the message payload into IngressObject + let ingress: IngressObject = serde_json::from_slice(&delivery.data) .map_err(|e| RabbitMQError::ConsumeError(format!("Deserialization Error: {}", e)))?; Ok((ingress, delivery)) @@ -102,19 +103,30 @@ impl RabbitMQConsumer { Ok(()) } - - /// Processes messages in a loop pub async fn process_messages(&self) -> Result<(), RabbitMQError> { loop { match self.consume().await { Ok((ingress, delivery)) => { - // info!("Received ingress object: {:?}", ingress); - - // Process the ingress object - self.handle_ingress_content(&ingress).await; + info!("Received IngressObject: {:?}", ingress); - info!("Processing done, acknowledging message"); self.ack_delivery(delivery).await?; + // Process the IngressObject + // match self.handle_ingress_content(&ingress).await { + // Ok(_) => { + // info!("Successfully handled IngressContent"); + // // Acknowledge the message + // if let Err(e) = self.ack_delivery(delivery).await { + // error!("Failed to acknowledge message: {:?}", e); + // } + // }, + // Err(e) => { + // error!("Failed to handle IngressContent: {:?}", e); + // // For now, we'll acknowledge to remove it from the queue. Change to nack? 
+ // if let Err(ack_err) = self.ack_delivery(delivery).await { + // error!("Failed to acknowledge message after handling error: {:?}", ack_err); + // } + // } + // } } Err(RabbitMQError::ConsumeError(e)) => { error!("Error consuming message: {}", e); @@ -131,8 +143,109 @@ impl RabbitMQConsumer { Ok(()) } - /// Handles the IngressContent based on its type - async fn handle_ingress_content(&self, ingress: &IngressContent) { - info!("Processing content: {:?}", ingress); - } + // /// Processes messages in a loop + // pub async fn process_messages(&self) -> Result<(), RabbitMQError> { + // loop { + // match self.consume().await { + // Ok((ingress, delivery)) => { + // // info!("Received ingress object: {:?}", ingress); + + // // Process the ingress object + // self.handle_ingress_content(&ingress).await; + + // info!("Processing done, acknowledging message"); + // self.ack_delivery(delivery).await?; + // } + // Err(RabbitMQError::ConsumeError(e)) => { + // error!("Error consuming message: {}", e); + // // Optionally add a delay before trying again + // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + // } + // Err(e) => { + // error!("Unexpected error: {}", e); + // break; + // } + // } + // } + + // Ok(()) + // } + pub async fn handle_ingress_content(&self, ingress: &IngressObject) -> Result<(), IngressContentError> { + info!("Processing IngressContent: {:?}", ingress); + + + + // Convert IngressContent to individual TextContent instances + // let text_contents = ingress.to_text_contents().await?; + // info!("Generated {} TextContent instances", text_contents.len()); + + // // Limit concurrent processing (e.g., 10 at a time) + // let semaphore = Arc::new(Semaphore::new(10)); + // let mut processing_futures = FuturesUnordered::new(); + + // for text_content in text_contents { + // let semaphore_clone = semaphore.clone(); + // processing_futures.push(tokio::spawn(async move { + // let _permit = semaphore_clone.acquire().await; + // match 
text_content.process().await { + // Ok(_) => { + // info!("Successfully processed TextContent"); + // Ok(()) + // } + // Err(e) => { + // error!("Error processing TextContent: {:?}", e); + // Err(e) + // } + // } + // })); + // } + + // // Await all processing tasks + // while let Some(result) = processing_futures.next().await { + // match result { + // Ok(Ok(_)) => { + // // Successfully processed + // } + // Ok(Err(e)) => { + // // Processing failed, already logged + // } + // Err(e) => { + // // Task join error + // error!("Task join error: {:?}", e); + // } + // } + // } + + // Ok(()) + unimplemented!() } +} + + // /// Handles the IngressContent based on its type + // async fn handle_ingress_content(&self, ingress: &IngressContent) { + // info!("Processing content: {:?}", ingress); + // // There are three different situations: + // // 1. There is no ingress.content but there are one or more files + // // - We should process the files and act based upon the mime type + // // - All different kinds of content return text + // // 2. There is ingress.content but there are no files + // // - We process ingress.content differently if its a URL or Text enum + // // - Return text + // // 3. There is ingress.content and files + // // - We do both + // // + // // At the end of processing we have one or several text objects with some associated + // // metadata, such as the FileInfo metadata, or the Url associated to the text + // // + // // There will always be ingress.instructions and ingress.category + // // + // // When we have all the text objects and metadata, we can begin the next processing + // // Here we will: + // // 1. Send the text content and metadata to a LLM for analyzing + // // - We want several things, JSON_LD metadata, possibly actions + // // 2. Store the JSON_LD in a graph database + // // 3. Split up the text intelligently and store it in a vector database + // // + // // We return the function if all succeeds. 
+ // } + diff --git a/src/rabbitmq/publisher.rs b/src/rabbitmq/publisher.rs index 70033a2..d7dec5c 100644 --- a/src/rabbitmq/publisher.rs +++ b/src/rabbitmq/publisher.rs @@ -1,7 +1,8 @@ use lapin::{ - options::*, publisher_confirm::Confirmation, BasicProperties, + options::*, publisher_confirm::Confirmation, BasicProperties, }; -use crate::models::ingress::IngressContent; + +use crate::models::ingress_object::IngressObject; use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; use tracing::{info, error}; @@ -16,7 +17,7 @@ impl RabbitMQProducer { pub async fn new(config: &RabbitMQConfig) -> Result { let common = RabbitMQCommon::new(config).await?; common.declare_exchange(config, false).await?; - + Ok(Self { common, exchange_name: config.exchange.clone(), @@ -24,10 +25,10 @@ impl RabbitMQProducer { }) } - /// Publishes an IngressContent object to RabbitMQ after serializing it to JSON. - pub async fn publish(&self, ingress: &IngressContent) -> Result { - // Serialize IngressContent to JSON - let payload = serde_json::to_vec(ingress) + /// Publishes an IngressObject to RabbitMQ after serializing it to JSON. 
+ pub async fn publish(&self, ingress_object: &IngressObject) -> Result { + // Serialize IngressObject to JSON + let payload = serde_json::to_vec(ingress_object) .map_err(|e| { error!("Serialization Error: {}", e); RabbitMQError::PublishError(format!("Serialization Error: {}", e)) @@ -53,7 +54,7 @@ impl RabbitMQProducer { RabbitMQError::PublishError(format!("Publish Confirmation Error: {}", e)) })?; - info!("Published message to exchange '{}' with routing key '{}'", self.exchange_name, self.routing_key); + info!("Published IngressObject to exchange '{}' with routing key '{}'", self.exchange_name, self.routing_key); Ok(confirmation) } diff --git a/src/routes/ingress.rs b/src/routes/ingress.rs index d0fcd74..686d2b9 100644 --- a/src/routes/ingress.rs +++ b/src/routes/ingress.rs @@ -1,34 +1,42 @@ use std::sync::Arc; - use axum::{http::StatusCode, response::IntoResponse, Extension, Json}; use tracing::{error, info}; - -use crate::{models::ingress::{IngressContent, IngressInput}, rabbitmq::publisher::RabbitMQProducer, redis::client::RedisClient}; +use crate::{models::ingress_content::{create_ingress_objects, IngressInput}, rabbitmq::publisher::RabbitMQProducer, redis::client::RedisClient}; pub async fn ingress_handler( Extension(producer): Extension>, Json(input): Json, ) -> impl IntoResponse { - info!("Recieved input: {:?}", input); + info!("Received input: {:?}", input); let redis_client = RedisClient::new("redis://127.0.0.1/"); - if let Ok(content) = IngressContent::new(input, &redis_client).await { - // Publish content to RabbitMQ (or other system) - match producer.publish(&content).await { - Ok(_) => { - info!("Message published successfully"); - "Successfully processed".to_string().into_response() + match create_ingress_objects(input, &redis_client).await { + Ok(objects) => { + for object in objects { + match producer.publish(&object).await { + Ok(_) => { + info!("Message published successfully"); + } + Err(e) => { + error!("Failed to publish message: {:?}", e); + 
return ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to publish message", + ) + .into_response(); + } + } + } + StatusCode::OK.into_response() } Err(e) => { - error!("Failed to publish message: {:?}", e); - (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() + error!("Failed to process input: {:?}", e); + ( + StatusCode::INTERNAL_SERVER_ERROR, + "Failed to process input", + ) + .into_response() } } - } - else { - error!("Failed to create IngressContent object" ); - (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create object").into_response() - - } }