mirror of
https://github.com/perstarkse/minne.git
synced 2026-05-30 11:20:46 +02:00
feat: implement RabbitMQ module with server and consumer functionality
This commit is contained in:
@@ -1 +1,18 @@
|
|||||||
use zettle_db::rabbitmq::RabbitMQ;
|
use zettle_db::rabbitmq::RabbitMQ;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let rabbitmq = RabbitMQ::new().await?;
|
||||||
|
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0;
|
||||||
|
rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?;
|
||||||
|
|
||||||
|
let mut rx = rabbitmq.consume_messages(&queue_name, "example_consumer").await?;
|
||||||
|
|
||||||
|
println!("Consumer waiting for messages. To exit press CTRL+C");
|
||||||
|
|
||||||
|
while let Some(message) = rx.recv().await {
|
||||||
|
println!("Received message: {}", message);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|||||||
+40
-17
@@ -7,6 +7,7 @@ use amqprs::{
|
|||||||
consumer::DefaultConsumer,
|
consumer::DefaultConsumer,
|
||||||
BasicProperties,
|
BasicProperties,
|
||||||
};
|
};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
pub struct RabbitMQ {
|
pub struct RabbitMQ {
|
||||||
pub connection: Connection,
|
pub connection: Connection,
|
||||||
@@ -14,43 +15,65 @@ pub struct RabbitMQ {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl RabbitMQ {
|
impl RabbitMQ {
|
||||||
pub async fn new() -> Self {
|
pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
|
||||||
let connection = Connection::open(&OpenConnectionArguments::new(
|
let connection = Connection::open(&OpenConnectionArguments::new(
|
||||||
"localhost",
|
"localhost",
|
||||||
5672,
|
5672,
|
||||||
"user",
|
"user",
|
||||||
"bitnami",
|
"bitnami",
|
||||||
))
|
))
|
||||||
.await
|
.await?;
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
connection
|
connection
|
||||||
.register_callback(DefaultConnectionCallback)
|
.register_callback(DefaultConnectionCallback)
|
||||||
.await
|
.await?;
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
let channel = connection.open_channel(None).await.unwrap();
|
let channel = connection.open_channel(None).await?;
|
||||||
channel
|
channel
|
||||||
.register_callback(DefaultChannelCallback)
|
.register_callback(DefaultChannelCallback)
|
||||||
.await
|
.await?;
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
RabbitMQ { connection, channel }
|
Ok(RabbitMQ { connection, channel })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) {
|
pub async fn declare_queue(&self, queue_name: &str) -> Result<(String, u32, u32), Box<dyn std::error::Error>> {
|
||||||
self.channel
|
Ok(self.channel
|
||||||
.queue_declare(QueueDeclareArguments::durable_client_named(queue_name))
|
.queue_declare(QueueDeclareArguments::durable_client_named(queue_name))
|
||||||
.await
|
.await??
|
||||||
.unwrap()
|
)
|
||||||
.unwrap()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) {
|
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
self.channel
|
self.channel
|
||||||
.queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key))
|
.queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key))
|
||||||
.await
|
.await?;
|
||||||
.unwrap();
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish_message(&self, exchange_name: &str, routing_key: &str, content: String) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let args = BasicPublishArguments::new(exchange_name, routing_key);
|
||||||
|
self.channel
|
||||||
|
.basic_publish(BasicProperties::default(), content.into_bytes(), args)
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn consume_messages(&self, queue_name: &str, consumer_tag: &str) -> Result<mpsc::Receiver<String>, Box<dyn std::error::Error>> {
|
||||||
|
let (tx, rx) = mpsc::channel(100);
|
||||||
|
let args = BasicConsumeArguments::new(queue_name, consumer_tag);
|
||||||
|
|
||||||
|
let consumer = DefaultConsumer::new(args.no_ack).with_callback(move |_deliver, _basic_properties, content| {
|
||||||
|
let content_str = String::from_utf8_lossy(&content).to_string();
|
||||||
|
let tx = tx.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = tx.send(content_str).await {
|
||||||
|
eprintln!("Failed to send message: {}", e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
self.channel.basic_consume(consumer, args).await?;
|
||||||
|
Ok(rx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
+38
-6
@@ -1,10 +1,42 @@
|
|||||||
|
use axum::{
|
||||||
|
routing::post,
|
||||||
|
Router,
|
||||||
|
extract::Json,
|
||||||
|
};
|
||||||
|
use serde::Deserialize;
|
||||||
use zettle_db::rabbitmq::RabbitMQ;
|
use zettle_db::rabbitmq::RabbitMQ;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
|
#[derive(Deserialize)]
|
||||||
async fn main() {
|
struct Message {
|
||||||
let rabbitmq = RabbitMQ::new().await;
|
content: String,
|
||||||
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await;
|
}
|
||||||
rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await;
|
|
||||||
//...
|
async fn publish_message(
|
||||||
|
Json(message): Json<Message>,
|
||||||
|
rabbitmq: axum::extract::State<Arc<RabbitMQ>>,
|
||||||
|
) -> Result<(), String> {
|
||||||
|
rabbitmq
|
||||||
|
.publish_message("amq.topic", "amqprs.example", message.content)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Failed to publish message: {}", e))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let rabbitmq = Arc::new(RabbitMQ::new().await?);
|
||||||
|
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0;
|
||||||
|
rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?;
|
||||||
|
|
||||||
|
let app = Router::new()
|
||||||
|
.route("/publish", post(publish_message))
|
||||||
|
.with_state(rabbitmq);
|
||||||
|
|
||||||
|
println!("Server running on http://localhost:3000");
|
||||||
|
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
|
||||||
|
.serve(app.into_make_service())
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user