From 2d630e2af9f9a0713d9a51e36dfeefd9cf27fbd3 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Wed, 27 May 2026 11:23:39 +0200 Subject: [PATCH] chore: tightening and removing super fn --- common/src/storage/types/ingestion_task.rs | 83 ++++++++-------------- common/src/storage/types/message.rs | 1 + 2 files changed, 31 insertions(+), 53 deletions(-) diff --git a/common/src/storage/types/ingestion_task.rs b/common/src/storage/types/ingestion_task.rs index e17305b..5123617 100644 --- a/common/src/storage/types/ingestion_task.rs +++ b/common/src/storage/types/ingestion_task.rs @@ -1,12 +1,3 @@ -#![allow( - clippy::cast_possible_wrap, - clippy::items_after_statements, - clippy::arithmetic_side_effects, - clippy::cast_sign_loss, - clippy::missing_docs_in_private_items, - clippy::trivially_copy_pass_by_ref, - clippy::expect_used -)] use std::time::Duration; use chrono::Duration as ChronoDuration; @@ -91,7 +82,7 @@ enum TaskTransition { } impl TaskTransition { - fn as_str(&self) -> &'static str { + fn as_str(self) -> &'static str { match self { TaskTransition::StartProcessing => "start_processing", TaskTransition::Succeed => "succeed", @@ -141,27 +132,9 @@ mod lifecycle { pub(super) fn pending() -> TaskLifecycleMachine<(), Pending> { TaskLifecycleMachine::new(()) } - - pub(super) fn reserved() -> TaskLifecycleMachine<(), Reserved> { - pending() - .reserve() - .expect("reserve transition from Pending should exist") - } - - pub(super) fn processing() -> TaskLifecycleMachine<(), Processing> { - reserved() - .start_processing() - .expect("start_processing transition from Reserved should exist") - } - - pub(super) fn failed() -> TaskLifecycleMachine<(), Failed> { - processing() - .fail() - .expect("fail transition from Processing should exist") - } } -fn invalid_transition(state: &TaskState, event: TaskTransition) -> AppError { +fn invalid_transition(state: TaskState, event: TaskTransition) -> AppError { AppError::Validation(format!( "Invalid task transition: {} -> {}", state.as_str(), @@ -225,7 +198,7 @@ impl IngestionTask { } pub fn lease_duration(&self) -> Duration { - Duration::from_secs(self.lease_duration_secs.max(0) as u64) + Duration::from_secs(u64::try_from(self.lease_duration_secs.max(0)).unwrap_or(0)) } pub async fn create_and_add_to_db( @@ -244,9 +217,6 @@ impl IngestionTask { now: chrono::DateTime, lease_duration: Duration, ) -> Result, AppError> { - debug_assert!(lifecycle::pending().reserve().is_ok()); - debug_assert!(lifecycle::failed().reserve().is_ok()); - const CLAIM_QUERY: &str = r#" UPDATE ( SELECT * FROM type::table($table) @@ -276,6 +246,13 @@ impl IngestionTask { RETURN *; "#; + debug_assert!(lifecycle::pending().reserve().is_ok()); + debug_assert!( + lifecycle::pending() + .reserve() + .is_ok_and(|m| m.start_processing().is_ok_and(|m| m.fail().is_ok_and(|m| m.reserve().is_ok()))) + ); + let mut result = db .client .query(CLAIM_QUERY) @@ -300,7 +277,7 @@ impl IngestionTask { .bind(("reserved_state", TaskState::Reserved.as_str())) .bind(("now", SurrealDatetime::from(now))) .bind(("worker_id", worker_id.to_string())) - .bind(("lease_secs", lease_duration.as_secs() as i64)) + .bind(("lease_secs", i64::try_from(lease_duration.as_secs()).unwrap_or(i64::MAX))) .await?; let task: Option = result.take(0)?; @@ -330,7 +307,7 @@ impl IngestionTask { .await?; let updated: Option = result.take(0)?; - updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::StartProcessing)) + updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::StartProcessing)) } pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result { @@ -361,7 +338,7 @@ impl IngestionTask { .await?; let updated: Option = result.take(0)?; - updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Succeed)) + updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Succeed)) } pub async fn mark_failed( @@ -370,10 +347,6 @@ impl IngestionTask { retry_delay: Duration, db: &SurrealDbClient, ) -> Result { - let now = chrono::Utc::now(); - let retry_at = now - + ChronoDuration::from_std(retry_delay).unwrap_or_else(|_| ChronoDuration::seconds(30)); - const FAIL_QUERY: &str = r#" UPDATE type::thing($table, $id) SET state = $failed, @@ -388,6 +361,13 @@ impl IngestionTask { RETURN *; "#; + let now = chrono::Utc::now(); + let retry_at = now + .checked_add_signed( + ChronoDuration::from_std(retry_delay).unwrap_or_else(|_| ChronoDuration::seconds(30)), + ) + .unwrap_or(now); + let mut result = db .client .query(FAIL_QUERY) @@ -403,7 +383,7 @@ impl IngestionTask { .await?; let updated: Option = result.take(0)?; - updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Fail)) + updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Fail)) } pub async fn mark_dead_letter( @@ -439,7 +419,7 @@ impl IngestionTask { .await?; let updated: Option = result.take(0)?; - updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::DeadLetter)) + updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::DeadLetter)) } pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result { @@ -472,7 +452,7 @@ impl IngestionTask { .await?; let updated: Option = result.take(0)?; - updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Cancel)) + updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Cancel)) } pub async fn release(&self, db: &SurrealDbClient) -> Result { @@ -498,7 +478,7 @@ impl IngestionTask { .await?; let updated: Option = result.take(0)?; - updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Release)) + updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Release)) } pub async fn get_unfinished_tasks( @@ -668,10 +648,9 @@ mod tests { let task = IngestionTask::new(payload.clone(), user_id.to_string()); db.store_item(task.clone()).await.with_context(|| "store".to_string())?; - let err = task + let Err(err) = task .mark_processing(&db) - .await - .expect_err("processing should fail without reservation"); + .await else { anyhow::bail!("processing should fail without reservation") }; match err { AppError::Validation(message) => { @@ -694,7 +673,7 @@ mod tests { let task = IngestionTask::new(payload.clone(), user_id.to_string()); db.store_item(task.clone()).await.with_context(|| "store".to_string())?; - let err = task + let Err(err) = task .mark_failed( TaskErrorInfo { code: None, @@ -703,8 +682,7 @@ mod tests { Duration::from_secs(30), &db, ) - .await - .expect_err("failing should require processing state"); + .await else { anyhow::bail!("failing should require processing state") }; match err { AppError::Validation(message) => { @@ -727,10 +705,9 @@ mod tests { let task = IngestionTask::new(payload.clone(), user_id.to_string()); db.store_item(task.clone()).await.with_context(|| "store".to_string())?; - let err = task + let Err(err) = task .release(&db) - .await - .expect_err("release should require reserved state"); + .await else { anyhow::bail!("release should require reserved state") }; match err { AppError::Validation(message) => { diff --git a/common/src/storage/types/message.rs b/common/src/storage/types/message.rs index 4f2f7f8..9c8c4b1 100644 --- a/common/src/storage/types/message.rs +++ b/common/src/storage/types/message.rs @@ -1,6 +1,7 @@ #![allow(clippy::module_name_repetitions)] use uuid::Uuid; +use std::fmt; use std::fmt::Write; use crate::stored_object;