feat: s3 storage backend

This commit is contained in:
Per Stark
2026-01-16 23:38:47 +01:00
parent b25cfb4633
commit fa7f407306
5 changed files with 249 additions and 2 deletions

View File

@@ -6,6 +6,7 @@ use anyhow::{anyhow, Result as AnyResult};
use bytes::Bytes;
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use object_store::aws::AmazonS3Builder;
use object_store::local::LocalFileSystem;
use object_store::memory::InMemory;
use object_store::{path::Path as ObjPath, ObjectStore};
@@ -234,6 +235,39 @@ async fn create_storage_backend(
let store = InMemory::new();
Ok((Arc::new(store), None))
}
// S3-backed object store (also used for S3-compatible servers such as
// MinIO via a custom endpoint).
StorageKind::S3 => {
// A bucket name is mandatory for S3; fail with a storage-domain error
// instead of panicking when the config omits it.
let bucket = cfg
.s3_bucket
.as_ref()
.ok_or_else(|| object_store::Error::Generic {
store: "S3",
source: anyhow!("s3_bucket is required for S3 storage").into(),
})?;
// NOTE(review): allow_http(true) permits plain-HTTP endpoints. This is
// required for local MinIO, but it also applies in production builds —
// consider gating it on the endpoint being non-TLS / a config flag.
let mut builder = AmazonS3Builder::new()
.with_bucket_name(bucket)
.with_allow_http(true);
// Explicit credentials are applied only when BOTH env vars are set;
// otherwise the builder is left to its own credential resolution.
if let (Ok(key), Ok(secret)) = (
std::env::var("AWS_ACCESS_KEY_ID"),
std::env::var("AWS_SECRET_ACCESS_KEY"),
) {
builder = builder
.with_access_key_id(key)
.with_secret_access_key(secret);
}
// Optional overrides: custom endpoint (e.g. http://localhost:9000 for
// MinIO) and region. Defaults come from the config's serde defaults.
if let Some(endpoint) = &cfg.s3_endpoint {
builder = builder.with_endpoint(endpoint);
}
if let Some(region) = &cfg.s3_region {
builder = builder.with_region(region);
}
let store = builder.build()?;
// No temp-dir guard is needed for remote storage, hence the None.
Ok((Arc::new(store), None))
}
}
}
@@ -290,6 +324,29 @@ pub mod testing {
}
}
/// Create a test configuration with S3 storage (MinIO).
///
/// This requires a running MinIO instance on localhost:9000.
pub fn test_config_s3() -> AppConfig {
AppConfig {
openai_api_key: "test".into(),
surrealdb_address: "test".into(),
surrealdb_username: "test".into(),
surrealdb_password: "test".into(),
surrealdb_namespace: "test".into(),
surrealdb_database: "test".into(),
data_dir: "/tmp/unused".into(),
http_port: 0,
openai_base_url: "..".into(),
storage: StorageKind::S3,
s3_bucket: Some("minne-tests".into()),
s3_endpoint: Some("http://localhost:9000".into()),
s3_region: Some("us-east-1".into()),
pdf_ingest_mode: PdfIngestMode::LlmFirst,
..Default::default()
}
}
/// A specialized StorageManager for testing scenarios.
///
/// This provides automatic setup for memory storage with proper isolation
@@ -332,6 +389,26 @@ pub mod testing {
})
}
/// Create a new TestStorageManager with S3 backend (MinIO).
///
/// This requires a running MinIO instance on localhost:9000 with
/// default credentials (minioadmin/minioadmin) and a 'minne-tests' bucket.
///
/// MinIO defaults are exported via the environment so that
/// `AmazonS3Builder` picks them up — but only when the caller has not
/// already provided credentials, so real AWS/MinIO settings in the
/// environment are never clobbered.
///
/// NOTE(review): `std::env::set_var` mutates process-global state and is
/// not thread-safe; parallel tests that read these vars concurrently may
/// race. Consider serializing S3 tests or passing credentials via config.
pub async fn new_s3() -> object_store::Result<Self> {
    // Only install the default key pair when neither half is present,
    // so we never produce a mismatched access-key/secret combination.
    if std::env::var_os("AWS_ACCESS_KEY_ID").is_none()
        && std::env::var_os("AWS_SECRET_ACCESS_KEY").is_none()
    {
        std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin");
        std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin");
    }
    if std::env::var_os("AWS_REGION").is_none() {
        std::env::set_var("AWS_REGION", "us-east-1");
    }
    let cfg = test_config_s3();
    let storage = StorageManager::new(&cfg).await?;
    Ok(Self {
        storage,
        _temp_dir: None,
    })
}
/// Create a TestStorageManager with custom configuration.
pub async fn with_config(cfg: &AppConfig) -> object_store::Result<Self> {
let storage = StorageManager::new(cfg).await?;
@@ -369,6 +446,14 @@ pub mod testing {
self.storage.get(location).await
}
/// Get a streaming handle for test data.
///
/// Thin delegation to the underlying [`StorageManager`]; yields the
/// object's bytes as a stream of chunks.
pub async fn get_stream(
    &self,
    location: &str,
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
    let stream = self.storage.get_stream(location).await?;
    Ok(stream)
}
/// Delete test data below the specified prefix.
pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> {
self.storage.delete_prefix(prefix).await
@@ -837,4 +922,117 @@ mod tests {
// Verify it's using memory backend
assert_eq!(*test_storage.storage().backend_kind(), StorageKind::Memory);
}
// S3 Tests - Require MinIO on localhost:9000 with bucket 'minne-tests'
// These tests will fail if MinIO is not running or bucket doesn't exist.
#[tokio::test]
async fn test_storage_manager_s3_basic_operations() {
    // Skip if S3 connection fails (e.g. no MinIO)
    let storage = match testing::TestStorageManager::new_s3().await {
        Ok(s) => s,
        Err(_) => {
            eprintln!("Skipping S3 test (setup failed)");
            return;
        }
    };
    let prefix = format!("test-basic-{}", Uuid::new_v4());
    let location = format!("{prefix}/file.txt");
    let data = b"test data for S3";

    // put — treat failure as a skip (the bucket may not exist).
    if let Err(e) = storage.put(&location, data).await {
        eprintln!("Skipping S3 test (put failed - bucket missing?): {e}");
        return;
    }
    // get must round-trip the exact bytes that were stored.
    let fetched = storage.get(&location).await.expect("get");
    assert_eq!(fetched.as_ref(), data);
    // exists reports true while the object is present.
    assert!(storage.exists(&location).await.expect("exists"));
    // delete_prefix removes everything below the prefix...
    let dir = format!("{prefix}/");
    storage.delete_prefix(&dir).await.expect("delete");
    // ...after which the object is gone.
    assert!(!storage
        .exists(&location)
        .await
        .expect("exists after delete"));
}
#[tokio::test]
async fn test_storage_manager_s3_list_operations() {
    // Skip (not fail) when no MinIO is available, with a message —
    // consistent with test_storage_manager_s3_basic_operations.
    let Ok(storage) = testing::TestStorageManager::new_s3().await else {
        eprintln!("Skipping S3 test (setup failed)");
        return;
    };
    let prefix = format!("test-list-{}", Uuid::new_v4());
    let list_prefix = format!("{prefix}/");
    let files = vec![
        (format!("{prefix}/file1.txt"), b"content1"),
        (format!("{prefix}/file2.txt"), b"content2"),
        (format!("{prefix}/sub/file3.txt"), b"content3"),
    ];
    for (loc, data) in &files {
        if let Err(e) = storage.put(loc, *data).await {
            // Best-effort cleanup of anything already uploaded, then skip
            // instead of leaking objects in the shared test bucket.
            let _ = storage.delete_prefix(&list_prefix).await;
            eprintln!("Skipping S3 test (put failed - bucket missing?): {e}");
            return;
        }
    }
    // Listing under the prefix sees all three objects, including the one
    // nested under "sub/".
    let items = storage.list(Some(&list_prefix)).await.expect("list");
    assert_eq!(items.len(), 3);
    // Cleanup
    storage.delete_prefix(&list_prefix).await.expect("cleanup");
}
#[tokio::test]
async fn test_storage_manager_s3_stream_operations() {
    // Skip with a message when setup fails — consistent with
    // test_storage_manager_s3_basic_operations (was a silent return).
    let Ok(storage) = testing::TestStorageManager::new_s3().await else {
        eprintln!("Skipping S3 test (setup failed)");
        return;
    };
    let prefix = format!("test-stream-{}", Uuid::new_v4());
    let location = format!("{prefix}/large.bin");
    let content = vec![42u8; 1024 * 10]; // 10KB
    if let Err(e) = storage.put(&location, &content).await {
        eprintln!("Skipping S3 test (put failed - bucket missing?): {e}");
        return;
    }
    // Reassemble the streamed chunks and compare with the original payload.
    let mut stream = storage.get_stream(&location).await.expect("get stream");
    let mut collected = Vec::with_capacity(content.len());
    while let Some(chunk) = stream.next().await {
        collected.extend_from_slice(&chunk.expect("chunk"));
    }
    assert_eq!(collected, content);
    storage
        .delete_prefix(&format!("{prefix}/"))
        .await
        .expect("cleanup");
}
#[tokio::test]
async fn test_storage_manager_s3_backend_kind() {
    // Skip with a message when setup fails — consistent with
    // test_storage_manager_s3_basic_operations (was a silent return).
    let Ok(storage) = testing::TestStorageManager::new_s3().await else {
        eprintln!("Skipping S3 test (setup failed)");
        return;
    };
    // The manager must report the backend kind it was configured with.
    assert_eq!(*storage.storage().backend_kind(), StorageKind::S3);
}
#[tokio::test]
async fn test_storage_manager_s3_error_handling() {
    // Skip with a message when setup fails — consistent with
    // test_storage_manager_s3_basic_operations (was a silent return).
    let Ok(storage) = testing::TestStorageManager::new_s3().await else {
        eprintln!("Skipping S3 test (setup failed)");
        return;
    };
    // Reading a missing object must error; an existence check on the
    // same path must report false without erroring.
    let location = format!("nonexistent-{}/file.txt", Uuid::new_v4());
    assert!(storage.get(&location).await.is_err());
    assert!(!storage.exists(&location).await.expect("exists check"));
}
}

View File

@@ -20,6 +20,7 @@ pub enum EmbeddingBackend {
pub enum StorageKind {
Local,
Memory,
S3,
}
/// Default storage backend when none is configured.
@@ -27,6 +28,10 @@ fn default_storage_kind() -> StorageKind {
StorageKind::Local
}
/// Serde default for `s3_region`: AWS's conventional default region,
/// used when the configuration does not specify one.
fn default_s3_region() -> Option<String> {
    Some(String::from("us-east-1"))
}
/// Selects the strategy used for PDF ingestion.
#[derive(Clone, Deserialize, Debug)]
#[serde(rename_all = "kebab-case")]
@@ -59,6 +64,12 @@ pub struct AppConfig {
pub openai_base_url: String,
#[serde(default = "default_storage_kind")]
pub storage: StorageKind,
#[serde(default)]
pub s3_bucket: Option<String>,
#[serde(default)]
pub s3_endpoint: Option<String>,
#[serde(default = "default_s3_region")]
pub s3_region: Option<String>,
#[serde(default = "default_pdf_ingest_mode")]
pub pdf_ingest_mode: PdfIngestMode,
#[serde(default = "default_reranking_enabled")]
@@ -135,6 +146,9 @@ impl Default for AppConfig {
http_port: 0,
openai_base_url: default_base_url(),
storage: default_storage_kind(),
s3_bucket: None,
s3_endpoint: None,
s3_region: default_s3_region(),
pdf_ingest_mode: default_pdf_ingest_mode(),
reranking_enabled: default_reranking_enabled(),
reranking_pool_size: None,