improved rabbitmq structs

This commit is contained in:
Per Stark
2024-09-20 12:40:40 +02:00
parent 10c48f37ed
commit 040542561e
6 changed files with 424 additions and 203 deletions

View File

@@ -1,73 +1,122 @@
use lapin::{
options::*, types::FieldTable, Connection, ConnectionProperties, Consumer, Result,
};
// use lapin::{
// options::*, types::FieldTable, Connection, ConnectionProperties, Consumer, Result,
// };
// use tracing::{info, error};
// use futures_lite::stream::StreamExt;
// pub struct RabbitMQConsumer {
// pub connection: Connection,
// pub consumer: Consumer,
// }
// impl RabbitMQConsumer {
// pub async fn new(addr: &str, queue: &str) -> Result<Self> {
// let connection = Connection::connect(addr, ConnectionProperties::default()).await?;
// let channel = connection.create_channel().await?;
// // Declare the queue (in case it doesn't exist)
// channel
// .queue_declare(
// queue,
// QueueDeclareOptions::default(),
// FieldTable::default(),
// )
// .await?;
// let consumer = channel
// .basic_consume(
// queue,
// "consumer",
// BasicConsumeOptions::default(),
// FieldTable::default(),
// )
// .await?;
// Ok(Self { connection, consumer })
// }
// pub async fn run(&mut self) -> Result<()> {
// info!("Consumer started - waiting for messages");
// while let Some(delivery) = self.consumer.next().await {
// match delivery {
// Ok(delivery) => {
// let message = std::str::from_utf8(&delivery.data).unwrap_or("Invalid UTF-8");
// info!("Received message: {}", message);
// // Process the message here
// // For example, you could deserialize it and perform some action
// delivery.ack(BasicAckOptions::default()).await?;
// },
// Err(e) => error!("Failed to consume message: {:?}", e),
// }
// }
// Ok(())
// }
// }
// #[tokio::main]
// async fn main() -> Result<()> {
// // Set up tracing
// tracing_subscriber::fmt::init();
// let addr = "amqp://guest:guest@localhost:5672";
// let queue = "hello";
// let mut consumer = RabbitMQConsumer::new(addr, queue).await?;
// info!("Starting consumer");
// consumer.run().await?;
// Ok(())
// }
use zettle_db::rabbitmq::{RabbitMQConsumer, RabbitMQError};
use tokio;
use tracing::{info, error};
use futures_lite::stream::StreamExt;
pub struct RabbitMQConsumer {
pub connection: Connection,
pub consumer: Consumer,
}
impl RabbitMQConsumer {
pub async fn new(addr: &str, queue: &str) -> Result<Self> {
let connection = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = connection.create_channel().await?;
// Declare the queue (in case it doesn't exist)
channel
.queue_declare(
queue,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let consumer = channel
.basic_consume(
queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
Ok(Self { connection, consumer })
}
pub async fn run(&mut self) -> Result<()> {
info!("Consumer started - waiting for messages");
while let Some(delivery) = self.consumer.next().await {
match delivery {
Ok(delivery) => {
let message = std::str::from_utf8(&delivery.data).unwrap_or("Invalid UTF-8");
info!("Received message: {}", message);
// Process the message here
// For example, you could deserialize it and perform some action
delivery.ack(BasicAckOptions::default()).await?;
},
Err(e) => error!("Failed to consume message: {:?}", e),
}
}
Ok(())
}
}
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
#[tokio::main]
async fn main() -> Result<()> {
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up tracing
tracing_subscriber::fmt::init();
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.try_init()
.ok();
let addr = "amqp://guest:guest@localhost:5672";
let queue = "hello";
info!("Starting RabbitMQ consumer");
let mut consumer = RabbitMQConsumer::new(addr, queue).await?;
info!("Starting consumer");
consumer.run().await?;
// Create a RabbitMQ consumer
let consumer = RabbitMQConsumer::new(
"amqp://localhost",
"my_exchange",
"my_queue",
"my_routing_key"
).await?;
info!("Consumer connected to RabbitMQ");
// Start consuming messages
loop {
match consumer.consume().await {
Ok(message) => {
info!("Received message: {}", message);
// Process the message here
// For example, you could insert it into a database
// process_message(&message).await?;
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
}
}
}
Ok(())
}