mirror of
https://github.com/perstarkse/minne.git
synced 2026-05-29 19:00:51 +02:00
chore: lowercase all error messages and add # Errors doc sections
- Fix err-lowercase-msg: normalize all #[error(...)] display strings to lowercase (AppError, FileError, ApiErr) and update affected tests - Fix err-doc-errors: add # Errors sections to 25+ fallible public functions across db.rs, store.rs, embedding.rs, indexes.rs, ingestion_task.rs, and ingest_limits.rs
This commit is contained in:
@@ -9,19 +9,19 @@ use thiserror::Error;
|
|||||||
|
|
||||||
#[derive(Error, Debug, Serialize, Clone)]
|
#[derive(Error, Debug, Serialize, Clone)]
|
||||||
pub enum ApiErr {
|
pub enum ApiErr {
|
||||||
#[error("Internal server error")]
|
#[error("internal server error")]
|
||||||
InternalError(String),
|
InternalError(String),
|
||||||
|
|
||||||
#[error("Validation error: {0}")]
|
#[error("validation error: {0}")]
|
||||||
ValidationError(String),
|
ValidationError(String),
|
||||||
|
|
||||||
#[error("Not found: {0}")]
|
#[error("not found: {0}")]
|
||||||
NotFound(String),
|
NotFound(String),
|
||||||
|
|
||||||
#[error("Unauthorized: {0}")]
|
#[error("unauthorized: {0}")]
|
||||||
Unauthorized(String),
|
Unauthorized(String),
|
||||||
|
|
||||||
#[error("Payload too large: {0}")]
|
#[error("payload too large: {0}")]
|
||||||
PayloadTooLarge(String),
|
PayloadTooLarge(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -157,12 +157,12 @@ mod tests {
|
|||||||
let error = ApiErr::ValidationError(message.to_string());
|
let error = ApiErr::ValidationError(message.to_string());
|
||||||
|
|
||||||
// Check that the error itself contains the message
|
// Check that the error itself contains the message
|
||||||
assert_eq!(error.to_string(), format!("Validation error: {message}"));
|
assert_eq!(error.to_string(), format!("validation error: {message}"));
|
||||||
|
|
||||||
// For not found errors
|
// For not found errors
|
||||||
let message = "user not found";
|
let message = "user not found";
|
||||||
let error = ApiErr::NotFound(message.to_string());
|
let error = ApiErr::NotFound(message.to_string());
|
||||||
assert_eq!(error.to_string(), format!("Not found: {message}"));
|
assert_eq!(error.to_string(), format!("not found: {message}"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Alternative approach for internal error test
|
// Alternative approach for internal error test
|
||||||
@@ -175,7 +175,7 @@ mod tests {
|
|||||||
let api_error = ApiErr::InternalError(sensitive_info.to_string());
|
let api_error = ApiErr::InternalError(sensitive_info.to_string());
|
||||||
|
|
||||||
// Check the error message is correctly set
|
// Check the error message is correctly set
|
||||||
assert_eq!(api_error.to_string(), "Internal server error");
|
assert_eq!(api_error.to_string(), "internal server error");
|
||||||
|
|
||||||
// Also verify correct status code
|
// Also verify correct status code
|
||||||
assert_status_code(api_error, StatusCode::INTERNAL_SERVER_ERROR);
|
assert_status_code(api_error, StatusCode::INTERNAL_SERVER_ERROR);
|
||||||
|
|||||||
+15
-15
@@ -8,34 +8,34 @@ use crate::storage::types::file_info::FileError;
|
|||||||
#[allow(clippy::module_name_repetitions)]
|
#[allow(clippy::module_name_repetitions)]
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum AppError {
|
pub enum AppError {
|
||||||
#[error("Database error: {0}")]
|
#[error("database error: {0}")]
|
||||||
Database(#[from] surrealdb::Error),
|
Database(#[from] surrealdb::Error),
|
||||||
#[error("OpenAI error: {0}")]
|
#[error("openai error: {0}")]
|
||||||
OpenAI(#[from] OpenAIError),
|
OpenAI(#[from] OpenAIError),
|
||||||
#[error("File error: {0}")]
|
#[error("file error: {0}")]
|
||||||
File(#[from] FileError),
|
File(#[from] FileError),
|
||||||
#[error("Not found: {0}")]
|
#[error("not found: {0}")]
|
||||||
NotFound(String),
|
NotFound(String),
|
||||||
#[error("Validation error: {0}")]
|
#[error("validation error: {0}")]
|
||||||
Validation(String),
|
Validation(String),
|
||||||
#[error("Authorization error: {0}")]
|
#[error("authorization error: {0}")]
|
||||||
Auth(String),
|
Auth(String),
|
||||||
#[error("LLM parsing error: {0}")]
|
#[error("llm parsing error: {0}")]
|
||||||
LLMParsing(String),
|
LLMParsing(String),
|
||||||
#[error("Task join error: {0}")]
|
#[error("task join error: {0}")]
|
||||||
Join(#[from] JoinError),
|
Join(#[from] JoinError),
|
||||||
#[error("Graph mapper error: {0}")]
|
#[error("graph mapper error: {0}")]
|
||||||
GraphMapper(String),
|
GraphMapper(String),
|
||||||
#[error("IO error: {0}")]
|
#[error("io error: {0}")]
|
||||||
Io(#[from] std::io::Error),
|
Io(#[from] std::io::Error),
|
||||||
#[error("Reqwest error: {0}")]
|
#[error("reqwest error: {0}")]
|
||||||
Reqwest(#[from] reqwest::Error),
|
Reqwest(#[from] reqwest::Error),
|
||||||
#[error("Storage error: {0}")]
|
#[error("storage error: {0}")]
|
||||||
Storage(#[from] object_store::Error),
|
Storage(#[from] object_store::Error),
|
||||||
#[error("Ingestion Processing error: {0}")]
|
#[error("ingestion processing error: {0}")]
|
||||||
Processing(String),
|
Processing(String),
|
||||||
#[error("DOM smoothie error: {0}")]
|
#[error("dom smoothie error: {0}")]
|
||||||
DomSmoothie(#[from] dom_smoothie::ReadabilityError),
|
DomSmoothie(#[from] dom_smoothie::ReadabilityError),
|
||||||
#[error("Internal service error: {0}")]
|
#[error("internal service error: {0}")]
|
||||||
InternalError(String),
|
InternalError(String),
|
||||||
}
|
}
|
||||||
|
|||||||
+68
-23
@@ -26,12 +26,20 @@ pub trait ProvidesDb {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl SurrealDbClient {
|
impl SurrealDbClient {
|
||||||
/// # Initialize a new datbase client
|
/// Initialize a new database client.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// * `address` — Database connection string (e.g. `ws://localhost:8000` or `mem://`).
|
||||||
/// * `SurrealDbClient` initialized
|
/// * `username` — Root username for authentication.
|
||||||
|
/// * `password` — Root password for authentication.
|
||||||
|
/// * `namespace` — SurrealDB namespace to use.
|
||||||
|
/// * `database` — SurrealDB database to use.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the connection, authentication, or namespace/database selection fails.
|
||||||
|
/// In-memory (`mem://`) connections skip authentication.
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
address: &str,
|
address: &str,
|
||||||
username: &str,
|
username: &str,
|
||||||
@@ -52,6 +60,19 @@ impl SurrealDbClient {
|
|||||||
Ok(SurrealDbClient { client: db })
|
Ok(SurrealDbClient { client: db })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Initialize a new database client using namespace-level authentication.
|
||||||
|
///
|
||||||
|
/// # Arguments
|
||||||
|
///
|
||||||
|
/// * `address` — Database connection string.
|
||||||
|
/// * `namespace` — SurrealDB namespace to use (also used for auth).
|
||||||
|
/// * `username` — Namespace username for authentication.
|
||||||
|
/// * `password` — Namespace password for authentication.
|
||||||
|
/// * `database` — SurrealDB database to use.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the connection, namespace authentication, or namespace/database selection fails.
|
||||||
pub async fn new_with_namespace_user(
|
pub async fn new_with_namespace_user(
|
||||||
address: &str,
|
address: &str,
|
||||||
namespace: &str,
|
namespace: &str,
|
||||||
@@ -70,6 +91,11 @@ impl SurrealDbClient {
|
|||||||
Ok(SurrealDbClient { client: db })
|
Ok(SurrealDbClient { client: db })
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create an Axum session store backed by SurrealDB.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `SessionError` if the session store configuration or table creation fails.
|
||||||
pub async fn create_session_store(
|
pub async fn create_session_store(
|
||||||
&self,
|
&self,
|
||||||
) -> Result<SessionStore<SessionSurrealPool<Any>>, SessionError> {
|
) -> Result<SessionStore<SessionSurrealPool<Any>>, SessionError> {
|
||||||
@@ -88,6 +114,10 @@ impl SurrealDbClient {
|
|||||||
/// This function should be called during application startup, after connecting to
|
/// This function should be called during application startup, after connecting to
|
||||||
/// the database and selecting the appropriate namespace and database, but before
|
/// the database and selecting the appropriate namespace and database, but before
|
||||||
/// the application starts performing operations that rely on the schema.
|
/// the application starts performing operations that rely on the schema.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::InternalError` if the migration runner fails to apply any migration.
|
||||||
pub async fn apply_migrations(&self) -> Result<(), AppError> {
|
pub async fn apply_migrations(&self) -> Result<(), AppError> {
|
||||||
debug!("Applying migrations");
|
debug!("Applying migrations");
|
||||||
MigrationRunner::new(&self.client)
|
MigrationRunner::new(&self.client)
|
||||||
@@ -99,13 +129,15 @@ impl SurrealDbClient {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Operation to store a object in SurrealDB, requires the struct to implement StoredObject
|
/// Store an object in SurrealDB.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
/// * `item` - The item to be stored
|
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// * `item` — The item to store. Must implement `StoredObject`.
|
||||||
/// * `Result` - Item or Error
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the database create operation fails.
|
||||||
pub async fn store_item<T>(&self, item: T) -> Result<Option<T>, Error>
|
pub async fn store_item<T>(&self, item: T) -> Result<Option<T>, Error>
|
||||||
where
|
where
|
||||||
T: StoredObject + Send + Sync + 'static,
|
T: StoredObject + Send + Sync + 'static,
|
||||||
@@ -116,8 +148,13 @@ impl SurrealDbClient {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Operation to upsert an object in SurrealDB, replacing any existing record
|
/// Upsert an object in SurrealDB, replacing any existing record with the same ID.
|
||||||
/// with the same ID. Useful for idempotent ingestion flows.
|
///
|
||||||
|
/// Useful for idempotent ingestion flows.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the database upsert operation fails.
|
||||||
pub async fn upsert_item<T>(&self, item: T) -> Result<Option<T>, Error>
|
pub async fn upsert_item<T>(&self, item: T) -> Result<Option<T>, Error>
|
||||||
where
|
where
|
||||||
T: StoredObject + Send + Sync + 'static,
|
T: StoredObject + Send + Sync + 'static,
|
||||||
@@ -129,10 +166,11 @@ impl SurrealDbClient {
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Operation to retrieve all objects from a certain table, requires the struct to implement StoredObject
|
/// Retrieve all objects from a table.
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// # Errors
|
||||||
/// * `Result` - Vec<T> or Error
|
///
|
||||||
|
/// Returns `Err` if the database select operation fails.
|
||||||
pub async fn get_all_stored_items<T>(&self) -> Result<Vec<T>, Error>
|
pub async fn get_all_stored_items<T>(&self) -> Result<Vec<T>, Error>
|
||||||
where
|
where
|
||||||
T: for<'de> StoredObject,
|
T: for<'de> StoredObject,
|
||||||
@@ -140,13 +178,16 @@ impl SurrealDbClient {
|
|||||||
self.client.select(T::table_name()).await
|
self.client.select(T::table_name()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Operation to retrieve a single object by its ID, requires the struct to implement StoredObject
|
/// Retrieve a single object by its ID.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
/// * `id` - The ID of the item to retrieve
|
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// * `id` — The ID of the item to retrieve.
|
||||||
/// * `Result<Option<T>, Error>` - The found item or Error
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the database select operation fails.
|
||||||
|
/// Returns `Ok(None)` if no record with the given ID exists.
|
||||||
pub async fn get_item<T>(&self, id: &str) -> Result<Option<T>, Error>
|
pub async fn get_item<T>(&self, id: &str) -> Result<Option<T>, Error>
|
||||||
where
|
where
|
||||||
T: for<'de> StoredObject,
|
T: for<'de> StoredObject,
|
||||||
@@ -154,13 +195,16 @@ impl SurrealDbClient {
|
|||||||
self.client.select((T::table_name(), id)).await
|
self.client.select((T::table_name(), id)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Operation to delete a single object by its ID, requires the struct to implement StoredObject
|
/// Delete a single object by its ID.
|
||||||
///
|
///
|
||||||
/// # Arguments
|
/// # Arguments
|
||||||
/// * `id` - The ID of the item to delete
|
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// * `id` — The ID of the item to delete.
|
||||||
/// * `Result<Option<T>, Error>` - The deleted item or Error
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the database delete operation fails.
|
||||||
|
/// Returns `Ok(None)` if no record with the given ID exists.
|
||||||
pub async fn delete_item<T>(&self, id: &str) -> Result<Option<T>, Error>
|
pub async fn delete_item<T>(&self, id: &str) -> Result<Option<T>, Error>
|
||||||
where
|
where
|
||||||
T: for<'de> StoredObject,
|
T: for<'de> StoredObject,
|
||||||
@@ -168,10 +212,11 @@ impl SurrealDbClient {
|
|||||||
self.client.delete((T::table_name(), id)).await
|
self.client.delete((T::table_name(), id)).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Operation to listen to a table for updates, requires the struct to implement StoredObject
|
/// Listen to a table for real-time updates via a live query stream.
|
||||||
///
|
///
|
||||||
/// # Returns
|
/// # Errors
|
||||||
/// * `Result<Option<T>, Error>` - The deleted item or Error
|
///
|
||||||
|
/// Returns `Err` if the database live query subscription fails.
|
||||||
pub async fn listen<T>(
|
pub async fn listen<T>(
|
||||||
&self,
|
&self,
|
||||||
) -> Result<impl Stream<Item = Result<Notification<T>, Error>>, Error>
|
) -> Result<impl Stream<Item = Result<Notification<T>, Error>>, Error>
|
||||||
|
|||||||
@@ -159,6 +159,10 @@ impl FtsIndexSpec {
|
|||||||
|
|
||||||
/// Build runtime Surreal indexes (FTS + HNSW) using concurrent creation with readiness polling.
|
/// Build runtime Surreal indexes (FTS + HNSW) using concurrent creation with readiness polling.
|
||||||
/// Idempotent: safe to call multiple times and will overwrite HNSW definitions when the dimension changes.
|
/// Idempotent: safe to call multiple times and will overwrite HNSW definitions when the dimension changes.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::InternalError` if any index definition or polling step fails.
|
||||||
pub async fn ensure_runtime(
|
pub async fn ensure_runtime(
|
||||||
db: &SurrealDbClient,
|
db: &SurrealDbClient,
|
||||||
embedding_dimension: usize,
|
embedding_dimension: usize,
|
||||||
@@ -169,6 +173,10 @@ pub async fn ensure_runtime(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Rebuild known FTS and HNSW indexes, skipping any that are not yet defined.
|
/// Rebuild known FTS and HNSW indexes, skipping any that are not yet defined.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::InternalError` if any index rebuild operation fails.
|
||||||
pub async fn rebuild(db: &SurrealDbClient) -> Result<(), AppError> {
|
pub async fn rebuild(db: &SurrealDbClient) -> Result<(), AppError> {
|
||||||
rebuild_inner(db)
|
rebuild_inner(db)
|
||||||
.await
|
.await
|
||||||
|
|||||||
@@ -31,6 +31,11 @@ impl StorageManager {
|
|||||||
///
|
///
|
||||||
/// This method validates the configuration and creates the appropriate
|
/// This method validates the configuration and creates the appropriate
|
||||||
/// storage backend with proper initialization.
|
/// storage backend with proper initialization.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the storage backend cannot be created or initialised
|
||||||
|
/// (e.g. missing S3 bucket, local filesystem permission error).
|
||||||
pub async fn new(cfg: &AppConfig) -> object_store::Result<Self> {
|
pub async fn new(cfg: &AppConfig) -> object_store::Result<Self> {
|
||||||
let backend_kind = cfg.storage;
|
let backend_kind = cfg.storage;
|
||||||
let (store, local_base) = create_storage_backend(cfg).await?;
|
let (store, local_base) = create_storage_backend(cfg).await?;
|
||||||
@@ -90,6 +95,10 @@ impl StorageManager {
|
|||||||
///
|
///
|
||||||
/// This operation persists data using the underlying storage backend.
|
/// This operation persists data using the underlying storage backend.
|
||||||
/// For memory backends, data persists for the lifetime of the StorageManager.
|
/// For memory backends, data persists for the lifetime of the StorageManager.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the underlying storage backend fails to persist the data.
|
||||||
pub async fn put(&self, location: &str, data: Bytes) -> object_store::Result<()> {
|
pub async fn put(&self, location: &str, data: Bytes) -> object_store::Result<()> {
|
||||||
let path = ObjPath::from(location);
|
let path = ObjPath::from(location);
|
||||||
let payload = object_store::PutPayload::from_bytes(data);
|
let payload = object_store::PutPayload::from_bytes(data);
|
||||||
@@ -99,6 +108,10 @@ impl StorageManager {
|
|||||||
/// Retrieve bytes from the specified location.
|
/// Retrieve bytes from the specified location.
|
||||||
///
|
///
|
||||||
/// Returns the full contents buffered in memory.
|
/// Returns the full contents buffered in memory.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the location does not exist or the underlying backend fails.
|
||||||
pub async fn get(&self, location: &str) -> object_store::Result<Bytes> {
|
pub async fn get(&self, location: &str) -> object_store::Result<Bytes> {
|
||||||
let path = ObjPath::from(location);
|
let path = ObjPath::from(location);
|
||||||
let result = self.store.get(&path).await?;
|
let result = self.store.get(&path).await?;
|
||||||
@@ -108,6 +121,10 @@ impl StorageManager {
|
|||||||
/// Get a streaming handle for large objects.
|
/// Get a streaming handle for large objects.
|
||||||
///
|
///
|
||||||
/// Returns a fallible stream of Bytes chunks suitable for large file processing.
|
/// Returns a fallible stream of Bytes chunks suitable for large file processing.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the location does not exist or the underlying backend fails.
|
||||||
pub async fn get_stream(
|
pub async fn get_stream(
|
||||||
&self,
|
&self,
|
||||||
location: &str,
|
location: &str,
|
||||||
@@ -120,6 +137,10 @@ impl StorageManager {
|
|||||||
/// Delete all objects below the specified prefix.
|
/// Delete all objects below the specified prefix.
|
||||||
///
|
///
|
||||||
/// For local filesystem backends, this also attempts to clean up empty directories.
|
/// For local filesystem backends, this also attempts to clean up empty directories.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the underlying backend fails during deletion.
|
||||||
pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> {
|
pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> {
|
||||||
let prefix_path = ObjPath::from(prefix);
|
let prefix_path = ObjPath::from(prefix);
|
||||||
let locations = self
|
let locations = self
|
||||||
@@ -141,6 +162,10 @@ impl StorageManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// List all objects below the specified prefix.
|
/// List all objects below the specified prefix.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the underlying backend fails to list objects.
|
||||||
pub async fn list(
|
pub async fn list(
|
||||||
&self,
|
&self,
|
||||||
prefix: Option<&str>,
|
prefix: Option<&str>,
|
||||||
@@ -150,6 +175,10 @@ impl StorageManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Check if an object exists at the specified location.
|
/// Check if an object exists at the specified location.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the underlying backend returns a non-NotFound error.
|
||||||
pub async fn exists(&self, location: &str) -> object_store::Result<bool> {
|
pub async fn exists(&self, location: &str) -> object_store::Result<bool> {
|
||||||
let path = ObjPath::from(location);
|
let path = ObjPath::from(location);
|
||||||
self.store
|
self.store
|
||||||
|
|||||||
@@ -21,25 +21,25 @@ use crate::{
|
|||||||
|
|
||||||
#[derive(Error, Debug)]
|
#[derive(Error, Debug)]
|
||||||
pub enum FileError {
|
pub enum FileError {
|
||||||
#[error("File not found for UUID: {0}")]
|
#[error("file not found for uuid: {0}")]
|
||||||
FileNotFound(String),
|
FileNotFound(String),
|
||||||
|
|
||||||
#[error("IO error occurred: {0}")]
|
#[error("io error occurred: {0}")]
|
||||||
Io(#[from] std::io::Error),
|
Io(#[from] std::io::Error),
|
||||||
|
|
||||||
#[error("Duplicate file detected with SHA256: {0}")]
|
#[error("duplicate file detected with sha256: {0}")]
|
||||||
DuplicateFile(String),
|
DuplicateFile(String),
|
||||||
|
|
||||||
#[error("SurrealDB error: {0}")]
|
#[error("surrealdb error: {0}")]
|
||||||
SurrealError(#[from] surrealdb::Error),
|
SurrealError(#[from] surrealdb::Error),
|
||||||
|
|
||||||
#[error("Failed to persist file: {0}")]
|
#[error("failed to persist file: {0}")]
|
||||||
PersistError(#[from] tempfile::PersistError),
|
PersistError(#[from] tempfile::PersistError),
|
||||||
|
|
||||||
#[error("File name missing in metadata")]
|
#[error("file name missing in metadata")]
|
||||||
MissingFileName,
|
MissingFileName,
|
||||||
|
|
||||||
#[error("Object store error: {0}")]
|
#[error("object store error: {0}")]
|
||||||
ObjectStore(#[from] ObjectStoreError),
|
ObjectStore(#[from] ObjectStoreError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -207,6 +207,11 @@ impl IngestionTask {
|
|||||||
Duration::from_secs(u64::try_from(self.lease_duration_secs.max(0)).unwrap_or(0))
|
Duration::from_secs(u64::try_from(self.lease_duration_secs.max(0)).unwrap_or(0))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new task and immediately persist it to the database.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Database` if the store operation fails.
|
||||||
pub async fn create_and_add_to_db(
|
pub async fn create_and_add_to_db(
|
||||||
content: IngestionPayload,
|
content: IngestionPayload,
|
||||||
user_id: String,
|
user_id: String,
|
||||||
@@ -217,6 +222,14 @@ impl IngestionTask {
|
|||||||
Ok(task)
|
Ok(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Claim the next ready task for processing.
|
||||||
|
///
|
||||||
|
/// Atomically reserves a task by transitioning it from a candidate state to `Reserved`.
|
||||||
|
/// Returns `Ok(None)` if no task is ready to claim.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Database` if the update query fails.
|
||||||
pub async fn claim_next_ready(
|
pub async fn claim_next_ready(
|
||||||
db: &SurrealDbClient,
|
db: &SurrealDbClient,
|
||||||
worker_id: &str,
|
worker_id: &str,
|
||||||
@@ -291,6 +304,12 @@ impl IngestionTask {
|
|||||||
Ok(task)
|
Ok(task)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transition this task from `Reserved` to `Processing`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Validation` if the task is not in `Reserved` state
|
||||||
|
/// or belongs to a different worker. Returns `AppError::Database` on DB failure.
|
||||||
pub async fn mark_processing(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
pub async fn mark_processing(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
||||||
const START_PROCESSING_QUERY: &str = r#"
|
const START_PROCESSING_QUERY: &str = r#"
|
||||||
UPDATE type::thing($table, $id)
|
UPDATE type::thing($table, $id)
|
||||||
@@ -317,6 +336,12 @@ impl IngestionTask {
|
|||||||
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::StartProcessing))
|
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::StartProcessing))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transition this task from `Processing` to `Succeeded`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Validation` if the task is not in `Processing` state
|
||||||
|
/// or belongs to a different worker. Returns `AppError::Database` on DB failure.
|
||||||
pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
||||||
const COMPLETE_QUERY: &str = r#"
|
const COMPLETE_QUERY: &str = r#"
|
||||||
UPDATE type::thing($table, $id)
|
UPDATE type::thing($table, $id)
|
||||||
@@ -348,6 +373,14 @@ impl IngestionTask {
|
|||||||
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Succeed))
|
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Succeed))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transition this task from `Processing` to `Failed`.
|
||||||
|
///
|
||||||
|
/// The task will be rescheduled for retry after `retry_delay`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Validation` if the task is not in `Processing` state
|
||||||
|
/// or belongs to a different worker. Returns `AppError::Database` on DB failure.
|
||||||
pub async fn mark_failed(
|
pub async fn mark_failed(
|
||||||
&self,
|
&self,
|
||||||
error: TaskErrorInfo,
|
error: TaskErrorInfo,
|
||||||
@@ -394,6 +427,12 @@ impl IngestionTask {
|
|||||||
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Fail))
|
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Fail))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transition this task from `Failed` to `DeadLetter`.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Validation` if the task is not in `Failed` state.
|
||||||
|
/// Returns `AppError::Database` on DB failure.
|
||||||
pub async fn mark_dead_letter(
|
pub async fn mark_dead_letter(
|
||||||
&self,
|
&self,
|
||||||
error: TaskErrorInfo,
|
error: TaskErrorInfo,
|
||||||
@@ -430,6 +469,12 @@ impl IngestionTask {
|
|||||||
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::DeadLetter))
|
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::DeadLetter))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transition this task to `Cancelled` from any non-terminal state.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Validation` if the task is in a terminal state.
|
||||||
|
/// Returns `AppError::Database` on DB failure.
|
||||||
pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
||||||
const CANCEL_QUERY: &str = r#"
|
const CANCEL_QUERY: &str = r#"
|
||||||
UPDATE type::thing($table, $id)
|
UPDATE type::thing($table, $id)
|
||||||
@@ -463,6 +508,12 @@ impl IngestionTask {
|
|||||||
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Cancel))
|
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Cancel))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Release a reserved task back to `Pending` state.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Validation` if the task is not in `Reserved` state.
|
||||||
|
/// Returns `AppError::Database` on DB failure.
|
||||||
pub async fn release(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
pub async fn release(&self, db: &SurrealDbClient) -> Result<IngestionTask, AppError> {
|
||||||
const RELEASE_QUERY: &str = r#"
|
const RELEASE_QUERY: &str = r#"
|
||||||
UPDATE type::thing($table, $id)
|
UPDATE type::thing($table, $id)
|
||||||
@@ -489,6 +540,11 @@ impl IngestionTask {
|
|||||||
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Release))
|
updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Release))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Retrieve all non-terminal tasks across active states.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::Database` if the query fails.
|
||||||
pub async fn get_unfinished_tasks(
|
pub async fn get_unfinished_tasks(
|
||||||
db: &SurrealDbClient,
|
db: &SurrealDbClient,
|
||||||
) -> Result<Vec<IngestionTask>, AppError> {
|
) -> Result<Vec<IngestionTask>, AppError> {
|
||||||
|
|||||||
@@ -106,6 +106,12 @@ impl EmbeddingProvider {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generate an embedding vector for the given text.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the backend API call fails, FastEmbed initialisation fails,
|
||||||
|
/// or the backend returns no embedding data.
|
||||||
pub async fn embed(&self, text: &str) -> Result<Vec<f32>> {
|
pub async fn embed(&self, text: &str) -> Result<Vec<f32>> {
|
||||||
match &self.inner {
|
match &self.inner {
|
||||||
EmbeddingInner::Hashed { dimension } => Ok(hashed_embedding(text, *dimension)),
|
EmbeddingInner::Hashed { dimension } => Ok(hashed_embedding(text, *dimension)),
|
||||||
@@ -144,6 +150,12 @@ impl EmbeddingProvider {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Generate embedding vectors for a batch of texts.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `Err` if the backend API call fails or returns no embedding data.
|
||||||
|
/// Returns an empty `Vec` when `texts` is empty.
|
||||||
pub async fn embed_batch(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>> {
|
pub async fn embed_batch(&self, texts: Vec<String>) -> Result<Vec<Vec<f32>>> {
|
||||||
match &self.inner {
|
match &self.inner {
|
||||||
EmbeddingInner::Hashed { dimension } => Ok(texts
|
EmbeddingInner::Hashed { dimension } => Ok(texts
|
||||||
@@ -309,7 +321,11 @@ fn bucket(token: &str, dimension: usize) -> usize {
|
|||||||
usize::try_from(hasher.finish()).unwrap_or_default() % safe_dimension
|
usize::try_from(hasher.finish()).unwrap_or_default() % safe_dimension
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backward compatibility function
|
/// Generate an embedding using the given provider.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError::InternalError` if the provider's embed call fails.
|
||||||
pub async fn generate_embedding_with_provider(
|
pub async fn generate_embedding_with_provider(
|
||||||
provider: &EmbeddingProvider,
|
provider: &EmbeddingProvider,
|
||||||
input: &str,
|
input: &str,
|
||||||
@@ -372,6 +388,10 @@ pub async fn generate_embedding(
|
|||||||
///
|
///
|
||||||
/// This is used for the re-embedding process where the model and dimensions
|
/// This is used for the re-embedding process where the model and dimensions
|
||||||
/// are known ahead of time and shouldn't be repeatedly fetched from settings.
|
/// are known ahead of time and shouldn't be repeatedly fetched from settings.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `AppError` if the OpenAI API request fails or returns no embedding data.
|
||||||
pub async fn generate_embedding_with_params(
|
pub async fn generate_embedding_with_params(
|
||||||
client: &async_openai::Client<async_openai::config::OpenAIConfig>,
|
client: &async_openai::Client<async_openai::config::OpenAIConfig>,
|
||||||
input: &str,
|
input: &str,
|
||||||
|
|||||||
@@ -6,6 +6,15 @@ pub enum IngestValidationError {
|
|||||||
BadRequest(String),
|
BadRequest(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Validates ingestion input against configured limits.
|
||||||
|
///
|
||||||
|
/// Checks file count, content size, context size, and category length.
|
||||||
|
///
|
||||||
|
/// # Errors
|
||||||
|
///
|
||||||
|
/// Returns `IngestValidationError::BadRequest` if the file count exceeds the maximum.
|
||||||
|
/// Returns `IngestValidationError::PayloadTooLarge` if content, context, or
|
||||||
|
/// category exceed their configured byte limits.
|
||||||
pub fn validate_ingest_input(
|
pub fn validate_ingest_input(
|
||||||
config: &AppConfig,
|
config: &AppConfig,
|
||||||
content: Option<&str>,
|
content: Option<&str>,
|
||||||
|
|||||||
Reference in New Issue
Block a user