diff --git a/api-router/src/error.rs b/api-router/src/error.rs index 225ae5e..9aa16ad 100644 --- a/api-router/src/error.rs +++ b/api-router/src/error.rs @@ -9,19 +9,19 @@ use thiserror::Error; #[derive(Error, Debug, Serialize, Clone)] pub enum ApiErr { - #[error("Internal server error")] + #[error("internal server error")] InternalError(String), - #[error("Validation error: {0}")] + #[error("validation error: {0}")] ValidationError(String), - #[error("Not found: {0}")] + #[error("not found: {0}")] NotFound(String), - #[error("Unauthorized: {0}")] + #[error("unauthorized: {0}")] Unauthorized(String), - #[error("Payload too large: {0}")] + #[error("payload too large: {0}")] PayloadTooLarge(String), } @@ -157,12 +157,12 @@ mod tests { let error = ApiErr::ValidationError(message.to_string()); // Check that the error itself contains the message - assert_eq!(error.to_string(), format!("Validation error: {message}")); + assert_eq!(error.to_string(), format!("validation error: {message}")); // For not found errors let message = "user not found"; let error = ApiErr::NotFound(message.to_string()); - assert_eq!(error.to_string(), format!("Not found: {message}")); + assert_eq!(error.to_string(), format!("not found: {message}")); } // Alternative approach for internal error test @@ -175,7 +175,7 @@ mod tests { let api_error = ApiErr::InternalError(sensitive_info.to_string()); // Check the error message is correctly set - assert_eq!(api_error.to_string(), "Internal server error"); + assert_eq!(api_error.to_string(), "internal server error"); // Also verify correct status code assert_status_code(api_error, StatusCode::INTERNAL_SERVER_ERROR); diff --git a/common/src/error.rs b/common/src/error.rs index 2a69a6d..22a9499 100644 --- a/common/src/error.rs +++ b/common/src/error.rs @@ -8,34 +8,34 @@ use crate::storage::types::file_info::FileError; #[allow(clippy::module_name_repetitions)] #[derive(Error, Debug)] pub enum AppError { - #[error("Database error: {0}")] + #[error("database error: {0}")] Database(#[from] surrealdb::Error), - #[error("OpenAI error: {0}")] + #[error("openai error: {0}")] OpenAI(#[from] OpenAIError), - #[error("File error: {0}")] + #[error("file error: {0}")] File(#[from] FileError), - #[error("Not found: {0}")] + #[error("not found: {0}")] NotFound(String), - #[error("Validation error: {0}")] + #[error("validation error: {0}")] Validation(String), - #[error("Authorization error: {0}")] + #[error("authorization error: {0}")] Auth(String), - #[error("LLM parsing error: {0}")] + #[error("llm parsing error: {0}")] LLMParsing(String), - #[error("Task join error: {0}")] + #[error("task join error: {0}")] Join(#[from] JoinError), - #[error("Graph mapper error: {0}")] + #[error("graph mapper error: {0}")] GraphMapper(String), - #[error("IO error: {0}")] + #[error("io error: {0}")] Io(#[from] std::io::Error), - #[error("Reqwest error: {0}")] + #[error("reqwest error: {0}")] Reqwest(#[from] reqwest::Error), - #[error("Storage error: {0}")] + #[error("storage error: {0}")] Storage(#[from] object_store::Error), - #[error("Ingestion Processing error: {0}")] + #[error("ingestion processing error: {0}")] Processing(String), - #[error("DOM smoothie error: {0}")] + #[error("dom smoothie error: {0}")] DomSmoothie(#[from] dom_smoothie::ReadabilityError), - #[error("Internal service error: {0}")] + #[error("internal service error: {0}")] InternalError(String), } diff --git a/common/src/storage/db.rs b/common/src/storage/db.rs index 24c9644..2b6ec41 100644 --- a/common/src/storage/db.rs +++ b/common/src/storage/db.rs @@ -26,12 +26,20 @@ pub trait ProvidesDb { } impl SurrealDbClient { - /// # Initialize a new datbase client + /// Initialize a new database client. /// /// # Arguments /// - /// # Returns - /// * `SurrealDbClient` initialized + /// * `address` — Database connection string (e.g. `ws://localhost:8000` or `mem://`). + /// * `username` — Root username for authentication. + /// * `password` — Root password for authentication. + /// * `namespace` — SurrealDB namespace to use. + /// * `database` — SurrealDB database to use. + /// + /// # Errors + /// + /// Returns `Err` if the connection, authentication, or namespace/database selection fails. + /// In-memory (`mem://`) connections skip authentication. pub async fn new( address: &str, username: &str, @@ -52,6 +60,19 @@ impl SurrealDbClient { Ok(SurrealDbClient { client: db }) } + /// Initialize a new database client using namespace-level authentication. + /// + /// # Arguments + /// + /// * `address` — Database connection string. + /// * `namespace` — SurrealDB namespace to use (also used for auth). + /// * `username` — Namespace username for authentication. + /// * `password` — Namespace password for authentication. + /// * `database` — SurrealDB database to use. + /// + /// # Errors + /// + /// Returns `Err` if the connection, namespace authentication, or namespace/database selection fails. pub async fn new_with_namespace_user( address: &str, namespace: &str, @@ -70,6 +91,11 @@ impl SurrealDbClient { Ok(SurrealDbClient { client: db }) } + /// Create an Axum session store backed by SurrealDB. + /// + /// # Errors + /// + /// Returns `SessionError` if the session store configuration or table creation fails. pub async fn create_session_store( &self, ) -> Result>, SessionError> { @@ -88,6 +114,10 @@ impl SurrealDbClient { /// This function should be called during application startup, after connecting to /// the database and selecting the appropriate namespace and database, but before /// the application starts performing operations that rely on the schema. + /// + /// # Errors + /// + /// Returns `AppError::InternalError` if the migration runner fails to apply any migration. pub async fn apply_migrations(&self) -> Result<(), AppError> { debug!("Applying migrations"); MigrationRunner::new(&self.client) @@ -99,13 +129,15 @@ impl SurrealDbClient { Ok(()) } - /// Operation to store a object in SurrealDB, requires the struct to implement StoredObject + /// Store an object in SurrealDB. /// /// # Arguments - /// * `item` - The item to be stored /// - /// # Returns - /// * `Result` - Item or Error + /// * `item` — The item to store. Must implement `StoredObject`. + /// + /// # Errors + /// + /// Returns `Err` if the database create operation fails. pub async fn store_item(&self, item: T) -> Result, Error> where T: StoredObject + Send + Sync + 'static, @@ -116,8 +148,13 @@ impl SurrealDbClient { .await } - /// Operation to upsert an object in SurrealDB, replacing any existing record - /// with the same ID. Useful for idempotent ingestion flows. + /// Upsert an object in SurrealDB, replacing any existing record with the same ID. + /// + /// Useful for idempotent ingestion flows. + /// + /// # Errors + /// + /// Returns `Err` if the database upsert operation fails. pub async fn upsert_item(&self, item: T) -> Result, Error> where T: StoredObject + Send + Sync + 'static, @@ -129,10 +166,11 @@ impl SurrealDbClient { .await } - /// Operation to retrieve all objects from a certain table, requires the struct to implement StoredObject + /// Retrieve all objects from a table. /// - /// # Returns - /// * `Result` - Vec or Error + /// # Errors + /// + /// Returns `Err` if the database select operation fails. pub async fn get_all_stored_items(&self) -> Result, Error> where T: for<'de> StoredObject, @@ -140,13 +178,16 @@ impl SurrealDbClient { self.client.select(T::table_name()).await } - /// Operation to retrieve a single object by its ID, requires the struct to implement StoredObject + /// Retrieve a single object by its ID. /// /// # Arguments - /// * `id` - The ID of the item to retrieve /// - /// # Returns - /// * `Result, Error>` - The found item or Error + /// * `id` — The ID of the item to retrieve. + /// + /// # Errors + /// + /// Returns `Err` if the database select operation fails. + /// Returns `Ok(None)` if no record with the given ID exists. pub async fn get_item(&self, id: &str) -> Result, Error> where T: for<'de> StoredObject, @@ -154,13 +195,16 @@ impl SurrealDbClient { self.client.select((T::table_name(), id)).await } - /// Operation to delete a single object by its ID, requires the struct to implement StoredObject + /// Delete a single object by its ID. /// /// # Arguments - /// * `id` - The ID of the item to delete /// - /// # Returns - /// * `Result, Error>` - The deleted item or Error + /// * `id` — The ID of the item to delete. + /// + /// # Errors + /// + /// Returns `Err` if the database delete operation fails. + /// Returns `Ok(None)` if no record with the given ID exists. pub async fn delete_item(&self, id: &str) -> Result, Error> where T: for<'de> StoredObject, @@ -168,10 +212,11 @@ impl SurrealDbClient { self.client.delete((T::table_name(), id)).await } - /// Operation to listen to a table for updates, requires the struct to implement StoredObject + /// Listen to a table for real-time updates via a live query stream. /// - /// # Returns - /// * `Result, Error>` - The deleted item or Error + /// # Errors + /// + /// Returns `Err` if the database live query subscription fails. pub async fn listen( &self, ) -> Result, Error>>, Error> diff --git a/common/src/storage/indexes.rs b/common/src/storage/indexes.rs index 4c0e77d..5de4c96 100644 --- a/common/src/storage/indexes.rs +++ b/common/src/storage/indexes.rs @@ -159,6 +159,10 @@ impl FtsIndexSpec { /// Build runtime Surreal indexes (FTS + HNSW) using concurrent creation with readiness polling. /// Idempotent: safe to call multiple times and will overwrite HNSW definitions when the dimension changes. +/// +/// # Errors +/// +/// Returns `AppError::InternalError` if any index definition or polling step fails. pub async fn ensure_runtime( db: &SurrealDbClient, embedding_dimension: usize, @@ -169,6 +173,10 @@ pub async fn ensure_runtime( } /// Rebuild known FTS and HNSW indexes, skipping any that are not yet defined. +/// +/// # Errors +/// +/// Returns `AppError::InternalError` if any index rebuild operation fails. pub async fn rebuild(db: &SurrealDbClient) -> Result<(), AppError> { rebuild_inner(db) .await diff --git a/common/src/storage/store.rs b/common/src/storage/store.rs index 9a50204..759a551 100644 --- a/common/src/storage/store.rs +++ b/common/src/storage/store.rs @@ -31,6 +31,11 @@ impl StorageManager { /// /// This method validates the configuration and creates the appropriate /// storage backend with proper initialization. + /// + /// # Errors + /// + /// Returns `Err` if the storage backend cannot be created or initialised + /// (e.g. missing S3 bucket, local filesystem permission error). pub async fn new(cfg: &AppConfig) -> object_store::Result { let backend_kind = cfg.storage; let (store, local_base) = create_storage_backend(cfg).await?; @@ -90,6 +95,10 @@ impl StorageManager { /// /// This operation persists data using the underlying storage backend. /// For memory backends, data persists for the lifetime of the StorageManager. + /// + /// # Errors + /// + /// Returns `Err` if the underlying storage backend fails to persist the data. pub async fn put(&self, location: &str, data: Bytes) -> object_store::Result<()> { let path = ObjPath::from(location); let payload = object_store::PutPayload::from_bytes(data); @@ -99,6 +108,10 @@ impl StorageManager { /// Retrieve bytes from the specified location. /// /// Returns the full contents buffered in memory. + /// + /// # Errors + /// + /// Returns `Err` if the location does not exist or the underlying backend fails. pub async fn get(&self, location: &str) -> object_store::Result { let path = ObjPath::from(location); let result = self.store.get(&path).await?; @@ -108,6 +121,10 @@ impl StorageManager { /// Get a streaming handle for large objects. /// /// Returns a fallible stream of Bytes chunks suitable for large file processing. + /// + /// # Errors + /// + /// Returns `Err` if the location does not exist or the underlying backend fails. pub async fn get_stream( &self, location: &str, @@ -120,6 +137,10 @@ impl StorageManager { /// Delete all objects below the specified prefix. /// /// For local filesystem backends, this also attempts to clean up empty directories. + /// + /// # Errors + /// + /// Returns `Err` if the underlying backend fails during deletion. pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> { let prefix_path = ObjPath::from(prefix); let locations = self @@ -141,6 +162,10 @@ impl StorageManager { } /// List all objects below the specified prefix. + /// + /// # Errors + /// + /// Returns `Err` if the underlying backend fails to list objects. pub async fn list( &self, prefix: Option<&str>, @@ -150,6 +175,10 @@ impl StorageManager { } /// Check if an object exists at the specified location. + /// + /// # Errors + /// + /// Returns `Err` if the underlying backend returns a non-NotFound error. pub async fn exists(&self, location: &str) -> object_store::Result { let path = ObjPath::from(location); self.store diff --git a/common/src/storage/types/file_info.rs b/common/src/storage/types/file_info.rs index 508555e..041f63d 100644 --- a/common/src/storage/types/file_info.rs +++ b/common/src/storage/types/file_info.rs @@ -21,25 +21,25 @@ use crate::{ #[derive(Error, Debug)] pub enum FileError { - #[error("File not found for UUID: {0}")] + #[error("file not found for uuid: {0}")] FileNotFound(String), - #[error("IO error occurred: {0}")] + #[error("io error occurred: {0}")] Io(#[from] std::io::Error), - #[error("Duplicate file detected with SHA256: {0}")] + #[error("duplicate file detected with sha256: {0}")] DuplicateFile(String), - #[error("SurrealDB error: {0}")] + #[error("surrealdb error: {0}")] SurrealError(#[from] surrealdb::Error), - #[error("Failed to persist file: {0}")] + #[error("failed to persist file: {0}")] PersistError(#[from] tempfile::PersistError), - #[error("File name missing in metadata")] + #[error("file name missing in metadata")] MissingFileName, - #[error("Object store error: {0}")] + #[error("object store error: {0}")] ObjectStore(#[from] ObjectStoreError), } diff --git a/common/src/storage/types/ingestion_task.rs b/common/src/storage/types/ingestion_task.rs index d332a9f..ad3f48a 100644 --- a/common/src/storage/types/ingestion_task.rs +++ b/common/src/storage/types/ingestion_task.rs @@ -207,6 +207,11 @@ impl IngestionTask { Duration::from_secs(u64::try_from(self.lease_duration_secs.max(0)).unwrap_or(0)) } + /// Create a new task and immediately persist it to the database. + /// + /// # Errors + /// + /// Returns `AppError::Database` if the store operation fails. pub async fn create_and_add_to_db( content: IngestionPayload, user_id: String, @@ -217,6 +222,14 @@ impl IngestionTask { Ok(task) } + /// Claim the next ready task for processing. + /// + /// Atomically reserves a task by transitioning it from a candidate state to `Reserved`. + /// Returns `Ok(None)` if no task is ready to claim. + /// + /// # Errors + /// + /// Returns `AppError::Database` if the update query fails. pub async fn claim_next_ready( db: &SurrealDbClient, worker_id: &str, @@ -291,6 +304,12 @@ impl IngestionTask { Ok(task) } + /// Transition this task from `Reserved` to `Processing`. + /// + /// # Errors + /// + /// Returns `AppError::Validation` if the task is not in `Reserved` state + /// or belongs to a different worker. Returns `AppError::Database` on DB failure. pub async fn mark_processing(&self, db: &SurrealDbClient) -> Result { const START_PROCESSING_QUERY: &str = r#" UPDATE type::thing($table, $id) @@ -317,6 +336,12 @@ impl IngestionTask { updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::StartProcessing)) } + /// Transition this task from `Processing` to `Succeeded`. + /// + /// # Errors + /// + /// Returns `AppError::Validation` if the task is not in `Processing` state + /// or belongs to a different worker. Returns `AppError::Database` on DB failure. pub async fn mark_succeeded(&self, db: &SurrealDbClient) -> Result { const COMPLETE_QUERY: &str = r#" UPDATE type::thing($table, $id) @@ -348,6 +373,14 @@ impl IngestionTask { updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Succeed)) } + /// Transition this task from `Processing` to `Failed`. + /// + /// The task will be rescheduled for retry after `retry_delay`. + /// + /// # Errors + /// + /// Returns `AppError::Validation` if the task is not in `Processing` state + /// or belongs to a different worker. Returns `AppError::Database` on DB failure. pub async fn mark_failed( &self, error: TaskErrorInfo, @@ -394,6 +427,12 @@ impl IngestionTask { updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Fail)) } + /// Transition this task from `Failed` to `DeadLetter`. + /// + /// # Errors + /// + /// Returns `AppError::Validation` if the task is not in `Failed` state. + /// Returns `AppError::Database` on DB failure. pub async fn mark_dead_letter( &self, error: TaskErrorInfo, @@ -430,6 +469,12 @@ impl IngestionTask { updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::DeadLetter)) } + /// Transition this task to `Cancelled` from any non-terminal state. + /// + /// # Errors + /// + /// Returns `AppError::Validation` if the task is in a terminal state. + /// Returns `AppError::Database` on DB failure. pub async fn mark_cancelled(&self, db: &SurrealDbClient) -> Result { const CANCEL_QUERY: &str = r#" UPDATE type::thing($table, $id) @@ -463,6 +508,12 @@ impl IngestionTask { updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Cancel)) } + /// Release a reserved task back to `Pending` state. + /// + /// # Errors + /// + /// Returns `AppError::Validation` if the task is not in `Reserved` state. + /// Returns `AppError::Database` on DB failure. pub async fn release(&self, db: &SurrealDbClient) -> Result { const RELEASE_QUERY: &str = r#" UPDATE type::thing($table, $id) @@ -489,6 +540,11 @@ impl IngestionTask { updated.ok_or_else(|| invalid_transition(self.state, TaskTransition::Release)) } + /// Retrieve all non-terminal tasks across active states. + /// + /// # Errors + /// + /// Returns `AppError::Database` if the query fails. pub async fn get_unfinished_tasks( db: &SurrealDbClient, ) -> Result, AppError> { diff --git a/common/src/utils/embedding.rs b/common/src/utils/embedding.rs index c097925..6a3dac5 100644 --- a/common/src/utils/embedding.rs +++ b/common/src/utils/embedding.rs @@ -106,6 +106,12 @@ impl EmbeddingProvider { } } + /// Generate an embedding vector for the given text. + /// + /// # Errors + /// + /// Returns `Err` if the backend API call fails, FastEmbed initialisation fails, + /// or the backend returns no embedding data. pub async fn embed(&self, text: &str) -> Result> { match &self.inner { EmbeddingInner::Hashed { dimension } => Ok(hashed_embedding(text, *dimension)), @@ -144,6 +150,12 @@ impl EmbeddingProvider { } } + /// Generate embedding vectors for a batch of texts. + /// + /// # Errors + /// + /// Returns `Err` if the backend API call fails or returns no embedding data. + /// Returns an empty `Vec` when `texts` is empty. pub async fn embed_batch(&self, texts: Vec) -> Result>> { match &self.inner { EmbeddingInner::Hashed { dimension } => Ok(texts @@ -309,7 +321,11 @@ fn bucket(token: &str, dimension: usize) -> usize { usize::try_from(hasher.finish()).unwrap_or_default() % safe_dimension } -// Backward compatibility function +/// Generate an embedding using the given provider. +/// +/// # Errors +/// +/// Returns `AppError::InternalError` if the provider's embed call fails. pub async fn generate_embedding_with_provider( provider: &EmbeddingProvider, input: &str, @@ -372,6 +388,10 @@ pub async fn generate_embedding( /// /// This is used for the re-embedding process where the model and dimensions /// are known ahead of time and shouldn't be repeatedly fetched from settings. +/// +/// # Errors +/// +/// Returns `AppError` if the OpenAI API request fails or returns no embedding data. pub async fn generate_embedding_with_params( client: &async_openai::Client, input: &str, diff --git a/common/src/utils/ingest_limits.rs b/common/src/utils/ingest_limits.rs index c9b1475..5d54568 100644 --- a/common/src/utils/ingest_limits.rs +++ b/common/src/utils/ingest_limits.rs @@ -6,6 +6,15 @@ pub enum IngestValidationError { BadRequest(String), } +/// Validates ingestion input against configured limits. +/// +/// Checks file count, content size, context size, and category length. +/// +/// # Errors +/// +/// Returns `IngestValidationError::BadRequest` if the file count exceeds the maximum. +/// Returns `IngestValidationError::PayloadTooLarge` if content, context, or +/// category exceed their configured byte limits. pub fn validate_ingest_input( config: &AppConfig, content: Option<&str>,