diff --git a/src/bin/server.rs b/src/bin/server.rs index b867e3d..b50f5be 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -1,6 +1,6 @@ use axum::{ extract::DefaultBodyLimit, - routing::{delete, get, post, put}, + routing::{get, post}, Router, }; use std::sync::Arc; @@ -11,11 +11,8 @@ use zettle_db::{ rabbitmq::{consumer::RabbitMQConsumer, publisher::RabbitMQProducer, RabbitMQConfig}, server::{ routes::{ - file::{delete_file_handler, get_file_handler, update_file_handler, upload_handler}, - index::index_handler, - ingress::ingress_handler, - query::query_handler, - queue_length::queue_length_handler, + file::upload_handler, index::index_handler, ingress::ingress_handler, + query::query_handler, queue_length::queue_length_handler, }, AppState, }, @@ -41,7 +38,7 @@ async fn main() -> Result<(), Box> { let app_state = AppState { rabbitmq_producer: Arc::new(RabbitMQProducer::new(&config).await?), - rabbitmq_consumer: Arc::new(RabbitMQConsumer::new(&config).await?), + rabbitmq_consumer: Arc::new(RabbitMQConsumer::new(&config, false).await?), surreal_db_client: Arc::new(SurrealDbClient::new().await?), tera: Arc::new(Tera::new("src/server/templates/**/*.html").unwrap()), }; @@ -68,9 +65,6 @@ fn api_routes_v1() -> Router { // File routes .route("/file", post(upload_handler)) .layer(DefaultBodyLimit::max(1024 * 1024 * 1024)) - .route("/file/:uuid", get(get_file_handler)) - .route("/file/:uuid", put(update_file_handler)) - .route("/file/:uuid", delete(delete_file_handler)) // Query routes .route("/query", post(query_handler)) } diff --git a/src/bin/worker.rs b/src/bin/worker.rs index a6c8ca0..48f2cd8 100644 --- a/src/bin/worker.rs +++ b/src/bin/worker.rs @@ -1,6 +1,6 @@ use tracing::info; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig }; +use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { .ok(); info!("Starting RabbitMQ consumer"); - + // Set up RabbitMQ config let config = RabbitMQConfig { amqp_addr: "amqp://localhost".to_string(), @@ -21,11 +21,8 @@ async fn main() -> Result<(), Box> { routing_key: "my_key".to_string(), }; - // Create a RabbitMQ consumer - let consumer = RabbitMQConsumer::new(&config).await?; - - info!("Consumer connected to RabbitMQ"); + let consumer = RabbitMQConsumer::new(&config, true).await?; // Start consuming messages consumer.process_messages().await?; diff --git a/src/error.rs b/src/error.rs index ae06928..e0b9160 100644 --- a/src/error.rs +++ b/src/error.rs @@ -4,7 +4,10 @@ use serde_json::json; use thiserror::Error; use tokio::task::JoinError; -use crate::{ingress::types::ingress_input::IngressContentError, rabbitmq::RabbitMQError}; +use crate::{ + ingress::types::ingress_input::IngressContentError, rabbitmq::RabbitMQError, + storage::types::file_info::FileError, +}; #[derive(Error, Debug)] pub enum ProcessingError { @@ -55,6 +58,8 @@ pub enum ApiError { RabbitMQError(#[from] RabbitMQError), #[error("LLM processing error: {0}")] OpenAIerror(#[from] OpenAIError), + #[error("File error: {0}")] + FileError(#[from] FileError), } impl IntoResponse for ApiError { @@ -69,6 +74,7 @@ impl IntoResponse for ApiError { (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()) } ApiError::RabbitMQError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), + ApiError::FileError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()), }; ( diff --git a/src/ingress/types/ingress_input.rs b/src/ingress/types/ingress_input.rs index 3a32d2e..49ba4ff 100644 --- a/src/ingress/types/ingress_input.rs +++ b/src/ingress/types/ingress_input.rs @@ -83,16 +83,16 @@ pub async fn create_ingress_objects( } // Look up FileInfo objects using the db and the submitted uuids in input.files - if let Some(file_uuids) = input.files { - for uuid in file_uuids { - if let Some(file_info) = get_item::(&db_client, &uuid).await? { + if let Some(file_ids) = input.files { + for id in file_ids { + if let Some(file_info) = get_item::(db_client, &id).await? { object_list.push(IngressObject::File { file_info, instructions: input.instructions.clone(), category: input.category.clone(), }); } else { - info!("No file with UUID: {}", uuid); + info!("No file with id: {}", id); } } } diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index dd3e29c..6659718 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -13,7 +13,7 @@ use tracing::{error, info}; pub struct RabbitMQConsumer { common: RabbitMQCommon, pub queue: Queue, - consumer: Consumer, + consumer: Option, } impl RabbitMQConsumer { @@ -22,10 +22,14 @@ impl RabbitMQConsumer { /// /// # Arguments /// * `config` - A initialized RabbitMQConfig containing required configurations + /// * `start_consuming` - Set to true to start consuming messages /// /// # Returns /// * `Result` - The created client or an error. - pub async fn new(config: &RabbitMQConfig) -> Result { + pub async fn new( + config: &RabbitMQConfig, + start_consuming: bool, + ) -> Result { let common = RabbitMQCommon::new(config).await?; // Passively declare the exchange (it should already exist) @@ -36,7 +40,11 @@ impl RabbitMQConsumer { Self::bind_queue(&common.channel, &config.exchange, &queue, config).await?; // Initialize the consumer - let consumer = Self::initialize_consumer(&common.channel, config).await?; + let consumer = if start_consuming { + Some(Self::initialize_consumer(&common.channel, config).await?) + } else { + None + }; Ok(Self { common, @@ -67,6 +75,27 @@ impl RabbitMQConsumer { .await .map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string())) } + + /// Operation to get the current queue length + /// Will redeclare queue to get a updated number + pub async fn get_queue_length(&self) -> Result { + let queue_info = self + .common + .channel + .queue_declare( + &self.queue.name().to_string(), + QueueDeclareOptions { + durable: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .map_err(|e| RabbitMQError::QueueError(e.to_string()))?; + + Ok(queue_info.message_count()) + } + /// Declares the queue based on the channel and `RabbitMQConfig`. /// # Arguments /// * `channel` - Lapin Channel. @@ -127,9 +156,15 @@ impl RabbitMQConsumer { /// `IngressObject` - The object containing content and metadata. /// `Delivery` - A delivery reciept, required to ack or nack the delivery. pub async fn consume(&self) -> Result<(IngressObject, Delivery), RabbitMQError> { + // Get consumer or return error if not initialized + let consumer: &lapin::Consumer = self.consumer.as_ref().ok_or_else(|| { + RabbitMQError::ConsumeError( + "Consumer not initialized. Call new() with start_consuming=true".to_string(), + ) + })?; + // Receive the next message - let delivery = self - .consumer + let delivery = consumer .clone() .next() .await diff --git a/src/server/routes/file.rs b/src/server/routes/file.rs index 827d827..8b6fb15 100644 --- a/src/server/routes/file.rs +++ b/src/server/routes/file.rs @@ -1,21 +1,13 @@ -use crate::{ - server::AppState, - storage::types::file_info::{FileError, FileInfo}, -}; -use axum::{ - extract::{Path, State}, - response::IntoResponse, - Json, -}; +use crate::{error::ApiError, server::AppState, storage::types::file_info::FileInfo}; +use axum::{extract::State, response::IntoResponse, Json}; use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart}; use serde_json::json; use tempfile::NamedTempFile; use tracing::info; -use uuid::Uuid; #[derive(Debug, TryFromMultipart)] pub struct FileUploadRequest { - #[form_data(limit = "100000")] // Example limit: ~100 KB + #[form_data(limit = "1000000")] // Example limit: ~1000 KB pub file: FieldData, } @@ -25,7 +17,7 @@ pub struct FileUploadRequest { pub async fn upload_handler( State(state): State, TypedMultipart(input): TypedMultipart, -) -> Result { +) -> Result { info!("Received an upload request"); // Process the file upload @@ -33,7 +25,7 @@ pub async fn upload_handler( // Prepare the response JSON let response = json!({ - "uuid": file_info.uuid, + "id": file_info.id, "sha256": file_info.sha256, "path": file_info.path, "mime_type": file_info.mime_type, @@ -44,82 +36,3 @@ pub async fn upload_handler( // Return the response with HTTP 200 Ok((axum::http::StatusCode::OK, Json(response))) } - -/// Handler to retrieve file information by UUID. -/// -/// Route: GET /file/:uuid -pub async fn get_file_handler( - State(state): State, - Path(uuid_str): Path, -) -> Result { - // Parse UUID - let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?; - - // Retrieve FileInfo - let file_info = FileInfo::get_by_uuid(uuid, &state.surreal_db_client).await?; - - // Prepare the response JSON - let response = json!({ - "uuid": file_info.uuid, - "sha256": file_info.sha256, - "path": file_info.path, - "mime_type": file_info.mime_type, - }); - - info!("Retrieved FileInfo: {:?}", file_info); - - // Return the response with HTTP 200 - Ok((axum::http::StatusCode::OK, Json(response))) -} - -/// Handler to update an existing file by UUID. -/// -/// Route: PUT /file/:uuid -pub async fn update_file_handler( - State(state): State, - Path(uuid_str): Path, - TypedMultipart(input): TypedMultipart, -) -> Result { - // Parse UUID - let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?; - - // Update the file - let updated_file_info = FileInfo::update(uuid, input.file, &state.surreal_db_client).await?; - - // Prepare the response JSON - let response = json!({ - "uuid": updated_file_info.uuid, - "sha256": updated_file_info.sha256, - "path": updated_file_info.path, - "mime_type": updated_file_info.mime_type, - }); - - info!("File updated successfully: {:?}", updated_file_info); - - // Return the response with HTTP 200 - Ok((axum::http::StatusCode::OK, Json(response))) -} - -/// Handler to delete a file by UUID. -/// -/// Route: DELETE /file/:uuid -pub async fn delete_file_handler( - State(state): State, - Path(uuid_str): Path, -) -> Result { - // Parse UUID - let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?; - - // Delete the file - FileInfo::delete(uuid, &state.surreal_db_client).await?; - - info!("Deleted file with UUID: {}", uuid); - - // Prepare the response JSON - let response = json!({ - "message": "File deleted successfully", - }); - - // Return the response with HTTP 204 No Content - Ok((axum::http::StatusCode::NO_CONTENT, Json(response))) -} diff --git a/src/server/routes/index.rs b/src/server/routes/index.rs index 185c7da..907081d 100644 --- a/src/server/routes/index.rs +++ b/src/server/routes/index.rs @@ -8,7 +8,6 @@ use crate::{error::ApiError, server::AppState}; pub async fn index_handler(State(state): State) -> Result, ApiError> { info!("Displaying index page"); - // Now you can access the consumer directly from the state let queue_length = state.rabbitmq_consumer.queue.message_count(); let output = state diff --git a/src/server/routes/queue_length.rs b/src/server/routes/queue_length.rs index d8c5731..15a4db0 100644 --- a/src/server/routes/queue_length.rs +++ b/src/server/routes/queue_length.rs @@ -1,42 +1,17 @@ -use axum::{ - http::StatusCode, - response::{IntoResponse, Response}, -}; -use tracing::{error, info}; +use axum::{extract::State, http::StatusCode, response::IntoResponse}; +use tracing::info; -use crate::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig}; +use crate::{error::ApiError, server::AppState}; -pub async fn queue_length_handler() -> Response { +pub async fn queue_length_handler( + State(state): State, +) -> Result { info!("Getting queue length"); - // Set up RabbitMQ config - let config = RabbitMQConfig { - amqp_addr: "amqp://localhost".to_string(), - exchange: "my_exchange".to_string(), - queue: "my_queue".to_string(), - routing_key: "my_key".to_string(), - }; + let queue_length = state.rabbitmq_consumer.get_queue_length().await?; - // Create a new consumer - match RabbitMQConsumer::new(&config).await { - Ok(consumer) => { - info!("Consumer connected to RabbitMQ"); + info!("Queue length: {}", queue_length); - // Get the queue length - let queue_length = consumer.queue.message_count(); - - info!("Queue length: {}", queue_length); - - // Return the queue length with a 200 OK status - (StatusCode::OK, queue_length.to_string()).into_response() - } - Err(e) => { - error!("Failed to create consumer: {:?}", e); - ( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to connect to RabbitMQ".to_string(), - ) - .into_response() - } - } + // Return the queue length with a 200 OK status + Ok((StatusCode::OK, queue_length.to_string())) } diff --git a/src/storage/db.rs b/src/storage/db.rs index 911059a..548140a 100644 --- a/src/storage/db.rs +++ b/src/storage/db.rs @@ -111,5 +111,5 @@ pub async fn get_item(db_client: &Surreal, id: &str) -> Result