working impl

This commit is contained in:
Per Stark
2024-09-20 11:39:39 +02:00
parent 2e74468392
commit d45ec5b503
13 changed files with 38043 additions and 107 deletions

1489
.aider.chat.history.md Normal file

File diff suppressed because it is too large Load Diff

81
.aider.input.history Normal file
View File

@@ -0,0 +1,81 @@
# 2024-09-18 12:16:15.162114
+/help
# 2024-09-18 12:16:20.233100
+/models
# 2024-09-18 12:16:24.809720
+/models gpt
# 2024-09-18 12:16:48.569734
+/models sonett
# 2024-09-18 12:16:51.577488
+/models sonnett
# 2024-09-18 12:17:00.056944
+/models openrouter/anth
# 2024-09-18 12:17:14.809931
+/model openrouter/anthropic/claude-3.5-sonnet
# 2024-09-18 12:17:33.881262
+/map-refresh
# 2024-09-18 12:17:41.641177
+/help
# 2024-09-18 12:17:57.961398
+/map
# 2024-09-18 12:18:05.401158
+/git add .
# 2024-09-18 12:18:07.192985
+/map
# 2024-09-18 12:18:28.009751
+/add src/server.rs src/rabbitmq/mod.rs
# 2024-09-18 12:19:07.145078
+/git add .
# 2024-09-18 12:19:10.360827
+/add src/consumer.rs
# 2024-09-18 12:19:21.032773
+/help
# 2024-09-18 12:19:29.945442
+/web https://github.com/gftea/amqprs/blob/main/examples/src/basic_pub_sub.rs
# 2024-09-18 12:20:07.212864
+/run cargo run --bin server
# 2024-09-18 12:20:36.696824
+/undo
# 2024-09-18 12:22:06.040133
+/ask Welcome to the project, were setting up a rust server running axum and a ingress route that takes a string for now. The ingress route will publish a message containing the string to the rabbitmq connection. The consumer consumes messages and prints them for now. We should have a rabbitmq module that is usable and testable from both the server and consumer
# 2024-09-18 12:23:21.895428
+/code implement the changes
# 2024-09-18 12:23:59.290161
+/run cargo run --bin server
# 2024-09-18 12:24:03.149084
+/run cargo run --bin consumer
# 2024-09-18 12:24:12.199239
+/code fix the errors
# 2024-09-18 12:24:56.459326
+/run cargo run --bin server
# 2024-09-18 12:25:06.791601
+/undo
# 2024-09-18 12:25:14.774649
+/exit

Binary file not shown.

1
.gitignore vendored
View File

@@ -6,3 +6,4 @@
/target
.aider
.aider.chat.history.md .aider.input.history .aider.tags.cache.v3 .aider.tags.cache.v3/cache.db

1542
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -6,8 +6,9 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
amqprs = "2.0.0"
axum = "0.7.5"
futures-lite = "2.3.0"
lapin = { version = "2.5.0", features = ["serde_json"] }
serde = { version = "1.0.210", features = ["derive"] }
tokio = { version = "1.40.0", features = ["full"] }
tracing = "0.1.40"
@@ -22,3 +23,6 @@ path = "src/server.rs"
name = "consumer"
path = "src/consumer.rs"
[[bin]]
name = "example"
path = "src/example.rs"

34638
erl_crash.dump Normal file

File diff suppressed because one or more lines are too long

View File

@@ -41,7 +41,10 @@
languages.rust.enable = true;
services = {
rabbitmq.enable = true;
rabbitmq = {
enable = true;
plugins = ["tracing"];
};
};
}
];

13
log Normal file
View File

@@ -0,0 +1,13 @@
2024-09-20T08:47:35.861748Z INFO server: Listening on 0.0.0.0:3000
2024-09-20T08:47:40.074036Z INFO server: Received payload: "hello"
2024-09-20T08:47:40.116592Z INFO amqprs::api::connection: open connection AMQPRS000@localhost:5672/
2024-09-20T08:47:40.117117Z INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-20T08:47:41.120340Z INFO server: Message published successfully
2024-09-20T08:47:41.120554Z INFO amqprs::api::connection: try to close connection AMQPRS000@localhost:5672/ at drop
2024-09-20T08:47:41.120634Z INFO amqprs::api::channel: try to close channel 1 at drop
2024-09-20T08:47:41.122283Z WARN amqprs::net::reader_handler: CloseOk responder not found, probably connection 'AMQPRS000@localhost:5672/ [closed]' has dropped
2024-09-20T08:47:41.122309Z INFO amqprs::net::reader_handler: close connection 'AMQPRS000@localhost:5672/ [closed]' OK
2024-09-20T08:47:41.122362Z INFO amqprs::net::reader_handler: connection 'AMQPRS000@localhost:5672/ [closed]' is closed, shutting down socket I/O handlers
2024-09-20T08:47:41.122424Z INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [closed] of connection 'AMQPRS000@localhost:5672/ [closed]'
2024-09-20T08:47:41.122450Z INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:5672/ [closed]'
2024-09-20T08:47:41.122472Z ERROR amqprs::api::channel: failed to gracefully close channel 1 at drop, cause: 'AMQP internal communication error: channel closed'

