feat: job queue html

This commit is contained in:
Per Stark
2025-01-10 20:37:16 +01:00
parent 0f8a83429a
commit 4b7eabe809
6 changed files with 155 additions and 62 deletions

File diff suppressed because one or more lines are too long

View File

@@ -158,7 +158,7 @@ fn html_routes(
get(show_ingress_form).post(process_ingress_form), get(show_ingress_form).post(process_ingress_form),
) )
.route("/queue", get(show_queue_tasks)) .route("/queue", get(show_queue_tasks))
.route("/queue/:delivery_tag", post(delete_task)) .route("/queue/:delivery_tag", delete(delete_task))
.route("/account", get(show_account_page)) .route("/account", get(show_account_page))
.route("/set-api-key", post(set_api_key)) .route("/set-api-key", post(set_api_key))
.route("/delete-account", delete(delete_account)) .route("/delete-account", delete(delete_account))

View File

@@ -1,11 +1,18 @@
use std::sync::Arc; use std::sync::Arc;
use futures::StreamExt; use futures::StreamExt;
use surrealdb::Action;
use tracing::{error, info}; use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::{ use zettle_db::{
ingress::{content_processor::ContentProcessor, jobqueue::JobQueue}, ingress::{
storage::db::SurrealDbClient, content_processor::ContentProcessor,
jobqueue::{self, JobQueue, MAX_ATTEMPTS},
},
storage::{
db::{get_item, SurrealDbClient},
types::job::{Job, JobStatus},
},
utils::config::get_config, utils::config::get_config,
}; };
@@ -54,12 +61,66 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
while let Some(notification) = job_stream.next().await { while let Some(notification) = job_stream.next().await {
match notification { match notification {
Ok(notification) => { Ok(notification) => {
info!("Received new job: {}", notification.data.id); info!("Received notification: {:?}", notification);
if let Err(e) = job_queue
.process_job(notification.data, &content_processor) match notification.action {
.await Action::Create => {
{ if let Err(e) = job_queue
error!("Error processing job: {}", e); .process_job(notification.data, &content_processor)
.await
{
error!("Error processing job: {}", e);
}
}
Action::Update => {
match notification.data.status {
JobStatus::Completed
| JobStatus::Error(_)
| JobStatus::Cancelled => {
info!(
"Skipping already completed/error/cancelled job: {}",
notification.data.id
);
continue;
}
JobStatus::InProgress { attempts, .. } => {
// Only process if this is a retry after an error, not our own update
if let Ok(Some(current_job)) =
get_item::<Job>(&job_queue.db.client, &notification.data.id)
.await
{
match current_job.status {
JobStatus::Error(_) if attempts < MAX_ATTEMPTS => {
// This is a retry after an error
if let Err(e) = job_queue
.process_job(current_job, &content_processor)
.await
{
error!("Error processing job retry: {}", e);
}
}
_ => {
info!(
"Skipping in-progress update for job: {}",
notification.data.id
);
continue;
}
}
}
}
JobStatus::Created => {
// Shouldn't happen with Update action, but process if it does
if let Err(e) = job_queue
.process_job(notification.data, &content_processor)
.await
{
error!("Error processing job: {}", e);
}
}
}
}
_ => {} // Ignore other actions
} }
} }
Err(e) => error!("Error in job notification: {}", e), Err(e) => error!("Error in job notification: {}", e),

View File

@@ -9,7 +9,7 @@ use tracing::{error, info};
use crate::{ use crate::{
error::AppError, error::AppError,
storage::{ storage::{
db::{store_item, SurrealDbClient}, db::{delete_item, get_item, store_item, SurrealDbClient},
types::{ types::{
job::{Job, JobStatus}, job::{Job, JobStatus},
StoredObject, StoredObject,
@@ -20,10 +20,10 @@ use crate::{
use super::{content_processor::ContentProcessor, types::ingress_object::IngressObject}; use super::{content_processor::ContentProcessor, types::ingress_object::IngressObject};
pub struct JobQueue { pub struct JobQueue {
db: Arc<SurrealDbClient>, pub db: Arc<SurrealDbClient>,
} }
const MAX_ATTEMPTS: u32 = 3; pub const MAX_ATTEMPTS: u32 = 3;
impl JobQueue { impl JobQueue {
pub fn new(db: Arc<SurrealDbClient>) -> Self { pub fn new(db: Arc<SurrealDbClient>) -> Self {
@@ -50,34 +50,22 @@ impl JobQueue {
} }
pub async fn delete_job(&self, id: &str, user_id: &str) -> Result<(), AppError> { pub async fn delete_job(&self, id: &str, user_id: &str) -> Result<(), AppError> {
// First, validate that the job exists and belongs to the user get_item::<Job>(&self.db.client, id)
let job: Option<Job> = self
.db
.query("SELECT * FROM job WHERE id = $id AND user_id = $user_id")
.bind(("id", id.to_string()))
.bind(("user_id", user_id.to_string()))
.await? .await?
.take(0)?; .filter(|job| job.user_id == user_id)
.ok_or_else(|| {
// If no job is found or it doesn't belong to the user, return Unauthorized error!("Unauthorized attempt to delete job {id} by user {user_id}");
if job.is_none() { AppError::Auth("Not authorized to delete this job".into())
error!("Unauthorized attempt to delete job {id} by user {user_id}"); })?;
return Err(AppError::Auth("Not authorized to delete this job".into()));
}
info!("Deleting job {id} for user {user_id}"); info!("Deleting job {id} for user {user_id}");
delete_item::<Job>(&self.db.client, id)
// If validation passes, delete the job
let _deleted: Option<Job> = self
.db
.delete((Job::table_name(), id))
.await .await
.map_err(AppError::Database)?; .map_err(AppError::Database)?;
Ok(()) Ok(())
} }
/// Update status for job
pub async fn update_status( pub async fn update_status(
&self, &self,
id: &str, id: &str,
@@ -89,13 +77,10 @@ impl JobQueue {
.as_millis() .as_millis()
.to_string(); .to_string();
let status_value =
serde_json::to_value(status).map_err(|e| AppError::LLMParsing(e.to_string()))?;
let job: Option<Job> = self let job: Option<Job> = self
.db .db
.update((Job::table_name(), id)) .update((Job::table_name(), id))
.patch(PatchOp::replace("/status", status_value)) .patch(PatchOp::replace("/status", status))
.patch(PatchOp::replace("/updated_at", now)) .patch(PatchOp::replace("/updated_at", now))
.await?; .await?;
@@ -109,14 +94,27 @@ impl JobQueue {
self.db.select("job").live().await self.db.select("job").live().await
} }
/// Get unfinished jobs, ie newly created and in progress up two times
pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> { pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> {
let jobs: Vec<Job> = self let jobs: Vec<Job> = self
.db .db
.query( .query(
"SELECT * FROM job WHERE status.Created = true OR (status.InProgress.attempts < $max_attempts) ORDER BY created_at ASC") "SELECT * FROM type::table($table)
WHERE
status = 'Created'
OR (
status.InProgress != NONE
AND status.InProgress.attempts < $max_attempts
)
ORDER BY created_at ASC",
)
.bind(("table", Job::table_name()))
.bind(("max_attempts", MAX_ATTEMPTS)) .bind(("max_attempts", MAX_ATTEMPTS))
.await? .await?
.take(0)?; .take(0)?;
println!("Unfinished jobs found: {}", jobs.len());
Ok(jobs) Ok(jobs)
} }

View File

@@ -2,7 +2,10 @@ use crate::{
error::{AppError, HtmlError}, error::{AppError, HtmlError},
page_data, page_data,
server::AppState, server::AppState,
storage::types::{job::Job, user::User}, storage::types::{
job::{Job, JobStatus},
user::User,
},
}; };
use axum::{ use axum::{
extract::{Path, State}, extract::{Path, State},
@@ -11,10 +14,11 @@ use axum::{
use axum_session_auth::AuthSession; use axum_session_auth::AuthSession;
use axum_session_surreal::SessionSurrealPool; use axum_session_surreal::SessionSurrealPool;
use surrealdb::{engine::any::Any, Surreal}; use surrealdb::{engine::any::Any, Surreal};
use tracing::info;
use super::render_template; use super::render_template;
page_data!(ShowQueueTasks, "queue_tasks.html", {jobs: Vec<Job>}); page_data!(ShowQueueTasks, "queue_tasks.html", {user : User,jobs: Vec<Job>});
pub async fn show_queue_tasks( pub async fn show_queue_tasks(
State(state): State<AppState>, State(state): State<AppState>,
@@ -31,9 +35,16 @@ pub async fn show_queue_tasks(
.await .await
.map_err(|e| HtmlError::new(e, state.templates.clone()))?; .map_err(|e| HtmlError::new(e, state.templates.clone()))?;
for job in &jobs {
match job.status {
JobStatus::Created => info!("Found a created job"),
_ => continue,
}
}
let rendered = render_template( let rendered = render_template(
ShowQueueTasks::template_name(), ShowQueueTasks::template_name(),
ShowQueueTasks { jobs }, ShowQueueTasks { jobs, user },
state.templates.clone(), state.templates.clone(),
) )
.map_err(|e| HtmlError::new(AppError::from(e), state.templates.clone()))?; .map_err(|e| HtmlError::new(AppError::from(e), state.templates.clone()))?;

View File

@@ -1,37 +1,60 @@
{% extends "body_base.html" %} {% extends "body_base.html" %}
{% block main %} {% block main %}
<div class="container mx-auto p-4"> <div class="container mx-auto p-4">
<h1 class="text-2xl font-bold mb-4">Queue Tasks</h1> <h1 class="text-2xl font-bold mb-4">Active Tasks</h1>
{% if not jobs %}
{% if tasks|length == 0 %}
<div class="alert alert-info"> <div class="alert alert-info">
<span>No tasks in queue</span> <span>No active tasks</span>
</div> </div>
{% else %} {% else %}
<div class="grid gap-4"> <div class="grid gap-4">
{% for (task, tag) in tasks %} {% for job in jobs %}
<div class="card bg-base-200 shadow-xl"> {% if job.status == "Created" or job.status is mapping and job.status.InProgress %}
<div class="card bg-base-200 shadow-xl" id="job-card-{{ job.id }}">
<div class="card-body"> <div class="card-body">
{% if task is object("Url") %} <div class="card-title">
<h2 class="card-title">URL Task</h2> {% if job.content.Url %}
<p>URL: {{ task.url }}</p> <h2>URL Task</h2>
{% elif task is object("Text") %} <p class="text-sm text-gray-500 break-all">{{ job.content.Url.url }}</p>
<h2 class="card-title">Text Task</h2> {% elif job.content.File %}
<p>Text: {{ task.text }}</p> <h2>File Task</h2>
{% elif task is object("File") %} <p class="text-sm text-gray-500">{{ job.content.File.file_info.path }}</p>
<h2 class="card-title">File Task</h2> {% elif job.content.Text %}
<p>File: {{ task.file_info.original_name }}</p> <h2>Text Task</h2>
{% endif %} <p class="text-sm text-gray-500">{{ job.content.Text.text }}</p>
<p>Instructions: {{ task.instructions }}</p> {% endif %}
<p>Category: {{ task.category }}</p> </div>
<div class="card-actions justify-end"> <div class="space-y-2">
<form action="/queue/{{ tag }}" method="POST"> <p><span class="font-medium">Status:</span>
<button class="btn btn-error">Delete Task</button> {% if job.status == "Created" %}
Created
{% elif job.status.InProgress %}
In Progress
{% endif %}
</p>
{% if job.status.InProgress %}
<p><span class="font-medium">Attempts:</span> {{ job.status.InProgress.attempts }}</p>
<p><span class="font-medium">Last Attempt:</span>
{{ job.status.InProgress.last_attempt }}</p>
{% endif %}
<p><span class="font-medium">Category:</span>
{% if job.content.Url %}
{{ job.content.Url.category }}
{% elif job.content.File %}
{{ job.content.File.category }}
{% elif job.content.Text %}
{{ job.content.Text.category }}
{% endif %}
</p>
</div>
<div class="card-actions justify-end mt-4">
<form hx-delete="/queue/{{ job.id }}" hx-target="#job-card-{{ job.id }}" hx-swap="outerHTML">
<button class="btn btn-error">Cancel Task</button>
</form> </form>
</div> </div>
</div> </div>
</div> </div>
{% endif %}
{% endfor %} {% endfor %}
</div> </div>
{% endif %} {% endif %}