mirror of
https://github.com/perstarkse/minne.git
synced 2026-04-21 08:21:25 +02:00
email wip
This commit is contained in:
@@ -1,6 +1,10 @@
|
||||
use tracing::info;
|
||||
use tracing::{error, info};
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig};
|
||||
use zettle_db::{
|
||||
ingress::content_processor::ContentProcessor,
|
||||
rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig, RabbitMQError},
|
||||
utils::config::get_config,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
@@ -13,19 +17,47 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
info!("Starting RabbitMQ consumer");
|
||||
|
||||
let config = get_config()?;
|
||||
|
||||
// Set up RabbitMQ config
|
||||
let config = RabbitMQConfig {
|
||||
amqp_addr: "amqp://localhost".to_string(),
|
||||
exchange: "my_exchange".to_string(),
|
||||
queue: "my_queue".to_string(),
|
||||
routing_key: "my_key".to_string(),
|
||||
let rabbitmq_config = RabbitMQConfig {
|
||||
amqp_addr: config.rabbitmq_address.clone(),
|
||||
exchange: config.rabbitmq_exchange.clone(),
|
||||
queue: config.rabbitmq_queue.clone(),
|
||||
routing_key: config.rabbitmq_routing_key.clone(),
|
||||
};
|
||||
|
||||
// Create a RabbitMQ consumer
|
||||
let consumer = RabbitMQConsumer::new(&config, true).await?;
|
||||
let consumer = RabbitMQConsumer::new(&rabbitmq_config, true).await?;
|
||||
|
||||
// Start consuming messages
|
||||
consumer.process_messages().await?;
|
||||
loop {
|
||||
match consumer.consume().await {
|
||||
Ok((ingress, delivery)) => {
|
||||
info!("Received IngressObject: {:?}", ingress);
|
||||
// Get the TextContent
|
||||
let text_content = ingress.to_text_content().await?;
|
||||
|
||||
// Initialize ContentProcessor which handles LLM analysis and storage
|
||||
let content_processor = ContentProcessor::new(&config).await?;
|
||||
|
||||
// Begin processing of TextContent
|
||||
content_processor.process(&text_content).await?;
|
||||
|
||||
// Remove from queue
|
||||
consumer.ack_delivery(delivery).await?;
|
||||
}
|
||||
Err(RabbitMQError::ConsumeError(e)) => {
|
||||
error!("Error consuming message: {}", e);
|
||||
// Optionally add a delay before trying again
|
||||
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Unexpected error: {}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user