fix: remove remnant job table, normalize taskstatus enum

This commit is contained in:
Per Stark
2025-06-27 23:18:16 +02:00
parent 43263fa77e
commit ec16f2100c
9 changed files with 33 additions and 22 deletions

View File

@@ -0,0 +1 @@
REMOVE TABLE job;

View File

@@ -1 +1 @@
{"schemas":"--- original\n+++ modified\n@@ -157,10 +157,12 @@\n DEFINE FIELD IF NOT EXISTS require_email_verification ON system_settings TYPE bool;\n DEFINE FIELD IF NOT EXISTS query_model ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS processing_model ON system_settings TYPE string;\n+DEFINE FIELD IF NOT EXISTS image_processing_model ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS embedding_model ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS embedding_dimensions ON system_settings TYPE int;\n DEFINE FIELD IF NOT EXISTS query_system_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;\n+DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;\n\n # Defines the schema for the 'text_chunk' table.\n\n","events":null} {"schemas":"--- original\n+++ modified\n@@ -57,10 +57,7 @@\n DEFINE FIELD IF NOT EXISTS created_at ON ingestion_task TYPE string;\n DEFINE FIELD IF NOT EXISTS updated_at ON ingestion_task TYPE string;\n\n-# Custom fields from the IngestionTask struct\n-# IngestionPayload is complex, store as object\n DEFINE FIELD IF NOT EXISTS content ON ingestion_task TYPE object;\n-# IngestionTaskStatus can hold data (InProgress), store as object\n DEFINE FIELD IF NOT EXISTS status ON ingestion_task TYPE object;\n DEFINE FIELD IF NOT EXISTS user_id ON ingestion_task TYPE string;\n\n@@ -157,10 +154,12 @@\n DEFINE FIELD IF NOT EXISTS require_email_verification ON system_settings TYPE bool;\n DEFINE FIELD IF NOT EXISTS query_model ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS processing_model ON system_settings TYPE string;\n+DEFINE FIELD IF NOT EXISTS image_processing_model ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS embedding_model ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS embedding_dimensions ON system_settings TYPE int;\n DEFINE FIELD IF NOT EXISTS query_system_prompt ON system_settings TYPE string;\n DEFINE FIELD IF NOT EXISTS ingestion_system_prompt ON system_settings TYPE string;\n+DEFINE FIELD IF NOT EXISTS image_processing_prompt ON system_settings TYPE string;\n\n # Defines the schema for the 'text_chunk' table.\n\n","events":null}

View File

@@ -6,10 +6,7 @@ DEFINE TABLE IF NOT EXISTS ingestion_task SCHEMALESS;
DEFINE FIELD IF NOT EXISTS created_at ON ingestion_task TYPE string; DEFINE FIELD IF NOT EXISTS created_at ON ingestion_task TYPE string;
DEFINE FIELD IF NOT EXISTS updated_at ON ingestion_task TYPE string; DEFINE FIELD IF NOT EXISTS updated_at ON ingestion_task TYPE string;
# Custom fields from the IngestionTask struct
# IngestionPayload is complex, store as object
DEFINE FIELD IF NOT EXISTS content ON ingestion_task TYPE object; DEFINE FIELD IF NOT EXISTS content ON ingestion_task TYPE object;
# IngestionTaskStatus can hold data (InProgress), store as object
DEFINE FIELD IF NOT EXISTS status ON ingestion_task TYPE object; DEFINE FIELD IF NOT EXISTS status ON ingestion_task TYPE object;
DEFINE FIELD IF NOT EXISTS user_id ON ingestion_task TYPE string; DEFINE FIELD IF NOT EXISTS user_id ON ingestion_task TYPE string;

View File

@@ -7,6 +7,7 @@ use crate::{error::AppError, storage::db::SurrealDbClient, stored_object};
use super::ingestion_payload::IngestionPayload; use super::ingestion_payload::IngestionPayload;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "name")]
pub enum IngestionTaskStatus { pub enum IngestionTaskStatus {
Created, Created,
InProgress { InProgress {
@@ -14,7 +15,9 @@ pub enum IngestionTaskStatus {
last_attempt: DateTime<Utc>, last_attempt: DateTime<Utc>,
}, },
Completed, Completed,
Error(String), Error {
message: String,
},
Cancelled, Cancelled,
} }
@@ -85,10 +88,10 @@ impl IngestionTask {
.query( .query(
"SELECT * FROM type::table($table) "SELECT * FROM type::table($table)
WHERE WHERE
status = 'Created' status.name = 'Created'
OR ( OR (
status.InProgress != NONE status.name = 'InProgress'
AND status.InProgress.attempts < $max_attempts AND status.attempts < $max_attempts
) )
ORDER BY created_at ASC", ORDER BY created_at ASC",
) )
@@ -241,7 +244,9 @@ mod tests {
completed_task.status = IngestionTaskStatus::Completed; completed_task.status = IngestionTaskStatus::Completed;
let mut error_task = IngestionTask::new(payload.clone(), user_id.to_string()).await; let mut error_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
error_task.status = IngestionTaskStatus::Error("Test error".to_string()); error_task.status = IngestionTaskStatus::Error {
message: "Test error".to_string(),
};
// Store all tasks // Store all tasks
db.store_item(created_task) db.store_item(created_task)
@@ -280,7 +285,7 @@ mod tests {
} }
} }
IngestionTaskStatus::Completed => "Completed", IngestionTaskStatus::Completed => "Completed",
IngestionTaskStatus::Error(_) => "Error", IngestionTaskStatus::Error { .. } => "Error",
IngestionTaskStatus::Cancelled => "Cancelled", IngestionTaskStatus::Cancelled => "Cancelled",
}) })
.collect(); .collect();

