diff --git a/src/consumer.rs b/src/consumer.rs index 5d2beb9..030d245 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -71,10 +71,10 @@ // Ok(()) // } -use zettle_db::rabbitmq::{RabbitMQConsumer, RabbitMQError}; use tokio; use tracing::{info, error}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; +use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig, RabbitMQError}; #[tokio::main] async fn main() -> Result<(), Box> { @@ -86,14 +86,17 @@ async fn main() -> Result<(), Box> { .ok(); info!("Starting RabbitMQ consumer"); +// Set up RabbitMQ config + let config = RabbitMQConfig { + amqp_addr: "amqp://localhost".to_string(), + exchange: "my_exchange".to_string(), + queue: "my_queue".to_string(), + routing_key: "my_key".to_string(), + }; + // Create a RabbitMQ consumer - let consumer = RabbitMQConsumer::new( - "amqp://localhost", - "my_exchange", - "my_queue", - "my_routing_key" - ).await?; + let consumer = RabbitMQConsumer::new(&config).await?; info!("Consumer connected to RabbitMQ"); diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs new file mode 100644 index 0000000..a0fae2f --- /dev/null +++ b/src/rabbitmq/consumer.rs @@ -0,0 +1,84 @@ +use lapin::{ + options::*, types::FieldTable, Channel, Consumer, Queue +}; +use futures_lite::stream::StreamExt; + +use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; + +pub struct RabbitMQConsumer { + common: RabbitMQCommon, + queue: Queue, + consumer: Consumer, +} + +impl RabbitMQConsumer { + pub async fn new(config: &RabbitMQConfig) -> Result { + let common = RabbitMQCommon::new(config).await?; + + // Passively declare the exchange (it should already exist) + common.declare_exchange(config, true).await?; + + // Declare queue and bind it to the channel + let queue = Self::declare_queue(&common.channel, config).await?; + Self::bind_queue(&common.channel, &config.exchange, &queue, config).await?; + + // Initialize the consumer + let consumer = Self::initialize_consumer(&common.channel, &config).await?; + + Ok(Self { common, queue, consumer}) + } + + async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result { + channel + .basic_consume( + &config.queue, + "consumer", + BasicConsumeOptions::default(), + FieldTable::default(), + ) + .await.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string())) + } + + async fn declare_queue(channel: &Channel, config: &RabbitMQConfig) -> Result { + channel + .queue_declare( + &config.queue, + QueueDeclareOptions { + durable: true, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ) + .await + .map_err(|e| RabbitMQError::QueueError(e.to_string())) + } + + async fn bind_queue(channel: &Channel, exchange: &str, queue: &Queue, config: &RabbitMQConfig) -> Result<(), RabbitMQError> { + channel + .queue_bind( + queue.name().as_str(), + exchange, + &config.routing_key, + QueueBindOptions::default(), + FieldTable::default(), + ) + .await + .map_err(|e| RabbitMQError::QueueError(e.to_string())) + } + + pub async fn consume(&self) -> Result { + let delivery = self.consumer.clone().next().await + .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? + .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; + + let message = String::from_utf8_lossy(&delivery.data).to_string(); + + self.common.channel + .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) + .await + .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; + + Ok(message) + } +} + diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index 112ffaf..c2d042b 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -1,250 +1,74 @@ -// use lapin::{ -// options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, Result -// }; +pub mod producer; +pub mod consumer; -// pub struct RabbitMQClient { -// pub connection: Connection, -// pub channel: Channel, -// } - -// impl RabbitMQClient { -// pub async fn new(addr: &str) -> Result { -// let connection = Connection::connect(addr, ConnectionProperties::default()).await?; -// let channel = connection.create_channel().await?; - -// Ok(Self { connection, channel }) -// } - -// pub async fn publish(&self, queue: &str, payload: &[u8]) -> Result<()> { -// self.channel -// .basic_publish( -// "", -// queue, -// Default::default(), -// payload, -// BasicProperties::default(), -// ) -// .await?; - -// Ok(()) -// } - -// pub async fn consume(&self, queue: &str) -> Result { -// let consumer = self.channel -// .basic_consume( -// queue, -// "consumer", -// BasicConsumeOptions::default(), -// FieldTable::default(), -// ) -// .await?; - -// Ok(consumer) -// } -// } use lapin::{ - options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, Consumer, ExchangeKind + Connection, ConnectionProperties, Channel, ExchangeKind, + options::ExchangeDeclareOptions, + types::FieldTable, + }; -use futures_lite::stream::StreamExt; +use thiserror::Error; +use tracing::debug; -#[derive(Debug, thiserror::Error)] +#[derive(Error, Debug)] pub enum RabbitMQError { - #[error("Connection error: {0}")] + #[error("Failed to connect to RabbitMQ: {0}")] ConnectionError(#[from] lapin::Error), #[error("Channel error: {0}")] ChannelError(String), #[error("Consume error: {0}")] ConsumeError(String), + #[error("Exchange error: {0}")] + ExchangeError(String), #[error("Publish error: {0}")] PublishError(String), + #[error("Error initializing consumer: {0}")] + InitializeConsumerError(String), + #[error("Queue error: {0}")] + QueueError(String), } -struct RabbitMQConnection { +pub struct RabbitMQConfig { + pub amqp_addr: String, + pub exchange: String, + pub queue: String, + pub routing_key: String, +} + +pub struct RabbitMQCommon { pub connection: Connection, pub channel: Channel, } -impl RabbitMQConnection { - async fn new(url: &str) -> Result { - let connection = Connection::connect( - url, - ConnectionProperties::default(), - ).await?; - +impl RabbitMQCommon { + pub async fn new(config: &RabbitMQConfig) -> Result { + let connection = Self::create_connection(config).await?; let channel = connection.create_channel().await?; - Ok(Self { connection, channel }) } -} -// pub struct RabbitMQConsumer { -// connection: RabbitMQConnection, -// queue_name: String, -// } + async fn create_connection(config: &RabbitMQConfig) -> Result { + debug!("Creating connection"); + Connection::connect(&config.amqp_addr, ConnectionProperties::default()) + .await + .map_err(RabbitMQError::ConnectionError) + } -// impl RabbitMQConsumer { -// pub async fn new(url: &str, queue_name: &str) -> Result { -// let connection = RabbitMQConnection::new(url).await?; -// Ok(Self { -// connection, -// queue_name: queue_name.to_string(), -// }) -// } - -// pub async fn consume(&self) -> Result { -// let consumer = self.connection.channel -// .basic_consume( -// &self.queue_name, -// "consumer", -// BasicConsumeOptions::default(), -// FieldTable::default(), -// ) -// .await -// .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - -// let delivery = consumer.clone().next().await -// .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? -// .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - -// let message = String::from_utf8_lossy(&delivery.data).to_string(); - -// self.connection.channel -// .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) -// .await -// .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - -// Ok(message) -// } -// } -pub struct RabbitMQConsumer { - connection: Connection, - channel: Channel, - consumer: Consumer, -} - -impl RabbitMQConsumer { - pub async fn new(url: &str, exchange: &str, queue: &str, routing_key: &str) -> Result { - let connection = Connection::connect( - url, - ConnectionProperties::default(), - ).await?; - - let channel = connection.create_channel().await?; - - // Declare the exchange - channel.exchange_declare( - exchange, - ExchangeKind::Topic, - ExchangeDeclareOptions { - durable: true, - auto_delete: false, - ..ExchangeDeclareOptions::default() - }, - FieldTable::default(), - ).await?; - - // Declare the queue - channel.queue_declare( - queue, - QueueDeclareOptions { - durable: true, - auto_delete: false, - ..QueueDeclareOptions::default() - }, - FieldTable::default(), - ).await?; - - // Bind the queue to the exchange - channel.queue_bind( - queue, - exchange, - routing_key, - QueueBindOptions::default(), - FieldTable::default(), - ).await?; - - // Create the consumer - let consumer = channel - .basic_consume( - queue, - "consumer", - BasicConsumeOptions::default(), + pub async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> { + debug!("Declaring exchange"); + self.channel + .exchange_declare( + &config.exchange, + ExchangeKind::Topic, + ExchangeDeclareOptions { + passive, + durable: true, + ..ExchangeDeclareOptions::default() + }, FieldTable::default(), ) - .await?; - - Ok(Self { - connection, - channel, - consumer, - }) - } - - pub async fn consume(&self) -> Result { - let delivery = self.consumer.clone().next().await - .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? - .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - - let message = String::from_utf8_lossy(&delivery.data).to_string(); - - self.channel - .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) .await - .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - - Ok(message) + .map_err(|e| RabbitMQError::ExchangeError(e.to_string())) } } - -pub struct RabbitMQProducer { - connection: Connection, - channel: Channel, - exchange_name: String, - routing_key: String, -} - -impl RabbitMQProducer { - pub async fn new(url: &str, exchange_name: &str, routing_key: &str) -> Result { - let connection = Connection::connect( - url, - ConnectionProperties::default(), - ).await?; - - let channel = connection.create_channel().await?; - - // Declare the exchange - channel.exchange_declare( - exchange_name, - ExchangeKind::Topic, - ExchangeDeclareOptions { - durable: true, - auto_delete: false, - ..ExchangeDeclareOptions::default() - }, - FieldTable::default(), - ).await?; - - Ok(Self { - connection, - channel, - exchange_name: exchange_name.to_string(), - routing_key: routing_key.to_string(), - }) - } - - pub async fn publish(&self, message: &str) -> Result<(), RabbitMQError> { - self.channel - .basic_publish( - &self.exchange_name, - &self.routing_key, - BasicPublishOptions::default(), - message.as_bytes(), - BasicProperties::default(), - ) - .await? - .await?; - - Ok(()) - } -} diff --git a/src/rabbitmq/producer.rs b/src/rabbitmq/producer.rs new file mode 100644 index 0000000..8e44690 --- /dev/null +++ b/src/rabbitmq/producer.rs @@ -0,0 +1,39 @@ +use lapin::{ + options::*, publisher_confirm::Confirmation, BasicProperties, +}; + +use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; + +pub struct RabbitMQProducer { + common: RabbitMQCommon, + exchange_name: String, + routing_key: String, +} + +impl RabbitMQProducer { + pub async fn new(config: &RabbitMQConfig) -> Result { + let common = RabbitMQCommon::new(config).await?; + common.declare_exchange(config, false).await?; + + Ok(Self { + common, + exchange_name: config.exchange.clone(), + routing_key: config.routing_key.clone(), + }) + } + + pub async fn publish(&self, payload: &[u8]) -> Result { + self.common.channel + .basic_publish( + &self.exchange_name, + &self.routing_key, + BasicPublishOptions::default(), + payload, + BasicProperties::default(), + ) + .await + .map_err(|e| RabbitMQError::PublishError(e.to_string()))? + .await + .map_err(|e| RabbitMQError::PublishError(e.to_string())) + } +} diff --git a/src/server.rs b/src/server.rs index e8dcbcb..486a66c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -130,11 +130,11 @@ use axum::{ routing::post, Router, response::{IntoResponse, Response}, - Error, Extension, Json, + Extension, Json, }; use serde::Deserialize; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use zettle_db::rabbitmq::{RabbitMQProducer, RabbitMQError}; +use zettle_db::rabbitmq::{producer::RabbitMQProducer, RabbitMQConfig}; use std::sync::Arc; #[derive(Deserialize)] @@ -149,7 +149,7 @@ async fn ingress_handler( Json(payload): Json ) -> Response { info!("Received payload: {:?}", payload.payload); - match producer.publish(&payload.payload).await { + match producer.publish(&payload.payload.into_bytes().to_vec()).await { Ok(_) => { info!("Message published successfully"); "thank you".to_string().into_response() @@ -171,7 +171,14 @@ async fn main() -> Result<(), Box> { .ok(); // Set up RabbitMQ - let producer = Arc::new(RabbitMQProducer::new("amqp://localhost", "my_exchange", "my_routing_key").await?); + let config = RabbitMQConfig { + amqp_addr: "amqp://localhost".to_string(), + exchange: "my_exchange".to_string(), + queue: "my_queue".to_string(), + routing_key: "my_key".to_string(), + }; + + let producer = Arc::new(RabbitMQProducer::new(&config).await?); // Create Axum router let app = Router::new()