Files
minne/common/src/storage/store.rs
T
Per Stark c60db0fb56 perf: avoid small own clones and intermediate Vec allocations
- Derive Copy on 6 small enums (MessageRole, TaskState, StorageKind, EmbeddingBackend, PdfIngestMode, KnowledgeEntityType)
- Change create_ingestion_payload files param from Vec<FileInfo> to &[FileInfo]
- Remove 5 intermediate Vec allocations (4 embedding serialization + 1 format_history) using write! loop
- Remove 7 unnecessary .clone() calls exposed by Copy derive
2026-05-27 10:28:08 +02:00

1105 lines
38 KiB
Rust

use std::io::ErrorKind;
use std::path::{Component, Path, PathBuf};
use std::sync::Arc;
use anyhow::{anyhow, Result as AnyResult};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::{path::Path as ObjPath, ObjectStore};
use crate::utils::config::{AppConfig, StorageKind};
/// Shared, reference-counted handle to a dynamically dispatched [`ObjectStore`] backend.
pub type DynStorage = Arc<dyn ObjectStore>;
/// Storage manager with persistent state and proper lifecycle management.
///
/// The backend is held behind a [`DynStorage`] (`Arc`), so clones of the
/// manager share the same underlying store.
#[derive(Clone)]
pub struct StorageManager {
// Backend from the `object_store` crate, type-erased behind `dyn ObjectStore`.
store: DynStorage,
// Which kind of backend this manager was configured with.
backend_kind: StorageKind,
// Base directory on disk; populated by `new` for the local backend, `None` otherwise.
local_base: Option<PathBuf>,
}
impl StorageManager {
/// Build a `StorageManager` from the application configuration.
///
/// The backend selected by `cfg.storage` is constructed first; for the local
/// backend this also yields the resolved base directory.
///
/// # Errors
///
/// Propagates any error reported while creating the storage backend.
pub async fn new(cfg: &AppConfig) -> object_store::Result<Self> {
    let (store, local_base) = create_storage_backend(cfg).await?;
    Ok(Self {
        store,
        backend_kind: cfg.storage,
        local_base,
    })
}
/// Wrap an already constructed backend in a `StorageManager`.
///
/// Intended for tests that want to inject a specific store. No local base
/// directory is recorded, so `local_base_path` returns `None` for managers
/// built this way.
pub fn with_backend(store: DynStorage, backend_kind: StorageKind) -> Self {
    // An injected backend has no known on-disk location.
    let local_base = None;
    Self {
        store,
        backend_kind,
        local_base,
    }
}
/// Get the storage backend kind.
pub fn backend_kind(&self) -> &StorageKind {
&self.backend_kind
}
/// Access the resolved local base directory when using the local backend.
pub fn local_base_path(&self) -> Option<&Path> {
self.local_base.as_deref()
}
/// Resolve an object location to a filesystem path when using the local backend.
///
/// Returns `None` when no local base directory is known, or when `location`
/// contains a component that could make the joined path leave the base
/// directory: an absolute path, a parent traversal (`..`), a root component,
/// or a Windows path prefix.
///
/// A root component is rejected explicitly because on Windows a path such as
/// `\foo` is *not* absolute, yet joining it onto the base replaces everything
/// except the base's prefix.
pub fn resolve_local_path(&self, location: &str) -> Option<PathBuf> {
    let base = self.local_base_path()?;
    let relative = Path::new(location);
    let escapes_base = relative.is_absolute()
        || relative.components().any(|component| {
            matches!(
                component,
                Component::ParentDir | Component::RootDir | Component::Prefix(_)
            )
        });
    if escapes_base {
        return None;
    }
    Some(base.join(relative))
}
/// Write `data` as the object at `location`.
///
/// The bytes are handed to the underlying backend; the backend's put result
/// is discarded and only success or failure is reported.
///
/// # Errors
///
/// Returns the backend's error if the write fails.
pub async fn put(&self, location: &str, data: Bytes) -> object_store::Result<()> {
    let target = ObjPath::from(location);
    self.store
        .put(&target, object_store::PutPayload::from_bytes(data))
        .await?;
    Ok(())
}
/// Read the whole object at `location` into memory.
///
/// # Errors
///
/// Returns the backend's error if the lookup or the read fails.
pub async fn get(&self, location: &str) -> object_store::Result<Bytes> {
    let source = ObjPath::from(location);
    self.store.get(&source).await?.bytes().await
}
/// Open the object at `location` as a stream of byte chunks.
///
/// Useful for large objects that should not be buffered in full. Each item
/// of the returned stream is itself fallible.
///
/// # Errors
///
/// Returns the backend's error if the initial lookup fails.
pub async fn get_stream(
    &self,
    location: &str,
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
    let source = ObjPath::from(location);
    let chunks = self.store.get(&source).await?.into_stream();
    Ok(chunks)
}
/// Remove every object listed below `prefix`.
///
/// When the local backend is in use, now-empty directories are also cleaned
/// up afterwards on a best-effort basis.
///
/// # Errors
///
/// Returns the first error produced while listing or deleting objects.
pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> {
    let root = ObjPath::from(prefix);
    let doomed = self
        .store
        .list(Some(&root))
        .map_ok(|meta| meta.location)
        .boxed();
    // Drain the stream so every deletion is actually driven to completion;
    // the individual results are not needed.
    let _deleted: Vec<_> = self.store.delete_stream(doomed).try_collect().await?;
    // Directory cleanup only makes sense on a real filesystem.
    if let StorageKind::Local = self.backend_kind {
        self.cleanup_filesystem_directories(prefix).await?;
    }
    Ok(())
}
/// Collect the metadata of every object below `prefix`.
///
/// Passing `None` lists the whole store.
pub async fn list(
    &self,
    prefix: Option<&str>,
) -> object_store::Result<Vec<object_store::ObjectMeta>> {
    let root = prefix.map(ObjPath::from);
    let entries = self.store.list(root.as_ref());
    entries.try_collect().await
}
/// Check if an object exists at the specified location.
pub async fn exists(&self, location: &str) -> object_store::Result<bool> {
let path = ObjPath::from(location);
self.store
.head(&path)
.await
.map(|_| true)
.or_else(|e| match e {
object_store::Error::NotFound { .. } => Ok(false),
_ => Err(e),
})
}
/// Remove empty directories left behind under the local base directory.
///
/// Starting at `base/prefix`, each directory is removed and the walk moves to
/// its parent, stopping at the base directory itself or at the first
/// directory that is not empty. This is best-effort: failures are logged (or
/// ignored for missing directories) and the function always returns `Ok`.
async fn cleanup_filesystem_directories(&self, prefix: &str) -> object_store::Result<()> {
    // Only the local backend with a known base directory has anything to clean.
    let base = match (&self.backend_kind, self.local_base.as_ref()) {
        (StorageKind::Local, Some(base)) => base,
        _ => return Ok(()),
    };

    let relative = Path::new(prefix);
    let unsupported = relative.is_absolute()
        || relative
            .components()
            .any(|part| matches!(part, Component::ParentDir | Component::Prefix(_)));
    if unsupported {
        tracing::warn!(
            prefix = %prefix,
            "Skipping directory cleanup for unsupported prefix components"
        );
        return Ok(());
    }

    // Walk from the deepest directory upwards, never touching the base itself
    // or anything outside of it.
    let deepest = base.join(relative);
    let candidates = deepest
        .ancestors()
        .take_while(|dir| dir.starts_with(base) && *dir != base.as_path());
    for dir in candidates {
        if let Err(err) = tokio::fs::remove_dir(dir).await {
            match err.kind() {
                // Already gone: keep climbing.
                ErrorKind::NotFound => {}
                // Still holds other entries, so no ancestor can be empty either.
                ErrorKind::DirectoryNotEmpty => break,
                _ => tracing::debug!(
                    error = %err,
                    path = %dir.display(),
                    "Failed to remove directory during cleanup"
                ),
            }
        }
    }
    Ok(())
}
}
/// Create a storage backend based on configuration.
///
/// Returns the type-erased store together with the resolved base directory,
/// which is `Some` only for the local filesystem backend.
///
/// * `Local`  – resolves the base directory, makes sure it exists and roots a
///   `LocalFileSystem` store there.
/// * `Memory` – creates a fresh in-memory store.
/// * `S3`     – builds an S3 client from `s3_bucket`, `s3_region` and the
///   optional `s3_endpoint`; credentials are taken from `AWS_ACCESS_KEY_ID` /
///   `AWS_SECRET_ACCESS_KEY` when both are set.
///
/// # Errors
///
/// Fails when the local directory cannot be created, when `s3_bucket` is
/// missing for the S3 backend, or when the backend itself cannot be built.
async fn create_storage_backend(
    cfg: &AppConfig,
) -> object_store::Result<(DynStorage, Option<PathBuf>)> {
    match cfg.storage {
        StorageKind::Local => {
            let base = resolve_base_dir(cfg);
            // `create_dir_all` succeeds when the directory already exists, so a
            // preceding `exists()` probe (a blocking syscall inside an async fn,
            // and racy against concurrent creation) is unnecessary.
            tokio::fs::create_dir_all(&base).await.map_err(|e| {
                object_store::Error::Generic {
                    store: "LocalFileSystem",
                    source: e.into(),
                }
            })?;
            let store = LocalFileSystem::new_with_prefix(&base)?;
            Ok((Arc::new(store), Some(base)))
        }
        StorageKind::Memory => {
            let store = InMemory::new();
            Ok((Arc::new(store), None))
        }
        StorageKind::S3 => {
            let bucket = cfg
                .s3_bucket
                .as_ref()
                .ok_or_else(|| object_store::Error::Generic {
                    store: "S3",
                    source: anyhow!("s3_bucket is required for S3 storage").into(),
                })?;
            // Plain-HTTP endpoints are permitted (the test setup in this file
            // targets an `http://` endpoint).
            let mut builder = AmazonS3Builder::new()
                .with_bucket_name(bucket)
                .with_allow_http(true);
            // Explicit credentials are applied only when both halves are present.
            if let (Ok(key), Ok(secret)) = (
                std::env::var("AWS_ACCESS_KEY_ID"),
                std::env::var("AWS_SECRET_ACCESS_KEY"),
            ) {
                builder = builder
                    .with_access_key_id(key)
                    .with_secret_access_key(secret);
            }
            if let Some(endpoint) = &cfg.s3_endpoint {
                builder = builder.with_endpoint(endpoint);
            }
            builder = builder.with_region(&cfg.s3_region);
            let store = builder.build()?;
            Ok((Arc::new(store), None))
        }
    }
}
/// Testing utilities for storage operations.
///
/// This module provides specialized utilities for testing scenarios with
/// automatic memory backend setup and proper test isolation.
#[cfg(test)]
pub mod testing {
use super::*;
use crate::utils::config::{AppConfig, PdfIngestMode};
use uuid;
const DEFAULT_TEST_S3_BUCKET: &str = "minne-tests";
const DEFAULT_TEST_S3_ENDPOINT: &str = "http://127.0.0.1:19000";
/// Pick the S3 bucket used by tests.
///
/// Takes the first of `MINNE_TEST_S3_BUCKET` and `S3_BUCKET` that is set to a
/// non-blank value, falling back to `DEFAULT_TEST_S3_BUCKET`.
fn configured_test_s3_bucket() -> String {
    ["MINNE_TEST_S3_BUCKET", "S3_BUCKET"]
        .into_iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_TEST_S3_BUCKET.to_string())
}
/// Pick the S3 endpoint used by tests.
///
/// Takes the first of `MINNE_TEST_S3_ENDPOINT` and `S3_ENDPOINT` that is set
/// to a non-blank value, falling back to `DEFAULT_TEST_S3_ENDPOINT`.
fn configured_test_s3_endpoint() -> String {
    ["MINNE_TEST_S3_ENDPOINT", "S3_ENDPOINT"]
        .into_iter()
        .filter_map(|name| std::env::var(name).ok())
        .find(|value| !value.trim().is_empty())
        .unwrap_or_else(|| DEFAULT_TEST_S3_ENDPOINT.to_string())
}
/// Build an `AppConfig` that selects the in-memory storage backend.
///
/// All connection-related fields carry placeholder values; remaining fields
/// come from `AppConfig::default()`.
pub fn test_config_memory() -> AppConfig {
    AppConfig {
        // Storage selection.
        storage: StorageKind::Memory,
        data_dir: "/tmp/unused".into(), // Ignored for memory storage
        pdf_ingest_mode: PdfIngestMode::LlmFirst,
        // Placeholder service settings.
        http_port: 0,
        openai_api_key: "test".into(),
        openai_base_url: "..".into(),
        surrealdb_address: "test".into(),
        surrealdb_username: "test".into(),
        surrealdb_password: "test".into(),
        surrealdb_namespace: "test".into(),
        surrealdb_database: "test".into(),
        ..Default::default()
    }
}
/// Create a test configuration with local storage.
///
/// Each call yields a configuration whose `data_dir` is a fresh, uniquely
/// named directory path below the platform's temporary directory
/// (`std::env::temp_dir()`), so tests do not depend on a literal `/tmp`.
/// The directory itself is not created here.
pub fn test_config_local() -> AppConfig {
    let base = std::env::temp_dir()
        .join(format!("minne_test_storage_{}", uuid::Uuid::new_v4()))
        .to_string_lossy()
        .into_owned();
    AppConfig {
        openai_api_key: "test".into(),
        surrealdb_address: "test".into(),
        surrealdb_username: "test".into(),
        surrealdb_password: "test".into(),
        surrealdb_namespace: "test".into(),
        surrealdb_database: "test".into(),
        data_dir: base,
        http_port: 0,
        openai_base_url: "..".into(),
        storage: StorageKind::Local,
        pdf_ingest_mode: PdfIngestMode::LlmFirst,
        ..Default::default()
    }
}
/// Build an `AppConfig` that selects the S3 storage backend (MinIO in tests).
///
/// Bucket and endpoint honour `MINNE_TEST_S3_BUCKET` / `S3_BUCKET` and
/// `MINNE_TEST_S3_ENDPOINT` / `S3_ENDPOINT` when those are provided; the
/// region is fixed to `us-east-1`.
pub fn test_config_s3() -> AppConfig {
    let bucket = configured_test_s3_bucket();
    let endpoint = configured_test_s3_endpoint();
    AppConfig {
        // Storage selection.
        storage: StorageKind::S3,
        s3_bucket: Some(bucket),
        s3_endpoint: Some(endpoint),
        s3_region: "us-east-1".into(),
        data_dir: "/tmp/unused".into(),
        pdf_ingest_mode: PdfIngestMode::LlmFirst,
        // Placeholder service settings.
        http_port: 0,
        openai_api_key: "test".into(),
        openai_base_url: "..".into(),
        surrealdb_address: "test".into(),
        surrealdb_username: "test".into(),
        surrealdb_password: "test".into(),
        surrealdb_namespace: "test".into(),
        surrealdb_database: "test".into(),
        ..Default::default()
    }
}
/// A specialized StorageManager for testing scenarios.
///
/// Wraps a [`StorageManager`] and, when that manager has a local base
/// directory, removes the directory tree once the manager is no longer used.
///
/// The type is `Clone`. The cleanup guard is shared between clones, so the
/// directory is removed only after the *last* clone has been dropped rather
/// than as soon as any single clone goes out of scope.
#[derive(Clone)]
pub struct TestStorageManager {
    storage: StorageManager,
    // Held only for its `Drop` side effect; `None` when there is nothing on
    // disk to clean up.
    _temp_dir: Option<Arc<TempDirGuard>>,
}

