diff --git a/Cargo.lock b/Cargo.lock index f66de0e..b8a48f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4389,13 +4389,22 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf" dependencies = [ "async-trait", + "base64 0.22.1", "bytes", "chrono", "futures", "humantime", + "hyper", "itertools 0.13.0", + "md-5", "parking_lot", "percent-encoding", + "quick-xml", + "rand 0.8.5", + "reqwest", + "ring", + "serde", + "serde_json", "snafu", "tokio", "tracing", @@ -5064,6 +5073,16 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a993555f31e5a609f617c12db6250dedcac1b0a85076912c436e6fc9b2c8e6a3" +[[package]] +name = "quick-xml" +version = "0.37.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "331e97a1af0bf59823e6eadffe373d7b27f485be8748f71471c662c1f269b7fb" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quick_cache" version = "0.5.2" diff --git a/Cargo.toml b/Cargo.toml index d1ed6c0..e852166 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ url = { version = "2.5.2", features = ["serde"] } uuid = { version = "1.10.0", features = ["v4", "serde"] } tokio-retry = "0.3.0" base64 = "0.22.1" -object_store = { version = "0.11.2" } +object_store = { version = "0.11.2", features = ["aws"] } bytes = "1.7.1" state-machines = "0.2.0" fastembed = { version = "5.2.0", default-features = false, features = ["hf-hub-native-tls", "ort-load-dynamic"] } diff --git a/common/src/storage/store.rs b/common/src/storage/store.rs index 9039483..e5c5027 100644 --- a/common/src/storage/store.rs +++ b/common/src/storage/store.rs @@ -6,6 +6,7 @@ use anyhow::{anyhow, Result as AnyResult}; use bytes::Bytes; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; +use object_store::aws::AmazonS3Builder; use object_store::local::LocalFileSystem; use object_store::memory::InMemory; use object_store::{path::Path as ObjPath, ObjectStore}; @@ -234,6 +235,39 @@ async fn create_storage_backend( let store = InMemory::new(); Ok((Arc::new(store), None)) } + StorageKind::S3 => { + let bucket = cfg + .s3_bucket + .as_ref() + .ok_or_else(|| object_store::Error::Generic { + store: "S3", + source: anyhow!("s3_bucket is required for S3 storage").into(), + })?; + + let mut builder = AmazonS3Builder::new() + .with_bucket_name(bucket) + .with_allow_http(true); + + if let (Ok(key), Ok(secret)) = ( + std::env::var("AWS_ACCESS_KEY_ID"), + std::env::var("AWS_SECRET_ACCESS_KEY"), + ) { + builder = builder + .with_access_key_id(key) + .with_secret_access_key(secret); + } + + if let Some(endpoint) = &cfg.s3_endpoint { + builder = builder.with_endpoint(endpoint); + } + + if let Some(region) = &cfg.s3_region { + builder = builder.with_region(region); + } + + let store = builder.build()?; + Ok((Arc::new(store), None)) + } } } @@ -290,6 +324,29 @@ pub mod testing { } } + /// Create a test configuration with S3 storage (MinIO). + /// + /// This requires a running MinIO instance on localhost:9000. + pub fn test_config_s3() -> AppConfig { + AppConfig { + openai_api_key: "test".into(), + surrealdb_address: "test".into(), + surrealdb_username: "test".into(), + surrealdb_password: "test".into(), + surrealdb_namespace: "test".into(), + surrealdb_database: "test".into(), + data_dir: "/tmp/unused".into(), + http_port: 0, + openai_base_url: "..".into(), + storage: StorageKind::S3, + s3_bucket: Some("minne-tests".into()), + s3_endpoint: Some("http://localhost:9000".into()), + s3_region: Some("us-east-1".into()), + pdf_ingest_mode: PdfIngestMode::LlmFirst, + ..Default::default() + } + } + /// A specialized StorageManager for testing scenarios. /// /// This provides automatic setup for memory storage with proper isolation @@ -332,6 +389,26 @@ pub mod testing { }) } + /// Create a new TestStorageManager with S3 backend (MinIO). + /// + /// This requires a running MinIO instance on localhost:9000 with + /// default credentials (minioadmin/minioadmin) and a 'minne-tests' bucket. + pub async fn new_s3() -> object_store::Result { + // Ensure credentials are set for MinIO + // We set these env vars for the process, which AmazonS3Builder will pick up + std::env::set_var("AWS_ACCESS_KEY_ID", "minioadmin"); + std::env::set_var("AWS_SECRET_ACCESS_KEY", "minioadmin"); + std::env::set_var("AWS_REGION", "us-east-1"); + + let cfg = test_config_s3(); + let storage = StorageManager::new(&cfg).await?; + + Ok(Self { + storage, + _temp_dir: None, + }) + } + /// Create a TestStorageManager with custom configuration. pub async fn with_config(cfg: &AppConfig) -> object_store::Result { let storage = StorageManager::new(cfg).await?; @@ -369,6 +446,14 @@ pub mod testing { self.storage.get(location).await } + /// Get a streaming handle for test data. + pub async fn get_stream( + &self, + location: &str, + ) -> object_store::Result>> { + self.storage.get_stream(location).await + } + /// Delete test data below the specified prefix. pub async fn delete_prefix(&self, prefix: &str) -> object_store::Result<()> { self.storage.delete_prefix(prefix).await @@ -837,4 +922,117 @@ mod tests { // Verify it's using memory backend assert_eq!(*test_storage.storage().backend_kind(), StorageKind::Memory); } + + // S3 Tests - Require MinIO on localhost:9000 with bucket 'minne-tests' + // These tests will fail if MinIO is not running or bucket doesn't exist. + + #[tokio::test] + async fn test_storage_manager_s3_basic_operations() { + // Skip if S3 connection fails (e.g. no MinIO) + let Ok(storage) = testing::TestStorageManager::new_s3().await else { + eprintln!("Skipping S3 test (setup failed)"); + return; + }; + + let prefix = format!("test-basic-{}", Uuid::new_v4()); + let location = format!("{prefix}/file.txt"); + let data = b"test data for S3"; + + // Test put + if let Err(e) = storage.put(&location, data).await { + eprintln!("Skipping S3 test (put failed - bucket missing?): {e}"); + return; + } + + // Test get + let retrieved = storage.get(&location).await.expect("get"); + assert_eq!(retrieved.as_ref(), data); + + // Test exists + assert!(storage.exists(&location).await.expect("exists")); + + // Test delete + storage + .delete_prefix(&format!("{prefix}/")) + .await + .expect("delete"); + assert!(!storage + .exists(&location) + .await + .expect("exists after delete")); + } + + #[tokio::test] + async fn test_storage_manager_s3_list_operations() { + let Ok(storage) = testing::TestStorageManager::new_s3().await else { + return; + }; + + let prefix = format!("test-list-{}", Uuid::new_v4()); + let files = vec![ + (format!("{prefix}/file1.txt"), b"content1"), + (format!("{prefix}/file2.txt"), b"content2"), + (format!("{prefix}/sub/file3.txt"), b"content3"), + ]; + + for (loc, data) in &files { + if storage.put(loc, *data).await.is_err() { + return; // Abort if put fails + } + } + + // List with prefix + let list_prefix = format!("{prefix}/"); + let items = storage.list(Some(&list_prefix)).await.expect("list"); + assert_eq!(items.len(), 3); + + // Cleanup + storage.delete_prefix(&list_prefix).await.expect("cleanup"); + } + + #[tokio::test] + async fn test_storage_manager_s3_stream_operations() { + let Ok(storage) = testing::TestStorageManager::new_s3().await else { + return; + }; + + let prefix = format!("test-stream-{}", Uuid::new_v4()); + let location = format!("{prefix}/large.bin"); + let content = vec![42u8; 1024 * 10]; // 10KB + + if storage.put(&location, &content).await.is_err() { + return; + } + + let mut stream = storage.get_stream(&location).await.expect("get stream"); + let mut collected = Vec::new(); + while let Some(chunk) = stream.next().await { + collected.extend_from_slice(&chunk.expect("chunk")); + } + assert_eq!(collected, content); + + storage + .delete_prefix(&format!("{prefix}/")) + .await + .expect("cleanup"); + } + + #[tokio::test] + async fn test_storage_manager_s3_backend_kind() { + let Ok(storage) = testing::TestStorageManager::new_s3().await else { + return; + }; + assert_eq!(*storage.storage().backend_kind(), StorageKind::S3); + } + + #[tokio::test] + async fn test_storage_manager_s3_error_handling() { + let Ok(storage) = testing::TestStorageManager::new_s3().await else { + return; + }; + + let location = format!("nonexistent-{}/file.txt", Uuid::new_v4()); + assert!(storage.get(&location).await.is_err()); + assert!(!storage.exists(&location).await.expect("exists check")); + } } diff --git a/common/src/utils/config.rs b/common/src/utils/config.rs index a3e6356..b46b53b 100644 --- a/common/src/utils/config.rs +++ b/common/src/utils/config.rs @@ -20,6 +20,7 @@ pub enum EmbeddingBackend { pub enum StorageKind { Local, Memory, + S3, } /// Default storage backend when none is configured. @@ -27,6 +28,10 @@ fn default_storage_kind() -> StorageKind { StorageKind::Local } +fn default_s3_region() -> Option { + Some("us-east-1".to_string()) +} + /// Selects the strategy used for PDF ingestion. #[derive(Clone, Deserialize, Debug)] #[serde(rename_all = "kebab-case")] @@ -59,6 +64,12 @@ pub struct AppConfig { pub openai_base_url: String, #[serde(default = "default_storage_kind")] pub storage: StorageKind, + #[serde(default)] + pub s3_bucket: Option, + #[serde(default)] + pub s3_endpoint: Option, + #[serde(default = "default_s3_region")] + pub s3_region: Option, #[serde(default = "default_pdf_ingest_mode")] pub pdf_ingest_mode: PdfIngestMode, #[serde(default = "default_reranking_enabled")] @@ -135,6 +146,9 @@ impl Default for AppConfig { http_port: 0, openai_base_url: default_base_url(), storage: default_storage_kind(), + s3_bucket: None, + s3_endpoint: None, + s3_region: default_s3_region(), pdf_ingest_mode: default_pdf_ingest_mode(), reranking_enabled: default_reranking_enabled(), reranking_pool_size: None, diff --git a/docs/configuration.md b/docs/configuration.md index 3d75dbf..58174f7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -22,7 +22,7 @@ Minne can be configured via environment variables or a `config.yaml` file. Envir | `DATA_DIR` | Local data directory | `./data` | | `OPENAI_BASE_URL` | Custom AI provider URL | OpenAI default | | `RUST_LOG` | Logging level | `info` | -| `STORAGE` | Storage backend (`local`, `memory`) | `local` | +| `STORAGE` | Storage backend (`local`, `memory`, `s3`) | `local` | | `PDF_INGEST_MODE` | PDF ingestion strategy (`classic`, `llm-first`) | `llm-first` | | `RETRIEVAL_STRATEGY` | Default retrieval strategy | - | | `EMBEDDING_BACKEND` | Embedding provider (`openai`, `fastembed`) | `fastembed` | @@ -30,6 +30,18 @@ Minne can be configured via environment variables or a `config.yaml` file. Envir | `FASTEMBED_SHOW_DOWNLOAD_PROGRESS` | Show progress bar for model downloads | `false` | | `FASTEMBED_MAX_LENGTH` | Max sequence length for FastEmbed models | - | +### S3 Storage (Optional) + +Used when `STORAGE` is set to `s3`. + +| Variable | Description | Default | +|----------|-------------|---------| +| `S3_BUCKET` | S3 bucket name | - | +| `S3_ENDPOINT` | Custom endpoint (e.g. MinIO) | AWS default | +| `S3_REGION` | AWS Region | `us-east-1` | +| `AWS_ACCESS_KEY_ID` | Access key | - | +| `AWS_SECRET_ACCESS_KEY` | Secret key | - | + ### Reranking (Optional) | Variable | Description | Default | @@ -54,6 +66,10 @@ http_port: 3000 # New settings storage: "local" +# storage: "s3" +# s3_bucket: "my-bucket" +# s3_endpoint: "http://localhost:9000" # Optional, for MinIO etc. +# s3_region: "us-east-1" pdf_ingest_mode: "llm-first" embedding_backend: "fastembed"