ingestion-pipeline crate init, began moving code

This commit is contained in:
Per Stark
2025-03-06 15:29:13 +01:00
parent ef1478547e
commit 1a641db503
21 changed files with 648 additions and 601 deletions

21
Cargo.lock generated
View File

@@ -1071,7 +1071,6 @@ dependencies = [
"mockall", "mockall",
"plotly", "plotly",
"reqwest", "reqwest",
"scraper",
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2",
@@ -1079,7 +1078,6 @@ dependencies = [
"tempfile", "tempfile",
"text-splitter", "text-splitter",
"thiserror", "thiserror",
"tiktoken-rs",
"tokio", "tokio",
"tower-http", "tower-http",
"tracing", "tracing",
@@ -2429,6 +2427,24 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "ingestion-pipeline"
version = "0.1.0"
dependencies = [
"async-openai",
"axum",
"chrono",
"common",
"reqwest",
"scraper",
"serde",
"serde_json",
"text-splitter",
"tiktoken-rs",
"tokio",
"tracing",
]
[[package]] [[package]]
name = "inotify" name = "inotify"
version = "0.10.2" version = "0.10.2"
@@ -2734,6 +2750,7 @@ dependencies = [
"config", "config",
"futures", "futures",
"html-router", "html-router",
"ingestion-pipeline",
"json-stream-parser", "json-stream-parser",
"lettre", "lettre",
"mime", "mime",

View File

@@ -3,7 +3,7 @@ members = [
"crates/main", "crates/main",
"crates/common", "crates/common",
"crates/api-router" "crates/api-router"
, "crates/html-router"] , "crates/html-router", "crates/ingestion-pipeline"]
resolver = "2" resolver = "2"
[workspace.dependencies] [workspace.dependencies]

View File

@@ -6,7 +6,7 @@ use axum::{
Router, Router,
}; };
use middleware_api_auth::api_auth; use middleware_api_auth::api_auth;
use routes::ingress::ingress_data; use routes::ingress::ingest_data;
pub mod api_state; pub mod api_state;
mod middleware_api_auth; mod middleware_api_auth;
@@ -19,7 +19,7 @@ where
ApiState: FromRef<S>, ApiState: FromRef<S>,
{ {
Router::new() Router::new()
.route("/ingress", post(ingress_data)) .route("/ingress", post(ingest_data))
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024)) .layer(DefaultBodyLimit::max(1024 * 1024 * 1024))
.route_layer(from_fn_with_state(app_state.clone(), api_auth)) .route_layer(from_fn_with_state(app_state.clone(), api_auth))
} }

View File

