working on a queue length

This commit is contained in:
Per Stark
2024-09-20 22:57:47 +02:00
parent 7d9ff5b920
commit 990f995caf
4 changed files with 52 additions and 216 deletions

View File

@@ -1,76 +1,3 @@
// use lapin::{
// options::*, types::FieldTable, Connection, ConnectionProperties, Consumer, Result,
// };
// use tracing::{info, error};
// use futures_lite::stream::StreamExt;
// pub struct RabbitMQConsumer {
// pub connection: Connection,
// pub consumer: Consumer,
// }
// impl RabbitMQConsumer {
// pub async fn new(addr: &str, queue: &str) -> Result<Self> {
// let connection = Connection::connect(addr, ConnectionProperties::default()).await?;
// let channel = connection.create_channel().await?;
// // Declare the queue (in case it doesn't exist)
// channel
// .queue_declare(
// queue,
// QueueDeclareOptions::default(),
// FieldTable::default(),
// )
// .await?;
// let consumer = channel
// .basic_consume(
// queue,
// "consumer",
// BasicConsumeOptions::default(),
// FieldTable::default(),
// )
// .await?;
// Ok(Self { connection, consumer })
// }
// pub async fn run(&mut self) -> Result<()> {
// info!("Consumer started - waiting for messages");
// while let Some(delivery) = self.consumer.next().await {
// match delivery {
// Ok(delivery) => {
// let message = std::str::from_utf8(&delivery.data).unwrap_or("Invalid UTF-8");
// info!("Received message: {}", message);
// // Process the message here
// // For example, you could deserialize it and perform some action
// delivery.ack(BasicAckOptions::default()).await?;
// },
// Err(e) => error!("Failed to consume message: {:?}", e),
// }
// }
// Ok(())
// }
// }
// #[tokio::main]
// async fn main() -> Result<()> {
// // Set up tracing
// tracing_subscriber::fmt::init();
// let addr = "amqp://guest:guest@localhost:5672";
// let queue = "hello";
// let mut consumer = RabbitMQConsumer::new(addr, queue).await?;
// info!("Starting consumer");
// consumer.run().await?;
// Ok(())
// }
use tokio;
use tracing::{info, error};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -86,7 +13,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.ok();
info!("Starting RabbitMQ consumer");
// Set up RabbitMQ config
// Set up RabbitMQ config
let config = RabbitMQConfig {
amqp_addr: "amqp://localhost".to_string(),
exchange: "my_exchange".to_string(),
@@ -103,11 +31,14 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Start consuming messages
loop {
match consumer.consume().await {
Ok(message) => {
Ok((message, delivery)) => {
info!("Received message: {}", message);
// Process the message here
// For example, you could insert it into a database
// process_message(&message).await?;
info!("Done processing, acking");
consumer.ack_delivery(delivery).await?
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);