mirror of
https://github.com/perstarkse/minne.git
synced 2026-03-26 11:21:35 +01:00
feat: job queue html
This commit is contained in:
@@ -1,11 +1,18 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::StreamExt;
|
||||
use surrealdb::Action;
|
||||
use tracing::{error, info};
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
use zettle_db::{
|
||||
ingress::{content_processor::ContentProcessor, jobqueue::JobQueue},
|
||||
storage::db::SurrealDbClient,
|
||||
ingress::{
|
||||
content_processor::ContentProcessor,
|
||||
jobqueue::{self, JobQueue, MAX_ATTEMPTS},
|
||||
},
|
||||
storage::{
|
||||
db::{get_item, SurrealDbClient},
|
||||
types::job::{Job, JobStatus},
|
||||
},
|
||||
utils::config::get_config,
|
||||
};
|
||||
|
||||
@@ -54,12 +61,66 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
while let Some(notification) = job_stream.next().await {
|
||||
match notification {
|
||||
Ok(notification) => {
|
||||
info!("Received new job: {}", notification.data.id);
|
||||
if let Err(e) = job_queue
|
||||
.process_job(notification.data, &content_processor)
|
||||
.await
|
||||
{
|
||||
error!("Error processing job: {}", e);
|
||||
info!("Received notification: {:?}", notification);
|
||||
|
||||
match notification.action {
|
||||
Action::Create => {
|
||||
if let Err(e) = job_queue
|
||||
.process_job(notification.data, &content_processor)
|
||||
.await
|
||||
{
|
||||
error!("Error processing job: {}", e);
|
||||
}
|
||||
}
|
||||
Action::Update => {
|
||||
match notification.data.status {
|
||||
JobStatus::Completed
|
||||
| JobStatus::Error(_)
|
||||
| JobStatus::Cancelled => {
|
||||
info!(
|
||||
"Skipping already completed/error/cancelled job: {}",
|
||||
notification.data.id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
JobStatus::InProgress { attempts, .. } => {
|
||||
// Only process if this is a retry after an error, not our own update
|
||||
if let Ok(Some(current_job)) =
|
||||
get_item::<Job>(&job_queue.db.client, ¬ification.data.id)
|
||||
.await
|
||||
{
|
||||
match current_job.status {
|
||||
JobStatus::Error(_) if attempts < MAX_ATTEMPTS => {
|
||||
// This is a retry after an error
|
||||
if let Err(e) = job_queue
|
||||
.process_job(current_job, &content_processor)
|
||||
.await
|
||||
{
|
||||
error!("Error processing job retry: {}", e);
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
info!(
|
||||
"Skipping in-progress update for job: {}",
|
||||
notification.data.id
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
JobStatus::Created => {
|
||||
// Shouldn't happen with Update action, but process if it does
|
||||
if let Err(e) = job_queue
|
||||
.process_job(notification.data, &content_processor)
|
||||
.await
|
||||
{
|
||||
error!("Error processing job: {}", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {} // Ignore other actions
|
||||
}
|
||||
}
|
||||
Err(e) => error!("Error in job notification: {}", e),
|
||||
|
||||
Reference in New Issue
Block a user