commit 5b4f9b1fa31be3ff04df4f16f9dda7f8de9c4e6f Author: Per Stark Date: Wed Sep 18 12:20:01 2024 +0200 feat: implement RabbitMQ connection and queue management in new module diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs new file mode 100644 index 0000000..68e979f --- /dev/null +++ b/src/rabbitmq/mod.rs @@ -0,0 +1,56 @@ +use amqprs::{ + callbacks::{DefaultChannelCallback, DefaultConnectionCallback}, + channel::{ + BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments, + }, + connection::{Connection, OpenConnectionArguments}, + consumer::DefaultConsumer, + BasicProperties, +}; + +pub struct RabbitMQ { + pub connection: Connection, + pub channel: amqprs::channel::Channel, +} + +impl RabbitMQ { + pub async fn new() -> Self { + let connection = Connection::open(&OpenConnectionArguments::new( + "localhost", + 5672, + "user", + "bitnami", + )) + .await + .unwrap(); + + connection + .register_callback(DefaultConnectionCallback) + .await + .unwrap(); + + let channel = connection.open_channel(None).await.unwrap(); + channel + .register_callback(DefaultChannelCallback) + .await + .unwrap(); + + RabbitMQ { connection, channel } + } + + pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) { + self.channel + .queue_declare(QueueDeclareArguments::durable_client_named(queue_name)) + .await + .unwrap() + .unwrap() + } + + pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) { + self.channel + .queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key)) + .await + .unwrap(); + } +} + diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..5a6cee4 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,10 @@ +use zettle_db::rabbitmq::RabbitMQ; + +#[tokio::main(flavor = "multi_thread", worker_threads = 2)] +async fn main() { + let rabbitmq = RabbitMQ::new().await; + let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await; + rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await; + //... +} +