From 41fc7bb99c78104b988c76650beebf0fd9412bf0 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Sun, 12 Oct 2025 22:21:20 +0200 Subject: [PATCH] feat: state machine for tasks, multiple workers --- Cargo.lock | 31 +- Cargo.toml | 1 + common/Cargo.toml | 1 + ...51012_205900_state_machine_migration.surql | 173 ++++ common/src/storage/types/ingestion_task.rs | 815 +++++++++++++----- common/src/storage/types/mod.rs | 28 +- common/src/storage/types/user.rs | 70 +- html-router/src/routes/ingestion/handlers.rs | 70 +- .../templates/dashboard/active_jobs.html | 18 +- .../templates/dashboard/current_task.html | 2 +- ingestion-pipeline/src/lib.rs | 110 +-- ingestion-pipeline/src/pipeline.rs | 93 +- 12 files changed, 1031 insertions(+), 381 deletions(-) create mode 100644 common/migrations/20251012_205900_state_machine_migration.surql diff --git a/Cargo.lock b/Cargo.lock index 5234710..c737afa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1322,6 +1322,7 @@ dependencies = [ "serde", "serde_json", "sha2", + "state-machines", "surrealdb", "surrealdb-migrations", "tempfile", @@ -3291,7 +3292,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" [[package]] name = "main" -version = "0.2.2" +version = "0.2.3" dependencies = [ "anyhow", "api-router", @@ -5400,6 +5401,34 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "state-machines" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "806ba0bf43ae158b229036d8a84601649a58d9761e718b5e0e07c2953803f4c1" +dependencies = [ + "state-machines-core", + "state-machines-macro", +] + +[[package]] +name = "state-machines-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "949cc50e84bed6234117f28a0ba2980dc35e9c17984ffe4e0a3364fba3e77540" + +[[package]] +name = "state-machines-macro" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"8322f5aa92d31b3c05faa1ec3231b82da479a20706836867d67ae89ce74927bd" +dependencies = [ + "proc-macro2", + "quote", + "state-machines-core", + "syn 2.0.101", +] + [[package]] name = "static_assertions_next" version = "1.1.2" diff --git a/Cargo.toml b/Cargo.toml index 24ca537..7548e64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ tokio-retry = "0.3.0" base64 = "0.22.1" object_store = { version = "0.11.2" } bytes = "1.7.1" +state-machines = "0.2.0" [profile.dist] inherits = "release" diff --git a/common/Cargo.toml b/common/Cargo.toml index ba3aead..a6092ab 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -41,6 +41,7 @@ surrealdb-migrations = { workspace = true } tokio-retry = { workspace = true } object_store = { workspace = true } bytes = { workspace = true } +state-machines = { workspace = true } [features] diff --git a/common/migrations/20251012_205900_state_machine_migration.surql b/common/migrations/20251012_205900_state_machine_migration.surql new file mode 100644 index 0000000..a3e5608 --- /dev/null +++ b/common/migrations/20251012_205900_state_machine_migration.surql @@ -0,0 +1,173 @@ +-- State machine migration for ingestion_task records + +DEFINE FIELD IF NOT EXISTS state ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS attempts ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS max_attempts ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS scheduled_at ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS locked_at ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS lease_duration_secs ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS worker_id ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS error_code ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS error_message ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS last_error_at ON TABLE ingestion_task TYPE option; +DEFINE FIELD IF NOT EXISTS priority ON TABLE 
ingestion_task TYPE option; + +REMOVE FIELD status ON TABLE ingestion_task; +DEFINE FIELD status ON TABLE ingestion_task TYPE option; + +DEFINE INDEX IF NOT EXISTS idx_ingestion_task_state_sched ON TABLE ingestion_task FIELDS state, scheduled_at; + +LET $needs_migration = (SELECT count() AS count FROM type::table('ingestion_task') WHERE state = NONE)[0].count; + +IF $needs_migration > 0 THEN { + -- Created -> Pending + UPDATE type::table('ingestion_task') + SET + state = "Pending", + attempts = 0, + max_attempts = 3, + scheduled_at = IF created_at != NONE THEN created_at ELSE time::now() END, + locked_at = NONE, + lease_duration_secs = 300, + worker_id = NONE, + error_code = NONE, + error_message = NONE, + last_error_at = NONE, + priority = 0 + WHERE state = NONE + AND status != NONE + AND status.name = "Created"; + + -- InProgress -> Processing + UPDATE type::table('ingestion_task') + SET + state = "Processing", + attempts = IF status.attempts != NONE THEN status.attempts ELSE 1 END, + max_attempts = 3, + scheduled_at = IF status.last_attempt != NONE THEN status.last_attempt ELSE time::now() END, + locked_at = IF status.last_attempt != NONE THEN status.last_attempt ELSE time::now() END, + lease_duration_secs = 300, + worker_id = NONE, + error_code = NONE, + error_message = NONE, + last_error_at = NONE, + priority = 0 + WHERE state = NONE + AND status != NONE + AND status.name = "InProgress"; + + -- Completed -> Succeeded + UPDATE type::table('ingestion_task') + SET + state = "Succeeded", + attempts = 1, + max_attempts = 3, + scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END, + locked_at = NONE, + lease_duration_secs = 300, + worker_id = NONE, + error_code = NONE, + error_message = NONE, + last_error_at = NONE, + priority = 0 + WHERE state = NONE + AND status != NONE + AND status.name = "Completed"; + + -- Error -> DeadLetter (terminal failure) + UPDATE type::table('ingestion_task') + SET + state = "DeadLetter", + attempts = 3, + 
max_attempts = 3, + scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END, + locked_at = NONE, + lease_duration_secs = 300, + worker_id = NONE, + error_code = NONE, + error_message = status.message, + last_error_at = IF updated_at != NONE THEN updated_at ELSE time::now() END, + priority = 0 + WHERE state = NONE + AND status != NONE + AND status.name = "Error"; + + -- Cancelled -> Cancelled + UPDATE type::table('ingestion_task') + SET + state = "Cancelled", + attempts = 0, + max_attempts = 3, + scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END, + locked_at = NONE, + lease_duration_secs = 300, + worker_id = NONE, + error_code = NONE, + error_message = NONE, + last_error_at = NONE, + priority = 0 + WHERE state = NONE + AND status != NONE + AND status.name = "Cancelled"; + + -- Fallback for any remaining records missing state + UPDATE type::table('ingestion_task') + SET + state = "Pending", + attempts = 0, + max_attempts = 3, + scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END, + locked_at = NONE, + lease_duration_secs = 300, + worker_id = NONE, + error_code = NONE, + error_message = NONE, + last_error_at = NONE, + priority = 0 + WHERE state = NONE; +} END; + +-- Ensure defaults for newly added fields +UPDATE type::table('ingestion_task') +SET max_attempts = 3 +WHERE max_attempts = NONE; + +UPDATE type::table('ingestion_task') +SET lease_duration_secs = 300 +WHERE lease_duration_secs = NONE; + +UPDATE type::table('ingestion_task') +SET attempts = 0 +WHERE attempts = NONE; + +UPDATE type::table('ingestion_task') +SET priority = 0 +WHERE priority = NONE; + +UPDATE type::table('ingestion_task') +SET scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END +WHERE scheduled_at = NONE; + +UPDATE type::table('ingestion_task') +SET locked_at = NONE +WHERE locked_at = NONE; + +UPDATE type::table('ingestion_task') +SET worker_id = NONE +WHERE worker_id != NONE AND worker_id = ""; + +UPDATE 
type::table('ingestion_task') +SET error_code = NONE +WHERE error_code = NONE; + +UPDATE type::table('ingestion_task') +SET error_message = NONE +WHERE error_message = NONE; + +UPDATE type::table('ingestion_task') +SET last_error_at = NONE +WHERE last_error_at = NONE; + +UPDATE type::table('ingestion_task') +SET status = NONE +WHERE status != NONE; diff --git a/common/src/storage/types/ingestion_task.rs b/common/src/storage/types/ingestion_task.rs index 22ef9de..ab61902 100644 --- a/common/src/storage/types/ingestion_task.rs +++ b/common/src/storage/types/ingestion_task.rs @@ -1,116 +1,594 @@ -use futures::Stream; -use surrealdb::{opt::PatchOp, Notification}; +use std::time::Duration; + +use chrono::Duration as ChronoDuration; +use state_machines::state_machine; +use surrealdb::sql::Datetime as SurrealDatetime; use uuid::Uuid; use crate::{error::AppError, storage::db::SurrealDbClient, stored_object}; use super::ingestion_payload::IngestionPayload; -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] -#[serde(tag = "name")] -pub enum IngestionTaskStatus { - Created, - InProgress { - attempts: u32, - last_attempt: DateTime, - }, - Completed, - Error { - message: String, - }, +pub const MAX_ATTEMPTS: u32 = 3; +pub const DEFAULT_LEASE_SECS: i64 = 300; +pub const DEFAULT_PRIORITY: i32 = 0; + +#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub enum TaskState { + #[serde(rename = "Pending")] + #[default] + Pending, + #[serde(rename = "Reserved")] + Reserved, + #[serde(rename = "Processing")] + Processing, + #[serde(rename = "Succeeded")] + Succeeded, + #[serde(rename = "Failed")] + Failed, + #[serde(rename = "Cancelled")] Cancelled, + #[serde(rename = "DeadLetter")] + DeadLetter, +} + +impl TaskState { + pub fn as_str(&self) -> &'static str { + match self { + TaskState::Pending => "Pending", + TaskState::Reserved => "Reserved", + TaskState::Processing => "Processing", + TaskState::Succeeded => "Succeeded", + TaskState::Failed 
=> "Failed", + TaskState::Cancelled => "Cancelled", + TaskState::DeadLetter => "DeadLetter", + } + } + + pub fn is_terminal(&self) -> bool { + matches!( + self, + TaskState::Succeeded | TaskState::Cancelled | TaskState::DeadLetter + ) + } + + pub fn display_label(&self) -> &'static str { + match self { + TaskState::Pending => "Pending", + TaskState::Reserved => "Reserved", + TaskState::Processing => "Processing", + TaskState::Succeeded => "Completed", + TaskState::Failed => "Retrying", + TaskState::Cancelled => "Cancelled", + TaskState::DeadLetter => "Dead Letter", + } + } +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default)] +pub struct TaskErrorInfo { + pub code: Option, + pub message: String, +} + +#[derive(Debug, Clone, Copy)] +enum TaskTransition { + Reserve, + StartProcessing, + Succeed, + Fail, + Cancel, + DeadLetter, + Release, +} + +impl TaskTransition { + fn as_str(&self) -> &'static str { + match self { + TaskTransition::Reserve => "reserve", + TaskTransition::StartProcessing => "start_processing", + TaskTransition::Succeed => "succeed", + TaskTransition::Fail => "fail", + TaskTransition::Cancel => "cancel", + TaskTransition::DeadLetter => "deadletter", + TaskTransition::Release => "release", + } + } +} + +mod lifecycle { + use super::state_machine; + + state_machine! 
{ + name: TaskLifecycleMachine, + initial: Pending, + states: [Pending, Reserved, Processing, Succeeded, Failed, Cancelled, DeadLetter], + events { + reserve { + transition: { from: Pending, to: Reserved } + transition: { from: Failed, to: Reserved } + } + start_processing { + transition: { from: Reserved, to: Processing } + } + succeed { + transition: { from: Processing, to: Succeeded } + } + fail { + transition: { from: Processing, to: Failed } + } + cancel { + transition: { from: Pending, to: Cancelled } + transition: { from: Reserved, to: Cancelled } + transition: { from: Processing, to: Cancelled } + } + deadletter { + transition: { from: Failed, to: DeadLetter } + } + release { + transition: { from: Reserved, to: Pending } + } + } + } + + pub(super) fn pending() -> TaskLifecycleMachine<(), Pending> { + TaskLifecycleMachine::new(()) + } + + pub(super) fn reserved() -> TaskLifecycleMachine<(), Reserved> { + pending() + .reserve() + .expect("reserve transition from Pending should exist") + } + + pub(super) fn processing() -> TaskLifecycleMachine<(), Processing> { + reserved() + .start_processing() + .expect("start_processing transition from Reserved should exist") + } + + pub(super) fn failed() -> TaskLifecycleMachine<(), Failed> { + processing() + .fail() + .expect("fail transition from Processing should exist") + } +} + +fn invalid_transition(state: &TaskState, event: TaskTransition) -> AppError { + AppError::Validation(format!( + "Invalid task transition: {} -> {}", + state.as_str(), + event.as_str() + )) +} + +fn compute_next_state(state: &TaskState, event: TaskTransition) -> Result { + use lifecycle::*; + match (state, event) { + (TaskState::Pending, TaskTransition::Reserve) => pending() + .reserve() + .map(|_| TaskState::Reserved) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Failed, TaskTransition::Reserve) => failed() + .reserve() + .map(|_| TaskState::Reserved) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Reserved, 
TaskTransition::StartProcessing) => reserved() + .start_processing() + .map(|_| TaskState::Processing) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Processing, TaskTransition::Succeed) => processing() + .succeed() + .map(|_| TaskState::Succeeded) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Processing, TaskTransition::Fail) => processing() + .fail() + .map(|_| TaskState::Failed) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Pending, TaskTransition::Cancel) => pending() + .cancel() + .map(|_| TaskState::Cancelled) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Reserved, TaskTransition::Cancel) => reserved() + .cancel() + .map(|_| TaskState::Cancelled) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Processing, TaskTransition::Cancel) => processing() + .cancel() + .map(|_| TaskState::Cancelled) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Failed, TaskTransition::DeadLetter) => failed() + .deadletter() + .map(|_| TaskState::DeadLetter) + .map_err(|_| invalid_transition(state, event)), + (TaskState::Reserved, TaskTransition::Release) => reserved() + .release() + .map(|_| TaskState::Pending) + .map_err(|_| invalid_transition(state, event)), + _ => Err(invalid_transition(state, event)), + } } stored_object!(IngestionTask, "ingestion_task", { content: IngestionPayload, - status: IngestionTaskStatus, - user_id: String + state: TaskState, + user_id: String, + attempts: u32, + max_attempts: u32, + #[serde(serialize_with = "serialize_datetime", deserialize_with = "deserialize_datetime")] + scheduled_at: chrono::DateTime, + #[serde( + serialize_with = "serialize_option_datetime", + deserialize_with = "deserialize_option_datetime", + default + )] + locked_at: Option>, + lease_duration_secs: i64, + worker_id: Option, + error_code: Option, + error_message: Option, + #[serde( + serialize_with = "serialize_option_datetime", + deserialize_with = "deserialize_option_datetime", 
+ default + )] + last_error_at: Option>, + priority: i32 }); -pub const MAX_ATTEMPTS: u32 = 3; - impl IngestionTask { pub async fn new(content: IngestionPayload, user_id: String) -> Self { - let now = Utc::now(); + let now = chrono::Utc::now(); Self { id: Uuid::new_v4().to_string(), content, - status: IngestionTaskStatus::Created, + state: TaskState::Pending, + user_id, + attempts: 0, + max_attempts: MAX_ATTEMPTS, + scheduled_at: now, + locked_at: None, + lease_duration_secs: DEFAULT_LEASE_SECS, + worker_id: None, + error_code: None, + error_message: None, + last_error_at: None, + priority: DEFAULT_PRIORITY, created_at: now, updated_at: now, - user_id, } } - /// Creates a new job and stores it in the database + pub fn can_retry(&self) -> bool { + self.attempts < self.max_attempts + } + + pub fn lease_duration(&self) -> Duration { + Duration::from_secs(self.lease_duration_secs.max(0) as u64) + } + pub async fn create_and_add_to_db( content: IngestionPayload, user_id: String, db: &SurrealDbClient, ) -> Result { let task = Self::new(content, user_id).await; - db.store_item(task.clone()).await?; - Ok(task) } - // Update job status - pub async fn update_status( - id: &str, - status: IngestionTaskStatus, + pub async fn claim_next_ready( db: &SurrealDbClient, - ) -> Result<(), AppError> { - let _job: Option = db - .update((Self::table_name(), id)) - .patch(PatchOp::replace("/status", status)) - .patch(PatchOp::replace( - "/updated_at", - surrealdb::Datetime::from(Utc::now()), + worker_id: &str, + now: chrono::DateTime, + lease_duration: Duration, + ) -> Result, AppError> { + debug_assert!(compute_next_state(&TaskState::Pending, TaskTransition::Reserve).is_ok()); + debug_assert!(compute_next_state(&TaskState::Failed, TaskTransition::Reserve).is_ok()); + + const CLAIM_QUERY: &str = r#" + UPDATE ( + SELECT * FROM type::table($table) + WHERE state IN $candidate_states + AND scheduled_at <= $now + AND ( + attempts < max_attempts + OR state IN $sticky_states + ) + AND ( + 
locked_at = NONE + OR time::unix($now) - time::unix(locked_at) >= lease_duration_secs + ) + ORDER BY priority DESC, scheduled_at ASC, created_at ASC + LIMIT 1 + ) + SET state = $reserved_state, + attempts = if state IN $increment_states THEN + if attempts + 1 > max_attempts THEN max_attempts ELSE attempts + 1 END + ELSE + attempts + END, + locked_at = $now, + worker_id = $worker_id, + lease_duration_secs = $lease_secs, + updated_at = $now + RETURN *; + "#; + + let mut result = db + .client + .query(CLAIM_QUERY) + .bind(("table", Self::table_name())) + .bind(( + "candidate_states", + vec![ + TaskState::Pending.as_str(), + TaskState::Failed.as_str(), + TaskState::Reserved.as_str(), + TaskState::Processing.as_str(), + ], )) + .bind(( + "sticky_states", + vec![TaskState::Reserved.as_str(), TaskState::Processing.as_str()], + )) + .bind(( + "increment_states", + vec![TaskState::Pending.as_str(), TaskState::Failed.as_str()], + )) + .bind(("reserved_state", TaskState::Reserved.as_str())) + .bind(("now", SurrealDatetime::from(now))) + .bind(("worker_id", worker_id.to_string())) + .bind(("lease_secs", lease_duration.as_secs() as i64)) .await?; - Ok(()) + let task: Option = result.take(0)?; + Ok(task) } - /// Listen for new jobs - pub async fn listen_for_tasks( + pub async fn mark_processing(&self, db: &SurrealDbClient) -> Result { + let next = compute_next_state(&self.state, TaskTransition::StartProcessing)?; + debug_assert_eq!(next, TaskState::Processing); + + const START_PROCESSING_QUERY: &str = r#" + UPDATE type::thing($table, $id) + SET state = $processing, + updated_at = $now, + locked_at = $now + WHERE state = $reserved AND worker_id = $worker_id + RETURN *; + "#; + + let now = chrono::Utc::now(); + let mut result = db + .client + .query(START_PROCESSING_QUERY) + .bind(("table", Self::table_name())) + .bind(("id", self.id.clone())) + .bind(("processing", TaskState::Processing.as_str())) + .bind(("reserved", TaskState::Reserved.as_str())) + .bind(("now", 
SurrealDatetime::from(now))) + .bind(("worker_id", self.worker_id.clone().unwrap_or_default())) + .await?; + + let updated: Option = result.take(0)?; + updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::StartProcessing)) + } + + pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result { + let next = compute_next_state(&self.state, TaskTransition::Succeed)?; + debug_assert_eq!(next, TaskState::Succeeded); + + const COMPLETE_QUERY: &str = r#" + UPDATE type::thing($table, $id) + SET state = $succeeded, + updated_at = $now, + locked_at = NONE, + worker_id = NONE, + scheduled_at = $now, + error_code = NONE, + error_message = NONE, + last_error_at = NONE + WHERE state = $processing AND worker_id = $worker_id + RETURN *; + "#; + + let now = chrono::Utc::now(); + let mut result = db + .client + .query(COMPLETE_QUERY) + .bind(("table", Self::table_name())) + .bind(("id", self.id.clone())) + .bind(("succeeded", TaskState::Succeeded.as_str())) + .bind(("processing", TaskState::Processing.as_str())) + .bind(("now", SurrealDatetime::from(now))) + .bind(("worker_id", self.worker_id.clone().unwrap_or_default())) + .await?; + + let updated: Option = result.take(0)?; + updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Succeed)) + } + + pub async fn mark_failed( + &self, + error: TaskErrorInfo, + retry_delay: Duration, db: &SurrealDbClient, - ) -> Result, surrealdb::Error>>, surrealdb::Error> - { - db.listen::().await + ) -> Result { + let next = compute_next_state(&self.state, TaskTransition::Fail)?; + debug_assert_eq!(next, TaskState::Failed); + + let now = chrono::Utc::now(); + let retry_at = now + + ChronoDuration::from_std(retry_delay).unwrap_or_else(|_| ChronoDuration::seconds(30)); + + const FAIL_QUERY: &str = r#" + UPDATE type::thing($table, $id) + SET state = $failed, + updated_at = $now, + locked_at = NONE, + worker_id = NONE, + scheduled_at = $retry_at, + error_code = $error_code, + error_message = $error_message, + 
last_error_at = $now + WHERE state = $processing AND worker_id = $worker_id + RETURN *; + "#; + + let mut result = db + .client + .query(FAIL_QUERY) + .bind(("table", Self::table_name())) + .bind(("id", self.id.clone())) + .bind(("failed", TaskState::Failed.as_str())) + .bind(("processing", TaskState::Processing.as_str())) + .bind(("now", SurrealDatetime::from(now))) + .bind(("retry_at", SurrealDatetime::from(retry_at))) + .bind(("error_code", error.code.clone())) + .bind(("error_message", error.message.clone())) + .bind(("worker_id", self.worker_id.clone().unwrap_or_default())) + .await?; + + let updated: Option = result.take(0)?; + updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Fail)) } - /// Get all unfinished tasks, ie newly created and in progress up two times - pub async fn get_unfinished_tasks(db: &SurrealDbClient) -> Result, AppError> { - let jobs: Vec = db + pub async fn mark_dead_letter( + &self, + error: TaskErrorInfo, + db: &SurrealDbClient, + ) -> Result { + let next = compute_next_state(&self.state, TaskTransition::DeadLetter)?; + debug_assert_eq!(next, TaskState::DeadLetter); + + const DEAD_LETTER_QUERY: &str = r#" + UPDATE type::thing($table, $id) + SET state = $dead, + updated_at = $now, + locked_at = NONE, + worker_id = NONE, + scheduled_at = $now, + error_code = $error_code, + error_message = $error_message, + last_error_at = $now + WHERE state = $failed + RETURN *; + "#; + + let now = chrono::Utc::now(); + let mut result = db + .client + .query(DEAD_LETTER_QUERY) + .bind(("table", Self::table_name())) + .bind(("id", self.id.clone())) + .bind(("dead", TaskState::DeadLetter.as_str())) + .bind(("failed", TaskState::Failed.as_str())) + .bind(("now", SurrealDatetime::from(now))) + .bind(("error_code", error.code.clone())) + .bind(("error_message", error.message.clone())) + .await?; + + let updated: Option = result.take(0)?; + updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::DeadLetter)) + } + + pub async fn 
mark_cancelled(&self, db: &SurrealDbClient) -> Result { + compute_next_state(&self.state, TaskTransition::Cancel)?; + + const CANCEL_QUERY: &str = r#" + UPDATE type::thing($table, $id) + SET state = $cancelled, + updated_at = $now, + locked_at = NONE, + worker_id = NONE + WHERE state IN $allow_states + RETURN *; + "#; + + let now = chrono::Utc::now(); + let mut result = db + .client + .query(CANCEL_QUERY) + .bind(("table", Self::table_name())) + .bind(("id", self.id.clone())) + .bind(("cancelled", TaskState::Cancelled.as_str())) + .bind(( + "allow_states", + vec![ + TaskState::Pending.as_str(), + TaskState::Reserved.as_str(), + TaskState::Processing.as_str(), + ], + )) + .bind(("now", SurrealDatetime::from(now))) + .await?; + + let updated: Option = result.take(0)?; + updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Cancel)) + } + + pub async fn release(&self, db: &SurrealDbClient) -> Result { + compute_next_state(&self.state, TaskTransition::Release)?; + + const RELEASE_QUERY: &str = r#" + UPDATE type::thing($table, $id) + SET state = $pending, + updated_at = $now, + locked_at = NONE, + worker_id = NONE + WHERE state = $reserved + RETURN *; + "#; + + let now = chrono::Utc::now(); + let mut result = db + .client + .query(RELEASE_QUERY) + .bind(("table", Self::table_name())) + .bind(("id", self.id.clone())) + .bind(("pending", TaskState::Pending.as_str())) + .bind(("reserved", TaskState::Reserved.as_str())) + .bind(("now", SurrealDatetime::from(now))) + .await?; + + let updated: Option = result.take(0)?; + updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Release)) + } + + pub async fn get_unfinished_tasks( + db: &SurrealDbClient, + ) -> Result, AppError> { + let tasks: Vec = db .query( - "SELECT * FROM type::table($table) - WHERE - status.name = 'Created' - OR ( - status.name = 'InProgress' - AND status.attempts < $max_attempts - ) - ORDER BY created_at ASC", + "SELECT * FROM type::table($table) + WHERE state IN 
$active_states + ORDER BY scheduled_at ASC, created_at ASC", ) .bind(("table", Self::table_name())) - .bind(("max_attempts", MAX_ATTEMPTS)) + .bind(( + "active_states", + vec![ + TaskState::Pending.as_str(), + TaskState::Reserved.as_str(), + TaskState::Processing.as_str(), + TaskState::Failed.as_str(), + ], + )) .await? .take(0)?; - Ok(jobs) + Ok(tasks) } } #[cfg(test)] mod tests { use super::*; - use chrono::Utc; + use crate::storage::types::ingestion_payload::IngestionPayload; - // Helper function to create a test ingestion payload - fn create_test_payload(user_id: &str) -> IngestionPayload { + fn create_payload(user_id: &str) -> IngestionPayload { IngestionPayload::Text { text: "Test content".to_string(), context: "Test context".to_string(), @@ -119,182 +597,115 @@ mod tests { } } - #[tokio::test] - async fn test_new_ingestion_task() { - let user_id = "user123"; - let payload = create_test_payload(user_id); + async fn memory_db() -> SurrealDbClient { + let namespace = "test_ns"; + let database = Uuid::new_v4().to_string(); + SurrealDbClient::memory(namespace, &database) + .await + .expect("in-memory surrealdb") + } + #[tokio::test] + async fn test_new_task_defaults() { + let user_id = "user123"; + let payload = create_payload(user_id); let task = IngestionTask::new(payload.clone(), user_id.to_string()).await; - // Verify task properties assert_eq!(task.user_id, user_id); assert_eq!(task.content, payload); - assert!(matches!(task.status, IngestionTaskStatus::Created)); - assert!(!task.id.is_empty()); + assert_eq!(task.state, TaskState::Pending); + assert_eq!(task.attempts, 0); + assert_eq!(task.max_attempts, MAX_ATTEMPTS); + assert!(task.locked_at.is_none()); + assert!(task.worker_id.is_none()); } #[tokio::test] - async fn test_create_and_add_to_db() { - // Setup in-memory database - let namespace = "test_ns"; - let database = &Uuid::new_v4().to_string(); - let db = SurrealDbClient::memory(namespace, database) - .await - .expect("Failed to start in-memory 
surrealdb"); - + async fn test_create_and_store_task() { + let db = memory_db().await; let user_id = "user123"; - let payload = create_test_payload(user_id); + let payload = create_payload(user_id); - // Create and store task - IngestionTask::create_and_add_to_db(payload.clone(), user_id.to_string(), &db) + let created = + IngestionTask::create_and_add_to_db(payload.clone(), user_id.to_string(), &db) + .await + .expect("store"); + + let stored: Option = db + .get_item::(&created.id) .await - .expect("Failed to create and add task to db"); + .expect("fetch"); - // Query to verify task was stored - let query = format!( - "SELECT * FROM {} WHERE user_id = '{}'", - IngestionTask::table_name(), - user_id - ); - let mut result = db.query(query).await.expect("Query failed"); - let tasks: Vec = result.take(0).unwrap_or_default(); - - // Verify task is in the database - assert!(!tasks.is_empty(), "Task should exist in the database"); - let stored_task = &tasks[0]; - assert_eq!(stored_task.user_id, user_id); - assert!(matches!(stored_task.status, IngestionTaskStatus::Created)); + let stored = stored.expect("task exists"); + assert_eq!(stored.id, created.id); + assert_eq!(stored.state, TaskState::Pending); + assert_eq!(stored.attempts, 0); } #[tokio::test] - async fn test_update_status() { - // Setup in-memory database - let namespace = "test_ns"; - let database = &Uuid::new_v4().to_string(); - let db = SurrealDbClient::memory(namespace, database) - .await - .expect("Failed to start in-memory surrealdb"); - + async fn test_claim_and_transition() { + let db = memory_db().await; let user_id = "user123"; - let payload = create_test_payload(user_id); + let payload = create_payload(user_id); + let task = IngestionTask::new(payload, user_id.to_string()).await; + db.store_item(task.clone()).await.expect("store"); - // Create task manually - let task = IngestionTask::new(payload.clone(), user_id.to_string()).await; - let task_id = task.id.clone(); - - // Store task - 
db.store_item(task).await.expect("Failed to store task"); - - // Update status to InProgress - let now = Utc::now(); - let new_status = IngestionTaskStatus::InProgress { - attempts: 1, - last_attempt: now, - }; - - IngestionTask::update_status(&task_id, new_status.clone(), &db) + let worker_id = "worker-1"; + let now = chrono::Utc::now(); + let claimed = IngestionTask::claim_next_ready(&db, worker_id, now, Duration::from_secs(60)) .await - .expect("Failed to update status"); + .expect("claim"); - // Verify status updated - let updated_task: Option = db - .get_item::(&task_id) - .await - .expect("Failed to get updated task"); + let claimed = claimed.expect("task claimed"); + assert_eq!(claimed.state, TaskState::Reserved); + assert_eq!(claimed.worker_id.as_deref(), Some(worker_id)); - assert!(updated_task.is_some()); - let updated_task = updated_task.unwrap(); + let processing = claimed.mark_processing(&db).await.expect("processing"); + assert_eq!(processing.state, TaskState::Processing); - match updated_task.status { - IngestionTaskStatus::InProgress { attempts, .. 
} => { - assert_eq!(attempts, 1); - } - _ => panic!("Expected InProgress status"), - } + let succeeded = processing.mark_succeeded(&db).await.expect("succeeded"); + assert_eq!(succeeded.state, TaskState::Succeeded); + assert!(succeeded.worker_id.is_none()); + assert!(succeeded.locked_at.is_none()); } #[tokio::test] - async fn test_get_unfinished_tasks() { - // Setup in-memory database - let namespace = "test_ns"; - let database = &Uuid::new_v4().to_string(); - let db = SurrealDbClient::memory(namespace, database) - .await - .expect("Failed to start in-memory surrealdb"); - + async fn test_fail_and_dead_letter() { + let db = memory_db().await; let user_id = "user123"; - let payload = create_test_payload(user_id); + let payload = create_payload(user_id); + let task = IngestionTask::new(payload, user_id.to_string()).await; + db.store_item(task.clone()).await.expect("store"); - // Create tasks with different statuses - let created_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; + let worker_id = "worker-dead"; + let now = chrono::Utc::now(); + let claimed = IngestionTask::claim_next_ready(&db, worker_id, now, Duration::from_secs(60)) + .await + .expect("claim") + .expect("claimed"); - let mut in_progress_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; - in_progress_task.status = IngestionTaskStatus::InProgress { - attempts: 1, - last_attempt: Utc::now(), + let processing = claimed.mark_processing(&db).await.expect("processing"); + + let error_info = TaskErrorInfo { + code: Some("pipeline_error".into()), + message: "failed".into(), }; - let mut max_attempts_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; - max_attempts_task.status = IngestionTaskStatus::InProgress { - attempts: MAX_ATTEMPTS, - last_attempt: Utc::now(), - }; - - let mut completed_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; - completed_task.status = IngestionTaskStatus::Completed; - - let mut error_task = 
IngestionTask::new(payload.clone(), user_id.to_string()).await; - error_task.status = IngestionTaskStatus::Error { - message: "Test error".to_string(), - }; - - // Store all tasks - db.store_item(created_task) + let failed = processing + .mark_failed(error_info.clone(), Duration::from_secs(30), &db) .await - .expect("Failed to store created task"); - db.store_item(in_progress_task) - .await - .expect("Failed to store in-progress task"); - db.store_item(max_attempts_task) - .await - .expect("Failed to store max-attempts task"); - db.store_item(completed_task) - .await - .expect("Failed to store completed task"); - db.store_item(error_task) - .await - .expect("Failed to store error task"); + .expect("failed update"); + assert_eq!(failed.state, TaskState::Failed); + assert_eq!(failed.error_message.as_deref(), Some("failed")); + assert!(failed.worker_id.is_none()); + assert!(failed.locked_at.is_none()); + assert!(failed.scheduled_at > now); - // Get unfinished tasks - let unfinished_tasks = IngestionTask::get_unfinished_tasks(&db) + let dead = failed + .mark_dead_letter(error_info.clone(), &db) .await - .expect("Failed to get unfinished tasks"); - - // Verify only Created and InProgress with attempts < MAX_ATTEMPTS are returned - assert_eq!(unfinished_tasks.len(), 2); - - let statuses: Vec<_> = unfinished_tasks - .iter() - .map(|task| match &task.status { - IngestionTaskStatus::Created => "Created", - IngestionTaskStatus::InProgress { attempts, .. } => { - if *attempts < MAX_ATTEMPTS { - "InProgress=MAX" - } - } - IngestionTaskStatus::Completed => "Completed", - IngestionTaskStatus::Error { .. 
} => "Error", - IngestionTaskStatus::Cancelled => "Cancelled", - }) - .collect(); - - assert!(statuses.contains(&"Created")); - assert!(statuses.contains(&"InProgress=MAX")); - assert!(!statuses.contains(&"Completed")); - assert!(!statuses.contains(&"Error")); - assert!(!statuses.contains(&"Cancelled")); + .expect("dead letter"); + assert_eq!(dead.state, TaskState::DeadLetter); + assert_eq!(dead.error_message.as_deref(), Some("failed")); } } diff --git a/common/src/storage/types/mod.rs b/common/src/storage/types/mod.rs index 9667862..99b4bd0 100644 --- a/common/src/storage/types/mod.rs +++ b/common/src/storage/types/mod.rs @@ -83,6 +83,32 @@ macro_rules! stored_object { Ok(DateTime::::from(dt)) } + #[allow(dead_code)] + fn serialize_option_datetime( + date: &Option>, + serializer: S, + ) -> Result + where + S: serde::Serializer, + { + match date { + Some(dt) => serializer + .serialize_some(&Into::::into(*dt)), + None => serializer.serialize_none(), + } + } + + #[allow(dead_code)] + fn deserialize_option_datetime<'de, D>( + deserializer: D, + ) -> Result>, D::Error> + where + D: serde::Deserializer<'de>, + { + let value = Option::::deserialize(deserializer)?; + Ok(value.map(DateTime::::from)) + } + #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct $name { @@ -92,7 +118,7 @@ macro_rules! 
stored_object { pub created_at: DateTime, #[serde(serialize_with = "serialize_datetime", deserialize_with = "deserialize_datetime", default)] pub updated_at: DateTime, - $(pub $field: $ty),* + $( $(#[$attr])* pub $field: $ty),* } impl StoredObject for $name { diff --git a/common/src/storage/types/user.rs b/common/src/storage/types/user.rs index 28962c8..198fc61 100644 --- a/common/src/storage/types/user.rs +++ b/common/src/storage/types/user.rs @@ -8,7 +8,7 @@ use uuid::Uuid; use super::text_chunk::TextChunk; use super::{ conversation::Conversation, - ingestion_task::{IngestionTask, MAX_ATTEMPTS}, + ingestion_task::{IngestionTask, TaskState}, knowledge_entity::{KnowledgeEntity, KnowledgeEntityType}, knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings, @@ -535,19 +535,24 @@ impl User { let jobs: Vec = db .query( "SELECT * FROM type::table($table) - WHERE user_id = $user_id - AND ( - status.name = 'Created' - OR ( - status.name = 'InProgress' - AND status.attempts < $max_attempts - ) - ) - ORDER BY created_at DESC", + WHERE user_id = $user_id + AND ( + state IN $active_states + OR (state = $failed_state AND attempts < max_attempts) + ) + ORDER BY scheduled_at ASC, created_at DESC", ) .bind(("table", IngestionTask::table_name())) .bind(("user_id", user_id.to_owned())) - .bind(("max_attempts", MAX_ATTEMPTS)) + .bind(( + "active_states", + vec![ + TaskState::Pending.as_str(), + TaskState::Reserved.as_str(), + TaskState::Processing.as_str(), + ], + )) + .bind(("failed_state", TaskState::Failed.as_str())) .await? 
.take(0)?; @@ -605,7 +610,7 @@ impl User { mod tests { use super::*; use crate::storage::types::ingestion_payload::IngestionPayload; - use crate::storage::types::ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS}; + use crate::storage::types::ingestion_task::{IngestionTask, TaskState, MAX_ATTEMPTS}; use std::collections::HashSet; // Helper function to set up a test database with SystemSettings @@ -710,28 +715,32 @@ mod tests { .await .expect("Failed to store created task"); - let mut in_progress_allowed = - IngestionTask::new(payload.clone(), user_id.to_string()).await; - in_progress_allowed.status = IngestionTaskStatus::InProgress { - attempts: 1, - last_attempt: chrono::Utc::now(), - }; - db.store_item(in_progress_allowed.clone()) + let mut processing_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; + processing_task.state = TaskState::Processing; + processing_task.attempts = 1; + db.store_item(processing_task.clone()) .await - .expect("Failed to store in-progress task"); + .expect("Failed to store processing task"); - let mut in_progress_blocked = + let mut failed_retry_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; + failed_retry_task.state = TaskState::Failed; + failed_retry_task.attempts = 1; + failed_retry_task.scheduled_at = chrono::Utc::now() - chrono::Duration::minutes(5); + db.store_item(failed_retry_task.clone()) + .await + .expect("Failed to store retryable failed task"); + + let mut failed_blocked_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; - in_progress_blocked.status = IngestionTaskStatus::InProgress { - attempts: MAX_ATTEMPTS, - last_attempt: chrono::Utc::now(), - }; - db.store_item(in_progress_blocked.clone()) + failed_blocked_task.state = TaskState::Failed; + failed_blocked_task.attempts = MAX_ATTEMPTS; + failed_blocked_task.error_message = Some("Too many failures".into()); + db.store_item(failed_blocked_task.clone()) .await .expect("Failed to store blocked 
task"); let mut completed_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; - completed_task.status = IngestionTaskStatus::Completed; + completed_task.state = TaskState::Succeeded; db.store_item(completed_task.clone()) .await .expect("Failed to store completed task"); @@ -755,10 +764,11 @@ mod tests { unfinished.iter().map(|task| task.id.clone()).collect(); assert!(unfinished_ids.contains(&created_task.id)); - assert!(unfinished_ids.contains(&in_progress_allowed.id)); - assert!(!unfinished_ids.contains(&in_progress_blocked.id)); + assert!(unfinished_ids.contains(&processing_task.id)); + assert!(unfinished_ids.contains(&failed_retry_task.id)); + assert!(!unfinished_ids.contains(&failed_blocked_task.id)); assert!(!unfinished_ids.contains(&completed_task.id)); - assert_eq!(unfinished_ids.len(), 2); + assert_eq!(unfinished_ids.len(), 3); } #[tokio::test] diff --git a/html-router/src/routes/ingestion/handlers.rs b/html-router/src/routes/ingestion/handlers.rs index 3dd2b5b..31bb1e4 100644 --- a/html-router/src/routes/ingestion/handlers.rs +++ b/html-router/src/routes/ingestion/handlers.rs @@ -20,7 +20,7 @@ use common::{ storage::types::{ file_info::FileInfo, ingestion_payload::IngestionPayload, - ingestion_task::{IngestionTask, IngestionTaskStatus}, + ingestion_task::{IngestionTask, TaskState}, user::User, }, }; @@ -178,40 +178,54 @@ pub async fn get_task_updates_stream( Ok(Some(updated_task)) => { consecutive_db_errors = 0; // Reset error count on success - // Format the status message based on IngestionTaskStatus - let status_message = match &updated_task.status { - IngestionTaskStatus::Created => "Created".to_string(), - IngestionTaskStatus::InProgress { attempts, .. 
} => { - // Following your template's current display - format!("In progress, attempt {}", attempts) + let status_message = match updated_task.state { + TaskState::Pending => "Pending".to_string(), + TaskState::Reserved => format!( + "Reserved (attempt {} of {})", + updated_task.attempts, + updated_task.max_attempts + ), + TaskState::Processing => format!( + "Processing (attempt {} of {})", + updated_task.attempts, + updated_task.max_attempts + ), + TaskState::Succeeded => "Completed".to_string(), + TaskState::Failed => { + let mut base = format!( + "Retry scheduled (attempt {} of {})", + updated_task.attempts, + updated_task.max_attempts + ); + if let Some(message) = updated_task.error_message.as_ref() { + base.push_str(": "); + base.push_str(message); + } + base } - IngestionTaskStatus::Completed => "Completed".to_string(), - IngestionTaskStatus::Error { message } => { - // Providing a user-friendly error message from the status - format!("Error: {}", message) + TaskState::Cancelled => "Cancelled".to_string(), + TaskState::DeadLetter => { + let mut base = "Failed permanently".to_string(); + if let Some(message) = updated_task.error_message.as_ref() { + base.push_str(": "); + base.push_str(message); + } + base } - IngestionTaskStatus::Cancelled => "Cancelled".to_string(), }; yield Ok(Event::default().event("status").data(status_message)); // Check for terminal states to close the stream - match updated_task.status { - IngestionTaskStatus::Completed - | IngestionTaskStatus::Error { .. 
} - | IngestionTaskStatus::Cancelled => { - // Send a specific event that HTMX uses to close the connection - // Send a event to reload the recent content - // Send a event to remove the loading indicatior - let check_icon = state.templates.render("icons/check_icon.html", &context!{}).unwrap_or("Ok".to_string()); - yield Ok(Event::default().event("stop_loading").data(check_icon)); - yield Ok(Event::default().event("update_latest_content").data("Update latest content")); - yield Ok(Event::default().event("close_stream").data("Stream complete")); - break; // Exit loop on terminal states - } - _ => { - // Not a terminal state, continue polling - } + if updated_task.state.is_terminal() { + // Send a specific event that HTMX uses to close the connection + // Send an event to reload the recent content + // Send an event to remove the loading indicator + let check_icon = state.templates.render("icons/check_icon.html", &context!{}).unwrap_or("Ok".to_string()); + yield Ok(Event::default().event("stop_loading").data(check_icon)); + yield Ok(Event::default().event("update_latest_content").data("Update latest content")); + yield Ok(Event::default().event("close_stream").data("Stream complete")); + break; // Exit loop on terminal states } }, Ok(None) => { diff --git a/html-router/templates/dashboard/active_jobs.html b/html-router/templates/dashboard/active_jobs.html index 1ac600c..55ffdc9 100644 --- a/html-router/templates/dashboard/active_jobs.html +++ b/html-router/templates/dashboard/active_jobs.html @@ -23,12 +23,18 @@
- {% if item.status.name == "InProgress" %} - In progress, attempt {{ item.status.attempts }} - {% elif item.status.name == "Error" %} - Error: {{ item.status.message }} + {% if item.state == "Processing" %} + Processing, attempt {{ item.attempts }} of {{ item.max_attempts }} + {% elif item.state == "Reserved" %} + Reserved, attempt {{ item.attempts }} of {{ item.max_attempts }} + {% elif item.state == "Failed" %} + Retry scheduled (attempt {{ item.attempts }} of {{ item.max_attempts }}){% if item.error_message %}: {{ item.error_message }}{% endif %} + {% elif item.state == "DeadLetter" %} + Failed permanently{% if item.error_message %}: {{ item.error_message }}{% endif %} + {% elif item.state == "Succeeded" %} + Completed {% else %} - {{ item.status.name }} + {{ item.state }} {% endif %}
@@ -60,4 +66,4 @@ {% endif %} -{% endblock %} \ No newline at end of file +{% endblock %} diff --git a/html-router/templates/dashboard/current_task.html b/html-router/templates/dashboard/current_task.html index c2ab5b5..8321439 100644 --- a/html-router/templates/dashboard/current_task.html +++ b/html-router/templates/dashboard/current_task.html @@ -8,7 +8,7 @@
- Created + Pending
diff --git a/ingestion-pipeline/src/lib.rs b/ingestion-pipeline/src/lib.rs index 99d091e..0ff384a 100644 --- a/ingestion-pipeline/src/lib.rs +++ b/ingestion-pipeline/src/lib.rs @@ -3,101 +3,47 @@ pub mod pipeline; pub mod types; pub mod utils; +use chrono::Utc; use common::storage::{ db::SurrealDbClient, - types::ingestion_task::{IngestionTask, IngestionTaskStatus}, + types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS}, }; -use futures::StreamExt; use pipeline::IngestionPipeline; use std::sync::Arc; -use surrealdb::Action; -use tracing::{error, info}; +use tokio::time::{sleep, Duration}; +use tracing::{error, info, warn}; +use uuid::Uuid; pub async fn run_worker_loop( db: Arc, ingestion_pipeline: Arc, ) -> Result<(), Box> { + let worker_id = format!("ingestion-worker-{}", Uuid::new_v4()); + let lease_duration = Duration::from_secs(DEFAULT_LEASE_SECS as u64); + let idle_backoff = Duration::from_millis(500); + loop { - // First, check for any unfinished tasks - let unfinished_tasks = IngestionTask::get_unfinished_tasks(&db).await?; - if !unfinished_tasks.is_empty() { - info!("Found {} unfinished jobs", unfinished_tasks.len()); - for task in unfinished_tasks { - ingestion_pipeline.process_task(task).await?; - } - } - - // If no unfinished jobs, start listening for new ones - info!("Listening for new jobs..."); - let mut job_stream = IngestionTask::listen_for_tasks(&db).await?; - while let Some(notification) = job_stream.next().await { - match notification { - Ok(notification) => { - info!("Received notification: {:?}", notification); - match notification.action { - Action::Create => { - if let Err(e) = ingestion_pipeline.process_task(notification.data).await - { - error!("Error processing task: {}", e); - } - } - Action::Update => { - match notification.data.status { - IngestionTaskStatus::Completed - | IngestionTaskStatus::Error { .. 
} - | IngestionTaskStatus::Cancelled => { - info!( - "Skipping already completed/error/cancelled task: {}", - notification.data.id - ); - continue; - } - IngestionTaskStatus::InProgress { attempts, .. } => { - // Only process if this is a retry after an error, not our own update - if let Ok(Some(current_task)) = - db.get_item::(¬ification.data.id).await - { - match current_task.status { - IngestionTaskStatus::Error { .. } - if attempts - < common::storage::types::ingestion_task::MAX_ATTEMPTS => - { - // This is a retry after an error - if let Err(e) = - ingestion_pipeline.process_task(current_task).await - { - error!("Error processing task retry: {}", e); - } - } - _ => { - info!( - "Skipping in-progress update for task: {}", - notification.data.id - ); - continue; - } - } - } - } - IngestionTaskStatus::Created => { - // Shouldn't happen with Update action, but process if it does - if let Err(e) = - ingestion_pipeline.process_task(notification.data).await - { - error!("Error processing task: {}", e); - } - } - } - } - _ => {} // Ignore other actions - } + match IngestionTask::claim_next_ready(&db, &worker_id, Utc::now(), lease_duration).await { + Ok(Some(task)) => { + let task_id = task.id.clone(); + info!( + %worker_id, + %task_id, + attempt = task.attempts, + "claimed ingestion task" + ); + if let Err(err) = ingestion_pipeline.process_task(task).await { + error!(%worker_id, %task_id, error = %err, "ingestion task failed"); } - Err(e) => error!("Error in job notification: {}", e), + } + Ok(None) => { + sleep(idle_backoff).await; + } + Err(err) => { + error!(%worker_id, error = %err, "failed to claim ingestion task"); + warn!("Backing off for 1s after claim error"); + sleep(Duration::from_secs(1)).await; } } - - // If we reach here, the stream has ended (connection lost?) 
- error!("Database stream ended unexpectedly, reconnecting..."); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } } diff --git a/ingestion-pipeline/src/pipeline.rs b/ingestion-pipeline/src/pipeline.rs index cffaf90..59ad399 100644 --- a/ingestion-pipeline/src/pipeline.rs +++ b/ingestion-pipeline/src/pipeline.rs @@ -1,16 +1,15 @@ use std::{sync::Arc, time::Instant}; -use chrono::Utc; use text_splitter::TextSplitter; use tokio::time::{sleep, Duration}; -use tracing::{info, warn}; +use tracing::{info, info_span, warn}; use common::{ error::AppError, storage::{ db::SurrealDbClient, types::{ - ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS}, + ingestion_task::{IngestionTask, TaskErrorInfo}, knowledge_entity::KnowledgeEntity, knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk, @@ -44,47 +43,81 @@ impl IngestionPipeline { }) } pub async fn process_task(&self, task: IngestionTask) -> Result<(), AppError> { - let current_attempts = match task.status { - IngestionTaskStatus::InProgress { attempts, .. 
} => attempts + 1, - _ => 1, - }; + let task_id = task.id.clone(); + let attempt = task.attempts; + let worker_label = task + .worker_id + .clone() + .unwrap_or_else(|| "unknown-worker".to_string()); + let span = info_span!( + "ingestion_task", + %task_id, + attempt, + worker_id = %worker_label, + state = %task.state.as_str() + ); + let _enter = span.enter(); + let processing_task = task.mark_processing(&self.db).await?; - // Update status to InProgress with attempt count - IngestionTask::update_status( - &task.id, - IngestionTaskStatus::InProgress { - attempts: current_attempts, - last_attempt: Utc::now(), - }, + let text_content = to_text_content( + processing_task.content.clone(), &self.db, + &self.config, + &self.openai_client, ) .await?; - let text_content = - to_text_content(task.content, &self.db, &self.config, &self.openai_client).await?; - match self.process(&text_content).await { Ok(_) => { - IngestionTask::update_status(&task.id, IngestionTaskStatus::Completed, &self.db) - .await?; + processing_task.mark_succeeded(&self.db).await?; + info!(%task_id, attempt, "ingestion task succeeded"); Ok(()) } - Err(e) => { - if current_attempts >= MAX_ATTEMPTS { - IngestionTask::update_status( - &task.id, - IngestionTaskStatus::Error { - message: format!("Max attempts reached: {}", e), - }, - &self.db, - ) - .await?; + Err(err) => { + let reason = err.to_string(); + let error_info = TaskErrorInfo { + code: None, + message: reason.clone(), + }; + + if processing_task.can_retry() { + let delay = Self::retry_delay(processing_task.attempts); + processing_task + .mark_failed(error_info, delay, &self.db) + .await?; + warn!( + %task_id, + attempt = processing_task.attempts, + retry_in_secs = delay.as_secs(), + "ingestion task failed; scheduled retry" + ); + } else { + processing_task + .mark_dead_letter(error_info, &self.db) + .await?; + warn!( + %task_id, + attempt = processing_task.attempts, + "ingestion task failed; moved to dead letter queue" + ); } - 
Err(AppError::Processing(e.to_string())) + + Err(AppError::Processing(reason)) } } } + fn retry_delay(attempt: u32) -> Duration { + const BASE_SECONDS: u64 = 30; + const MAX_SECONDS: u64 = 15 * 60; + + let capped_attempt = attempt.saturating_sub(1).min(5) as u32; + let multiplier = 2_u64.pow(capped_attempt); + let delay = BASE_SECONDS * multiplier; + + Duration::from_secs(delay.min(MAX_SECONDS)) + } + pub async fn process(&self, content: &TextContent) -> Result<(), AppError> { let now = Instant::now();