View File

@@ -1,9 +1,73 @@
use zettle_db::rabbitmq::RabbitMQ;
use lapin::{
options::*, types::FieldTable, Connection, ConnectionProperties, Consumer, Result,
};
use tracing::{info, error};
use futures_lite::stream::StreamExt;
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
let rabbitmq = RabbitMQ::new().await;
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await;
rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await;
//...
pub struct RabbitMQConsumer {
pub connection: Connection,
pub consumer: Consumer,
}
impl RabbitMQConsumer {
pub async fn new(addr: &str, queue: &str) -> Result<Self> {
let connection = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = connection.create_channel().await?;
// Declare the queue (in case it doesn't exist)
channel
.queue_declare(
queue,
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await?;
let consumer = channel
.basic_consume(
queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
Ok(Self { connection, consumer })
}
pub async fn run(&mut self) -> Result<()> {
info!("Consumer started - waiting for messages");
while let Some(delivery) = self.consumer.next().await {
match delivery {
Ok(delivery) => {
let message = std::str::from_utf8(&delivery.data).unwrap_or("Invalid UTF-8");
info!("Received message: {}", message);
// Process the message here
// For example, you could deserialize it and perform some action
delivery.ack(BasicAckOptions::default()).await?;
},
Err(e) => error!("Failed to consume message: {:?}", e),
}
}
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
// Set up tracing
tracing_subscriber::fmt::init();
let addr = "amqp://guest:guest@localhost:5672";
let queue = "hello";
let mut consumer = RabbitMQConsumer::new(addr, queue).await?;
info!("Starting consumer");
consumer.run().await?;
Ok(())
}

104
src/example.rs Normal file
View File

@@ -0,0 +1,104 @@
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{
BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
},
connection::{Connection, OpenConnectionArguments},
consumer::DefaultConsumer,
BasicProperties,
};
use tokio::time;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
// construct a subscriber that prints formatted traces to stdout
// global subscriber with log level according to RUST_LOG
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.try_init()
.ok();
// open a connection to RabbitMQ server
let connection = Connection::open(&OpenConnectionArguments::new(
"localhost",
5672,
"guest",
"guest",
))
.await
.unwrap();
connection
.register_callback(DefaultConnectionCallback)
.await
.unwrap();
// open a channel on the connection
let channel = connection.open_channel(None).await.unwrap();
channel
.register_callback(DefaultChannelCallback)
.await
.unwrap();
// declare a durable queue
let (queue_name, _, _) = channel
.queue_declare(QueueDeclareArguments::durable_client_named(
"amqprs.examples.basic",
))
.await
.unwrap()
.unwrap();
// bind the queue to exchange
let routing_key = "amqprs.example";
let exchange_name = "amq.topic";
channel
.queue_bind(QueueBindArguments::new(
&queue_name,
exchange_name,
routing_key,
))
.await
.unwrap();
//////////////////////////////////////////////////////////////////////////////
// start consumer with given name
let args = BasicConsumeArguments::new(&queue_name, "example_basic_pub_sub");
channel
.basic_consume(DefaultConsumer::new(args.no_ack), args)
.await
.unwrap();
//////////////////////////////////////////////////////////////////////////////
// publish message
let content = String::from(
r#"
{
"publisher": "example"
"data": "Hello, amqprs!"
}
"#,
)
.into_bytes();
// create arguments for basic_publish
let args = BasicPublishArguments::new(exchange_name, routing_key);
channel
.basic_publish(BasicProperties::default(), content, args)
.await
.unwrap();
tracing::info!("finished");
// keep the `channel` and `connection` object from dropping before pub/sub is done.
// channel/connection will be closed when drop.
time::sleep(time::Duration::from_secs(1)).await;
// explicitly close
channel.close().await.unwrap();
connection.close().await.unwrap();
}

View File

@@ -1,56 +1,44 @@
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{
BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
},
connection::{Connection, OpenConnectionArguments},
consumer::DefaultConsumer,
BasicProperties,
use lapin::{
options::*, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties, Result
};
pub struct RabbitMQ {
pub struct RabbitMQClient {
pub connection: Connection,
pub channel: amqprs::channel::Channel,
pub channel: Channel,
}
impl RabbitMQ {
pub async fn new() -> Self {
let connection = Connection::open(&OpenConnectionArguments::new(
"localhost",
5672,
"user",
"bitnami",
))
.await
.unwrap();
impl RabbitMQClient {
pub async fn new(addr: &str) -> Result<Self> {
let connection = Connection::connect(addr, ConnectionProperties::default()).await?;
let channel = connection.create_channel().await?;
connection
.register_callback(DefaultConnectionCallback)
.await
.unwrap();
let channel = connection.open_channel(None).await.unwrap();
channel
.register_callback(DefaultChannelCallback)
.await
.unwrap();
RabbitMQ { connection, channel }
Ok(Self { connection, channel })
}
pub async fn declare_queue(&self, queue_name: &str) -> (String, u32, u32) {
pub async fn publish(&self, queue: &str, payload: &[u8]) -> Result<()> {
self.channel
.queue_declare(QueueDeclareArguments::durable_client_named(queue_name))
.await
.unwrap()
.unwrap()
.basic_publish(
"",
queue,
Default::default(),
payload,
BasicProperties::default(),
)
.await?;
Ok(())
}
pub async fn bind_queue(&self, queue_name: &str, exchange_name: &str, routing_key: &str) {
self.channel
.queue_bind(QueueBindArguments::new(queue_name, exchange_name, routing_key))
.await
.unwrap();
pub async fn consume(&self, queue: &str) -> Result<lapin::Consumer> {
let consumer = self.channel
.basic_consume(
queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await?;
Ok(consumer)
}
}

View File

@@ -1,9 +1,116 @@
use zettle_db::rabbitmq::RabbitMQ;
// use axum::{
// routing::post,
// Router,
// response::{IntoResponse, Response},
// Error, Extension, Json,
// };
// use serde::Deserialize;
// use tracing_subscriber::{fmt, prelude::*, EnvFilter};
// #[derive(Deserialize)]
// struct IngressPayload {
// payload: String,
// }
// use tracing::{info, error};
// use zettle_db::rabbitmq::RabbitMQClient;
// async fn ingress_handler(
// // Extension(rabbitmq): Extension<RabbitMQ>,
// Json(payload): Json<IngressPayload>
// ) -> Response {
// info!("Received payload: {:?}", payload.payload);
// let rabbitmqclient = RabbitMQClient::new("127.0.0.1").await;
// match rabbitmq.publish(&payload.payload).await {
// Ok(_) => {
// info!("Message published successfully");
// "thank you".to_string().into_response()
// },
// Err(e) => {
// error!("Failed to publish message: {:?}", e);
// (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
// }
// }
// }
// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
// async fn main() -> Result<(), Error> {
// // Set up tracing
// tracing_subscriber::registry()
// .with(fmt::layer())
// .with(EnvFilter::from_default_env())
// .try_init()
// .ok();
// // Set up RabbitMQ
// // let rabbitmq = RabbitMQ::new("amqprs.examples.basic2", "amq.topic", "amqprs.example2").await;
// // Create Axum router
// let app = Router::new()
// .route("/ingress", post(ingress_handler)); // .layer(Extension(rabbitmq));
// tracing::info!("Listening on 0.0.0.0:3000");
// let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
// axum::serve(listener, app).await.unwrap();
// Ok(())
// }
use axum::{
routing::post,
Router,
response::{IntoResponse, Response},
Error, Extension, Json,
};
use serde::Deserialize;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::rabbitmq::RabbitMQClient;
use std::sync::Arc;
#[derive(Deserialize)]
struct IngressPayload {
payload: String,
}
use tracing::{info, error};
async fn ingress_handler(
Extension(rabbitmq): Extension<Arc<RabbitMQClient>>,
Json(payload): Json<IngressPayload>
) -> Response {
info!("Received payload: {:?}", payload.payload);
match rabbitmq.publish("hello", payload.payload.as_bytes()).await {
Ok(_) => {
info!("Message published successfully");
"thank you".to_string().into_response()
},
Err(e) => {
error!("Failed to publish message: {:?}", e);
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
}
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
let rabbitmq = RabbitMQ::new().await;
let queue_name = rabbitmq.declare_queue("amqprs.examples.basic").await;
rabbitmq.bind_queue(&queue_name.0, "amq.topic", "amqprs.example").await;
//...
async fn main() -> Result<(), Error> {
// Set up tracing
tracing_subscriber::registry()
.with(fmt::layer())
.with(EnvFilter::from_default_env())
.try_init()
.ok();
// Set up RabbitMQ
let rabbitmq = Arc::new(RabbitMQClient::new("amqp://guest:guest@localhost:5672").await.expect("Failed to connect to RabbitMQ"));
// Create Axum router
let app = Router::new()
.route("/ingress", post(ingress_handler))
.layer(Extension(rabbitmq));
tracing::info!("Listening on 0.0.0.0:3000");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
Ok(())
}