From 30de906ee6afb0d4ade9ba1c831b2eafdc5ea44c Mon Sep 17 00:00:00 2001 From: "Per Stark\" (aider)" Date: Wed, 18 Sep 2024 12:24:32 +0200 Subject: [PATCH] fix: resolve compilation errors in rabbitmq module by correcting async function return types and method calls --- src/rabbitmq/mod.rs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index afbbeec..4a9717c 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -37,10 +37,10 @@ impl RabbitMQ { } pub async fn declare_queue(&self, queue_name: &str) -> Result<(String, u32, u32), Box> { - Ok(self.channel + self.channel .queue_declare(QueueDeclareArguments::durable_client_named(queue_name)) - .await?? - ) + .await? + .ok_or_else(|| Box::new(std::io::Error::new(std::io::ErrorKind::Other, "Failed to declare queue")) as Box) } pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) -> Result<(), Box> { @@ -62,17 +62,21 @@ impl RabbitMQ { let (tx, rx) = mpsc::channel(100); let args = BasicConsumeArguments::new(queue_name, consumer_tag); - let consumer = DefaultConsumer::new(args.no_ack).with_callback(move |_deliver, _basic_properties, content| { - let content_str = String::from_utf8_lossy(&content).to_string(); - let tx = tx.clone(); - tokio::spawn(async move { - if let Err(e) = tx.send(content_str).await { - eprintln!("Failed to send message: {}", e); - } + let consumer = DefaultConsumer::new(args.no_ack); + let tx_clone = tx.clone(); + + self.channel.basic_consume(consumer, args) + .await? + .set_delegate(move |deliver: amqprs::consumer::DeliverEvent| { + let content_str = String::from_utf8_lossy(&deliver.content).to_string(); + let tx = tx_clone.clone(); + tokio::spawn(async move { + if let Err(e) = tx.send(content_str).await { + eprintln!("Failed to send message: {}", e); + } + }); }); - }); - self.channel.basic_consume(consumer, args).await?; Ok(rx) } }