View File

@@ -7,7 +7,7 @@ use axum_htmx::{HxBoosted, HxRequest};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use common::storage::types::{ use common::storage::types::{
conversation::Conversation, file_info::FileInfo, text_content::TextContent, user::User, conversation::Conversation, file_info::FileInfo, text_content::TextContent, user::User, knowledge_entity::KnowledgeEntity, text_chunk::TextChunk,
}; };
use crate::{ use crate::{
@@ -138,6 +138,10 @@ pub async fn delete_text_content(
FileInfo::delete_by_id(&file_info.id, &state.db).await?; FileInfo::delete_by_id(&file_info.id, &state.db).await?;
} }
// Delete related knowledge entities and text chunks
KnowledgeEntity::delete_by_source_id(&id, &state.db).await?;
TextChunk::delete_by_source_id(&id, &state.db).await?;
// Delete the text content // Delete the text content
state.db.delete_item::<TextContent>(&id).await?; state.db.delete_item::<TextContent>(&id).await?;

View File

@@ -186,9 +186,9 @@ pub async fn get_task_updates_stream(
format!("In progress, attempt {}", attempts) format!("In progress, attempt {}", attempts)
} }
IngestionTaskStatus::Completed => "Completed".to_string(), IngestionTaskStatus::Completed => "Completed".to_string(),
IngestionTaskStatus::Error(ref err_msg) => { IngestionTaskStatus::Error { message } => {
// Providing a user-friendly error message from the status // Providing a user-friendly error message from the status
format!("Error: {}", err_msg) format!("Error: {}", message)
} }
IngestionTaskStatus::Cancelled => "Cancelled".to_string(), IngestionTaskStatus::Cancelled => "Cancelled".to_string(),
}; };
@@ -197,9 +197,9 @@ pub async fn get_task_updates_stream(
// Check for terminal states to close the stream // Check for terminal states to close the stream
match updated_task.status { match updated_task.status {
IngestionTaskStatus::Completed | IngestionTaskStatus::Completed
IngestionTaskStatus::Error(_) | | IngestionTaskStatus::Error { .. }
IngestionTaskStatus::Cancelled => { | IngestionTaskStatus::Cancelled => {
// Send a specific event that HTMX uses to close the connection // Send a specific event that HTMX uses to close the connection
// Send a event to reload the recent content // Send a event to reload the recent content
// Send a event to remove the loading indicatior // Send a event to remove the loading indicatior

View File

@@ -19,10 +19,12 @@
</div> </div>
<div> <div>
<div class="[&:before]:content-['Status:_'] [&:before]:opacity-60"> <div class="[&:before]:content-['Status:_'] [&:before]:opacity-60">
{% if item.status.InProgress %} {% if item.status.name == "InProgress" %}
In Progress, attempt {{item.status.InProgress.attempts}} In Progress, attempt {{item.status.attempts}}
{% elif item.status.name == "Error" %}
Error: {{item.status.message}}
{% else %} {% else %}
{{item.status}} {{item.status.name}}
{% endif %} {% endif %}
</div> </div>
<div class="text-xs font-semibold opacity-60"> <div class="text-xs font-semibold opacity-60">

View File

@@ -44,7 +44,7 @@ pub async fn run_worker_loop(
Action::Update => { Action::Update => {
match notification.data.status { match notification.data.status {
IngestionTaskStatus::Completed IngestionTaskStatus::Completed
| IngestionTaskStatus::Error(_) | IngestionTaskStatus::Error { .. }
| IngestionTaskStatus::Cancelled => { | IngestionTaskStatus::Cancelled => {
info!( info!(
"Skipping already completed/error/cancelled task: {}", "Skipping already completed/error/cancelled task: {}",
@@ -58,7 +58,7 @@ pub async fn run_worker_loop(
db.get_item::<IngestionTask>(&notification.data.id).await db.get_item::<IngestionTask>(&notification.data.id).await
{ {
match current_task.status { match current_task.status {
IngestionTaskStatus::Error(_) IngestionTaskStatus::Error { .. }
if attempts if attempts
< common::storage::types::ingestion_task::MAX_ATTEMPTS => < common::storage::types::ingestion_task::MAX_ATTEMPTS =>
{ {

View File

@@ -72,7 +72,9 @@ impl IngestionPipeline {
if current_attempts >= MAX_ATTEMPTS { if current_attempts >= MAX_ATTEMPTS {
IngestionTask::update_status( IngestionTask::update_status(
&task.id, &task.id,
IngestionTaskStatus::Error(format!("Max attempts reached: {}", e)), IngestionTaskStatus::Error {
message: format!("Max attempts reached: {}", e),
},
&self.db, &self.db,
) )
.await?; .await?;