diff --git a/api-router/src/api_state.rs b/api-router/src/api_state.rs index 4f271d7..5c1735d 100644 --- a/api-router/src/api_state.rs +++ b/api-router/src/api_state.rs @@ -1,15 +1,22 @@ use std::sync::Arc; -use common::{storage::db::SurrealDbClient, utils::config::AppConfig}; +use common::{ + storage::{db::SurrealDbClient, store::StorageManager}, + utils::config::AppConfig, +}; #[derive(Clone)] pub struct ApiState { pub db: Arc, pub config: AppConfig, + pub storage: StorageManager, } impl ApiState { - pub async fn new(config: &AppConfig) -> Result> { + pub async fn new( + config: &AppConfig, + storage: StorageManager, + ) -> Result> { let surreal_db_client = Arc::new( SurrealDbClient::new( &config.surrealdb_address, @@ -26,6 +33,7 @@ impl ApiState { let app_state = Self { db: surreal_db_client.clone(), config: config.clone(), + storage, }; Ok(app_state) diff --git a/api-router/src/routes/ingress.rs b/api-router/src/routes/ingress.rs index 4629da3..7d1548e 100644 --- a/api-router/src/routes/ingress.rs +++ b/api-router/src/routes/ingress.rs @@ -32,7 +32,8 @@ pub async fn ingest_data( info!("Received input: {:?}", input); let file_infos = try_join_all(input.files.into_iter().map(|file| { - FileInfo::new(file, &state.db, &user.id, &state.config).map_err(AppError::from) + FileInfo::new_with_storage(file, &state.db, &user.id, &state.storage) + .map_err(AppError::from) })) .await?; diff --git a/common/src/storage/store.rs b/common/src/storage/store.rs index 8be0417..9038ecb 100644 --- a/common/src/storage/store.rs +++ b/common/src/storage/store.rs @@ -1,4 +1,5 @@ -use std::path::{Path, PathBuf}; +use std::io::ErrorKind; +use std::path::{Component, Path, PathBuf}; use std::sync::Arc; use anyhow::{anyhow, Result as AnyResult}; @@ -13,44 +14,417 @@ use crate::utils::config::{AppConfig, StorageKind}; pub type DynStore = Arc; -/// Build an object store instance anchored at the given filesystem `prefix`. 
+/// Storage manager with persistent state and proper lifecycle management. +#[derive(Clone)] +pub struct StorageManager { + store: DynStore, + backend_kind: StorageKind, + local_base: Option, +} + +impl StorageManager { + /// Create a new StorageManager with the specified configuration. + /// + /// This method validates the configuration and creates the appropriate + /// storage backend with proper initialization. + pub async fn new(cfg: &AppConfig) -> object_store::Result { + let backend_kind = cfg.storage.clone(); + let (store, local_base) = create_storage_backend(cfg).await?; + + Ok(Self { + store, + backend_kind, + local_base, + }) + } + + /// Create a StorageManager with a custom storage backend. + /// + /// This method is useful for testing scenarios where you want to inject + /// a specific storage backend. + pub fn with_backend(store: DynStore, backend_kind: StorageKind) -> Self { + Self { + store, + backend_kind, + local_base: None, + } + } + + /// Get the storage backend kind. + pub fn backend_kind(&self) -> &StorageKind { + &self.backend_kind + } + + /// Access the resolved local base directory when using the local backend. + pub fn local_base_path(&self) -> Option<&Path> { + self.local_base.as_deref() + } + + /// Resolve an object location to a filesystem path when using the local backend. + /// + /// Returns `None` when the backend is not local or when the provided location includes + /// unsupported components (absolute paths or parent traversals). + pub fn resolve_local_path(&self, location: &str) -> Option { + let base = self.local_base_path()?; + let relative = Path::new(location); + if relative.is_absolute() + || relative + .components() + .any(|component| matches!(component, Component::ParentDir | Component::Prefix(_))) + { + return None; + } + + Some(base.join(relative)) + } + + /// Store bytes at the specified location. + /// + /// This operation persists data using the underlying storage backend. 
+ /// For memory backends, data persists for the lifetime of the StorageManager. + pub async fn put(&self, location: &str, data: Bytes) -> object_store::Result<()> { + let path = ObjPath::from(location); + let payload = object_store::PutPayload::from_bytes(data); + self.store.put(&path, payload).await.map(|_| ()) + } + + /// Retrieve bytes from the specified location. + /// + /// Returns the full contents buffered in memory. + pub async fn get(&self, location: &str) -> object_store::Result { + let path = ObjPath::from(location); + let result = self.store.get(&path).await?; + result.bytes().await + } + + /// Get a streaming handle for large objects. + /// + /// Returns a fallible stream of Bytes chunks suitable for large file processing. + pub async fn get_stream( + &self, + location: &str, + ) -> object_store::Result>> { + let path = ObjPath::from(location); + let result = self.store.get(&path).await?; + Ok(result.into_stream()) + } + + /// Delete all objects below the specified prefix. + /// + /// For local filesystem backends, this also attempts to clean up empty directories. + pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> { + let prefix_path = ObjPath::from(prefix); + let locations = self + .store + .list(Some(&prefix_path)) + .map_ok(|m| m.location) + .boxed(); + self.store + .delete_stream(locations) + .try_collect::>() + .await?; + + // Cleanup filesystem directories only for local backend + if matches!(self.backend_kind, StorageKind::Local) { + self.cleanup_filesystem_directories(prefix).await?; + } + + Ok(()) + } + + /// List all objects below the specified prefix. + pub async fn list( + &self, + prefix: Option<&str>, + ) -> object_store::Result> { + let prefix_path = prefix.map(ObjPath::from); + self.store.list(prefix_path.as_ref()).try_collect().await + } + + /// Check if an object exists at the specified location. 
+    pub async fn exists(&self, location: &str) -> object_store::Result<bool> {
+        let path = ObjPath::from(location);
+        self.store
+            .head(&path)
+            .await
+            .map(|_| true)
+            .or_else(|e| match e {
+                object_store::Error::NotFound { .. } => Ok(false),
+                _ => Err(e),
+            })
+    }
+
+    /// Cleanup filesystem directories for local backend.
+    ///
+    /// This is a best-effort cleanup and ignores errors.
+    async fn cleanup_filesystem_directories(&self, prefix: &str) -> object_store::Result<()> {
+        if !matches!(self.backend_kind, StorageKind::Local) {
+            return Ok(());
+        }
+
+        let Some(base) = &self.local_base else {
+            return Ok(());
+        };
+
+        let relative = Path::new(prefix);
+        if relative.is_absolute()
+            || relative
+                .components()
+                .any(|component| matches!(component, Component::ParentDir | Component::Prefix(_)))
+        {
+            tracing::warn!(
+                prefix = %prefix,
+                "Skipping directory cleanup for unsupported prefix components"
+            );
+            return Ok(());
+        }
+
+        let mut current = base.join(relative);
+
+        while current.starts_with(base) && current.as_path() != base.as_path() {
+            match tokio::fs::remove_dir(&current).await {
+                Ok(_) => {}
+                Err(err) => match err.kind() {
+                    ErrorKind::NotFound => {}
+                    ErrorKind::DirectoryNotEmpty => break,
+                    _ => tracing::debug!(
+                        error = %err,
+                        path = %current.display(),
+                        "Failed to remove directory during cleanup"
+                    ),
+                },
+            }
+
+            if let Some(parent) = current.parent() {
+                current = parent.to_path_buf();
+            } else {
+                break;
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Create a storage backend based on configuration.
 ///
-/// - For the `Local` backend, `prefix` is the absolute directory on disk that
-/// serves as the root for all object paths passed to the store.
-/// - For the `Memory` backend, `prefix` is ignored as storage is purely in-memory.
-/// - `prefix` must already exist for Local storage; this function will create it if missing.
-/// -/// Example (Local): -/// - prefix: `/var/data` -/// - object location: `user/uuid/file.txt` -/// - absolute path: `/var/data/user/uuid/file.txt` -/// -/// Example (Memory): -/// - prefix: ignored (any value works) -/// - object location: `user/uuid/file.txt` -/// - stored in memory for the duration of the process -pub async fn build_store(prefix: &Path, cfg: &AppConfig) -> object_store::Result { +/// This factory function handles the creation and initialization of different +/// storage backends with proper error handling and validation. +async fn create_storage_backend( + cfg: &AppConfig, +) -> object_store::Result<(DynStore, Option)> { match cfg.storage { StorageKind::Local => { - if !prefix.exists() { - tokio::fs::create_dir_all(prefix).await.map_err(|e| { + let base = resolve_base_dir(cfg); + if !base.exists() { + tokio::fs::create_dir_all(&base).await.map_err(|e| { object_store::Error::Generic { store: "LocalFileSystem", source: e.into(), } })?; } - let store = LocalFileSystem::new_with_prefix(prefix)?; - Ok(Arc::new(store)) + let store = LocalFileSystem::new_with_prefix(base.clone())?; + Ok((Arc::new(store), Some(base))) } StorageKind::Memory => { - // For memory storage, we ignore the prefix as it's purely in-memory let store = InMemory::new(); - Ok(Arc::new(store)) + Ok((Arc::new(store), None)) } } } +/// Testing utilities for storage operations. +/// +/// This module provides specialized utilities for testing scenarios with +/// automatic memory backend setup and proper test isolation. +#[cfg(test)] +pub mod testing { + use super::*; + use crate::utils::config::{AppConfig, PdfIngestMode}; + use uuid; + + /// Create a test configuration with memory storage. + /// + /// This provides a ready-to-use configuration for testing scenarios + /// that don't require filesystem persistence. 
+ pub fn test_config_memory() -> AppConfig { + AppConfig { + openai_api_key: "test".into(), + surrealdb_address: "test".into(), + surrealdb_username: "test".into(), + surrealdb_password: "test".into(), + surrealdb_namespace: "test".into(), + surrealdb_database: "test".into(), + data_dir: "/tmp/unused".into(), // Ignored for memory storage + http_port: 0, + openai_base_url: "..".into(), + storage: StorageKind::Memory, + pdf_ingest_mode: PdfIngestMode::LlmFirst, + ..Default::default() + } + } + + /// Create a test configuration with local storage. + /// + /// This provides a ready-to-use configuration for testing scenarios + /// that require actual filesystem operations. + pub fn test_config_local() -> AppConfig { + let base = format!("/tmp/minne_test_storage_{}", uuid::Uuid::new_v4()); + AppConfig { + openai_api_key: "test".into(), + surrealdb_address: "test".into(), + surrealdb_username: "test".into(), + surrealdb_password: "test".into(), + surrealdb_namespace: "test".into(), + surrealdb_database: "test".into(), + data_dir: base.into(), + http_port: 0, + openai_base_url: "..".into(), + storage: StorageKind::Local, + pdf_ingest_mode: PdfIngestMode::LlmFirst, + ..Default::default() + } + } + + /// A specialized StorageManager for testing scenarios. + /// + /// This provides automatic setup for memory storage with proper isolation + /// and cleanup capabilities for test environments. + #[derive(Clone)] + pub struct TestStorageManager { + storage: StorageManager, + _temp_dir: Option<(String, std::path::PathBuf)>, // For local storage cleanup + } + + impl TestStorageManager { + /// Create a new TestStorageManager with memory backend. + /// + /// This is the preferred method for unit tests as it provides + /// fast execution and complete isolation. 
+ pub async fn new_memory() -> object_store::Result { + let cfg = test_config_memory(); + let storage = StorageManager::new(&cfg).await?; + + Ok(Self { + storage, + _temp_dir: None, + }) + } + + /// Create a new TestStorageManager with local filesystem backend. + /// + /// This method creates a temporary directory that will be automatically + /// cleaned up when the TestStorageManager is dropped. + pub async fn new_local() -> object_store::Result { + let cfg = test_config_local(); + let storage = StorageManager::new(&cfg).await?; + let resolved = storage + .local_base_path() + .map(|path| (cfg.data_dir.clone(), path.to_path_buf())); + + Ok(Self { + storage, + _temp_dir: resolved, + }) + } + + /// Create a TestStorageManager with custom configuration. + pub async fn with_config(cfg: &AppConfig) -> object_store::Result { + let storage = StorageManager::new(cfg).await?; + let temp_dir = if matches!(cfg.storage, StorageKind::Local) { + storage + .local_base_path() + .map(|path| (cfg.data_dir.clone(), path.to_path_buf())) + } else { + None + }; + + Ok(Self { + storage, + _temp_dir: temp_dir, + }) + } + + /// Get a reference to the underlying StorageManager. + pub fn storage(&self) -> &StorageManager { + &self.storage + } + + /// Clone the underlying StorageManager. + pub fn clone_storage(&self) -> StorageManager { + self.storage.clone() + } + + /// Store test data at the specified location. + pub async fn put(&self, location: &str, data: &[u8]) -> object_store::Result<()> { + self.storage.put(location, Bytes::from(data.to_vec())).await + } + + /// Retrieve test data from the specified location. + pub async fn get(&self, location: &str) -> object_store::Result { + self.storage.get(location).await + } + + /// Delete test data below the specified prefix. + pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> { + self.storage.delete_prefix(prefix).await + } + + /// Check if test data exists at the specified location. 
+ pub async fn exists(&self, location: &str) -> object_store::Result { + self.storage.exists(location).await + } + + /// List all test objects below the specified prefix. + pub async fn list( + &self, + prefix: Option<&str>, + ) -> object_store::Result> { + self.storage.list(prefix).await + } + } + + impl Drop for TestStorageManager { + fn drop(&mut self) { + // Clean up temporary directories for local storage + if let Some((_, path)) = &self._temp_dir { + if path.exists() { + let _ = std::fs::remove_dir_all(path); + } + } + } + } + + /// Convenience macro for creating memory storage tests. + /// + /// This macro simplifies the creation of test storage with memory backend. + #[macro_export] + macro_rules! test_storage_memory { + () => {{ + async move { + $crate::storage::store::testing::TestStorageManager::new_memory() + .await + .expect("Failed to create test memory storage") + } + }}; + } + + /// Convenience macro for creating local storage tests. + /// + /// This macro simplifies the creation of test storage with local filesystem backend. + #[macro_export] + macro_rules! test_storage_local { + () => {{ + async move { + $crate::storage::store::testing::TestStorageManager::new_local() + .await + .expect("Failed to create test local storage") + } + }}; + } +} + /// Resolve the absolute base directory used for local storage from config. /// /// If `data_dir` is relative, it is resolved against the current working directory. @@ -64,137 +438,6 @@ pub fn resolve_base_dir(cfg: &AppConfig) -> PathBuf { } } -/// Build an object store rooted at the configured data directory. -/// -/// This is the recommended way to obtain a store for logical object operations -/// such as `put_bytes_at`, `get_bytes_at`, and `delete_prefix_at`. -/// -/// For `StorageKind::Local`, this creates a filesystem-backed store. -/// For `StorageKind::Memory`, this creates an in-memory store useful for testing. 
-pub async fn build_store_root(cfg: &AppConfig) -> object_store::Result { - let base = resolve_base_dir(cfg); - build_store(&base, cfg).await -} - -/// Write bytes to `file_name` within a filesystem `prefix` using the configured store. -/// -/// Prefer [`put_bytes_at`] for location-based writes that do not need to compute -/// a separate filesystem prefix. -pub async fn put_bytes( - prefix: &Path, - file_name: &str, - data: Bytes, - cfg: &AppConfig, -) -> object_store::Result<()> { - let store = build_store(prefix, cfg).await?; - let payload = object_store::PutPayload::from_bytes(data); - store.put(&ObjPath::from(file_name), payload).await?; - Ok(()) -} - -/// Write bytes to the provided logical object `location`, e.g. `"user/uuid/file"`. -/// -/// The store root is taken from `AppConfig::data_dir` for the local backend. -/// For memory storage, data is stored in memory for the duration of the process. -/// This performs an atomic write as guaranteed by `object_store`. -/// -/// **Note**: Each call creates a new store instance. For memory storage, -/// this means data is not persisted across different function calls. -/// Use `build_store_root()` directly when you need to persist data across operations. -pub async fn put_bytes_at( - location: &str, - data: Bytes, - cfg: &AppConfig, -) -> object_store::Result<()> { - let store = build_store_root(cfg).await?; - let payload = object_store::PutPayload::from_bytes(data); - store.put(&ObjPath::from(location), payload).await?; - Ok(()) -} - -/// Read bytes from `file_name` within a filesystem `prefix` using the configured store. -/// -/// Prefer [`get_bytes_at`] for location-based reads. -pub async fn get_bytes( - prefix: &Path, - file_name: &str, - cfg: &AppConfig, -) -> object_store::Result { - let store = build_store(prefix, cfg).await?; - let r = store.get(&ObjPath::from(file_name)).await?; - let b = r.bytes().await?; - Ok(b) -} - -/// Read bytes from the provided logical object `location`. 
-/// -/// Returns the full contents buffered in memory. -/// -/// **Note**: Each call creates a new store instance. For memory storage, -/// this means you can only retrieve data that was written using the same -/// store instance. Use `build_store_root()` directly when you need to -/// persist data across operations. -pub async fn get_bytes_at(location: &str, cfg: &AppConfig) -> object_store::Result { - let store = build_store_root(cfg).await?; - let r = store.get(&ObjPath::from(location)).await?; - r.bytes().await -} - -/// Get a streaming body for the provided logical object `location`. -/// -/// Returns a fallible `BoxStream` of `Bytes`, suitable for use with -/// `axum::body::Body::from_stream` to stream responses without buffering. -pub async fn get_stream_at( - location: &str, - cfg: &AppConfig, -) -> object_store::Result>> { - let store = build_store_root(cfg).await?; - let r = store.get(&ObjPath::from(location)).await?; - Ok(r.into_stream()) -} - -/// Delete all objects below the provided filesystem `prefix`. -/// -/// This is a low-level variant for when a dedicated on-disk prefix is used for a -/// particular object grouping. Prefer [`delete_prefix_at`] for location-based stores. -pub async fn delete_prefix(prefix: &Path, cfg: &AppConfig) -> object_store::Result<()> { - let store = build_store(prefix, cfg).await?; - // list everything and delete - let locations = store.list(None).map_ok(|m| m.location).boxed(); - store - .delete_stream(locations) - .try_collect::>() - .await?; - // Best effort remove the directory itself - if tokio::fs::try_exists(prefix).await.unwrap_or(false) { - let _ = tokio::fs::remove_dir_all(prefix).await; - } - Ok(()) -} - -/// Delete all objects below the provided logical object `prefix`, e.g. `"user/uuid/"`. -/// -/// After deleting, attempts a best-effort cleanup of the now-empty directory on disk -/// when using the local backend. 
-pub async fn delete_prefix_at(prefix: &str, cfg: &AppConfig) -> object_store::Result<()> { - let store = build_store_root(cfg).await?; - let prefix_path = ObjPath::from(prefix); - let locations = store - .list(Some(&prefix_path)) - .map_ok(|m| m.location) - .boxed(); - store - .delete_stream(locations) - .try_collect::>() - .await?; - // Best effort remove empty directory on disk for local storage - let base_dir = resolve_base_dir(cfg).join(prefix); - if tokio::fs::try_exists(&base_dir).await.unwrap_or(false) { - let _ = tokio::fs::remove_dir_all(&base_dir).await; - } - Ok(()) -} - /// Split an absolute filesystem path into `(parent_dir, file_name)`. pub fn split_abs_path(path: &str) -> AnyResult<(PathBuf, String)> { let pb = PathBuf::from(path); @@ -223,7 +466,6 @@ mod tests { use super::*; use crate::utils::config::{PdfIngestMode::LlmFirst, StorageKind}; use bytes::Bytes; - use futures::TryStreamExt; use uuid::Uuid; fn test_config(root: &str) -> AppConfig { @@ -261,215 +503,335 @@ mod tests { } #[tokio::test] - async fn test_build_store_root_creates_base() { - let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4()); - let cfg = test_config(&base); - let _ = build_store_root(&cfg).await.expect("build store root"); - assert!(tokio::fs::try_exists(&base).await.unwrap_or(false)); - let _ = tokio::fs::remove_dir_all(&base).await; - } + async fn test_storage_manager_memory_basic_operations() { + let cfg = test_config_memory(); + let storage = StorageManager::new(&cfg) + .await + .expect("create storage manager"); + assert!(storage.local_base_path().is_none()); - #[tokio::test] - async fn test_put_get_bytes_at_and_delete_prefix_at() { - let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4()); - let cfg = test_config(&base); + let location = "test/data/file.txt"; + let data = b"test data for storage manager"; - let location_prefix = format!("{}/{}", "user1", Uuid::new_v4()); - let file_name = "file.txt"; - let location = format!("{}/{}", &location_prefix, 
file_name); - let payload = Bytes::from_static(b"hello world"); - - put_bytes_at(&location, payload.clone(), &cfg) + // Test put and get + storage + .put(location, Bytes::from(data.to_vec())) .await .expect("put"); - let got = get_bytes_at(&location, &cfg).await.expect("get"); - assert_eq!(got.as_ref(), payload.as_ref()); + let retrieved = storage.get(location).await.expect("get"); + assert_eq!(retrieved.as_ref(), data); - // Delete the whole prefix and ensure retrieval fails - delete_prefix_at(&location_prefix, &cfg) + // Test exists + assert!(storage.exists(location).await.expect("exists check")); + + // Test delete + storage.delete_prefix("test/data/").await.expect("delete"); + assert!(!storage + .exists(location) .await - .expect("delete prefix"); - assert!(get_bytes_at(&location, &cfg).await.is_err()); - - let _ = tokio::fs::remove_dir_all(&base).await; + .expect("exists check after delete")); } #[tokio::test] - async fn test_get_stream_at() { - let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4()); + async fn test_storage_manager_local_basic_operations() { + let base = format!("/tmp/minne_storage_test_{}", Uuid::new_v4()); let cfg = test_config(&base); + let storage = StorageManager::new(&cfg) + .await + .expect("create storage manager"); + let resolved_base = storage + .local_base_path() + .expect("resolved base dir") + .to_path_buf(); + assert_eq!(resolved_base, PathBuf::from(&base)); - let location = format!("{}/{}/stream.bin", "user2", Uuid::new_v4()); - let content = vec![7u8; 32 * 1024]; // 32KB payload + let location = "test/data/file.txt"; + let data = b"test data for local storage"; - put_bytes_at(&location, Bytes::from(content.clone()), &cfg) + // Test put and get + storage + .put(location, Bytes::from(data.to_vec())) .await .expect("put"); + let retrieved = storage.get(location).await.expect("get"); + assert_eq!(retrieved.as_ref(), data); - let stream = get_stream_at(&location, &cfg).await.expect("stream"); - let combined: Vec = stream - 
.map_ok(|chunk| chunk.to_vec()) - .try_fold(Vec::new(), |mut acc, mut chunk| async move { - acc.append(&mut chunk); - Ok(acc) - }) + let object_dir = resolved_base.join("test/data"); + tokio::fs::metadata(&object_dir) .await - .expect("collect"); + .expect("object directory exists after write"); - assert_eq!(combined, content); + // Test exists + assert!(storage.exists(location).await.expect("exists check")); - delete_prefix_at(&split_object_path(&location).unwrap().0, &cfg) + // Test delete + storage.delete_prefix("test/data/").await.expect("delete"); + assert!(!storage + .exists(location) .await - .ok(); - - let _ = tokio::fs::remove_dir_all(&base).await; - } - - // Memory storage tests - // - // Example usage for testing with memory storage: - // - // ```rust - // let cfg = AppConfig { - // storage: StorageKind::Memory, - // // ... other fields - // ..Default::default() - // }; - // - // let store = build_store_root(&cfg).await?; - // // Use the store for multiple operations to maintain data persistence - // store.put(&path, data).await?; - // let result = store.get(&path).await?; - // ``` - #[tokio::test] - async fn test_build_store_memory_creates_store() { - let cfg = test_config_memory(); - let _ = build_store_root(&cfg).await.expect("build memory store root"); - // Memory store should be created without any filesystem operations - } - - #[tokio::test] - async fn test_memory_put_get_bytes_at() { - let cfg = test_config_memory(); - - // Create a single store instance to reuse across operations - let store = build_store_root(&cfg).await.expect("build memory store root"); - let location_prefix = format!("{}/{}", "user1", Uuid::new_v4()); - let file_name = "file.txt"; - let location = format!("{}/{}", &location_prefix, file_name); - let payload = Bytes::from_static(b"hello world from memory"); - - // Use the store directly instead of the convenience functions - let obj_path = ObjPath::from(location.as_str()); - store.put(&obj_path, 
object_store::PutPayload::from_bytes(payload.clone())).await.expect("put to memory"); - let got = store.get(&obj_path).await.expect("get from memory").bytes().await.expect("get bytes"); - assert_eq!(got.as_ref(), payload.as_ref()); - - // Delete the whole prefix and ensure retrieval fails - let prefix_path = ObjPath::from(location_prefix.as_str()); - let locations = store.list(Some(&prefix_path)).map_ok(|m| m.location).boxed(); - store.delete_stream(locations).try_collect::>().await.expect("delete prefix from memory"); - assert!(store.get(&obj_path).await.is_err()); - } - - #[tokio::test] - async fn test_memory_get_stream_at() { - let cfg = test_config_memory(); - - // Create a single store instance to reuse across operations - let store = build_store_root(&cfg).await.expect("build memory store root"); - let location = format!("{}/{}/stream.bin", "user2", Uuid::new_v4()); - let content = vec![42u8; 32 * 1024]; // 32KB payload - - // Use the store directly - let obj_path = ObjPath::from(location.as_str()); - store.put(&obj_path, object_store::PutPayload::from_bytes(Bytes::from(content.clone()))).await.expect("put to memory"); - - let stream = store.get(&obj_path).await.expect("get from memory").into_stream(); - let combined: Vec = stream - .map_ok(|chunk| chunk.to_vec()) - .try_fold(Vec::new(), |mut acc, mut chunk| async move { - acc.append(&mut chunk); - Ok(acc) - }) + .expect("exists check after delete")); + assert!( + tokio::fs::metadata(&object_dir).await.is_err(), + "object directory should be removed" + ); + tokio::fs::metadata(&resolved_base) .await - .expect("collect"); - - assert_eq!(combined, content); - - // Clean up - delete_prefix_at(&split_object_path(&location).unwrap().0, &cfg) - .await - .ok(); - } - - #[tokio::test] - async fn test_memory_store_isolation() { - // Create two different memory stores to test isolation - let cfg1 = test_config_memory(); - let cfg2 = test_config_memory(); - - let store1 = build_store_root(&cfg1).await.expect("build 
memory store 1"); - let store2 = build_store_root(&cfg2).await.expect("build memory store 2"); - - let location = "test/isolation/file.txt".to_string(); - let payload1 = Bytes::from_static(b"store 1 content"); - let payload2 = Bytes::from_static(b"store 2 content"); - - // Put different data in each store - let obj_path = ObjPath::from(location.as_str()); - store1.put(&obj_path, object_store::PutPayload::from_bytes(payload1.clone())).await.expect("put to store 1"); - store2.put(&obj_path, object_store::PutPayload::from_bytes(payload2.clone())).await.expect("put to store 2"); - - // Verify isolation - each store should only see its own data - let got1 = store1.get(&obj_path).await.expect("get from store 1").bytes().await.expect("get bytes 1"); - let got2 = store2.get(&obj_path).await.expect("get from store 2").bytes().await.expect("get bytes 2"); - - assert_eq!(got1.as_ref(), payload1.as_ref()); - assert_eq!(got2.as_ref(), payload2.as_ref()); - assert_ne!(got1.as_ref(), got2.as_ref()); - } - - #[tokio::test] - async fn test_memory_vs_local_behavior_equivalence() { - // Test that memory and local storage have equivalent behavior - let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4()); - let local_cfg = test_config(&base); - let memory_cfg = test_config_memory(); - - // Create stores - let local_store = build_store_root(&local_cfg).await.expect("build local store"); - let memory_store = build_store_root(&memory_cfg).await.expect("build memory store"); - - let location = "test/comparison/data.txt".to_string(); - let payload = Bytes::from_static(b"test data for comparison"); - let obj_path = ObjPath::from(location.as_str()); - - // Put data in both stores - local_store.put(&obj_path, object_store::PutPayload::from_bytes(payload.clone())).await.expect("put to local"); - memory_store.put(&obj_path, object_store::PutPayload::from_bytes(payload.clone())).await.expect("put to memory"); - - // Get data from both stores - let local_result = 
local_store.get(&obj_path).await.expect("get from local").bytes().await.expect("get local bytes"); - let memory_result = memory_store.get(&obj_path).await.expect("get from memory").bytes().await.expect("get memory bytes"); - - // Verify equivalence - assert_eq!(local_result.as_ref(), memory_result.as_ref()); - assert_eq!(local_result.as_ref(), payload.as_ref()); - - // Test listing behavior - let local_prefix = ObjPath::from("test"); - let memory_prefix = ObjPath::from("test"); - - let local_list: Vec = local_store.list(Some(&local_prefix)) - .try_collect().await.expect("list local"); - let memory_list: Vec = memory_store.list(Some(&memory_prefix)) - .try_collect().await.expect("list memory"); - - assert_eq!(local_list.len(), memory_list.len()); - assert_eq!(local_list[0].location, memory_list[0].location); + .expect("base directory remains intact"); // Clean up let _ = tokio::fs::remove_dir_all(&base).await; } + + #[tokio::test] + async fn test_storage_manager_memory_persistence() { + let cfg = test_config_memory(); + let storage = StorageManager::new(&cfg) + .await + .expect("create storage manager"); + + let location = "persistence/test.txt"; + let data1 = b"first data"; + let data2 = b"second data"; + + // Put first data + storage + .put(location, Bytes::from(data1.to_vec())) + .await + .expect("put first"); + + // Retrieve and verify first data + let retrieved1 = storage.get(location).await.expect("get first"); + assert_eq!(retrieved1.as_ref(), data1); + + // Overwrite with second data + storage + .put(location, Bytes::from(data2.to_vec())) + .await + .expect("put second"); + + // Retrieve and verify second data + let retrieved2 = storage.get(location).await.expect("get second"); + assert_eq!(retrieved2.as_ref(), data2); + + // Data persists across multiple operations using the same StorageManager + assert_ne!(retrieved1.as_ref(), retrieved2.as_ref()); + } + + #[tokio::test] + async fn test_storage_manager_list_operations() { + let cfg = test_config_memory(); 
+ let storage = StorageManager::new(&cfg) + .await + .expect("create storage manager"); + + // Create multiple files + let files = vec![ + ("dir1/file1.txt", b"content1"), + ("dir1/file2.txt", b"content2"), + ("dir2/file3.txt", b"content3"), + ]; + + for (location, data) in &files { + storage + .put(location, Bytes::from(data.to_vec())) + .await + .expect("put"); + } + + // Test listing without prefix + let all_files = storage.list(None).await.expect("list all"); + assert_eq!(all_files.len(), 3); + + // Test listing with prefix + let dir1_files = storage.list(Some("dir1/")).await.expect("list dir1"); + assert_eq!(dir1_files.len(), 2); + assert!(dir1_files + .iter() + .any(|meta| meta.location.as_ref().contains("file1.txt"))); + assert!(dir1_files + .iter() + .any(|meta| meta.location.as_ref().contains("file2.txt"))); + + // Test listing non-existent prefix + let empty_files = storage + .list(Some("nonexistent/")) + .await + .expect("list nonexistent"); + assert_eq!(empty_files.len(), 0); + } + + #[tokio::test] + async fn test_storage_manager_stream_operations() { + let cfg = test_config_memory(); + let storage = StorageManager::new(&cfg) + .await + .expect("create storage manager"); + + let location = "stream/test.bin"; + let content = vec![42u8; 1024 * 64]; // 64KB of data + + // Put large data + storage + .put(location, Bytes::from(content.clone())) + .await + .expect("put large data"); + + // Get as stream + let mut stream = storage.get_stream(location).await.expect("get stream"); + let mut collected = Vec::new(); + + while let Some(chunk) = stream.next().await { + let chunk = chunk.expect("stream chunk"); + collected.extend_from_slice(&chunk); + } + + assert_eq!(collected, content); + } + + #[tokio::test] + async fn test_storage_manager_with_custom_backend() { + use object_store::memory::InMemory; + + // Create custom memory backend + let custom_store = InMemory::new(); + let storage = StorageManager::with_backend(Arc::new(custom_store), StorageKind::Memory); + 
+ let location = "custom/test.txt"; + let data = b"custom backend test"; + + // Test operations with custom backend + storage + .put(location, Bytes::from(data.to_vec())) + .await + .expect("put"); + let retrieved = storage.get(location).await.expect("get"); + assert_eq!(retrieved.as_ref(), data); + + assert!(storage.exists(location).await.expect("exists")); + assert_eq!(*storage.backend_kind(), StorageKind::Memory); + } + + #[tokio::test] + async fn test_storage_manager_error_handling() { + let cfg = test_config_memory(); + let storage = StorageManager::new(&cfg) + .await + .expect("create storage manager"); + + // Test getting non-existent file + let result = storage.get("nonexistent.txt").await; + assert!(result.is_err()); + + // Test checking existence of non-existent file + let exists = storage + .exists("nonexistent.txt") + .await + .expect("exists check"); + assert!(!exists); + + // Test listing with invalid location (should not panic) + let _result = storage.get("").await; + // This may or may not error depending on the backend implementation + // The important thing is that it doesn't panic + } + + // TestStorageManager tests + #[tokio::test] + async fn test_test_storage_manager_memory() { + let test_storage = testing::TestStorageManager::new_memory() + .await + .expect("create test storage"); + + let location = "test/storage/file.txt"; + let data = b"test data with TestStorageManager"; + + // Test put and get + test_storage.put(location, data).await.expect("put"); + let retrieved = test_storage.get(location).await.expect("get"); + assert_eq!(retrieved.as_ref(), data); + + // Test existence check + assert!(test_storage.exists(location).await.expect("exists")); + + // Test list + let files = test_storage + .list(Some("test/storage/")) + .await + .expect("list"); + assert_eq!(files.len(), 1); + + // Test delete + test_storage + .delete_prefix("test/storage/") + .await + .expect("delete"); + assert!(!test_storage + .exists(location) + .await + .expect("exists 
after delete")); + } + + #[tokio::test] + async fn test_test_storage_manager_local() { + let test_storage = testing::TestStorageManager::new_local() + .await + .expect("create test storage"); + + let location = "test/local/file.txt"; + let data = b"test data with local TestStorageManager"; + + // Test put and get + test_storage.put(location, data).await.expect("put"); + let retrieved = test_storage.get(location).await.expect("get"); + assert_eq!(retrieved.as_ref(), data); + + // Test existence check + assert!(test_storage.exists(location).await.expect("exists")); + + // The storage should be automatically cleaned up when test_storage is dropped + } + + #[tokio::test] + async fn test_test_storage_manager_isolation() { + let storage1 = testing::TestStorageManager::new_memory() + .await + .expect("create test storage 1"); + let storage2 = testing::TestStorageManager::new_memory() + .await + .expect("create test storage 2"); + + let location = "isolation/test.txt"; + let data1 = b"storage 1 data"; + let data2 = b"storage 2 data"; + + // Put different data in each storage + storage1.put(location, data1).await.expect("put storage 1"); + storage2.put(location, data2).await.expect("put storage 2"); + + // Verify isolation + let retrieved1 = storage1.get(location).await.expect("get storage 1"); + let retrieved2 = storage2.get(location).await.expect("get storage 2"); + + assert_eq!(retrieved1.as_ref(), data1); + assert_eq!(retrieved2.as_ref(), data2); + assert_ne!(retrieved1.as_ref(), retrieved2.as_ref()); + } + + #[tokio::test] + async fn test_test_storage_manager_config() { + let cfg = testing::test_config_memory(); + let test_storage = testing::TestStorageManager::with_config(&cfg) + .await + .expect("create test storage with config"); + + let location = "config/test.txt"; + let data = b"test data with custom config"; + + test_storage.put(location, data).await.expect("put"); + let retrieved = test_storage.get(location).await.expect("get"); + assert_eq!(retrieved.as_ref(), 
data); + + // Verify it's using memory backend + assert_eq!(*test_storage.storage().backend_kind(), StorageKind::Memory); + } } diff --git a/common/src/storage/types/file_info.rs b/common/src/storage/types/file_info.rs index 08c8075..73371d9 100644 --- a/common/src/storage/types/file_info.rs +++ b/common/src/storage/types/file_info.rs @@ -1,4 +1,5 @@ use axum_typed_multipart::FieldData; +use bytes; use mime_guess::from_path; use object_store::Error as ObjectStoreError; use sha2::{Digest, Sha256}; @@ -13,9 +14,8 @@ use uuid::Uuid; use crate::{ error::AppError, - storage::{db::SurrealDbClient, store}, + storage::{db::SurrealDbClient, store, store::StorageManager}, stored_object, - utils::config::AppConfig, }; #[derive(Error, Debug)] @@ -51,54 +51,6 @@ stored_object!(FileInfo, "file", { }); impl FileInfo { - pub async fn new( - field_data: FieldData, - db_client: &SurrealDbClient, - user_id: &str, - config: &AppConfig, - ) -> Result { - let file = field_data.contents; - let file_name = field_data - .metadata - .file_name - .ok_or(FileError::MissingFileName)?; - - // Calculate SHA256 - let sha256 = Self::get_sha(&file).await?; - - // Early return if file already exists - match Self::get_by_sha(&sha256, db_client).await { - Ok(existing_file) => { - info!("File already exists with SHA256: {}", sha256); - return Ok(existing_file); - } - Err(FileError::FileNotFound(_)) => (), // Expected case for new files - Err(e) => return Err(e), // Propagate unexpected errors - } - - // Generate UUID and prepare paths - let uuid = Uuid::new_v4(); - let sanitized_file_name = Self::sanitize_file_name(&file_name); - - let now = Utc::now(); - // Create new FileInfo instance - let file_info = Self { - id: uuid.to_string(), - created_at: now, - updated_at: now, - file_name, - sha256, - path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id, config).await?, - mime_type: Self::guess_mime_type(Path::new(&sanitized_file_name)), - user_id: user_id.to_string(), - }; - - // Store in 
database - db_client.store_item(file_info.clone()).await?; - - Ok(file_info) - } - /// Guesses the MIME type based on the file extension. /// /// # Arguments @@ -167,36 +119,6 @@ impl FileInfo { } } - /// Persists the file under the logical location `{user_id}/{uuid}/{file_name}`. - /// - /// # Arguments - /// * `uuid` - The UUID of the file. - /// * `file` - The temporary file to persist. - /// * `file_name` - The sanitized file name. - /// * `user-id` - User id - /// * `config` - Application configuration containing data directory path - /// - /// # Returns - /// * `Result` - The logical object location or an error. - async fn persist_file( - uuid: &Uuid, - file: NamedTempFile, - file_name: &str, - user_id: &str, - config: &AppConfig, - ) -> Result { - // Logical object location relative to the store root - let location = format!("{}/{}/{}", user_id, uuid, file_name); - info!("Persisting to object location: {}", location); - - let bytes = tokio::fs::read(file.path()).await?; - store::put_bytes_at(&location, bytes.into(), config) - .await - .map_err(FileError::from)?; - - Ok(location) - } - /// Retrieves a `FileInfo` by SHA256. /// /// # Arguments @@ -215,41 +137,6 @@ impl FileInfo { .ok_or(FileError::FileNotFound(sha256.to_string())) } - /// Removes FileInfo from database and file from disk - /// - /// # Arguments - /// * `id` - Id of the FileInfo - /// * `db_client` - Reference to SurrealDbClient - /// - /// # Returns - /// `Result<(), FileError>` - pub async fn delete_by_id( - id: &str, - db_client: &SurrealDbClient, - config: &AppConfig, - ) -> Result<(), AppError> { - // Get the FileInfo from the database - let Some(file_info) = db_client.get_item::(id).await? 
else { - return Ok(()); - }; - - // Remove the object's parent prefix in the object store - let (parent_prefix, _file_name) = store::split_object_path(&file_info.path) - .map_err(|e| AppError::from(anyhow::anyhow!(e)))?; - store::delete_prefix_at(&parent_prefix, config) - .await - .map_err(|e| AppError::from(anyhow::anyhow!(e)))?; - info!( - "Removed object prefix {} and its contents via object_store", - parent_prefix - ); - - // Delete the FileInfo from the database - db_client.delete_item::(id).await?; - - Ok(()) - } - /// Retrieves a `FileInfo` by its ID. /// /// # Arguments @@ -265,34 +152,168 @@ impl FileInfo { Err(e) => Err(FileError::SurrealError(e)), } } + + /// Create a new FileInfo using StorageManager for persistent storage operations. + /// + /// # Arguments + /// * `field_data` - The uploaded file data + /// * `db_client` - Reference to the SurrealDbClient + /// * `user_id` - The user ID + /// * `storage` - A StorageManager instance for storage operations + /// + /// # Returns + /// * `Result` - The created FileInfo or an error + pub async fn new_with_storage( + field_data: FieldData, + db_client: &SurrealDbClient, + user_id: &str, + storage: &StorageManager, + ) -> Result { + let file = field_data.contents; + let file_name = field_data + .metadata + .file_name + .ok_or(FileError::MissingFileName)?; + let original_file_name = file_name.clone(); + + // Calculate SHA256 + let sha256 = Self::get_sha(&file).await?; + + // Early return if file already exists + match Self::get_by_sha(&sha256, db_client).await { + Ok(existing_file) => { + info!("File already exists with SHA256: {}", sha256); + return Ok(existing_file); + } + Err(FileError::FileNotFound(_)) => (), // Expected case for new files + Err(e) => return Err(e), // Propagate unexpected errors + } + + // Generate UUID and prepare paths + let uuid = Uuid::new_v4(); + let sanitized_file_name = Self::sanitize_file_name(&file_name); + let now = Utc::now(); + + let path = + 
Self::persist_file_with_storage(&uuid, file, &sanitized_file_name, user_id, storage) + .await?; + + // Create FileInfo struct + let file_info = FileInfo { + id: uuid.to_string(), + user_id: user_id.to_string(), + sha256, + file_name: original_file_name, + path, + mime_type: Self::guess_mime_type(Path::new(&file_name)), + created_at: now, + updated_at: now, + }; + + // Store in database + db_client + .store_item(file_info.clone()) + .await + .map_err(FileError::SurrealError)?; + + Ok(file_info) + } + + /// Delete a FileInfo by ID using StorageManager for storage operations. + /// + /// # Arguments + /// * `id` - ID of the FileInfo + /// * `db_client` - Reference to SurrealDbClient + /// * `storage` - A StorageManager instance for storage operations + /// + /// # Returns + /// * `Result<(), AppError>` - Success or error + pub async fn delete_by_id_with_storage( + id: &str, + db_client: &SurrealDbClient, + storage: &StorageManager, + ) -> Result<(), AppError> { + // Get the FileInfo from the database + let Some(file_info) = db_client.get_item::(id).await? else { + return Ok(()); + }; + + // Remove the object's parent prefix in the object store + let (parent_prefix, _file_name) = store::split_object_path(&file_info.path) + .map_err(|e| AppError::from(anyhow::anyhow!(e)))?; + storage + .delete_prefix(&parent_prefix) + .await + .map_err(|e| AppError::from(anyhow::anyhow!(e)))?; + info!( + "Removed object prefix {} and its contents via StorageManager", + parent_prefix + ); + + // Delete the FileInfo from the database + db_client.delete_item::(id).await?; + + Ok(()) + } + + /// Retrieve file content using StorageManager for storage operations. 
+ /// + /// # Arguments + /// * `storage` - A StorageManager instance for storage operations + /// + /// # Returns + /// * `Result` - The file content or an error + pub async fn get_content_with_storage( + &self, + storage: &StorageManager, + ) -> Result { + storage + .get(&self.path) + .await + .map_err(|e: object_store::Error| AppError::from(anyhow::anyhow!(e))) + } + + /// Persist file to storage using StorageManager. + /// + /// # Arguments + /// * `uuid` - The UUID for the file + /// * `file` - The temporary file to persist + /// * `file_name` - The name of the file + /// * `user_id` - The user ID + /// * `storage` - A StorageManager instance for storage operations + /// + /// # Returns + /// * `Result` - The logical object location or an error. + async fn persist_file_with_storage( + uuid: &Uuid, + file: NamedTempFile, + file_name: &str, + user_id: &str, + storage: &StorageManager, + ) -> Result { + // Logical object location relative to the store root + let location = format!("{}/{}/{}", user_id, uuid, file_name); + info!("Persisting to object location: {}", location); + + let bytes = tokio::fs::read(file.path()).await?; + storage + .put(&location, bytes.into()) + .await + .map_err(FileError::from)?; + + Ok(location) + } } #[cfg(test)] mod tests { use super::*; - use crate::utils::config::{AppConfig, PdfIngestMode::LlmFirst, StorageKind}; + use crate::storage::store::testing::TestStorageManager; use axum::http::HeaderMap; use axum_typed_multipart::FieldMetadata; - use std::io::Write; + use std::{io::Write, path::Path}; use tempfile::NamedTempFile; - fn test_config(data_dir: &str) -> AppConfig { - AppConfig { - data_dir: data_dir.to_string(), - openai_api_key: "test_key".to_string(), - surrealdb_address: "test_address".to_string(), - surrealdb_username: "test_user".to_string(), - surrealdb_password: "test_pass".to_string(), - surrealdb_namespace: "test_ns".to_string(), - surrealdb_database: "test_db".to_string(), - http_port: 3000, - openai_base_url: 
"..".to_string(), - storage: StorageKind::Local, - pdf_ingest_mode: LlmFirst, - ..Default::default() - } - } - /// Creates a test temporary file with the given content fn create_test_file(content: &[u8], file_name: &str) -> FieldData { let mut temp_file = NamedTempFile::new().expect("Failed to create temp file"); @@ -316,33 +337,39 @@ mod tests { } #[tokio::test] - async fn test_fileinfo_create_read_delete() { - // Setup in-memory database for testing + async fn test_fileinfo_create_read_delete_with_storage_manager() { let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) .await .expect("Failed to start in-memory surrealdb"); + db.apply_migrations().await.unwrap(); - // Create a test file - let content = b"This is a test file for cross-filesystem operations"; - let file_name = "cross_fs_test.txt"; + let content = b"This is a test file for StorageManager operations"; + let file_name = "storage_manager_test.txt"; let field_data = create_test_file(content, file_name); - // Create a FileInfo instance with data_dir in /tmp + // Create test storage manager (memory backend) + let test_storage = store::testing::TestStorageManager::new_memory() + .await + .expect("Failed to create test storage manager"); + + // Create a FileInfo instance with storage manager let user_id = "test_user"; - let config = test_config("/tmp/minne_test_data"); - // Test file creation - let file_info = FileInfo::new(field_data, &db, user_id, &config) - .await - .expect("Failed to create file across filesystems"); + // Test file creation with StorageManager + let file_info = + FileInfo::new_with_storage(field_data, &db, user_id, test_storage.storage()) + .await + .expect("Failed to create file with StorageManager"); + assert_eq!(file_info.file_name, file_name); - // Verify the file exists via object_store and has correct content - let bytes = store::get_bytes_at(&file_info.path, &config) + // Verify the file exists via 
StorageManager and has correct content + let bytes = file_info + .get_content_with_storage(test_storage.storage()) .await - .expect("Failed to read file content via object_store"); - assert_eq!(bytes, content.as_slice()); + .expect("Failed to read file content via StorageManager"); + assert_eq!(bytes.as_ref(), content); // Test file reading let retrieved = FileInfo::get_by_id(&file_info.id, &db) @@ -350,51 +377,89 @@ mod tests { .expect("Failed to retrieve file info"); assert_eq!(retrieved.id, file_info.id); assert_eq!(retrieved.sha256, file_info.sha256); + assert_eq!(retrieved.file_name, file_name); - // Test file deletion - FileInfo::delete_by_id(&file_info.id, &db, &config) + // Test file deletion with StorageManager + FileInfo::delete_by_id_with_storage(&file_info.id, &db, test_storage.storage()) .await - .expect("Failed to delete file"); - assert!( - store::get_bytes_at(&file_info.path, &config).await.is_err(), - "File should be deleted" - ); + .expect("Failed to delete file with StorageManager"); - // Clean up the test directory - let _ = tokio::fs::remove_dir_all(&config.data_dir).await; + let deleted_result = file_info + .get_content_with_storage(test_storage.storage()) + .await; + assert!(deleted_result.is_err(), "File should be deleted"); + + // No cleanup needed - TestStorageManager handles it automatically } #[tokio::test] - async fn test_fileinfo_duplicate_detection() { - // Setup in-memory database for testing + async fn test_fileinfo_preserves_original_filename_and_sanitizes_path() { let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) .await .expect("Failed to start in-memory surrealdb"); + db.apply_migrations().await.unwrap(); - // Create a test file - let content = b"This is a test file for cross-filesystem duplicate detection"; - let file_name = "cross_fs_duplicate.txt"; + let content = b"filename sanitization"; + let original_name = "Complex name (1).txt"; + let 
expected_sanitized = "Complex_name__1_.txt"; + let field_data = create_test_file(content, original_name); + + let test_storage = store::testing::TestStorageManager::new_memory() + .await + .expect("Failed to create test storage manager"); + + let file_info = + FileInfo::new_with_storage(field_data, &db, "sanitized_user", test_storage.storage()) + .await + .expect("Failed to create file via storage manager"); + + assert_eq!(file_info.file_name, original_name); + + let stored_name = Path::new(&file_info.path) + .file_name() + .and_then(|name| name.to_str()) + .expect("stored name"); + assert_eq!(stored_name, expected_sanitized); + } + + #[tokio::test] + async fn test_fileinfo_duplicate_detection_with_storage_manager() { + let namespace = "test_ns"; + let database = &Uuid::new_v4().to_string(); + let db = SurrealDbClient::memory(namespace, database) + .await + .expect("Failed to start in-memory surrealdb"); + db.apply_migrations().await.unwrap(); + + let content = b"This is a test file for StorageManager duplicate detection"; + let file_name = "storage_manager_duplicate.txt"; let field_data = create_test_file(content, file_name); - // Create a FileInfo instance with data_dir in /tmp + // Create test storage manager + let test_storage = store::testing::TestStorageManager::new_memory() + .await + .expect("Failed to create test storage manager"); + + // Create a FileInfo instance with storage manager let user_id = "test_user"; - let config = test_config("/tmp/minne_test_data"); // Store the original file - let original_file_info = FileInfo::new(field_data, &db, user_id, &config) - .await - .expect("Failed to create original file"); + let original_file_info = + FileInfo::new_with_storage(field_data, &db, user_id, test_storage.storage()) + .await + .expect("Failed to create original file with StorageManager"); // Create another file with the same content but different name - let duplicate_name = "cross_fs_duplicate_2.txt"; + let duplicate_name = 
"storage_manager_duplicate_2.txt"; let field_data2 = create_test_file(content, duplicate_name); // The system should detect it's the same file and return the original FileInfo - let duplicate_file_info = FileInfo::new(field_data2, &db, user_id, &config) - .await - .expect("Failed to process duplicate file"); + let duplicate_file_info = + FileInfo::new_with_storage(field_data2, &db, user_id, test_storage.storage()) + .await + .expect("Failed to process duplicate file with StorageManager"); // Verify duplicate detection worked assert_eq!(duplicate_file_info.id, original_file_info.id); @@ -402,34 +467,48 @@ mod tests { assert_eq!(duplicate_file_info.file_name, file_name); assert_ne!(duplicate_file_info.file_name, duplicate_name); - // Clean up - FileInfo::delete_by_id(&original_file_info.id, &db, &config) + // Verify both files have the same content (they should point to the same file) + let original_content = original_file_info + .get_content_with_storage(test_storage.storage()) .await - .expect("Failed to delete file"); - let _ = tokio::fs::remove_dir_all(&config.data_dir).await; + .unwrap(); + let duplicate_content = duplicate_file_info + .get_content_with_storage(test_storage.storage()) + .await + .unwrap(); + assert_eq!(original_content.as_ref(), content); + assert_eq!(duplicate_content.as_ref(), content); + + // Clean up + FileInfo::delete_by_id_with_storage(&original_file_info.id, &db, test_storage.storage()) + .await + .expect("Failed to delete original file with StorageManager"); } #[tokio::test] async fn test_file_creation() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) .await .expect("Failed to start in-memory surrealdb"); + db.apply_migrations() + .await + .expect("Failed to apply migrations"); - // Create a test file let content = b"This is a test file content"; let file_name = "test_file.txt"; let field_data = 
create_test_file(content, file_name); - // Create a FileInfo instance + // Create a FileInfo instance with StorageManager let user_id = "test_user"; - let config = test_config("./data"); - let file_info = FileInfo::new(field_data, &db, user_id, &config).await; + let test_storage = TestStorageManager::new_memory() + .await + .expect("create test storage manager"); + let file_info = + FileInfo::new_with_storage(field_data, &db, user_id, test_storage.storage()).await; - // We can't fully test persistence to disk in unit tests, - // but we can verify the database record was created + // Verify the FileInfo was created successfully assert!(file_info.is_ok()); let file_info = file_info.unwrap(); @@ -459,33 +538,39 @@ mod tests { #[tokio::test] async fn test_file_duplicate_detection() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) .await .expect("Failed to start in-memory surrealdb"); + db.apply_migrations() + .await + .expect("Failed to apply migrations"); // First, store a file with known content let content = b"This is a test file for duplicate detection"; let file_name = "original.txt"; let user_id = "test_user"; - let config = test_config("./data"); + let test_storage = TestStorageManager::new_memory() + .await + .expect("create test storage manager"); let field_data1 = create_test_file(content, file_name); - let original_file_info = FileInfo::new(field_data1, &db, user_id, &config) - .await - .expect("Failed to create original file"); + let original_file_info = + FileInfo::new_with_storage(field_data1, &db, user_id, test_storage.storage()) + .await + .expect("Failed to create original file"); // Now try to store another file with the same content but different name let duplicate_name = "duplicate.txt"; let field_data2 = create_test_file(content, duplicate_name); // The system should detect it's the same file and return the original FileInfo - let 
duplicate_file_info = FileInfo::new(field_data2, &db, user_id, &config) - .await - .expect("Failed to process duplicate file"); + let duplicate_file_info = + FileInfo::new_with_storage(field_data2, &db, user_id, test_storage.storage()) + .await + .expect("Failed to process duplicate file"); // The returned FileInfo should match the original assert_eq!(duplicate_file_info.id, original_file_info.id); @@ -553,7 +638,6 @@ mod tests { #[tokio::test] async fn test_get_by_sha_not_found() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) @@ -574,7 +658,6 @@ mod tests { #[tokio::test] async fn test_manual_file_info_creation() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) @@ -615,23 +698,28 @@ mod tests { #[tokio::test] async fn test_delete_by_id() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) .await .expect("Failed to start in-memory surrealdb"); + db.apply_migrations() + .await + .expect("Failed to apply migrations"); - // Create and persist a test file via FileInfo::new + // Create and persist a test file via FileInfo::new_with_storage let user_id = "user123"; - let cfg = test_config("./data"); + let test_storage = TestStorageManager::new_memory() + .await + .expect("create test storage manager"); let temp = create_test_file(b"test content", "test_file.txt"); - let file_info = FileInfo::new(temp, &db, user_id, &cfg) + let file_info = FileInfo::new_with_storage(temp, &db, user_id, test_storage.storage()) .await .expect("create file"); - // Delete the file - let delete_result = FileInfo::delete_by_id(&file_info.id, &db, &cfg).await; + // Delete the file using StorageManager + let delete_result = + 
FileInfo::delete_by_id_with_storage(&file_info.id, &db, test_storage.storage()).await; // Delete should be successful assert!( @@ -650,13 +738,12 @@ mod tests { "FileInfo should be deleted from the database" ); - // Verify content no longer retrievable - assert!(store::get_bytes_at(&file_info.path, &cfg).await.is_err()); + // Verify content no longer retrievable from storage + assert!(test_storage.storage().get(&file_info.path).await.is_err()); } #[tokio::test] async fn test_delete_by_id_not_found() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) @@ -664,19 +751,16 @@ mod tests { .expect("Failed to start in-memory surrealdb"); // Try to delete a file that doesn't exist - let result = FileInfo::delete_by_id( - "nonexistent_id", - &db, - &test_config("./data"), - ) - .await; + let test_storage = TestStorageManager::new_memory().await.unwrap(); + let result = + FileInfo::delete_by_id_with_storage("nonexistent_id", &db, test_storage.storage()) + .await; // Should succeed even if the file record does not exist assert!(result.is_ok()); } #[tokio::test] async fn test_get_by_id() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) @@ -717,7 +801,6 @@ mod tests { #[tokio::test] async fn test_get_by_id_not_found() { - // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); let db = SurrealDbClient::memory(namespace, database) @@ -741,43 +824,197 @@ mod tests { } } + // StorageManager-based tests #[tokio::test] - async fn test_fileinfo_persist_with_custom_root() { - // Setup in-memory database for testing - let namespace = "test_ns"; - let database = &Uuid::new_v4().to_string(); - let db = SurrealDbClient::memory(namespace, database) + async fn test_file_info_new_with_storage_memory() { + 
// Setup + let db = SurrealDbClient::memory("test_ns", "test_file_storage_memory") .await - .expect("Failed to start in-memory surrealdb"); + .unwrap(); + db.apply_migrations().await.unwrap(); - // Create a test file - let content = b"This is a test file for data directory configuration"; - let file_name = "data_dir_test.txt"; - let field_data = create_test_file(content, file_name); - - // Create a FileInfo instance with a custom data directory + let content = b"This is a test file for StorageManager"; + let field_data = create_test_file(content, "test_storage.txt"); let user_id = "test_user"; - let custom_data_dir = "/tmp/minne_custom_data_dir"; - let config = test_config(custom_data_dir); - // Test file creation - let file_info = FileInfo::new(field_data, &db, user_id, &config) + // Create test storage manager + let storage = store::testing::TestStorageManager::new_memory() .await - .expect("Failed to create file in custom data directory"); + .unwrap(); - // Verify the file has the correct content via object_store - let file_content = store::get_bytes_at(&file_info.path, &config) + // Test file creation with StorageManager + let file_info = FileInfo::new_with_storage(field_data, &db, user_id, storage.storage()) .await - .expect("Failed to read file content"); - assert_eq!(file_content.as_ref(), content); + .expect("Failed to create file with StorageManager"); - // Test file deletion - FileInfo::delete_by_id(&file_info.id, &db, &config) + // Verify the file was created correctly + assert_eq!(file_info.user_id, user_id); + assert_eq!(file_info.file_name, "test_storage.txt"); + assert!(!file_info.sha256.is_empty()); + assert!(!file_info.path.is_empty()); + + // Test content retrieval with StorageManager + let retrieved_content = file_info + .get_content_with_storage(storage.storage()) .await - .expect("Failed to delete file"); - assert!(store::get_bytes_at(&file_info.path, &config).await.is_err()); + .expect("Failed to get file content with StorageManager"); + 
assert_eq!(retrieved_content.as_ref(), content); - // Clean up the test directory - let _ = tokio::fs::remove_dir_all(custom_data_dir).await; + // Test file deletion with StorageManager + FileInfo::delete_by_id_with_storage(&file_info.id, &db, storage.storage()) + .await + .expect("Failed to delete file with StorageManager"); + + // Verify file is deleted + let deleted_content_result = file_info.get_content_with_storage(storage.storage()).await; + assert!(deleted_content_result.is_err()); + } + + #[tokio::test] + async fn test_file_info_new_with_storage_local() { + // Setup + let db = SurrealDbClient::memory("test_ns", "test_file_storage_local") + .await + .unwrap(); + db.apply_migrations().await.unwrap(); + + let content = b"This is a test file for StorageManager with local storage"; + let field_data = create_test_file(content, "test_local.txt"); + let user_id = "test_user"; + + // Create test storage manager with local backend + let storage = store::testing::TestStorageManager::new_local() + .await + .unwrap(); + + // Test file creation with StorageManager + let file_info = FileInfo::new_with_storage(field_data, &db, user_id, storage.storage()) + .await + .expect("Failed to create file with StorageManager"); + + // Verify the file was created correctly + assert_eq!(file_info.user_id, user_id); + assert_eq!(file_info.file_name, "test_local.txt"); + assert!(!file_info.sha256.is_empty()); + assert!(!file_info.path.is_empty()); + + // Test content retrieval with StorageManager + let retrieved_content = file_info + .get_content_with_storage(storage.storage()) + .await + .expect("Failed to get file content with StorageManager"); + assert_eq!(retrieved_content.as_ref(), content); + + // Test file deletion with StorageManager + FileInfo::delete_by_id_with_storage(&file_info.id, &db, storage.storage()) + .await + .expect("Failed to delete file with StorageManager"); + + // Verify file is deleted + let deleted_content_result = 
file_info.get_content_with_storage(storage.storage()).await; + assert!(deleted_content_result.is_err()); + } + + #[tokio::test] + async fn test_file_info_storage_manager_persistence() { + // Setup + let db = SurrealDbClient::memory("test_ns", "test_file_persistence") + .await + .unwrap(); + db.apply_migrations().await.unwrap(); + + let content = b"Test content for persistence"; + let field_data = create_test_file(content, "persistence_test.txt"); + let user_id = "test_user"; + + // Create test storage manager + let storage = store::testing::TestStorageManager::new_memory() + .await + .unwrap(); + + // Create file + let file_info = FileInfo::new_with_storage(field_data, &db, user_id, storage.storage()) + .await + .expect("Failed to create file"); + + // Test that data persists across multiple operations with the same StorageManager + let retrieved_content_1 = file_info + .get_content_with_storage(storage.storage()) + .await + .unwrap(); + let retrieved_content_2 = file_info + .get_content_with_storage(storage.storage()) + .await + .unwrap(); + + assert_eq!(retrieved_content_1.as_ref(), content); + assert_eq!(retrieved_content_2.as_ref(), content); + + // Test that different StorageManager instances don't share data (memory storage isolation) + let storage2 = store::testing::TestStorageManager::new_memory() + .await + .unwrap(); + let isolated_content_result = file_info.get_content_with_storage(storage2.storage()).await; + assert!( + isolated_content_result.is_err(), + "Different StorageManager should not have access to same data" + ); + } + + #[tokio::test] + async fn test_file_info_storage_manager_equivalence() { + // Setup + let db = SurrealDbClient::memory("test_ns", "test_file_equivalence") + .await + .unwrap(); + db.apply_migrations().await.unwrap(); + + let content = b"Test content for equivalence testing"; + let field_data1 = create_test_file(content, "equivalence_test_1.txt"); + let field_data2 = create_test_file(content, "equivalence_test_2.txt"); + let 
user_id = "test_user"; + + // Create single storage manager and reuse it + let storage_manager = store::testing::TestStorageManager::new_memory() + .await + .unwrap(); + let storage = storage_manager.storage(); + + // Create multiple files with the same storage manager + let file_info_1 = FileInfo::new_with_storage(field_data1, &db, user_id, &storage) + .await + .expect("Failed to create file 1"); + + let file_info_2 = FileInfo::new_with_storage(field_data2, &db, user_id, &storage) + .await + .expect("Failed to create file 2"); + + // Test that both files can be retrieved with the same storage backend + let content_1 = file_info_1 + .get_content_with_storage(&storage) + .await + .unwrap(); + let content_2 = file_info_2 + .get_content_with_storage(&storage) + .await + .unwrap(); + + assert_eq!(content_1.as_ref(), content); + assert_eq!(content_2.as_ref(), content); + + // Test that files can be deleted with the same storage manager + FileInfo::delete_by_id_with_storage(&file_info_1.id, &db, &storage) + .await + .unwrap(); + FileInfo::delete_by_id_with_storage(&file_info_2.id, &db, &storage) + .await + .unwrap(); + + // Verify files are deleted + let deleted_content_1 = file_info_1.get_content_with_storage(&storage).await; + let deleted_content_2 = file_info_2.get_content_with_storage(&storage).await; + + assert!(deleted_content_1.is_err()); + assert!(deleted_content_2.is_err()); } } diff --git a/common/src/storage/types/scratchpad.rs b/common/src/storage/types/scratchpad.rs index 796d225..ae3fedb 100644 --- a/common/src/storage/types/scratchpad.rs +++ b/common/src/storage/types/scratchpad.rs @@ -471,12 +471,15 @@ mod tests { .expect("Failed to apply migrations"); let user_id = "test_user_123"; - let scratchpad = Scratchpad::new(user_id.to_string(), "Test Timezone Scratchpad".to_string()); + let scratchpad = + Scratchpad::new(user_id.to_string(), "Test Timezone Scratchpad".to_string()); let scratchpad_id = scratchpad.id.clone(); 
db.store_item(scratchpad).await.unwrap(); - let retrieved = Scratchpad::get_by_id(&scratchpad_id, user_id, &db).await.unwrap(); + let retrieved = Scratchpad::get_by_id(&scratchpad_id, user_id, &db) + .await + .unwrap(); // Test that datetime fields are preserved and can be used for timezone formatting assert!(retrieved.created_at.timestamp() > 0); diff --git a/common/src/utils/config.rs b/common/src/utils/config.rs index 99b3110..cf4ef27 100644 --- a/common/src/utils/config.rs +++ b/common/src/utils/config.rs @@ -1,7 +1,7 @@ use config::{Config, ConfigError, Environment, File}; use serde::Deserialize; -#[derive(Clone, Deserialize, Debug)] +#[derive(Clone, Deserialize, Debug, PartialEq)] #[serde(rename_all = "lowercase")] pub enum StorageKind { Local, diff --git a/html-router/src/html_state.rs b/html-router/src/html_state.rs index c982fce..a372098 100644 --- a/html-router/src/html_state.rs +++ b/html-router/src/html_state.rs @@ -1,4 +1,4 @@ -use common::storage::db::SurrealDbClient; +use common::storage::{db::SurrealDbClient, store::StorageManager}; use common::utils::template_engine::{ProvidesTemplateEngine, TemplateEngine}; use common::{create_template_engine, storage::db::ProvidesDb, utils::config::AppConfig}; use composite_retrieval::reranking::RerankerPool; @@ -14,14 +14,16 @@ pub struct HtmlState { pub templates: Arc, pub session_store: Arc, pub config: AppConfig, + pub storage: StorageManager, pub reranker_pool: Option>, } impl HtmlState { - pub fn new_with_resources( + pub async fn new_with_resources( db: Arc, openai_client: Arc, session_store: Arc, + storage: StorageManager, config: AppConfig, reranker_pool: Option>, ) -> Result> { @@ -34,6 +36,7 @@ impl HtmlState { session_store, templates: Arc::new(template_engine), config, + storage, reranker_pool, }) } diff --git a/html-router/src/routes/content/handlers.rs b/html-router/src/routes/content/handlers.rs index 2679766..c2f0ec5 100644 --- a/html-router/src/routes/content/handlers.rs +++ 
b/html-router/src/routes/content/handlers.rs @@ -190,7 +190,7 @@ pub async fn delete_text_content( TextContent::has_other_with_file(&file_info.id, &text_content.id, &state.db).await?; if !file_in_use { - FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await?; + FileInfo::delete_by_id_with_storage(&file_info.id, &state.db, &state.storage).await?; } } diff --git a/html-router/src/routes/index/handlers.rs b/html-router/src/routes/index/handlers.rs index b43eef1..fa6e5fa 100644 --- a/html-router/src/routes/index/handlers.rs +++ b/html-router/src/routes/index/handlers.rs @@ -17,7 +17,6 @@ use crate::{ utils::text_content_preview::truncate_text_contents, AuthSessionType, }; -use common::storage::store; use common::storage::types::user::DashboardStats; use common::{ error::AppError, @@ -86,7 +85,7 @@ pub async fn delete_text_content( TextContent::has_other_with_file(&file_info.id, &text_content.id, &state.db).await?; if !file_in_use { - FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await?; + FileInfo::delete_by_id_with_storage(&file_info.id, &state.db, &state.storage).await?; } } @@ -278,7 +277,7 @@ pub async fn serve_file( return Ok(TemplateResponse::unauthorized().into_response()); } - let stream = match store::get_stream_at(&file_info.path, &state.config).await { + let stream = match state.storage.get_stream(&file_info.path).await { Ok(s) => s, Err(_) => return Ok(TemplateResponse::server_error().into_response()), }; diff --git a/html-router/src/routes/ingestion/handlers.rs b/html-router/src/routes/ingestion/handlers.rs index 8168eaa..eaa5eac 100644 --- a/html-router/src/routes/ingestion/handlers.rs +++ b/html-router/src/routes/ingestion/handlers.rs @@ -98,7 +98,8 @@ pub async fn process_ingress_form( info!("{:?}", input); let file_infos = try_join_all(input.files.into_iter().map(|file| { - FileInfo::new(file, &state.db, &user.id, &state.config).map_err(AppError::from) + FileInfo::new_with_storage(file, &state.db, &user.id, 
&state.storage) + .map_err(AppError::from) })) .await?; diff --git a/ingestion-pipeline/Cargo.toml b/ingestion-pipeline/Cargo.toml index 9a200f2..124288a 100644 --- a/ingestion-pipeline/Cargo.toml +++ b/ingestion-pipeline/Cargo.toml @@ -18,7 +18,8 @@ async-openai = { workspace = true } surrealdb = { workspace = true } dom_smoothie = { workspace = true } tempfile = { workspace = true } -axum_typed_multipart = { workspace = true} +axum_typed_multipart = { workspace = true} +anyhow = { workspace = true } reqwest = { workspace = true } chrono = { workspace = true } text-splitter = { workspace = true } @@ -28,6 +29,7 @@ headless_chrome = { workspace = true } base64 = { workspace = true } pdf-extract = "0.9" lopdf = "0.32" +bytes = { workspace = true } common = { path = "../common" } composite-retrieval = { path = "../composite-retrieval" } diff --git a/ingestion-pipeline/src/pipeline/mod.rs b/ingestion-pipeline/src/pipeline/mod.rs index 6abb810..686162b 100644 --- a/ingestion-pipeline/src/pipeline/mod.rs +++ b/ingestion-pipeline/src/pipeline/mod.rs @@ -19,6 +19,7 @@ use common::{ error::AppError, storage::{ db::SurrealDbClient, + store::StorageManager, types::{ ingestion_payload::IngestionPayload, ingestion_task::{IngestionTask, TaskErrorInfo}, @@ -47,12 +48,14 @@ impl IngestionPipeline { openai_client: Arc>, config: AppConfig, reranker_pool: Option>, + storage: StorageManager, ) -> Result { let services = DefaultPipelineServices::new( db.clone(), openai_client.clone(), config.clone(), reranker_pool, + storage, ); Self::with_services(db, IngestionConfig::default(), Arc::new(services)) diff --git a/ingestion-pipeline/src/pipeline/preparation.rs b/ingestion-pipeline/src/pipeline/preparation.rs index 2582bab..b66b1d2 100644 --- a/ingestion-pipeline/src/pipeline/preparation.rs +++ b/ingestion-pipeline/src/pipeline/preparation.rs @@ -2,6 +2,7 @@ use common::{ error::AppError, storage::{ db::SurrealDbClient, + store::StorageManager, types::{ 
ingestion_payload::IngestionPayload, text_content::{TextContent, UrlInfo}, @@ -19,6 +20,7 @@ pub(crate) async fn to_text_content( db: &SurrealDbClient, config: &AppConfig, openai_client: &async_openai::Client, + storage: &StorageManager, ) -> Result { match ingestion_payload { IngestionPayload::Url { @@ -27,7 +29,7 @@ pub(crate) async fn to_text_content( category, user_id, } => { - let (article, file_info) = extract_text_from_url(&url, db, &user_id, config).await?; + let (article, file_info) = extract_text_from_url(&url, db, &user_id, storage).await?; Ok(TextContent::new( article.text_content.into(), Some(context), @@ -60,7 +62,8 @@ pub(crate) async fn to_text_content( category, user_id, } => { - let text = extract_text_from_file(&file_info, db, openai_client, config).await?; + let text = + extract_text_from_file(&file_info, db, openai_client, config, storage).await?; Ok(TextContent::new( text, Some(context), diff --git a/ingestion-pipeline/src/pipeline/services.rs b/ingestion-pipeline/src/pipeline/services.rs index 0ac753e..3a63203 100644 --- a/ingestion-pipeline/src/pipeline/services.rs +++ b/ingestion-pipeline/src/pipeline/services.rs @@ -10,6 +10,7 @@ use common::{ error::AppError, storage::{ db::SurrealDbClient, + store::StorageManager, types::{ ingestion_payload::IngestionPayload, knowledge_entity::KnowledgeEntity, knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings, @@ -65,6 +66,7 @@ pub struct DefaultPipelineServices { openai_client: Arc>, config: AppConfig, reranker_pool: Option>, + storage: StorageManager, } impl DefaultPipelineServices { @@ -73,12 +75,14 @@ impl DefaultPipelineServices { openai_client: Arc>, config: AppConfig, reranker_pool: Option>, + storage: StorageManager, ) -> Self { Self { db, openai_client, config, reranker_pool, + storage, } } @@ -144,7 +148,14 @@ impl PipelineServices for DefaultPipelineServices { &self, payload: IngestionPayload, ) -> Result { - to_text_content(payload, &self.db, &self.config, 
&self.openai_client).await + to_text_content( + payload, + &self.db, + &self.config, + &self.openai_client, + &self.storage, + ) + .await } async fn retrieve_similar_entities( diff --git a/ingestion-pipeline/src/utils/file_text_extraction.rs b/ingestion-pipeline/src/utils/file_text_extraction.rs index 21d6479..06b3dab 100644 --- a/ingestion-pipeline/src/utils/file_text_extraction.rs +++ b/ingestion-pipeline/src/utils/file_text_extraction.rs @@ -1,63 +1,200 @@ +use anyhow::anyhow; use common::{ error::AppError, - storage::{db::SurrealDbClient, store, types::file_info::FileInfo}, + storage::{db::SurrealDbClient, store::StorageManager, types::file_info::FileInfo}, utils::config::AppConfig, }; +use std::{ + env, + io::{Error as IoError, ErrorKind}, + path::{Path, PathBuf}, +}; +use uuid::Uuid; use super::{ audio_transcription::transcribe_audio_file, image_parsing::extract_text_from_image, pdf_ingestion::extract_pdf_content, }; +struct TempPathGuard { + path: PathBuf, +} + +impl TempPathGuard { + fn as_path(&self) -> &Path { + &self.path + } +} + +impl Drop for TempPathGuard { + fn drop(&mut self) { + let _ = std::fs::remove_file(&self.path); + } +} + +async fn materialize_temp_file( + bytes: &[u8], + extension: Option<&str>, +) -> Result { + let mut path = env::temp_dir(); + let mut file_name = format!("minne-ingest-{}", Uuid::new_v4()); + + if let Some(ext) = extension { + if !ext.is_empty() { + file_name.push('.'); + file_name.push_str(ext); + } + } + + path.push(file_name); + + tokio::fs::write(&path, bytes).await?; + + Ok(TempPathGuard { path }) +} + +async fn resolve_existing_local_path(storage: &StorageManager, location: &str) -> Option { + let path = storage.resolve_local_path(location)?; + match tokio::fs::metadata(&path).await { + Ok(_) => Some(path), + Err(_) => None, + } +} + +fn infer_extension(file_info: &FileInfo) -> Option { + Path::new(&file_info.path) + .extension() + .and_then(|ext| ext.to_str()) + .map(|ext| ext.to_string()) +} + pub async fn 
extract_text_from_file( file_info: &FileInfo, db_client: &SurrealDbClient, openai_client: &async_openai::Client, config: &AppConfig, + storage: &StorageManager, ) -> Result { - let base_path = store::resolve_base_dir(config); - let absolute_path = base_path.join(&file_info.path); + let file_bytes = storage + .get(&file_info.path) + .await + .map_err(|e| AppError::from(anyhow!(e)))?; + let local_path = resolve_existing_local_path(storage, &file_info.path).await; match file_info.mime_type.as_str() { "text/plain" | "text/markdown" | "application/octet-stream" | "text/x-rust" => { - let content = tokio::fs::read_to_string(&absolute_path).await?; + let content = String::from_utf8(file_bytes.to_vec()) + .map_err(|err| AppError::Io(IoError::new(ErrorKind::InvalidData, err)))?; Ok(content) } "application/pdf" => { - extract_pdf_content( - &absolute_path, + if let Some(path) = local_path.as_ref() { + return extract_pdf_content( + path, + db_client, + openai_client, + &config.pdf_ingest_mode, + ) + .await; + } + + let temp_guard = materialize_temp_file(file_bytes.as_ref(), Some("pdf")).await?; + let result = extract_pdf_content( + temp_guard.as_path(), db_client, openai_client, &config.pdf_ingest_mode, ) - .await + .await; + drop(temp_guard); + result } "image/png" | "image/jpeg" => { - let path_str = absolute_path - .to_str() - .ok_or_else(|| { - AppError::Processing(format!( - "Encountered a non-UTF8 path while reading image {}", - file_info.id - )) - })? 
- .to_string(); - let content = extract_text_from_image(&path_str, db_client, openai_client).await?; + let content = + extract_text_from_image(file_bytes.as_ref(), db_client, openai_client).await?; Ok(content) } "audio/mpeg" | "audio/mp3" | "audio/wav" | "audio/x-wav" | "audio/webm" | "audio/mp4" | "audio/ogg" | "audio/flac" => { - let path_str = absolute_path - .to_str() - .ok_or_else(|| { + if let Some(path) = local_path.as_ref() { + let path_str = path.to_str().ok_or_else(|| { AppError::Processing(format!( "Encountered a non-UTF8 path while reading audio {}", file_info.id )) - })? - .to_string(); - transcribe_audio_file(&path_str, db_client, openai_client).await + })?; + return transcribe_audio_file(path_str, db_client, openai_client).await; + } + + let extension = infer_extension(file_info); + let temp_guard = + materialize_temp_file(file_bytes.as_ref(), extension.as_deref()).await?; + let path_str = temp_guard.as_path().to_str().ok_or_else(|| { + AppError::Processing(format!( + "Encountered a non-UTF8 path while reading audio {}", + file_info.id + )) + })?; + let result = transcribe_audio_file(path_str, db_client, openai_client).await; + drop(temp_guard); + result } _ => Err(AppError::NotFound(file_info.mime_type.clone())), } } + +#[cfg(test)] +mod tests { + use super::*; + use async_openai::{config::OpenAIConfig, Client}; + use bytes::Bytes; + use chrono::Utc; + use common::{ + storage::{db::SurrealDbClient, store::StorageManager}, + utils::config::{AppConfig, StorageKind}, + }; + + #[tokio::test] + async fn extracts_text_using_memory_storage_backend() { + let mut config = AppConfig::default(); + config.storage = StorageKind::Memory; + + let storage = StorageManager::new(&config) + .await + .expect("create storage manager"); + + let location = "user/test/file.txt"; + let contents = b"hello from memory storage"; + + storage + .put(location, Bytes::from(contents.as_slice().to_vec())) + .await + .expect("write object"); + + let now = Utc::now(); + let file_info 
= FileInfo { + id: "file".into(), + created_at: now, + updated_at: now, + sha256: "sha256".into(), + path: location.to_string(), + file_name: "file.txt".into(), + mime_type: "text/plain".into(), + user_id: "user".into(), + }; + + let namespace = "test_ns"; + let database = &Uuid::new_v4().to_string(); + let db = SurrealDbClient::memory(namespace, database) + .await + .expect("create surreal memory"); + + let openai_client = Client::with_config(OpenAIConfig::default()); + + let text = extract_text_from_file(&file_info, &db, &openai_client, &config, &storage) + .await + .expect("extract text"); + + assert_eq!(text, String::from_utf8_lossy(contents)); + } +} diff --git a/ingestion-pipeline/src/utils/image_parsing.rs b/ingestion-pipeline/src/utils/image_parsing.rs index 8dd184d..f882f4d 100644 --- a/ingestion-pipeline/src/utils/image_parsing.rs +++ b/ingestion-pipeline/src/utils/image_parsing.rs @@ -10,14 +10,13 @@ use common::{ }; pub async fn extract_text_from_image( - path: &str, + image_bytes: &[u8], db: &SurrealDbClient, client: &async_openai::Client, ) -> Result { let system_settings = SystemSettings::get_current(db).await?; - let image_bytes = tokio::fs::read(&path).await?; - let base64_image = STANDARD.encode(&image_bytes); + let base64_image = STANDARD.encode(image_bytes); let image_url = format!("data:image/png;base64,{base64_image}"); diff --git a/ingestion-pipeline/src/utils/url_text_retrieval.rs b/ingestion-pipeline/src/utils/url_text_retrieval.rs index d8cca7e..ca61273 100644 --- a/ingestion-pipeline/src/utils/url_text_retrieval.rs +++ b/ingestion-pipeline/src/utils/url_text_retrieval.rs @@ -3,8 +3,7 @@ use axum_typed_multipart::{FieldData, FieldMetadata}; use chrono::Utc; use common::{ error::AppError, - storage::{db::SurrealDbClient, types::file_info::FileInfo}, - utils::config::AppConfig, + storage::{db::SurrealDbClient, store::StorageManager, types::file_info::FileInfo}, }; use dom_smoothie::{Article, Readability, TextMode}; use 
headless_chrome::Browser; @@ -19,7 +18,7 @@ pub async fn extract_text_from_url( url: &str, db: &SurrealDbClient, user_id: &str, - config: &AppConfig, + storage: &StorageManager, ) -> Result<(Article, FileInfo), AppError> { info!("Fetching URL: {}", url); let now = Instant::now(); @@ -81,7 +80,7 @@ pub async fn extract_text_from_url( metadata, }; - let file_info = FileInfo::new(field_data, db, user_id, config).await?; + let file_info = FileInfo::new_with_storage(field_data, db, user_id, storage).await?; let config = dom_smoothie::Config { text_mode: TextMode::Markdown, diff --git a/main/src/main.rs b/main/src/main.rs index ccf161a..b2bd31c 100644 --- a/main/src/main.rs +++ b/main/src/main.rs @@ -1,6 +1,8 @@ use api_router::{api_routes_v1, api_state::ApiState}; use axum::{extract::FromRef, Router}; -use common::{storage::db::SurrealDbClient, utils::config::get_config}; +use common::{ + storage::db::SurrealDbClient, storage::store::StorageManager, utils::config::get_config, +}; use composite_retrieval::reranking::RerankerPool; use html_router::{html_routes, html_state::HtmlState}; use ingestion_pipeline::{pipeline::IngestionPipeline, run_worker_loop}; @@ -46,18 +48,20 @@ async fn main() -> Result<(), Box> { let reranker_pool = RerankerPool::maybe_from_config(&config)?; + // Create global storage manager + let storage = StorageManager::new(&config).await?; + let html_state = HtmlState::new_with_resources( db, openai_client, session_store, + storage.clone(), config.clone(), reranker_pool.clone(), - )?; + ) + .await?; - let api_state = ApiState { - db: html_state.db.clone(), - config: config.clone(), - }; + let api_state = ApiState::new(&config, storage.clone()).await?; // Create Axum router let app = Router::new() @@ -115,6 +119,7 @@ async fn main() -> Result<(), Box> { openai_client.clone(), config.clone(), reranker_pool.clone(), + storage.clone(), // Use the global storage manager ) .await .unwrap(), @@ -147,6 +152,7 @@ struct AppState { mod tests { use super::*; use 
axum::{body::Body, http::Request, http::StatusCode, Router}; + use common::storage::store::StorageManager; use common::utils::config::{AppConfig, PdfIngestMode, StorageKind}; use std::{path::Path, sync::Arc}; use tower::ServiceExt; @@ -195,18 +201,25 @@ mod tests { .with_api_base(&config.openai_base_url), )); + let storage = StorageManager::new(&config) + .await + .expect("failed to build storage manager"); + let html_state = HtmlState::new_with_resources( db.clone(), openai_client, session_store, + storage.clone(), config.clone(), None, ) + .await .expect("failed to build html state"); let api_state = ApiState { - db: html_state.db.clone(), + db: db.clone(), config: config.clone(), + storage, }; let app = Router::new() diff --git a/main/src/server.rs b/main/src/server.rs index 280d659..c5fe83f 100644 --- a/main/src/server.rs +++ b/main/src/server.rs @@ -2,7 +2,9 @@ use std::sync::Arc; use api_router::{api_routes_v1, api_state::ApiState}; use axum::{extract::FromRef, Router}; -use common::{storage::db::SurrealDbClient, utils::config::get_config}; +use common::{ + storage::db::SurrealDbClient, storage::store::StorageManager, utils::config::get_config, +}; use composite_retrieval::reranking::RerankerPool; use html_router::{html_routes, html_state::HtmlState}; use tracing::info; @@ -44,18 +46,20 @@ async fn main() -> Result<(), Box> { let reranker_pool = RerankerPool::maybe_from_config(&config)?; + // Create global storage manager + let storage = StorageManager::new(&config).await?; + let html_state = HtmlState::new_with_resources( db, openai_client, session_store, + storage.clone(), config.clone(), reranker_pool, - )?; + ) + .await?; - let api_state = ApiState { - db: html_state.db.clone(), - config: config.clone(), - }; + let api_state = ApiState::new(&config, storage).await?; // Create Axum router let app = Router::new() diff --git a/main/src/worker.rs b/main/src/worker.rs index 9f7c050..d56e9f6 100644 --- a/main/src/worker.rs +++ b/main/src/worker.rs @@ -1,6 +1,8 
@@ use std::sync::Arc; -use common::{storage::db::SurrealDbClient, utils::config::get_config}; +use common::{ + storage::db::SurrealDbClient, storage::store::StorageManager, utils::config::get_config, +}; use composite_retrieval::reranking::RerankerPool; use ingestion_pipeline::{pipeline::IngestionPipeline, run_worker_loop}; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; @@ -35,8 +37,18 @@ async fn main() -> Result<(), Box> { let reranker_pool = RerankerPool::maybe_from_config(&config)?; + // Create global storage manager + let storage = StorageManager::new(&config).await?; + let ingestion_pipeline = Arc::new( - IngestionPipeline::new(db.clone(), openai_client.clone(), config, reranker_pool).await?, + IngestionPipeline::new( + db.clone(), + openai_client.clone(), + config, + reranker_pool, + storage, + ) + .await?, ); run_worker_loop(db, ingestion_pipeline).await