feat: task archive

fix: simplified
This commit is contained in:
Per Stark
2025-10-13 15:02:22 +02:00
parent 41fc7bb99c
commit aa0b1462a1
7 changed files with 404 additions and 73 deletions

View File

@@ -73,7 +73,6 @@ pub struct TaskErrorInfo {
#[derive(Debug, Clone, Copy)]
enum TaskTransition {
Reserve,
StartProcessing,
Succeed,
Fail,
@@ -85,7 +84,6 @@ enum TaskTransition {
impl TaskTransition {
fn as_str(&self) -> &'static str {
match self {
TaskTransition::Reserve => "reserve",
TaskTransition::StartProcessing => "start_processing",
TaskTransition::Succeed => "succeed",
TaskTransition::Fail => "fail",
@@ -162,53 +160,6 @@ fn invalid_transition(state: &TaskState, event: TaskTransition) -> AppError {
))
}
fn compute_next_state(state: &TaskState, event: TaskTransition) -> Result<TaskState, AppError> {
use lifecycle::*;
match (state, event) {
(TaskState::Pending, TaskTransition::Reserve) => pending()
.reserve()
.map(|_| TaskState::Reserved)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Failed, TaskTransition::Reserve) => failed()
.reserve()
.map(|_| TaskState::Reserved)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Reserved, TaskTransition::StartProcessing) => reserved()
.start_processing()
.map(|_| TaskState::Processing)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Processing, TaskTransition::Succeed) => processing()
.succeed()
.map(|_| TaskState::Succeeded)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Processing, TaskTransition::Fail) => processing()
.fail()
.map(|_| TaskState::Failed)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Pending, TaskTransition::Cancel) => pending()
.cancel()
.map(|_| TaskState::Cancelled)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Reserved, TaskTransition::Cancel) => reserved()
.cancel()
.map(|_| TaskState::Cancelled)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Processing, TaskTransition::Cancel) => processing()
.cancel()
.map(|_| TaskState::Cancelled)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Failed, TaskTransition::DeadLetter) => failed()
.deadletter()
.map(|_| TaskState::DeadLetter)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Reserved, TaskTransition::Release) => reserved()
.release()
.map(|_| TaskState::Pending)
.map_err(|_| invalid_transition(state, event)),
_ => Err(invalid_transition(state, event)),
}
}
stored_object!(IngestionTask, "ingestion_task", {
content: IngestionPayload,
state: TaskState,
@@ -284,8 +235,8 @@ impl IngestionTask {
now: chrono::DateTime<chrono::Utc>,
lease_duration: Duration,
) -> Result<Option<IngestionTask>, AppError> {
debug_assert!(compute_next_state(&TaskState::Pending, TaskTransition::Reserve).is_ok());
debug_assert!(compute_next_state(&TaskState::Failed, TaskTransition::Reserve).is_ok());
debug_assert!(lifecycle::pending().reserve().is_ok());
debug_assert!(lifecycle::failed().reserve().is_ok());
const CLAIM_QUERY: &str = r#"
UPDATE (
@@ -348,9 +299,6 @@ impl IngestionTask {
}
pub async fn mark_processing(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::StartProcessing)?;
debug_assert_eq!(next, TaskState::Processing);
const START_PROCESSING_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $processing,
@@ -377,9 +325,6 @@ impl IngestionTask {
}
pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::Succeed)?;
debug_assert_eq!(next, TaskState::Succeeded);
const COMPLETE_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $succeeded,
@@ -416,9 +361,6 @@ impl IngestionTask {
retry_delay: Duration,
db: &SurrealDbClient,
) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::Fail)?;
debug_assert_eq!(next, TaskState::Failed);
let now = chrono::Utc::now();
let retry_at = now
+ ChronoDuration::from_std(retry_delay).unwrap_or_else(|_| ChronoDuration::seconds(30));
@@ -460,9 +402,6 @@ impl IngestionTask {
error: TaskErrorInfo,
db: &SurrealDbClient,
) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::DeadLetter)?;
debug_assert_eq!(next, TaskState::DeadLetter);
const DEAD_LETTER_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $dead,
@@ -495,8 +434,6 @@ impl IngestionTask {
}
pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
compute_next_state(&self.state, TaskTransition::Cancel)?;
const CANCEL_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $cancelled,
@@ -530,8 +467,6 @@ impl IngestionTask {
}
pub async fn release(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
compute_next_state(&self.state, TaskTransition::Release)?;
const RELEASE_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $pending,
@@ -708,4 +643,86 @@ mod tests {
assert_eq!(dead.state, TaskState::DeadLetter);
assert_eq!(dead.error_message.as_deref(), Some("failed"));
}
#[tokio::test]
async fn test_mark_processing_requires_reservation() {
let db = memory_db().await;
let user_id = "user123";
let payload = create_payload(user_id);
let task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
db.store_item(task.clone()).await.expect("store");
let err = task
.mark_processing(&db)
.await
.expect_err("processing should fail without reservation");
match err {
AppError::Validation(message) => {
assert!(
message.contains("Pending -> start_processing"),
"unexpected message: {message}"
);
}
other => panic!("expected validation error, got {other:?}"),
}
}
#[tokio::test]
async fn test_mark_failed_requires_processing() {
let db = memory_db().await;
let user_id = "user123";
let payload = create_payload(user_id);
let task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
db.store_item(task.clone()).await.expect("store");
let err = task
.mark_failed(
TaskErrorInfo {
code: None,
message: "boom".into(),
},
Duration::from_secs(30),
&db,
)
.await
.expect_err("failing should require processing state");
match err {
AppError::Validation(message) => {
assert!(
message.contains("Pending -> fail"),
"unexpected message: {message}"
);
}
other => panic!("expected validation error, got {other:?}"),
}
}
#[tokio::test]
async fn test_release_requires_reservation() {
let db = memory_db().await;
let user_id = "user123";
let payload = create_payload(user_id);
let task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
db.store_item(task.clone()).await.expect("store");
let err = task
.release(&db)
.await
.expect_err("release should require reserved state");
match err {
AppError::Validation(message) => {
assert!(
message.contains("Pending -> release"),
"unexpected message: {message}"
);
}
other => panic!("expected validation error, got {other:?}"),
}
}
}

View File

@@ -559,6 +559,25 @@ impl User {
Ok(jobs)
}
/// Gets all ingestion tasks for the specified user ordered by newest first
pub async fn get_all_ingestion_tasks(
user_id: &str,
db: &SurrealDbClient,
) -> Result<Vec<IngestionTask>, AppError> {
let jobs: Vec<IngestionTask> = db
.query(
"SELECT * FROM type::table($table)
WHERE user_id = $user_id
ORDER BY created_at DESC",
)
.bind(("table", IngestionTask::table_name()))
.bind(("user_id", user_id.to_owned()))
.await?
.take(0)?;
Ok(jobs)
}
/// Validate and delete job
pub async fn validate_and_delete_job(
id: &str,
@@ -771,6 +790,49 @@ mod tests {
assert_eq!(unfinished_ids.len(), 3);
}
#[tokio::test]
async fn test_get_all_ingestion_tasks_returns_sorted() {
let db = setup_test_db().await;
let user_id = "archive_user";
let other_user_id = "other_user";
let payload = IngestionPayload::Text {
text: "One".to_string(),
context: "Context".to_string(),
category: "Category".to_string(),
user_id: user_id.to_string(),
};
// Oldest task
let mut first = IngestionTask::new(payload.clone(), user_id.to_string()).await;
first.created_at = first.created_at - chrono::Duration::minutes(1);
first.updated_at = first.created_at;
first.state = TaskState::Succeeded;
db.store_item(first.clone()).await.expect("store first");
// Latest task
let mut second = IngestionTask::new(payload.clone(), user_id.to_string()).await;
second.state = TaskState::Processing;
db.store_item(second.clone()).await.expect("store second");
let other_payload = IngestionPayload::Text {
text: "Other".to_string(),
context: "Context".to_string(),
category: "Category".to_string(),
user_id: other_user_id.to_string(),
};
let other_task = IngestionTask::new(other_payload, other_user_id.to_string()).await;
db.store_item(other_task).await.expect("store other");
let tasks = User::get_all_ingestion_tasks(user_id, &db)
.await
.expect("fetch all tasks");
assert_eq!(tasks.len(), 2);
assert_eq!(tasks[0].id, second.id); // newest first
assert_eq!(tasks[1].id, first.id);
}
#[tokio::test]
async fn test_find_by_email() {
// Setup test database