working impl

This commit is contained in:
Per Stark
2024-09-20 11:39:39 +02:00
parent 7557769388
commit 10c48f37ed
13 changed files with 38043 additions and 107 deletions

View File

@@ -1,56 +1,44 @@
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{
BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
},
connection::{Connection, OpenConnectionArguments},
consumer::DefaultConsumer,
BasicProperties,
use lapin::{
options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, Result
};
pub struct RabbitMQ {
pub struct RabbitMQClient {
pub connection: Connection,
pub channel: amqprs::channel::Channel,
pub channel: Channel,
}
impl RabbitMQ {
pub async fn new() -> Self {
let connection = Connection::open(&OpenConnectionArguments::new(
"localhost",
5672,
"user",
"bitnami",
))
.await
.unwrap();
impl RabbitMQClient {
pub async fn new(addr: &str) -> Result<Self> {
let connection = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = connection.create_channel().await?;
connection
.register_callback(DefaultConnectionCallback)
.await
.unwrap();
let channel = connection.open_channel(None).await.unwrap();
channel
.register_callback(DefaultChannelCallback)
.await
.unwrap();
RabbitMQ { connection, channel }
Ok(Self { connection, channel })
}
pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) {
pub async fn publish(&self, queue: &str, payload: &[u8]) -> Result<()> {
self.channel
.queue_declare(QueueDeclareArguments::durable_client_named(queue_name))
.await
.unwrap()
.unwrap()
.basic_publish(
"",
queue,
Default::default(),
payload,
BasicProperties::default(),
)
.await?;
Ok(())
}
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) {
self.channel
.queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key))
.await
.unwrap();
pub async fn consume(&self, queue: &str) -> Result<lapin::Consumer> {
let consumer = self.channel
.basic_consume(
queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
Ok(consumer)
}
}