diff --git a/Cargo.lock b/Cargo.lock index c0e81ea..ac187a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2387,6 +2387,7 @@ dependencies = [ "futures-lite 2.3.0", "lapin", "serde", + "thiserror", "tokio", "tracing", "tracing-subscriber", diff --git a/Cargo.toml b/Cargo.toml index 0d90e54..b3934b8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ axum = "0.7.5" futures-lite = "2.3.0" lapin = { version = "2.5.0", features = ["serde_json"] } serde = { version = "1.0.210", features = ["derive"] } +thiserror = "1.0.63" tokio = { version = "1.40.0", features = ["full"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } @@ -22,7 +23,3 @@ path = "src/server.rs" [[bin]] name = "consumer" path = "src/consumer.rs" - -[[bin]] -name = "example" -path = "src/example.rs" diff --git a/src/consumer.rs b/src/consumer.rs index f8fe205..5d2beb9 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1,73 +1,122 @@ -use lapin::{ - options::*, types::FieldTable, Connection, ConnectionProperties, Consumer, Result, -}; +// use lapin::{ +// options::*, types::FieldTable, Connection, ConnectionProperties, Consumer, Result, +// }; +// use tracing::{info, error}; +// use futures_lite::stream::StreamExt; + +// pub struct RabbitMQConsumer { +// pub connection: Connection, +// pub consumer: Consumer, +// } + +// impl RabbitMQConsumer { +// pub async fn new(addr: &str, queue: &str) -> Result { +// let connection = Connection::connect(addr, ConnectionProperties::default()).await?; +// let channel = connection.create_channel().await?; + +// // Declare the queue (in case it doesn't exist) +// channel +// .queue_declare( +// queue, +// QueueDeclareOptions::default(), +// FieldTable::default(), +// ) +// .await?; + +// let consumer = channel +// .basic_consume( +// queue, +// "consumer", +// BasicConsumeOptions::default(), +// FieldTable::default(), +// ) +// .await?; + +// Ok(Self { connection, consumer }) +// } + +// pub async fn run(&mut self) -> Result<()> { +// info!("Consumer started - waiting for messages"); +// while let Some(delivery) = self.consumer.next().await { +// match delivery { +// Ok(delivery) => { +// let message = std::str::from_utf8(&delivery.data).unwrap_or("Invalid UTF-8"); +// info!("Received message: {}", message); + +// // Process the message here +// // For example, you could deserialize it and perform some action + +// delivery.ack(BasicAckOptions::default()).await?; +// }, +// Err(e) => error!("Failed to consume message: {:?}", e), +// } +// } + +// Ok(()) +// } +// } + +// #[tokio::main] +// async fn main() -> Result<()> { +// // Set up tracing +// tracing_subscriber::fmt::init(); + +// let addr = "amqp://guest:guest@localhost:5672"; +// let queue = "hello"; + +// let mut consumer = RabbitMQConsumer::new(addr, queue).await?; + +// info!("Starting consumer"); +// consumer.run().await?; + +// Ok(()) +// } +use zettle_db::rabbitmq::{RabbitMQConsumer, RabbitMQError}; +use tokio; use tracing::{info, error}; -use futures_lite::stream::StreamExt; - -pub struct RabbitMQConsumer { - pub connection: Connection, - pub consumer: Consumer, -} - -impl RabbitMQConsumer { - pub async fn new(addr: &str, queue: &str) -> Result { - let connection = Connection::connect(addr, ConnectionProperties::default()).await?; - let channel = connection.create_channel().await?; - - // Declare the queue (in case it doesn't exist) - channel - .queue_declare( - queue, - QueueDeclareOptions::default(), - FieldTable::default(), - ) - .await?; - - let consumer = channel - .basic_consume( - queue, - "consumer", - BasicConsumeOptions::default(), - FieldTable::default(), - ) - .await?; - - Ok(Self { connection, consumer }) - } - - pub async fn run(&mut self) -> Result<()> { - info!("Consumer started - waiting for messages"); - while let Some(delivery) = self.consumer.next().await { - match delivery { - Ok(delivery) => { - let message = std::str::from_utf8(&delivery.data).unwrap_or("Invalid UTF-8"); - info!("Received message: {}", message); - - // Process the message here - // For example, you could deserialize it and perform some action - - delivery.ack(BasicAckOptions::default()).await?; - }, - Err(e) => error!("Failed to consume message: {:?}", e), - } - } - - Ok(()) - } -} +use tracing_subscriber::{fmt, prelude::*, EnvFilter}; #[tokio::main] -async fn main() -> Result<()> { +async fn main() -> Result<(), Box> { // Set up tracing - tracing_subscriber::fmt::init(); + tracing_subscriber::registry() + .with(fmt::layer()) + .with(EnvFilter::from_default_env()) + .try_init() + .ok(); - let addr = "amqp://guest:guest@localhost:5672"; - let queue = "hello"; + info!("Starting RabbitMQ consumer"); - let mut consumer = RabbitMQConsumer::new(addr, queue).await?; - - info!("Starting consumer"); - consumer.run().await?; + // Create a RabbitMQ consumer + let consumer = RabbitMQConsumer::new( + "amqp://localhost", + "my_exchange", + "my_queue", + "my_routing_key" + ).await?; + + info!("Consumer connected to RabbitMQ"); + + // Start consuming messages + loop { + match consumer.consume().await { + Ok(message) => { + info!("Received message: {}", message); + // Process the message here + // For example, you could insert it into a database + // process_message(&message).await?; + } + Err(RabbitMQError::ConsumeError(e)) => { + error!("Error consuming message: {}", e); + // Optionally add a delay before trying again + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + Err(e) => { + error!("Unexpected error: {}", e); + break; + } + } + } Ok(()) } diff --git a/src/example.rs b/src/example.rs deleted file mode 100644 index 5733021..0000000 --- a/src/example.rs +++ /dev/null @@ -1,104 +0,0 @@ -use amqprs::{ - callbacks::{DefaultChannelCallback, DefaultConnectionCallback}, - channel::{ - BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments, - }, - connection::{Connection, OpenConnectionArguments}, - consumer::DefaultConsumer, - BasicProperties, -}; -use tokio::time; - -use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - -#[tokio::main(flavor = "multi_thread", worker_threads = 2)] -async fn main() { - // construct a subscriber that prints formatted traces to stdout - // global subscriber with log level according to RUST_LOG - tracing_subscriber::registry() - .with(fmt::layer()) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); - - // open a connection to RabbitMQ server - let connection = Connection::open(&OpenConnectionArguments::new( - "localhost", - 5672, - "guest", - "guest", - )) - .await - .unwrap(); - connection - .register_callback(DefaultConnectionCallback) - .await - .unwrap(); - - // open a channel on the connection - let channel = connection.open_channel(None).await.unwrap(); - channel - .register_callback(DefaultChannelCallback) - .await - .unwrap(); - - // declare a durable queue - let (queue_name, _, _) = channel - .queue_declare(QueueDeclareArguments::durable_client_named( - "amqprs.examples.basic", - )) - .await - .unwrap() - .unwrap(); - - // bind the queue to exchange - let routing_key = "amqprs.example"; - let exchange_name = "amq.topic"; - channel - .queue_bind(QueueBindArguments::new( - &queue_name, - exchange_name, - routing_key, - )) - .await - .unwrap(); - - ////////////////////////////////////////////////////////////////////////////// - // start consumer with given name - let args = BasicConsumeArguments::new(&queue_name, "example_basic_pub_sub"); - - channel - .basic_consume(DefaultConsumer::new(args.no_ack), args) - .await - .unwrap(); - - ////////////////////////////////////////////////////////////////////////////// - // publish message - let content = String::from( - r#" - { - "publisher": "example" - "data": "Hello, amqprs!" - } - "#, - ) - .into_bytes(); - - // create arguments for basic_publish - let args = BasicPublishArguments::new(exchange_name, routing_key); - - channel - .basic_publish(BasicProperties::default(), content, args) - .await - .unwrap(); - tracing::info!("finished"); - - // keep the `channel` and `connection` object from dropping before pub/sub is done. - // channel/connection will be closed when drop. - time::sleep(time::Duration::from_secs(1)).await; - // explicitly close - - channel.close().await.unwrap(); - connection.close().await.unwrap(); -} - diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index 8002dee..112ffaf 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -1,36 +1,170 @@ -use lapin::{ - options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, Result -}; +// use lapin::{ +// options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, Result +// }; -pub struct RabbitMQClient { +// pub struct RabbitMQClient { +// pub connection: Connection, +// pub channel: Channel, +// } + +// impl RabbitMQClient { +// pub async fn new(addr: &str) -> Result { +// let connection = Connection::connect(addr, ConnectionProperties::default()).await?; +// let channel = connection.create_channel().await?; + +// Ok(Self { connection, channel }) +// } + +// pub async fn publish(&self, queue: &str, payload: &[u8]) -> Result<()> { +// self.channel +// .basic_publish( +// "", +// queue, +// Default::default(), +// payload, +// BasicProperties::default(), +// ) +// .await?; + +// Ok(()) +// } + +// pub async fn consume(&self, queue: &str) -> Result { +// let consumer = self.channel +// .basic_consume( +// queue, +// "consumer", +// BasicConsumeOptions::default(), +// FieldTable::default(), +// ) +// .await?; + +// Ok(consumer) +// } +// } +use lapin::{ + options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, Consumer, ExchangeKind +}; +use futures_lite::stream::StreamExt; + +#[derive(Debug, thiserror::Error)] +pub enum RabbitMQError { + #[error("Connection error: {0}")] + ConnectionError(#[from] lapin::Error), + #[error("Channel error: {0}")] + ChannelError(String), + #[error("Consume error: {0}")] + ConsumeError(String), + #[error("Publish error: {0}")] + PublishError(String), +} + +struct RabbitMQConnection { pub connection: Connection, pub channel: Channel, } -impl RabbitMQClient { - pub async fn new(addr: &str) -> Result { - let connection = Connection::connect(addr, ConnectionProperties::default()).await?; +impl RabbitMQConnection { + async fn new(url: &str) -> Result { + let connection = Connection::connect( + url, + ConnectionProperties::default(), + ).await?; + let channel = connection.create_channel().await?; - + Ok(Self { connection, channel }) } +} - pub async fn publish(&self, queue: &str, payload: &[u8]) -> Result<()> { - self.channel - .basic_publish( - "", - queue, - Default::default(), - payload, - BasicProperties::default(), - ) - .await?; +// pub struct RabbitMQConsumer { +// connection: RabbitMQConnection, +// queue_name: String, +// } - Ok(()) - } +// impl RabbitMQConsumer { +// pub async fn new(url: &str, queue_name: &str) -> Result { +// let connection = RabbitMQConnection::new(url).await?; +// Ok(Self { +// connection, +// queue_name: queue_name.to_string(), +// }) +// } - pub async fn consume(&self, queue: &str) -> Result { - let consumer = self.channel +// pub async fn consume(&self) -> Result { +// let consumer = self.connection.channel +// .basic_consume( +// &self.queue_name, +// "consumer", +// BasicConsumeOptions::default(), +// FieldTable::default(), +// ) +// .await +// .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; + +// let delivery = consumer.clone().next().await +// .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? +// .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; + +// let message = String::from_utf8_lossy(&delivery.data).to_string(); + +// self.connection.channel +// .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) +// .await +// .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; + +// Ok(message) +// } +// } +pub struct RabbitMQConsumer { + connection: Connection, + channel: Channel, + consumer: Consumer, +} + +impl RabbitMQConsumer { + pub async fn new(url: &str, exchange: &str, queue: &str, routing_key: &str) -> Result { + let connection = Connection::connect( + url, + ConnectionProperties::default(), + ).await?; + + let channel = connection.create_channel().await?; + + // Declare the exchange + channel.exchange_declare( + exchange, + ExchangeKind::Topic, + ExchangeDeclareOptions { + durable: true, + auto_delete: false, + ..ExchangeDeclareOptions::default() + }, + FieldTable::default(), + ).await?; + + // Declare the queue + channel.queue_declare( + queue, + QueueDeclareOptions { + durable: true, + auto_delete: false, + ..QueueDeclareOptions::default() + }, + FieldTable::default(), + ).await?; + + // Bind the queue to the exchange + channel.queue_bind( + queue, + exchange, + routing_key, + QueueBindOptions::default(), + FieldTable::default(), + ).await?; + + // Create the consumer + let consumer = channel .basic_consume( queue, "consumer", @@ -39,6 +173,78 @@ impl RabbitMQClient { ) .await?; - Ok(consumer) + Ok(Self { + connection, + channel, + consumer, + }) + } + + pub async fn consume(&self) -> Result { + let delivery = self.consumer.clone().next().await + .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? + .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; + + let message = String::from_utf8_lossy(&delivery.data).to_string(); + + self.channel + .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) + .await + .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; + + Ok(message) + } +} + + +pub struct RabbitMQProducer { + connection: Connection, + channel: Channel, + exchange_name: String, + routing_key: String, +} + +impl RabbitMQProducer { + pub async fn new(url: &str, exchange_name: &str, routing_key: &str) -> Result { + let connection = Connection::connect( + url, + ConnectionProperties::default(), + ).await?; + + let channel = connection.create_channel().await?; + + // Declare the exchange + channel.exchange_declare( + exchange_name, + ExchangeKind::Topic, + ExchangeDeclareOptions { + durable: true, + auto_delete: false, + ..ExchangeDeclareOptions::default() + }, + FieldTable::default(), + ).await?; + + Ok(Self { + connection, + channel, + exchange_name: exchange_name.to_string(), + routing_key: routing_key.to_string(), + }) + } + + pub async fn publish(&self, message: &str) -> Result<(), RabbitMQError> { + self.channel + .basic_publish( + &self.exchange_name, + &self.routing_key, + BasicPublishOptions::default(), + message.as_bytes(), + BasicProperties::default(), + ) + .await? + .await?; + + Ok(()) } } diff --git a/src/server.rs b/src/server.rs index 6f16df2..e8dcbcb 100644 --- a/src/server.rs +++ b/src/server.rs @@ -53,6 +53,77 @@ // let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); // axum::serve(listener, app).await.unwrap(); +// Ok(()) +// } +// use axum::{ +// routing::post, +// Router, +// response::{IntoResponse, Response}, +// Error, Extension, Json, +// }; +// use serde::Deserialize; +// use tracing_subscriber::{fmt, prelude::*, EnvFilter}; +// use zettle_db::rabbitmq::{RabbitMQProducer}; +// use std::sync::Arc; + +// #[derive(Deserialize)] +// struct IngressPayload { +// payload: String, +// } + +// use tracing::{info, error}; + +// async fn ingress_handler( +// // Extension(rabbitmq): Extension>, +// Extension(producer): Extension>, +// Json(payload): Json +// ) -> Response { +// info!("Received payload: {:?}", payload.payload); +// match producer.publish(&payload.payload).await { + +// Ok(_) => { +// info!("Message published successfully"); +// "thank you".to_string().into_response() +// }, +// Err(e) => { +// error!("Failed to publish message: {:?}", e); +// (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() +// } +// } +// // match rabbitmq.publish("hello", payload.payload.as_bytes()).await { +// // Ok(_) => { +// // info!("Message published successfully"); +// // "thank you".to_string().into_response() +// // }, +// // Err(e) => { +// // error!("Failed to publish message: {:?}", e); +// // (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() +// // } +// // } +// } + +// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] +// async fn main() -> Result<(), Error> { +// // Set up tracing +// tracing_subscriber::registry() +// .with(fmt::layer()) +// .with(EnvFilter::from_default_env()) +// .try_init() +// .ok(); + +// // Set up RabbitMQ +// // let rabbitmq = Arc::new(RabbitMQClient::new("amqp://guest:guest@localhost:5672").await.expect("Failed to connect to RabbitMQ")); +// let producer = Arc::new(RabbitMQProducer::new("amqp://localhost", "my_exchange", "my_routing_key").await); + +// // Create Axum router +// let app = Router::new() +// .route("/ingress", post(ingress_handler)) +// .layer(Extension(producer)); + +// tracing::info!("Listening on 0.0.0.0:3000"); +// let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); +// axum::serve(listener, app).await.unwrap(); + // Ok(()) // } use axum::{ @@ -63,7 +134,7 @@ use axum::{ }; use serde::Deserialize; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use zettle_db::rabbitmq::RabbitMQClient; +use zettle_db::rabbitmq::{RabbitMQProducer, RabbitMQError}; use std::sync::Arc; #[derive(Deserialize)] @@ -74,11 +145,11 @@ struct IngressPayload { use tracing::{info, error}; async fn ingress_handler( - Extension(rabbitmq): Extension>, + Extension(producer): Extension>, Json(payload): Json ) -> Response { info!("Received payload: {:?}", payload.payload); - match rabbitmq.publish("hello", payload.payload.as_bytes()).await { + match producer.publish(&payload.payload).await { Ok(_) => { info!("Message published successfully"); "thank you".to_string().into_response() @@ -91,7 +162,7 @@ async fn ingress_handler( } #[tokio::main(flavor = "multi_thread", worker_threads = 2)] -async fn main() -> Result<(), Error> { +async fn main() -> Result<(), Box> { // Set up tracing tracing_subscriber::registry() .with(fmt::layer()) @@ -100,17 +171,18 @@ async fn main() -> Result<(), Error> { .ok(); // Set up RabbitMQ - let rabbitmq = Arc::new(RabbitMQClient::new("amqp://guest:guest@localhost:5672").await.expect("Failed to connect to RabbitMQ")); + let producer = Arc::new(RabbitMQProducer::new("amqp://localhost", "my_exchange", "my_routing_key").await?); // Create Axum router let app = Router::new() .route("/ingress", post(ingress_handler)) - .layer(Extension(rabbitmq)); + .layer(Extension(producer)); tracing::info!("Listening on 0.0.0.0:3000"); - let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap(); - axum::serve(listener, app).await.unwrap(); + let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?; + axum::serve(listener, app).await?; Ok(()) } +