/// Deletes the wrapped directory tree when dropped.
struct TempDirGuard(std::path::PathBuf);

impl Drop for TempDirGuard {
    fn drop(&mut self) {
        // Best-effort: a failed cleanup must never panic inside `drop`.
        if self.0.exists() {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }
}

impl TestStorageManager {
    /// Wrap `storage`, registering its local base directory (if any) for cleanup.
    fn from_storage(storage: StorageManager) -> Self {
        let guard = storage
            .local_base_path()
            .map(|dir| Arc::new(TempDirGuard(dir.to_path_buf())));
        Self {
            storage,
            _temp_dir: guard,
        }
    }

    /// Create a new TestStorageManager with memory backend.
    ///
    /// Every call builds its own `StorageManager` from `test_config_memory`.
    pub async fn new_memory() -> object_store::Result<Self> {
        let cfg = test_config_memory();
        let storage = StorageManager::new(&cfg).await?;
        Ok(Self::from_storage(storage))
    }

    /// Create a new TestStorageManager with local filesystem backend.
    ///
    /// The backing directory comes from `test_config_local` and is removed
    /// when the last clone of the returned manager is dropped.
    pub async fn new_local() -> object_store::Result<Self> {
        let cfg = test_config_local();
        let storage = StorageManager::new(&cfg).await?;
        Ok(Self::from_storage(storage))
    }

