diff --git a/src/consumer.rs b/src/consumer.rs index 642d977..6bd687e 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -1 +1,18 @@ use zettle_db::rabbitmq::RabbitMQ; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let rabbitmq = RabbitMQ::new().await?; + let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0; + rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?; + + let mut rx = rabbitmq.consume_messages(&queue_name, "example_consumer").await?; + + println!("Consumer waiting for messages. To exit press CTRL+C"); + + while let Some(message) = rx.recv().await { + println!("Received message: {}", message); + } + + Ok(()) +} diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index 68e979f..afbbeec 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -7,6 +7,7 @@ use amqprs::{ consumer::DefaultConsumer, BasicProperties, }; +use tokio::sync::mpsc; pub struct RabbitMQ { pub connection: Connection, @@ -14,43 +15,65 @@ pub struct RabbitMQ { } impl RabbitMQ { - pub async fn new() -> Self { + pub async fn new() -> Result> { let connection = Connection::open(&OpenConnectionArguments::new( "localhost", 5672, "user", "bitnami", )) - .await - .unwrap(); + .await?; connection .register_callback(DefaultConnectionCallback) - .await - .unwrap(); + .await?; - let channel = connection.open_channel(None).await.unwrap(); + let channel = connection.open_channel(None).await?; channel .register_callback(DefaultChannelCallback) - .await - .unwrap(); + .await?; - RabbitMQ { connection, channel } + Ok(RabbitMQ { connection, channel }) } - pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) { - self.channel + pub async fn declare_queue(&self, queue_name: &str) -> Result<(String, u32, u32), Box> { + Ok(self.channel .queue_declare(QueueDeclareArguments::durable_client_named(queue_name)) - .await - .unwrap() - .unwrap() + .await?? + ) } - pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) { + pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) -> Result<(), Box> { self.channel .queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key)) - .await - .unwrap(); + .await?; + Ok(()) + } + + pub async fn publish_message(&self, exchange_name: &str, routing_key: &str, content: String) -> Result<(), Box> { + let args = BasicPublishArguments::new(exchange_name, routing_key); + self.channel + .basic_publish(BasicProperties::default(), content.into_bytes(), args) + .await?; + Ok(()) + } + + pub async fn consume_messages(&self, queue_name: &str, consumer_tag: &str) -> Result, Box> { + let (tx, rx) = mpsc::channel(100); + let args = BasicConsumeArguments::new(queue_name, consumer_tag); + + let consumer = DefaultConsumer::new(args.no_ack).with_callback(move |_deliver, _basic_properties, content| { + let content_str = String::from_utf8_lossy(&content).to_string(); + let tx = tx.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(content_str).await { + eprintln!("Failed to send message: {}", e); + } + }); + }); + + self.channel.basic_consume(consumer, args).await?; + Ok(rx) } } diff --git a/src/server.rs b/src/server.rs index 5a6cee4..f7c7fc7 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,10 +1,42 @@ +use axum::{ + routing::post, + Router, + extract::Json, +}; +use serde::Deserialize; use zettle_db::rabbitmq::RabbitMQ; +use std::sync::Arc; -#[tokio::main(flavor = "multi_thread", worker_threads = 2)] -async fn main() { - let rabbitmq = RabbitMQ::new().await; - let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await; - rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await; - //... +#[derive(Deserialize)] +struct Message { + content: String, +} + +async fn publish_message( + Json(message): Json, + rabbitmq: axum::extract::State>, +) -> Result<(), String> { + rabbitmq + .publish_message("amq.topic", "amqprs.example", message.content) + .await + .map_err(|e| format!("Failed to publish message: {}", e)) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let rabbitmq = Arc::new(RabbitMQ::new().await?); + let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0; + rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?; + + let app = Router::new() + .route("/publish", post(publish_message)) + .with_state(rabbitmq); + + println!("Server running on http://localhost:3000"); + axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + .serve(app.into_make_service()) + .await?; + + Ok(()) }