diff --git a/Cargo.lock b/Cargo.lock index be5aeb4..1b40f35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1212,6 +1212,7 @@ dependencies = [ "axum_session_auth", "axum_session_surreal", "axum_typed_multipart", + "bytes", "chrono", "chrono-tz", "config", @@ -1224,6 +1225,7 @@ dependencies = [ "minijinja-autoreload", "minijinja-contrib", "minijinja-embed", + "object_store 0.11.2", "reqwest", "serde", "serde_json", @@ -3543,6 +3545,27 @@ dependencies = [ "memchr", ] +[[package]] +name = "object_store" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf" +dependencies = [ + "async-trait", + "bytes", + "chrono", + "futures", + "humantime", + "itertools 0.13.0", + "parking_lot", + "percent-encoding", + "snafu", + "tokio", + "tracing", + "url", + "walkdir", +] + [[package]] name = "object_store" version = "0.12.0" @@ -5042,6 +5065,27 @@ dependencies = [ "serde", ] +[[package]] +name = "snafu" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2" +dependencies = [ + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.101", +] + [[package]] name = "snap" version = "1.1.1" @@ -5271,7 +5315,7 @@ dependencies = [ "ndarray-stats", "num-traits", "num_cpus", - "object_store", + "object_store 0.12.0", "parking_lot", "pbkdf2", "pharos", diff --git a/Cargo.toml b/Cargo.toml index 30cd736..a27b176 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,8 @@ url = { version = "2.5.2", features = ["serde"] } uuid = { version = "1.10.0", features = ["v4", "serde"] } tokio-retry = "0.3.0" base64 = "0.22.1" +object_store = { version = 
"0.11.2" } +bytes = "1.7.1" [profile.dist] inherits = "release" diff --git a/common/Cargo.toml b/common/Cargo.toml index c0b2329..ba3aead 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -39,6 +39,8 @@ url = { workspace = true } uuid = { workspace = true } surrealdb-migrations = { workspace = true } tokio-retry = { workspace = true } +object_store = { workspace = true } +bytes = { workspace = true } [features] diff --git a/common/src/storage/mod.rs b/common/src/storage/mod.rs index 48a6962..83cf3d4 100644 --- a/common/src/storage/mod.rs +++ b/common/src/storage/mod.rs @@ -1,2 +1,3 @@ pub mod db; pub mod types; +pub mod store; diff --git a/common/src/storage/store.rs b/common/src/storage/store.rs new file mode 100644 index 0000000..003eb34 --- /dev/null +++ b/common/src/storage/store.rs @@ -0,0 +1,256 @@ +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use anyhow::{anyhow, Result as AnyResult}; +use bytes::Bytes; +use futures::{StreamExt, TryStreamExt}; +use futures::stream::BoxStream; +use object_store::local::LocalFileSystem; +use object_store::{path::Path as ObjPath, ObjectStore}; + +use crate::utils::config::{AppConfig, StorageKind}; + +pub type DynStore = Arc; + +/// Build an object store instance anchored at the given filesystem `prefix`. +/// +/// - For the `Local` backend, `prefix` is the absolute directory on disk that +/// serves as the root for all object paths passed to the store. +/// - `prefix` must already exist; this function will create it if missing. 
+/// +/// Example (Local): +/// - prefix: `/var/data` +/// - object location: `user/uuid/file.txt` +/// - absolute path: `/var/data/user/uuid/file.txt` +pub async fn build_store(prefix: &Path, cfg: &AppConfig) -> object_store::Result<DynStore> { + match cfg.storage { + StorageKind::Local => { + if !prefix.exists() { + tokio::fs::create_dir_all(prefix) + .await + .map_err(|e| object_store::Error::Generic { + store: "LocalFileSystem", + source: e.into(), + })?; + } + let store = LocalFileSystem::new_with_prefix(prefix)?; + Ok(Arc::new(store)) + } + } +} + +/// Resolve the absolute base directory used for local storage from config. +/// +/// If `data_dir` is relative, it is resolved against the current working directory. +pub fn resolve_base_dir(cfg: &AppConfig) -> PathBuf { + if cfg.data_dir.starts_with('/') { + PathBuf::from(&cfg.data_dir) + } else { + std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")) + .join(&cfg.data_dir) + } +} + +/// Build an object store rooted at the configured data directory. +/// +/// This is the recommended way to obtain a store for logical object operations +/// such as `put_bytes_at`, `get_bytes_at`, and `delete_prefix_at`. +pub async fn build_store_root(cfg: &AppConfig) -> object_store::Result<DynStore> { + let base = resolve_base_dir(cfg); + build_store(&base, cfg).await +} + +/// Write bytes to `file_name` within a filesystem `prefix` using the configured store. +/// +/// Prefer [`put_bytes_at`] for location-based writes that do not need to compute +/// a separate filesystem prefix. +pub async fn put_bytes(prefix: &Path, file_name: &str, data: Bytes, cfg: &AppConfig) -> object_store::Result<()> { + let store = build_store(prefix, cfg).await?; + let payload = object_store::PutPayload::from_bytes(data); + store.put(&ObjPath::from(file_name), payload).await?; + Ok(()) +} + +/// Write bytes to the provided logical object `location`, e.g. `"user/uuid/file"`. +/// +/// The store root is taken from `AppConfig::data_dir` for the local backend. 
+/// This performs an atomic write as guaranteed by `object_store`. +pub async fn put_bytes_at(location: &str, data: Bytes, cfg: &AppConfig) -> object_store::Result<()> { + let store = build_store_root(cfg).await?; + let payload = object_store::PutPayload::from_bytes(data); + store.put(&ObjPath::from(location), payload).await?; + Ok(()) +} + +/// Read bytes from `file_name` within a filesystem `prefix` using the configured store. +/// +/// Prefer [`get_bytes_at`] for location-based reads. +pub async fn get_bytes(prefix: &Path, file_name: &str, cfg: &AppConfig) -> object_store::Result<Bytes> { + let store = build_store(prefix, cfg).await?; + let r = store.get(&ObjPath::from(file_name)).await?; + let b = r.bytes().await?; + Ok(b) +} + +/// Read bytes from the provided logical object `location`. +/// +/// Returns the full contents buffered in memory. +pub async fn get_bytes_at(location: &str, cfg: &AppConfig) -> object_store::Result<Bytes> { + let store = build_store_root(cfg).await?; + let r = store.get(&ObjPath::from(location)).await?; + r.bytes().await +} + +/// Get a streaming body for the provided logical object `location`. +/// +/// Returns a fallible `BoxStream` of `Bytes`, suitable for use with +/// `axum::body::Body::from_stream` to stream responses without buffering. +pub async fn get_stream_at(location: &str, cfg: &AppConfig) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> { + let store = build_store_root(cfg).await?; + let r = store.get(&ObjPath::from(location)).await?; + Ok(r.into_stream()) +} + +/// Delete all objects below the provided filesystem `prefix`. +/// +/// This is a low-level variant for when a dedicated on-disk prefix is used for a +/// particular object grouping. Prefer [`delete_prefix_at`] for location-based stores. 
+pub async fn delete_prefix(prefix: &Path, cfg: &AppConfig) -> object_store::Result<()> { + let store = build_store(prefix, cfg).await?; + // list everything and delete + let locations = store.list(None).map_ok(|m| m.location).boxed(); + store.delete_stream(locations).try_collect::<Vec<_>>().await?; + // Best effort remove the directory itself + if tokio::fs::try_exists(prefix).await.unwrap_or(false) { + let _ = tokio::fs::remove_dir_all(prefix).await; + } + Ok(()) +} + +/// Delete all objects below the provided logical object `prefix`, e.g. `"user/uuid/"`. +/// +/// After deleting, attempts a best-effort cleanup of the now-empty directory on disk +/// when using the local backend. +pub async fn delete_prefix_at(prefix: &str, cfg: &AppConfig) -> object_store::Result<()> { + let store = build_store_root(cfg).await?; + let prefix_path = ObjPath::from(prefix); + let locations = store.list(Some(&prefix_path)).map_ok(|m| m.location).boxed(); + store.delete_stream(locations).try_collect::<Vec<_>>().await?; + // Best effort remove empty directory on disk for local storage + let base_dir = resolve_base_dir(cfg).join(prefix); + if tokio::fs::try_exists(&base_dir).await.unwrap_or(false) { + let _ = tokio::fs::remove_dir_all(&base_dir).await; + } + Ok(()) +} + +/// Split an absolute filesystem path into `(parent_dir, file_name)`. +pub fn split_abs_path(path: &str) -> AnyResult<(PathBuf, String)> { + let pb = PathBuf::from(path); + let parent = pb + .parent() + .ok_or_else(|| anyhow!("Path has no parent: {path}"))? + .to_path_buf(); + let file = pb + .file_name() + .ok_or_else(|| anyhow!("Path has no file name: {path}"))? + .to_string_lossy() + .to_string(); + Ok((parent, file)) +} + +/// Split a logical object location `"a/b/c"` into `("a/b", "c")`. 
+pub fn split_object_path(path: &str) -> AnyResult<(String, String)> { + if let Some((p, f)) = path.rsplit_once('/') { + return Ok((p.to_string(), f.to_string())); + } + Err(anyhow!("Object path has no separator: {path}")) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::utils::config::StorageKind; + use bytes::Bytes; + use futures::TryStreamExt; + use uuid::Uuid; + + fn test_config(root: &str) -> AppConfig { + AppConfig { + openai_api_key: "test".into(), + surrealdb_address: "test".into(), + surrealdb_username: "test".into(), + surrealdb_password: "test".into(), + surrealdb_namespace: "test".into(), + surrealdb_database: "test".into(), + data_dir: root.into(), + http_port: 0, + openai_base_url: "..".into(), + storage: StorageKind::Local, + } + } + + #[tokio::test] + async fn test_build_store_root_creates_base() { + let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4()); + let cfg = test_config(&base); + let _ = build_store_root(&cfg).await.expect("build store root"); + assert!(tokio::fs::try_exists(&base).await.unwrap_or(false)); + let _ = tokio::fs::remove_dir_all(&base).await; + } + + #[tokio::test] + async fn test_put_get_bytes_at_and_delete_prefix_at() { + let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4()); + let cfg = test_config(&base); + + let location_prefix = format!("{}/{}", "user1", Uuid::new_v4()); + let file_name = "file.txt"; + let location = format!("{}/{}", &location_prefix, file_name); + let payload = Bytes::from_static(b"hello world"); + + put_bytes_at(&location, payload.clone(), &cfg).await.expect("put"); + let got = get_bytes_at(&location, &cfg).await.expect("get"); + assert_eq!(got.as_ref(), payload.as_ref()); + + // Delete the whole prefix and ensure retrieval fails + delete_prefix_at(&location_prefix, &cfg).await.expect("delete prefix"); + assert!(get_bytes_at(&location, &cfg).await.is_err()); + + let _ = tokio::fs::remove_dir_all(&base).await; + } + + #[tokio::test] + async fn test_get_stream_at() { + let 
base = format!("/tmp/minne_store_test_{}", Uuid::new_v4()); + let cfg = test_config(&base); + + let location = format!("{}/{}/stream.bin", "user2", Uuid::new_v4()); + let content = vec![7u8; 32 * 1024]; // 32KB payload + + put_bytes_at(&location, Bytes::from(content.clone()), &cfg) + .await + .expect("put"); + + let stream = get_stream_at(&location, &cfg).await.expect("stream"); + let combined: Vec = stream + .map_ok(|chunk| chunk.to_vec()) + .try_fold(Vec::new(), |mut acc, mut chunk| async move { + acc.append(&mut chunk); + Ok(acc) + }) + .await + .expect("collect"); + + assert_eq!(combined, content); + + delete_prefix_at( + &split_object_path(&location).unwrap().0, + &cfg, + ) + .await + .ok(); + + let _ = tokio::fs::remove_dir_all(&base).await; + } +} diff --git a/common/src/storage/types/file_info.rs b/common/src/storage/types/file_info.rs index b43a08f..e2fdeeb 100644 --- a/common/src/storage/types/file_info.rs +++ b/common/src/storage/types/file_info.rs @@ -3,16 +3,20 @@ use mime_guess::from_path; use sha2::{Digest, Sha256}; use std::{ io::{BufReader, Read}, - path::{Path, PathBuf}, + path::Path, }; use tempfile::NamedTempFile; use thiserror::Error; -use tokio::fs::remove_dir_all; +use object_store::Error as ObjectStoreError; +// futures imports no longer needed here after abstraction use tracing::info; use uuid::Uuid; use crate::{ - error::AppError, storage::db::SurrealDbClient, stored_object, utils::config::AppConfig, + error::AppError, + storage::{db::SurrealDbClient, store}, + stored_object, + utils::config::AppConfig, }; #[derive(Error, Debug)] @@ -34,6 +38,9 @@ pub enum FileError { #[error("File name missing in metadata")] MissingFileName, + + #[error("Object store error: {0}")] + ObjectStore(#[from] ObjectStoreError), } stored_object!(FileInfo, "file", { @@ -84,9 +91,7 @@ impl FileInfo { file_name, sha256, path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id, config) - .await? 
- .to_string_lossy() - .into(), + .await?, mime_type: Self::guess_mime_type(Path::new(&sanitized_file_name)), user_id: user_id.to_string(), }; @@ -165,7 +170,7 @@ impl FileInfo { } } - /// Persists the file to the filesystem under `{data_dir}/{user_id}/{uuid}/{file_name}`. + /// Persists the file under the logical location `{user_id}/{uuid}/{file_name}`. /// /// # Arguments /// * `uuid` - The UUID of the file. @@ -175,43 +180,24 @@ /// * `config` - Application configuration containing data directory path /// /// # Returns - /// * `Result` - The persisted file path or an error. + /// * `Result` - The logical object location or an error. async fn persist_file( uuid: &Uuid, file: NamedTempFile, file_name: &str, user_id: &str, config: &AppConfig, - ) -> Result<PathBuf, FileError> { - info!("Data dir: {:?}", config.data_dir); - // Convert relative path to absolute path - let base_dir = if config.data_dir.starts_with('/') { - Path::new(&config.data_dir).to_path_buf() - } else { - std::env::current_dir() - .map_err(FileError::Io)? 
- .join(&config.data_dir) - }; + ) -> Result<String, FileError> { + // Logical object location relative to the store root + let location = format!("{}/{}/{}", user_id, uuid, file_name); + info!("Persisting to object location: {}", location); - let user_dir = base_dir.join(user_id); // Create the user directory - let uuid_dir = user_dir.join(uuid.to_string()); // Create the UUID directory under the user directory - - // Create the user and UUID directories if they don't exist - tokio::fs::create_dir_all(&uuid_dir) + let bytes = tokio::fs::read(file.path()).await?; + store::put_bytes_at(&location, bytes.into(), config) .await - .map_err(FileError::Io)?; + .map_err(FileError::from)?; - // Define the final file path - let final_path = uuid_dir.join(file_name); - info!("Final path: {:?}", final_path); - - // Copy the temporary file to the final path - tokio::fs::copy(file.path(), &final_path) - .await - .map_err(FileError::Io)?; - info!("Copied file to {:?}", final_path); - - Ok(final_path) + Ok(location) } /// Retrieves a `FileInfo` by SHA256. @@ -240,7 +226,11 @@ impl FileInfo { /// /// # Returns /// `Result<(), FileError>` - pub async fn delete_by_id(id: &str, db_client: &SurrealDbClient) -> Result<(), AppError> { + pub async fn delete_by_id( + id: &str, + db_client: &SurrealDbClient, + config: &AppConfig, + ) -> Result<(), AppError> { // Get the FileInfo from the database let file_info = match db_client.get_item::<FileInfo>(id).await? 
{ Some(info) => info, @@ -252,25 +242,13 @@ impl FileInfo { } }; - // Remove the file and its parent directory - let file_path = Path::new(&file_info.path); - if file_path.exists() { - // Get the parent directory of the file - if let Some(parent_dir) = file_path.parent() { - // Remove the entire directory containing the file - remove_dir_all(parent_dir).await?; - info!("Removed directory {:?} and its contents", parent_dir); - } else { - return Err(AppError::from(FileError::FileNotFound( - "File has no parent directory".to_string(), - ))); - } - } else { - return Err(AppError::from(FileError::FileNotFound(format!( - "File at path {:?} was not found", - file_path - )))); - } + // Remove the object's parent prefix in the object store + let (parent_prefix, _file_name) = store::split_object_path(&file_info.path) + .map_err(|e| AppError::from(anyhow::anyhow!(e)))?; + store::delete_prefix_at(&parent_prefix, config) + .await + .map_err(|e| AppError::from(anyhow::anyhow!(e)))?; + info!("Removed object prefix {} and its contents via object_store", parent_prefix); // Delete the FileInfo from the database db_client.delete_item::<FileInfo>(id).await?; @@ -300,6 +278,7 @@ mod tests { use super::*; use axum::http::HeaderMap; use axum_typed_multipart::FieldMetadata; + use crate::utils::config::StorageKind; use std::io::Write; use tempfile::NamedTempFile; @@ -326,7 +305,7 @@ mod tests { } #[tokio::test] - async fn test_cross_filesystem_file_operations() { + async fn test_fileinfo_create_read_delete() { // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); @@ -351,6 +330,7 @@ mod tests { surrealdb_database: "test_db".to_string(), http_port: 3000, openai_base_url: "..".to_string(), + storage: StorageKind::Local, }; // Test file creation @@ -358,14 +338,11 @@ mod tests { .await .expect("Failed to create file across filesystems"); - // Verify the file exists and has correct content - let file_path = Path::new(&file_info.path); - 
assert!(file_path.exists(), "File should exist at {:?}", file_path); - - let file_content = tokio::fs::read_to_string(file_path) + // Verify the file exists via object_store and has correct content + let bytes = store::get_bytes_at(&file_info.path, &config) .await - .expect("Failed to read file content"); - assert_eq!(file_content, String::from_utf8_lossy(content)); + .expect("Failed to read file content via object_store"); + assert_eq!(bytes, content.as_slice()); // Test file reading let retrieved = FileInfo::get_by_id(&file_info.id, &db) @@ -375,17 +352,20 @@ mod tests { assert_eq!(retrieved.sha256, file_info.sha256); // Test file deletion - FileInfo::delete_by_id(&file_info.id, &db) + FileInfo::delete_by_id(&file_info.id, &db, &config) .await .expect("Failed to delete file"); - assert!(!file_path.exists(), "File should be deleted"); + assert!( + store::get_bytes_at(&file_info.path, &config).await.is_err(), + "File should be deleted" + ); // Clean up the test directory let _ = tokio::fs::remove_dir_all(&config.data_dir).await; } #[tokio::test] - async fn test_cross_filesystem_duplicate_detection() { + async fn test_fileinfo_duplicate_detection() { // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); @@ -410,6 +390,7 @@ mod tests { surrealdb_database: "test_db".to_string(), http_port: 3000, openai_base_url: "..".to_string(), + storage: StorageKind::Local, }; // Store the original file @@ -433,7 +414,7 @@ mod tests { assert_ne!(duplicate_file_info.file_name, duplicate_name); // Clean up - FileInfo::delete_by_id(&original_file_info.id, &db) + FileInfo::delete_by_id(&original_file_info.id, &db, &config) .await .expect("Failed to delete file"); let _ = tokio::fs::remove_dir_all(&config.data_dir).await; @@ -465,6 +446,7 @@ mod tests { surrealdb_database: "test_db".to_string(), http_port: 3000, openai_base_url: "..".to_string(), + storage: StorageKind::Local, }; let file_info = FileInfo::new(field_data, &db, 
user_id, &config).await; @@ -478,6 +460,11 @@ mod tests { assert_eq!(file_info.file_name, file_name); assert!(!file_info.sha256.is_empty()); assert!(!file_info.path.is_empty()); + // path should be logical: "user_id/uuid/file_name" + let parts: Vec<&str> = file_info.path.split('/').collect(); + assert_eq!(parts.len(), 3); + assert_eq!(parts[0], user_id); + assert_eq!(parts[2], file_name); assert!(file_info.mime_type.contains("text/plain")); // Verify it's in the database @@ -516,6 +503,7 @@ mod tests { surrealdb_database: "test_db".to_string(), http_port: 3000, openai_base_url: "..".to_string(), + storage: StorageKind::Local, }; let field_data1 = create_test_file(content, file_name); @@ -667,50 +655,14 @@ mod tests { .await .expect("Failed to start in-memory surrealdb"); - // Create a FileInfo instance directly (without persistence to disk) - let now = Utc::now(); - let file_id = Uuid::new_v4().to_string(); - - // Create a temporary directory that mimics the structure we would have on disk - let base_dir = Path::new("./data"); - let user_id = "test_user"; - let user_dir = base_dir.join(user_id); - let uuid_dir = user_dir.join(&file_id); - - tokio::fs::create_dir_all(&uuid_dir) - .await - .expect("Failed to create test directories"); - - // Create a test file in the directory - let test_file_path = uuid_dir.join("test_file.txt"); - tokio::fs::write(&test_file_path, b"test content") - .await - .expect("Failed to write test file"); - - // The file path should point to our test file - let file_info = FileInfo { - id: file_id.clone(), - user_id: "user123".to_string(), - created_at: now, - updated_at: now, - sha256: "test_sha256_hash".to_string(), - path: test_file_path.to_string_lossy().to_string(), - file_name: "test_file.txt".to_string(), - mime_type: "text/plain".to_string(), - }; - - // Store it in the database - db.store_item(file_info.clone()) - .await - .expect("Failed to store file info"); - - // Verify file exists on disk - 
assert!(tokio::fs::try_exists(&test_file_path) - .await - .unwrap_or(false)); + // Create and persist a test file via FileInfo::new + let user_id = "user123"; + let cfg = AppConfig { data_dir: "./data".to_string(), openai_api_key: "".to_string(), surrealdb_address: "".to_string(), surrealdb_username: "".to_string(), surrealdb_password: "".to_string(), surrealdb_namespace: "".to_string(), surrealdb_database: "".to_string(), http_port: 0, openai_base_url: "".to_string(), storage: crate::utils::config::StorageKind::Local }; + let temp = create_test_file(b"test content", "test_file.txt"); + let file_info = FileInfo::new(temp, &db, user_id, &cfg).await.expect("create file"); // Delete the file - let delete_result = FileInfo::delete_by_id(&file_id, &db).await; + let delete_result = FileInfo::delete_by_id(&file_info.id, &db, &cfg).await; // Delete should be successful assert!( @@ -721,7 +673,7 @@ mod tests { // Verify the file is removed from the database let retrieved: Option = db - .get_item(&file_id) + .get_item(&file_info.id) .await .expect("Failed to query database"); assert!( @@ -729,14 +681,8 @@ mod tests { "FileInfo should be deleted from the database" ); - // Verify directory is gone - assert!( - !tokio::fs::try_exists(&uuid_dir).await.unwrap_or(true), - "UUID directory should be deleted" - ); - - // Clean up test directory if it exists - let _ = tokio::fs::remove_dir_all(base_dir).await; + // Verify content no longer retrievable + assert!(store::get_bytes_at(&file_info.path, &cfg).await.is_err()); } #[tokio::test] @@ -749,7 +695,7 @@ mod tests { .expect("Failed to start in-memory surrealdb"); // Try to delete a file that doesn't exist - let result = FileInfo::delete_by_id("nonexistent_id", &db).await; + let result = FileInfo::delete_by_id("nonexistent_id", &db, &AppConfig { data_dir: "./data".to_string(), openai_api_key: "".to_string(), surrealdb_address: "".to_string(), surrealdb_username: "".to_string(), surrealdb_password: "".to_string(), surrealdb_namespace: 
"".to_string(), surrealdb_database: "".to_string(), http_port: 0, openai_base_url: "".to_string(), storage: crate::utils::config::StorageKind::Local }).await; // Should fail with FileNotFound error assert!(result.is_err()); @@ -828,7 +774,7 @@ mod tests { } #[tokio::test] - async fn test_data_directory_configuration() { + async fn test_fileinfo_persist_with_custom_root() { // Setup in-memory database for testing let namespace = "test_ns"; let database = &Uuid::new_v4().to_string(); @@ -854,6 +800,7 @@ mod tests { surrealdb_database: "test_db".to_string(), http_port: 3000, openai_base_url: "..".to_string(), + storage: StorageKind::Local, }; // Test file creation @@ -861,27 +808,17 @@ mod tests { .await .expect("Failed to create file in custom data directory"); - // Verify the file exists in the correct location - let file_path = Path::new(&file_info.path); - assert!(file_path.exists(), "File should exist at {:?}", file_path); - - // Verify the file is in the correct data directory - assert!( - file_path.starts_with(custom_data_dir), - "File should be stored in the custom data directory" - ); - - // Verify the file has the correct content - let file_content = tokio::fs::read_to_string(file_path) + // Verify the file has the correct content via object_store + let file_content = store::get_bytes_at(&file_info.path, &config) .await .expect("Failed to read file content"); - assert_eq!(file_content, String::from_utf8_lossy(content)); + assert_eq!(file_content.as_ref(), content); // Test file deletion - FileInfo::delete_by_id(&file_info.id, &db) + FileInfo::delete_by_id(&file_info.id, &db, &config) .await .expect("Failed to delete file"); - assert!(!file_path.exists(), "File should be deleted"); + assert!(store::get_bytes_at(&file_info.path, &config).await.is_err()); // Clean up the test directory let _ = tokio::fs::remove_dir_all(custom_data_dir).await; diff --git a/common/src/utils/config.rs b/common/src/utils/config.rs index 91c3b8f..bd4b36e 100644 --- 
a/common/src/utils/config.rs +++ b/common/src/utils/config.rs @@ -1,6 +1,16 @@ use config::{Config, ConfigError, Environment, File}; use serde::Deserialize; +#[derive(Clone, Deserialize, Debug)] +#[serde(rename_all = "lowercase")] +pub enum StorageKind { + Local, +} + +fn default_storage_kind() -> StorageKind { + StorageKind::Local +} + #[derive(Clone, Deserialize, Debug)] pub struct AppConfig { pub openai_api_key: String, @@ -14,6 +24,8 @@ pub struct AppConfig { pub http_port: u16, #[serde(default = "default_base_url")] pub openai_base_url: String, + #[serde(default = "default_storage_kind")] + pub storage: StorageKind, } fn default_data_dir() -> String { diff --git a/html-router/src/routes/content/handlers.rs b/html-router/src/routes/content/handlers.rs index 1f7635a..6266176 100644 --- a/html-router/src/routes/content/handlers.rs +++ b/html-router/src/routes/content/handlers.rs @@ -135,7 +135,7 @@ pub async fn delete_text_content( // If it has file info, delete that too if let Some(file_info) = &text_content.file_info { - FileInfo::delete_by_id(&file_info.id, &state.db).await?; + FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await?; } // Delete related knowledge entities and text chunks diff --git a/html-router/src/routes/index/handlers.rs b/html-router/src/routes/index/handlers.rs index f9823d0..497acaf 100644 --- a/html-router/src/routes/index/handlers.rs +++ b/html-router/src/routes/index/handlers.rs @@ -5,8 +5,7 @@ use axum::{ response::IntoResponse, }; use serde::Serialize; -use tokio::{fs::File, join}; -use tokio_util::io::ReaderStream; +use tokio::join; use crate::{ middlewares::{ @@ -23,6 +22,7 @@ use common::{ text_chunk::TextChunk, text_content::TextContent, user::User, }, }; +use common::storage::store; use crate::html_state::HtmlState; @@ -77,7 +77,7 @@ pub async fn delete_text_content( let (_res1, _res2, _res3, _res4, _res5) = join!( async { if let Some(file_info) = text_content.file_info { - FileInfo::delete_by_id(&file_info.id, 
&state.db).await + FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await } else { Ok(()) } @@ -177,14 +177,10 @@ pub async fn serve_file( return Ok(TemplateResponse::unauthorized().into_response()); } - let path = std::path::Path::new(&file_info.path); - - let file = match File::open(path).await { - Ok(f) => f, - Err(_e) => return Ok(TemplateResponse::server_error().into_response()), + let stream = match store::get_stream_at(&file_info.path, &state.config).await { + Ok(s) => s, + Err(_) => return Ok(TemplateResponse::server_error().into_response()), }; - - let stream = ReaderStream::new(file); let body = Body::from_stream(stream); let mut headers = HeaderMap::new();