multipart wip

This commit is contained in:
Per Stark
2024-09-24 14:02:38 +02:00
parent 990f995caf
commit eed07c31f9
15 changed files with 871 additions and 97 deletions

View File

@@ -2,10 +2,13 @@ use lapin::{
message::Delivery, options::*, types::FieldTable, Channel, Consumer, Queue
};
use futures_lite::stream::StreamExt;
use tracing::info;
use crate::models::ingress::IngressContent;
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
use tracing::{info, error};
use tokio::fs;
/// Struct to consume messages from RabbitMQ.
pub struct RabbitMQConsumer {
common: RabbitMQCommon,
pub queue: Queue,
@@ -26,7 +29,7 @@ impl RabbitMQConsumer {
// Initialize the consumer
let consumer = Self::initialize_consumer(&common.channel, &config).await?;
Ok(Self { common, queue, consumer})
Ok(Self { common, queue, consumer })
}
async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result<Consumer, RabbitMQError> {
@@ -67,16 +70,21 @@ impl RabbitMQConsumer {
.map_err(|e| RabbitMQError::QueueError(e.to_string()))
}
pub async fn consume(&self) -> Result<(String, Delivery), RabbitMQError> {
/// Consumes a message and returns the deserialized IngressContent along with the Delivery
pub async fn consume(&self) -> Result<(IngressContent, Delivery), RabbitMQError> {
// Receive the next message
let delivery = self.consumer.clone().next().await
.ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))?
.map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?;
let message = String::from_utf8_lossy(&delivery.data).to_string();
// Deserialize the message payload into IngressContent
let ingress: IngressContent = serde_json::from_slice(&delivery.data)
.map_err(|e| RabbitMQError::ConsumeError(format!("Deserialization Error: {}", e)))?;
Ok((message, delivery))
Ok((ingress, delivery))
}
/// Acknowledges the message after processing
pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> {
self.common.channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
@@ -85,5 +93,37 @@ impl RabbitMQConsumer {
Ok(())
}
}
/// Processes messages in a loop
pub async fn process_messages(&self) -> Result<(), RabbitMQError> {
loop {
match self.consume().await {
Ok((ingress, delivery)) => {
info!("Received ingress object: {:?}", ingress);
// Process the ingress object
self.handle_ingress_content(&ingress).await;
info!("Processing done, acknowledging message");
self.ack_delivery(delivery).await?;
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
}
}
}
Ok(())
}
/// Handles the IngressContent based on its type
async fn handle_ingress_content(&self, ingress: &IngressContent) {
info!("Processing content: {:?}", ingress);
}
}

View File

@@ -1,8 +1,8 @@
pub mod producer;
pub mod publisher;
pub mod consumer;
use lapin::{
options::{ExchangeDeclareOptions, QueueDeclareOptions}, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind, Queue
options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind
};
use thiserror::Error;

View File

@@ -1,39 +0,0 @@
use lapin::{
options::*, publisher_confirm::Confirmation, BasicProperties,
};
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
pub struct RabbitMQProducer {
common: RabbitMQCommon,
exchange_name: String,
routing_key: String,
}
impl RabbitMQProducer {
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?;
common.declare_exchange(config, false).await?;
Ok(Self {
common,
exchange_name: config.exchange.clone(),
routing_key: config.routing_key.clone(),
})
}
pub async fn publish(&self, payload: &[u8]) -> Result<Confirmation, RabbitMQError> {
self.common.channel
.basic_publish(
&self.exchange_name,
&self.routing_key,
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await
.map_err(|e| RabbitMQError::PublishError(e.to_string()))?
.await
.map_err(|e| RabbitMQError::PublishError(e.to_string()))
}
}

60
src/rabbitmq/publisher.rs Normal file
View File

@@ -0,0 +1,60 @@
use lapin::{
options::*, publisher_confirm::Confirmation, BasicProperties,
};
use crate::models::ingress::IngressContent;
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
use tracing::{info, error};
pub struct RabbitMQProducer {
common: RabbitMQCommon,
exchange_name: String,
routing_key: String,
}
impl RabbitMQProducer {
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?;
common.declare_exchange(config, false).await?;
Ok(Self {
common,
exchange_name: config.exchange.clone(),
routing_key: config.routing_key.clone(),
})
}
/// Publishes an IngressContent object to RabbitMQ after serializing it to JSON.
pub async fn publish(&self, ingress: &IngressContent) -> Result<Confirmation, RabbitMQError> {
// Serialize IngressContent to JSON
let payload = serde_json::to_vec(ingress)
.map_err(|e| {
error!("Serialization Error: {}", e);
RabbitMQError::PublishError(format!("Serialization Error: {}", e))
})?;
// Publish the serialized payload to RabbitMQ
let confirmation = self.common.channel
.basic_publish(
&self.exchange_name,
&self.routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.map_err(|e| {
error!("Publish Error: {}", e);
RabbitMQError::PublishError(format!("Publish Error: {}", e))
})?
.await
.map_err(|e| {
error!("Publish Confirmation Error: {}", e);
RabbitMQError::PublishError(format!("Publish Confirmation Error: {}", e))
})?;
info!("Published message to exchange '{}' with routing key '{}'", self.exchange_name, self.routing_key);
Ok(confirmation)
}
}