chore: tightening and removing super fn

This commit is contained in:
Per Stark
2026-05-27 11:23:39 +02:00
parent 9ec11e1f79
commit 2d630e2af9
2 changed files with 31 additions and 53 deletions
+30 -53
View File
@@ -1,12 +1,3 @@
#![allow(
clippy::cast_possible_wrap,
clippy::items_after_statements,
clippy::arithmetic_side_effects,
clippy::cast_sign_loss,
clippy::missing_docs_in_private_items,
clippy::trivially_copy_pass_by_ref,
clippy::expect_used
)]
use std::time::Duration; use std::time::Duration;
use chrono::Duration as ChronoDuration; use chrono::Duration as ChronoDuration;
@@ -91,7 +82,7 @@ enum TaskTransition {
} }
impl TaskTransition { impl TaskTransition {
fn as_str(&self) -> &'static str { fn as_str(self) -> &'static str {
match self { match self {
TaskTransition::StartProcessing => "start_processing", TaskTransition::StartProcessing => "start_processing",
TaskTransition::Succeed => "succeed", TaskTransition::Succeed => "succeed",
@@ -141,27 +132,9 @@ mod lifecycle {
pub(super) fn pending() -> TaskLifecycleMachine<(), Pending> { pub(super) fn pending() -> TaskLifecycleMachine<(), Pending> {
TaskLifecycleMachine::new(()) TaskLifecycleMachine::new(())
} }
pub(super) fn reserved() -> TaskLifecycleMachine<(), Reserved> {
pending()
.reserve()
.expect("reserve transition from Pending should exist")
} }
pub(super) fn processing() -> TaskLifecycleMachine<(), Processing> { fn invalid_transition(state: TaskState, event: TaskTransition) -> AppError {
reserved()
.start_processing()
.expect("start_processing transition from Reserved should exist")
}
pub(super) fn failed() -> TaskLifecycleMachine<(), Failed> {
processing()
.fail()
.expect("fail transition from Processing should exist")
}
}
fn invalid_transition(state: &TaskState, event: TaskTransition) -> AppError {
AppError::Validation(format!( AppError::Validation(format!(
"Invalid task transition: {} -> {}", "Invalid task transition: {} -> {}",
state.as_str(), state.as_str(),
@@ -225,7 +198,7 @@ impl IngestionTask {
} }
pub fn lease_duration(&self) -> Duration { pub fn lease_duration(&self) -> Duration {
Duration::from_secs(self.lease_duration_secs.max(0) as u64) Duration::from_secs(u64::try_from(self.lease_duration_secs.max(0)).unwrap_or(0))
} }
pub async fn create_and_add_to_db( pub async fn create_and_add_to_db(
@@ -244,9 +217,6 @@ impl IngestionTask {
now: chrono::DateTime<chrono::Utc>, now: chrono::DateTime<chrono::Utc>,
lease_duration: Duration, lease_duration: Duration,
) -> Result<Option<IngestionTask>, AppError> { ) -> Result<Option<IngestionTask>, AppError> {
debug_assert!(lifecycle::pending().reserve().is_ok());
debug_assert!(lifecycle::failed().reserve().is_ok());
const CLAIM_QUERY: &str = r#" const CLAIM_QUERY: &str = r#"
UPDATE ( UPDATE (
SELECT * FROM type::table($table) SELECT * FROM type::table($table)
@@ -276,6 +246,13 @@ impl IngestionTask {
RETURN *; RETURN *;
"#; "#;
debug_assert!(lifecycle::pending().reserve().is_ok());
debug_assert!(
lifecycle::pending()
.reserve()
.is_ok_and(|m| m.start_processing().is_ok_and(|m| m.fail().is_ok_and(|m| m.reserve().is_ok())))
);
let mut result = db let mut result = db
.client .client
.query(CLAIM_QUERY) .query(CLAIM_QUERY)
@@ -300,7 +277,7 @@ impl IngestionTask {
.bind(("reserved_state", TaskState::Reserved.as_str())) .bind(("reserved_state", TaskState::Reserved.as_str()))
.bind(("now", SurrealDatetime::from(now))) .bind(("now", SurrealDatetime::from(now)))
.bind(("worker_id", worker_id.to_string())) .bind(("worker_id", worker_id.to_string()))
.bind(("lease_secs", lease_duration.as_secs() as i64)) .bind(("lease_secs", i64::try_from(lease_duration.as_secs()).unwrap_or(i64::MAX)))
.await?; .await?;
let task: Option<IngestionTask> = result.take(0)?; let task: Option<IngestionTask> = result.take(0)?;
@@ -330,7 +307,7 @@ impl IngestionTask {
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::StartProcessing)) updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::StartProcessing))
} }
pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> { pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
@@ -361,7 +338,7 @@ impl IngestionTask {
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Succeed)) updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Succeed))
} }
pub async fn mark_failed( pub async fn mark_failed(
@@ -370,10 +347,6 @@ impl IngestionTask {
retry_delay: Duration, retry_delay: Duration,
db: &SurrealDbClient, db: &SurrealDbClient,
) -> Result<IngestionTask, AppError> { ) -> Result<IngestionTask, AppError> {
let now = chrono::Utc::now();
let retry_at = now
+ ChronoDuration::from_std(retry_delay).unwrap_or_else(|_| ChronoDuration::seconds(30));
const FAIL_QUERY: &str = r#" const FAIL_QUERY: &str = r#"
UPDATE type::thing($table, $id) UPDATE type::thing($table, $id)
SET state = $failed, SET state = $failed,
@@ -388,6 +361,13 @@ impl IngestionTask {
RETURN *; RETURN *;
"#; "#;
let now = chrono::Utc::now();
let retry_at = now
.checked_add_signed(
ChronoDuration::from_std(retry_delay).unwrap_or_else(|_| ChronoDuration::seconds(30)),
)
.unwrap_or(now);
let mut result = db let mut result = db
.client .client
.query(FAIL_QUERY) .query(FAIL_QUERY)
@@ -403,7 +383,7 @@ impl IngestionTask {
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Fail)) updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Fail))
} }
pub async fn mark_dead_letter( pub async fn mark_dead_letter(
@@ -439,7 +419,7 @@ impl IngestionTask {
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::DeadLetter)) updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::DeadLetter))
} }
pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> { pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
@@ -472,7 +452,7 @@ impl IngestionTask {
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Cancel)) updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Cancel))
} }
pub async fn release(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> { pub async fn release(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
@@ -498,7 +478,7 @@ impl IngestionTask {
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
updated.ok_or_else(|| invalid_transition(&self.state, TaskTransition::Release)) updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Release))
} }
pub async fn get_unfinished_tasks( pub async fn get_unfinished_tasks(
@@ -668,10 +648,9 @@ mod tests {
let task = IngestionTask::new(payload.clone(), user_id.to_string()); let task = IngestionTask::new(payload.clone(), user_id.to_string());
db.store_item(task.clone()).await.with_context(|| "store".to_string())?; db.store_item(task.clone()).await.with_context(|| "store".to_string())?;
let err = task let Err(err) = task
.mark_processing(&db) .mark_processing(&db)
.await .await else { anyhow::bail!("processing should fail without reservation") };
.expect_err("processing should fail without reservation");
match err { match err {
AppError::Validation(message) => { AppError::Validation(message) => {
@@ -694,7 +673,7 @@ mod tests {
let task = IngestionTask::new(payload.clone(), user_id.to_string()); let task = IngestionTask::new(payload.clone(), user_id.to_string());
db.store_item(task.clone()).await.with_context(|| "store".to_string())?; db.store_item(task.clone()).await.with_context(|| "store".to_string())?;
let err = task let Err(err) = task
.mark_failed( .mark_failed(
TaskErrorInfo { TaskErrorInfo {
code: None, code: None,
@@ -703,8 +682,7 @@ mod tests {
Duration::from_secs(30), Duration::from_secs(30),
&db, &db,
) )
.await .await else { anyhow::bail!("failing should require processing state") };
.expect_err("failing should require processing state");
match err { match err {
AppError::Validation(message) => { AppError::Validation(message) => {
@@ -727,10 +705,9 @@ mod tests {
let task = IngestionTask::new(payload.clone(), user_id.to_string()); let task = IngestionTask::new(payload.clone(), user_id.to_string());
db.store_item(task.clone()).await.with_context(|| "store".to_string())?; db.store_item(task.clone()).await.with_context(|| "store".to_string())?;
let err = task let Err(err) = task
.release(&db) .release(&db)
.await .await else { anyhow::bail!("release should require reserved state") };
.expect_err("release should require reserved state");
match err { match err {
AppError::Validation(message) => { AppError::Validation(message) => {
+1
View File
@@ -1,6 +1,7 @@
#![allow(clippy::module_name_repetitions)] #![allow(clippy::module_name_repetitions)]
use uuid::Uuid; use uuid::Uuid;
use std::fmt;
use std::fmt::Write; use std::fmt::Write;
use crate::stored_object; use crate::stored_object;