@@ -2,17 +2,19 @@ use axum::{extract::State, http::StatusCode, response::IntoResponse, Extension};
use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart}; use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart};
use common::{ use common::{
error::{ApiError, AppError}, error::{ApiError, AppError},
ingress::ingress_object::IngressObject, storage::types::{
storage::types::{file_info::FileInfo, job::Job, user::User}, file_info::FileInfo, ingestion_payload::IngestionPayload, ingestion_task::IngestionTask,
user::User,
},
}; };
use futures::{future::try_join_all, TryFutureExt}; use futures::{future::try_join_all, TryFutureExt};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tracing::{debug, info}; use tracing::info;
use crate::api_state::ApiState; use crate::api_state::ApiState;
#[derive(Debug, TryFromMultipart)] #[derive(Debug, TryFromMultipart)]
pub struct IngressParams { pub struct IngestParams {
pub content: Option<String>, pub content: Option<String>,
pub instructions: String, pub instructions: String,
pub category: String, pub category: String,
@@ -21,10 +23,10 @@ pub struct IngressParams {
pub files: Vec<FieldData<NamedTempFile>>, pub files: Vec<FieldData<NamedTempFile>>,
} }
pub async fn ingress_data( pub async fn ingest_data(
State(state): State<ApiState>, State(state): State<ApiState>,
Extension(user): Extension<User>, Extension(user): Extension<User>,
TypedMultipart(input): TypedMultipart<IngressParams>, TypedMultipart(input): TypedMultipart<IngestParams>,
) -> Result<impl IntoResponse, ApiError> { ) -> Result<impl IntoResponse, ApiError> {
info!("Received input: {:?}", input); info!("Received input: {:?}", input);
@@ -36,20 +38,19 @@ pub async fn ingress_data(
) )
.await?; .await?;
debug!("Got file infos"); let payloads = IngestionPayload::create_ingestion_payload(
let ingress_objects = IngressObject::create_ingress_objects(
input.content, input.content,
input.instructions, input.instructions,
input.category, input.category,
file_infos, file_infos,
user.id.as_str(), user.id.as_str(),
)?; )?;
debug!("Got ingress objects");
let futures: Vec<_> = ingress_objects let futures: Vec<_> = payloads
.into_iter() .into_iter()
.map(|object| Job::create_and_add_to_db(object.clone(), user.id.clone(), &state.db)) .map(|object| {
IngestionTask::create_and_add_to_db(object.clone(), user.id.clone(), &state.db)
})
.collect(); .collect();
try_join_all(futures).await.map_err(AppError::from)?; try_join_all(futures).await.map_err(AppError::from)?;

View File

@@ -34,12 +34,10 @@ minijinja-contrib = { version = "2.6.0", features = ["datetime", "timezone"] }
mockall = "0.13.0" mockall = "0.13.0"
plotly = "0.12.1" plotly = "0.12.1"
reqwest = {version = "0.12.12", features = ["charset", "json"]} reqwest = {version = "0.12.12", features = ["charset", "json"]}
scraper = "0.22.0"
sha2 = "0.10.8" sha2 = "0.10.8"
surrealdb = "2.0.4" surrealdb = "2.0.4"
tempfile = "3.12.0" tempfile = "3.12.0"
text-splitter = "0.18.1" text-splitter = "0.18.1"
tiktoken-rs = "0.6.0"
tower-http = { version = "0.6.2", features = ["fs"] } tower-http = { version = "0.6.2", features = ["fs"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.2", features = ["serde"] } url = { version = "2.5.2", features = ["serde"] }

View File

@@ -1,162 +1 @@
use std::{sync::Arc, time::Instant};
use chrono::Utc;
use text_splitter::TextSplitter;
use tracing::{debug, info};
use crate::{
error::AppError,
storage::{
db::SurrealDbClient,
types::{
job::{Job, JobStatus, MAX_ATTEMPTS},
knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship,
text_chunk::TextChunk,
text_content::TextContent,
},
},
utils::embedding::generate_embedding,
};
use super::analysis::{
ingress_analyser::IngressAnalyzer, types::llm_analysis_result::LLMGraphAnalysisResult,
};
/// Processes ingestion jobs: converts job content to text, runs semantic
/// analysis, and persists the resulting graph entities and vector chunks.
pub struct ContentProcessor {
    // Shared handle to the SurrealDB client used for all persistence.
    db: Arc<SurrealDbClient>,
    // Shared OpenAI client used for analysis and embedding generation.
    openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
}
impl ContentProcessor {
    /// Constructs a processor from shared database and OpenAI handles.
    ///
    /// Declared `async` and fallible, keeping the interface stable should
    /// initialization ever need I/O; currently it cannot fail.
    pub async fn new(
        db: Arc<SurrealDbClient>,
        openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
    ) -> Result<Self, AppError> {
        Ok(Self { db, openai_client })
    }
    /// Runs a single ingestion job, tracking retry attempts in the job status.
    ///
    /// Marks the job `InProgress` (incrementing the attempt counter), resolves
    /// its content to text, and processes it. On success the job is marked
    /// `Completed`; on failure the job is only marked `Error` once
    /// `MAX_ATTEMPTS` is reached, so earlier failures stay retryable.
    pub async fn process_job(&self, job: Job) -> Result<(), AppError> {
        // A job already in progress carries its attempt count; anything else
        // (e.g. freshly created) starts at attempt 1.
        let current_attempts = match job.status {
            JobStatus::InProgress { attempts, .. } => attempts + 1,
            _ => 1,
        };
        // Update status to InProgress with attempt count
        Job::update_status(
            &job.id,
            JobStatus::InProgress {
                attempts: current_attempts,
                last_attempt: Utc::now(),
            },
            &self.db,
        )
        .await?;
        // Resolve the job payload (URL, file, or raw text) into plain text.
        let text_content = job.content.to_text_content(&self.openai_client).await?;
        match self.process(&text_content).await {
            Ok(_) => {
                Job::update_status(&job.id, JobStatus::Completed, &self.db).await?;
                Ok(())
            }
            Err(e) => {
                // Only flag the job as permanently failed after the final
                // attempt; otherwise its status stays InProgress for retry.
                if current_attempts >= MAX_ATTEMPTS {
                    Job::update_status(
                        &job.id,
                        JobStatus::Error(format!("Max attempts reached: {}", e)),
                        &self.db,
                    )
                    .await?;
                }
                Err(AppError::Processing(e.to_string()))
            }
        }
    }
    /// Processes a piece of text content end to end.
    ///
    /// Order matters here: analysis runs first (it also performs retrieval),
    /// then graph entities/relationships and vector chunks are stored
    /// concurrently, and only afterwards is the original content persisted
    /// and the indexes rebuilt, so the rebuild sees all newly stored items.
    pub async fn process(&self, content: &TextContent) -> Result<(), AppError> {
        let now = Instant::now();
        // Perform analysis, this step also includes retrieval
        let analysis = self.perform_semantic_analysis(content).await?;
        let end = now.elapsed();
        info!(
            "{:?} time elapsed during creation of entities and relationships",
            end
        );
        // Convert analysis to objects
        let (entities, relationships) = analysis
            .to_database_entities(&content.id, &content.user_id, &self.openai_client)
            .await?;
        // Store everything
        tokio::try_join!(
            self.store_graph_entities(entities, relationships),
            self.store_vector_chunks(content),
        )?;
        // Store original content
        self.db.store_item(content.to_owned()).await?;
        self.db.rebuild_indexes().await?;
        Ok(())
    }
async fn perform_semantic_analysis(
&self,
content: &TextContent,
) -> Result<LLMGraphAnalysisResult, AppError> {
let analyser = IngressAnalyzer::new(&self.db, &self.openai_client);
analyser
.analyze_content(
&content.category,
&content.instructions,
&content.text,
&content.user_id,
)
.await
}
async fn store_graph_entities(
&self,
entities: Vec<KnowledgeEntity>,
relationships: Vec<KnowledgeRelationship>,
) -> Result<(), AppError> {
for entity in &entities {
debug!("Storing entity: {:?}", entity);
self.db.store_item(entity.clone()).await?;
}
for relationship in &relationships {
debug!("Storing relationship: {:?}", relationship);
relationship.store_relationship(&self.db).await?;
}
info!(
"Stored {} entities and {} relationships",
entities.len(),
relationships.len()
);
Ok(())
}
async fn store_vector_chunks(&self, content: &TextContent) -> Result<(), AppError> {
let splitter = TextSplitter::new(500..2000);
let chunks = splitter.chunks(&content.text);
// Could potentially process chunks in parallel with a bounded concurrent limit
for chunk in chunks {
let embedding = generate_embedding(&self.openai_client, chunk).await?;
let text_chunk = TextChunk::new(
content.id.to_string(),
chunk.to_string(),
embedding,
content.user_id.to_string(),
);
self.db.store_item(text_chunk).await?;
}
Ok(())
}
}

View File

@@ -1,345 +0,0 @@
use std::{sync::Arc, time::Duration};
use crate::{
error::AppError,
storage::types::{file_info::FileInfo, text_content::TextContent},
};
use async_openai::types::{
ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage,
CreateChatCompletionRequestArgs,
};
use reqwest;
use scraper::{Html, Selector};
use serde::{Deserialize, Serialize};
use std::fmt::Write;
use tiktoken_rs::{o200k_base, CoreBPE};
use tracing::info;
use url::Url;
/// A unit of ingress work, classified by where its content comes from.
/// Every variant carries the processing instructions, category, and the id
/// of the submitting user alongside its source-specific payload.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum IngressObject {
    /// A URL whose page content will be fetched and cleaned.
    Url {
        url: String,
        instructions: String,
        category: String,
        user_id: String,
    },
    /// Raw text supplied directly by the user.
    Text {
        text: String,
        instructions: String,
        category: String,
        user_id: String,
    },
    /// An uploaded file whose text is extracted based on its MIME type.
    File {
        file_info: FileInfo,
        instructions: String,
        category: String,
        user_id: String,
    },
}
impl IngressObject {
/// Creates ingress objects from the provided content, instructions, and files.
///
/// # Arguments
/// * `content` - Optional textual content to be ingressed
/// * `instructions` - Instructions for processing the ingress content
/// * `category` - Category to classify the ingressed content
/// * `files` - Vector of `FileInfo` objects containing information about uploaded files
/// * `user_id` - Identifier of the user performing the ingress operation
///
/// # Returns
/// * `Result<Vec<IngressObject>, AppError>` - On success, returns a vector of ingress objects
/// (one per file/content type). On failure, returns an `AppError`.
pub fn create_ingress_objects(
content: Option<String>,
instructions: String,
category: String,
files: Vec<FileInfo>,
user_id: &str,
) -> Result<Vec<IngressObject>, AppError> {
// Initialize list
let mut object_list = Vec::new();
// Create a IngressObject from content if it exists, checking for URL or text
if let Some(input_content) = content {
match Url::parse(&input_content) {
Ok(url) => {
info!("Detected URL: {}", url);
object_list.push(IngressObject::Url {
url: url.to_string(),
instructions: instructions.clone(),
category: category.clone(),
user_id: user_id.into(),
});
}
Err(_) => {
if input_content.len() > 2 {
info!("Treating input as plain text");
object_list.push(IngressObject::Text {
text: input_content.to_string(),
instructions: instructions.clone(),
category: category.clone(),
user_id: user_id.into(),
});
}
}
}
}
for file in files {
object_list.push(IngressObject::File {
file_info: file,
instructions: instructions.clone(),
category: category.clone(),
user_id: user_id.into(),
})
}
// If no objects are constructed, we return Err
if object_list.is_empty() {
return Err(AppError::NotFound(
"No valid content or files provided".into(),
));
}
Ok(object_list)
}
    /// Creates a new `TextContent` instance from a `IngressObject`.
    ///
    /// # Arguments
    /// `&self` - A reference to the `IngressObject`.
    ///
    /// # Returns
    /// `TextContent` - An object containing a text representation of the object, could be a scraped URL, parsed PDF, etc.
    ///
    /// NOTE: `TextContent::new` takes positional arguments in the order
    /// (text, instructions, category, file_info, url, user_id) — the `File`
    /// arm fills slot four, the `Url` arm fills slot five.
    pub async fn to_text_content(
        &self,
        openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
    ) -> Result<TextContent, AppError> {
        match self {
            IngressObject::Url {
                url,
                instructions,
                category,
                user_id,
            } => {
                // Fetch the page and let the LLM reduce it to clean markdown.
                let text = Self::fetch_text_from_url(url, openai_client).await?;
                Ok(TextContent::new(
                    text,
                    instructions.into(),
                    category.into(),
                    None,
                    Some(url.into()),
                    user_id.into(),
                ))
            }
            // Raw text needs no transformation.
            IngressObject::Text {
                text,
                instructions,
                category,
                user_id,
            } => Ok(TextContent::new(
                text.into(),
                instructions.into(),
                category.into(),
                None,
                None,
                user_id.into(),
            )),
            IngressObject::File {
                file_info,
                instructions,
                category,
                user_id,
            } => {
                // Extract text according to the file's MIME type.
                let text = Self::extract_text_from_file(file_info).await?;
                Ok(TextContent::new(
                    text,
                    instructions.into(),
                    category.into(),
                    Some(file_info.to_owned()),
                    None,
                    user_id.into(),
                ))
            }
        }
    }
    /// Get text from url, will return it as a markdown formatted string.
    ///
    /// Scrapes the page, emits headings and paragraphs wrapped in
    /// `<heading>`/`<paragraph>` markers, then hands the result to the LLM
    /// for cleanup via `process_web_content`.
    async fn fetch_text_from_url(
        url: &str,
        openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
    ) -> Result<String, AppError> {
        // Use a client with timeouts and reuse
        let client = reqwest::ClientBuilder::new()
            .timeout(Duration::from_secs(30))
            .build()?;
        let response = client.get(url).send().await?.text().await?;
        // Preallocate string with capacity
        let mut structured_content = String::with_capacity(response.len() / 2);
        let document = Html::parse_document(&response);
        // Prefer a recognisable main-content container; fall back to <body>.
        let main_selectors = Selector::parse(
            "article, main, .article-content, .post-content, .entry-content, [role='main']",
        )
        .unwrap(); // static selector string — parsing cannot fail
        let content_element = document
            .select(&main_selectors)
            .next()
            .or_else(|| document.select(&Selector::parse("body").unwrap()).next())
            .ok_or(AppError::NotFound("No content found".into()))?;
        // Compile selectors once
        let heading_selector = Selector::parse("h1, h2, h3").unwrap();
        let paragraph_selector = Selector::parse("p").unwrap();
        // Process content in one pass.
        // Note: all headings are emitted before all paragraphs, so relative
        // document order between the two groups is not preserved.
        for element in content_element.select(&heading_selector) {
            let _ = writeln!(
                structured_content,
                "<heading>{}</heading>",
                element.text().collect::<String>().trim()
            );
        }
        for element in content_element.select(&paragraph_selector) {
            let _ = writeln!(
                structured_content,
                "<paragraph>{}</paragraph>",
                element.text().collect::<String>().trim()
            );
        }
        // NOTE(review): the second replace swaps a space for a space, a no-op
        // as written — presumably a non-breaking space (U+00A0) was intended
        // as the search pattern; confirm against the original source.
        let content = structured_content
            .replace(|c: char| c.is_control(), " ")
            .replace(" ", " ");
        Self::process_web_content(content, openai_client).await
    }
    /// Cleans scraped web content by asking the LLM to extract only the main
    /// article text and return it as markdown.
    ///
    /// Input exceeding `MAX_TOKENS` is truncated on paragraph boundaries
    /// first (see `truncate_content`).
    ///
    /// # Errors
    /// Returns `AppError::LLMParsing` when the model response carries no
    /// content, plus any tokenizer, request-build, or API errors.
    pub async fn process_web_content(
        content: String,
        openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
    ) -> Result<String, AppError> {
        // Input token budget before truncation kicks in.
        const MAX_TOKENS: usize = 122000;
        const SYSTEM_PROMPT: &str = r#"
You are a precise content extractor for web pages. Your task:
1. Extract ONLY the main article/content from the provided text
2. Maintain the original content - do not summarize or modify the core information
3. Ignore peripheral content such as:
- Navigation elements
- Error messages (e.g., "JavaScript required")
- Related articles sections
- Comments
- Social media links
- Advertisement text
FORMAT:
- Convert <heading> tags to markdown headings (#, ##, ###)
- Convert <paragraph> tags to markdown paragraphs
- Preserve quotes and important formatting
- Remove duplicate content
- Remove any metadata or technical artifacts
OUTPUT RULES:
- Output ONLY the cleaned content in markdown
- Do not add any explanations or meta-commentary
- Do not add summaries or conclusions
- Do not use any XML/HTML tags in the output
"#;
        let bpe = o200k_base()?;
        // Process content in chunks if needed
        let truncated_content = if bpe.encode_with_special_tokens(&content).len() > MAX_TOKENS {
            Self::truncate_content(&content, MAX_TOKENS, &bpe)?
        } else {
            content
        };
        let request = CreateChatCompletionRequestArgs::default()
            .model("gpt-4o-mini")
            .temperature(0.0) // deterministic extraction, no creativity wanted
            .max_tokens(16200u32)
            .messages([
                ChatCompletionRequestSystemMessage::from(SYSTEM_PROMPT).into(),
                ChatCompletionRequestUserMessage::from(truncated_content).into(),
            ])
            .build()?;
        let response = openai_client.chat().create(request).await?;
        response
            .choices
            .first()
            .and_then(|choice| choice.message.content.as_ref())
            .map(|content| content.to_owned())
            .ok_or(AppError::LLMParsing("No content in response".into()))
    }
fn truncate_content(
content: &str,
max_tokens: usize,
tokenizer: &CoreBPE,
) -> Result<String, AppError> {
// Pre-allocate with estimated size
let mut result = String::with_capacity(content.len() / 2);
let mut current_tokens = 0;
// Process content by paragraph to maintain context
for paragraph in content.split("\n\n") {
let tokens = tokenizer.encode_with_special_tokens(paragraph).len();
// Check if adding paragraph exceeds limit
if current_tokens + tokens > max_tokens {
break;
}
result.push_str(paragraph);
result.push_str("\n\n");
current_tokens += tokens;
}
// Ensure we return valid content
if result.is_empty() {
return Err(AppError::Processing("Content exceeds token limit".into()));
}
Ok(result.trim_end().to_string())
}
/// Extracts text from a file based on its MIME type.
async fn extract_text_from_file(file_info: &FileInfo) -> Result<String, AppError> {
match file_info.mime_type.as_str() {
"text/plain" => {
// Read the file and return its content
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"text/markdown" => {
// Read the file and return its content
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"application/pdf" => {
// TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf`
Err(AppError::NotFound(file_info.mime_type.clone()))
}
"image/png" | "image/jpeg" => {
// TODO: Implement OCR on image using a crate like `tesseract`
Err(AppError::NotFound(file_info.mime_type.clone()))
}
"application/octet-stream" => {
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"text/x-rust" => {
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
// Handle other MIME types as needed
_ => Err(AppError::NotFound(file_info.mime_type.clone())),
}
}
}

View File

@@ -1,3 +1,2 @@
pub mod analysis; pub mod analysis;
pub mod content_processor; pub mod content_processor;
pub mod ingress_object;

View File

@@ -1,6 +1,6 @@
use crate::error::AppError; use crate::error::AppError;
use super::types::{analytics::Analytics, job::Job, system_settings::SystemSettings, StoredObject}; use super::types::{analytics::Analytics, system_settings::SystemSettings, StoredObject};
use axum_session::{SessionConfig, SessionError, SessionStore}; use axum_session::{SessionConfig, SessionError, SessionStore};
use axum_session_surreal::SessionSurrealPool; use axum_session_surreal::SessionSurrealPool;
use futures::Stream; use futures::Stream;
@@ -171,9 +171,9 @@ impl SurrealDbClient {
/// * `Result<Option<T>, Error>` - The deleted item or Error /// * `Result<Option<T>, Error>` - The deleted item or Error
pub async fn listen<T>( pub async fn listen<T>(
&self, &self,
) -> Result<impl Stream<Item = Result<Notification<Job>, Error>>, Error> ) -> Result<impl Stream<Item = Result<Notification<T>, Error>>, Error>
where where
T: for<'de> StoredObject, T: for<'de> StoredObject + std::marker::Unpin,
{ {
self.client.select(T::table_name()).live().await self.client.select(T::table_name()).live().await
} }

View File

@@ -0,0 +1,95 @@
use crate::{error::AppError, storage::types::file_info::FileInfo};
use serde::{Deserialize, Serialize};
use tracing::info;
use url::Url;
/// A unit of ingestion work, classified by where its content comes from.
/// Every variant carries the processing instructions, category, and the id
/// of the submitting user alongside its source-specific payload.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum IngestionPayload {
    /// Content identified by a URL.
    Url {
        url: String,
        instructions: String,
        category: String,
        user_id: String,
    },
    /// Raw text supplied directly by the user.
    Text {
        text: String,
        instructions: String,
        category: String,
        user_id: String,
    },
    /// An uploaded file described by its `FileInfo`.
    File {
        file_info: FileInfo,
        instructions: String,
        category: String,
        user_id: String,
    },
}
impl IngestionPayload {
    /// Creates ingestion payloads from the provided content, instructions, and files.
    ///
    /// # Arguments
    /// * `content` - Optional textual input; parsed as a URL when possible,
    ///   otherwise treated as plain text (inputs of two characters or fewer
    ///   are silently ignored)
    /// * `instructions` - Instructions attached to every produced payload
    /// * `category` - Category attached to every produced payload
    /// * `files` - Uploaded files, each of which becomes a `File` payload
    /// * `user_id` - Identifier of the user performing the ingestion
    ///
    /// # Errors
    /// Returns `AppError::NotFound` when neither usable content nor files
    /// were provided.
    pub fn create_ingestion_payload(
        content: Option<String>,
        instructions: String,
        category: String,
        files: Vec<FileInfo>,
        user_id: &str,
    ) -> Result<Vec<IngestionPayload>, AppError> {
        let mut payloads = Vec::new();

        // Textual input becomes either a Url or a Text payload depending on
        // whether it parses as a URL.
        if let Some(input_content) = content {
            if let Ok(url) = Url::parse(&input_content) {
                info!("Detected URL: {}", url);
                payloads.push(IngestionPayload::Url {
                    url: url.to_string(),
                    instructions: instructions.clone(),
                    category: category.clone(),
                    user_id: user_id.into(),
                });
            } else if input_content.len() > 2 {
                info!("Treating input as plain text");
                payloads.push(IngestionPayload::Text {
                    text: input_content.to_string(),
                    instructions: instructions.clone(),
                    category: category.clone(),
                    user_id: user_id.into(),
                });
            }
        }

        // Every uploaded file becomes its own payload.
        payloads.extend(files.into_iter().map(|file| IngestionPayload::File {
            file_info: file,
            instructions: instructions.clone(),
            category: category.clone(),
            user_id: user_id.into(),
        }));

        if payloads.is_empty() {
            Err(AppError::NotFound(
                "No valid content or files provided".into(),
            ))
        } else {
            Ok(payloads)
        }
    }
}

View File

@@ -2,13 +2,12 @@ use futures::Stream;
use surrealdb::{opt::PatchOp, Notification}; use surrealdb::{opt::PatchOp, Notification};
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{error::AppError, storage::db::SurrealDbClient, stored_object};
error::AppError, ingress::ingress_object::IngressObject, storage::db::SurrealDbClient,
stored_object, use super::ingestion_payload::IngestionPayload;
};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum JobStatus { pub enum IngestionTaskStatus {
Created, Created,
InProgress { InProgress {
attempts: u32, attempts: u32,
@@ -19,22 +18,22 @@ pub enum JobStatus {
Cancelled, Cancelled,
} }
stored_object!(Job, "job", { stored_object!(IngestionTask, "job", {
content: IngressObject, content: IngestionPayload,
status: JobStatus, status: IngestionTaskStatus,
user_id: String user_id: String
}); });
pub const MAX_ATTEMPTS: u32 = 3; pub const MAX_ATTEMPTS: u32 = 3;
impl Job { impl IngestionTask {
pub async fn new(content: IngressObject, user_id: String) -> Self { pub async fn new(content: IngestionPayload, user_id: String) -> Self {
let now = Utc::now(); let now = Utc::now();
Self { Self {
id: Uuid::new_v4().to_string(), id: Uuid::new_v4().to_string(),
content, content,
status: JobStatus::Created, status: IngestionTaskStatus::Created,
created_at: now, created_at: now,
updated_at: now, updated_at: now,
user_id, user_id,
@@ -43,7 +42,7 @@ impl Job {
/// Creates a new job and stores it in the database /// Creates a new job and stores it in the database
pub async fn create_and_add_to_db( pub async fn create_and_add_to_db(
content: IngressObject, content: IngestionPayload,
user_id: String, user_id: String,
db: &SurrealDbClient, db: &SurrealDbClient,
) -> Result<(), AppError> { ) -> Result<(), AppError> {
@@ -57,10 +56,10 @@ impl Job {
// Update job status // Update job status
pub async fn update_status( pub async fn update_status(
id: &str, id: &str,
status: JobStatus, status: IngestionTaskStatus,
db: &SurrealDbClient, db: &SurrealDbClient,
) -> Result<(), AppError> { ) -> Result<(), AppError> {
let _job: Option<Job> = db let _job: Option<Self> = db
.update((Self::table_name(), id)) .update((Self::table_name(), id))
.patch(PatchOp::replace("/status", status)) .patch(PatchOp::replace("/status", status))
.patch(PatchOp::replace( .patch(PatchOp::replace(
@@ -73,16 +72,16 @@ impl Job {
} }
/// Listen for new jobs /// Listen for new jobs
pub async fn listen_for_jobs( pub async fn listen_for_tasks(
db: &SurrealDbClient, db: &SurrealDbClient,
) -> Result<impl Stream<Item = Result<Notification<Job>, surrealdb::Error>>, surrealdb::Error> ) -> Result<impl Stream<Item = Result<Notification<Self>, surrealdb::Error>>, surrealdb::Error>
{ {
db.listen::<Job>().await db.listen::<Self>().await
} }
/// Get all unfinished jobs, ie newly created and in progress up two times /// Get all unfinished tasks, ie newly created and in progress up two times
pub async fn get_unfinished_jobs(db: &SurrealDbClient) -> Result<Vec<Job>, AppError> { pub async fn get_unfinished_tasks(db: &SurrealDbClient) -> Result<Vec<Self>, AppError> {
let jobs: Vec<Job> = db let jobs: Vec<Self> = db
.query( .query(
"SELECT * FROM type::table($table) "SELECT * FROM type::table($table)
WHERE WHERE

View File

@@ -3,7 +3,8 @@ use serde::{Deserialize, Serialize};
pub mod analytics; pub mod analytics;
pub mod conversation; pub mod conversation;
pub mod file_info; pub mod file_info;
pub mod job; pub mod ingestion_payload;
pub mod ingestion_task;
pub mod knowledge_entity; pub mod knowledge_entity;
pub mod knowledge_relationship; pub mod knowledge_relationship;
pub mod message; pub mod message;

View File

@@ -4,7 +4,7 @@ use surrealdb::{engine::any::Any, Surreal};
use uuid::Uuid; use uuid::Uuid;
use super::{ use super::{
conversation::Conversation, job::Job, knowledge_entity::KnowledgeEntity, conversation::Conversation, ingestion_task::IngestionTask, knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings, knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings,
text_content::TextContent, text_content::TextContent,
}; };
@@ -351,12 +351,12 @@ impl User {
Ok(conversations) Ok(conversations)
} }
/// Gets all active jobs for the specified user /// Gets all active ingestion tasks for the specified user
pub async fn get_unfinished_jobs( pub async fn get_unfinished_ingestion_tasks(
user_id: &str, user_id: &str,
db: &SurrealDbClient, db: &SurrealDbClient,
) -> Result<Vec<Job>, AppError> { ) -> Result<Vec<IngestionTask>, AppError> {
let jobs: Vec<Job> = db let jobs: Vec<IngestionTask> = db
.query( .query(
"SELECT * FROM type::table($table) "SELECT * FROM type::table($table)
WHERE user_id = $user_id WHERE user_id = $user_id
@@ -369,7 +369,7 @@ impl User {
) )
ORDER BY created_at DESC", ORDER BY created_at DESC",
) )
.bind(("table", Job::table_name())) .bind(("table", IngestionTask::table_name()))
.bind(("user_id", user_id.to_owned())) .bind(("user_id", user_id.to_owned()))
.bind(("max_attempts", 3)) .bind(("max_attempts", 3))
.await? .await?
@@ -384,12 +384,12 @@ impl User {
user_id: &str, user_id: &str,
db: &SurrealDbClient, db: &SurrealDbClient,
) -> Result<(), AppError> { ) -> Result<(), AppError> {
db.get_item::<Job>(id) db.get_item::<IngestionTask>(id)
.await? .await?
.filter(|job| job.user_id == user_id) .filter(|job| job.user_id == user_id)
.ok_or_else(|| AppError::Auth("Not authorized to delete this job".into()))?; .ok_or_else(|| AppError::Auth("Not authorized to delete this job".into()))?;
db.delete_item::<Job>(id) db.delete_item::<IngestionTask>(id)
.await .await
.map_err(AppError::Database)?; .map_err(AppError::Database)?;

View File

@@ -12,7 +12,7 @@ use tracing::info;
use common::{ use common::{
error::{AppError, HtmlError}, error::{AppError, HtmlError},
storage::types::{ storage::types::{
file_info::FileInfo, job::Job, knowledge_entity::KnowledgeEntity, file_info::FileInfo, ingestion_task::IngestionTask, knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk, knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk,
text_content::TextContent, user::User, text_content::TextContent, user::User,
}, },
@@ -26,7 +26,7 @@ page_data!(IndexData, "index/index.html", {
gdpr_accepted: bool, gdpr_accepted: bool,
user: Option<User>, user: Option<User>,
latest_text_contents: Vec<TextContent>, latest_text_contents: Vec<TextContent>,
active_jobs: Vec<Job> active_jobs: Vec<IngestionTask>
}); });
pub async fn index_handler( pub async fn index_handler(
@@ -39,9 +39,11 @@ pub async fn index_handler(
let gdpr_accepted = auth.current_user.is_some() | session.get("gdpr_accepted").unwrap_or(false); let gdpr_accepted = auth.current_user.is_some() | session.get("gdpr_accepted").unwrap_or(false);
let active_jobs = match auth.current_user.is_some() { let active_jobs = match auth.current_user.is_some() {
true => User::get_unfinished_jobs(&auth.current_user.clone().unwrap().id, &state.db) true => {
.await User::get_unfinished_ingestion_tasks(&auth.current_user.clone().unwrap().id, &state.db)
.map_err(|e| HtmlError::new(e, state.templates.clone()))?, .await
.map_err(|e| HtmlError::new(e, state.templates.clone()))?
}
false => vec![], false => vec![],
}; };
@@ -172,7 +174,7 @@ async fn get_and_validate_text_content(
#[derive(Serialize)] #[derive(Serialize)]
pub struct ActiveJobsData { pub struct ActiveJobsData {
pub active_jobs: Vec<Job>, pub active_jobs: Vec<IngestionTask>,
pub user: User, pub user: User,
} }
@@ -190,7 +192,7 @@ pub async fn delete_job(
.await .await
.map_err(|e| HtmlError::new(e, state.templates.clone()))?; .map_err(|e| HtmlError::new(e, state.templates.clone()))?;
let active_jobs = User::get_unfinished_jobs(&user.id, &state.db) let active_jobs = User::get_unfinished_ingestion_tasks(&user.id, &state.db)
.await .await
.map_err(|e| HtmlError::new(e, state.templates.clone()))?; .map_err(|e| HtmlError::new(e, state.templates.clone()))?;
@@ -216,7 +218,7 @@ pub async fn show_active_jobs(
None => return Ok(Redirect::to("/signin").into_response()), None => return Ok(Redirect::to("/signin").into_response()),
}; };
let active_jobs = User::get_unfinished_jobs(&user.id, &state.db) let active_jobs = User::get_unfinished_ingestion_tasks(&user.id, &state.db)
.await .await
.map_err(|e| HtmlError::new(e, state.templates.clone()))?; .map_err(|e| HtmlError::new(e, state.templates.clone()))?;

View File

@@ -12,8 +12,10 @@ use tracing::info;
use common::{ use common::{
error::{AppError, HtmlError, IntoHtmlError}, error::{AppError, HtmlError, IntoHtmlError},
ingress::ingress_object::IngressObject, storage::types::{
storage::types::{file_info::FileInfo, job::Job, user::User}, file_info::FileInfo, ingestion_payload::IngestionPayload, ingestion_task::IngestionTask,
user::User,
},
}; };
use crate::{ use crate::{
@@ -112,7 +114,7 @@ pub async fn process_ingress_form(
})) }))
.await?; .await?;
let ingress_objects = IngressObject::create_ingress_objects( let payloads = IngestionPayload::create_ingestion_payload(
input.content, input.content,
input.instructions, input.instructions,
input.category, input.category,
@@ -121,9 +123,11 @@ pub async fn process_ingress_form(
) )
.map_err(|e| HtmlError::new(e, state.templates.clone()))?; .map_err(|e| HtmlError::new(e, state.templates.clone()))?;
let futures: Vec<_> = ingress_objects let futures: Vec<_> = payloads
.into_iter() .into_iter()
.map(|object| Job::create_and_add_to_db(object.clone(), user.id.clone(), &state.db)) .map(|object| {
IngestionTask::create_and_add_to_db(object.clone(), user.id.clone(), &state.db)
})
.collect(); .collect();
try_join_all(futures) try_join_all(futures)
@@ -132,7 +136,7 @@ pub async fn process_ingress_form(
.map_err(|e| HtmlError::new(e, state.templates.clone()))?; .map_err(|e| HtmlError::new(e, state.templates.clone()))?;
// Update the active jobs page with the newly created job // Update the active jobs page with the newly created job
let active_jobs = User::get_unfinished_jobs(&user.id, &state.db) let active_jobs = User::get_unfinished_ingestion_tasks(&user.id, &state.db)
.await .await
.map_err(|e| HtmlError::new(e, state.templates.clone()))?; .map_err(|e| HtmlError::new(e, state.templates.clone()))?;

View File

@@ -0,0 +1,21 @@
[package]
name = "ingestion-pipeline"
version = "0.1.0"
edition = "2021"
[dependencies]
# Workspace dependencies
tokio = { workspace = true }
serde = { workspace = true }
axum = { workspace = true }
tracing = { workspace = true }
serde_json = { workspace = true }
# Crate-specific dependencies
async-openai = "0.24.1"
tiktoken-rs = "0.6.0"
reqwest = {version = "0.12.12", features = ["charset", "json"]}
scraper = "0.22.0"
chrono = { version = "0.4.39", features = ["serde"] }
text-splitter = "0.18.1"
common = { path = "../common" }

View File

@@ -0,0 +1,2 @@
//! Ingestion pipeline crate: turns ingestion payloads (text, URLs, files)
//! into analysed, embedded knowledge stored in the database.

// Pipeline orchestration: task lifecycle, semantic analysis, persistence.
pub mod pipeline;
// Payload-to-text conversion helpers (URL fetching, file extraction, LLM cleanup).
pub mod types;

View File

@@ -0,0 +1,165 @@
use std::{sync::Arc, time::Instant};
use chrono::Utc;
use text_splitter::TextSplitter;
use tracing::{debug, info};
use common::{
error::AppError,
storage::{
db::SurrealDbClient,
types::{
ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS},
knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship,
text_chunk::TextChunk,
text_content::TextContent,
},
},
utils::embedding::generate_embedding,
};
use common::ingress::analysis::{
ingress_analyser::IngressAnalyzer, types::llm_analysis_result::LLMGraphAnalysisResult,
};
use crate::types::to_text_content;
/// Drives the content-ingestion pipeline: converts raw ingestion payloads
/// into analysed, embedded knowledge persisted in the database.
pub struct IngestionPipeline {
    // Shared handle to the SurrealDB client used for all persistence.
    db: Arc<SurrealDbClient>,
    // Shared OpenAI client used for analysis and embedding generation.
    openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
}
impl IngestionPipeline {
    /// Builds a pipeline from shared database and OpenAI client handles.
    ///
    /// Currently performs no fallible or async setup; the `async`/`Result`
    /// signature leaves room for future initialization without breaking callers.
    pub async fn new(
        db: Arc<SurrealDbClient>,
        openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
    ) -> Result<Self, AppError> {
        Ok(Self { db, openai_client })
    }

    /// Runs one ingestion task end to end, maintaining its attempt counter.
    ///
    /// Marks the task `InProgress`, resolves its payload to text, processes
    /// the content, and records `Completed` on success. On failure the task
    /// is marked `Error` only once `MAX_ATTEMPTS` is reached.
    ///
    /// NOTE(review): a failure below `MAX_ATTEMPTS` leaves the task in
    /// `InProgress` and no `Error` status is written — confirm that the
    /// scheduler re-picks such tasks (e.g. via the unfinished-task scan on
    /// startup), otherwise they stall until a restart.
    pub async fn process_task(&self, task: IngestionTask) -> Result<(), AppError> {
        // A task already InProgress means this is a retry; anything else is attempt #1.
        let current_attempts = match task.status {
            IngestionTaskStatus::InProgress { attempts, .. } => attempts + 1,
            _ => 1,
        };

        // Update status to InProgress with attempt count
        IngestionTask::update_status(
            &task.id,
            IngestionTaskStatus::InProgress {
                attempts: current_attempts,
                last_attempt: Utc::now(),
            },
            &self.db,
        )
        .await?;

        // Resolve the payload (plain text / URL / file) into text content.
        let text_content = to_text_content(task.content, &self.openai_client).await?;

        match self.process(&text_content).await {
            Ok(_) => {
                IngestionTask::update_status(&task.id, IngestionTaskStatus::Completed, &self.db)
                    .await?;
                Ok(())
            }
            Err(e) => {
                // Only give up permanently once the retry budget is spent.
                if current_attempts >= MAX_ATTEMPTS {
                    IngestionTask::update_status(
                        &task.id,
                        IngestionTaskStatus::Error(format!("Max attempts reached: {}", e)),
                        &self.db,
                    )
                    .await?;
                }
                Err(AppError::Processing(e.to_string()))
            }
        }
    }

    /// Analyses one piece of text content and persists every derived
    /// artifact: graph entities/relationships, vector chunks, and the
    /// original content itself.
    pub async fn process(&self, content: &TextContent) -> Result<(), AppError> {
        let now = Instant::now();

        // Perform analysis, this step also includes retrieval
        let analysis = self.perform_semantic_analysis(content).await?;

        let end = now.elapsed();
        info!(
            "{:?} time elapsed during creation of entities and relationships",
            end
        );

        // Convert analysis to application objects
        let (entities, relationships) = analysis
            .to_database_entities(&content.id, &content.user_id, &self.openai_client)
            .await?;

        // Store everything — graph writes and chunk embedding run concurrently.
        tokio::try_join!(
            self.store_graph_entities(entities, relationships),
            self.store_vector_chunks(content),
        )?;

        // Store original content
        self.db.store_item(content.to_owned()).await?;

        // Keep search indexes in sync with the freshly written data.
        self.db.rebuild_indexes().await?;

        Ok(())
    }

    /// Runs the LLM-based graph analysis over this user's content.
    async fn perform_semantic_analysis(
        &self,
        content: &TextContent,
    ) -> Result<LLMGraphAnalysisResult, AppError> {
        let analyser = IngressAnalyzer::new(&self.db, &self.openai_client);
        analyser
            .analyze_content(
                &content.category,
                &content.instructions,
                &content.text,
                &content.user_id,
            )
            .await
    }

    /// Persists knowledge-graph entities first, then their relationships
    /// (relationships reference entities, so order matters here).
    async fn store_graph_entities(
        &self,
        entities: Vec<KnowledgeEntity>,
        relationships: Vec<KnowledgeRelationship>,
    ) -> Result<(), AppError> {
        for entity in &entities {
            debug!("Storing entity: {:?}", entity);
            self.db.store_item(entity.clone()).await?;
        }

        for relationship in &relationships {
            debug!("Storing relationship: {:?}", relationship);
            relationship.store_relationship(&self.db).await?;
        }

        info!(
            "Stored {} entities and {} relationships",
            entities.len(),
            relationships.len()
        );

        Ok(())
    }

    /// Splits the text into chunks (target size 500–2000 per text-splitter's
    /// default unit), embeds each chunk, and stores the resulting vectors.
    async fn store_vector_chunks(&self, content: &TextContent) -> Result<(), AppError> {
        let splitter = TextSplitter::new(500..2000);
        let chunks = splitter.chunks(&content.text);

        // Could potentially process chunks in parallel with a bounded concurrent limit
        for chunk in chunks {
            let embedding = generate_embedding(&self.openai_client, chunk).await?;
            let text_chunk = TextChunk::new(
                content.id.to_string(),
                chunk.to_string(),
                embedding,
                content.user_id.to_string(),
            );
            self.db.store_item(text_chunk).await?;
        }

        Ok(())
    }
}

View File

@@ -0,0 +1,247 @@
use std::{sync::Arc, time::Duration};
use async_openai::types::{
ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage,
CreateChatCompletionRequestArgs,
};
use common::{
error::AppError,
storage::types::{
file_info::FileInfo, ingestion_payload::IngestionPayload, text_content::TextContent,
},
};
use reqwest;
use scraper::{Html, Selector};
use std::fmt::Write;
use tiktoken_rs::{o200k_base, CoreBPE};
/// Converts an [`IngestionPayload`] into a [`TextContent`], fetching or
/// extracting the raw text as required by the payload variant.
pub async fn to_text_content(
    ingestion_payload: IngestionPayload,
    openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
) -> Result<TextContent, AppError> {
    match ingestion_payload {
        // Plain text: the body is already available, just repackage it.
        IngestionPayload::Text {
            text,
            instructions,
            category,
            user_id,
        } => Ok(TextContent::new(
            text.into(),
            instructions.into(),
            category.into(),
            None,
            None,
            user_id.into(),
        )),
        // URL: download and clean the page, remembering the source URL.
        IngestionPayload::Url {
            url,
            instructions,
            category,
            user_id,
        } => {
            let fetched = fetch_text_from_url(&url, openai_client).await?;
            Ok(TextContent::new(
                fetched,
                instructions.into(),
                category.into(),
                None,
                Some(url.into()),
                user_id.into(),
            ))
        }
        // File: extract text according to MIME type, keeping the file metadata.
        IngestionPayload::File {
            file_info,
            instructions,
            category,
            user_id,
        } => {
            let extracted = extract_text_from_file(&file_info).await?;
            Ok(TextContent::new(
                extracted,
                instructions.into(),
                category.into(),
                Some(file_info.to_owned()),
                None,
                user_id.into(),
            ))
        }
    }
}
/// Get text from url, will return it as a markdown formatted string
async fn fetch_text_from_url(
url: &str,
openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
) -> Result<String, AppError> {
// Use a client with timeouts and reuse
let client = reqwest::ClientBuilder::new()
.timeout(Duration::from_secs(30))
.build()?;
let response = client.get(url).send().await?.text().await?;
// Preallocate string with capacity
let mut structured_content = String::with_capacity(response.len() / 2);
let document = Html::parse_document(&response);
let main_selectors = Selector::parse(
"article, main, .article-content, .post-content, .entry-content, [role='main']",
)
.unwrap();
let content_element = document
.select(&main_selectors)
.next()
.or_else(|| document.select(&Selector::parse("body").unwrap()).next())
.ok_or(AppError::NotFound("No content found".into()))?;
// Compile selectors once
let heading_selector = Selector::parse("h1, h2, h3").unwrap();
let paragraph_selector = Selector::parse("p").unwrap();
// Process content in one pass
for element in content_element.select(&heading_selector) {
let _ = writeln!(
structured_content,
"<heading>{}</heading>",
element.text().collect::<String>().trim()
);
}
for element in content_element.select(&paragraph_selector) {
let _ = writeln!(
structured_content,
"<paragraph>{}</paragraph>",
element.text().collect::<String>().trim()
);
}
let content = structured_content
.replace(|c: char| c.is_control(), " ")
.replace(" ", " ");
process_web_content(content, openai_client).await
}
/// Cleans scraped web content via an LLM pass, returning markdown.
///
/// The input is truncated to `MAX_TOKENS` tokens (paragraph-aligned, see
/// [`truncate_content`]) before sending so the request stays within the
/// model's context window.
///
/// # Errors
/// Returns `AppError::LLMParsing` when the model reply contains no content,
/// plus any tokenizer, request-build, or API errors.
pub async fn process_web_content(
    content: String,
    openai_client: &Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
) -> Result<String, AppError> {
    const MAX_TOKENS: usize = 122000;
    const SYSTEM_PROMPT: &str = r#"
You are a precise content extractor for web pages. Your task:
1. Extract ONLY the main article/content from the provided text
2. Maintain the original content - do not summarize or modify the core information
3. Ignore peripheral content such as:
   - Navigation elements
   - Error messages (e.g., "JavaScript required")
   - Related articles sections
   - Comments
   - Social media links
   - Advertisement text

FORMAT:
- Convert <heading> tags to markdown headings (#, ##, ###)
- Convert <paragraph> tags to markdown paragraphs
- Preserve quotes and important formatting
- Remove duplicate content
- Remove any metadata or technical artifacts

OUTPUT RULES:
- Output ONLY the cleaned content in markdown
- Do not add any explanations or meta-commentary
- Do not add summaries or conclusions
- Do not use any XML/HTML tags in the output
"#;

    let bpe = o200k_base()?;

    // Process content in chunks if needed
    let truncated_content = if bpe.encode_with_special_tokens(&content).len() > MAX_TOKENS {
        truncate_content(&content, MAX_TOKENS, &bpe)?
    } else {
        content
    };

    let request = CreateChatCompletionRequestArgs::default()
        .model("gpt-4o-mini")
        .temperature(0.0)
        // Cap the reply size; must stay below the model's output-token limit.
        .max_tokens(16200u32)
        .messages([
            ChatCompletionRequestSystemMessage::from(SYSTEM_PROMPT).into(),
            ChatCompletionRequestUserMessage::from(truncated_content).into(),
        ])
        .build()?;

    let response = openai_client.chat().create(request).await?;

    response
        .choices
        .first()
        .and_then(|choice| choice.message.content.as_ref())
        // `.cloned()` replaces the manual `.map(|c| c.to_owned())`.
        .cloned()
        // Build the error lazily so the happy path allocates nothing extra.
        .ok_or_else(|| AppError::LLMParsing("No content in response".into()))
}
/// Truncates `content` to at most `max_tokens` tokens, keeping whole
/// paragraphs (split on blank lines) so the surviving text stays coherent.
///
/// # Errors
/// Returns `AppError::Processing` when not even the first paragraph fits.
fn truncate_content(
    content: &str,
    max_tokens: usize,
    tokenizer: &CoreBPE,
) -> Result<String, AppError> {
    // Pre-size the buffer with a rough estimate to limit reallocations.
    let mut kept = String::with_capacity(content.len() / 2);
    let mut used_tokens = 0usize;

    for paragraph in content.split("\n\n") {
        let paragraph_tokens = tokenizer.encode_with_special_tokens(paragraph).len();
        let next_total = used_tokens + paragraph_tokens;

        // Stop before the first paragraph that would push us over the budget.
        if next_total > max_tokens {
            break;
        }

        used_tokens = next_total;
        kept.push_str(paragraph);
        kept.push_str("\n\n");
    }

    // An empty buffer means nothing fit at all — report that rather than
    // returning an empty string.
    match kept.is_empty() {
        true => Err(AppError::Processing("Content exceeds token limit".into())),
        false => Ok(kept.trim_end().to_string()),
    }
}
/// Extracts text from a file based on its MIME type.
async fn extract_text_from_file(file_info: &FileInfo) -> Result<String, AppError> {
match file_info.mime_type.as_str() {
"text/plain" => {
// Read the file and return its content
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"text/markdown" => {
// Read the file and return its content
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"application/pdf" => {
// TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf`
Err(AppError::NotFound(file_info.mime_type.clone()))
}
"image/png" | "image/jpeg" => {
// TODO: Implement OCR on image using a crate like `tesseract`
Err(AppError::NotFound(file_info.mime_type.clone()))
}
"application/octet-stream" => {
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
"text/x-rust" => {
let content = tokio::fs::read_to_string(&file_info.path).await?;
Ok(content)
}
// Handle other MIME types as needed
_ => Err(AppError::NotFound(file_info.mime_type.clone())),
}
}

View File

@@ -45,6 +45,7 @@ url = { version = "2.5.2", features = ["serde"] }
uuid = { version = "1.10.0", features = ["v4", "serde"] } uuid = { version = "1.10.0", features = ["v4", "serde"] }
# Reference to api-router # Reference to api-router
ingestion-pipeline = { path = "../ingestion-pipeline" }
api-router = { path = "../api-router" } api-router = { path = "../api-router" }
html-router = { path = "../html-router" } html-router = { path = "../html-router" }
common = { path = "../common" } common = { path = "../common" }

View File

@@ -1,14 +1,14 @@
use std::sync::Arc; use std::sync::Arc;
use common::{ use common::{
ingress::content_processor::ContentProcessor,
storage::{ storage::{
db::SurrealDbClient, db::SurrealDbClient,
types::job::{Job, JobStatus}, types::ingestion_task::{IngestionTask, IngestionTaskStatus},
}, },
utils::config::get_config, utils::config::get_config,
}; };
use futures::StreamExt; use futures::StreamExt;
use ingestion_pipeline::pipeline::IngestionPipeline;
use surrealdb::Action; use surrealdb::Action;
use tracing::{error, info}; use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter}; use tracing_subscriber::{fmt, prelude::*, EnvFilter};
@@ -37,23 +37,23 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let openai_client = Arc::new(async_openai::Client::new()); let openai_client = Arc::new(async_openai::Client::new());
let content_processor = ContentProcessor::new(db.clone(), openai_client.clone()).await?; let ingestion_pipeline = IngestionPipeline::new(db.clone(), openai_client.clone()).await?;
loop { loop {
// First, check for any unfinished jobs // First, check for any unfinished tasks
let unfinished_jobs = Job::get_unfinished_jobs(&db).await?; let unfinished_tasks = IngestionTask::get_unfinished_tasks(&db).await?;
if !unfinished_jobs.is_empty() { if !unfinished_tasks.is_empty() {
info!("Found {} unfinished jobs", unfinished_jobs.len()); info!("Found {} unfinished jobs", unfinished_tasks.len());
for job in unfinished_jobs { for task in unfinished_tasks {
content_processor.process_job(job).await?; ingestion_pipeline.process_task(task).await?;
} }
} }
// If no unfinished jobs, start listening for new ones // If no unfinished jobs, start listening for new ones
info!("Listening for new jobs..."); info!("Listening for new jobs...");
let mut job_stream = Job::listen_for_jobs(&db).await?; let mut job_stream = IngestionTask::listen_for_tasks(&db).await?;
while let Some(notification) = job_stream.next().await { while let Some(notification) = job_stream.next().await {
match notification { match notification {
@@ -62,41 +62,42 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
match notification.action { match notification.action {
Action::Create => { Action::Create => {
if let Err(e) = content_processor.process_job(notification.data).await { if let Err(e) = ingestion_pipeline.process_task(notification.data).await
error!("Error processing job: {}", e); {
error!("Error processing task: {}", e);
} }
} }
Action::Update => { Action::Update => {
match notification.data.status { match notification.data.status {
JobStatus::Completed IngestionTaskStatus::Completed
| JobStatus::Error(_) | IngestionTaskStatus::Error(_)
| JobStatus::Cancelled => { | IngestionTaskStatus::Cancelled => {
info!( info!(
"Skipping already completed/error/cancelled job: {}", "Skipping already completed/error/cancelled task: {}",
notification.data.id notification.data.id
); );
continue; continue;
} }
JobStatus::InProgress { attempts, .. } => { IngestionTaskStatus::InProgress { attempts, .. } => {
// Only process if this is a retry after an error, not our own update // Only process if this is a retry after an error, not our own update
if let Ok(Some(current_job)) = if let Ok(Some(current_task)) =
db.get_item::<Job>(&notification.data.id).await db.get_item::<IngestionTask>(&notification.data.id).await
{ {
match current_job.status { match current_task.status {
JobStatus::Error(_) IngestionTaskStatus::Error(_)
if attempts if attempts
< common::storage::types::job::MAX_ATTEMPTS => < common::storage::types::ingestion_task::MAX_ATTEMPTS =>
{ {
// This is a retry after an error // This is a retry after an error
if let Err(e) = if let Err(e) =
content_processor.process_job(current_job).await ingestion_pipeline.process_task(current_task).await
{ {
error!("Error processing job retry: {}", e); error!("Error processing task retry: {}", e);
} }
} }
_ => { _ => {
info!( info!(
"Skipping in-progress update for job: {}", "Skipping in-progress update for task: {}",
notification.data.id notification.data.id
); );
continue; continue;
@@ -104,12 +105,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
} }
} }
JobStatus::Created => { IngestionTaskStatus::Created => {
// Shouldn't happen with Update action, but process if it does // Shouldn't happen with Update action, but process if it does
if let Err(e) = if let Err(e) =
content_processor.process_job(notification.data).await ingestion_pipeline.process_task(notification.data).await
{ {
error!("Error processing job: {}", e); error!("Error processing task: {}", e);
} }
} }
} }
@@ -122,7 +123,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
// If we reach here, the stream has ended (connection lost?) // If we reach here, the stream has ended (connection lost?)
error!("Job stream ended unexpectedly, reconnecting..."); error!("Database stream ended unexpectedly, reconnecting...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
} }
} }