From 97beb917108e644e49e9d50dd21db410682cf312 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Thu, 28 May 2026 21:01:36 +0200 Subject: [PATCH] chore: optimize ingest payloads and add parallel task batch store Parse content before building file payloads to move shared metadata when possible, add create_all_and_add_to_db for concurrent stores, and extend tests for batch persistence and payload edge cases. --- api-router/src/routes/ingest.rs | 7 +- common/src/storage/types/ingestion_payload.rs | 261 +++++++++++++++--- common/src/storage/types/ingestion_task.rs | 94 ++++++- html-router/src/routes/ingestion/handlers.rs | 8 +- 4 files changed, 303 insertions(+), 67 deletions(-) diff --git a/api-router/src/routes/ingest.rs b/api-router/src/routes/ingest.rs index 8334f79..d269cf6 100644 --- a/api-router/src/routes/ingest.rs +++ b/api-router/src/routes/ingest.rs @@ -77,12 +77,7 @@ pub async fn ingest_data( user_id.clone(), )?; - let futures: Vec<_> = payloads - .into_iter() - .map(|object| IngestionTask::create_and_add_to_db(object, user_id.clone(), &state.db)) - .collect(); - - try_join_all(futures).await?; + IngestionTask::create_all_and_add_to_db(payloads, &user_id, &state.db).await?; Ok((StatusCode::OK, Json(json!({ "status": "success" })))) } diff --git a/common/src/storage/types/ingestion_payload.rs b/common/src/storage/types/ingestion_payload.rs index 5b95ae3..5788381 100644 --- a/common/src/storage/types/ingestion_payload.rs +++ b/common/src/storage/types/ingestion_payload.rs @@ -26,19 +26,39 @@ pub enum IngestionPayload { }, } +/// Shared ingest metadata moved or cloned into each payload variant. +struct IngestFields { + context: String, + category: String, + user_id: String, +} + +/// Result of parsing optional ingest content before file payloads are built. +#[derive(Debug)] +enum ParsedContent { + /// No URL or text payload should be appended. + Skip, + Url(String), + Text(String), +} + +impl ParsedContent { + #[must_use] + fn follows(&self) -> bool { + !matches!(self, Self::Skip) + } +} + impl IngestionPayload { /// Creates ingestion payloads from the provided content, context, and files. /// - /// # Arguments - /// * `content` - Optional textual content to be ingressed - /// * `context` - context for processing the ingress content - /// * `category` - Category to classify the ingressed content - /// * `files` - Vector of `FileInfo` objects containing information about uploaded files - /// * `user_id` - Identifier of the user performing the ingress operation + /// Files are emitted first. When both files and content are present, shared + /// metadata is cloned per file; otherwise the last file-only payload moves + /// `context`, `category`, and `user_id` without cloning. /// - /// # Returns - /// * `Result, AppError>` - On success, returns a vector of ingress objects - /// (one per file/content type). On failure, returns an `AppError`. + /// # Errors + /// + /// Returns [`AppError::NotFound`] when no valid files or content are provided. #[allow(clippy::similar_names)] pub fn create_ingestion_payload( content: Option, @@ -47,54 +67,90 @@ impl IngestionPayload { files: Vec, user_id: String, ) -> Result, AppError> { - let has_content = content - .as_ref() - .is_some_and(|c| c.len() > 2); + let parsed = Self::parse_content(content); + let content_follows = parsed.follows(); + let file_count = files.len(); #[allow(clippy::arithmetic_side_effects)] - let capacity = files.len() + usize::from(has_content); + let capacity = file_count + usize::from(content_follows); let mut object_list = Vec::with_capacity(capacity); - for file in files { - object_list.push(IngestionPayload::File { - file_info: file, - context: context.clone(), - category: category.clone(), - user_id: user_id.clone(), - }); + let mut fields = Some(IngestFields { + context, + category, + user_id, + }); + + for (index, file) in files.into_iter().enumerate() { + let is_last_file = index + 1 == file_count; + if content_follows || !is_last_file { + let Some(shared) = fields.as_ref() else { + return Err(AppError::internal("shared ingest fields consumed early")); + }; + object_list.push(Self::File { + file_info: file, + context: shared.context.clone(), + category: shared.category.clone(), + user_id: shared.user_id.clone(), + }); + } else { + let Some(shared) = fields.take() else { + return Err(AppError::internal("shared ingest fields missing for file")); + }; + object_list.push(Self::File { + file_info: file, + context: shared.context, + category: shared.category, + user_id: shared.user_id, + }); + } } - if let Some(input_content) = content { - match Url::parse(&input_content) { - Ok(url) => { - info!("Detected URL: {}", url); - object_list.push(IngestionPayload::Url { - url: url.to_string(), - context, - category, - user_id, - }); - } - Err(_) => { - if input_content.len() > 2 { - info!("Treating input as plain text"); - object_list.push(IngestionPayload::Text { - text: input_content, - context, - category, - user_id, - }); - } - } - } + if let ParsedContent::Url(url) = parsed { + info!("Detected URL: {url}"); + let Some(shared) = fields.take() else { + return Err(AppError::internal("shared ingest fields missing for url")); + }; + object_list.push(Self::Url { + url, + context: shared.context, + category: shared.category, + user_id: shared.user_id, + }); + } else if let ParsedContent::Text(text) = parsed { + info!("Treating input as plain text"); + let Some(shared) = fields.take() else { + return Err(AppError::internal("shared ingest fields missing for text")); + }; + object_list.push(Self::Text { + text, + context: shared.context, + category: shared.category, + user_id: shared.user_id, + }); } if object_list.is_empty() { return Err(AppError::NotFound( - "No valid content or files provided".into(), + "no valid content or files provided".into(), )); } Ok(object_list) } + + fn parse_content(content: Option) -> ParsedContent { + let Some(input_content) = content else { + return ParsedContent::Skip; + }; + + if input_content.len() <= 2 { + return ParsedContent::Skip; + } + + match Url::parse(&input_content) { + Ok(url) => ParsedContent::Url(url.to_string()), + Err(_) => ParsedContent::Text(input_content), + } + } } #[cfg(test)] @@ -303,7 +359,7 @@ mod tests { assert!(result.is_err()); match result { Err(AppError::NotFound(msg)) => { - assert_eq!(msg, "No valid content or files provided"); + assert_eq!(msg, "no valid content or files provided"); } _ => anyhow::bail!("Expected NotFound error"), } @@ -328,10 +384,123 @@ mod tests { assert!(result.is_err()); match result { Err(AppError::NotFound(msg)) => { - assert_eq!(msg, "No valid content or files provided"); + assert_eq!(msg, "no valid content or files provided"); } _ => anyhow::bail!("Expected NotFound error"), } Ok(()) } + + #[test] + fn test_create_ingestion_payload_with_file_and_text() -> anyhow::Result<()> { + let text = "plain notes"; + let context = "ctx"; + let category = "cat"; + let user_id = "user123"; + let file_info: FileInfo = MockFileInfo { + id: "file1".to_string(), + } + .into(); + + let result = IngestionPayload::create_ingestion_payload( + Some(text.to_string()), + context.to_string(), + category.to_string(), + vec![file_info], + user_id.to_string(), + )?; + + assert_eq!(result.len(), 2); + match (&result[0], &result[1]) { + ( + IngestionPayload::File { + file_info: payload_file, + context: file_context, + .. + }, + IngestionPayload::Text { + text: payload_text, + context: text_context, + category: text_category, + user_id: text_user_id, + }, + ) => { + assert_eq!(payload_file.id, "file1"); + assert_eq!(file_context, context); + assert_eq!(payload_text, text); + assert_eq!(text_context, context); + assert_eq!(text_category, category); + assert_eq!(text_user_id, user_id); + } + _ => anyhow::bail!("expected File then Text"), + } + Ok(()) + } + + #[test] + fn test_create_ingestion_payload_short_content_with_file_only_yields_file() -> anyhow::Result<()> { + let context = "ctx"; + let category = "cat"; + let user_id = "user123"; + let file_info: FileInfo = MockFileInfo { + id: "file1".to_string(), + } + .into(); + + let result = IngestionPayload::create_ingestion_payload( + Some("ab".to_string()), + context.to_string(), + category.to_string(), + vec![file_info], + user_id.to_string(), + )?; + + assert_eq!(result.len(), 1); + match result.first().context("expected one file payload")? { + IngestionPayload::File { + file_info, + context: payload_context, + category: payload_category, + user_id: payload_user_id, + } => { + assert_eq!(file_info.id, "file1"); + assert_eq!(payload_context, context); + assert_eq!(payload_category, category); + assert_eq!(payload_user_id, user_id); + } + _ => anyhow::bail!("expected File variant only"), + } + Ok(()) + } + + #[test] + fn test_create_ingestion_payload_two_files_without_content() -> anyhow::Result<()> { + let context = "ctx"; + let category = "cat"; + let user_id = "user123"; + + let files = vec![ + MockFileInfo { + id: "file1".to_string(), + } + .into(), + MockFileInfo { + id: "file2".to_string(), + } + .into(), + ]; + + let result = IngestionPayload::create_ingestion_payload( + None, + context.to_string(), + category.to_string(), + files, + user_id.to_string(), + )?; + + assert_eq!(result.len(), 2); + assert!(matches!(result[0], IngestionPayload::File { .. })); + assert!(matches!(result[1], IngestionPayload::File { .. })); + Ok(()) + } } diff --git a/common/src/storage/types/ingestion_task.rs b/common/src/storage/types/ingestion_task.rs index aec5ebf..2e5930d 100644 --- a/common/src/storage/types/ingestion_task.rs +++ b/common/src/storage/types/ingestion_task.rs @@ -1,6 +1,7 @@ -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use chrono::Duration as ChronoDuration; +use futures::future::try_join_all; use state_machines::state_machine; use surrealdb::sql::Datetime as SurrealDatetime; use uuid::Uuid; @@ -143,12 +144,16 @@ mod lifecycle { fn invalid_transition(state: TaskState, event: TaskTransition) -> AppError { AppError::Validation(format!( - "Invalid task transition: {} -> {}", + "invalid task transition: {} -> {}", state.as_str(), event.as_str() )) } +fn worker_id_for_bind(worker_id: &Option) -> String { + worker_id.as_deref().unwrap_or("").to_string() +} + stored_object!(IngestionTask, "ingestion_task", { content: IngestionPayload, state: TaskState, @@ -216,14 +221,44 @@ impl IngestionTask { /// # Errors /// /// Returns `AppError::Database` if the store operation fails. + /// Returns `AppError::internal` if the database returns no stored record. pub async fn create_and_add_to_db( content: IngestionPayload, - user_id: String, + user_id: impl AsRef, db: &SurrealDbClient, ) -> Result { - let task = Self::new(content, user_id); - db.store_item(task.clone()).await?; - Ok(task) + let task = Self::new(content, user_id.as_ref().to_string()); + db.store_item(task) + .await? + .ok_or_else(|| AppError::internal("ingestion task store returned no record")) + } + + /// Create and persist multiple tasks concurrently (one `CREATE` per payload). + /// + /// Use this when ingest produces several payloads (files plus URL/text). For a + /// single payload, call [`Self::create_and_add_to_db`] instead. + /// + /// # Errors + /// + /// Returns the first [`AppError`] from any failed store, same as [`try_join_all`]. + pub async fn create_all_and_add_to_db( + contents: Vec, + user_id: impl AsRef, + db: &SurrealDbClient, + ) -> Result, AppError> { + if contents.is_empty() { + return Ok(Vec::new()); + } + + let user_id = Arc::new(user_id.as_ref().to_string()); + let db = db.clone(); + + try_join_all(contents.into_iter().map(|content| { + let user_id = Arc::clone(&user_id); + let db = db.clone(); + async move { Self::create_and_add_to_db(content, user_id.as_ref(), &db).await } + })) + .await } /// Claim the next ready task for processing. @@ -325,6 +360,7 @@ impl IngestionTask { "#; let now = chrono::Utc::now(); + let worker_id = worker_id_for_bind(&self.worker_id); let mut result = db .client .query(START_PROCESSING_QUERY) @@ -333,7 +369,7 @@ impl IngestionTask { .bind(("processing", TaskState::Processing.as_str())) .bind(("reserved", TaskState::Reserved.as_str())) .bind(("now", SurrealDatetime::from(now))) - .bind(("worker_id", self.worker_id.clone().unwrap_or_default())) + .bind(("worker_id", worker_id)) .await?; let updated: Option = result.take(0)?; @@ -362,6 +398,7 @@ impl IngestionTask { "#; let now = chrono::Utc::now(); + let worker_id = worker_id_for_bind(&self.worker_id); let mut result = db .client .query(COMPLETE_QUERY) @@ -370,7 +407,7 @@ impl IngestionTask { .bind(("succeeded", TaskState::Succeeded.as_str())) .bind(("processing", TaskState::Processing.as_str())) .bind(("now", SurrealDatetime::from(now))) - .bind(("worker_id", self.worker_id.clone().unwrap_or_default())) + .bind(("worker_id", worker_id)) .await?; let updated: Option = result.take(0)?; @@ -413,6 +450,7 @@ impl IngestionTask { ) .unwrap_or(now); + let worker_id = worker_id_for_bind(&self.worker_id); let mut result = db .client .query(FAIL_QUERY) @@ -424,7 +462,7 @@ impl IngestionTask { .bind(("retry_at", SurrealDatetime::from(retry_at))) .bind(("error_code", error.code.clone())) .bind(("error_message", error.message.clone())) - .bind(("worker_id", self.worker_id.clone().unwrap_or_default())) + .bind(("worker_id", worker_id)) .await?; let updated: Option = result.take(0)?; @@ -616,6 +654,44 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_create_all_and_add_to_db_empty() -> anyhow::Result<()> { + let db = memory_db().await?; + let tasks = IngestionTask::create_all_and_add_to_db(vec![], "user123", &db).await?; + assert!(tasks.is_empty()); + Ok(()) + } + + #[tokio::test] + async fn test_create_all_and_add_to_db_stores_multiple() -> anyhow::Result<()> { + let db = memory_db().await?; + let user_id = "user123"; + let payloads = vec![ + create_payload(user_id), + IngestionPayload::Text { + text: "second payload".to_string(), + context: "ctx".to_string(), + category: "cat".to_string(), + user_id: user_id.to_string(), + }, + ]; + + let created = + IngestionTask::create_all_and_add_to_db(payloads, user_id, &db).await?; + + assert_eq!(created.len(), 2); + assert_ne!(created[0].id, created[1].id); + + for task in &created { + let stored: Option = db.get_item::(&task.id).await?; + let stored = stored.with_context(|| format!("task {} should exist", task.id))?; + assert_eq!(stored.id, task.id); + assert_eq!(stored.state, TaskState::Pending); + assert_eq!(stored.user_id, user_id); + } + Ok(()) + } + #[tokio::test] async fn test_create_and_store_task() -> anyhow::Result<()> { let db = memory_db().await?; diff --git a/html-router/src/routes/ingestion/handlers.rs b/html-router/src/routes/ingestion/handlers.rs index 14bdcec..ea7cc64 100644 --- a/html-router/src/routes/ingestion/handlers.rs +++ b/html-router/src/routes/ingestion/handlers.rs @@ -146,12 +146,8 @@ pub async fn process_ingest_form( user.id.clone(), )?; - let futures: Vec<_> = payloads - .into_iter() - .map(|object| IngestionTask::create_and_add_to_db(object, user.id.clone(), &state.db)) - .collect(); - - let tasks = try_join_all(futures).await?; + let tasks = + IngestionTask::create_all_and_add_to_db(payloads, &user.id, &state.db).await?; #[derive(Serialize)] struct NewTasksData {