diff --git a/Cargo.lock b/Cargo.lock index bd55943..ed92636 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1071,7 +1071,6 @@ dependencies = [ "mockall", "plotly", "reqwest", - "scraper", "serde", "serde_json", "sha2", @@ -1079,7 +1078,6 @@ dependencies = [ "tempfile", "text-splitter", "thiserror", - "tiktoken-rs", "tokio", "tower-http", "tracing", @@ -2429,6 +2427,24 @@ dependencies = [ "serde", ] +[[package]] +name = "ingestion-pipeline" +version = "0.1.0" +dependencies = [ + "async-openai", + "axum", + "chrono", + "common", + "reqwest", + "scraper", + "serde", + "serde_json", + "text-splitter", + "tiktoken-rs", + "tokio", + "tracing", +] + [[package]] name = "inotify" version = "0.10.2" @@ -2734,6 +2750,7 @@ dependencies = [ "config", "futures", "html-router", + "ingestion-pipeline", "json-stream-parser", "lettre", "mime", diff --git a/Cargo.toml b/Cargo.toml index d4b2c68..f192162 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = [ "crates/main", "crates/common", "crates/api-router" -, "crates/html-router"] +, "crates/html-router", "crates/ingestion-pipeline"] resolver = "2" [workspace.dependencies] diff --git a/crates/api-router/src/lib.rs b/crates/api-router/src/lib.rs index c88a3e7..d31ac06 100644 --- a/crates/api-router/src/lib.rs +++ b/crates/api-router/src/lib.rs @@ -6,7 +6,7 @@ use axum::{ Router, }; use middleware_api_auth::api_auth; -use routes::ingress::ingress_data; +use routes::ingress::ingest_data; pub mod api_state; mod middleware_api_auth; @@ -19,7 +19,7 @@ where ApiState: FromRef, { Router::new() - .route("/ingress", post(ingress_data)) + .route("/ingress", post(ingest_data)) .layer(DefaultBodyLimit::max(1024 * 1024 * 1024)) .route_layer(from_fn_with_state(app_state.clone(), api_auth)) } diff --git a/crates/api-router/src/routes/ingress.rs b/crates/api-router/src/routes/ingress.rs index 44fb72f..6fde00e 100644 --- a/crates/api-router/src/routes/ingress.rs +++ b/crates/api-router/src/routes/ingress.rs @@ -2,17 +2,19 @@ use axum::{extract::State, http::StatusCode, response::IntoResponse, Extension}; use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart}; use common::{ error::{ApiError, AppError}, - ingress::ingress_object::IngressObject, - storage::types::{file_info::FileInfo, job::Job, user::User}, + storage::types::{ + file_info::FileInfo, ingestion_payload::IngestionPayload, ingestion_task::IngestionTask, + user::User, + }, }; use futures::{future::try_join_all, TryFutureExt}; use tempfile::NamedTempFile; -use tracing::{debug, info}; +use tracing::info; use crate::api_state::ApiState; #[derive(Debug, TryFromMultipart)] -pub struct IngressParams { +pub struct IngestParams { pub content: Option, pub instructions: String, pub category: String, @@ -21,10 +23,10 @@ pub struct IngressParams { pub files: Vec>, } -pub async fn ingress_data( +pub async fn ingest_data( State(state): State, Extension(user): Extension, - TypedMultipart(input): TypedMultipart, + TypedMultipart(input): TypedMultipart, ) -> Result { info!("Received input: {:?}", input); @@ -36,20 +38,19 @@ pub async fn ingress_data( ) .await?; - debug!("Got file infos"); - - let ingress_objects = IngressObject::create_ingress_objects( + let payloads = IngestionPayload::create_ingestion_payload( input.content, input.instructions, input.category, file_infos, user.id.as_str(), )?; - debug!("Got ingress objects"); - let futures: Vec<_> = ingress_objects + let futures: Vec<_> = payloads .into_iter() - .map(|object| Job::create_and_add_to_db(object.clone(), user.id.clone(), &state.db)) + .map(|object| { + IngestionTask::create_and_add_to_db(object.clone(), user.id.clone(), &state.db) + }) .collect(); try_join_all(futures).await.map_err(AppError::from)?; diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index d0aaf3d..f7c6ff6 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -34,12 +34,10 @@ minijinja-contrib = { version = "2.6.0", features = ["datetime", "timezone"] } mockall = "0.13.0" plotly = "0.12.1" reqwest = {version = "0.12.12", features = ["charset", "json"]} -scraper = "0.22.0" sha2 = "0.10.8" surrealdb = "2.0.4" tempfile = "3.12.0" text-splitter = "0.18.1" -tiktoken-rs = "0.6.0" tower-http = { version = "0.6.2", features = ["fs"] } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = { version = "2.5.2", features = ["serde"] } diff --git a/crates/common/src/ingress/content_processor.rs b/crates/common/src/ingress/content_processor.rs index fe8ff8e..8b13789 100644 --- a/crates/common/src/ingress/content_processor.rs +++ b/crates/common/src/ingress/content_processor.rs @@ -1,162 +1 @@ -use std::{sync::Arc, time::Instant}; -use chrono::Utc; -use text_splitter::TextSplitter; -use tracing::{debug, info}; - -use crate::{ - error::AppError, - storage::{ - db::SurrealDbClient, - types::{ - job::{Job, JobStatus, MAX_ATTEMPTS}, - knowledge_entity::KnowledgeEntity, - knowledge_relationship::KnowledgeRelationship, - text_chunk::TextChunk, - text_content::TextContent, - }, - }, - utils::embedding::generate_embedding, -}; - -use super::analysis::{ - ingress_analyser::IngressAnalyzer, types::llm_analysis_result::LLMGraphAnalysisResult, -}; - -pub struct ContentProcessor { - db: Arc, - openai_client: Arc>, -} - -impl ContentProcessor { - pub async fn new( - db: Arc, - openai_client: Arc>, - ) -> Result { - Ok(Self { db, openai_client }) - } - pub async fn process_job(&self, job: Job) -> Result<(), AppError> { - let current_attempts = match job.status { - JobStatus::InProgress { attempts, .. } => attempts + 1, - _ => 1, - }; - - // Update status to InProgress with attempt count - Job::update_status( - &job.id, - JobStatus::InProgress { - attempts: current_attempts, - last_attempt: Utc::now(), - }, - &self.db, - ) - .await?; - - let text_content = job.content.to_text_content(&self.openai_client).await?; - - match self.process(&text_content).await { - Ok(_) => { - Job::update_status(&job.id, JobStatus::Completed, &self.db).await?; - Ok(()) - } - Err(e) => { - if current_attempts >= MAX_ATTEMPTS { - Job::update_status( - &job.id, - JobStatus::Error(format!("Max attempts reached: {}", e)), - &self.db, - ) - .await?; - } - Err(AppError::Processing(e.to_string())) - } - } - } - - pub async fn process(&self, content: &TextContent) -> Result<(), AppError> { - let now = Instant::now(); - - // Perform analyis, this step also includes retrieval - let analysis = self.perform_semantic_analysis(content).await?; - - let end = now.elapsed(); - info!( - "{:?} time elapsed during creation of entities and relationships", - end - ); - - // Convert analysis to objects - let (entities, relationships) = analysis - .to_database_entities(&content.id, &content.user_id, &self.openai_client) - .await?; - - // Store everything - tokio::try_join!( - self.store_graph_entities(entities, relationships), - self.store_vector_chunks(content), - )?; - - // Store original content - self.db.store_item(content.to_owned()).await?; - - self.db.rebuild_indexes().await?; - Ok(()) - } - - async fn perform_semantic_analysis( - &self, - content: &TextContent, - ) -> Result { - let analyser = IngressAnalyzer::new(&self.db, &self.openai_client); - analyser - .analyze_content( - &content.category, - &content.instructions, - &content.text, - &content.user_id, - ) - .await - } - - async fn store_graph_entities( - &self, - entities: Vec, - relationships: Vec, - ) -> Result<(), AppError> { - for entity in &entities { - debug!("Storing entity: {:?}", entity); - self.db.store_item(entity.clone()).await?; - } - - for relationship in &relationships { - debug!("Storing relationship: {:?}", relationship); - relationship.store_relationship(&self.db).await?; - } - - info!( - "Stored {} entities and {} relationships", - entities.len(), - relationships.len() - ); - Ok(()) - } - - async fn store_vector_chunks(&self, content: &TextContent) -> Result<(), AppError> { - let splitter = TextSplitter::new(500..2000); - let chunks = splitter.chunks(&content.text); - - // Could potentially process chunks in parallel with a bounded concurrent limit - for chunk in chunks { - let embedding = generate_embedding(&self.openai_client, chunk).await?; - let text_chunk = TextChunk::new( - content.id.to_string(), - chunk.to_string(), - embedding, - content.user_id.to_string(), - ); - self.db.store_item(text_chunk).await?; - } - - Ok(()) - } -} diff --git a/crates/common/src/ingress/ingress_object.rs b/crates/common/src/ingress/ingress_object.rs deleted file mode 100644 index 7ca9847..0000000 --- a/crates/common/src/ingress/ingress_object.rs +++ /dev/null @@ -1,345 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use crate::{ - error::AppError, - storage::types::{file_info::FileInfo, text_content::TextContent}, -}; -use async_openai::types::{ - ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage, - CreateChatCompletionRequestArgs, -}; -use reqwest; -use scraper::{Html, Selector}; -use serde::{Deserialize, Serialize}; -use std::fmt::Write; -use tiktoken_rs::{o200k_base, CoreBPE}; -use tracing::info; -use url::Url; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum IngressObject { - Url { - url: String, - instructions: String, - category: String, - user_id: String, - }, - Text { - text: String, - instructions: String, - category: String, - user_id: String, - }, - File { - file_info: FileInfo, - instructions: String, - category: String, - user_id: String, - }, -} - -impl IngressObject { - /// Creates ingress objects from the provided content, instructions, and files. - /// - /// # Arguments - /// * `content` - Optional textual content to be ingressed - /// * `instructions` - Instructions for processing the ingress content - /// * `category` - Category to classify the ingressed content - /// * `files` - Vector of `FileInfo` objects containing information about uploaded files - /// * `user_id` - Identifier of the user performing the ingress operation - /// - /// # Returns - /// * `Result, AppError>` - On success, returns a vector of ingress objects - /// (one per file/content type). On failure, returns an `AppError`. - pub fn create_ingress_objects( - content: Option, - instructions: String, - category: String, - files: Vec, - user_id: &str, - ) -> Result, AppError> { - // Initialize list - let mut object_list = Vec::new(); - - // Create a IngressObject from content if it exists, checking for URL or text - if let Some(input_content) = content { - match Url::parse(&input_content) { - Ok(url) => { - info!("Detected URL: {}", url); - object_list.push(IngressObject::Url { - url: url.to_string(), - instructions: instructions.clone(), - category: category.clone(), - user_id: user_id.into(), - }); - } - Err(_) => { - if input_content.len() > 2 { - info!("Treating input as plain text"); - object_list.push(IngressObject::Text { - text: input_content.to_string(), - instructions: instructions.clone(), - category: category.clone(), - user_id: user_id.into(), - }); - } - } - } - } - - for file in files { - object_list.push(IngressObject::File { - file_info: file, - instructions: instructions.clone(), - category: category.clone(), - user_id: user_id.into(), - }) - } - - // If no objects are constructed, we return Err - if object_list.is_empty() { - return Err(AppError::NotFound( - "No valid content or files provided".into(), - )); - } - - Ok(object_list) - } - /// Creates a new `TextContent` instance from a `IngressObject`. - /// - /// # Arguments - /// `&self` - A reference to the `IngressObject`. - /// - /// # Returns - /// `TextContent` - An object containing a text representation of the object, could be a scraped URL, parsed PDF, etc. - pub async fn to_text_content( - &self, - openai_client: &Arc>, - ) -> Result { - match self { - IngressObject::Url { - url, - instructions, - category, - user_id, - } => { - let text = Self::fetch_text_from_url(url, openai_client).await?; - Ok(TextContent::new( - text, - instructions.into(), - category.into(), - None, - Some(url.into()), - user_id.into(), - )) - } - IngressObject::Text { - text, - instructions, - category, - user_id, - } => Ok(TextContent::new( - text.into(), - instructions.into(), - category.into(), - None, - None, - user_id.into(), - )), - IngressObject::File { - file_info, - instructions, - category, - user_id, - } => { - let text = Self::extract_text_from_file(file_info).await?; - Ok(TextContent::new( - text, - instructions.into(), - category.into(), - Some(file_info.to_owned()), - None, - user_id.into(), - )) - } - } - } - - /// Get text from url, will return it as a markdown formatted string - async fn fetch_text_from_url( - url: &str, - openai_client: &Arc>, - ) -> Result { - // Use a client with timeouts and reuse - let client = reqwest::ClientBuilder::new() - .timeout(Duration::from_secs(30)) - .build()?; - let response = client.get(url).send().await?.text().await?; - - // Preallocate string with capacity - let mut structured_content = String::with_capacity(response.len() / 2); - - let document = Html::parse_document(&response); - let main_selectors = Selector::parse( - "article, main, .article-content, .post-content, .entry-content, [role='main']", - ) - .unwrap(); - - let content_element = document - .select(&main_selectors) - .next() - .or_else(|| document.select(&Selector::parse("body").unwrap()).next()) - .ok_or(AppError::NotFound("No content found".into()))?; - - // Compile selectors once - let heading_selector = Selector::parse("h1, h2, h3").unwrap(); - let paragraph_selector = Selector::parse("p").unwrap(); - - // Process content in one pass - for element in content_element.select(&heading_selector) { - let _ = writeln!( - structured_content, - "{}", - element.text().collect::().trim() - ); - } - for element in content_element.select(¶graph_selector) { - let _ = writeln!( - structured_content, - "{}", - element.text().collect::().trim() - ); - } - - let content = structured_content - .replace(|c: char| c.is_control(), " ") - .replace(" ", " "); - Self::process_web_content(content, openai_client).await - } - - pub async fn process_web_content( - content: String, - openai_client: &Arc>, - ) -> Result { - const MAX_TOKENS: usize = 122000; - const SYSTEM_PROMPT: &str = r#" - You are a precise content extractor for web pages. Your task: - - 1. Extract ONLY the main article/content from the provided text - 2. Maintain the original content - do not summarize or modify the core information - 3. Ignore peripheral content such as: - - Navigation elements - - Error messages (e.g., "JavaScript required") - - Related articles sections - - Comments - - Social media links - - Advertisement text - - FORMAT: - - Convert tags to markdown headings (#, ##, ###) - - Convert tags to markdown paragraphs - - Preserve quotes and important formatting - - Remove duplicate content - - Remove any metadata or technical artifacts - - OUTPUT RULES: - - Output ONLY the cleaned content in markdown - - Do not add any explanations or meta-commentary - - Do not add summaries or conclusions - - Do not use any XML/HTML tags in the output - "#; - - let bpe = o200k_base()?; - - // Process content in chunks if needed - let truncated_content = if bpe.encode_with_special_tokens(&content).len() > MAX_TOKENS { - Self::truncate_content(&content, MAX_TOKENS, &bpe)? - } else { - content - }; - - let request = CreateChatCompletionRequestArgs::default() - .model("gpt-4o-mini") - .temperature(0.0) - .max_tokens(16200u32) - .messages([ - ChatCompletionRequestSystemMessage::from(SYSTEM_PROMPT).into(), - ChatCompletionRequestUserMessage::from(truncated_content).into(), - ]) - .build()?; - - let response = openai_client.chat().create(request).await?; - - response - .choices - .first() - .and_then(|choice| choice.message.content.as_ref()) - .map(|content| content.to_owned()) - .ok_or(AppError::LLMParsing("No content in response".into())) - } - - fn truncate_content( - content: &str, - max_tokens: usize, - tokenizer: &CoreBPE, - ) -> Result { - // Pre-allocate with estimated size - let mut result = String::with_capacity(content.len() / 2); - let mut current_tokens = 0; - - // Process content by paragraph to maintain context - for paragraph in content.split("\n\n") { - let tokens = tokenizer.encode_with_special_tokens(paragraph).len(); - - // Check if adding paragraph exceeds limit - if current_tokens + tokens > max_tokens { - break; - } - - result.push_str(paragraph); - result.push_str("\n\n"); - current_tokens += tokens; - } - - // Ensure we return valid content - if result.is_empty() { - return Err(AppError::Processing("Content exceeds token limit".into())); - } - - Ok(result.trim_end().to_string()) - } - - /// Extracts text from a file based on its MIME type. - async fn extract_text_from_file(file_info: &FileInfo) -> Result { - match file_info.mime_type.as_str() { - "text/plain" => { - // Read the file and return its content - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - "text/markdown" => { - // Read the file and return its content - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - "application/pdf" => { - // TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf` - Err(AppError::NotFound(file_info.mime_type.clone())) - } - "image/png" | "image/jpeg" => { - // TODO: Implement OCR on image using a crate like `tesseract` - Err(AppError::NotFound(file_info.mime_type.clone())) - } - "application/octet-stream" => { - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - "text/x-rust" => { - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - // Handle other MIME types as needed - _ => Err(AppError::NotFound(file_info.mime_type.clone())), - } - } -} diff --git a/crates/common/src/ingress/mod.rs b/crates/common/src/ingress/mod.rs index 646645f..e2a6c38 100644 --- a/crates/common/src/ingress/mod.rs +++ b/crates/common/src/ingress/mod.rs @@ -1,3 +1,2 @@ pub mod analysis; pub mod content_processor; -pub mod ingress_object; diff --git a/crates/common/src/storage/db.rs b/crates/common/src/storage/db.rs index 3883483..c7be825 100644 --- a/crates/common/src/storage/db.rs +++ b/crates/common/src/storage/db.rs @@ -1,6 +1,6 @@ use crate::error::AppError; -use super::types::{analytics::Analytics, job::Job, system_settings::SystemSettings, StoredObject}; +use super::types::{analytics::Analytics, system_settings::SystemSettings, StoredObject}; use axum_session::{SessionConfig, SessionError, SessionStore}; use axum_session_surreal::SessionSurrealPool; use futures::Stream; @@ -171,9 +171,9 @@ impl SurrealDbClient { /// * `Result, Error>` - The deleted item or Error pub async fn listen( &self, - ) -> Result, Error>>, Error> + ) -> Result, Error>>, Error> where - T: for<'de> StoredObject, + T: for<'de> StoredObject + std::marker::Unpin, { self.client.select(T::table_name()).live().await } diff --git a/crates/common/src/storage/types/ingestion_payload.rs b/crates/common/src/storage/types/ingestion_payload.rs new file mode 100644 index 0000000..7d7a523 --- /dev/null +++ b/crates/common/src/storage/types/ingestion_payload.rs @@ -0,0 +1,95 @@ +use crate::{error::AppError, storage::types::file_info::FileInfo}; +use serde::{Deserialize, Serialize}; +use tracing::info; +use url::Url; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum IngestionPayload { + Url { + url: String, + instructions: String, + category: String, + user_id: String, + }, + Text { + text: String, + instructions: String, + category: String, + user_id: String, + }, + File { + file_info: FileInfo, + instructions: String, + category: String, + user_id: String, + }, +} + +impl IngestionPayload { + /// Creates ingestion payloads from the provided content, instructions, and files. + /// + /// # Arguments + /// * `content` - Optional textual content to be ingressed + /// * `instructions` - Instructions for processing the ingress content + /// * `category` - Category to classify the ingressed content + /// * `files` - Vector of `FileInfo` objects containing information about uploaded files + /// * `user_id` - Identifier of the user performing the ingress operation + /// + /// # Returns + /// * `Result, AppError>` - On success, returns a vector of ingress objects + /// (one per file/content type). On failure, returns an `AppError`. + pub fn create_ingestion_payload( + content: Option, + instructions: String, + category: String, + files: Vec, + user_id: &str, + ) -> Result, AppError> { + // Initialize list + let mut object_list = Vec::new(); + + // Create a IngestionPayload from content if it exists, checking for URL or text + if let Some(input_content) = content { + match Url::parse(&input_content) { + Ok(url) => { + info!("Detected URL: {}", url); + object_list.push(IngestionPayload::Url { + url: url.to_string(), + instructions: instructions.clone(), + category: category.clone(), + user_id: user_id.into(), + }); + } + Err(_) => { + if input_content.len() > 2 { + info!("Treating input as plain text"); + object_list.push(IngestionPayload::Text { + text: input_content.to_string(), + instructions: instructions.clone(), + category: category.clone(), + user_id: user_id.into(), + }); + } + } + } + } + + for file in files { + object_list.push(IngestionPayload::File { + file_info: file, + instructions: instructions.clone(), + category: category.clone(), + user_id: user_id.into(), + }) + } + + // If no objects are constructed, we return Err + if object_list.is_empty() { + return Err(AppError::NotFound( + "No valid content or files provided".into(), + )); + } + + Ok(object_list) + } +} diff --git a/crates/common/src/storage/types/job.rs b/crates/common/src/storage/types/ingestion_task.rs similarity index 67% rename from crates/common/src/storage/types/job.rs rename to crates/common/src/storage/types/ingestion_task.rs index 9239a00..1a89df1 100644 --- a/crates/common/src/storage/types/job.rs +++ b/crates/common/src/storage/types/ingestion_task.rs @@ -2,13 +2,12 @@ use futures::Stream; use surrealdb::{opt::PatchOp, Notification}; use uuid::Uuid; -use crate::{ - error::AppError, ingress::ingress_object::IngressObject, storage::db::SurrealDbClient, - stored_object, -}; +use crate::{error::AppError, storage::db::SurrealDbClient, stored_object}; + +use super::ingestion_payload::IngestionPayload; #[derive(Debug, Clone, Serialize, Deserialize)] -pub enum JobStatus { +pub enum IngestionTaskStatus { Created, InProgress { attempts: u32, @@ -19,22 +18,22 @@ pub enum JobStatus { Cancelled, } -stored_object!(Job, "job", { - content: IngressObject, - status: JobStatus, +stored_object!(IngestionTask, "job", { + content: IngestionPayload, + status: IngestionTaskStatus, user_id: String }); pub const MAX_ATTEMPTS: u32 = 3; -impl Job { - pub async fn new(content: IngressObject, user_id: String) -> Self { +impl IngestionTask { + pub async fn new(content: IngestionPayload, user_id: String) -> Self { let now = Utc::now(); Self { id: Uuid::new_v4().to_string(), content, - status: JobStatus::Created, + status: IngestionTaskStatus::Created, created_at: now, updated_at: now, user_id, @@ -43,7 +42,7 @@ impl Job { /// Creates a new job and stores it in the database pub async fn create_and_add_to_db( - content: IngressObject, + content: IngestionPayload, user_id: String, db: &SurrealDbClient, ) -> Result<(), AppError> { @@ -57,10 +56,10 @@ impl Job { // Update job status pub async fn update_status( id: &str, - status: JobStatus, + status: IngestionTaskStatus, db: &SurrealDbClient, ) -> Result<(), AppError> { - let _job: Option = db + let _job: Option = db .update((Self::table_name(), id)) .patch(PatchOp::replace("/status", status)) .patch(PatchOp::replace( @@ -73,16 +72,16 @@ impl Job { } /// Listen for new jobs - pub async fn listen_for_jobs( + pub async fn listen_for_tasks( db: &SurrealDbClient, - ) -> Result, surrealdb::Error>>, surrealdb::Error> + ) -> Result, surrealdb::Error>>, surrealdb::Error> { - db.listen::().await + db.listen::().await } - /// Get all unfinished jobs, ie newly created and in progress up two times - pub async fn get_unfinished_jobs(db: &SurrealDbClient) -> Result, AppError> { - let jobs: Vec = db + /// Get all unfinished tasks, ie newly created and in progress up two times + pub async fn get_unfinished_tasks(db: &SurrealDbClient) -> Result, AppError> { + let jobs: Vec = db .query( "SELECT * FROM type::table($table) WHERE diff --git a/crates/common/src/storage/types/mod.rs b/crates/common/src/storage/types/mod.rs index a3701a4..17cc2fd 100644 --- a/crates/common/src/storage/types/mod.rs +++ b/crates/common/src/storage/types/mod.rs @@ -3,7 +3,8 @@ use serde::{Deserialize, Serialize}; pub mod analytics; pub mod conversation; pub mod file_info; -pub mod job; +pub mod ingestion_payload; +pub mod ingestion_task; pub mod knowledge_entity; pub mod knowledge_relationship; pub mod message; diff --git a/crates/common/src/storage/types/user.rs b/crates/common/src/storage/types/user.rs index 2bffd6c..c26434f 100644 --- a/crates/common/src/storage/types/user.rs +++ b/crates/common/src/storage/types/user.rs @@ -4,7 +4,7 @@ use surrealdb::{engine::any::Any, Surreal}; use uuid::Uuid; use super::{ - conversation::Conversation, job::Job, knowledge_entity::KnowledgeEntity, + conversation::Conversation, ingestion_task::IngestionTask, knowledge_entity::KnowledgeEntity, knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings, text_content::TextContent, }; @@ -351,12 +351,12 @@ impl User { Ok(conversations) } - /// Gets all active jobs for the specified user - pub async fn get_unfinished_jobs( + /// Gets all active ingestion tasks for the specified user + pub async fn get_unfinished_ingestion_tasks( user_id: &str, db: &SurrealDbClient, - ) -> Result, AppError> { - let jobs: Vec = db + ) -> Result, AppError> { + let jobs: Vec = db .query( "SELECT * FROM type::table($table) WHERE user_id = $user_id @@ -369,7 +369,7 @@ impl User { ) ORDER BY created_at DESC", ) - .bind(("table", Job::table_name())) + .bind(("table", IngestionTask::table_name())) .bind(("user_id", user_id.to_owned())) .bind(("max_attempts", 3)) .await? @@ -384,12 +384,12 @@ impl User { user_id: &str, db: &SurrealDbClient, ) -> Result<(), AppError> { - db.get_item::(id) + db.get_item::(id) .await? .filter(|job| job.user_id == user_id) .ok_or_else(|| AppError::Auth("Not authorized to delete this job".into()))?; - db.delete_item::(id) + db.delete_item::(id) .await .map_err(AppError::Database)?; diff --git a/crates/html-router/src/routes/index.rs b/crates/html-router/src/routes/index.rs index a0f0f55..bb0b669 100644 --- a/crates/html-router/src/routes/index.rs +++ b/crates/html-router/src/routes/index.rs @@ -12,7 +12,7 @@ use tracing::info; use common::{ error::{AppError, HtmlError}, storage::types::{ - file_info::FileInfo, job::Job, knowledge_entity::KnowledgeEntity, + file_info::FileInfo, ingestion_task::IngestionTask, knowledge_entity::KnowledgeEntity, knowledge_relationship::KnowledgeRelationship, text_chunk::TextChunk, text_content::TextContent, user::User, }, @@ -26,7 +26,7 @@ page_data!(IndexData, "index/index.html", { gdpr_accepted: bool, user: Option, latest_text_contents: Vec, - active_jobs: Vec + active_jobs: Vec }); pub async fn index_handler( @@ -39,9 +39,11 @@ pub async fn index_handler( let gdpr_accepted = auth.current_user.is_some() | session.get("gdpr_accepted").unwrap_or(false); let active_jobs = match auth.current_user.is_some() { - true => User::get_unfinished_jobs(&auth.current_user.clone().unwrap().id, &state.db) - .await - .map_err(|e| HtmlError::new(e, state.templates.clone()))?, + true => { + User::get_unfinished_ingestion_tasks(&auth.current_user.clone().unwrap().id, &state.db) + .await + .map_err(|e| HtmlError::new(e, state.templates.clone()))? + } false => vec![], }; @@ -172,7 +174,7 @@ async fn get_and_validate_text_content( #[derive(Serialize)] pub struct ActiveJobsData { - pub active_jobs: Vec, + pub active_jobs: Vec, pub user: User, } @@ -190,7 +192,7 @@ pub async fn delete_job( .await .map_err(|e| HtmlError::new(e, state.templates.clone()))?; - let active_jobs = User::get_unfinished_jobs(&user.id, &state.db) + let active_jobs = User::get_unfinished_ingestion_tasks(&user.id, &state.db) .await .map_err(|e| HtmlError::new(e, state.templates.clone()))?; @@ -216,7 +218,7 @@ pub async fn show_active_jobs( None => return Ok(Redirect::to("/signin").into_response()), }; - let active_jobs = User::get_unfinished_jobs(&user.id, &state.db) + let active_jobs = User::get_unfinished_ingestion_tasks(&user.id, &state.db) .await .map_err(|e| HtmlError::new(e, state.templates.clone()))?; diff --git a/crates/html-router/src/routes/ingress_form.rs b/crates/html-router/src/routes/ingress_form.rs index 5a1736b..01d4693 100644 --- a/crates/html-router/src/routes/ingress_form.rs +++ b/crates/html-router/src/routes/ingress_form.rs @@ -12,8 +12,10 @@ use tracing::info; use common::{ error::{AppError, HtmlError, IntoHtmlError}, - ingress::ingress_object::IngressObject, - storage::types::{file_info::FileInfo, job::Job, user::User}, + storage::types::{ + file_info::FileInfo, ingestion_payload::IngestionPayload, ingestion_task::IngestionTask, + user::User, + }, }; use crate::{ @@ -112,7 +114,7 @@ pub async fn process_ingress_form( })) .await?; - let ingress_objects = IngressObject::create_ingress_objects( + let payloads = IngestionPayload::create_ingestion_payload( input.content, input.instructions, input.category, @@ -121,9 +123,11 @@ pub async fn process_ingress_form( ) .map_err(|e| HtmlError::new(e, state.templates.clone()))?; - let futures: Vec<_> = ingress_objects + let futures: Vec<_> = payloads .into_iter() - .map(|object| Job::create_and_add_to_db(object.clone(), user.id.clone(), &state.db)) + .map(|object| { + IngestionTask::create_and_add_to_db(object.clone(), user.id.clone(), &state.db) + }) .collect(); try_join_all(futures) @@ -132,7 +136,7 @@ pub async fn process_ingress_form( .map_err(|e| HtmlError::new(e, state.templates.clone()))?; // Update the active jobs page with the newly created job - let active_jobs = User::get_unfinished_jobs(&user.id, &state.db) + let active_jobs = User::get_unfinished_ingestion_tasks(&user.id, &state.db) .await .map_err(|e| HtmlError::new(e, state.templates.clone()))?; diff --git a/crates/ingestion-pipeline/Cargo.toml b/crates/ingestion-pipeline/Cargo.toml new file mode 100644 index 0000000..2f6e913 --- /dev/null +++ b/crates/ingestion-pipeline/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "ingestion-pipeline" +version = "0.1.0" +edition = "2021" + +[dependencies] +# Workspace dependencies +tokio = { workspace = true } +serde = { workspace = true } +axum = { workspace = true } +tracing = { workspace = true } +serde_json = { workspace = true } + +async-openai = "0.24.1" +tiktoken-rs = "0.6.0" +reqwest = {version = "0.12.12", features = ["charset", "json"]} +scraper = "0.22.0" +chrono = { version = "0.4.39", features = ["serde"] } +text-splitter = "0.18.1" + +common = { path = "../common" } diff --git a/crates/ingestion-pipeline/src/lib.rs b/crates/ingestion-pipeline/src/lib.rs new file mode 100644 index 0000000..6fca8de --- /dev/null +++ b/crates/ingestion-pipeline/src/lib.rs @@ -0,0 +1,2 @@ +pub mod pipeline; +pub mod types; diff --git a/crates/ingestion-pipeline/src/pipeline.rs b/crates/ingestion-pipeline/src/pipeline.rs new file mode 100644 index 0000000..713f386 --- /dev/null +++ b/crates/ingestion-pipeline/src/pipeline.rs @@ -0,0 +1,165 @@ +use std::{sync::Arc, time::Instant}; + +use chrono::Utc; +use text_splitter::TextSplitter; +use tracing::{debug, info}; + +use common::{ + error::AppError, + storage::{ + db::SurrealDbClient, + types::{ + ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS}, + knowledge_entity::KnowledgeEntity, + knowledge_relationship::KnowledgeRelationship, + text_chunk::TextChunk, + text_content::TextContent, + }, + }, + utils::embedding::generate_embedding, +}; + +use common::ingress::analysis::{ + ingress_analyser::IngressAnalyzer, types::llm_analysis_result::LLMGraphAnalysisResult, +}; + +use crate::types::to_text_content; + +pub struct IngestionPipeline { + db: Arc, + openai_client: Arc>, +} + +impl IngestionPipeline { + pub async fn new( + db: Arc, + openai_client: Arc>, + ) -> Result { + Ok(Self { db, openai_client }) + } + pub async fn process_task(&self, task: IngestionTask) -> Result<(), AppError> { + let current_attempts = match task.status { + IngestionTaskStatus::InProgress { attempts, .. } => attempts + 1, + _ => 1, + }; + + // Update status to InProgress with attempt count + IngestionTask::update_status( + &task.id, + IngestionTaskStatus::InProgress { + attempts: current_attempts, + last_attempt: Utc::now(), + }, + &self.db, + ) + .await?; + + let text_content = to_text_content(task.content, &self.openai_client).await?; + + match self.process(&text_content).await { + Ok(_) => { + IngestionTask::update_status(&task.id, IngestionTaskStatus::Completed, &self.db) + .await?; + Ok(()) + } + Err(e) => { + if current_attempts >= MAX_ATTEMPTS { + IngestionTask::update_status( + &task.id, + IngestionTaskStatus::Error(format!("Max attempts reached: {}", e)), + &self.db, + ) + .await?; + } + Err(AppError::Processing(e.to_string())) + } + } + } + + pub async fn process(&self, content: &TextContent) -> Result<(), AppError> { + let now = Instant::now(); + + // Perform analyis, this step also includes retrieval + let analysis = self.perform_semantic_analysis(content).await?; + + let end = now.elapsed(); + info!( + "{:?} time elapsed during creation of entities and relationships", + end + ); + + // Convert analysis to application objects + let (entities, relationships) = analysis + .to_database_entities(&content.id, &content.user_id, &self.openai_client) + .await?; + + // Store everything + tokio::try_join!( + self.store_graph_entities(entities, relationships), + self.store_vector_chunks(content), + )?; + + // Store original content + self.db.store_item(content.to_owned()).await?; + + self.db.rebuild_indexes().await?; + Ok(()) + } + + async fn perform_semantic_analysis( + &self, + content: &TextContent, + ) -> Result { + let analyser = IngressAnalyzer::new(&self.db, &self.openai_client); + analyser + .analyze_content( + &content.category, + &content.instructions, + &content.text, + &content.user_id, + ) + .await + } + + async fn store_graph_entities( + &self, + entities: Vec, + relationships: Vec, + ) -> Result<(), AppError> { + for entity in &entities { + debug!("Storing entity: {:?}", entity); + self.db.store_item(entity.clone()).await?; + } + + for relationship in &relationships { + debug!("Storing relationship: {:?}", relationship); + relationship.store_relationship(&self.db).await?; + } + + info!( + "Stored {} entities and {} relationships", + entities.len(), + relationships.len() + ); + Ok(()) + } + + async fn store_vector_chunks(&self, content: &TextContent) -> Result<(), AppError> { + let splitter = TextSplitter::new(500..2000); + let chunks = splitter.chunks(&content.text); + + // Could potentially process chunks in parallel with a bounded concurrent limit + for chunk in chunks { + let embedding = generate_embedding(&self.openai_client, chunk).await?; + let text_chunk = TextChunk::new( + content.id.to_string(), + chunk.to_string(), + embedding, + content.user_id.to_string(), + ); + self.db.store_item(text_chunk).await?; + } + + Ok(()) + } +} diff --git a/crates/ingestion-pipeline/src/types/mod.rs b/crates/ingestion-pipeline/src/types/mod.rs new file mode 100644 index 0000000..2ee6e7a --- /dev/null +++ b/crates/ingestion-pipeline/src/types/mod.rs @@ -0,0 +1,247 @@ +use std::{sync::Arc, time::Duration}; + +use async_openai::types::{ + ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage, + CreateChatCompletionRequestArgs, +}; +use common::{ + error::AppError, + storage::types::{ + file_info::FileInfo, ingestion_payload::IngestionPayload, text_content::TextContent, + }, +}; +use reqwest; +use scraper::{Html, Selector}; +use std::fmt::Write; +use tiktoken_rs::{o200k_base, CoreBPE}; + +pub async fn to_text_content( + ingestion_payload: IngestionPayload, + openai_client: &Arc>, +) -> Result { + match ingestion_payload { + IngestionPayload::Url { + url, + instructions, + category, + user_id, + } => { + let text = fetch_text_from_url(&url, openai_client).await?; + Ok(TextContent::new( + text, + instructions.into(), + category.into(), + None, + Some(url.into()), + user_id.into(), + )) + } + IngestionPayload::Text { + text, + instructions, + category, + user_id, + } => Ok(TextContent::new( + text.into(), + instructions.into(), + category.into(), + None, + None, + user_id.into(), + )), + IngestionPayload::File { + file_info, + instructions, + category, + user_id, + } => { + let text = extract_text_from_file(&file_info).await?; + Ok(TextContent::new( + text, + instructions.into(), + category.into(), + Some(file_info.to_owned()), + None, + user_id.into(), + )) + } + } +} + +/// Get text from url, will return it as a markdown formatted string +async fn fetch_text_from_url( + url: &str, + openai_client: &Arc>, +) -> Result { + // Use a client with timeouts and reuse + let client = reqwest::ClientBuilder::new() + .timeout(Duration::from_secs(30)) + .build()?; + let response = client.get(url).send().await?.text().await?; + + // Preallocate string with capacity + let mut structured_content = String::with_capacity(response.len() / 2); + + let document = Html::parse_document(&response); + let main_selectors = Selector::parse( + "article, main, .article-content, .post-content, .entry-content, [role='main']", + ) + .unwrap(); + + let content_element = document + .select(&main_selectors) + .next() + .or_else(|| document.select(&Selector::parse("body").unwrap()).next()) + .ok_or(AppError::NotFound("No content found".into()))?; + + // Compile selectors once + let heading_selector = Selector::parse("h1, h2, h3").unwrap(); + let paragraph_selector = Selector::parse("p").unwrap(); + + // Process content in one pass + for element in content_element.select(&heading_selector) { + let _ = writeln!( + structured_content, + "{}", + element.text().collect::().trim() + ); + } + for element in content_element.select(¶graph_selector) { + let _ = writeln!( + structured_content, + "{}", + element.text().collect::().trim() + ); + } + + let content = structured_content + .replace(|c: char| c.is_control(), " ") + .replace(" ", " "); + process_web_content(content, openai_client).await +} + +pub async fn process_web_content( + content: String, + openai_client: &Arc>, +) -> Result { + const MAX_TOKENS: usize = 122000; + const SYSTEM_PROMPT: &str = r#" + You are a precise content extractor for web pages. Your task: + + 1. Extract ONLY the main article/content from the provided text + 2. Maintain the original content - do not summarize or modify the core information + 3. Ignore peripheral content such as: + - Navigation elements + - Error messages (e.g., "JavaScript required") + - Related articles sections + - Comments + - Social media links + - Advertisement text + + FORMAT: + - Convert tags to markdown headings (#, ##, ###) + - Convert tags to markdown paragraphs + - Preserve quotes and important formatting + - Remove duplicate content + - Remove any metadata or technical artifacts + + OUTPUT RULES: + - Output ONLY the cleaned content in markdown + - Do not add any explanations or meta-commentary + - Do not add summaries or conclusions + - Do not use any XML/HTML tags in the output + "#; + + let bpe = o200k_base()?; + + // Process content in chunks if needed + let truncated_content = if bpe.encode_with_special_tokens(&content).len() > MAX_TOKENS { + truncate_content(&content, MAX_TOKENS, &bpe)? + } else { + content + }; + + let request = CreateChatCompletionRequestArgs::default() + .model("gpt-4o-mini") + .temperature(0.0) + .max_tokens(16200u32) + .messages([ + ChatCompletionRequestSystemMessage::from(SYSTEM_PROMPT).into(), + ChatCompletionRequestUserMessage::from(truncated_content).into(), + ]) + .build()?; + + let response = openai_client.chat().create(request).await?; + + response + .choices + .first() + .and_then(|choice| choice.message.content.as_ref()) + .map(|content| content.to_owned()) + .ok_or(AppError::LLMParsing("No content in response".into())) +} + +fn truncate_content( + content: &str, + max_tokens: usize, + tokenizer: &CoreBPE, +) -> Result { + // Pre-allocate with estimated size + let mut result = String::with_capacity(content.len() / 2); + let mut current_tokens = 0; + + // Process content by paragraph to maintain context + for paragraph in content.split("\n\n") { + let tokens = tokenizer.encode_with_special_tokens(paragraph).len(); + + // Check if adding paragraph exceeds limit + if current_tokens + tokens > max_tokens { + break; + } + + result.push_str(paragraph); + result.push_str("\n\n"); + current_tokens += tokens; + } + + // Ensure we return valid content + if result.is_empty() { + return Err(AppError::Processing("Content exceeds token limit".into())); + } + + Ok(result.trim_end().to_string()) +} + +/// Extracts text from a file based on its MIME type. +async fn extract_text_from_file(file_info: &FileInfo) -> Result { + match file_info.mime_type.as_str() { + "text/plain" => { + // Read the file and return its content + let content = tokio::fs::read_to_string(&file_info.path).await?; + Ok(content) + } + "text/markdown" => { + // Read the file and return its content + let content = tokio::fs::read_to_string(&file_info.path).await?; + Ok(content) + } + "application/pdf" => { + // TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf` + Err(AppError::NotFound(file_info.mime_type.clone())) + } + "image/png" | "image/jpeg" => { + // TODO: Implement OCR on image using a crate like `tesseract` + Err(AppError::NotFound(file_info.mime_type.clone())) + } + "application/octet-stream" => { + let content = tokio::fs::read_to_string(&file_info.path).await?; + Ok(content) + } + "text/x-rust" => { + let content = tokio::fs::read_to_string(&file_info.path).await?; + Ok(content) + } + // Handle other MIME types as needed + _ => Err(AppError::NotFound(file_info.mime_type.clone())), + } +} diff --git a/crates/main/Cargo.toml b/crates/main/Cargo.toml index c683be2..a86316d 100644 --- a/crates/main/Cargo.toml +++ b/crates/main/Cargo.toml @@ -45,6 +45,7 @@ url = { version = "2.5.2", features = ["serde"] } uuid = { version = "1.10.0", features = ["v4", "serde"] } # Reference to api-router +ingestion-pipeline = { path = "../ingestion-pipeline" } api-router = { path = "../api-router" } html-router = { path = "../html-router" } common = { path = "../common" } diff --git a/crates/main/src/worker.rs b/crates/main/src/worker.rs index 04cc7b5..40800a8 100644 --- a/crates/main/src/worker.rs +++ b/crates/main/src/worker.rs @@ -1,14 +1,14 @@ use std::sync::Arc; use common::{ - ingress::content_processor::ContentProcessor, storage::{ db::SurrealDbClient, - types::job::{Job, JobStatus}, + types::ingestion_task::{IngestionTask, IngestionTaskStatus}, }, utils::config::get_config, }; use futures::StreamExt; +use ingestion_pipeline::pipeline::IngestionPipeline; use surrealdb::Action; use tracing::{error, info}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -37,23 +37,23 @@ async fn main() -> Result<(), Box> { let openai_client = Arc::new(async_openai::Client::new()); - let content_processor = ContentProcessor::new(db.clone(), openai_client.clone()).await?; + let ingestion_pipeline = IngestionPipeline::new(db.clone(), openai_client.clone()).await?; loop { - // First, check for any unfinished jobs - let unfinished_jobs = Job::get_unfinished_jobs(&db).await?; + // First, check for any unfinished tasks + let unfinished_tasks = IngestionTask::get_unfinished_tasks(&db).await?; - if !unfinished_jobs.is_empty() { - info!("Found {} unfinished jobs", unfinished_jobs.len()); + if !unfinished_tasks.is_empty() { + info!("Found {} unfinished jobs", unfinished_tasks.len()); - for job in unfinished_jobs { - content_processor.process_job(job).await?; + for task in unfinished_tasks { + ingestion_pipeline.process_task(task).await?; } } // If no unfinished jobs, start listening for new ones info!("Listening for new jobs..."); - let mut job_stream = Job::listen_for_jobs(&db).await?; + let mut job_stream = IngestionTask::listen_for_tasks(&db).await?; while let Some(notification) = job_stream.next().await { match notification { @@ -62,41 +62,42 @@ async fn main() -> Result<(), Box> { match notification.action { Action::Create => { - if let Err(e) = content_processor.process_job(notification.data).await { - error!("Error processing job: {}", e); + if let Err(e) = ingestion_pipeline.process_task(notification.data).await + { + error!("Error processing task: {}", e); } } Action::Update => { match notification.data.status { - JobStatus::Completed - | JobStatus::Error(_) - | JobStatus::Cancelled => { + IngestionTaskStatus::Completed + | IngestionTaskStatus::Error(_) + | IngestionTaskStatus::Cancelled => { info!( - "Skipping already completed/error/cancelled job: {}", + "Skipping already completed/error/cancelled task: {}", notification.data.id ); continue; } - JobStatus::InProgress { attempts, .. } => { + IngestionTaskStatus::InProgress { attempts, .. } => { // Only process if this is a retry after an error, not our own update - if let Ok(Some(current_job)) = - db.get_item::(¬ification.data.id).await + if let Ok(Some(current_task)) = + db.get_item::(¬ification.data.id).await { - match current_job.status { - JobStatus::Error(_) + match current_task.status { + IngestionTaskStatus::Error(_) if attempts - < common::storage::types::job::MAX_ATTEMPTS => + < common::storage::types::ingestion_task::MAX_ATTEMPTS => { // This is a retry after an error if let Err(e) = - content_processor.process_job(current_job).await + ingestion_pipeline.process_task(current_task).await { - error!("Error processing job retry: {}", e); + error!("Error processing task retry: {}", e); } } _ => { info!( - "Skipping in-progress update for job: {}", + "Skipping in-progress update for task: {}", notification.data.id ); continue; @@ -104,12 +105,12 @@ async fn main() -> Result<(), Box> { } } } - JobStatus::Created => { + IngestionTaskStatus::Created => { // Shouldn't happen with Update action, but process if it does if let Err(e) = - content_processor.process_job(notification.data).await + ingestion_pipeline.process_task(notification.data).await { - error!("Error processing job: {}", e); + error!("Error processing task: {}", e); } } } @@ -122,7 +123,7 @@ async fn main() -> Result<(), Box> { } // If we reach here, the stream has ended (connection lost?) - error!("Job stream ended unexpectedly, reconnecting..."); + error!("Database stream ended unexpectedly, reconnecting..."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } }