From 7fbfa0fdbcbc3bf2dbae028779f5fc4baa095baa Mon Sep 17 00:00:00 2001 From: Per Stark Date: Fri, 20 Sep 2024 22:57:47 +0200 Subject: [PATCH] working on a queue length --- src/consumer.rs | 81 ++----------------- src/rabbitmq/consumer.rs | 13 ++- src/rabbitmq/mod.rs | 5 +- src/server.rs | 169 ++++++++------------------------------- 4 files changed, 52 insertions(+), 216 deletions(-) diff --git a/src/consumer.rs b/src/consumer.rs index 030d245..0d2ef69 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1,76 +1,3 @@ -// use lapin::{ -// options::*, types::FieldTable, Connection, ConnectionProperties, Consumer, Result, -// }; -// use tracing::{info, error}; -// use futures_lite::stream::StreamExt; - -// pub struct RabbitMQConsumer { -// pub connection: Connection, -// pub consumer: Consumer, -// } - -// impl RabbitMQConsumer { -// pub async fn new(addr: &str, queue: &str) -> Result { -// let connection = Connection::connect(addr, ConnectionProperties::default()).await?; -// let channel = connection.create_channel().await?; - -// // Declare the queue (in case it doesn't exist) -// channel -// .queue_declare( -// queue, -// QueueDeclareOptions::default(), -// FieldTable::default(), -// ) -// .await?; - -// let consumer = channel -// .basic_consume( -// queue, -// "consumer", -// BasicConsumeOptions::default(), -// FieldTable::default(), -// ) -// .await?; - -// Ok(Self { connection, consumer }) -// } - -// pub async fn run(&mut self) -> Result<()> { -// info!("Consumer started - waiting for messages"); -// while let Some(delivery) = self.consumer.next().await { -// match delivery { -// Ok(delivery) => { -// let message = std::str::from_utf8(&delivery.data).unwrap_or("Invalid UTF-8"); -// info!("Received message: {}", message); - -// // Process the message here -// // For example, you could deserialize it and perform some action - -// delivery.ack(BasicAckOptions::default()).await?; -// }, -// Err(e) => error!("Failed to consume message: {:?}", e), -// } -// } - -// Ok(()) -// } -// } - -// #[tokio::main] -// async fn main() -> Result<()> { -// // Set up tracing -// tracing_subscriber::fmt::init(); - -// let addr = "amqp://guest:guest@localhost:5672"; -// let queue = "hello"; - -// let mut consumer = RabbitMQConsumer::new(addr, queue).await?; - -// info!("Starting consumer"); -// consumer.run().await?; - -// Ok(()) -// } use tokio; use tracing::{info, error}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -86,7 +13,8 @@ async fn main() -> Result<(), Box> { .ok(); info!("Starting RabbitMQ consumer"); -// Set up RabbitMQ config + + // Set up RabbitMQ config let config = RabbitMQConfig { amqp_addr: "amqp://localhost".to_string(), exchange: "my_exchange".to_string(), @@ -103,11 +31,14 @@ async fn main() -> Result<(), Box> { // Start consuming messages loop { match consumer.consume().await { - Ok(message) => { + Ok((message, delivery)) => { info!("Received message: {}", message); // Process the message here // For example, you could insert it into a database // process_message(&message).await?; + + info!("Done processing, acking"); + consumer.ack_delivery(delivery).await? } Err(RabbitMQError::ConsumeError(e)) => { error!("Error consuming message: {}", e); diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index a0fae2f..4ff9874 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -1,13 +1,14 @@ use lapin::{ - options::*, types::FieldTable, Channel, Consumer, Queue + message::Delivery, options::*, types::FieldTable, Channel, Consumer, Queue }; use futures_lite::stream::StreamExt; +use tracing::info; use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; pub struct RabbitMQConsumer { common: RabbitMQCommon, - queue: Queue, + pub queue: Queue, consumer: Consumer, } @@ -66,19 +67,23 @@ impl RabbitMQConsumer { .map_err(|e| RabbitMQError::QueueError(e.to_string())) } - pub async fn consume(&self) -> Result { + pub async fn consume(&self) -> Result<(String, Delivery), RabbitMQError> { let delivery = self.consumer.clone().next().await .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; let message = String::from_utf8_lossy(&delivery.data).to_string(); + Ok((message, delivery)) + } + + pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> { self.common.channel .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) .await .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - Ok(message) + Ok(()) } } diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index c2d042b..3959fe7 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -2,9 +2,7 @@ pub mod producer; pub mod consumer; use lapin::{ - Connection, ConnectionProperties, Channel, ExchangeKind, - options::ExchangeDeclareOptions, - types::FieldTable, + options::{ExchangeDeclareOptions, QueueDeclareOptions}, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind, Queue }; use thiserror::Error; @@ -28,6 +26,7 @@ pub enum RabbitMQError { QueueError(String), } +#[derive(Clone)] pub struct RabbitMQConfig { pub amqp_addr: String, pub exchange: String, diff --git a/src/server.rs b/src/server.rs index 486a66c..21c4c82 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,140 +1,9 @@ -// use axum::{ -// routing::post, -// Router, -// response::{IntoResponse, Response}, -// Error, Extension, Json, -// }; -// use serde::Deserialize; -// use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - -// #[derive(Deserialize)] -// struct IngressPayload { -// payload: String, -// } - -// use tracing::{info, error}; -// use zettle_db::rabbitmq::RabbitMQClient; - -// async fn ingress_handler( -// // Extension(rabbitmq): Extension, -// Json(payload): Json -// ) -> Response { -// info!("Received payload: {:?}", payload.payload); -// let rabbitmqclient = RabbitMQClient::new("127.0.0.1").await; -// match rabbitmq.publish(&payload.payload).await { -// Ok(_) => { -// info!("Message published successfully"); -// "thank you".to_string().into_response() -// }, -// Err(e) => { -// error!("Failed to publish message: {:?}", e); -// (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() -// } -// } -// } - -// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] -// async fn main() -> Result<(), Error> { -// // Set up tracing -// tracing_subscriber::registry() -// .with(fmt::layer()) -// .with(EnvFilter::from_default_env()) -// .try_init() -// .ok(); - -// // Set up RabbitMQ -// // let rabbitmq = RabbitMQ::new("amqprs.examples.basic2", "amq.topic", "amqprs.example2").await; - -// // Create Axum router -// let app = Router::new() -// .route("/ingress", post(ingress_handler)); // .layer(Extension(rabbitmq)); - -// tracing::info!("Listening on 0.0.0.0:3000"); -// let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); -// axum::serve(listener, app).await.unwrap(); - -// Ok(()) -// } -// use axum::{ -// routing::post, -// Router, -// response::{IntoResponse, Response}, -// Error, Extension, Json, -// }; -// use serde::Deserialize; -// use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -// use zettle_db::rabbitmq::{RabbitMQProducer}; -// use std::sync::Arc; - -// #[derive(Deserialize)] -// struct IngressPayload { -// payload: String, -// } - -// use tracing::{info, error}; - -// async fn ingress_handler( -// // Extension(rabbitmq): Extension>, -// Extension(producer): Extension>, -// Json(payload): Json -// ) -> Response { -// info!("Received payload: {:?}", payload.payload); -// match producer.publish(&payload.payload).await { - -// Ok(_) => { -// info!("Message published successfully"); -// "thank you".to_string().into_response() -// }, -// Err(e) => { -// error!("Failed to publish message: {:?}", e); -// (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() -// } -// } -// // match rabbitmq.publish("hello", payload.payload.as_bytes()).await { -// // Ok(_) => { -// // info!("Message published successfully"); -// // "thank you".to_string().into_response() -// // }, -// // Err(e) => { -// // error!("Failed to publish message: {:?}", e); -// // (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() -// // } -// // } -// } - -// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] -// async fn main() -> Result<(), Error> { -// // Set up tracing -// tracing_subscriber::registry() -// .with(fmt::layer()) -// .with(EnvFilter::from_default_env()) -// .try_init() -// .ok(); - -// // Set up RabbitMQ -// // let rabbitmq = Arc::new(RabbitMQClient::new("amqp://guest:guest@localhost:5672").await.expect("Failed to connect to RabbitMQ")); -// let producer = Arc::new(RabbitMQProducer::new("amqp://localhost", "my_exchange", "my_routing_key").await); - -// // Create Axum router -// let app = Router::new() -// .route("/ingress", post(ingress_handler)) -// .layer(Extension(producer)); - -// tracing::info!("Listening on 0.0.0.0:3000"); -// let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); -// axum::serve(listener, app).await.unwrap(); - -// Ok(()) -// } use axum::{ - routing::post, - Router, - response::{IntoResponse, Response}, - Extension, Json, + http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, Router }; use serde::Deserialize; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use zettle_db::rabbitmq::{producer::RabbitMQProducer, RabbitMQConfig}; +use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, producer::RabbitMQProducer, RabbitMQConfig}; use std::sync::Arc; #[derive(Deserialize)] @@ -161,6 +30,37 @@ async fn ingress_handler( } } +async fn queue_length_handler() -> Response { + info!("Getting queue length"); + + // Set up RabbitMQ config + let config = RabbitMQConfig { + amqp_addr: "amqp://localhost".to_string(), + exchange: "my_exchange".to_string(), + queue: "my_queue".to_string(), + routing_key: "my_key".to_string(), + }; + + // Create a new consumer + match RabbitMQConsumer::new(&config).await { + Ok(consumer) => { + info!("Consumer connected to RabbitMQ"); + + // Get the queue length + let queue_length = consumer.queue.message_count(); + + info!("Queue length: {}", queue_length); + + // Return the queue length with a 200 OK status + (StatusCode::OK, queue_length.to_string()).into_response() + }, + Err(e) => { + error!("Failed to create consumer: {:?}", e); + (StatusCode::INTERNAL_SERVER_ERROR, "Failed to connect to RabbitMQ".to_string()).into_response() + } + } +} + #[tokio::main(flavor = "multi_thread", worker_threads = 2)] async fn main() -> Result<(), Box> { // Set up tracing @@ -171,7 +71,7 @@ async fn main() -> Result<(), Box> { .ok(); // Set up RabbitMQ - let config = RabbitMQConfig { + let config = RabbitMQConfig { amqp_addr: "amqp://localhost".to_string(), exchange: "my_exchange".to_string(), queue: "my_queue".to_string(), @@ -183,6 +83,7 @@ async fn main() -> Result<(), Box> { // Create Axum router let app = Router::new() .route("/ingress", post(ingress_handler)) + .route("/message_count", get(queue_length_handler)) .layer(Extension(producer)); tracing::info!("Listening on 0.0.0.0:3000");