feat: state machine for tasks, multiple workers

This commit is contained in:
Per Stark
2025-10-12 22:21:20 +02:00
parent 61d8d7abe7
commit 41fc7bb99c
12 changed files with 1031 additions and 381 deletions

31
Cargo.lock generated
View File

@@ -1322,6 +1322,7 @@ dependencies = [
"serde",
"serde_json",
"sha2",
"state-machines",
"surrealdb",
"surrealdb-migrations",
"tempfile",
@@ -3291,7 +3292,7 @@ checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4"
[[package]]
name = "main"
version = "0.2.2"
version = "0.2.3"
dependencies = [
"anyhow",
"api-router",
@@ -5400,6 +5401,34 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "state-machines"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806ba0bf43ae158b229036d8a84601649a58d9761e718b5e0e07c2953803f4c1"
dependencies = [
"state-machines-core",
"state-machines-macro",
]
[[package]]
name = "state-machines-core"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "949cc50e84bed6234117f28a0ba2980dc35e9c17984ffe4e0a3364fba3e77540"
[[package]]
name = "state-machines-macro"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8322f5aa92d31b3c05faa1ec3231b82da479a20706836867d67ae89ce74927bd"
dependencies = [
"proc-macro2",
"quote",
"state-machines-core",
"syn 2.0.101",
]
[[package]]
name = "static_assertions_next"
version = "1.1.2"

View File

@@ -55,6 +55,7 @@ tokio-retry = "0.3.0"
base64 = "0.22.1"
object_store = { version = "0.11.2" }
bytes = "1.7.1"
state-machines = "0.2.0"
[profile.dist]
inherits = "release"

View File

@@ -41,6 +41,7 @@ surrealdb-migrations = { workspace = true }
tokio-retry = { workspace = true }
object_store = { workspace = true }
bytes = { workspace = true }
state-machines = { workspace = true }
[features]

View File

@@ -0,0 +1,173 @@
-- State machine migration for ingestion_task records
-- Adds a flat `state` column plus scheduling / lease / error metadata,
-- backfills those fields from the legacy nested `status` object, and finally
-- clears `status` once every record carries the new columns.
DEFINE FIELD IF NOT EXISTS state ON TABLE ingestion_task TYPE option<string>;
DEFINE FIELD IF NOT EXISTS attempts ON TABLE ingestion_task TYPE option<number>;
DEFINE FIELD IF NOT EXISTS max_attempts ON TABLE ingestion_task TYPE option<number>;
DEFINE FIELD IF NOT EXISTS scheduled_at ON TABLE ingestion_task TYPE option<datetime>;
DEFINE FIELD IF NOT EXISTS locked_at ON TABLE ingestion_task TYPE option<datetime>;
DEFINE FIELD IF NOT EXISTS lease_duration_secs ON TABLE ingestion_task TYPE option<number>;
DEFINE FIELD IF NOT EXISTS worker_id ON TABLE ingestion_task TYPE option<string>;
DEFINE FIELD IF NOT EXISTS error_code ON TABLE ingestion_task TYPE option<string>;
DEFINE FIELD IF NOT EXISTS error_message ON TABLE ingestion_task TYPE option<string>;
DEFINE FIELD IF NOT EXISTS last_error_at ON TABLE ingestion_task TYPE option<datetime>;
DEFINE FIELD IF NOT EXISTS priority ON TABLE ingestion_task TYPE option<number>;
-- Redefine the legacy `status` field as a loose object so the nested values
-- written by the old code remain readable during the backfill below.
REMOVE FIELD status ON TABLE ingestion_task;
DEFINE FIELD status ON TABLE ingestion_task TYPE option<object>;
-- Index for the worker claim query: filter on state, order by scheduled_at.
DEFINE INDEX IF NOT EXISTS idx_ingestion_task_state_sched ON TABLE ingestion_task FIELDS state, scheduled_at;
-- Run the backfill only when at least one record still lacks `state`.
LET $needs_migration = (SELECT count() AS count FROM type::table('ingestion_task') WHERE state = NONE)[0].count;
IF $needs_migration > 0 THEN {
-- Created -> Pending
UPDATE type::table('ingestion_task')
SET
state = "Pending",
attempts = 0,
max_attempts = 3,
scheduled_at = IF created_at != NONE THEN created_at ELSE time::now() END,
locked_at = NONE,
lease_duration_secs = 300,
worker_id = NONE,
error_code = NONE,
error_message = NONE,
last_error_at = NONE,
priority = 0
WHERE state = NONE
AND status != NONE
AND status.name = "Created";
-- InProgress -> Processing
UPDATE type::table('ingestion_task')
SET
state = "Processing",
attempts = IF status.attempts != NONE THEN status.attempts ELSE 1 END,
max_attempts = 3,
scheduled_at = IF status.last_attempt != NONE THEN status.last_attempt ELSE time::now() END,
locked_at = IF status.last_attempt != NONE THEN status.last_attempt ELSE time::now() END,
lease_duration_secs = 300,
worker_id = NONE,
error_code = NONE,
error_message = NONE,
last_error_at = NONE,
priority = 0
WHERE state = NONE
AND status != NONE
AND status.name = "InProgress";
-- Completed -> Succeeded
UPDATE type::table('ingestion_task')
SET
state = "Succeeded",
attempts = 1,
max_attempts = 3,
scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END,
locked_at = NONE,
lease_duration_secs = 300,
worker_id = NONE,
error_code = NONE,
error_message = NONE,
last_error_at = NONE,
priority = 0
WHERE state = NONE
AND status != NONE
AND status.name = "Completed";
-- Error -> DeadLetter (terminal failure)
UPDATE type::table('ingestion_task')
SET
state = "DeadLetter",
attempts = 3,
max_attempts = 3,
scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END,
locked_at = NONE,
lease_duration_secs = 300,
worker_id = NONE,
error_code = NONE,
error_message = status.message,
last_error_at = IF updated_at != NONE THEN updated_at ELSE time::now() END,
priority = 0
WHERE state = NONE
AND status != NONE
AND status.name = "Error";
-- Cancelled -> Cancelled
UPDATE type::table('ingestion_task')
SET
state = "Cancelled",
attempts = 0,
max_attempts = 3,
scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END,
locked_at = NONE,
lease_duration_secs = 300,
worker_id = NONE,
error_code = NONE,
error_message = NONE,
last_error_at = NONE,
priority = 0
WHERE state = NONE
AND status != NONE
AND status.name = "Cancelled";
-- Fallback for any remaining records missing state
UPDATE type::table('ingestion_task')
SET
state = "Pending",
attempts = 0,
max_attempts = 3,
scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END,
locked_at = NONE,
lease_duration_secs = 300,
worker_id = NONE,
error_code = NONE,
error_message = NONE,
last_error_at = NONE,
priority = 0
WHERE state = NONE;
} END;
-- Ensure defaults for newly added fields
-- (idempotent: each statement only touches records where the value is unset)
UPDATE type::table('ingestion_task')
SET max_attempts = 3
WHERE max_attempts = NONE;
UPDATE type::table('ingestion_task')
SET lease_duration_secs = 300
WHERE lease_duration_secs = NONE;
UPDATE type::table('ingestion_task')
SET attempts = 0
WHERE attempts = NONE;
UPDATE type::table('ingestion_task')
SET priority = 0
WHERE priority = NONE;
UPDATE type::table('ingestion_task')
SET scheduled_at = IF updated_at != NONE THEN updated_at ELSE time::now() END
WHERE scheduled_at = NONE;
-- NOTE(review): the next statement is a no-op (it sets locked_at = NONE only
-- where it already is NONE); kept verbatim from the original script.
UPDATE type::table('ingestion_task')
SET locked_at = NONE
WHERE locked_at = NONE;
-- Normalize empty-string worker ids to NONE.
UPDATE type::table('ingestion_task')
SET worker_id = NONE
WHERE worker_id != NONE AND worker_id = "";
-- NOTE(review): the next three statements are likewise no-ops (each assigns
-- NONE where the value is already NONE).
UPDATE type::table('ingestion_task')
SET error_code = NONE
WHERE error_code = NONE;
UPDATE type::table('ingestion_task')
SET error_message = NONE
WHERE error_message = NONE;
UPDATE type::table('ingestion_task')
SET last_error_at = NONE
WHERE last_error_at = NONE;
-- Drop the legacy nested status payload now that `state` is authoritative.
UPDATE type::table('ingestion_task')
SET status = NONE
WHERE status != NONE;

View File

@@ -1,116 +1,594 @@
use futures::Stream;
use surrealdb::{opt::PatchOp, Notification};
use std::time::Duration;
use chrono::Duration as ChronoDuration;
use state_machines::state_machine;
use surrealdb::sql::Datetime as SurrealDatetime;
use uuid::Uuid;
use crate::{error::AppError, storage::db::SurrealDbClient, stored_object};
use super::ingestion_payload::IngestionPayload;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "name")]
pub enum IngestionTaskStatus {
Created,
InProgress {
attempts: u32,
last_attempt: DateTime<Utc>,
},
Completed,
Error {
message: String,
},
/// Default cap on delivery attempts; `can_retry` stops retries once
/// `attempts` reaches this value.
pub const MAX_ATTEMPTS: u32 = 3;
/// Default worker lease length in seconds; once a lease this old expires the
/// claim query considers the task claimable again.
pub const DEFAULT_LEASE_SECS: i64 = 300;
/// Default claim priority; the claim query orders by priority descending, so
/// higher values are picked up first.
pub const DEFAULT_PRIORITY: i32 = 0;
/// Lifecycle state of an ingestion task, persisted as the bare variant name.
///
/// Unit variants serialize to their own identifiers under serde's default
/// (externally tagged) representation, so no per-variant `rename` attributes
/// are needed to keep the stored strings ("Pending", "Reserved", ...) intact.
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub enum TaskState {
    /// Waiting to be claimed by a worker.
    #[default]
    Pending,
    /// Claimed by a worker but not yet running.
    Reserved,
    /// Currently being executed by a worker.
    Processing,
    /// Finished successfully (terminal).
    Succeeded,
    /// Last attempt failed; eligible for another reservation.
    Failed,
    /// Cancelled (terminal).
    Cancelled,
    /// Retries exhausted or explicitly dead-lettered (terminal).
    DeadLetter,
}
impl TaskState {
pub fn as_str(&self) -> &'static str {
match self {
TaskState::Pending => "Pending",
TaskState::Reserved => "Reserved",
TaskState::Processing => "Processing",
TaskState::Succeeded => "Succeeded",
TaskState::Failed => "Failed",
TaskState::Cancelled => "Cancelled",
TaskState::DeadLetter => "DeadLetter",
}
}
pub fn is_terminal(&self) -> bool {
matches!(
self,
TaskState::Succeeded | TaskState::Cancelled | TaskState::DeadLetter
)
}
pub fn display_label(&self) -> &'static str {
match self {
TaskState::Pending => "Pending",
TaskState::Reserved => "Reserved",
TaskState::Processing => "Processing",
TaskState::Succeeded => "Completed",
TaskState::Failed => "Retrying",
TaskState::Cancelled => "Cancelled",
TaskState::DeadLetter => "Dead Letter",
}
}
}
/// Error details recorded on a task when it fails or is dead-lettered.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq, Default)]
pub struct TaskErrorInfo {
/// Optional machine-readable error code.
pub code: Option<String>,
/// Human-readable failure description.
pub message: String,
}
/// Events that move an ingestion task between `TaskState`s.
///
/// Used for validating and reporting transitions; the authoritative
/// transition table lives in the `lifecycle` state machine.
#[derive(Debug, Clone, Copy)]
enum TaskTransition {
    Reserve,
    StartProcessing,
    Succeed,
    Fail,
    Cancel,
    DeadLetter,
    Release,
}