    /// Create a new TestStorageManager with S3 backend (MinIO).
    ///
    /// This requires a reachable MinIO endpoint and an existing test bucket.
    /// A list request against a random prefix is issued up front so callers
    /// can skip cleanly when the endpoint or bucket is unavailable.
    ///
    /// # Errors
    ///
    /// Fails when the backend cannot be built or the probe request fails.
    pub async fn new_s3() -> object_store::Result<Self> {
        // Backend creation reads the access key id and secret from the
        // environment, so the MinIO test credentials are exported here.
        // NOTE(review): this mutates process-wide state that concurrently
        // running tests share.
        std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
        std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
        std::env::set_var("AWS_REGION", "us-east-1");
        let cfg = test_config_s3();
        let storage = StorageManager::new(&cfg).await?;
        // Probe the bucket so tests can cleanly skip when the endpoint is unreachable
        // or the test bucket is not provisioned.
        let probe_prefix = format!("__minne_s3_probe__/{}", uuid::Uuid::new_v4());
        storage.list(Some(&probe_prefix)).await?;
        Ok(Self::from_storage(storage))
    }

    /// Create a TestStorageManager with custom configuration.
    ///
    /// With a local-storage configuration the resolved base directory is
    /// deleted once the last clone is dropped, so do not point `cfg` at a
    /// directory that must survive the test.
    pub async fn with_config(cfg: &AppConfig) -> object_store::Result<Self> {
        let storage = StorageManager::new(cfg).await?;
        Ok(Self::from_storage(storage))
    }

