mirror of
https://github.com/perstarkse/minne.git
synced 2026-03-26 19:31:32 +01:00
feat: implement RabbitMQ module with server and consumer functionality
This commit is contained in:
@@ -1 +1,18 @@
|
||||
use zettle_db::rabbitmq::RabbitMQ;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let rabbitmq = RabbitMQ::new().await?;
|
||||
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0;
|
||||
rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?;
|
||||
|
||||
let mut rx = rabbitmq.consume_messages(&queue_name, "example_consumer").await?;
|
||||
|
||||
println!("Consumer waiting for messages. To exit press CTRL+C");
|
||||
|
||||
while let Some(message) = rx.recv().await {
|
||||
println!("Received message: {}", message);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ use amqprs::{
|
||||
consumer::DefaultConsumer,
|
||||
BasicProperties,
|
||||
};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
pub struct RabbitMQ {
|
||||
pub connection: Connection,
|
||||
@@ -14,43 +15,65 @@ pub struct RabbitMQ {
|
||||
}
|
||||
|
||||
impl RabbitMQ {
|
||||
pub async fn new() -> Self {
|
||||
pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let connection = Connection::open(&OpenConnectionArguments::new(
|
||||
"localhost",
|
||||
5672,
|
||||
"user",
|
||||
"bitnami",
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
|
||||
connection
|
||||
.register_callback(DefaultConnectionCallback)
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
|
||||
let channel = connection.open_channel(None).await.unwrap();
|
||||
let channel = connection.open_channel(None).await?;
|
||||
channel
|
||||
.register_callback(DefaultChannelCallback)
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
|
||||
RabbitMQ { connection, channel }
|
||||
Ok(RabbitMQ { connection, channel })
|
||||
}
|
||||
|
||||
pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) {
|
||||
self.channel
|
||||
pub async fn declare_queue(&self, queue_name: &str) -> Result<(String, u32, u32), Box<dyn std::error::Error>> {
|
||||
Ok(self.channel
|
||||
.queue_declare(QueueDeclareArguments::durable_client_named(queue_name))
|
||||
.await
|
||||
.unwrap()
|
||||
.unwrap()
|
||||
.await??
|
||||
)
|
||||
}
|
||||
|
||||
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) {
|
||||
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||
self.channel
|
||||
.queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key))
|
||||
.await
|
||||
.unwrap();
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn publish_message(&self, exchange_name: &str, routing_key: &str, content: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||
let args = BasicPublishArguments::new(exchange_name, routing_key);
|
||||
self.channel
|
||||
.basic_publish(BasicProperties::default(), content.into_bytes(), args)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn consume_messages(&self, queue_name: &str, consumer_tag: &str) -> Result<mpsc::Receiver<String>, Box<dyn std::error::Error>> {
|
||||
let (tx, rx) = mpsc::channel(100);
|
||||
let args = BasicConsumeArguments::new(queue_name, consumer_tag);
|
||||
|
||||
let consumer = DefaultConsumer::new(args.no_ack).with_callback(move |_deliver, _basic_properties, content| {
|
||||
let content_str = String::from_utf8_lossy(&content).to_string();
|
||||
let tx = tx.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = tx.send(content_str).await {
|
||||
eprintln!("Failed to send message: {}", e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
self.channel.basic_consume(consumer, args).await?;
|
||||
Ok(rx)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,42 @@
|
||||
use axum::{
|
||||
routing::post,
|
||||
Router,
|
||||
extract::Json,
|
||||
};
|
||||
use serde::Deserialize;
|
||||
use zettle_db::rabbitmq::RabbitMQ;
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
|
||||
async fn main() {
|
||||
let rabbitmq = RabbitMQ::new().await;
|
||||
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await;
|
||||
rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await;
|
||||
//...
|
||||
#[derive(Deserialize)]
|
||||
struct Message {
|
||||
content: String,
|
||||
}
|
||||
|
||||
async fn publish_message(
|
||||
Json(message): Json<Message>,
|
||||
rabbitmq: axum::extract::State<Arc<RabbitMQ>>,
|
||||
) -> Result<(), String> {
|
||||
rabbitmq
|
||||
.publish_message("amq.topic", "amqprs.example", message.content)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to publish message: {}", e))
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
let rabbitmq = Arc::new(RabbitMQ::new().await?);
|
||||
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0;
|
||||
rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?;
|
||||
|
||||
let app = Router::new()
|
||||
.route("/publish", post(publish_message))
|
||||
.with_state(rabbitmq);
|
||||
|
||||
println!("Server running on http://localhost:3000");
|
||||
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
|
||||
.serve(app.into_make_service())
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user