impl TaskTransition {
    /// Lower-case event name, mirroring the event identifiers declared in the
    /// `lifecycle` state machine; used in error messages.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Reserve => "reserve",
            Self::StartProcessing => "start_processing",
            Self::Succeed => "succeed",
            Self::Fail => "fail",
            Self::Cancel => "cancel",
            Self::DeadLetter => "deadletter",
            Self::Release => "release",
        }
    }
}
// Transition table for the task lifecycle, generated by the `state_machines`
// crate's `state_machine!` macro. The typestate constructors below rebuild a
// machine in a given state so `compute_next_state` can validate runtime
// transitions against this single table.
mod lifecycle {
use super::state_machine;
state_machine! {
name: TaskLifecycleMachine,
initial: Pending,
states: [Pending, Reserved, Processing, Succeeded, Failed, Cancelled, DeadLetter],
events {
reserve {
transition: { from: Pending, to: Reserved }
transition: { from: Failed, to: Reserved }
}
start_processing {
transition: { from: Reserved, to: Processing }
}
succeed {
transition: { from: Processing, to: Succeeded }
}
fail {
transition: { from: Processing, to: Failed }
}
cancel {
transition: { from: Pending, to: Cancelled }
transition: { from: Reserved, to: Cancelled }
transition: { from: Processing, to: Cancelled }
}
deadletter {
transition: { from: Failed, to: DeadLetter }
}
release {
transition: { from: Reserved, to: Pending }
}
}
}
/// Fresh machine in the initial `Pending` state.
pub(super) fn pending() -> TaskLifecycleMachine<(), Pending> {
TaskLifecycleMachine::new(())
}
/// Machine walked from `Pending` into `Reserved` via `reserve`.
pub(super) fn reserved() -> TaskLifecycleMachine<(), Reserved> {
pending()
.reserve()
.expect("reserve transition from Pending should exist")
}
/// Machine walked from `Reserved` into `Processing` via `start_processing`.
pub(super) fn processing() -> TaskLifecycleMachine<(), Processing> {
reserved()
.start_processing()
.expect("start_processing transition from Reserved should exist")
}
/// Machine walked from `Processing` into `Failed` via `fail`.
pub(super) fn failed() -> TaskLifecycleMachine<(), Failed> {
processing()
.fail()
.expect("fail transition from Processing should exist")
}
}
/// Builds the validation error reported for a disallowed (state, event) pair.
fn invalid_transition(state: &TaskState, event: TaskTransition) -> AppError {
    let details = format!(
        "Invalid task transition: {} -> {}",
        state.as_str(),
        event.as_str()
    );
    AppError::Validation(details)
}
/// Maps a (current state, event) pair to the next `TaskState`.
///
/// Each arm rebuilds the typestate machine from `lifecycle` in the matching
/// state and applies the event, so the macro-defined transition table stays
/// the single source of truth; any pair without an arm (or rejected by the
/// machine) yields a validation error via `invalid_transition`.
fn compute_next_state(state: &TaskState, event: TaskTransition) -> Result<TaskState, AppError> {
use lifecycle::*;
match (state, event) {
(TaskState::Pending, TaskTransition::Reserve) => pending()
.reserve()
.map(|_| TaskState::Reserved)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Failed, TaskTransition::Reserve) => failed()
.reserve()
.map(|_| TaskState::Reserved)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Reserved, TaskTransition::StartProcessing) => reserved()
.start_processing()
.map(|_| TaskState::Processing)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Processing, TaskTransition::Succeed) => processing()
.succeed()
.map(|_| TaskState::Succeeded)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Processing, TaskTransition::Fail) => processing()
.fail()
.map(|_| TaskState::Failed)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Pending, TaskTransition::Cancel) => pending()
.cancel()
.map(|_| TaskState::Cancelled)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Reserved, TaskTransition::Cancel) => reserved()
.cancel()
.map(|_| TaskState::Cancelled)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Processing, TaskTransition::Cancel) => processing()
.cancel()
.map(|_| TaskState::Cancelled)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Failed, TaskTransition::DeadLetter) => failed()
.deadletter()
.map(|_| TaskState::DeadLetter)
.map_err(|_| invalid_transition(state, event)),
(TaskState::Reserved, TaskTransition::Release) => reserved()
.release()
.map(|_| TaskState::Pending)
.map_err(|_| invalid_transition(state, event)),
// Every other (state, event) combination is rejected.
_ => Err(invalid_transition(state, event)),
}
}
stored_object!(IngestionTask, "ingestion_task", {
content: IngestionPayload,
status: IngestionTaskStatus,
user_id: String
state: TaskState,
user_id: String,
attempts: u32,
max_attempts: u32,
#[serde(serialize_with = "serialize_datetime", deserialize_with = "deserialize_datetime")]
scheduled_at: chrono::DateTime<chrono::Utc>,
#[serde(
serialize_with = "serialize_option_datetime",
deserialize_with = "deserialize_option_datetime",
default
)]
locked_at: Option<chrono::DateTime<chrono::Utc>>,
lease_duration_secs: i64,
worker_id: Option<String>,
error_code: Option<String>,
error_message: Option<String>,
#[serde(
serialize_with = "serialize_option_datetime",
deserialize_with = "deserialize_option_datetime",
default
)]
last_error_at: Option<chrono::DateTime<chrono::Utc>>,
priority: i32
});
pub const MAX_ATTEMPTS: u32 = 3;
impl IngestionTask {
pub async fn new(content: IngestionPayload, user_id: String) -> Self {
let now = Utc::now();
let now = chrono::Utc::now();
Self {
id: Uuid::new_v4().to_string(),
content,
status: IngestionTaskStatus::Created,
state: TaskState::Pending,
user_id,
attempts: 0,
max_attempts: MAX_ATTEMPTS,
scheduled_at: now,
locked_at: None,
lease_duration_secs: DEFAULT_LEASE_SECS,
worker_id: None,
error_code: None,
error_message: None,
last_error_at: None,
priority: DEFAULT_PRIORITY,
created_at: now,
updated_at: now,
user_id,
}
}
/// Creates a new job and stores it in the database
/// True while this task still has retry budget left (`attempts` below
/// `max_attempts`).
pub fn can_retry(&self) -> bool {
self.attempts < self.max_attempts
}
/// Lease length as a std `Duration`; negative stored values clamp to zero.
pub fn lease_duration(&self) -> Duration {
Duration::from_secs(self.lease_duration_secs.max(0) as u64)
}
/// Builds a new task from `content`/`user_id`, persists it, and returns it.
pub async fn create_and_add_to_db(
content: IngestionPayload,
user_id: String,
db: &SurrealDbClient,
) -> Result<IngestionTask, AppError> {
let task = Self::new(content, user_id).await;
// Clone so the stored copy and the returned value are independent.
db.store_item(task.clone()).await?;
Ok(task)
}
// Update job status
pub async fn update_status(
id: &str,
status: IngestionTaskStatus,
pub async fn claim_next_ready(
db: &SurrealDbClient,
) -> Result<(), AppError> {
let _job: Option<Self> = db
.update((Self::table_name(), id))
.patch(PatchOp::replace("/status", status))
.patch(PatchOp::replace(
"/updated_at",
surrealdb::Datetime::from(Utc::now()),
worker_id: &str,
now: chrono::DateTime<chrono::Utc>,
lease_duration: Duration,
) -> Result<Option<IngestionTask>, AppError> {
debug_assert!(compute_next_state(&TaskState::Pending, TaskTransition::Reserve).is_ok());
debug_assert!(compute_next_state(&TaskState::Failed, TaskTransition::Reserve).is_ok());
const CLAIM_QUERY: &str = r#"
UPDATE (
SELECT * FROM type::table($table)
WHERE state IN $candidate_states
AND scheduled_at <= $now
AND (
attempts < max_attempts
OR state IN $sticky_states
)
AND (
locked_at = NONE
OR time::unix($now) - time::unix(locked_at) >= lease_duration_secs
)
ORDER BY priority DESC, scheduled_at ASC, created_at ASC
LIMIT 1
)
SET state = $reserved_state,
attempts = if state IN $increment_states THEN
if attempts + 1 > max_attempts THEN max_attempts ELSE attempts + 1 END
ELSE
attempts
END,
locked_at = $now,
worker_id = $worker_id,
lease_duration_secs = $lease_secs,
updated_at = $now
RETURN *;
"#;
let mut result = db
.client
.query(CLAIM_QUERY)
.bind(("table", Self::table_name()))
.bind((
"candidate_states",
vec![
TaskState::Pending.as_str(),
TaskState::Failed.as_str(),
TaskState::Reserved.as_str(),
TaskState::Processing.as_str(),
],
))
.bind((
"sticky_states",
vec![TaskState::Reserved.as_str(), TaskState::Processing.as_str()],
))
.bind((
"increment_states",
vec![TaskState::Pending.as_str(), TaskState::Failed.as_str()],
))
.bind(("reserved_state", TaskState::Reserved.as_str()))
.bind(("now", SurrealDatetime::from(now)))
.bind(("worker_id", worker_id.to_string()))
.bind(("lease_secs", lease_duration.as_secs() as i64))
.await?;
Ok(())
let task: Option<IngestionTask> = result.take(0)?;
Ok(task)
}
/// Listen for new jobs
pub async fn listen_for_tasks(
/// Moves this task from `Reserved` to `Processing` on behalf of its worker.
///
/// The transition is validated locally via `compute_next_state`, then
/// enforced again by the UPDATE's WHERE clause (state and worker ownership);
/// if no row matches, an invalid-transition error is returned.
/// NOTE(review): a `None` worker_id binds an empty string here, which would
/// match no claimed row — confirm callers always hold a claim at this point.
pub async fn mark_processing(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::StartProcessing)?;
debug_assert_eq!(next, TaskState::Processing);
const START_PROCESSING_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $processing,
updated_at = $now,
locked_at = $now
WHERE state = $reserved AND worker_id = $worker_id
RETURN *;
"#;
let now = chrono::Utc::now();
let mut result = db
.client
.query(START_PROCESSING_QUERY)
.bind(("table", Self::table_name()))
.bind(("id", self.id.clone()))
.bind(("processing", TaskState::Processing.as_str()))
.bind(("reserved", TaskState::Reserved.as_str()))
.bind(("now", SurrealDatetime::from(now)))
.bind(("worker_id", self.worker_id.clone().unwrap_or_default()))
.await?;
let updated: Option<IngestionTask> = result.take(0)?;
// No returned row means the guarded UPDATE matched nothing (state or
// ownership changed concurrently).
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::StartProcessing))
}
/// Marks this task `Succeeded`, clearing its lease, worker ownership, and
/// any recorded error fields.
///
/// Guarded: the UPDATE applies only while the row is still `Processing` and
/// owned by this task's `worker_id`; otherwise no row is returned and the
/// call fails with an invalid-transition error.
pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::Succeed)?;
debug_assert_eq!(next, TaskState::Succeeded);
const COMPLETE_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $succeeded,
updated_at = $now,
locked_at = NONE,
worker_id = NONE,
scheduled_at = $now,
error_code = NONE,
error_message = NONE,
last_error_at = NONE
WHERE state = $processing AND worker_id = $worker_id
RETURN *;
"#;
let now = chrono::Utc::now();
let mut result = db
.client
.query(COMPLETE_QUERY)
.bind(("table", Self::table_name()))
.bind(("id", self.id.clone()))
.bind(("succeeded", TaskState::Succeeded.as_str()))
.bind(("processing", TaskState::Processing.as_str()))
.bind(("now", SurrealDatetime::from(now)))
.bind(("worker_id", self.worker_id.clone().unwrap_or_default()))
.await?;
let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Succeed))
}
pub async fn mark_failed(
&self,
error: TaskErrorInfo,
retry_delay: Duration,
db: &SurrealDbClient,
) -> Result<impl Stream<Item = Result<Notification<Self>, surrealdb::Error>>, surrealdb::Error>
{
db.listen::<Self>().await
) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::Fail)?;
debug_assert_eq!(next, TaskState::Failed);
let now = chrono::Utc::now();
let retry_at = now
+ ChronoDuration::from_std(retry_delay).unwrap_or_else(|_| ChronoDuration::seconds(30));
const FAIL_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $failed,
updated_at = $now,
locked_at = NONE,
worker_id = NONE,
scheduled_at = $retry_at,
error_code = $error_code,
error_message = $error_message,
last_error_at = $now
WHERE state = $processing AND worker_id = $worker_id
RETURN *;
"#;
let mut result = db
.client
.query(FAIL_QUERY)
.bind(("table", Self::table_name()))
.bind(("id", self.id.clone()))
.bind(("failed", TaskState::Failed.as_str()))
.bind(("processing", TaskState::Processing.as_str()))
.bind(("now", SurrealDatetime::from(now)))
.bind(("retry_at", SurrealDatetime::from(retry_at)))
.bind(("error_code", error.code.clone()))
.bind(("error_message", error.message.clone()))
.bind(("worker_id", self.worker_id.clone().unwrap_or_default()))
.await?;
let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Fail))
}
/// Get all unfinished tasks, i.e. newly created ones and in-progress ones with fewer than the maximum number of attempts
pub async fn get_unfinished_tasks(db: &SurrealDbClient) -> Result<Vec<Self>, AppError> {
let jobs: Vec<Self> = db
/// Moves this `Failed` task to the terminal `DeadLetter` state, recording
/// the given error details and releasing the lease/worker fields.
///
/// NOTE(review): unlike the other guarded updates, the WHERE clause checks
/// only `state = Failed` — any caller (not just the owning worker) can
/// dead-letter a failed task. Confirm this is intentional.
pub async fn mark_dead_letter(
&self,
error: TaskErrorInfo,
db: &SurrealDbClient,
) -> Result<IngestionTask, AppError> {
let next = compute_next_state(&self.state, TaskTransition::DeadLetter)?;
debug_assert_eq!(next, TaskState::DeadLetter);
const DEAD_LETTER_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $dead,
updated_at = $now,
locked_at = NONE,
worker_id = NONE,
scheduled_at = $now,
error_code = $error_code,
error_message = $error_message,
last_error_at = $now
WHERE state = $failed
RETURN *;
"#;
let now = chrono::Utc::now();
let mut result = db
.client
.query(DEAD_LETTER_QUERY)
.bind(("table", Self::table_name()))
.bind(("id", self.id.clone()))
.bind(("dead", TaskState::DeadLetter.as_str()))
.bind(("failed", TaskState::Failed.as_str()))
.bind(("now", SurrealDatetime::from(now)))
.bind(("error_code", error.code.clone()))
.bind(("error_message", error.message.clone()))
.await?;
let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::DeadLetter))
}
/// Cancels this task, releasing any lease and worker ownership.
///
/// Allowed only from the non-terminal claimable states (`Pending`,
/// `Reserved`, `Processing`) — validated locally and enforced again by the
/// UPDATE's `state IN $allow_states` guard.
pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
compute_next_state(&self.state, TaskTransition::Cancel)?;
const CANCEL_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $cancelled,
updated_at = $now,
locked_at = NONE,
worker_id = NONE
WHERE state IN $allow_states
RETURN *;
"#;
let now = chrono::Utc::now();
let mut result = db
.client
.query(CANCEL_QUERY)
.bind(("table", Self::table_name()))
.bind(("id", self.id.clone()))
.bind(("cancelled", TaskState::Cancelled.as_str()))
.bind((
"allow_states",
vec![
TaskState::Pending.as_str(),
TaskState::Reserved.as_str(),
TaskState::Processing.as_str(),
],
))
.bind(("now", SurrealDatetime::from(now)))
.await?;
let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Cancel))
}
/// Returns a `Reserved` task to `Pending`, dropping its lease and worker so
/// another worker can claim it.
pub async fn release(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
compute_next_state(&self.state, TaskTransition::Release)?;
const RELEASE_QUERY: &str = r#"
UPDATE type::thing($table, $id)
SET state = $pending,
updated_at = $now,
locked_at = NONE,
worker_id = NONE
WHERE state = $reserved
RETURN *;
"#;
let now = chrono::Utc::now();
let mut result = db
.client
.query(RELEASE_QUERY)
.bind(("table", Self::table_name()))
.bind(("id", self.id.clone()))
.bind(("pending", TaskState::Pending.as_str()))
.bind(("reserved", TaskState::Reserved.as_str()))
.bind(("now", SurrealDatetime::from(now)))
.await?;
let updated: Option<IngestionTask> = result.take(0)?;
// None means the row was no longer `Reserved` when the UPDATE ran.
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Release))
}
pub async fn get_unfinished_tasks(
db: &SurrealDbClient,
) -> Result<Vec<IngestionTask>, AppError> {
let tasks: Vec<IngestionTask> = db
.query(
"SELECT * FROM type::table($table)
WHERE
status.name = 'Created'
OR (
status.name = 'InProgress'
AND status.attempts < $max_attempts
)
ORDER BY created_at ASC",
"SELECT * FROM type::table($table)
WHERE state IN $active_states
ORDER BY scheduled_at ASC, created_at ASC",
)
.bind(("table", Self::table_name()))
.bind(("max_attempts", MAX_ATTEMPTS))
.bind((
"active_states",
vec![
TaskState::Pending.as_str(),
TaskState::Reserved.as_str(),
TaskState::Processing.as_str(),
TaskState::Failed.as_str(),
],
))
.await?
.take(0)?;
Ok(jobs)
Ok(tasks)
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use crate::storage::types::ingestion_payload::IngestionPayload;
// Helper function to create a test ingestion payload
fn create_test_payload(user_id: &str) -> IngestionPayload {
fn create_payload(user_id: &str) -> IngestionPayload {
IngestionPayload::Text {
text: "Test content".to_string(),
context: "Test context".to_string(),
@@ -119,182 +597,115 @@ mod tests {
}
}
#[tokio::test]
async fn test_new_ingestion_task() {
let user_id = "user123";
let payload = create_test_payload(user_id);
async fn memory_db() -> SurrealDbClient {
let namespace = "test_ns";
let database = Uuid::new_v4().to_string();
SurrealDbClient::memory(namespace, &database)
.await
.expect("in-memory surrealdb")
}
#[tokio::test]
async fn test_new_task_defaults() {
let user_id = "user123";
let payload = create_payload(user_id);
let task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
// Verify task properties
assert_eq!(task.user_id, user_id);
assert_eq!(task.content, payload);
assert!(matches!(task.status, IngestionTaskStatus::Created));
assert!(!task.id.is_empty());
assert_eq!(task.state, TaskState::Pending);
assert_eq!(task.attempts, 0);
assert_eq!(task.max_attempts, MAX_ATTEMPTS);
assert!(task.locked_at.is_none());
assert!(task.worker_id.is_none());
}
#[tokio::test]
async fn test_create_and_add_to_db() {
// Setup in-memory database
let namespace = "test_ns";
let database = &Uuid::new_v4().to_string();
let db = SurrealDbClient::memory(namespace, database)
.await
.expect("Failed to start in-memory surrealdb");
async fn test_create_and_store_task() {
let db = memory_db().await;
let user_id = "user123";
let payload = create_test_payload(user_id);
let payload = create_payload(user_id);
// Create and store task
IngestionTask::create_and_add_to_db(payload.clone(), user_id.to_string(), &db)
let created =
IngestionTask::create_and_add_to_db(payload.clone(), user_id.to_string(), &db)
.await
.expect("store");
let stored: Option<IngestionTask> = db
.get_item::<IngestionTask>(&created.id)
.await
.expect("Failed to create and add task to db");
.expect("fetch");
// Query to verify task was stored
let query = format!(
"SELECT * FROM {} WHERE user_id = '{}'",
IngestionTask::table_name(),
user_id
);
let mut result = db.query(query).await.expect("Query failed");
let tasks: Vec<IngestionTask> = result.take(0).unwrap_or_default();
// Verify task is in the database
assert!(!tasks.is_empty(), "Task should exist in the database");
let stored_task = &tasks[0];
assert_eq!(stored_task.user_id, user_id);
assert!(matches!(stored_task.status, IngestionTaskStatus::Created));
let stored = stored.expect("task exists");
assert_eq!(stored.id, created.id);
assert_eq!(stored.state, TaskState::Pending);
assert_eq!(stored.attempts, 0);
}
#[tokio::test]
async fn test_update_status() {
// Setup in-memory database
let namespace = "test_ns";
let database = &Uuid::new_v4().to_string();
let db = SurrealDbClient::memory(namespace, database)
.await
.expect("Failed to start in-memory surrealdb");
async fn test_claim_and_transition() {
let db = memory_db().await;
let user_id = "user123";
let payload = create_test_payload(user_id);
let payload = create_payload(user_id);
let task = IngestionTask::new(payload, user_id.to_string()).await;
db.store_item(task.clone()).await.expect("store");
// Create task manually
let task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
let task_id = task.id.clone();
// Store task
db.store_item(task).await.expect("Failed to store task");
// Update status to InProgress
let now = Utc::now();
let new_status = IngestionTaskStatus::InProgress {
attempts: 1,
last_attempt: now,
};
IngestionTask::update_status(&task_id, new_status.clone(), &db)
let worker_id = "worker-1";
let now = chrono::Utc::now();
let claimed = IngestionTask::claim_next_ready(&db, worker_id, now, Duration::from_secs(60))
.await
.expect("Failed to update status");
.expect("claim");
// Verify status updated
let updated_task: Option<IngestionTask> = db
.get_item::<IngestionTask>(&task_id)
.await
.expect("Failed to get updated task");
let claimed = claimed.expect("task claimed");
assert_eq!(claimed.state, TaskState::Reserved);
assert_eq!(claimed.worker_id.as_deref(), Some(worker_id));
assert!(updated_task.is_some());
let updated_task = updated_task.unwrap();
let processing = claimed.mark_processing(&db).await.expect("processing");
assert_eq!(processing.state, TaskState::Processing);
match updated_task.status {
IngestionTaskStatus::InProgress { attempts, .. } => {
assert_eq!(attempts, 1);
}
_ => panic!("Expected InProgress status"),
}
let succeeded = processing.mark_succeeded(&db).await.expect("succeeded");
assert_eq!(succeeded.state, TaskState::Succeeded);
assert!(succeeded.worker_id.is_none());
assert!(succeeded.locked_at.is_none());
}
#[tokio::test]
async fn test_get_unfinished_tasks() {
// Setup in-memory database
let namespace = "test_ns";
let database = &Uuid::new_v4().to_string();
let db = SurrealDbClient::memory(namespace, database)
.await
.expect("Failed to start in-memory surrealdb");
async fn test_fail_and_dead_letter() {
let db = memory_db().await;
let user_id = "user123";
let payload = create_test_payload(user_id);
let payload = create_payload(user_id);
let task = IngestionTask::new(payload, user_id.to_string()).await;
db.store_item(task.clone()).await.expect("store");
// Create tasks with different statuses
let created_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
let worker_id = "worker-dead";
let now = chrono::Utc::now();
let claimed = IngestionTask::claim_next_ready(&db, worker_id, now, Duration::from_secs(60))
.await
.expect("claim")
.expect("claimed");
let mut in_progress_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
in_progress_task.status = IngestionTaskStatus::InProgress {
attempts: 1,
last_attempt: Utc::now(),
let processing = claimed.mark_processing(&db).await.expect("processing");
let error_info = TaskErrorInfo {
code: Some("pipeline_error".into()),
message: "failed".into(),
};
let mut max_attempts_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
max_attempts_task.status = IngestionTaskStatus::InProgress {
attempts: MAX_ATTEMPTS,
last_attempt: Utc::now(),
};
let mut completed_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
completed_task.status = IngestionTaskStatus::Completed;
let mut error_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
error_task.status = IngestionTaskStatus::Error {
message: "Test error".to_string(),
};
// Store all tasks
db.store_item(created_task)
let failed = processing
.mark_failed(error_info.clone(), Duration::from_secs(30), &db)
.await
.expect("Failed to store created task");
db.store_item(in_progress_task)
.await
.expect("Failed to store in-progress task");
db.store_item(max_attempts_task)
.await
.expect("Failed to store max-attempts task");
db.store_item(completed_task)
.await
.expect("Failed to store completed task");
db.store_item(error_task)
.await
.expect("Failed to store error task");
.expect("failed update");
assert_eq!(failed.state, TaskState::Failed);
assert_eq!(failed.error_message.as_deref(), Some("failed"));
assert!(failed.worker_id.is_none());
assert!(failed.locked_at.is_none());
assert!(failed.scheduled_at > now);
// Get unfinished tasks
let unfinished_tasks = IngestionTask::get_unfinished_tasks(&db)
let dead = failed
.mark_dead_letter(error_info.clone(), &db)
.await
.expect("Failed to get unfinished tasks");
// Verify only Created and InProgress with attempts < MAX_ATTEMPTS are returned
assert_eq!(unfinished_tasks.len(), 2);
let statuses: Vec<_> = unfinished_tasks
.iter()
.map(|task| match &task.status {
IngestionTaskStatus::Created => "Created",
IngestionTaskStatus::InProgress { attempts, .. } => {
if *attempts < MAX_ATTEMPTS {
"InProgress<MAX"
} else {
"InProgress>=MAX"
}
}
IngestionTaskStatus::Completed => "Completed",
IngestionTaskStatus::Error { .. } => "Error",
IngestionTaskStatus::Cancelled => "Cancelled",
})
.collect();
assert!(statuses.contains(&"Created"));
assert!(statuses.contains(&"InProgress<MAX"));
assert!(!statuses.contains(&"InProgress>=MAX"));
assert!(!statuses.contains(&"Completed"));
assert!(!statuses.contains(&"Error"));
assert!(!statuses.contains(&"Cancelled"));
.expect("dead letter");
assert_eq!(dead.state, TaskState::DeadLetter);
assert_eq!(dead.error_message.as_deref(), Some("failed"));
}
}

View File

@@ -83,6 +83,32 @@ macro_rules! stored_object {
Ok(DateTime::<Utc>::from(dt))
}
// Serializes an `Option<DateTime<Utc>>` by converting the inner value to a
// `surrealdb::sql::Datetime`, emitting a serde `none` when absent.
// `dead_code` allowed: presumably not every type generated by this macro
// declares optional datetime fields — confirm against the macro's users.
#[allow(dead_code)]
fn serialize_option_datetime<S>(
date: &Option<DateTime<Utc>>,
serializer: S,
) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match date {
Some(dt) => serializer
.serialize_some(&Into::<surrealdb::sql::Datetime>::into(*dt)),
None => serializer.serialize_none(),
}
}
#[allow(dead_code)]
/// Deserializes an optional SurrealDB `Datetime` back into a `chrono`
/// `DateTime<Utc>`, preserving `None` for absent values.
fn deserialize_option_datetime<'de, D>(
    deserializer: D,
) -> Result<Option<DateTime<Utc>>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let parsed = Option::<surrealdb::sql::Datetime>::deserialize(deserializer)?;
    match parsed {
        // Convert the wire type into the application's chrono representation.
        Some(dt) => Ok(Some(DateTime::<Utc>::from(dt))),
        None => Ok(None),
    }
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct $name {
@@ -92,7 +118,7 @@ macro_rules! stored_object {
pub created_at: DateTime<Utc>,
#[serde(serialize_with = "serialize_datetime", deserialize_with = "deserialize_datetime", default)]
pub updated_at: DateTime<Utc>,
$(pub $field: $ty),*
$( $(#[$attr])* pub $field: $ty),*
}
impl StoredObject for $name {

View File

@@ -8,7 +8,7 @@ use uuid::Uuid;
use super::text_chunk::TextChunk;
use super::{
conversation::Conversation,
ingestion_task::{IngestionTask, MAX_ATTEMPTS},
ingestion_task::{IngestionTask, TaskState},
knowledge_entity::{KnowledgeEntity, KnowledgeEntityType},
knowledge_relationship::KnowledgeRelationship,
system_settings::SystemSettings,
@@ -535,19 +535,24 @@ impl User {
let jobs: Vec<IngestionTask> = db
.query(
"SELECT * FROM type::table($table)
WHERE user_id = $user_id
AND (
status.name = 'Created'
OR (
status.name = 'InProgress'
AND status.attempts < $max_attempts
)
)
ORDER BY created_at DESC",
WHERE user_id = $user_id
AND (
state IN $active_states
OR (state = $failed_state AND attempts < max_attempts)
)
ORDER BY scheduled_at ASC, created_at DESC",
)
.bind(("table", IngestionTask::table_name()))
.bind(("user_id", user_id.to_owned()))
.bind(("max_attempts", MAX_ATTEMPTS))
.bind((
"active_states",
vec![
TaskState::Pending.as_str(),
TaskState::Reserved.as_str(),
TaskState::Processing.as_str(),
],
))
.bind(("failed_state", TaskState::Failed.as_str()))
.await?
.take(0)?;
@@ -605,7 +610,7 @@ impl User {
mod tests {
use super::*;
use crate::storage::types::ingestion_payload::IngestionPayload;
use crate::storage::types::ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS};
use crate::storage::types::ingestion_task::{IngestionTask, TaskState, MAX_ATTEMPTS};
use std::collections::HashSet;
// Helper function to set up a test database with SystemSettings
@@ -710,28 +715,32 @@ mod tests {
.await
.expect("Failed to store created task");
let mut in_progress_allowed =
IngestionTask::new(payload.clone(), user_id.to_string()).await;
in_progress_allowed.status = IngestionTaskStatus::InProgress {
attempts: 1,
last_attempt: chrono::Utc::now(),
};
db.store_item(in_progress_allowed.clone())
let mut processing_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
processing_task.state = TaskState::Processing;
processing_task.attempts = 1;
db.store_item(processing_task.clone())
.await
.expect("Failed to store in-progress task");
.expect("Failed to store processing task");
let mut in_progress_blocked =
let mut failed_retry_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
failed_retry_task.state = TaskState::Failed;
failed_retry_task.attempts = 1;
failed_retry_task.scheduled_at = chrono::Utc::now() - chrono::Duration::minutes(5);
db.store_item(failed_retry_task.clone())
.await
.expect("Failed to store retryable failed task");
let mut failed_blocked_task =
IngestionTask::new(payload.clone(), user_id.to_string()).await;
in_progress_blocked.status = IngestionTaskStatus::InProgress {
attempts: MAX_ATTEMPTS,
last_attempt: chrono::Utc::now(),
};
db.store_item(in_progress_blocked.clone())
failed_blocked_task.state = TaskState::Failed;
failed_blocked_task.attempts = MAX_ATTEMPTS;
failed_blocked_task.error_message = Some("Too many failures".into());
db.store_item(failed_blocked_task.clone())
.await
.expect("Failed to store blocked task");
let mut completed_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
completed_task.status = IngestionTaskStatus::Completed;
completed_task.state = TaskState::Succeeded;
db.store_item(completed_task.clone())
.await
.expect("Failed to store completed task");
@@ -755,10 +764,11 @@ mod tests {
unfinished.iter().map(|task| task.id.clone()).collect();
assert!(unfinished_ids.contains(&created_task.id));
assert!(unfinished_ids.contains(&in_progress_allowed.id));
assert!(!unfinished_ids.contains(&in_progress_blocked.id));
assert!(unfinished_ids.contains(&processing_task.id));
assert!(unfinished_ids.contains(&failed_retry_task.id));
assert!(!unfinished_ids.contains(&failed_blocked_task.id));
assert!(!unfinished_ids.contains(&completed_task.id));
assert_eq!(unfinished_ids.len(), 2);
assert_eq!(unfinished_ids.len(), 3);
}
#[tokio::test]

View File

@@ -20,7 +20,7 @@ use common::{
storage::types::{
file_info::FileInfo,
ingestion_payload::IngestionPayload,
ingestion_task::{IngestionTask, IngestionTaskStatus},
ingestion_task::{IngestionTask, TaskState},
user::User,
},
};
@@ -178,40 +178,54 @@ pub async fn get_task_updates_stream(
Ok(Some(updated_task)) => {
consecutive_db_errors = 0; // Reset error count on success
// Format the status message based on IngestionTaskStatus
let status_message = match &updated_task.status {
IngestionTaskStatus::Created => "Created".to_string(),
IngestionTaskStatus::InProgress { attempts, .. } => {
// Following your template's current display
format!("In progress, attempt {}", attempts)
let status_message = match updated_task.state {
TaskState::Pending => "Pending".to_string(),
TaskState::Reserved => format!(
"Reserved (attempt {} of {})",
updated_task.attempts,
updated_task.max_attempts
),
TaskState::Processing => format!(
"Processing (attempt {} of {})",
updated_task.attempts,
updated_task.max_attempts
),
TaskState::Succeeded => "Completed".to_string(),
TaskState::Failed => {
let mut base = format!(
"Retry scheduled (attempt {} of {})",
updated_task.attempts,
updated_task.max_attempts
);
if let Some(message) = updated_task.error_message.as_ref() {
base.push_str(": ");
base.push_str(message);
}
base
}
IngestionTaskStatus::Completed => "Completed".to_string(),
IngestionTaskStatus::Error { message } => {
// Providing a user-friendly error message from the status
format!("Error: {}", message)
TaskState::Cancelled => "Cancelled".to_string(),
TaskState::DeadLetter => {
let mut base = "Failed permanently".to_string();
if let Some(message) = updated_task.error_message.as_ref() {
base.push_str(": ");
base.push_str(message);
}
base
}
IngestionTaskStatus::Cancelled => "Cancelled".to_string(),
};
yield Ok(Event::default().event("status").data(status_message));
// Check for terminal states to close the stream
match updated_task.status {
IngestionTaskStatus::Completed
| IngestionTaskStatus::Error { .. }
| IngestionTaskStatus::Cancelled => {
// Send a specific event that HTMX uses to close the connection
// Send an event to reload the recent content
// Send an event to remove the loading indicator
let check_icon = state.templates.render("icons/check_icon.html", &context!{}).unwrap_or("Ok".to_string());
yield Ok(Event::default().event("stop_loading").data(check_icon));
yield Ok(Event::default().event("update_latest_content").data("Update latest content"));
yield Ok(Event::default().event("close_stream").data("Stream complete"));
break; // Exit loop on terminal states
}
_ => {
// Not a terminal state, continue polling
}
if updated_task.state.is_terminal() {
// Send a specific event that HTMX uses to close the connection
// Send an event to reload the recent content
// Send an event to remove the loading indicator
let check_icon = state.templates.render("icons/check_icon.html", &context!{}).unwrap_or("Ok".to_string());
yield Ok(Event::default().event("stop_loading").data(check_icon));
yield Ok(Event::default().event("update_latest_content").data("Update latest content"));
yield Ok(Event::default().event("close_stream").data("Stream complete"));
break; // Exit loop on terminal states
}
},
Ok(None) => {

View File

@@ -23,12 +23,18 @@
</div>
<div class="space-y-1">
<div class="text-sm font-semibold">
{% if item.status.name == "InProgress" %}
In progress, attempt {{ item.status.attempts }}
{% elif item.status.name == "Error" %}
Error: {{ item.status.message }}
{% if item.state == "Processing" %}
Processing, attempt {{ item.attempts }} of {{ item.max_attempts }}
{% elif item.state == "Reserved" %}
Reserved, attempt {{ item.attempts }} of {{ item.max_attempts }}
{% elif item.state == "Failed" %}
Retry scheduled (attempt {{ item.attempts }} of {{ item.max_attempts }}){% if item.error_message %}: {{ item.error_message }}{% endif %}
{% elif item.state == "DeadLetter" %}
Failed permanently{% if item.error_message %}: {{ item.error_message }}{% endif %}
{% elif item.state == "Succeeded" %}
Completed
{% else %}
{{ item.status.name }}
{{ item.state }}
{% endif %}
</div>
<div class="text-xs font-semibold opacity-60">
@@ -60,4 +66,4 @@
</ul>
{% endif %}
</section>
{% endblock %}
{% endblock %}

View File

@@ -8,7 +8,7 @@
</div>
<div class="space-y-1">
<div class="text-sm font-semibold flex gap-2 items-center">
<span sse-swap="status" hx-swap="innerHTML">Created</span>
<span sse-swap="status" hx-swap="innerHTML">Pending</span>
<div hx-get="/content/recent" hx-target="#latest_content_section" hx-swap="outerHTML"
hx-trigger="sse:update_latest_content"></div>
</div>

View File

@@ -3,101 +3,47 @@ pub mod pipeline;
pub mod types;
pub mod utils;
use chrono::Utc;
use common::storage::{
db::SurrealDbClient,
types::ingestion_task::{IngestionTask, IngestionTaskStatus},
types::ingestion_task::{IngestionTask, DEFAULT_LEASE_SECS},
};
use futures::StreamExt;
use pipeline::IngestionPipeline;
use std::sync::Arc;
use surrealdb::Action;
use tracing::{error, info};
use tokio::time::{sleep, Duration};
use tracing::{error, info, warn};
use uuid::Uuid;
pub async fn run_worker_loop(
db: Arc<SurrealDbClient>,
ingestion_pipeline: Arc<IngestionPipeline>,
) -> Result<(), Box<dyn std::error::Error>> {
let worker_id = format!("ingestion-worker-{}", Uuid::new_v4());
let lease_duration = Duration::from_secs(DEFAULT_LEASE_SECS as u64);
let idle_backoff = Duration::from_millis(500);
loop {
// First, check for any unfinished tasks
let unfinished_tasks = IngestionTask::get_unfinished_tasks(&db).await?;
if !unfinished_tasks.is_empty() {
info!("Found {} unfinished jobs", unfinished_tasks.len());
for task in unfinished_tasks {
ingestion_pipeline.process_task(task).await?;
}
}
// If no unfinished jobs, start listening for new ones
info!("Listening for new jobs...");
let mut job_stream = IngestionTask::listen_for_tasks(&db).await?;
while let Some(notification) = job_stream.next().await {
match notification {
Ok(notification) => {
info!("Received notification: {:?}", notification);
match notification.action {
Action::Create => {
if let Err(e) = ingestion_pipeline.process_task(notification.data).await
{
error!("Error processing task: {}", e);
}
}
Action::Update => {
match notification.data.status {
IngestionTaskStatus::Completed
| IngestionTaskStatus::Error { .. }
| IngestionTaskStatus::Cancelled => {
info!(
"Skipping already completed/error/cancelled task: {}",
notification.data.id
);
continue;
}
IngestionTaskStatus::InProgress { attempts, .. } => {
// Only process if this is a retry after an error, not our own update
if let Ok(Some(current_task)) =
db.get_item::<IngestionTask>(&notification.data.id).await
{
match current_task.status {
IngestionTaskStatus::Error { .. }
if attempts
< common::storage::types::ingestion_task::MAX_ATTEMPTS =>
{
// This is a retry after an error
if let Err(e) =
ingestion_pipeline.process_task(current_task).await
{
error!("Error processing task retry: {}", e);
}
}
_ => {
info!(
"Skipping in-progress update for task: {}",
notification.data.id
);
continue;
}
}
}
}
IngestionTaskStatus::Created => {
// Shouldn't happen with Update action, but process if it does
if let Err(e) =
ingestion_pipeline.process_task(notification.data).await
{
error!("Error processing task: {}", e);
}
}
}
}
_ => {} // Ignore other actions
}
match IngestionTask::claim_next_ready(&db, &worker_id, Utc::now(), lease_duration).await {
Ok(Some(task)) => {
let task_id = task.id.clone();
info!(
%worker_id,
%task_id,
attempt = task.attempts,
"claimed ingestion task"
);
if let Err(err) = ingestion_pipeline.process_task(task).await {
error!(%worker_id, %task_id, error = %err, "ingestion task failed");
}
Err(e) => error!("Error in job notification: {}", e),
}
Ok(None) => {
sleep(idle_backoff).await;
}
Err(err) => {
error!(%worker_id, error = %err, "failed to claim ingestion task");
warn!("Backing off for 1s after claim error");
sleep(Duration::from_secs(1)).await;
}
}
// If we reach here, the stream has ended (connection lost?)
error!("Database stream ended unexpectedly, reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}

View File

@@ -1,16 +1,15 @@
use std::{sync::Arc, time::Instant};
use chrono::Utc;
use text_splitter::TextSplitter;
use tokio::time::{sleep, Duration};
use tracing::{info, warn};
use tracing::{info, info_span, warn};
use common::{
error::AppError,
storage::{
db::SurrealDbClient,
types::{
ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS},
ingestion_task::{IngestionTask, TaskErrorInfo},
knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship,
text_chunk::TextChunk,
@@ -44,47 +43,81 @@ impl IngestionPipeline {
})
}
pub async fn process_task(&self, task: IngestionTask) -> Result<(), AppError> {
let current_attempts = match task.status {
IngestionTaskStatus::InProgress { attempts, .. } => attempts + 1,
_ => 1,
};
let task_id = task.id.clone();
let attempt = task.attempts;
let worker_label = task
.worker_id
.clone()
.unwrap_or_else(|| "unknown-worker".to_string());
let span = info_span!(
"ingestion_task",
%task_id,
attempt,
worker_id = %worker_label,
state = %task.state.as_str()
);
let _enter = span.enter();
let processing_task = task.mark_processing(&self.db).await?;
// Update status to InProgress with attempt count
IngestionTask::update_status(
&task.id,
IngestionTaskStatus::InProgress {
attempts: current_attempts,
last_attempt: Utc::now(),
},
let text_content = to_text_content(
processing_task.content.clone(),
&self.db,
&self.config,
&self.openai_client,
)
.await?;
let text_content =
to_text_content(task.content, &self.db, &self.config, &self.openai_client).await?;
match self.process(&text_content).await {
Ok(_) => {
IngestionTask::update_status(&task.id, IngestionTaskStatus::Completed, &self.db)
.await?;
processing_task.mark_succeeded(&self.db).await?;
info!(%task_id, attempt, "ingestion task succeeded");
Ok(())
}
Err(e) => {
if current_attempts >= MAX_ATTEMPTS {
IngestionTask::update_status(
&task.id,
IngestionTaskStatus::Error {
message: format!("Max attempts reached: {}", e),
},
&self.db,
)
.await?;
Err(err) => {
let reason = err.to_string();
let error_info = TaskErrorInfo {
code: None,
message: reason.clone(),
};
if processing_task.can_retry() {
let delay = Self::retry_delay(processing_task.attempts);
processing_task
.mark_failed(error_info, delay, &self.db)
.await?;
warn!(
%task_id,
attempt = processing_task.attempts,
retry_in_secs = delay.as_secs(),
"ingestion task failed; scheduled retry"
);
} else {
processing_task
.mark_dead_letter(error_info, &self.db)
.await?;
warn!(
%task_id,
attempt = processing_task.attempts,
"ingestion task failed; moved to dead letter queue"
);
}
Err(AppError::Processing(e.to_string()))
Err(AppError::Processing(reason))
}
}
}
/// Computes the exponential-backoff delay before retrying a failed
/// ingestion attempt.
///
/// The first retry (attempt 1, or a defensive 0) waits 30 seconds; each
/// subsequent attempt doubles the wait, and the delay is capped at
/// 15 minutes.
///
/// The exponent is clamped to 5 so the multiplication can never
/// overflow `u64`, and `saturating_sub` keeps an `attempt` of 0 from
/// underflowing.
fn retry_delay(attempt: u32) -> Duration {
    const BASE_SECONDS: u64 = 30;
    const MAX_SECONDS: u64 = 15 * 60;

    // Attempt 1 maps to exponent 0. The expression is already `u32`;
    // the previous redundant `as u32` cast has been removed.
    let capped_attempt = attempt.saturating_sub(1).min(5);
    let multiplier = 2_u64.pow(capped_attempt);
    let delay = BASE_SECONDS * multiplier;
    Duration::from_secs(delay.min(MAX_SECONDS))
}
pub async fn process(&self, content: &TextContent) -> Result<(), AppError> {
let now = Instant::now();