    /// Get a reference to the underlying StorageManager.
    pub fn storage(&self) -> &StorageManager {
        &self.storage
    }

    /// Clone the underlying StorageManager.
    ///
    /// The returned manager does not keep the temporary directory alive.
    pub fn clone_storage(&self) -> StorageManager {
        self.storage.clone()
    }

    /// Store test data at the specified location.
    pub async fn put(&self, location: &str, data: &[u8]) -> object_store::Result<()> {
        self.storage
            .put(location, Bytes::copy_from_slice(data))
            .await
    }

    /// Retrieve test data from the specified location.
    pub async fn get(&self, location: &str) -> object_store::Result<Bytes> {
        self.storage.get(location).await
    }

    /// Get a streaming handle for test data.
    pub async fn get_stream(
        &self,
        location: &str,
    ) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
        self.storage.get_stream(location).await
    }

    /// Delete test data below the specified prefix.
    pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> {
        self.storage.delete_prefix(prefix).await
    }

    /// Check if test data exists at the specified location.
    pub async fn exists(&self, location: &str) -> object_store::Result<bool> {
        self.storage.exists(location).await
    }

    /// List all test objects below the specified prefix.
    pub async fn list(
        &self,
        prefix: Option<&str>,
    ) -> object_store::Result<Vec<object_store::ObjectMeta>> {
        self.storage.list(prefix).await
    }
}
/// Convenience macro for creating memory storage tests.
///
/// Expands to an `async move` block, i.e. a future: the caller has to
/// `.await` the expansion to obtain the `TestStorageManager`. The future
/// panics with "Failed to create test memory storage" if creation fails.
#[macro_export]
macro_rules! test_storage_memory {
() => {{
async move {
$crate::storage::store::testing::TestStorageManager::new_memory()
.await
.expect("Failed to create test memory storage")
}
}};
}
/// Convenience macro for creating local storage tests.
///
/// Expands to an `async move` block, i.e. a future: the caller has to
/// `.await` the expansion to obtain the `TestStorageManager`. The future
/// panics with "Failed to create test local storage" if creation fails.
#[macro_export]
macro_rules! test_storage_local {
() => {{
async move {
$crate::storage::store::testing::TestStorageManager::new_local()
.await
.expect("Failed to create test local storage")
}
}};
}
}
/// Resolve the absolute base directory used for local storage from config.
///
/// If `data_dir` is relative, it is resolved against the current working directory.
pub fn resolve_base_dir(cfg: &AppConfig) -> PathBuf {
if cfg.data_dir.starts_with('/') {
PathBuf::from(&cfg.data_dir)
} else {
std::env::current_dir()
.unwrap_or_else(|_| PathBuf::from("."))
.join(&cfg.data_dir)
}
}
/// Split an absolute filesystem path into `(parent_dir, file_name)`.
pub fn split_abs_path(path: &str) -> AnyResult<(PathBuf, String)> {
let pb = PathBuf::from(path);
let parent = pb
.parent()
.ok_or_else(|| anyhow!("Path has no parent: {path}"))?
.to_path_buf();
let file = pb
.file_name()
.ok_or_else(|| anyhow!("Path has no file name: {path}"))?
.to_string_lossy()
.to_string();
Ok((parent, file))
}
/// Split a logical object location at its last `/`, e.g. `"a/b/c"` becomes
/// `("a/b", "c")`.
///
/// # Errors
///
/// Fails when `path` contains no `/` separator at all.
pub fn split_object_path(path: &str) -> AnyResult<(String, String)> {
    match path.rsplit_once('/') {
        Some((dir, leaf)) => Ok((dir.to_owned(), leaf.to_owned())),
        None => Err(anyhow!("Object path has no separator: {path}")),
    }
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Context;
use crate::utils::config::{PdfIngestMode::LlmFirst, StorageKind};
use bytes::Bytes;
use uuid::Uuid;
/// Local-storage configuration rooted at `root`, with placeholder values for
/// everything that is unrelated to storage.
fn test_config(root: &str) -> AppConfig {
    AppConfig {
        // Storage selection.
        storage: StorageKind::Local,
        data_dir: root.into(),
        pdf_ingest_mode: LlmFirst,
        // Placeholder service settings.
        http_port: 0,
        openai_api_key: "test".into(),
        openai_base_url: "..".into(),
        surrealdb_address: "test".into(),
        surrealdb_username: "test".into(),
        surrealdb_password: "test".into(),
        surrealdb_namespace: "test".into(),
        surrealdb_database: "test".into(),
        ..Default::default()
    }
}
fn test_config_memory() -> AppConfig {
AppConfig {
openai_api_key: "test".into(),
surrealdb_address: "test".into(),
surrealdb_username: "test".into(),
surrealdb_password: "test".into(),
surrealdb_namespace: "test".into(),
surrealdb_database: "test".into(),
data_dir: "/tmp/unused".into(), // Ignored for memory storage
http_port: 0,
openai_base_url: "..".into(),
storage: StorageKind::Memory,
pdf_ingest_mode: LlmFirst,
..Default::default()
}
}
#[tokio::test]
async fn test_storage_manager_memory_basic_operations() -> anyhow::Result<()> {
    let storage = StorageManager::new(&test_config_memory())
        .await
        .context("create storage manager")?;
    // The memory backend has no on-disk base directory.
    assert!(storage.local_base_path().is_none());

    let location = "test/data/file.txt";
    let payload = b"test data for storage manager";

    // Round trip: put, then get.
    storage
        .put(location, Bytes::from(payload.to_vec()))
        .await
        .context("put")?;
    let fetched = storage.get(location).await.context("get")?;
    assert_eq!(fetched.as_ref(), payload);

    // The object is reported as present.
    let present = storage.exists(location).await.context("exists check")?;
    assert!(present);

    // Deleting the prefix removes it again.
    storage.delete_prefix("test/data/").await.context("delete")?;
    let still_present = storage
        .exists(location)
        .await
        .context("exists check after delete")?;
    assert!(!still_present);
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_local_basic_operations() -> anyhow::Result<()> {
    let base = format!("/tmp/minne_storage_test_{}", Uuid::new_v4());
    let storage = StorageManager::new(&test_config(&base))
        .await
        .context("create storage manager")?;
    let resolved_base = storage
        .local_base_path()
        .context("resolved base dir")?
        .to_path_buf();
    assert_eq!(resolved_base, PathBuf::from(&base));

    let location = "test/data/file.txt";
    let payload = b"test data for local storage";

    // Round trip: put, then get.
    storage
        .put(location, Bytes::from(payload.to_vec()))
        .await
        .context("put")?;
    let fetched = storage.get(location).await.context("get")?;
    assert_eq!(fetched.as_ref(), payload);

    // The write must have materialised the object's directory on disk.
    let object_dir = resolved_base.join("test/data");
    tokio::fs::metadata(&object_dir)
        .await
        .context("object directory exists after write")?;

    // The object is reported as present.
    let present = storage.exists(location).await.context("exists check")?;
    assert!(present);

    // Deleting the prefix removes the object and its directory.
    storage.delete_prefix("test/data/").await.context("delete")?;
    let still_present = storage
        .exists(location)
        .await
        .context("exists check after delete")?;
    assert!(!still_present);
    let dir_lookup = tokio::fs::metadata(&object_dir).await;
    assert!(dir_lookup.is_err(), "object directory should be removed");
    // ... while the base directory itself is left alone.
    tokio::fs::metadata(&resolved_base)
        .await
        .context("base directory remains intact")?;

    // Clean up
    let _ = tokio::fs::remove_dir_all(&base).await;
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_memory_persistence() -> anyhow::Result<()> {
    let storage = StorageManager::new(&test_config_memory())
        .await
        .context("create storage manager")?;
    let location = "persistence/test.txt";
    let first = b"first data";
    let second = b"second data";

    // Write the first payload and read it back.
    storage
        .put(location, Bytes::from(first.to_vec()))
        .await
        .context("put first")?;
    let seen_first = storage.get(location).await.context("get first")?;
    assert_eq!(seen_first.as_ref(), first);

    // Overwrite with the second payload and read it back.
    storage
        .put(location, Bytes::from(second.to_vec()))
        .await
        .context("put second")?;
    let seen_second = storage.get(location).await.context("get second")?;
    assert_eq!(seen_second.as_ref(), second);

    // The same manager observed both states, one after the other.
    assert_ne!(seen_first.as_ref(), seen_second.as_ref());
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_list_operations() -> anyhow::Result<()> {
    let storage = StorageManager::new(&test_config_memory())
        .await
        .context("create storage manager")?;

    // Seed three objects spread over two directories.
    let fixtures = vec![
        ("dir1/file1.txt", b"content1"),
        ("dir1/file2.txt", b"content2"),
        ("dir2/file3.txt", b"content3"),
    ];
    for (location, data) in &fixtures {
        storage
            .put(location, Bytes::from(data.to_vec()))
            .await
            .context("put")?;
    }

    // Without a prefix everything is listed.
    let everything = storage.list(None).await.context("list all")?;
    assert_eq!(everything.len(), 3);

    // With a prefix only the matching directory is listed.
    let in_dir1 = storage.list(Some("dir1/")).await.context("list dir1")?;
    assert_eq!(in_dir1.len(), 2);
    let lists = |needle: &str| {
        in_dir1
            .iter()
            .any(|meta| meta.location.as_ref().contains(needle))
    };
    assert!(lists("file1.txt"));
    assert!(lists("file2.txt"));

    // An unknown prefix yields an empty listing rather than an error.
    let nothing = storage
        .list(Some("nonexistent/"))
        .await
        .context("list nonexistent")?;
    assert_eq!(nothing.len(), 0);
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_stream_operations() -> anyhow::Result<()> {
    let storage = StorageManager::new(&test_config_memory())
        .await
        .context("create storage manager")?;
    let location = "stream/test.bin";
    let content = vec![42u8; 1024 * 64]; // 64KB of data

    // Store the payload in one go.
    storage
        .put(location, Bytes::from(content.clone()))
        .await
        .context("put large data")?;

    // Read it back chunk by chunk and reassemble.
    let mut chunks = storage.get_stream(location).await.context("get stream")?;
    let mut collected = Vec::new();
    while let Some(next) = chunks.next().await {
        let bytes = next.context("stream chunk")?;
        collected.extend_from_slice(&bytes);
    }
    assert_eq!(collected, content);
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_with_custom_backend() -> anyhow::Result<()> {
    use object_store::memory::InMemory;

    // Inject a hand-built in-memory backend.
    let backend = Arc::new(InMemory::new());
    let storage = StorageManager::with_backend(backend, StorageKind::Memory);

    let location = "custom/test.txt";
    let payload = b"custom backend test";

    // The injected backend serves the usual operations.
    storage
        .put(location, Bytes::from(payload.to_vec()))
        .await
        .context("put")?;
    let fetched = storage.get(location).await.context("get")?;
    assert_eq!(fetched.as_ref(), payload);
    let present = storage.exists(location).await.context("exists")?;
    assert!(present);
    assert_eq!(*storage.backend_kind(), StorageKind::Memory);
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_error_handling() -> anyhow::Result<()> {
    let storage = StorageManager::new(&test_config_memory())
        .await
        .context("create storage manager")?;

    // Reading a missing object is an error ...
    let missing = storage.get("nonexistent.txt").await;
    assert!(missing.is_err());

    // ... whereas probing for it simply reports `false`.
    let present = storage
        .exists("nonexistent.txt")
        .await
        .context("exists check")?;
    assert!(!present);

    // Getting an empty location must not panic. Whether it errors depends on
    // the backend implementation, so the outcome is deliberately ignored.
    let _result = storage.get("").await;
    Ok(())
}
// TestStorageManager tests
#[tokio::test]
async fn test_test_storage_manager_memory() -> anyhow::Result<()> {
    let harness = testing::TestStorageManager::new_memory()
        .await
        .context("create test storage")?;
    let location = "test/storage/file.txt";
    let payload = b"test data with TestStorageManager";

    // Round trip: put, then get.
    harness.put(location, payload).await.context("put")?;
    let fetched = harness.get(location).await.context("get")?;
    assert_eq!(fetched.as_ref(), payload);

    // The object is reported as present.
    let present = harness.exists(location).await.context("exists")?;
    assert!(present);

    // Exactly one object is listed below the prefix.
    let listed = harness.list(Some("test/storage/")).await.context("list")?;
    assert_eq!(listed.len(), 1);

    // Deleting the prefix removes it again.
    harness
        .delete_prefix("test/storage/")
        .await
        .context("delete")?;
    let still_present = harness
        .exists(location)
        .await
        .context("exists after delete")?;
    assert!(!still_present);
    Ok(())
}
#[tokio::test]
async fn test_test_storage_manager_local() -> anyhow::Result<()> {
    let harness = testing::TestStorageManager::new_local()
        .await
        .context("create test storage")?;
    let location = "test/local/file.txt";
    let payload = b"test data with local TestStorageManager";

    // Round trip through the local filesystem backend.
    harness.put(location, payload).await.context("put")?;
    let fetched = harness.get(location).await.context("get")?;
    assert_eq!(fetched.as_ref(), payload);

    let present = harness.exists(location).await.context("exists")?;
    assert!(present);
    Ok(())
}
#[tokio::test]
async fn test_test_storage_manager_isolation() -> anyhow::Result<()> {
    let first = testing::TestStorageManager::new_memory()
        .await
        .context("create test storage 1")?;
    let second = testing::TestStorageManager::new_memory()
        .await
        .context("create test storage 2")?;

    // Both managers write to the very same location ...
    let location = "isolation/test.txt";
    let payload_one = b"storage 1 data";
    let payload_two = b"storage 2 data";
    first
        .put(location, payload_one)
        .await
        .context("put storage 1")?;
    second
        .put(location, payload_two)
        .await
        .context("put storage 2")?;

    // ... yet each one only ever sees its own data.
    let seen_one = first.get(location).await.context("get storage 1")?;
    let seen_two = second.get(location).await.context("get storage 2")?;
    assert_eq!(seen_one.as_ref(), payload_one);
    assert_eq!(seen_two.as_ref(), payload_two);
    assert_ne!(seen_one.as_ref(), seen_two.as_ref());
    Ok(())
}
#[tokio::test]
async fn test_test_storage_manager_config() -> anyhow::Result<()> {
    let cfg = testing::test_config_memory();
    let harness = testing::TestStorageManager::with_config(&cfg)
        .await
        .context("create test storage with config")?;
    let location = "config/test.txt";
    let payload = b"test data with custom config";

    // The manager built from an explicit config behaves like any other.
    harness.put(location, payload).await.context("put")?;
    let fetched = harness.get(location).await.context("get")?;
    assert_eq!(fetched.as_ref(), payload);
    assert_eq!(*harness.storage().backend_kind(), StorageKind::Memory);
    Ok(())
}
// S3 Tests - Require a reachable MinIO endpoint and test bucket.
// `TestStorageManager::new_s3()` probes connectivity and these tests auto-skip when unavailable.
#[tokio::test]
async fn test_storage_manager_s3_basic_operations() -> anyhow::Result<()> {
    // Skip if S3 connection fails (e.g. no MinIO)
    let storage = match testing::TestStorageManager::new_s3().await {
        Ok(storage) => storage,
        Err(_) => {
            eprintln!("Skipping S3 test (setup failed)");
            return Ok(());
        }
    };
    let prefix = format!("test-basic-{}", Uuid::new_v4());
    let location = format!("{prefix}/file.txt");
    let payload = b"test data for S3";

    // A failing put is treated as "environment not available", not as a failure.
    if let Err(e) = storage.put(&location, payload).await {
        eprintln!("Skipping S3 test (put failed - bucket missing?): {e}");
        return Ok(());
    }

    // Read the object back.
    let fetched = storage.get(&location).await.context("get")?;
    assert_eq!(fetched.as_ref(), payload);

    // The object is reported as present.
    let present = storage.exists(&location).await.context("exists")?;
    assert!(present);

    // Deleting the prefix removes it again.
    let doomed = format!("{prefix}/");
    storage.delete_prefix(&doomed).await.context("delete")?;
    let still_present = storage
        .exists(&location)
        .await
        .context("exists after delete")?;
    assert!(!still_present);
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_s3_list_operations() -> anyhow::Result<()> {
    // Silently skip when the S3 test environment is unavailable.
    let storage = match testing::TestStorageManager::new_s3().await {
        Ok(storage) => storage,
        Err(_) => return Ok(()),
    };
    let prefix = format!("test-list-{}", Uuid::new_v4());
    let fixtures = vec![
        (format!("{prefix}/file1.txt"), b"content1"),
        (format!("{prefix}/file2.txt"), b"content2"),
        (format!("{prefix}/sub/file3.txt"), b"content3"),
    ];
    for (location, data) in &fixtures {
        // Abort if put fails
        if storage.put(location, *data).await.is_err() {
            return Ok(());
        }
    }

    // All three objects, including the nested one, sit below the prefix.
    let list_prefix = format!("{prefix}/");
    let listed = storage.list(Some(&list_prefix)).await.context("list")?;
    assert_eq!(listed.len(), 3);

    // Cleanup
    storage.delete_prefix(&list_prefix).await.context("cleanup")?;
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_s3_stream_operations() -> anyhow::Result<()> {
    // Silently skip when the S3 test environment is unavailable.
    let storage = match testing::TestStorageManager::new_s3().await {
        Ok(storage) => storage,
        Err(_) => return Ok(()),
    };
    let prefix = format!("test-stream-{}", Uuid::new_v4());
    let location = format!("{prefix}/large.bin");
    let content = vec![42u8; 1024 * 10]; // 10KB
    if storage.put(&location, &content).await.is_err() {
        return Ok(());
    }

    // Read the object back chunk by chunk and reassemble it.
    let mut chunks = storage.get_stream(&location).await.context("get stream")?;
    let mut collected = Vec::new();
    while let Some(next) = chunks.next().await {
        let bytes = next.context("chunk")?;
        collected.extend_from_slice(&bytes);
    }
    assert_eq!(collected, content);

    let doomed = format!("{prefix}/");
    storage.delete_prefix(&doomed).await.context("cleanup")?;
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_s3_backend_kind() -> anyhow::Result<()> {
    // Silently skip when the S3 test environment is unavailable.
    let storage = match testing::TestStorageManager::new_s3().await {
        Ok(storage) => storage,
        Err(_) => return Ok(()),
    };
    let kind = *storage.storage().backend_kind();
    assert_eq!(kind, StorageKind::S3);
    Ok(())
}
#[tokio::test]
async fn test_storage_manager_s3_error_handling() -> anyhow::Result<()> {
    // Silently skip when the S3 test environment is unavailable.
    let storage = match testing::TestStorageManager::new_s3().await {
        Ok(storage) => storage,
        Err(_) => return Ok(()),
    };
    let location = format!("nonexistent-{}/file.txt", Uuid::new_v4());

    // Reading a missing object is an error.
    let missing = storage.get(&location).await;
    assert!(missing.is_err());

    // exists may fail if S3 is unavailable; treat error as false
    let present = storage.exists(&location).await.unwrap_or(false);
    assert!(!present);
    Ok(())
}
}