mirror of
https://github.com/perstarkse/minne.git
synced 2026-03-23 09:51:36 +01:00
refactored queue into Job
This commit is contained in:
@@ -1,15 +1,19 @@
|
||||
use std::{sync::Arc, time::Instant};
|
||||
|
||||
use chrono::Utc;
|
||||
use text_splitter::TextSplitter;
|
||||
use tracing::{debug, info};
|
||||
|
||||
use crate::{
|
||||
error::AppError,
|
||||
storage::{
|
||||
db::{store_item, SurrealDbClient},
|
||||
db::SurrealDbClient,
|
||||
types::{
|
||||
knowledge_entity::KnowledgeEntity, knowledge_relationship::KnowledgeRelationship,
|
||||
text_chunk::TextChunk, text_content::TextContent,
|
||||
job::{Job, JobStatus, MAX_ATTEMPTS},
|
||||
knowledge_entity::KnowledgeEntity,
|
||||
knowledge_relationship::KnowledgeRelationship,
|
||||
text_chunk::TextChunk,
|
||||
text_content::TextContent,
|
||||
},
|
||||
},
|
||||
utils::embedding::generate_embedding,
|
||||
@@ -20,19 +24,53 @@ use super::analysis::{
|
||||
};
|
||||
|
||||
pub struct ContentProcessor {
|
||||
db_client: Arc<SurrealDbClient>,
|
||||
db: Arc<SurrealDbClient>,
|
||||
openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
}
|
||||
|
||||
impl ContentProcessor {
|
||||
pub async fn new(
|
||||
surreal_db_client: Arc<SurrealDbClient>,
|
||||
db: Arc<SurrealDbClient>,
|
||||
openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
) -> Result<Self, AppError> {
|
||||
Ok(Self {
|
||||
db_client: surreal_db_client,
|
||||
openai_client,
|
||||
})
|
||||
Ok(Self { db, openai_client })
|
||||
}
|
||||
pub async fn process_job(&self, job: Job) -> Result<(), AppError> {
|
||||
let current_attempts = match job.status {
|
||||
JobStatus::InProgress { attempts, .. } => attempts + 1,
|
||||
_ => 1,
|
||||
};
|
||||
|
||||
// Update status to InProgress with attempt count
|
||||
Job::update_status(
|
||||
&job.id,
|
||||
JobStatus::InProgress {
|
||||
attempts: current_attempts,
|
||||
last_attempt: Utc::now(),
|
||||
},
|
||||
&self.db,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let text_content = job.content.to_text_content(&self.openai_client).await?;
|
||||
|
||||
match self.process(&text_content).await {
|
||||
Ok(_) => {
|
||||
Job::update_status(&job.id, JobStatus::Completed, &self.db).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
if current_attempts >= MAX_ATTEMPTS {
|
||||
Job::update_status(
|
||||
&job.id,
|
||||
JobStatus::Error(format!("Max attempts reached: {}", e)),
|
||||
&self.db,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Err(AppError::Processing(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn process(&self, content: &TextContent) -> Result<(), AppError> {
|
||||
@@ -59,9 +97,9 @@ impl ContentProcessor {
|
||||
)?;
|
||||
|
||||
// Store original content
|
||||
store_item(&self.db_client, content.to_owned()).await?;
|
||||
self.db.store_item(content.to_owned()).await?;
|
||||
|
||||
self.db_client.rebuild_indexes().await?;
|
||||
self.db.rebuild_indexes().await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -69,7 +107,7 @@ impl ContentProcessor {
|
||||
&self,
|
||||
content: &TextContent,
|
||||
) -> Result<LLMGraphAnalysisResult, AppError> {
|
||||
let analyser = IngressAnalyzer::new(&self.db_client, &self.openai_client);
|
||||
let analyser = IngressAnalyzer::new(&self.db, &self.openai_client);
|
||||
analyser
|
||||
.analyze_content(
|
||||
&content.category,
|
||||
@@ -87,12 +125,12 @@ impl ContentProcessor {
|
||||
) -> Result<(), AppError> {
|
||||
for entity in &entities {
|
||||
debug!("Storing entity: {:?}", entity);
|
||||
store_item(&self.db_client, entity.clone()).await?;
|
||||
self.db.store_item(entity.clone()).await?;
|
||||
}
|
||||
|
||||
for relationship in &relationships {
|
||||
debug!("Storing relationship: {:?}", relationship);
|
||||
relationship.store_relationship(&self.db_client).await?;
|
||||
relationship.store_relationship(&self.db).await?;
|
||||
}
|
||||
|
||||
info!(
|
||||
@@ -116,7 +154,7 @@ impl ContentProcessor {
|
||||
embedding,
|
||||
content.user_id.to_string(),
|
||||
);
|
||||
store_item(&self.db_client, text_chunk).await?;
|
||||
self.db.store_item(text_chunk).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
@@ -1,182 +0,0 @@
|
||||
use chrono::Utc;
|
||||
use futures::Stream;
|
||||
use std::sync::Arc;
|
||||
use surrealdb::{opt::PatchOp, Error, Notification};
|
||||
use tracing::{debug, error, info};
|
||||
|
||||
use crate::{
|
||||
error::AppError,
|
||||
storage::{
|
||||
db::{delete_item, get_item, store_item, SurrealDbClient},
|
||||
types::{
|
||||
job::{Job, JobStatus},
|
||||
StoredObject,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
use super::{content_processor::ContentProcessor, ingress_object::IngressObject};
|
||||
|
||||
pub struct JobQueue {
|
||||
pub db: Arc<SurrealDbClient>,
|
||||
}
|
||||
|
||||
pub const MAX_ATTEMPTS: u32 = 3;
|
||||
|
||||
impl JobQueue {
|
||||
pub fn new(db: Arc<SurrealDbClient>) -> Self {
|
||||
Self { db }
|
||||
}
|
||||
|
||||
/// Creates a new job and stores it in the database
|
||||
pub async fn enqueue(&self, content: IngressObject, user_id: String) -> Result<(), AppError> {
|
||||
let job = Job::new(content, user_id).await;
|
||||
|
||||
info!("{:?}", job);
|
||||
|
||||
store_item(&self.db, job).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Gets all jobs for a specific user
|
||||
pub async fn get_user_jobs(&self, user_id: &str) -> Result<Vec<Job>, AppError> {
|
||||
let jobs: Vec<Job> = self
|
||||
.db
|
||||
.query("SELECT * FROM job WHERE user_id = $user_id ORDER BY created_at DESC")
|
||||
.bind(("user_id", user_id.to_owned()))
|
||||
.await?
|
||||
.take(0)?;
|
||||
|
||||
debug!("{:?}", jobs);
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
/// Gets all active jobs for a specific user
|
||||
pub async fn get_unfinished_user_jobs(&self, user_id: &str) -> Result<Vec<Job>, AppError> {
|
||||
let jobs: Vec<Job> = self
|
||||
.db
|
||||
.query(
|
||||
"SELECT * FROM type::table($table)
|
||||
WHERE user_id = $user_id
|
||||
AND (
|
||||
status = 'Created'
|
||||
OR (
|
||||
status.InProgress != NONE
|
||||
AND status.InProgress.attempts < $max_attempts
|
||||
)
|
||||
)
|
||||
ORDER BY created_at DESC",
|
||||
)
|
||||
.bind(("table", Job::table_name()))
|
||||
.bind(("user_id", user_id.to_owned()))
|
||||
.bind(("max_attempts", MAX_ATTEMPTS))
|
||||
.await?
|
||||
.take(0)?;
|
||||
debug!("{:?}", jobs);
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
pub async fn delete_job(&self, id: &str, user_id: &str) -> Result<(), AppError> {
|
||||
get_item::<Job>(&self.db.client, id)
|
||||
.await?
|
||||
.filter(|job| job.user_id == user_id)
|
||||
.ok_or_else(|| {
|
||||
error!("Unauthorized attempt to delete job {id} by user {user_id}");
|
||||
AppError::Auth("Not authorized to delete this job".into())
|
||||
})?;
|
||||
|
||||
info!("Deleting job {id} for user {user_id}");
|
||||
delete_item::<Job>(&self.db.client, id)
|
||||
.await
|
||||
.map_err(AppError::Database)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn update_status(&self, id: &str, status: JobStatus) -> Result<(), AppError> {
|
||||
let _job: Option<Job> = self
|
||||
.db
|
||||
.update((Job::table_name(), id))
|
||||
.patch(PatchOp::replace("/status", status))
|
||||
.patch(PatchOp::replace(
|
||||
"/updated_at",
|
||||
surrealdb::sql::Datetime::default(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Listen for new jobs
|
||||
pub async fn listen_for_jobs(
|
||||
&self,
|
||||
) -> Result<impl Stream<Item = Result<Notification<Job>, Error>>, Error> {
|
||||
self.db.select("job").live().await
|
||||
}
|
||||
|
||||
/// Get unfinished jobs, ie newly created and in progress up two times
|
||||
pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> {
|
||||
let jobs: Vec<Job> = self
|
||||
.db
|
||||
.query(
|
||||
"SELECT * FROM type::table($table)
|
||||
WHERE
|
||||
status = 'Created'
|
||||
OR (
|
||||
status.InProgress != NONE
|
||||
AND status.InProgress.attempts < $max_attempts
|
||||
)
|
||||
ORDER BY created_at ASC",
|
||||
)
|
||||
.bind(("table", Job::table_name()))
|
||||
.bind(("max_attempts", MAX_ATTEMPTS))
|
||||
.await?
|
||||
.take(0)?;
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
// Method to process a single job
|
||||
pub async fn process_job(
|
||||
&self,
|
||||
job: Job,
|
||||
processor: &ContentProcessor,
|
||||
openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
) -> Result<(), AppError> {
|
||||
let current_attempts = match job.status {
|
||||
JobStatus::InProgress { attempts, .. } => attempts + 1,
|
||||
_ => 1,
|
||||
};
|
||||
|
||||
// Update status to InProgress with attempt count
|
||||
self.update_status(
|
||||
&job.id,
|
||||
JobStatus::InProgress {
|
||||
attempts: current_attempts,
|
||||
last_attempt: Utc::now(),
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
let text_content = job.content.to_text_content(&openai_client).await?;
|
||||
|
||||
match processor.process(&text_content).await {
|
||||
Ok(_) => {
|
||||
self.update_status(&job.id, JobStatus::Completed).await?;
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
if current_attempts >= MAX_ATTEMPTS {
|
||||
self.update_status(
|
||||
&job.id,
|
||||
JobStatus::Error(format!("Max attempts reached: {}", e)),
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
Err(AppError::Processing(e.to_string()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,5 +2,3 @@ pub mod analysis;
|
||||
pub mod content_processor;
|
||||
pub mod ingress_input;
|
||||
pub mod ingress_object;
|
||||
pub mod jobqueue;
|
||||
pub mod queue_task;
|
||||
|
||||
@@ -1,13 +0,0 @@
|
||||
use crate::ingress::ingress_object::IngressObject;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct QueueTask {
|
||||
pub delivery_tag: u64,
|
||||
pub content: IngressObject,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct QueueTaskResponse {
|
||||
pub tasks: Vec<QueueTask>,
|
||||
}
|
||||
@@ -1,13 +1,14 @@
|
||||
use crate::error::AppError;
|
||||
|
||||
use super::types::{analytics::Analytics, system_settings::SystemSettings, StoredObject};
|
||||
use super::types::{analytics::Analytics, job::Job, system_settings::SystemSettings, StoredObject};
|
||||
use axum_session::{SessionConfig, SessionError, SessionStore};
|
||||
use axum_session_surreal::SessionSurrealPool;
|
||||
use futures::Stream;
|
||||
use std::ops::Deref;
|
||||
use surrealdb::{
|
||||
engine::any::{connect, Any},
|
||||
opt::auth::Root,
|
||||
Error, Surreal,
|
||||
Error, Notification, Surreal,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
@@ -53,8 +54,8 @@ impl SurrealDbClient {
|
||||
}
|
||||
|
||||
pub async fn ensure_initialized(&self) -> Result<(), AppError> {
|
||||
Self::build_indexes(&self).await?;
|
||||
Self::setup_auth(&self).await?;
|
||||
Self::build_indexes(self).await?;
|
||||
Self::setup_auth(self).await?;
|
||||
|
||||
Analytics::ensure_initialized(self).await?;
|
||||
SystemSettings::ensure_initialized(self).await?;
|
||||
@@ -107,6 +108,75 @@ impl SurrealDbClient {
|
||||
{
|
||||
self.client.delete(T::table_name()).await
|
||||
}
|
||||
|
||||
/// Operation to store a object in SurrealDB, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `item` - The item to be stored
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result` - Item or Error
|
||||
pub async fn store_item<T>(&self, item: T) -> Result<Option<T>, Error>
|
||||
where
|
||||
T: StoredObject + Send + Sync + 'static,
|
||||
{
|
||||
self.client
|
||||
.create((T::table_name(), item.get_id()))
|
||||
.content(item)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Operation to retrieve all objects from a certain table, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result` - Vec<T> or Error
|
||||
pub async fn get_all_stored_items<T>(&self) -> Result<Vec<T>, Error>
|
||||
where
|
||||
T: for<'de> StoredObject,
|
||||
{
|
||||
self.client.select(T::table_name()).await
|
||||
}
|
||||
|
||||
/// Operation to retrieve a single object by its ID, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `id` - The ID of the item to retrieve
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Option<T>, Error>` - The found item or Error
|
||||
pub async fn get_item<T>(&self, id: &str) -> Result<Option<T>, Error>
|
||||
where
|
||||
T: for<'de> StoredObject,
|
||||
{
|
||||
self.client.select((T::table_name(), id)).await
|
||||
}
|
||||
|
||||
/// Operation to delete a single object by its ID, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `id` - The ID of the item to delete
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Option<T>, Error>` - The deleted item or Error
|
||||
pub async fn delete_item<T>(&self, id: &str) -> Result<Option<T>, Error>
|
||||
where
|
||||
T: for<'de> StoredObject,
|
||||
{
|
||||
self.client.delete((T::table_name(), id)).await
|
||||
}
|
||||
|
||||
/// Operation to listen to a table for updates, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Option<T>, Error>` - The deleted item or Error
|
||||
pub async fn listen<T>(
|
||||
&self,
|
||||
) -> Result<impl Stream<Item = Result<Notification<Job>, Error>>, Error>
|
||||
where
|
||||
T: for<'de> StoredObject,
|
||||
{
|
||||
self.client.select(T::table_name()).live().await
|
||||
}
|
||||
}
|
||||
|
||||
impl Deref for SurrealDbClient {
|
||||
@@ -116,65 +186,3 @@ impl Deref for SurrealDbClient {
|
||||
&self.client
|
||||
}
|
||||
}
|
||||
|
||||
/// Operation to store a object in SurrealDB, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `db_client` - A initialized database client
|
||||
/// * `item` - The item to be stored
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result` - Item or Error
|
||||
pub async fn store_item<T>(db_client: &Surreal<Any>, item: T) -> Result<Option<T>, Error>
|
||||
where
|
||||
T: StoredObject + Send + Sync + 'static,
|
||||
{
|
||||
db_client
|
||||
.create((T::table_name(), item.get_id()))
|
||||
.content(item)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Operation to retrieve all objects from a certain table, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `db_client` - A initialized database client
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result` - Vec<T> or Error
|
||||
pub async fn get_all_stored_items<T>(db_client: &Surreal<Any>) -> Result<Vec<T>, Error>
|
||||
where
|
||||
T: for<'de> StoredObject,
|
||||
{
|
||||
db_client.select(T::table_name()).await
|
||||
}
|
||||
|
||||
/// Operation to retrieve a single object by its ID, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `db_client` - An initialized database client
|
||||
/// * `id` - The ID of the item to retrieve
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Option<T>, Error>` - The found item or Error
|
||||
pub async fn get_item<T>(db_client: &Surreal<Any>, id: &str) -> Result<Option<T>, Error>
|
||||
where
|
||||
T: for<'de> StoredObject,
|
||||
{
|
||||
db_client.select((T::table_name(), id)).await
|
||||
}
|
||||
|
||||
/// Operation to delete a single object by its ID, requires the struct to implement StoredObject
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `db_client` - An initialized database client
|
||||
/// * `id` - The ID of the item to delete
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Option<T>, Error>` - The deleted item or Error
|
||||
pub async fn delete_item<T>(db_client: &Surreal<Any>, id: &str) -> Result<Option<T>, Error>
|
||||
where
|
||||
T: for<'de> StoredObject,
|
||||
{
|
||||
db_client.delete((T::table_name(), id)).await
|
||||
}
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
error::AppError,
|
||||
storage::db::{get_item, SurrealDbClient},
|
||||
stored_object,
|
||||
};
|
||||
use crate::{error::AppError, storage::db::SurrealDbClient, stored_object};
|
||||
|
||||
use super::message::Message;
|
||||
|
||||
@@ -30,7 +26,8 @@ impl Conversation {
|
||||
user_id: &str,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<(Self, Vec<Message>), AppError> {
|
||||
let conversation: Conversation = get_item(&db, conversation_id)
|
||||
let conversation: Conversation = db
|
||||
.get_item(conversation_id)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("Conversation not found".to_string()))?;
|
||||
|
||||
|
||||
@@ -11,10 +11,7 @@ use tokio::fs::remove_dir_all;
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
storage::db::{delete_item, get_item, store_item, SurrealDbClient},
|
||||
stored_object,
|
||||
};
|
||||
use crate::{storage::db::SurrealDbClient, stored_object};
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum FileError {
|
||||
@@ -89,7 +86,7 @@ impl FileInfo {
|
||||
};
|
||||
|
||||
// Store in database
|
||||
store_item(&db_client.client, file_info.clone()).await?;
|
||||
db_client.store_item(file_info.clone()).await?;
|
||||
|
||||
Ok(file_info)
|
||||
}
|
||||
@@ -210,7 +207,7 @@ impl FileInfo {
|
||||
/// `Result<(), FileError>`
|
||||
pub async fn delete_by_id(id: &str, db_client: &SurrealDbClient) -> Result<(), FileError> {
|
||||
// Get the FileInfo from the database
|
||||
let file_info = match get_item::<FileInfo>(db_client, id).await? {
|
||||
let file_info = match db_client.get_item::<FileInfo>(id).await? {
|
||||
Some(info) => info,
|
||||
None => {
|
||||
return Err(FileError::FileNotFound(format!(
|
||||
@@ -241,7 +238,7 @@ impl FileInfo {
|
||||
}
|
||||
|
||||
// Delete the FileInfo from the database
|
||||
delete_item::<FileInfo>(db_client, id).await?;
|
||||
db_client.delete_item::<FileInfo>(id).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,6 +1,11 @@
|
||||
use futures::Stream;
|
||||
use surrealdb::{opt::PatchOp, Notification};
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{ingress::ingress_object::IngressObject, stored_object};
|
||||
use crate::{
|
||||
error::AppError, ingress::ingress_object::IngressObject, storage::db::SurrealDbClient,
|
||||
stored_object,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub enum JobStatus {
|
||||
@@ -20,6 +25,8 @@ stored_object!(Job, "job", {
|
||||
user_id: String
|
||||
});
|
||||
|
||||
pub const MAX_ATTEMPTS: u32 = 3;
|
||||
|
||||
impl Job {
|
||||
pub async fn new(content: IngressObject, user_id: String) -> Self {
|
||||
let now = Utc::now();
|
||||
@@ -33,4 +40,64 @@ impl Job {
|
||||
user_id,
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a new job and stores it in the database
|
||||
pub async fn create_and_add_to_db(
|
||||
content: IngressObject,
|
||||
user_id: String,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<(), AppError> {
|
||||
let job = Self::new(content, user_id).await;
|
||||
|
||||
db.store_item(job).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Update job status
|
||||
pub async fn update_status(
|
||||
id: &str,
|
||||
status: JobStatus,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<(), AppError> {
|
||||
let _job: Option<Job> = db
|
||||
.update((Self::table_name(), id))
|
||||
.patch(PatchOp::replace("/status", status))
|
||||
.patch(PatchOp::replace(
|
||||
"/updated_at",
|
||||
surrealdb::sql::Datetime::default(),
|
||||
))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Listen for new jobs
|
||||
pub async fn listen_for_jobs(
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<impl Stream<Item = Result<Notification<Job>, surrealdb::Error>>, surrealdb::Error>
|
||||
{
|
||||
db.listen::<Job>().await
|
||||
}
|
||||
|
||||
/// Get all unfinished jobs, ie newly created and in progress up two times
|
||||
pub async fn get_unfinished_jobs(db: &SurrealDbClient) -> Result<Vec<Job>, AppError> {
|
||||
let jobs: Vec<Job> = db
|
||||
.query(
|
||||
"SELECT * FROM type::table($table)
|
||||
WHERE
|
||||
status = 'Created'
|
||||
OR (
|
||||
status.InProgress != NONE
|
||||
AND status.InProgress.attempts < $max_attempts
|
||||
)
|
||||
ORDER BY created_at ASC",
|
||||
)
|
||||
.bind(("table", Self::table_name()))
|
||||
.bind(("max_attempts", MAX_ATTEMPTS))
|
||||
.await?
|
||||
.take(0)?;
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,10 @@
|
||||
use crate::{
|
||||
error::AppError,
|
||||
storage::db::{get_item, SurrealDbClient},
|
||||
stored_object,
|
||||
};
|
||||
use crate::{error::AppError, storage::db::SurrealDbClient, stored_object};
|
||||
use axum_session_auth::Authentication;
|
||||
use surrealdb::{engine::any::Any, Surreal};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{
|
||||
conversation::Conversation, knowledge_entity::KnowledgeEntity,
|
||||
conversation::Conversation, job::Job, knowledge_entity::KnowledgeEntity,
|
||||
knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings,
|
||||
text_content::TextContent,
|
||||
};
|
||||
@@ -32,7 +28,10 @@ stored_object!(User, "user", {
|
||||
impl Authentication<User, String, Surreal<Any>> for User {
|
||||
async fn load_user(userid: String, db: Option<&Surreal<Any>>) -> Result<User, anyhow::Error> {
|
||||
let db = db.unwrap();
|
||||
Ok(get_item::<Self>(db, userid.as_str()).await?.unwrap())
|
||||
Ok(db
|
||||
.select((Self::table_name(), userid.as_str()))
|
||||
.await?
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
fn is_authenticated(&self) -> bool {
|
||||
@@ -308,7 +307,8 @@ impl User {
|
||||
user_id: &str,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<KnowledgeEntity, AppError> {
|
||||
let entity: KnowledgeEntity = get_item(db, &id)
|
||||
let entity: KnowledgeEntity = db
|
||||
.get_item(id)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("Entity not found".into()))?;
|
||||
|
||||
@@ -324,7 +324,8 @@ impl User {
|
||||
user_id: &str,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<TextContent, AppError> {
|
||||
let text_content: TextContent = get_item(db, &id)
|
||||
let text_content: TextContent = db
|
||||
.get_item(id)
|
||||
.await?
|
||||
.ok_or_else(|| AppError::NotFound("Content not found".into()))?;
|
||||
|
||||
@@ -349,4 +350,49 @@ impl User {
|
||||
|
||||
Ok(conversations)
|
||||
}
|
||||
|
||||
/// Gets all active jobs for the specified user
|
||||
pub async fn get_unfinished_jobs(
|
||||
user_id: &str,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<Vec<Job>, AppError> {
|
||||
let jobs: Vec<Job> = db
|
||||
.query(
|
||||
"SELECT * FROM type::table($table)
|
||||
WHERE user_id = $user_id
|
||||
AND (
|
||||
status = 'Created'
|
||||
OR (
|
||||
status.InProgress != NONE
|
||||
AND status.InProgress.attempts < $max_attempts
|
||||
)
|
||||
)
|
||||
ORDER BY created_at DESC",
|
||||
)
|
||||
.bind(("table", Job::table_name()))
|
||||
.bind(("user_id", user_id.to_owned()))
|
||||
.bind(("max_attempts", 3))
|
||||
.await?
|
||||
.take(0)?;
|
||||
|
||||
Ok(jobs)
|
||||
}
|
||||
|
||||
/// Validate and delete job
|
||||
pub async fn validate_and_delete_job(
|
||||
id: &str,
|
||||
user_id: &str,
|
||||
db: &SurrealDbClient,
|
||||
) -> Result<(), AppError> {
|
||||
db.get_item::<Job>(id)
|
||||
.await?
|
||||
.filter(|job| job.user_id == user_id)
|
||||
.ok_or_else(|| AppError::Auth("Not authorized to delete this job".into()))?;
|
||||
|
||||
db.delete_item::<Job>(id)
|
||||
.await
|
||||
.map_err(AppError::Database)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user