new file:   .aider.chat.history.md
This commit is contained in:
Per Stark
2024-09-18 12:29:27 +02:00
parent 303ddc5811
commit a3a1cdd9e5
43 changed files with 5419 additions and 97 deletions

View File

@@ -1,18 +1,9 @@
use zettle_db::rabbitmq::RabbitMQ;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let rabbitmq = RabbitMQ::new().await?;
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0;
rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?;
let mut rx = rabbitmq.consume_messages(&queue_name, "example_consumer").await?;
println!("Consumer waiting for messages. To exit press CTRL+C");
while let Some(message) = rx.recv().await {
println!("Received message: {}", message);
}
Ok(())
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
let rabbitmq = RabbitMQ::new().await;
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await;
rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await;
//...
}

1
src/lib.rs Normal file
View File

@@ -0,0 +1 @@
pub mod rabbitmq;

View File

@@ -7,7 +7,6 @@ use amqprs::{
consumer::DefaultConsumer,
BasicProperties,
};
use tokio::sync::mpsc;
pub struct RabbitMQ {
pub connection: Connection,
@@ -15,69 +14,43 @@ pub struct RabbitMQ {
}
impl RabbitMQ {
pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
pub async fn new() -> Self {
let connection = Connection::open(&OpenConnectionArguments::new(
"localhost",
5672,
"user",
"bitnami",
))
.await?;
.await
.unwrap();
connection
.register_callback(DefaultConnectionCallback)
.await?;
.await
.unwrap();
let channel = connection.open_channel(None).await?;
let channel = connection.open_channel(None).await.unwrap();
channel
.register_callback(DefaultChannelCallback)
.await?;
.await
.unwrap();
Ok(RabbitMQ { connection, channel })
RabbitMQ { connection, channel }
}
pub async fn declare_queue(&self, queue_name: &str) -> Result<(String, u32, u32), Box<dyn std::error::Error>> {
pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) {
self.channel
.queue_declare(QueueDeclareArguments::durable_client_named(queue_name))
.await?
.ok_or_else(|| Box::new(std::io::Error::new(std::io::ErrorKind::Other, "Failed to declare queue")) as Box<dyn std::error::Error>)
.await
.unwrap()
.unwrap()
}
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) -> Result<(), Box<dyn std::error::Error>> {
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) {
self.channel
.queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key))
.await?;
Ok(())
}
pub async fn publish_message(&self, exchange_name: &str, routing_key: &str, content: String) -> Result<(), Box<dyn std::error::Error>> {
let args = BasicPublishArguments::new(exchange_name, routing_key);
self.channel
.basic_publish(BasicProperties::default(), content.into_bytes(), args)
.await?;
Ok(())
}
pub async fn consume_messages(&self, queue_name: &str, consumer_tag: &str) -> Result<mpsc::Receiver<String>, Box<dyn std::error::Error>> {
let (tx, rx) = mpsc::channel(100);
let args = BasicConsumeArguments::new(queue_name, consumer_tag);
let consumer = DefaultConsumer::new(args.no_ack);
let tx_clone = tx.clone();
self.channel.basic_consume(consumer, args)
.await?
.set_delegate(move |deliver: amqprs::consumer::DeliverEvent| {
let content_str = String::from_utf8_lossy(&deliver.content).to_string();
let tx = tx_clone.clone();
tokio::spawn(async move {
if let Err(e) = tx.send(content_str).await {
eprintln!("Failed to send message: {}", e);
}
});
});
Ok(rx)
.await
.unwrap();
}
}

View File

@@ -1,42 +1,9 @@
use axum::{
routing::post,
Router,
extract::Json,
};
use serde::Deserialize;
use zettle_db::rabbitmq::RabbitMQ;
use std::sync::Arc;
#[derive(Deserialize)]
struct Message {
content: String,
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
let rabbitmq = RabbitMQ::new().await;
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await;
rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await;
//...
}
async fn publish_message(
Json(message): Json<Message>,
rabbitmq: axum::extract::State<Arc<RabbitMQ>>,
) -> Result<(), String> {
rabbitmq
.publish_message("amq.topic", "amqprs.example", message.content)
.await
.map_err(|e| format!("Failed to publish message: {}", e))
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let rabbitmq = Arc::new(RabbitMQ::new().await?);
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await?.0;
rabbitmq.bind_queue(&queue_name, "amq.topic", "amqprs.example").await?;
let app = Router::new()
.route("/publish", post(publish_message))
.with_state(rabbitmq);
println!("Server running on http://localhost:3000");
axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
.serve(app.into_make_service())
.await?;
Ok(())
}