feat: implement RabbitMQ connection and queue management in new module

This commit is contained in:
Per Stark
2024-09-18 12:20:01 +02:00
committed by Per Stark" (aider)
commit 5b4f9b1fa3
2 changed files with 66 additions and 0 deletions

56
src/rabbitmq/mod.rs Normal file
View File

@@ -0,0 +1,56 @@
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{
BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
},
connection::{Connection, OpenConnectionArguments},
consumer::DefaultConsumer,
BasicProperties,
};
pub struct RabbitMQ {
pub connection: Connection,
pub channel: amqprs::channel::Channel,
}
impl RabbitMQ {
pub async fn new() -> Self {
let connection = Connection::open(&OpenConnectionArguments::new(
"localhost",
5672,
"user",
"bitnami",
))
.await
.unwrap();
connection
.register_callback(DefaultConnectionCallback)
.await
.unwrap();
let channel = connection.open_channel(None).await.unwrap();
channel
.register_callback(DefaultChannelCallback)
.await
.unwrap();
RabbitMQ { connection, channel }
}
pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) {
self.channel
.queue_declare(QueueDeclareArguments::durable_client_named(queue_name))
.await
.unwrap()
.unwrap()
}
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) {
self.channel
.queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key))
.await
.unwrap();
}
}

10
src/server.rs Normal file
View File

@@ -0,0 +1,10 @@
use zettle_db::rabbitmq::RabbitMQ;
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
let rabbitmq = RabbitMQ::new().await;
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await;
rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await;
//...
}