mirror of
https://github.com/perstarkse/minne.git
synced 2026-04-23 09:18:36 +02:00
more documentation, wip llm processing
This commit is contained in:
@@ -5,7 +5,7 @@ use futures_lite::stream::StreamExt;
|
||||
|
||||
use crate::models::{ingress_content::IngressContentError, ingress_object::IngressObject };
|
||||
|
||||
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
|
||||
use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
|
||||
use tracing::{info, error};
|
||||
|
||||
/// Struct to consume messages from RabbitMQ.
|
||||
@@ -20,10 +20,10 @@ impl RabbitMQConsumer {
|
||||
/// declares a exchange if needed, declares and binds a queue and initializes the consumer
|
||||
///
|
||||
/// # Arguments
|
||||
/// * 'config' - A initialized RabbitMQConfig containing required configurations
|
||||
/// * `config` - A initialized RabbitMQConfig containing required configurations
|
||||
///
|
||||
/// # Returns
|
||||
/// * 'Result<Self, RabbitMQError>' - The created client or an error.
|
||||
/// * `Result<Self, RabbitMQError>` - The created client or an error.
|
||||
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
|
||||
let common = RabbitMQCommon::new(config).await?;
|
||||
|
||||
@@ -41,6 +41,13 @@ impl RabbitMQConsumer {
|
||||
}
|
||||
|
||||
/// Sets up the consumer based on the channel and `RabbitMQConfig`.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `channel` - Lapin Channel.
|
||||
/// * `config` - A initialized RabbitMQConfig containing required information
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Consumer, RabbitMQError>` - The initialized consumer or error
|
||||
async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result<Consumer, RabbitMQError> {
|
||||
channel
|
||||
.basic_consume(
|
||||
@@ -52,6 +59,12 @@ impl RabbitMQConsumer {
|
||||
.await.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string()))
|
||||
}
|
||||
/// Declares the queue based on the channel and `RabbitMQConfig`.
|
||||
/// # Arguments
|
||||
/// * `channel` - Lapin Channel.
|
||||
/// * `config` - A initialized RabbitMQConfig containing required information
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Queue, RabbitMQError>` - The initialized queue or error
|
||||
async fn declare_queue(channel: &Channel, config: &RabbitMQConfig) -> Result<Queue, RabbitMQError> {
|
||||
channel
|
||||
.queue_declare(
|
||||
@@ -65,7 +78,16 @@ impl RabbitMQConsumer {
|
||||
.await
|
||||
.map_err(|e| RabbitMQError::QueueError(e.to_string()))
|
||||
}
|
||||
|
||||
/// Binds the queue based on the channel, declared exchange, queue and `RabbitMQConfig`.
|
||||
/// # Arguments
|
||||
/// * `channel` - Lapin Channel.
|
||||
/// * `exchange` - String value of the exchange name
|
||||
/// * `queue` - Lapin queue thats declared
|
||||
/// * `config` - A initialized RabbitMQConfig containing required information
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<(), RabbitMQError>` - Ok or error
|
||||
async fn bind_queue(channel: &Channel, exchange: &str, queue: &Queue, config: &RabbitMQConfig) -> Result<(), RabbitMQError> {
|
||||
channel
|
||||
.queue_bind(
|
||||
@@ -101,6 +123,13 @@ impl RabbitMQConsumer {
|
||||
}
|
||||
|
||||
/// Acknowledges the message after processing
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `self` - Reference to self
|
||||
/// * `delivery` - Delivery reciept
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<(), RabbitMQError>` - Ok or error
|
||||
pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> {
|
||||
self.common.channel
|
||||
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
pub mod publisher;
|
||||
pub mod consumer;
|
||||
|
||||
use axum::async_trait;
|
||||
use lapin::{
|
||||
options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind
|
||||
};
|
||||
@@ -41,6 +42,15 @@ pub struct RabbitMQCommon {
|
||||
pub channel: Channel,
|
||||
}
|
||||
|
||||
|
||||
/// Defines the behavior for RabbitMQCommon client operations.
|
||||
#[cfg_attr(test, mockall::automock)]
|
||||
#[async_trait]
|
||||
pub trait RabbitMQCommonTrait: Send + Sync {
|
||||
async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError>;
|
||||
async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError>;
|
||||
}
|
||||
|
||||
impl RabbitMQCommon {
|
||||
/// Sets up a new RabbitMQ client or error
|
||||
///
|
||||
@@ -54,7 +64,10 @@ impl RabbitMQCommon {
|
||||
let channel = connection.create_channel().await?;
|
||||
Ok(Self { connection, channel })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RabbitMQCommonTrait for RabbitMQCommon {
|
||||
/// Function to set up the connection
|
||||
async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError> {
|
||||
debug!("Creating connection");
|
||||
@@ -64,7 +77,7 @@ impl RabbitMQCommon {
|
||||
}
|
||||
|
||||
/// Function to declare the exchange required
|
||||
pub async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> {
|
||||
async fn declare_exchange(&self, config: &RabbitMQConfig, passive: bool) -> Result<(), RabbitMQError> {
|
||||
debug!("Declaring exchange");
|
||||
self.channel
|
||||
.exchange_declare(
|
||||
@@ -81,4 +94,3 @@ impl RabbitMQCommon {
|
||||
.map_err(|e| RabbitMQError::ExchangeError(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,9 +4,10 @@ use lapin::{
|
||||
|
||||
use crate::models::ingress_object::IngressObject;
|
||||
|
||||
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
|
||||
use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
|
||||
use tracing::{info, error};
|
||||
|
||||
/// Struct to publish messages to RabbitMQ.
|
||||
pub struct RabbitMQProducer {
|
||||
common: RabbitMQCommon,
|
||||
exchange_name: String,
|
||||
@@ -14,6 +15,14 @@ pub struct RabbitMQProducer {
|
||||
}
|
||||
|
||||
impl RabbitMQProducer {
|
||||
/// Creates a new `RabbitMQProducer` instance which sets up a RabbitMQ client,
|
||||
/// declares a exchange if needed.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `config` - A initialized RabbitMQConfig containing required configurations
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Self, RabbitMQError>` - The created client or an error.
|
||||
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
|
||||
let common = RabbitMQCommon::new(config).await?;
|
||||
common.declare_exchange(config, false).await?;
|
||||
@@ -26,6 +35,13 @@ impl RabbitMQProducer {
|
||||
}
|
||||
|
||||
/// Publishes an IngressObject to RabbitMQ after serializing it to JSON.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `self` - Reference to self
|
||||
/// * `ingress_object` - A initialized IngressObject
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Confirmation, RabbitMQError>` - Confirmation of sent message or error
|
||||
pub async fn publish(&self, ingress_object: &IngressObject) -> Result<Confirmation, RabbitMQError> {
|
||||
// Serialize IngressObject to JSON
|
||||
let payload = serde_json::to_vec(ingress_object)
|
||||
|
||||
Reference in New Issue
Block a user