mirror of
https://github.com/perstarkse/minne.git
synced 2026-03-21 00:49:54 +01:00
refactor: merge new storage backend into main
This is in preparation for s3 storage support
This commit is contained in:
46
Cargo.lock
generated
46
Cargo.lock
generated
@@ -1212,6 +1212,7 @@ dependencies = [
|
||||
"axum_session_auth",
|
||||
"axum_session_surreal",
|
||||
"axum_typed_multipart",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"chrono-tz",
|
||||
"config",
|
||||
@@ -1224,6 +1225,7 @@ dependencies = [
|
||||
"minijinja-autoreload",
|
||||
"minijinja-contrib",
|
||||
"minijinja-embed",
|
||||
"object_store 0.11.2",
|
||||
"reqwest",
|
||||
"serde",
|
||||
"serde_json",
|
||||
@@ -3543,6 +3545,27 @@ dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object_store"
|
||||
version = "0.11.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "3cfccb68961a56facde1163f9319e0d15743352344e7808a11795fb99698dcaf"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"futures",
|
||||
"humantime",
|
||||
"itertools 0.13.0",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"snafu",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"url",
|
||||
"walkdir",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "object_store"
|
||||
version = "0.12.0"
|
||||
@@ -5042,6 +5065,27 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snafu"
|
||||
version = "0.8.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6e84b3f4eacbf3a1ce05eac6763b4d629d60cbc94d632e4092c54ade71f1e1a2"
|
||||
dependencies = [
|
||||
"snafu-derive",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snafu-derive"
|
||||
version = "0.8.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "c1c97747dbf44bb1ca44a561ece23508e99cb592e862f22222dcf42f51d1e451"
|
||||
dependencies = [
|
||||
"heck",
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.101",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "snap"
|
||||
version = "1.1.1"
|
||||
@@ -5271,7 +5315,7 @@ dependencies = [
|
||||
"ndarray-stats",
|
||||
"num-traits",
|
||||
"num_cpus",
|
||||
"object_store",
|
||||
"object_store 0.12.0",
|
||||
"parking_lot",
|
||||
"pbkdf2",
|
||||
"pharos",
|
||||
|
||||
@@ -53,6 +53,8 @@ url = { version = "2.5.2", features = ["serde"] }
|
||||
uuid = { version = "1.10.0", features = ["v4", "serde"] }
|
||||
tokio-retry = "0.3.0"
|
||||
base64 = "0.22.1"
|
||||
object_store = { version = "0.11.2" }
|
||||
bytes = "1.7.1"
|
||||
|
||||
[profile.dist]
|
||||
inherits = "release"
|
||||
|
||||
@@ -39,6 +39,8 @@ url = { workspace = true }
|
||||
uuid = { workspace = true }
|
||||
surrealdb-migrations = { workspace = true }
|
||||
tokio-retry = { workspace = true }
|
||||
object_store = { workspace = true }
|
||||
bytes = { workspace = true }
|
||||
|
||||
|
||||
[features]
|
||||
|
||||
@@ -1,2 +1,3 @@
|
||||
pub mod db;
|
||||
pub mod types;
|
||||
pub mod store;
|
||||
|
||||
256
common/src/storage/store.rs
Normal file
256
common/src/storage/store.rs
Normal file
@@ -0,0 +1,256 @@
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
|
||||
use anyhow::{anyhow, Result as AnyResult};
|
||||
use bytes::Bytes;
|
||||
use futures::{StreamExt, TryStreamExt};
|
||||
use futures::stream::BoxStream;
|
||||
use object_store::local::LocalFileSystem;
|
||||
use object_store::{path::Path as ObjPath, ObjectStore};
|
||||
|
||||
use crate::utils::config::{AppConfig, StorageKind};
|
||||
|
||||
pub type DynStore = Arc<dyn ObjectStore>;
|
||||
|
||||
/// Build an object store instance anchored at the given filesystem `prefix`.
|
||||
///
|
||||
/// - For the `Local` backend, `prefix` is the absolute directory on disk that
|
||||
/// serves as the root for all object paths passed to the store.
|
||||
/// - `prefix` must already exist; this function will create it if missing.
|
||||
///
|
||||
/// Example (Local):
|
||||
/// - prefix: `/var/data`
|
||||
/// - object location: `user/uuid/file.txt`
|
||||
/// - absolute path: `/var/data/user/uuid/file.txt`
|
||||
pub async fn build_store(prefix: &Path, cfg: &AppConfig) -> object_store::Result<DynStore> {
|
||||
match cfg.storage {
|
||||
StorageKind::Local => {
|
||||
if !prefix.exists() {
|
||||
tokio::fs::create_dir_all(prefix)
|
||||
.await
|
||||
.map_err(|e| object_store::Error::Generic {
|
||||
store: "LocalFileSystem",
|
||||
source: e.into(),
|
||||
})?;
|
||||
}
|
||||
let store = LocalFileSystem::new_with_prefix(prefix)?;
|
||||
Ok(Arc::new(store))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve the absolute base directory used for local storage from config.
|
||||
///
|
||||
/// If `data_dir` is relative, it is resolved against the current working directory.
|
||||
pub fn resolve_base_dir(cfg: &AppConfig) -> PathBuf {
|
||||
if cfg.data_dir.starts_with('/') {
|
||||
PathBuf::from(&cfg.data_dir)
|
||||
} else {
|
||||
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
|
||||
.join(&cfg.data_dir)
|
||||
}
|
||||
}
|
||||
|
||||
/// Build an object store rooted at the configured data directory.
|
||||
///
|
||||
/// This is the recommended way to obtain a store for logical object operations
|
||||
/// such as `put_bytes_at`, `get_bytes_at`, and `delete_prefix_at`.
|
||||
pub async fn build_store_root(cfg: &AppConfig) -> object_store::Result<DynStore> {
|
||||
let base = resolve_base_dir(cfg);
|
||||
build_store(&base, cfg).await
|
||||
}
|
||||
|
||||
/// Write bytes to `file_name` within a filesystem `prefix` using the configured store.
|
||||
///
|
||||
/// Prefer [`put_bytes_at`] for location-based writes that do not need to compute
|
||||
/// a separate filesystem prefix.
|
||||
pub async fn put_bytes(prefix: &Path, file_name: &str, data: Bytes, cfg: &AppConfig) -> object_store::Result<()> {
|
||||
let store = build_store(prefix, cfg).await?;
|
||||
let payload = object_store::PutPayload::from_bytes(data);
|
||||
store.put(&ObjPath::from(file_name), payload).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write bytes to the provided logical object `location`, e.g. `"user/uuid/file"`.
|
||||
///
|
||||
/// The store root is taken from `AppConfig::data_dir` for the local backend.
|
||||
/// This performs an atomic write as guaranteed by `object_store`.
|
||||
pub async fn put_bytes_at(location: &str, data: Bytes, cfg: &AppConfig) -> object_store::Result<()> {
|
||||
let store = build_store_root(cfg).await?;
|
||||
let payload = object_store::PutPayload::from_bytes(data);
|
||||
store.put(&ObjPath::from(location), payload).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read bytes from `file_name` within a filesystem `prefix` using the configured store.
|
||||
///
|
||||
/// Prefer [`get_bytes_at`] for location-based reads.
|
||||
pub async fn get_bytes(prefix: &Path, file_name: &str, cfg: &AppConfig) -> object_store::Result<Bytes> {
|
||||
let store = build_store(prefix, cfg).await?;
|
||||
let r = store.get(&ObjPath::from(file_name)).await?;
|
||||
let b = r.bytes().await?;
|
||||
Ok(b)
|
||||
}
|
||||
|
||||
/// Read bytes from the provided logical object `location`.
|
||||
///
|
||||
/// Returns the full contents buffered in memory.
|
||||
pub async fn get_bytes_at(location: &str, cfg: &AppConfig) -> object_store::Result<Bytes> {
|
||||
let store = build_store_root(cfg).await?;
|
||||
let r = store.get(&ObjPath::from(location)).await?;
|
||||
r.bytes().await
|
||||
}
|
||||
|
||||
/// Get a streaming body for the provided logical object `location`.
|
||||
///
|
||||
/// Returns a fallible `BoxStream` of `Bytes`, suitable for use with
|
||||
/// `axum::body::Body::from_stream` to stream responses without buffering.
|
||||
pub async fn get_stream_at(location: &str, cfg: &AppConfig) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
|
||||
let store = build_store_root(cfg).await?;
|
||||
let r = store.get(&ObjPath::from(location)).await?;
|
||||
Ok(r.into_stream())
|
||||
}
|
||||
|
||||
/// Delete all objects below the provided filesystem `prefix`.
|
||||
///
|
||||
/// This is a low-level variant for when a dedicated on-disk prefix is used for a
|
||||
/// particular object grouping. Prefer [`delete_prefix_at`] for location-based stores.
|
||||
pub async fn delete_prefix(prefix: &Path, cfg: &AppConfig) -> object_store::Result<()> {
|
||||
let store = build_store(prefix, cfg).await?;
|
||||
// list everything and delete
|
||||
let locations = store.list(None).map_ok(|m| m.location).boxed();
|
||||
store.delete_stream(locations).try_collect::<Vec<_>>().await?;
|
||||
// Best effort remove the directory itself
|
||||
if tokio::fs::try_exists(prefix).await.unwrap_or(false) {
|
||||
let _ = tokio::fs::remove_dir_all(prefix).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete all objects below the provided logical object `prefix`, e.g. `"user/uuid/"`.
|
||||
///
|
||||
/// After deleting, attempts a best-effort cleanup of the now-empty directory on disk
|
||||
/// when using the local backend.
|
||||
pub async fn delete_prefix_at(prefix: &str, cfg: &AppConfig) -> object_store::Result<()> {
|
||||
let store = build_store_root(cfg).await?;
|
||||
let prefix_path = ObjPath::from(prefix);
|
||||
let locations = store.list(Some(&prefix_path)).map_ok(|m| m.location).boxed();
|
||||
store.delete_stream(locations).try_collect::<Vec<_>>().await?;
|
||||
// Best effort remove empty directory on disk for local storage
|
||||
let base_dir = resolve_base_dir(cfg).join(prefix);
|
||||
if tokio::fs::try_exists(&base_dir).await.unwrap_or(false) {
|
||||
let _ = tokio::fs::remove_dir_all(&base_dir).await;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Split an absolute filesystem path into `(parent_dir, file_name)`.
|
||||
pub fn split_abs_path(path: &str) -> AnyResult<(PathBuf, String)> {
|
||||
let pb = PathBuf::from(path);
|
||||
let parent = pb
|
||||
.parent()
|
||||
.ok_or_else(|| anyhow!("Path has no parent: {path}"))?
|
||||
.to_path_buf();
|
||||
let file = pb
|
||||
.file_name()
|
||||
.ok_or_else(|| anyhow!("Path has no file name: {path}"))?
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
Ok((parent, file))
|
||||
}
|
||||
|
||||
/// Split a logical object location `"a/b/c"` into `("a/b", "c")`.
|
||||
pub fn split_object_path(path: &str) -> AnyResult<(String, String)> {
|
||||
if let Some((p, f)) = path.rsplit_once('/') {
|
||||
return Ok((p.to_string(), f.to_string()));
|
||||
}
|
||||
Err(anyhow!("Object path has no separator: {path}"))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::utils::config::StorageKind;
|
||||
use bytes::Bytes;
|
||||
use futures::TryStreamExt;
|
||||
use uuid::Uuid;
|
||||
|
||||
fn test_config(root: &str) -> AppConfig {
|
||||
AppConfig {
|
||||
openai_api_key: "test".into(),
|
||||
surrealdb_address: "test".into(),
|
||||
surrealdb_username: "test".into(),
|
||||
surrealdb_password: "test".into(),
|
||||
surrealdb_namespace: "test".into(),
|
||||
surrealdb_database: "test".into(),
|
||||
data_dir: root.into(),
|
||||
http_port: 0,
|
||||
openai_base_url: "..".into(),
|
||||
storage: StorageKind::Local,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_build_store_root_creates_base() {
|
||||
let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4());
|
||||
let cfg = test_config(&base);
|
||||
let _ = build_store_root(&cfg).await.expect("build store root");
|
||||
assert!(tokio::fs::try_exists(&base).await.unwrap_or(false));
|
||||
let _ = tokio::fs::remove_dir_all(&base).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_put_get_bytes_at_and_delete_prefix_at() {
|
||||
let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4());
|
||||
let cfg = test_config(&base);
|
||||
|
||||
let location_prefix = format!("{}/{}", "user1", Uuid::new_v4());
|
||||
let file_name = "file.txt";
|
||||
let location = format!("{}/{}", &location_prefix, file_name);
|
||||
let payload = Bytes::from_static(b"hello world");
|
||||
|
||||
put_bytes_at(&location, payload.clone(), &cfg).await.expect("put");
|
||||
let got = get_bytes_at(&location, &cfg).await.expect("get");
|
||||
assert_eq!(got.as_ref(), payload.as_ref());
|
||||
|
||||
// Delete the whole prefix and ensure retrieval fails
|
||||
delete_prefix_at(&location_prefix, &cfg).await.expect("delete prefix");
|
||||
assert!(get_bytes_at(&location, &cfg).await.is_err());
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(&base).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_stream_at() {
|
||||
let base = format!("/tmp/minne_store_test_{}", Uuid::new_v4());
|
||||
let cfg = test_config(&base);
|
||||
|
||||
let location = format!("{}/{}/stream.bin", "user2", Uuid::new_v4());
|
||||
let content = vec![7u8; 32 * 1024]; // 32KB payload
|
||||
|
||||
put_bytes_at(&location, Bytes::from(content.clone()), &cfg)
|
||||
.await
|
||||
.expect("put");
|
||||
|
||||
let stream = get_stream_at(&location, &cfg).await.expect("stream");
|
||||
let combined: Vec<u8> = stream
|
||||
.map_ok(|chunk| chunk.to_vec())
|
||||
.try_fold(Vec::new(), |mut acc, mut chunk| async move {
|
||||
acc.append(&mut chunk);
|
||||
Ok(acc)
|
||||
})
|
||||
.await
|
||||
.expect("collect");
|
||||
|
||||
assert_eq!(combined, content);
|
||||
|
||||
delete_prefix_at(
|
||||
&split_object_path(&location).unwrap().0,
|
||||
&cfg,
|
||||
)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
let _ = tokio::fs::remove_dir_all(&base).await;
|
||||
}
|
||||
}
|
||||
@@ -3,16 +3,20 @@ use mime_guess::from_path;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{
|
||||
io::{BufReader, Read},
|
||||
path::{Path, PathBuf},
|
||||
path::Path,
|
||||
};
|
||||
use tempfile::NamedTempFile;
|
||||
use thiserror::Error;
|
||||
use tokio::fs::remove_dir_all;
|
||||
use object_store::Error as ObjectStoreError;
|
||||
// futures imports no longer needed here after abstraction
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
error::AppError, storage::db::SurrealDbClient, stored_object, utils::config::AppConfig,
|
||||
error::AppError,
|
||||
storage::{db::SurrealDbClient, store},
|
||||
stored_object,
|
||||
utils::config::AppConfig,
|
||||
};
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@@ -34,6 +38,9 @@ pub enum FileError {
|
||||
|
||||
#[error("File name missing in metadata")]
|
||||
MissingFileName,
|
||||
|
||||
#[error("Object store error: {0}")]
|
||||
ObjectStore(#[from] ObjectStoreError),
|
||||
}
|
||||
|
||||
stored_object!(FileInfo, "file", {
|
||||
@@ -84,9 +91,7 @@ impl FileInfo {
|
||||
file_name,
|
||||
sha256,
|
||||
path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id, config)
|
||||
.await?
|
||||
.to_string_lossy()
|
||||
.into(),
|
||||
.await?,
|
||||
mime_type: Self::guess_mime_type(Path::new(&sanitized_file_name)),
|
||||
user_id: user_id.to_string(),
|
||||
};
|
||||
@@ -165,7 +170,7 @@ impl FileInfo {
|
||||
}
|
||||
}
|
||||
|
||||
/// Persists the file to the filesystem under `{data_dir}/{user_id}/{uuid}/{file_name}`.
|
||||
/// Persists the file under the logical location `{user_id}/{uuid}/{file_name}`.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `uuid` - The UUID of the file.
|
||||
@@ -175,43 +180,24 @@ impl FileInfo {
|
||||
/// * `config` - Application configuration containing data directory path
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<PathBuf, FileError>` - The persisted file path or an error.
|
||||
/// * `Result<String, FileError>` - The logical object location or an error.
|
||||
async fn persist_file(
|
||||
uuid: &Uuid,
|
||||
file: NamedTempFile,
|
||||
file_name: &str,
|
||||
user_id: &str,
|
||||
config: &AppConfig,
|
||||
) -> Result<PathBuf, FileError> {
|
||||
info!("Data dir: {:?}", config.data_dir);
|
||||
// Convert relative path to absolute path
|
||||
let base_dir = if config.data_dir.starts_with('/') {
|
||||
Path::new(&config.data_dir).to_path_buf()
|
||||
} else {
|
||||
std::env::current_dir()
|
||||
.map_err(FileError::Io)?
|
||||
.join(&config.data_dir)
|
||||
};
|
||||
) -> Result<String, FileError> {
|
||||
// Logical object location relative to the store root
|
||||
let location = format!("{}/{}/{}", user_id, uuid, file_name);
|
||||
info!("Persisting to object location: {}", location);
|
||||
|
||||
let user_dir = base_dir.join(user_id); // Create the user directory
|
||||
let uuid_dir = user_dir.join(uuid.to_string()); // Create the UUID directory under the user directory
|
||||
|
||||
// Create the user and UUID directories if they don't exist
|
||||
tokio::fs::create_dir_all(&uuid_dir)
|
||||
let bytes = tokio::fs::read(file.path()).await?;
|
||||
store::put_bytes_at(&location, bytes.into(), config)
|
||||
.await
|
||||
.map_err(FileError::Io)?;
|
||||
.map_err(FileError::from)?;
|
||||
|
||||
// Define the final file path
|
||||
let final_path = uuid_dir.join(file_name);
|
||||
info!("Final path: {:?}", final_path);
|
||||
|
||||
// Copy the temporary file to the final path
|
||||
tokio::fs::copy(file.path(), &final_path)
|
||||
.await
|
||||
.map_err(FileError::Io)?;
|
||||
info!("Copied file to {:?}", final_path);
|
||||
|
||||
Ok(final_path)
|
||||
Ok(location)
|
||||
}
|
||||
|
||||
/// Retrieves a `FileInfo` by SHA256.
|
||||
@@ -240,7 +226,11 @@ impl FileInfo {
|
||||
///
|
||||
/// # Returns
|
||||
/// `Result<(), FileError>`
|
||||
pub async fn delete_by_id(id: &str, db_client: &SurrealDbClient) -> Result<(), AppError> {
|
||||
pub async fn delete_by_id(
|
||||
id: &str,
|
||||
db_client: &SurrealDbClient,
|
||||
config: &AppConfig,
|
||||
) -> Result<(), AppError> {
|
||||
// Get the FileInfo from the database
|
||||
let file_info = match db_client.get_item::<FileInfo>(id).await? {
|
||||
Some(info) => info,
|
||||
@@ -252,25 +242,13 @@ impl FileInfo {
|
||||
}
|
||||
};
|
||||
|
||||
// Remove the file and its parent directory
|
||||
let file_path = Path::new(&file_info.path);
|
||||
if file_path.exists() {
|
||||
// Get the parent directory of the file
|
||||
if let Some(parent_dir) = file_path.parent() {
|
||||
// Remove the entire directory containing the file
|
||||
remove_dir_all(parent_dir).await?;
|
||||
info!("Removed directory {:?} and its contents", parent_dir);
|
||||
} else {
|
||||
return Err(AppError::from(FileError::FileNotFound(
|
||||
"File has no parent directory".to_string(),
|
||||
)));
|
||||
}
|
||||
} else {
|
||||
return Err(AppError::from(FileError::FileNotFound(format!(
|
||||
"File at path {:?} was not found",
|
||||
file_path
|
||||
))));
|
||||
}
|
||||
// Remove the object's parent prefix in the object store
|
||||
let (parent_prefix, _file_name) = store::split_object_path(&file_info.path)
|
||||
.map_err(|e| AppError::from(anyhow::anyhow!(e)))?;
|
||||
store::delete_prefix_at(&parent_prefix, config)
|
||||
.await
|
||||
.map_err(|e| AppError::from(anyhow::anyhow!(e)))?;
|
||||
info!("Removed object prefix {} and its contents via object_store", parent_prefix);
|
||||
|
||||
// Delete the FileInfo from the database
|
||||
db_client.delete_item::<FileInfo>(id).await?;
|
||||
@@ -300,6 +278,7 @@ mod tests {
|
||||
use super::*;
|
||||
use axum::http::HeaderMap;
|
||||
use axum_typed_multipart::FieldMetadata;
|
||||
use crate::utils::config::StorageKind;
|
||||
use std::io::Write;
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
@@ -326,7 +305,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cross_filesystem_file_operations() {
|
||||
async fn test_fileinfo_create_read_delete() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
@@ -351,6 +330,7 @@ mod tests {
|
||||
surrealdb_database: "test_db".to_string(),
|
||||
http_port: 3000,
|
||||
openai_base_url: "..".to_string(),
|
||||
storage: StorageKind::Local,
|
||||
};
|
||||
|
||||
// Test file creation
|
||||
@@ -358,14 +338,11 @@ mod tests {
|
||||
.await
|
||||
.expect("Failed to create file across filesystems");
|
||||
|
||||
// Verify the file exists and has correct content
|
||||
let file_path = Path::new(&file_info.path);
|
||||
assert!(file_path.exists(), "File should exist at {:?}", file_path);
|
||||
|
||||
let file_content = tokio::fs::read_to_string(file_path)
|
||||
// Verify the file exists via object_store and has correct content
|
||||
let bytes = store::get_bytes_at(&file_info.path, &config)
|
||||
.await
|
||||
.expect("Failed to read file content");
|
||||
assert_eq!(file_content, String::from_utf8_lossy(content));
|
||||
.expect("Failed to read file content via object_store");
|
||||
assert_eq!(bytes, content.as_slice());
|
||||
|
||||
// Test file reading
|
||||
let retrieved = FileInfo::get_by_id(&file_info.id, &db)
|
||||
@@ -375,17 +352,20 @@ mod tests {
|
||||
assert_eq!(retrieved.sha256, file_info.sha256);
|
||||
|
||||
// Test file deletion
|
||||
FileInfo::delete_by_id(&file_info.id, &db)
|
||||
FileInfo::delete_by_id(&file_info.id, &db, &config)
|
||||
.await
|
||||
.expect("Failed to delete file");
|
||||
assert!(!file_path.exists(), "File should be deleted");
|
||||
assert!(
|
||||
store::get_bytes_at(&file_info.path, &config).await.is_err(),
|
||||
"File should be deleted"
|
||||
);
|
||||
|
||||
// Clean up the test directory
|
||||
let _ = tokio::fs::remove_dir_all(&config.data_dir).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cross_filesystem_duplicate_detection() {
|
||||
async fn test_fileinfo_duplicate_detection() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
@@ -410,6 +390,7 @@ mod tests {
|
||||
surrealdb_database: "test_db".to_string(),
|
||||
http_port: 3000,
|
||||
openai_base_url: "..".to_string(),
|
||||
storage: StorageKind::Local,
|
||||
};
|
||||
|
||||
// Store the original file
|
||||
@@ -433,7 +414,7 @@ mod tests {
|
||||
assert_ne!(duplicate_file_info.file_name, duplicate_name);
|
||||
|
||||
// Clean up
|
||||
FileInfo::delete_by_id(&original_file_info.id, &db)
|
||||
FileInfo::delete_by_id(&original_file_info.id, &db, &config)
|
||||
.await
|
||||
.expect("Failed to delete file");
|
||||
let _ = tokio::fs::remove_dir_all(&config.data_dir).await;
|
||||
@@ -465,6 +446,7 @@ mod tests {
|
||||
surrealdb_database: "test_db".to_string(),
|
||||
http_port: 3000,
|
||||
openai_base_url: "..".to_string(),
|
||||
storage: StorageKind::Local,
|
||||
};
|
||||
let file_info = FileInfo::new(field_data, &db, user_id, &config).await;
|
||||
|
||||
@@ -478,6 +460,11 @@ mod tests {
|
||||
assert_eq!(file_info.file_name, file_name);
|
||||
assert!(!file_info.sha256.is_empty());
|
||||
assert!(!file_info.path.is_empty());
|
||||
// path should be logical: "user_id/uuid/file_name"
|
||||
let parts: Vec<&str> = file_info.path.split('/').collect();
|
||||
assert_eq!(parts.len(), 3);
|
||||
assert_eq!(parts[0], user_id);
|
||||
assert_eq!(parts[2], file_name);
|
||||
assert!(file_info.mime_type.contains("text/plain"));
|
||||
|
||||
// Verify it's in the database
|
||||
@@ -516,6 +503,7 @@ mod tests {
|
||||
surrealdb_database: "test_db".to_string(),
|
||||
http_port: 3000,
|
||||
openai_base_url: "..".to_string(),
|
||||
storage: StorageKind::Local,
|
||||
};
|
||||
|
||||
let field_data1 = create_test_file(content, file_name);
|
||||
@@ -667,50 +655,14 @@ mod tests {
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
|
||||
// Create a FileInfo instance directly (without persistence to disk)
|
||||
let now = Utc::now();
|
||||
let file_id = Uuid::new_v4().to_string();
|
||||
|
||||
// Create a temporary directory that mimics the structure we would have on disk
|
||||
let base_dir = Path::new("./data");
|
||||
let user_id = "test_user";
|
||||
let user_dir = base_dir.join(user_id);
|
||||
let uuid_dir = user_dir.join(&file_id);
|
||||
|
||||
tokio::fs::create_dir_all(&uuid_dir)
|
||||
.await
|
||||
.expect("Failed to create test directories");
|
||||
|
||||
// Create a test file in the directory
|
||||
let test_file_path = uuid_dir.join("test_file.txt");
|
||||
tokio::fs::write(&test_file_path, b"test content")
|
||||
.await
|
||||
.expect("Failed to write test file");
|
||||
|
||||
// The file path should point to our test file
|
||||
let file_info = FileInfo {
|
||||
id: file_id.clone(),
|
||||
user_id: "user123".to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
sha256: "test_sha256_hash".to_string(),
|
||||
path: test_file_path.to_string_lossy().to_string(),
|
||||
file_name: "test_file.txt".to_string(),
|
||||
mime_type: "text/plain".to_string(),
|
||||
};
|
||||
|
||||
// Store it in the database
|
||||
db.store_item(file_info.clone())
|
||||
.await
|
||||
.expect("Failed to store file info");
|
||||
|
||||
// Verify file exists on disk
|
||||
assert!(tokio::fs::try_exists(&test_file_path)
|
||||
.await
|
||||
.unwrap_or(false));
|
||||
// Create and persist a test file via FileInfo::new
|
||||
let user_id = "user123";
|
||||
let cfg = AppConfig { data_dir: "./data".to_string(), openai_api_key: "".to_string(), surrealdb_address: "".to_string(), surrealdb_username: "".to_string(), surrealdb_password: "".to_string(), surrealdb_namespace: "".to_string(), surrealdb_database: "".to_string(), http_port: 0, openai_base_url: "".to_string(), storage: crate::utils::config::StorageKind::Local };
|
||||
let temp = create_test_file(b"test content", "test_file.txt");
|
||||
let file_info = FileInfo::new(temp, &db, user_id, &cfg).await.expect("create file");
|
||||
|
||||
// Delete the file
|
||||
let delete_result = FileInfo::delete_by_id(&file_id, &db).await;
|
||||
let delete_result = FileInfo::delete_by_id(&file_info.id, &db, &cfg).await;
|
||||
|
||||
// Delete should be successful
|
||||
assert!(
|
||||
@@ -721,7 +673,7 @@ mod tests {
|
||||
|
||||
// Verify the file is removed from the database
|
||||
let retrieved: Option<FileInfo> = db
|
||||
.get_item(&file_id)
|
||||
.get_item(&file_info.id)
|
||||
.await
|
||||
.expect("Failed to query database");
|
||||
assert!(
|
||||
@@ -729,14 +681,8 @@ mod tests {
|
||||
"FileInfo should be deleted from the database"
|
||||
);
|
||||
|
||||
// Verify directory is gone
|
||||
assert!(
|
||||
!tokio::fs::try_exists(&uuid_dir).await.unwrap_or(true),
|
||||
"UUID directory should be deleted"
|
||||
);
|
||||
|
||||
// Clean up test directory if it exists
|
||||
let _ = tokio::fs::remove_dir_all(base_dir).await;
|
||||
// Verify content no longer retrievable
|
||||
assert!(store::get_bytes_at(&file_info.path, &cfg).await.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -749,7 +695,7 @@ mod tests {
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
|
||||
// Try to delete a file that doesn't exist
|
||||
let result = FileInfo::delete_by_id("nonexistent_id", &db).await;
|
||||
let result = FileInfo::delete_by_id("nonexistent_id", &db, &AppConfig { data_dir: "./data".to_string(), openai_api_key: "".to_string(), surrealdb_address: "".to_string(), surrealdb_username: "".to_string(), surrealdb_password: "".to_string(), surrealdb_namespace: "".to_string(), surrealdb_database: "".to_string(), http_port: 0, openai_base_url: "".to_string(), storage: crate::utils::config::StorageKind::Local }).await;
|
||||
|
||||
// Should fail with FileNotFound error
|
||||
assert!(result.is_err());
|
||||
@@ -828,7 +774,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_data_directory_configuration() {
|
||||
async fn test_fileinfo_persist_with_custom_root() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
@@ -854,6 +800,7 @@ mod tests {
|
||||
surrealdb_database: "test_db".to_string(),
|
||||
http_port: 3000,
|
||||
openai_base_url: "..".to_string(),
|
||||
storage: StorageKind::Local,
|
||||
};
|
||||
|
||||
// Test file creation
|
||||
@@ -861,27 +808,17 @@ mod tests {
|
||||
.await
|
||||
.expect("Failed to create file in custom data directory");
|
||||
|
||||
// Verify the file exists in the correct location
|
||||
let file_path = Path::new(&file_info.path);
|
||||
assert!(file_path.exists(), "File should exist at {:?}", file_path);
|
||||
|
||||
// Verify the file is in the correct data directory
|
||||
assert!(
|
||||
file_path.starts_with(custom_data_dir),
|
||||
"File should be stored in the custom data directory"
|
||||
);
|
||||
|
||||
// Verify the file has the correct content
|
||||
let file_content = tokio::fs::read_to_string(file_path)
|
||||
// Verify the file has the correct content via object_store
|
||||
let file_content = store::get_bytes_at(&file_info.path, &config)
|
||||
.await
|
||||
.expect("Failed to read file content");
|
||||
assert_eq!(file_content, String::from_utf8_lossy(content));
|
||||
assert_eq!(file_content.as_ref(), content);
|
||||
|
||||
// Test file deletion
|
||||
FileInfo::delete_by_id(&file_info.id, &db)
|
||||
FileInfo::delete_by_id(&file_info.id, &db, &config)
|
||||
.await
|
||||
.expect("Failed to delete file");
|
||||
assert!(!file_path.exists(), "File should be deleted");
|
||||
assert!(store::get_bytes_at(&file_info.path, &config).await.is_err());
|
||||
|
||||
// Clean up the test directory
|
||||
let _ = tokio::fs::remove_dir_all(custom_data_dir).await;
|
||||
|
||||
@@ -1,6 +1,16 @@
|
||||
use config::{Config, ConfigError, Environment, File};
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum StorageKind {
|
||||
Local,
|
||||
}
|
||||
|
||||
fn default_storage_kind() -> StorageKind {
|
||||
StorageKind::Local
|
||||
}
|
||||
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
pub struct AppConfig {
|
||||
pub openai_api_key: String,
|
||||
@@ -14,6 +24,8 @@ pub struct AppConfig {
|
||||
pub http_port: u16,
|
||||
#[serde(default = "default_base_url")]
|
||||
pub openai_base_url: String,
|
||||
#[serde(default = "default_storage_kind")]
|
||||
pub storage: StorageKind,
|
||||
}
|
||||
|
||||
fn default_data_dir() -> String {
|
||||
|
||||
@@ -135,7 +135,7 @@ pub async fn delete_text_content(
|
||||
|
||||
// If it has file info, delete that too
|
||||
if let Some(file_info) = &text_content.file_info {
|
||||
FileInfo::delete_by_id(&file_info.id, &state.db).await?;
|
||||
FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await?;
|
||||
}
|
||||
|
||||
// Delete related knowledge entities and text chunks
|
||||
|
||||
@@ -5,8 +5,7 @@ use axum::{
|
||||
response::IntoResponse,
|
||||
};
|
||||
use serde::Serialize;
|
||||
use tokio::{fs::File, join};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tokio::join;
|
||||
|
||||
use crate::{
|
||||
middlewares::{
|
||||
@@ -23,6 +22,7 @@ use common::{
|
||||
text_chunk::TextChunk, text_content::TextContent, user::User,
|
||||
},
|
||||
};
|
||||
use common::storage::store;
|
||||
|
||||
use crate::html_state::HtmlState;
|
||||
|
||||
@@ -77,7 +77,7 @@ pub async fn delete_text_content(
|
||||
let (_res1, _res2, _res3, _res4, _res5) = join!(
|
||||
async {
|
||||
if let Some(file_info) = text_content.file_info {
|
||||
FileInfo::delete_by_id(&file_info.id, &state.db).await
|
||||
FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
@@ -177,14 +177,10 @@ pub async fn serve_file(
|
||||
return Ok(TemplateResponse::unauthorized().into_response());
|
||||
}
|
||||
|
||||
let path = std::path::Path::new(&file_info.path);
|
||||
|
||||
let file = match File::open(path).await {
|
||||
Ok(f) => f,
|
||||
Err(_e) => return Ok(TemplateResponse::server_error().into_response()),
|
||||
let stream = match store::get_stream_at(&file_info.path, &state.config).await {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Ok(TemplateResponse::server_error().into_response()),
|
||||
};
|
||||
|
||||
let stream = ReaderStream::new(file);
|
||||
let body = Body::from_stream(stream);
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
|
||||
Reference in New Issue
Block a user