feat: add SurrealDB-backed job queue; remove lapin and RabbitMQ

This commit is contained in:
Per Stark
2025-01-09 21:13:42 +01:00
parent a87cb82b75
commit 0f8a83429a
28 changed files with 622 additions and 1306 deletions

994
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -13,8 +13,8 @@ axum_session_auth = "0.14.1"
axum_session_surreal = "0.2.1"
axum_typed_multipart = "0.12.1"
config = "0.15.4"
dom_smoothie = "0.3.0"
futures = "0.3.31"
lapin = { version = "2.5.0", features = ["serde_json"] }
lettre = { version = "0.11.11", features = ["rustls-tls"] }
mime = "0.3.17"
mime_guess = "2.0.5"

File diff suppressed because one or more lines are too long

View File

@@ -15,12 +15,15 @@ use tower_http::services::ServeDir;
use tracing::info;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::{
rabbitmq::{consumer::RabbitMQConsumer, publisher::RabbitMQProducer, RabbitMQConfig},
ingress::jobqueue::JobQueue,
server::{
middleware_api_auth::api_auth,
routes::{
api::{
ingress::ingress_data, query::query_handler, queue_length::queue_length_handler,
ingress::ingress_data,
ingress_task::{delete_queue_task, get_queue_tasks},
query::query_handler,
queue_length::queue_length_handler,
},
html::{
account::{delete_account, set_api_key, show_account_page},
@@ -28,6 +31,7 @@ use zettle_db::{
gdpr::{accept_gdpr, deny_gdpr},
index::index_handler,
ingress::{process_ingress_form, show_ingress_form},
ingress_tasks::{delete_task, show_queue_tasks},
search_result::search_result_handler,
signin::{authenticate_user, show_signin_form},
signout::sign_out_user,
@@ -53,14 +57,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("{:?}", config);
// Set up RabbitMQ
let rabbitmq_config = RabbitMQConfig {
amqp_addr: config.rabbitmq_address,
exchange: config.rabbitmq_exchange,
queue: config.rabbitmq_queue,
routing_key: config.rabbitmq_routing_key,
};
let reloader = AutoReloader::new(move |notifier| {
let template_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("templates");
let mut env = Environment::new();
@@ -71,19 +67,19 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(env)
});
let surreal_db_client = Arc::new(
SurrealDbClient::new(
&config.surrealdb_address,
&config.surrealdb_username,
&config.surrealdb_password,
&config.surrealdb_namespace,
&config.surrealdb_database,
)
.await?,
);
let app_state = AppState {
rabbitmq_producer: Arc::new(RabbitMQProducer::new(&rabbitmq_config).await?),
rabbitmq_consumer: Arc::new(RabbitMQConsumer::new(&rabbitmq_config, false).await?),
surreal_db_client: Arc::new(
SurrealDbClient::new(
&config.surrealdb_address,
&config.surrealdb_username,
&config.surrealdb_password,
&config.surrealdb_namespace,
&config.surrealdb_database,
)
.await?,
),
surreal_db_client: surreal_db_client.clone(),
openai_client: Arc::new(async_openai::Client::new()),
templates: Arc::new(reloader),
mailer: Arc::new(Mailer::new(
@@ -91,6 +87,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
config.smtp_relayer,
config.smtp_password,
)?),
job_queue: Arc::new(JobQueue::new(surreal_db_client)),
};
// setup_auth(&app_state.surreal_db_client).await?;
@@ -134,6 +131,8 @@ fn api_routes_v1(app_state: &AppState) -> Router<AppState> {
// Ingress routes
.route("/ingress", post(ingress_data))
.route("/message_count", get(queue_length_handler))
.route("/queue", get(get_queue_tasks))
.route("/queue/:delivery_tag", delete(delete_queue_task))
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024))
// Query routes
.route("/query", post(query_handler))
@@ -158,6 +157,8 @@ fn html_routes(
"/ingress",
get(show_ingress_form).post(process_ingress_form),
)
.route("/queue", get(show_queue_tasks))
.route("/queue/:delivery_tag", post(delete_task))
.route("/account", get(show_account_page))
.route("/set-api-key", post(set_api_key))
.route("/delete-account", delete(delete_account))

View File

@@ -1,8 +1,11 @@
use std::sync::Arc;
use futures::StreamExt;
use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::{
ingress::content_processor::ContentProcessor,
rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig, RabbitMQError},
ingress::{content_processor::ContentProcessor, jobqueue::JobQueue},
storage::db::SurrealDbClient,
utils::config::get_config,
};
@@ -15,49 +18,56 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.try_init()
.ok();
info!("Starting RabbitMQ consumer");
let config = get_config()?;
// Set up RabbitMQ config
let rabbitmq_config = RabbitMQConfig {
amqp_addr: config.rabbitmq_address.clone(),
exchange: config.rabbitmq_exchange.clone(),
queue: config.rabbitmq_queue.clone(),
routing_key: config.rabbitmq_routing_key.clone(),
};
let job_queue = JobQueue::new(Arc::new(
SurrealDbClient::new(
&config.surrealdb_address,
&config.surrealdb_username,
&config.surrealdb_password,
&config.surrealdb_namespace,
&config.surrealdb_database,
)
.await?,
));
// Create a RabbitMQ consumer
let consumer = RabbitMQConsumer::new(&rabbitmq_config, true).await?;
let content_processor = ContentProcessor::new(&config).await?;
// Start consuming messages
loop {
match consumer.consume().await {
Ok((ingress, delivery)) => {
info!("Received IngressObject: {:?}", ingress);
// Get the TextContent
let text_content = ingress.to_text_content().await?;
// First, check for any unfinished jobs
let unfinished_jobs = job_queue.get_unfinished_jobs().await?;
// Initialize ContentProcessor which handles LLM analysis and storage
let content_processor = ContentProcessor::new(&config).await?;
if !unfinished_jobs.is_empty() {
info!("Found {} unfinished jobs", unfinished_jobs.len());
// Begin processing of TextContent
content_processor.process(&text_content).await?;
// Remove from queue
consumer.ack_delivery(delivery).await?;
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
for job in unfinished_jobs {
if let Err(e) = job_queue.process_job(job.clone(), &content_processor).await {
error!("Error processing job {}: {}", job.id, e);
}
}
}
}
Ok(())
// If no unfinished jobs, start listening for new ones
info!("Listening for new jobs...");
let mut job_stream = job_queue.listen_for_jobs().await?;
while let Some(notification) = job_stream.next().await {
match notification {
Ok(notification) => {
info!("Received new job: {}", notification.data.id);
if let Err(e) = job_queue
.process_job(notification.data, &content_processor)
.await
{
error!("Error processing job: {}", e);
}
}
Err(e) => error!("Error in job notification: {}", e),
}
}
// If we reach here, the stream has ended (connection lost?)
error!("Job stream ended unexpectedly, reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}

View File

@@ -13,9 +13,7 @@ use serde_json::json;
use thiserror::Error;
use tokio::task::JoinError;
use crate::{
rabbitmq::RabbitMQError, storage::types::file_info::FileError, utils::mailer::EmailError,
};
use crate::{storage::types::file_info::FileError, utils::mailer::EmailError};
// Core internal errors
#[derive(Error, Debug)]
@@ -24,8 +22,6 @@ pub enum AppError {
Database(#[from] surrealdb::Error),
#[error("OpenAI error: {0}")]
OpenAI(#[from] OpenAIError),
#[error("RabbitMQ error: {0}")]
RabbitMQ(#[from] RabbitMQError),
#[error("File error: {0}")]
File(#[from] FileError),
#[error("Email error: {0}")]
@@ -50,6 +46,8 @@ pub enum AppError {
Reqwest(#[from] reqwest::Error),
#[error("Tiktoken error: {0}")]
Tiktoken(#[from] anyhow::Error),
#[error("Ingress Processing error: {0}")]
Processing(String),
}
// API-specific errors

169
src/ingress/jobqueue.rs Normal file
View File

@@ -0,0 +1,169 @@
use futures::Stream;
use std::{
sync::Arc,
time::{SystemTime, UNIX_EPOCH},
};
use surrealdb::{opt::PatchOp, Error, Notification};
use tracing::{error, info};
use crate::{
error::AppError,
storage::{
db::{store_item, SurrealDbClient},
types::{
job::{Job, JobStatus},
StoredObject,
},
},
};
use super::{content_processor::ContentProcessor, types::ingress_object::IngressObject};
/// Persistent job queue backed by SurrealDB.
///
/// Replaces the former RabbitMQ producer/consumer pair: jobs are stored in the
/// `job` table, workers discover them via a live query (`listen_for_jobs`) or a
/// startup scan (`get_unfinished_jobs`), and completion/failure is recorded by
/// patching the job's `status` field.
pub struct JobQueue {
    // Shared SurrealDB client; all queue operations go through this handle.
    db: Arc<SurrealDbClient>,
}

// Maximum number of processing attempts before a job is marked as Error.
const MAX_ATTEMPTS: u32 = 3;

impl JobQueue {
    /// Wraps an existing database client; performs no I/O.
    pub fn new(db: Arc<SurrealDbClient>) -> Self {
        Self { db }
    }

    /// Creates a new job and stores it in the database.
    ///
    /// Returns the stored `Job` (status `Created`) so callers can report its id.
    pub async fn enqueue(&self, content: IngressObject, user_id: String) -> Result<Job, AppError> {
        let job = Job::new(content, user_id).await;
        store_item(&self.db, job.clone()).await?;
        Ok(job)
    }

    /// Gets all jobs for a specific user, newest first.
    pub async fn get_user_jobs(&self, user_id: &str) -> Result<Vec<Job>, AppError> {
        let jobs: Vec<Job> = self
            .db
            .query("SELECT * FROM job WHERE user_id = $user_id ORDER BY created_at DESC")
            .bind(("user_id", user_id.to_string()))
            .await?
            .take(0)?;
        Ok(jobs)
    }

    /// Deletes a job, but only if it exists AND belongs to `user_id`.
    ///
    /// Returns `AppError::Auth` for both "not found" and "not owner" so the
    /// response does not leak whether the job exists.
    pub async fn delete_job(&self, id: &str, user_id: &str) -> Result<(), AppError> {
        // First, validate that the job exists and belongs to the user.
        // NOTE(review): `id` is bound as a plain string; this assumes the stored
        // `id` field is the bare UUID, not a full record id — confirm against
        // how `store_item`/`stored_object!` persist ids.
        let job: Option<Job> = self
            .db
            .query("SELECT * FROM job WHERE id = $id AND user_id = $user_id")
            .bind(("id", id.to_string()))
            .bind(("user_id", user_id.to_string()))
            .await?
            .take(0)?;
        // If no job is found or it doesn't belong to the user, return Unauthorized
        if job.is_none() {
            error!("Unauthorized attempt to delete job {id} by user {user_id}");
            return Err(AppError::Auth("Not authorized to delete this job".into()));
        }
        info!("Deleting job {id} for user {user_id}");
        // If validation passes, delete the job
        let _deleted: Option<Job> = self
            .db
            .delete((Job::table_name(), id))
            .await
            .map_err(AppError::Database)?;
        Ok(())
    }

    /// Update status for job.
    ///
    /// Also bumps `updated_at` to the current epoch-millis timestamp (stored as
    /// a string, matching `Job::new`). Returns the patched record, if any.
    pub async fn update_status(
        &self,
        id: &str,
        status: JobStatus,
    ) -> Result<Option<Job>, AppError> {
        // `unwrap` only fails if the system clock is before the Unix epoch.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis()
            .to_string();
        // Serialize the enum to JSON so it can be patched in as a value.
        // NOTE(review): reuses the LLMParsing error variant for a non-LLM
        // serialization failure — a dedicated variant would be clearer.
        let status_value =
            serde_json::to_value(status).map_err(|e| AppError::LLMParsing(e.to_string()))?;
        let job: Option<Job> = self
            .db
            .update((Job::table_name(), id))
            .patch(PatchOp::replace("/status", status_value))
            .patch(PatchOp::replace("/updated_at", now))
            .await?;
        Ok(job)
    }

    /// Listen for new jobs via a SurrealDB live query on the `job` table.
    ///
    /// The stream yields a notification for every change to the table, not only
    /// inserts — consumers should be prepared to see updates/deletes too.
    pub async fn listen_for_jobs(
        &self,
    ) -> Result<impl Stream<Item = Result<Notification<Job>, Error>>, Error> {
        self.db.select("job").live().await
    }

    /// Fetches jobs that still need work: freshly created ones, plus in-progress
    /// jobs that have not yet exhausted their attempts (e.g. after a worker crash).
    ///
    /// NOTE(review): `status.Created = true` / `status.InProgress.attempts`
    /// assume serde's externally-tagged enum representation as stored in
    /// SurrealDB — verify the on-disk shape matches.
    pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> {
        let jobs: Vec<Job> = self
            .db
            .query(
                "SELECT * FROM job WHERE status.Created = true OR (status.InProgress.attempts < $max_attempts) ORDER BY created_at ASC")
            .bind(("max_attempts", MAX_ATTEMPTS))
            .await?
            .take(0)?;
        Ok(jobs)
    }

    /// Processes a single job: marks it InProgress (incrementing the attempt
    /// counter), runs the content through `processor`, then records Completed
    /// or — after `MAX_ATTEMPTS` failures — a terminal Error status.
    ///
    /// A failure below the attempt limit leaves the job InProgress so that
    /// `get_unfinished_jobs` will pick it up again.
    pub async fn process_job(
        &self,
        job: Job,
        processor: &ContentProcessor,
    ) -> Result<(), AppError> {
        // Epoch-millis timestamp for the attempt; see update_status for format.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis()
            .to_string();
        // Continue an existing attempt count, or start at 1 for any other state.
        let current_attempts = match job.status {
            JobStatus::InProgress { attempts, .. } => attempts + 1,
            _ => 1,
        };
        // Update status to InProgress with attempt count
        self.update_status(
            &job.id,
            JobStatus::InProgress {
                attempts: current_attempts,
                last_attempt: now.clone(),
            },
        )
        .await?;
        // Materialize the ingress payload (may fetch URLs / read files).
        let text_content = job.content.to_text_content().await?;
        match processor.process(&text_content).await {
            Ok(_) => {
                self.update_status(&job.id, JobStatus::Completed).await?;
                Ok(())
            }
            Err(e) => {
                // Only flip to the terminal Error state once retries are spent.
                if current_attempts >= MAX_ATTEMPTS {
                    self.update_status(
                        &job.id,
                        JobStatus::Error(format!("Max attempts reached: {}", e)),
                    )
                    .await?;
                }
                Err(AppError::Processing(e.to_string()))
            }
        }
    }
}

View File

@@ -1,3 +1,4 @@
pub mod analysis;
pub mod content_processor;
pub mod jobqueue;
pub mod types;

View File

@@ -1,2 +1,3 @@
pub mod ingress_input;
pub mod ingress_object;
pub mod queue_task;

View File

@@ -0,0 +1,13 @@
use crate::ingress::types::ingress_object::IngressObject;
use serde::Serialize;
// NOTE(review): `delivery_tag` is a RabbitMQ concept; with the queue now backed
// by SurrealDB jobs (see `storage::types::job::Job`), these types appear to be
// leftovers from the old transport. Verify for remaining users and consider
// removing them.
/// A queued ingress task paired with its (legacy) broker delivery tag.
#[derive(Serialize)]
pub struct QueueTask {
    // Broker-assigned tag formerly used to ack/delete the message.
    pub delivery_tag: u64,
    // The payload that was submitted for ingestion.
    pub content: IngressObject,
}

/// Serialized response wrapper carrying a list of queue tasks.
#[derive(Serialize)]
pub struct QueueTaskResponse {
    pub tasks: Vec<QueueTask>,
}

View File

@@ -1,6 +1,5 @@
pub mod error;
pub mod ingress;
pub mod rabbitmq;
pub mod retrieval;
pub mod server;
pub mod storage;

View File

@@ -1,194 +0,0 @@
use futures::StreamExt;
use lapin::{message::Delivery, options::*, types::FieldTable, Channel, Consumer, Queue};
use crate::ingress::types::ingress_object::IngressObject;
use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
/// Struct to consume messages from RabbitMQ.
pub struct RabbitMQConsumer {
common: RabbitMQCommon,
pub queue: Queue,
consumer: Option<Consumer>,
}
impl RabbitMQConsumer {
/// Creates a new 'RabbitMQConsumer' instance which sets up a rabbitmq client,
/// declares a exchange if needed, declares and binds a queue and initializes the consumer
///
/// # Arguments
/// * `config` - A initialized RabbitMQConfig containing required configurations
/// * `start_consuming` - Set to true to start consuming messages
///
/// # Returns
/// * `Result<Self, RabbitMQError>` - The created client or an error.
pub async fn new(
config: &RabbitMQConfig,
start_consuming: bool,
) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?;
// Passively declare the exchange (it should already exist)
common.declare_exchange(config, true).await?;
// Declare queue and bind it to the channel
let queue = Self::declare_queue(&common.channel, config).await?;
Self::bind_queue(&common.channel, &config.exchange, &queue, config).await?;
// Initialize the consumer
let consumer = if start_consuming {
Some(Self::initialize_consumer(&common.channel, config).await?)
} else {
None
};
Ok(Self {
common,
queue,
consumer,
})
}
/// Sets up the consumer based on the channel and `RabbitMQConfig`.
///
/// # Arguments
/// * `channel` - Lapin Channel.
/// * `config` - A initialized RabbitMQConfig containing required information
///
/// # Returns
/// * `Result<Consumer, RabbitMQError>` - The initialized consumer or error
async fn initialize_consumer(
channel: &Channel,
config: &RabbitMQConfig,
) -> Result<Consumer, RabbitMQError> {
channel
.basic_consume(
&config.queue,
"consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.map_err(|e| RabbitMQError::InitializeConsumerError(e.to_string()))
}
/// Operation to get the current queue length
/// Will redeclare queue to get a updated number
pub async fn get_queue_length(&self) -> Result<u32, RabbitMQError> {
let queue_info = self
.common
.channel
.queue_declare(
&self.queue.name().to_string(),
QueueDeclareOptions {
durable: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.map_err(|e| RabbitMQError::QueueError(e.to_string()))?;
Ok(queue_info.message_count())
}
/// Declares the queue based on the channel and `RabbitMQConfig`.
/// # Arguments
/// * `channel` - Lapin Channel.
/// * `config` - A initialized RabbitMQConfig containing required information
///
/// # Returns
/// * `Result<Queue, RabbitMQError>` - The initialized queue or error
async fn declare_queue(
channel: &Channel,
config: &RabbitMQConfig,
) -> Result<Queue, RabbitMQError> {
channel
.queue_declare(
&config.queue,
QueueDeclareOptions {
durable: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.map_err(|e| RabbitMQError::QueueError(e.to_string()))
}
/// Binds the queue based on the channel, declared exchange, queue and `RabbitMQConfig`.
/// # Arguments
/// * `channel` - Lapin Channel.
/// * `exchange` - String value of the exchange name
/// * `queue` - Lapin queue thats declared
/// * `config` - A initialized RabbitMQConfig containing required information
///
/// # Returns
/// * `Result<(), RabbitMQError>` - Ok or error
async fn bind_queue(
channel: &Channel,
exchange: &str,
queue: &Queue,
config: &RabbitMQConfig,
) -> Result<(), RabbitMQError> {
channel
.queue_bind(
queue.name().as_str(),
exchange,
&config.routing_key,
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.map_err(|e| RabbitMQError::QueueError(e.to_string()))
}
/// Consumes a message and returns the message along with delivery details.
///
/// # Arguments
/// * `&self` - A reference to self
///
/// # Returns
/// `IngressObject` - The object containing content and metadata.
/// `Delivery` - A delivery reciept, required to ack or nack the delivery.
pub async fn consume(&self) -> Result<(IngressObject, Delivery), RabbitMQError> {
// Get consumer or return error if not initialized
let consumer: &lapin::Consumer = self.consumer.as_ref().ok_or_else(|| {
RabbitMQError::ConsumeError(
"Consumer not initialized. Call new() with start_consuming=true".to_string(),
)
})?;
// Receive the next message
let delivery = consumer
.clone()
.next()
.await
.ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))?
.map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?;
// Deserialize the message payload into IngressContent
let ingress: IngressObject = serde_json::from_slice(&delivery.data)
.map_err(|e| RabbitMQError::ConsumeError(format!("Deserialization Error: {}", e)))?;
Ok((ingress, delivery))
}
/// Acknowledges the message after processing
///
/// # Arguments
/// * `self` - Reference to self
/// * `delivery` - Delivery reciept
///
/// # Returns
/// * `Result<(), RabbitMQError>` - Ok or error
pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> {
self.common
.channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
.await
.map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?;
Ok(())
}
}

View File

@@ -1,107 +0,0 @@
pub mod consumer;
pub mod publisher;
use axum::async_trait;
use lapin::{
options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties,
ExchangeKind,
};
use thiserror::Error;
use tracing::debug;
/// Possible errors related to RabbitMQ operations.
#[derive(Error, Debug)]
pub enum RabbitMQError {
#[error("Failed to connect to RabbitMQ: {0}")]
ConnectionError(#[from] lapin::Error),
#[error("Channel error: {0}")]
ChannelError(String),
#[error("Consume error: {0}")]
ConsumeError(String),
#[error("Exchange error: {0}")]
ExchangeError(String),
#[error("Publish error: {0}")]
PublishError(String),
#[error("Error initializing consumer: {0}")]
InitializeConsumerError(String),
#[error("Queue error: {0}")]
QueueError(String),
}
/// Struct containing the information required to set up a client and connection.
#[derive(Clone)]
pub struct RabbitMQConfig {
pub amqp_addr: String,
pub exchange: String,
pub queue: String,
pub routing_key: String,
}
/// Struct containing the connection and channel of a client
pub struct RabbitMQCommon {
pub connection: Connection,
pub channel: Channel,
}
/// Defines the behavior for RabbitMQCommon client operations.
#[cfg_attr(test, mockall::automock)]
#[async_trait]
pub trait RabbitMQCommonTrait: Send + Sync {
async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError>;
async fn declare_exchange(
&self,
config: &RabbitMQConfig,
passive: bool,
) -> Result<(), RabbitMQError>;
}
impl RabbitMQCommon {
/// Sets up a new RabbitMQ client or error
///
/// # Arguments
/// * `RabbitMQConfig` - Configuration object with required information
///
/// # Returns
/// * `self` - A initialized instance of the client
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let connection = Self::create_connection(config).await?;
let channel = connection.create_channel().await?;
Ok(Self {
connection,
channel,
})
}
}
#[async_trait]
impl RabbitMQCommonTrait for RabbitMQCommon {
/// Function to set up the connection
async fn create_connection(config: &RabbitMQConfig) -> Result<Connection, RabbitMQError> {
debug!("Creating connection");
Connection::connect(&config.amqp_addr, ConnectionProperties::default())
.await
.map_err(RabbitMQError::ConnectionError)
}
/// Function to declare the exchange required
async fn declare_exchange(
&self,
config: &RabbitMQConfig,
passive: bool,
) -> Result<(), RabbitMQError> {
debug!("Declaring exchange");
self.channel
.exchange_declare(
&config.exchange,
ExchangeKind::Topic,
ExchangeDeclareOptions {
passive,
durable: true,
..ExchangeDeclareOptions::default()
},
FieldTable::default(),
)
.await
.map_err(|e| RabbitMQError::ExchangeError(e.to_string()))
}
}

View File

@@ -1,82 +0,0 @@
use lapin::{options::*, publisher_confirm::Confirmation, BasicProperties};
use crate::ingress::types::ingress_object::IngressObject;
use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
use tracing::{error, info};
/// Struct to publish messages to RabbitMQ.
pub struct RabbitMQProducer {
common: RabbitMQCommon,
exchange_name: String,
routing_key: String,
}
impl RabbitMQProducer {
/// Creates a new `RabbitMQProducer` instance which sets up a RabbitMQ client,
/// declares a exchange if needed.
///
/// # Arguments
/// * `config` - A initialized RabbitMQConfig containing required configurations
///
/// # Returns
/// * `Result<Self, RabbitMQError>` - The created client or an error.
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?;
common.declare_exchange(config, false).await?;
Ok(Self {
common,
exchange_name: config.exchange.clone(),
routing_key: config.routing_key.clone(),
})
}
/// Publishes an IngressObject to RabbitMQ after serializing it to JSON.
///
/// # Arguments
/// * `self` - Reference to self
/// * `ingress_object` - A initialized IngressObject
///
/// # Returns
/// * `Result<Confirmation, RabbitMQError>` - Confirmation of sent message or error
pub async fn publish(
&self,
ingress_object: IngressObject,
) -> Result<Confirmation, RabbitMQError> {
// Serialize IngressObject to JSON
let payload = serde_json::to_vec(&ingress_object).map_err(|e| {
error!("Serialization Error: {}", e);
RabbitMQError::PublishError(format!("Serialization Error: {}", e))
})?;
// Publish the serialized payload to RabbitMQ
let confirmation = self
.common
.channel
.basic_publish(
&self.exchange_name,
&self.routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.map_err(|e| {
error!("Publish Error: {}", e);
RabbitMQError::PublishError(format!("Publish Error: {}", e))
})?
.await
.map_err(|e| {
error!("Publish Confirmation Error: {}", e);
RabbitMQError::PublishError(format!("Publish Confirmation Error: {}", e))
})?;
info!(
"Published IngressObject to exchange '{}' with routing key '{}'",
self.exchange_name, self.routing_key
);
Ok(confirmation)
}
}

View File

@@ -1,5 +1,4 @@
use crate::rabbitmq::consumer::RabbitMQConsumer;
use crate::rabbitmq::publisher::RabbitMQProducer;
use crate::ingress::jobqueue::JobQueue;
use crate::storage::db::SurrealDbClient;
use crate::utils::mailer::Mailer;
use minijinja_autoreload::AutoReloader;
@@ -10,10 +9,9 @@ pub mod routes;
#[derive(Clone)]
pub struct AppState {
pub rabbitmq_producer: Arc<RabbitMQProducer>,
pub rabbitmq_consumer: Arc<RabbitMQConsumer>,
pub surreal_db_client: Arc<SurrealDbClient>,
pub openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
pub templates: Arc<AutoReloader>,
pub mailer: Arc<Mailer>,
pub job_queue: Arc<JobQueue>,
}

View File

@@ -44,7 +44,7 @@ pub async fn ingress_data(
let futures: Vec<_> = ingress_objects
.into_iter()
.map(|object| state.rabbitmq_producer.publish(object))
.map(|object| state.job_queue.enqueue(object.clone(), user.id.clone()))
.collect();
try_join_all(futures).await.map_err(AppError::from)?;

View File

@@ -0,0 +1,34 @@
use crate::{error::ApiError, server::AppState, storage::types::user::User};
use axum::{
extract::{Path, State},
http::StatusCode,
response::IntoResponse,
Extension, Json,
};
/// GET handler returning every queued job owned by the authenticated user,
/// serialized as a JSON array.
pub async fn get_queue_tasks(
    State(state): State<AppState>,
    Extension(user): Extension<User>,
) -> Result<impl IntoResponse, ApiError> {
    // `?` converts the queue's AppError into ApiError via its From impl.
    let jobs = state.job_queue.get_user_jobs(&user.id).await?;
    Ok(Json(jobs))
}
/// DELETE handler that removes a single queued job belonging to the
/// authenticated user, replying 200 OK on success.
///
/// NOTE(review): the route registers this segment as `:delivery_tag`, but the
/// value is actually the job id — the route name could be updated to match.
pub async fn delete_queue_task(
    State(state): State<AppState>,
    Extension(user): Extension<User>,
    Path(id): Path<String>,
) -> Result<impl IntoResponse, ApiError> {
    // Ownership is enforced inside delete_job; AppError converts via From.
    state.job_queue.delete_job(&id, &user.id).await?;
    Ok(StatusCode::OK)
}

View File

@@ -1,3 +1,4 @@
pub mod ingress;
pub mod ingress_task;
pub mod query;
pub mod queue_length;

View File

@@ -4,6 +4,7 @@ use tracing::info;
use crate::{
error::{ApiError, AppError},
server::AppState,
storage::{db::get_all_stored_items, types::job::Job},
};
pub async fn queue_length_handler(
@@ -11,11 +12,10 @@ pub async fn queue_length_handler(
) -> Result<impl IntoResponse, ApiError> {
info!("Getting queue length");
let queue_length = state
.rabbitmq_consumer
.get_queue_length()
let queue_length = get_all_stored_items::<Job>(&state.surreal_db_client)
.await
.map_err(AppError::from)?;
.map_err(AppError::from)?
.len();
info!("Queue length: {}", queue_length);

View File

@@ -29,10 +29,11 @@ pub async fn index_handler(
let queue_length = match auth.current_user.is_some() {
true => state
.rabbitmq_consumer
.get_queue_length()
.job_queue
.get_user_jobs(&auth.current_user.clone().unwrap().id)
.await
.map_err(|e| HtmlError::new(AppError::from(e), state.templates.clone()))?,
.map_err(|e| HtmlError::new(e, state.templates.clone()))?
.len(),
false => 0,
};
@@ -47,7 +48,7 @@ pub async fn index_handler(
let output = render_template(
IndexData::template_name(),
IndexData {
queue_length,
queue_length: queue_length.try_into().unwrap(),
gdpr_accepted,
user: auth.current_user,
},

View File

@@ -80,7 +80,7 @@ pub async fn process_ingress_form(
let futures: Vec<_> = ingress_objects
.into_iter()
.map(|object| state.rabbitmq_producer.publish(object))
.map(|object| state.job_queue.enqueue(object.clone(), user.id.clone()))
.collect();
try_join_all(futures)

View File

@@ -0,0 +1,61 @@
use crate::{
error::{AppError, HtmlError},
page_data,
server::AppState,
storage::types::{job::Job, user::User},
};
use axum::{
extract::{Path, State},
response::{Html, IntoResponse, Redirect},
};
use axum_session_auth::AuthSession;
use axum_session_surreal::SessionSurrealPool;
use surrealdb::{engine::any::Any, Surreal};
use super::render_template;
page_data!(ShowQueueTasks, "queue_tasks.html", {jobs: Vec<Job>});
/// Renders the queue-tasks page listing the signed-in user's jobs.
/// Anonymous visitors are redirected to the sign-in form.
pub async fn show_queue_tasks(
    State(state): State<AppState>,
    auth: AuthSession<User, String, SessionSurrealPool<Any>, Surreal<Any>>,
) -> Result<impl IntoResponse, HtmlError> {
    // Guard clause: only authenticated users may view their queue.
    let Some(user) = auth.current_user else {
        return Ok(Redirect::to("/signin").into_response());
    };
    let jobs = state
        .job_queue
        .get_user_jobs(&user.id)
        .await
        .map_err(|e| HtmlError::new(e, state.templates.clone()))?;
    // Render failures are wrapped so the error page itself can be templated.
    let page = ShowQueueTasks { jobs };
    let rendered =
        render_template(ShowQueueTasks::template_name(), page, state.templates.clone())
            .map_err(|e| HtmlError::new(AppError::from(e), state.templates.clone()))?;
    Ok(rendered.into_response())
}
/// Handles the queue page's delete form: removes the given job if it belongs
/// to the signed-in user, otherwise redirects anonymous visitors to sign-in.
pub async fn delete_task(
    State(state): State<AppState>,
    auth: AuthSession<User, String, SessionSurrealPool<Any>, Surreal<Any>>,
    Path(id): Path<String>,
) -> Result<impl IntoResponse, HtmlError> {
    // Guard clause: deletion requires an authenticated session.
    let Some(user) = auth.current_user else {
        return Ok(Redirect::to("/signin").into_response());
    };
    state
        .job_queue
        .delete_job(&id, &user.id)
        .await
        .map_err(|e| HtmlError::new(e, state.templates.clone()))?;
    // Empty body — presumably swapped into the page to remove the deleted row;
    // confirm against the client-side behavior.
    Ok(Html("").into_response())
}

View File

@@ -8,6 +8,7 @@ pub mod documentation;
pub mod gdpr;
pub mod index;
pub mod ingress;
pub mod ingress_tasks;
pub mod search_result;
pub mod signin;
pub mod signout;

View File

@@ -40,6 +40,16 @@ impl SurrealDbClient {
self.client.query("DEFINE INDEX idx_embedding_chunks ON text_chunk FIELDS embedding HNSW DIMENSION 1536").await?;
self.client.query("DEFINE INDEX idx_embedding_entities ON knowledge_entity FIELDS embedding HNSW DIMENSION 1536").await?;
self.client
.query("DEFINE INDEX idx_job_status ON job FIELDS status")
.await?;
self.client
.query("DEFINE INDEX idx_job_user ON job FIELDS user_id")
.await?;
self.client
.query("DEFINE INDEX idx_job_created ON job FIELDS created_at")
.await?;
Ok(())
}

43
src/storage/types/job.rs Normal file
View File

@@ -0,0 +1,43 @@
use std::time::{SystemTime, UNIX_EPOCH};
use uuid::Uuid;
use crate::{ingress::types::ingress_object::IngressObject, stored_object};
/// Lifecycle state of a queued ingestion job.
// NOTE(review): `Serialize`/`Deserialize` are referenced without a visible
// `use serde::...` in this view — presumably imported elsewhere in the file or
// by the `stored_object!` macro; verify.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobStatus {
    // Enqueued, not yet picked up by a worker.
    Created,
    // Being processed; retried up to the queue's attempt limit.
    InProgress {
        attempts: u32,
        last_attempt: String, // timestamp (epoch millis, stringified)
    },
    // Finished successfully.
    Completed,
    // Terminal failure; carries the error message.
    Error(String),
    // Cancelled by the user or system.
    Cancelled,
}

// Declares the `Job` record stored in the SurrealDB `job` table
// (fields in addition to whatever the macro adds, e.g. `id`).
stored_object!(Job, "job", {
    content: IngressObject,
    status: JobStatus,
    created_at: String,
    updated_at: String,
    user_id: String
});

impl Job {
    /// Builds a fresh job in the `Created` state with a random UUID id and
    /// both timestamps set to "now" (epoch millis as a string).
    ///
    /// NOTE(review): this `async fn` awaits nothing; it is kept async because
    /// callers already `.await` it (e.g. `JobQueue::enqueue`) — consider making
    /// it synchronous in a follow-up that updates call sites.
    pub async fn new(content: IngressObject, user_id: String) -> Self {
        // `unwrap` only fails if the system clock is before the Unix epoch.
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis()
            .to_string();
        Self {
            id: Uuid::new_v4().to_string(),
            content,
            status: JobStatus::Created,
            created_at: now.clone(),
            updated_at: now,
            user_id,
        }
    }
}

View File

@@ -1,6 +1,7 @@
use axum::async_trait;
use serde::{Deserialize, Serialize};
pub mod file_info;
pub mod job;
pub mod knowledge_entity;
pub mod knowledge_relationship;
pub mod text_chunk;

View File

@@ -5,10 +5,6 @@ pub struct AppConfig {
pub smtp_username: String,
pub smtp_password: String,
pub smtp_relayer: String,
pub rabbitmq_address: String,
pub rabbitmq_exchange: String,
pub rabbitmq_queue: String,
pub rabbitmq_routing_key: String,
pub surrealdb_address: String,
pub surrealdb_username: String,
pub surrealdb_password: String,
@@ -25,10 +21,6 @@ pub fn get_config() -> Result<AppConfig, ConfigError> {
smtp_username: config.get_string("SMTP_USERNAME")?,
smtp_password: config.get_string("SMTP_PASSWORD")?,
smtp_relayer: config.get_string("SMTP_RELAYER")?,
rabbitmq_address: config.get_string("RABBITMQ_ADDRESS")?,
rabbitmq_exchange: config.get_string("RABBITMQ_EXCHANGE")?,
rabbitmq_queue: config.get_string("RABBITMQ_QUEUE")?,
rabbitmq_routing_key: config.get_string("RABBITMQ_ROUTING_KEY")?,
surrealdb_address: config.get_string("SURREALDB_ADDRESS")?,
surrealdb_username: config.get_string("SURREALDB_USERNAME")?,
surrealdb_password: config.get_string("SURREALDB_PASSWORD")?,

View File

@@ -0,0 +1,39 @@
{% extends "body_base.html" %}
{% block main %}
<div class="container mx-auto p-4">
    <h1 class="text-2xl font-bold mb-4">Queue Tasks</h1>
    {# The handler (ShowQueueTasks) exposes the user's jobs as `jobs`. The old
       RabbitMQ-era template iterated `tasks` with `(task, tag)` delivery-tag
       pairs, which no longer exist — deletion is now keyed on the job id. #}
    {% if jobs|length == 0 %}
    <div class="alert alert-info">
        <span>No tasks in queue</span>
    </div>
    {% else %}
    <div class="grid gap-4">
        {% for job in jobs %}
        {# The submitted payload lives on the job's `content` field. #}
        {% set task = job.content %}
        <div class="card bg-base-200 shadow-xl">
            <div class="card-body">
                {% if task is object("Url") %}
                <h2 class="card-title">URL Task</h2>
                <p>URL: {{ task.url }}</p>
                {% elif task is object("Text") %}
                <h2 class="card-title">Text Task</h2>
                <p>Text: {{ task.text }}</p>
                {% elif task is object("File") %}
                <h2 class="card-title">File Task</h2>
                <p>File: {{ task.file_info.original_name }}</p>
                {% endif %}
                <p>Instructions: {{ task.instructions }}</p>
                <p>Category: {{ task.category }}</p>
                <div class="card-actions justify-end">
                    {# Deletion posts the job id (route segment `:delivery_tag`
                       is extracted as the id string by `delete_task`). #}
                    <form action="/queue/{{ job.id }}" method="POST">
                        <button class="btn btn-error">Delete Task</button>
                    </form>
                </div>
            </div>
        </div>
        {% endfor %}
    </div>
    {% endif %}
</div>
{% endblock %}