chore: optimize ingest payloads and add parallel task batch store

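Parse the optional content before building file payloads so the shared
metadata (`context`, `category`, `user_id`) can be moved into the final payload
instead of cloned when possible. Add `create_all_and_add_to_db` for concurrent
stores, and extend tests for batch persistence and payload edge cases.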
This commit is contained in:
Per Stark
2026-05-28 21:01:36 +02:00
parent 85336d77a3
commit 97beb91710
4 changed files with 303 additions and 67 deletions
+1 -6
View File
@@ -77,12 +77,7 @@ pub async fn ingest_data(
user_id.clone(), user_id.clone(),
)?; )?;
let futures: Vec<_> = payloads IngestionTask::create_all_and_add_to_db(payloads, &user_id, &state.db).await?;
.into_iter()
.map(|object| IngestionTask::create_and_add_to_db(object, user_id.clone(), &state.db))
.collect();
try_join_all(futures).await?;
Ok((StatusCode::OK, Json(json!({ "status": "success" })))) Ok((StatusCode::OK, Json(json!({ "status": "success" }))))
} }
+211 -42
View File
@@ -26,19 +26,39 @@ pub enum IngestionPayload {
}, },
} }
/// Shared ingest metadata moved or cloned into each payload variant.
///
/// Groups the three strings that every payload carries so that
/// `create_ingestion_payload` can hold them in a single `Option` and either
/// clone them (more payloads still to build) or take them by value (final
/// payload).
struct IngestFields {
/// Value placed in the `context` field of each payload that is built.
context: String,
/// Value placed in the `category` field of each payload that is built.
category: String,
/// Value placed in the `user_id` field of each payload that is built.
user_id: String,
}
/// Outcome of inspecting the optional ingest content ahead of file payload
/// construction.
#[derive(Debug)]
enum ParsedContent {
    /// Nothing to append: neither a URL nor a text payload is produced.
    Skip,
    /// Content to be appended as a URL payload.
    Url(String),
    /// Content to be appended as a plain-text payload.
    Text(String),
}

impl ParsedContent {
    /// Reports whether a URL or text payload will be appended, i.e. whether
    /// this value is anything other than [`ParsedContent::Skip`].
    #[must_use]
    fn follows(&self) -> bool {
        // Exhaustive on purpose: a new variant must decide explicitly whether
        // it produces a trailing payload.
        match self {
            Self::Skip => false,
            Self::Url(_) | Self::Text(_) => true,
        }
    }
}
impl IngestionPayload { impl IngestionPayload {
/// Creates ingestion payloads from the provided content, context, and files. /// Creates ingestion payloads from the provided content, context, and files.
/// ///
/// # Arguments /// Files are emitted first. When both files and content are present, shared
/// * `content` - Optional textual content to be ingressed /// metadata is cloned per file; otherwise the last file-only payload moves
/// * `context` - context for processing the ingress content /// `context`, `category`, and `user_id` without cloning.
/// * `category` - Category to classify the ingressed content
/// * `files` - Vector of `FileInfo` objects containing information about uploaded files
/// * `user_id` - Identifier of the user performing the ingress operation
/// ///
/// # Returns /// # Errors
/// * `Result<Vec<IngestionPayload>, AppError>` - On success, returns a vector of ingress objects ///
/// (one per file/content type). On failure, returns an `AppError`. /// Returns [`AppError::NotFound`] when no valid files or content are provided.
#[allow(clippy::similar_names)] #[allow(clippy::similar_names)]
pub fn create_ingestion_payload( pub fn create_ingestion_payload(
content: Option<String>, content: Option<String>,
@@ -47,54 +67,90 @@ impl IngestionPayload {
files: Vec<FileInfo>, files: Vec<FileInfo>,
user_id: String, user_id: String,
) -> Result<Vec<IngestionPayload>, AppError> { ) -> Result<Vec<IngestionPayload>, AppError> {
let has_content = content let parsed = Self::parse_content(content);
.as_ref() let content_follows = parsed.follows();
.is_some_and(|c| c.len() > 2); let file_count = files.len();
#[allow(clippy::arithmetic_side_effects)] #[allow(clippy::arithmetic_side_effects)]
let capacity = files.len() + usize::from(has_content); let capacity = file_count + usize::from(content_follows);
let mut object_list = Vec::with_capacity(capacity); let mut object_list = Vec::with_capacity(capacity);
for file in files { let mut fields = Some(IngestFields {
object_list.push(IngestionPayload::File { context,
file_info: file, category,
context: context.clone(), user_id,
category: category.clone(),
user_id: user_id.clone(),
}); });
for (index, file) in files.into_iter().enumerate() {
let is_last_file = index + 1 == file_count;
if content_follows || !is_last_file {
let Some(shared) = fields.as_ref() else {
return Err(AppError::internal("shared ingest fields consumed early"));
};
object_list.push(Self::File {
file_info: file,
context: shared.context.clone(),
category: shared.category.clone(),
user_id: shared.user_id.clone(),
});
} else {
let Some(shared) = fields.take() else {
return Err(AppError::internal("shared ingest fields missing for file"));
};
object_list.push(Self::File {
file_info: file,
context: shared.context,
category: shared.category,
user_id: shared.user_id,
});
}
} }
if let Some(input_content) = content { if let ParsedContent::Url(url) = parsed {
match Url::parse(&input_content) { info!("Detected URL: {url}");
Ok(url) => { let Some(shared) = fields.take() else {
info!("Detected URL: {}", url); return Err(AppError::internal("shared ingest fields missing for url"));
object_list.push(IngestionPayload::Url { };
url: url.to_string(), object_list.push(Self::Url {
context, url,
category, context: shared.context,
user_id, category: shared.category,
user_id: shared.user_id,
}); });
} } else if let ParsedContent::Text(text) = parsed {
Err(_) => {
if input_content.len() > 2 {
info!("Treating input as plain text"); info!("Treating input as plain text");
object_list.push(IngestionPayload::Text { let Some(shared) = fields.take() else {
text: input_content, return Err(AppError::internal("shared ingest fields missing for text"));
context, };
category, object_list.push(Self::Text {
user_id, text,
context: shared.context,
category: shared.category,
user_id: shared.user_id,
}); });
} }
}
}
}
if object_list.is_empty() { if object_list.is_empty() {
return Err(AppError::NotFound( return Err(AppError::NotFound(
"No valid content or files provided".into(), "no valid content or files provided".into(),
)); ));
} }
Ok(object_list) Ok(object_list)
} }
/// Classifies the optional ingest content.
///
/// Missing content, or content whose byte length is two or less, yields
/// [`ParsedContent::Skip`]. Otherwise the input is handed to `Url::parse`:
/// on success the parsed URL is re-serialised with `to_string` and returned
/// as [`ParsedContent::Url`]; on failure the original string is returned
/// unchanged as [`ParsedContent::Text`].
fn parse_content(content: Option<String>) -> ParsedContent {
    match content {
        Some(raw) if raw.len() > 2 => match Url::parse(&raw) {
            Ok(parsed) => ParsedContent::Url(parsed.to_string()),
            Err(_) => ParsedContent::Text(raw),
        },
        Some(_) | None => ParsedContent::Skip,
    }
}
} }
#[cfg(test)] #[cfg(test)]
@@ -303,7 +359,7 @@ mod tests {
assert!(result.is_err()); assert!(result.is_err());
match result { match result {
Err(AppError::NotFound(msg)) => { Err(AppError::NotFound(msg)) => {
assert_eq!(msg, "No valid content or files provided"); assert_eq!(msg, "no valid content or files provided");
} }
_ => anyhow::bail!("Expected NotFound error"), _ => anyhow::bail!("Expected NotFound error"),
} }
@@ -328,10 +384,123 @@ mod tests {
assert!(result.is_err()); assert!(result.is_err());
match result { match result {
Err(AppError::NotFound(msg)) => { Err(AppError::NotFound(msg)) => {
assert_eq!(msg, "No valid content or files provided"); assert_eq!(msg, "no valid content or files provided");
} }
_ => anyhow::bail!("Expected NotFound error"), _ => anyhow::bail!("Expected NotFound error"),
} }
Ok(()) Ok(())
} }
/// One file plus plain-text content must yield a `File` payload followed by a
/// `Text` payload, with the shared metadata present on the text payload.
#[test]
fn test_create_ingestion_payload_with_file_and_text() -> anyhow::Result<()> {
    let (text, context, category, user_id) = ("plain notes", "ctx", "cat", "user123");
    let file_info: FileInfo = MockFileInfo {
        id: "file1".to_string(),
    }
    .into();

    let result = IngestionPayload::create_ingestion_payload(
        Some(text.to_string()),
        context.to_string(),
        category.to_string(),
        vec![file_info],
        user_id.to_string(),
    )?;

    assert_eq!(result.len(), 2);

    // Destructure both payloads at once; any other variant pairing is a failure.
    let (
        IngestionPayload::File {
            file_info: payload_file,
            context: file_context,
            ..
        },
        IngestionPayload::Text {
            text: payload_text,
            context: text_context,
            category: text_category,
            user_id: text_user_id,
        },
    ) = (&result[0], &result[1])
    else {
        anyhow::bail!("expected File then Text");
    };

    assert_eq!(payload_file.id, "file1");
    assert_eq!(file_context, context);
    assert_eq!(payload_text, text);
    assert_eq!(text_context, context);
    assert_eq!(text_category, category);
    assert_eq!(text_user_id, user_id);

    Ok(())
}
/// Content of two bytes is too short to count, so a single file must produce
/// exactly one `File` payload carrying the shared metadata.
#[test]
fn test_create_ingestion_payload_short_content_with_file_only_yields_file() -> anyhow::Result<()> {
    let (context, category, user_id) = ("ctx", "cat", "user123");
    let file_info: FileInfo = MockFileInfo {
        id: "file1".to_string(),
    }
    .into();

    let result = IngestionPayload::create_ingestion_payload(
        Some("ab".to_string()),
        context.to_string(),
        category.to_string(),
        vec![file_info],
        user_id.to_string(),
    )?;

    assert_eq!(result.len(), 1);

    let only_payload = result.first().context("expected one file payload")?;
    let IngestionPayload::File {
        file_info: stored_file,
        context: payload_context,
        category: payload_category,
        user_id: payload_user_id,
    } = only_payload
    else {
        anyhow::bail!("expected File variant only");
    };

    assert_eq!(stored_file.id, "file1");
    assert_eq!(payload_context, context);
    assert_eq!(payload_category, category);
    assert_eq!(payload_user_id, user_id);

    Ok(())
}
/// Two files and no content: the first file receives cloned metadata while the
/// last file receives the moved metadata, so both payloads must carry the same
/// `context`, `category`, and `user_id`, and the files must keep their order.
#[test]
fn test_create_ingestion_payload_two_files_without_content() -> anyhow::Result<()> {
    let context = "ctx";
    let category = "cat";
    let user_id = "user123";
    let files = vec![
        MockFileInfo {
            id: "file1".to_string(),
        }
        .into(),
        MockFileInfo {
            id: "file2".to_string(),
        }
        .into(),
    ];

    let result = IngestionPayload::create_ingestion_payload(
        None,
        context.to_string(),
        category.to_string(),
        files,
        user_id.to_string(),
    )?;

    assert_eq!(result.len(), 2);

    // Check every field on both payloads: the clone branch and the move branch
    // are separate code paths and either could drop or mix up a value.
    for (payload, expected_id) in result.iter().zip(["file1", "file2"]) {
        let IngestionPayload::File {
            file_info,
            context: payload_context,
            category: payload_category,
            user_id: payload_user_id,
        } = payload
        else {
            anyhow::bail!("expected File variant for {expected_id}");
        };

        assert_eq!(file_info.id, expected_id);
        assert_eq!(payload_context, context);
        assert_eq!(payload_category, category);
        assert_eq!(payload_user_id, user_id);
    }

    Ok(())
}
} }
+85 -9
View File
@@ -1,6 +1,7 @@
use std::time::Duration; use std::{sync::Arc, time::Duration};
use chrono::Duration as ChronoDuration; use chrono::Duration as ChronoDuration;
use futures::future::try_join_all;
use state_machines::state_machine; use state_machines::state_machine;
use surrealdb::sql::Datetime as SurrealDatetime; use surrealdb::sql::Datetime as SurrealDatetime;
use uuid::Uuid; use uuid::Uuid;
@@ -143,12 +144,16 @@ mod lifecycle {
fn invalid_transition(state: TaskState, event: TaskTransition) -> AppError { fn invalid_transition(state: TaskState, event: TaskTransition) -> AppError {
AppError::Validation(format!( AppError::Validation(format!(
"Invalid task transition: {} -> {}", "invalid task transition: {} -> {}",
state.as_str(), state.as_str(),
event.as_str() event.as_str()
)) ))
} }
/// Produces the owned string bound as the `worker_id` query parameter.
///
/// Returns a copy of the id when one is set and an empty string otherwise.
fn worker_id_for_bind(worker_id: &Option<String>) -> String {
    match worker_id {
        Some(id) => id.clone(),
        None => String::new(),
    }
}
stored_object!(IngestionTask, "ingestion_task", { stored_object!(IngestionTask, "ingestion_task", {
content: IngestionPayload, content: IngestionPayload,
state: TaskState, state: TaskState,
@@ -216,14 +221,44 @@ impl IngestionTask {
/// # Errors /// # Errors
/// ///
/// Returns `AppError::Database` if the store operation fails. /// Returns `AppError::Database` if the store operation fails.
/// Returns `AppError::internal` if the database returns no stored record.
pub async fn create_and_add_to_db( pub async fn create_and_add_to_db(
content: IngestionPayload, content: IngestionPayload,
user_id: String, user_id: impl AsRef<str>,
db: &SurrealDbClient, db: &SurrealDbClient,
) -> Result<IngestionTask, AppError> { ) -> Result<IngestionTask, AppError> {
let task = Self::new(content, user_id); let task = Self::new(content, user_id.as_ref().to_string());
db.store_item(task.clone()).await?; db.store_item(task)
Ok(task) .await?
.ok_or_else(|| AppError::internal("ingestion task store returned no record"))
}
/// Create and persist multiple tasks concurrently (one `CREATE` per payload).
///
/// Use this when ingest produces several payloads (files plus URL/text). For a
/// single payload, call [`Self::create_and_add_to_db`] instead.
///
/// # Errors
///
/// Returns the first [`AppError`] from any failed store, same as [`try_join_all`].
pub async fn create_all_and_add_to_db(
contents: Vec<IngestionPayload>,
user_id: impl AsRef<str>,
db: &SurrealDbClient,
) -> Result<Vec<IngestionTask>, AppError> {
if contents.is_empty() {
return Ok(Vec::new());
}
let user_id = Arc::new(user_id.as_ref().to_string());
let db = db.clone();
try_join_all(contents.into_iter().map(|content| {
let user_id = Arc::clone(&user_id);
let db = db.clone();
async move { Self::create_and_add_to_db(content, user_id.as_ref(), &db).await }
}))
.await
} }
/// Claim the next ready task for processing. /// Claim the next ready task for processing.
@@ -325,6 +360,7 @@ impl IngestionTask {
"#; "#;
let now = chrono::Utc::now(); let now = chrono::Utc::now();
let worker_id = worker_id_for_bind(&self.worker_id);
let mut result = db let mut result = db
.client .client
.query(START_PROCESSING_QUERY) .query(START_PROCESSING_QUERY)
@@ -333,7 +369,7 @@ impl IngestionTask {
.bind(("processing", TaskState::Processing.as_str())) .bind(("processing", TaskState::Processing.as_str()))
.bind(("reserved", TaskState::Reserved.as_str())) .bind(("reserved", TaskState::Reserved.as_str()))
.bind(("now", SurrealDatetime::from(now))) .bind(("now", SurrealDatetime::from(now)))
.bind(("worker_id", self.worker_id.clone().unwrap_or_default())) .bind(("worker_id", worker_id))
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
@@ -362,6 +398,7 @@ impl IngestionTask {
"#; "#;
let now = chrono::Utc::now(); let now = chrono::Utc::now();
let worker_id = worker_id_for_bind(&self.worker_id);
let mut result = db let mut result = db
.client .client
.query(COMPLETE_QUERY) .query(COMPLETE_QUERY)
@@ -370,7 +407,7 @@ impl IngestionTask {
.bind(("succeeded", TaskState::Succeeded.as_str())) .bind(("succeeded", TaskState::Succeeded.as_str()))
.bind(("processing", TaskState::Processing.as_str())) .bind(("processing", TaskState::Processing.as_str()))
.bind(("now", SurrealDatetime::from(now))) .bind(("now", SurrealDatetime::from(now)))
.bind(("worker_id", self.worker_id.clone().unwrap_or_default())) .bind(("worker_id", worker_id))
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
@@ -413,6 +450,7 @@ impl IngestionTask {
) )
.unwrap_or(now); .unwrap_or(now);
let worker_id = worker_id_for_bind(&self.worker_id);
let mut result = db let mut result = db
.client .client
.query(FAIL_QUERY) .query(FAIL_QUERY)
@@ -424,7 +462,7 @@ impl IngestionTask {
.bind(("retry_at", SurrealDatetime::from(retry_at))) .bind(("retry_at", SurrealDatetime::from(retry_at)))
.bind(("error_code", error.code.clone())) .bind(("error_code", error.code.clone()))
.bind(("error_message", error.message.clone())) .bind(("error_message", error.message.clone()))
.bind(("worker_id", self.worker_id.clone().unwrap_or_default())) .bind(("worker_id", worker_id))
.await?; .await?;
let updated: Option<IngestionTask> = result.take(0)?; let updated: Option<IngestionTask> = result.take(0)?;
@@ -616,6 +654,44 @@ mod tests {
Ok(()) Ok(())
} }
/// Storing an empty batch succeeds and yields no tasks.
#[tokio::test]
async fn test_create_all_and_add_to_db_empty() -> anyhow::Result<()> {
    let db = memory_db().await?;

    let no_payloads = Vec::new();
    let stored_tasks =
        IngestionTask::create_all_and_add_to_db(no_payloads, "user123", &db).await?;

    assert!(stored_tasks.is_empty());
    Ok(())
}
/// Storing a batch of two payloads returns two distinct tasks, each of which
/// can be read back in the `Pending` state for the same user.
#[tokio::test]
async fn test_create_all_and_add_to_db_stores_multiple() -> anyhow::Result<()> {
    let db = memory_db().await?;
    let user_id = "user123";

    let first_payload = create_payload(user_id);
    let second_payload = IngestionPayload::Text {
        text: "second payload".to_string(),
        context: "ctx".to_string(),
        category: "cat".to_string(),
        user_id: user_id.to_string(),
    };

    let created = IngestionTask::create_all_and_add_to_db(
        vec![first_payload, second_payload],
        user_id,
        &db,
    )
    .await?;

    assert_eq!(created.len(), 2);
    assert_ne!(created[0].id, created[1].id);

    // Every returned task must be retrievable from the database by its id.
    for task in &created {
        let stored = db
            .get_item::<IngestionTask>(&task.id)
            .await?
            .with_context(|| format!("task {} should exist", task.id))?;

        assert_eq!(stored.id, task.id);
        assert_eq!(stored.state, TaskState::Pending);
        assert_eq!(stored.user_id, user_id);
    }

    Ok(())
}
#[tokio::test] #[tokio::test]
async fn test_create_and_store_task() -> anyhow::Result<()> { async fn test_create_and_store_task() -> anyhow::Result<()> {
let db = memory_db().await?; let db = memory_db().await?;
+2 -6
View File
@@ -146,12 +146,8 @@ pub async fn process_ingest_form(
user.id.clone(), user.id.clone(),
)?; )?;
let futures: Vec<_> = payloads let tasks =
.into_iter() IngestionTask::create_all_and_add_to_db(payloads, &user.id, &state.db).await?;
.map(|object| IngestionTask::create_and_add_to_db(object, user.id.clone(), &state.db))
.collect();
let tasks = try_join_all(futures).await?;
#[derive(Serialize)] #[derive(Serialize)]
struct NewTasksData { struct NewTasksData {