feat: surrealdb queue and remove lapin and rabbitmq

This commit is contained in:
Per Stark
2025-01-09 21:13:42 +01:00
parent 2b1348aae2
commit d77f07c626
28 changed files with 622 additions and 1306 deletions
+47 -37
View File
@@ -1,8 +1,11 @@
use std::sync::Arc;
use futures::StreamExt;
use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::{
ingress::content_processor::ContentProcessor,
rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig, RabbitMQError},
ingress::{content_processor::ContentProcessor, jobqueue::JobQueue},
storage::db::SurrealDbClient,
utils::config::get_config,
};
@@ -15,49 +18,56 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.try_init()
.ok();
info!("Starting RabbitMQ consumer");
let config = get_config()?;
// Set up RabbitMQ config
let rabbitmq_config = RabbitMQConfig {
amqp_addr: config.rabbitmq_address.clone(),
exchange: config.rabbitmq_exchange.clone(),
queue: config.rabbitmq_queue.clone(),
routing_key: config.rabbitmq_routing_key.clone(),
};
let job_queue = JobQueue::new(Arc::new(
SurrealDbClient::new(
&config.surrealdb_address,
&config.surrealdb_username,
&config.surrealdb_password,
&config.surrealdb_namespace,
&config.surrealdb_database,
)
.await?,
));
// Create a RabbitMQ consumer
let consumer = RabbitMQConsumer::new(&rabbitmq_config, true).await?;
let content_processor = ContentProcessor::new(&config).await?;
// Start consuming messages
loop {
match consumer.consume().await {
Ok((ingress, delivery)) => {
info!("Received IngressObject: {:?}", ingress);
// Get the TextContent
let text_content = ingress.to_text_content().await?;
// First, check for any unfinished jobs
let unfinished_jobs = job_queue.get_unfinished_jobs().await?;
// Initialize ContentProcessor which handles LLM analysis and storage
let content_processor = ContentProcessor::new(&config).await?;
if !unfinished_jobs.is_empty() {
info!("Found {} unfinished jobs", unfinished_jobs.len());
// Begin processing of TextContent
content_processor.process(&text_content).await?;
// Remove from queue
consumer.ack_delivery(delivery).await?;
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
for job in unfinished_jobs {
if let Err(e) = job_queue.process_job(job.clone(), &content_processor).await {
error!("Error processing job {}: {}", job.id, e);
}
}
}
}
Ok(())
// If no unfinished jobs, start listening for new ones
info!("Listening for new jobs...");
let mut job_stream = job_queue.listen_for_jobs().await?;
while let Some(notification) = job_stream.next().await {
match notification {
Ok(notification) => {
info!("Received new job: {}", notification.data.id);
if let Err(e) = job_queue
.process_job(notification.data, &content_processor)
.await
{
error!("Error processing job: {}", e);
}
}
Err(e) => error!("Error in job notification: {}", e),
}
}
// If we reach here, the stream has ended (connection lost?)
error!("Job stream ended unexpectedly, reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}