From 560249e5ae4a4b55c13f911d8ea49dde72a8f96c Mon Sep 17 00:00:00 2001 From: Per Stark Date: Mon, 13 Jan 2025 22:10:41 +0100 Subject: [PATCH] working datetime impl --- src/bin/server.rs | 9 +-------- src/ingress/jobqueue.rs | 29 +++++++-------------------- src/storage/types/mod.rs | 43 ++++++---------------------------------- 3 files changed, 14 insertions(+), 67 deletions(-) diff --git a/src/bin/server.rs b/src/bin/server.rs index 91bde6a..31bf083 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -40,14 +40,7 @@ use zettle_db::{ }, AppState, }, - storage::{ - db::SurrealDbClient, - types::{ - file_info::FileInfo, job::Job, knowledge_entity::KnowledgeEntity, - knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk, - text_content::TextContent, user::User, - }, - }, + storage::{db::SurrealDbClient, types::user::User}, utils::{config::get_config, mailer::Mailer}, }; diff --git a/src/ingress/jobqueue.rs b/src/ingress/jobqueue.rs index 008fe36..cc3379e 100644 --- a/src/ingress/jobqueue.rs +++ b/src/ingress/jobqueue.rs @@ -1,9 +1,6 @@ use chrono::Utc; use futures::Stream; -use std::{ - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; +use std::sync::Arc; use surrealdb::{opt::PatchOp, Error, Notification}; use tracing::{error, info}; @@ -74,17 +71,11 @@ impl JobQueue { id: &str, status: JobStatus, ) -> Result, AppError> { - let now = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() - .to_string(); - let job: Option = self .db .update((Job::table_name(), id)) .patch(PatchOp::replace("/status", status)) - .patch(PatchOp::replace("/updated_at", now)) + .patch(PatchOp::replace("/updated_at", Utc::now())) .await?; Ok(job) @@ -98,11 +89,9 @@ impl JobQueue { } /// Get unfinished jobs, ie newly created and in progress up two times - // pub async fn get_unfinished_jobs(&self) -> Result, AppError> { - pub async fn get_unfinished_jobs(&self) -> Result<(), AppError> { + pub async fn get_unfinished_jobs(&self) -> Result, AppError> { info!("Getting unfinished jobs"); - // let jobs: Vec = self - let jobs = self + let jobs: Vec = self .db .query( "SELECT * FROM type::table($table) @@ -116,14 +105,10 @@ impl JobQueue { ) .bind(("table", Job::table_name())) .bind(("max_attempts", MAX_ATTEMPTS)) - .await?; - // .take(0)?; + .await? + .take(0)?; - info!("{:?}", jobs); - // println!("Unfinished jobs found: {}", jobs.len()); - - Ok(()) - // Ok(jobs) + Ok(jobs) } // Method to process a single job diff --git a/src/storage/types/mod.rs b/src/storage/types/mod.rs index 2fa0352..d15dcca 100644 --- a/src/storage/types/mod.rs +++ b/src/storage/types/mod.rs @@ -72,45 +72,14 @@ macro_rules! stored_object { Into::::into(*date).serialize(serializer) } - // fn deserialize_datetime<'de, D>(deserializer: D) -> Result, D::Error> - // where - // D: serde::Deserializer<'de>, - // { - // let dt = surrealdb::sql::Datetime::deserialize(deserializer)?; - // Ok(DateTime::::from(dt)) - // } fn deserialize_datetime<'de, D>(deserializer: D) -> Result, D::Error> -where - D: serde::Deserializer<'de>, -{ - use serde::de::Error; - - // Accept various formats - let value = serde_json::Value::deserialize(deserializer)?; - - match value { - // Handle string format - serde_json::Value::String(s) => { - if s.starts_with("d\"") && s.ends_with('\"') { - let cleaned = &s[2..s.len()-1]; - DateTime::parse_from_rfc3339(cleaned) - .map(|dt| dt.with_timezone(&Utc)) - .map_err(Error::custom) - } else { - DateTime::parse_from_rfc3339(&s) - .map(|dt| dt.with_timezone(&Utc)) - .map_err(Error::custom) - } - }, - // Handle object format (in case SurrealDB returns datetime as an object) - serde_json::Value::Object(_) => { - let dt = surrealdb::sql::Datetime::deserialize(value) - .map_err(Error::custom)?; + where + D: serde::Deserializer<'de>, + { + let dt = surrealdb::sql::Datetime::deserialize(deserializer)?; Ok(DateTime::::from(dt)) - }, - _ => Err(Error::custom("unexpected datetime format")), - } -} + } + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct $name {