mirror of
https://github.com/perstarkse/minne.git
synced 2026-01-11 12:40:24 +01:00
fix: improved storage manager, prep for s3
This commit is contained in:
@@ -1,15 +1,22 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{storage::db::SurrealDbClient, utils::config::AppConfig};
|
||||
use common::{
|
||||
storage::{db::SurrealDbClient, store::StorageManager},
|
||||
utils::config::AppConfig,
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApiState {
|
||||
pub db: Arc<SurrealDbClient>,
|
||||
pub config: AppConfig,
|
||||
pub storage: StorageManager,
|
||||
}
|
||||
|
||||
impl ApiState {
|
||||
pub async fn new(config: &AppConfig) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
pub async fn new(
|
||||
config: &AppConfig,
|
||||
storage: StorageManager,
|
||||
) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
let surreal_db_client = Arc::new(
|
||||
SurrealDbClient::new(
|
||||
&config.surrealdb_address,
|
||||
@@ -26,6 +33,7 @@ impl ApiState {
|
||||
let app_state = Self {
|
||||
db: surreal_db_client.clone(),
|
||||
config: config.clone(),
|
||||
storage,
|
||||
};
|
||||
|
||||
Ok(app_state)
|
||||
|
||||
@@ -32,7 +32,8 @@ pub async fn ingest_data(
|
||||
info!("Received input: {:?}", input);
|
||||
|
||||
let file_infos = try_join_all(input.files.into_iter().map(|file| {
|
||||
FileInfo::new(file, &state.db, &user.id, &state.config).map_err(AppError::from)
|
||||
FileInfo::new_with_storage(file, &state.db, &user.id, &state.storage)
|
||||
.map_err(AppError::from)
|
||||
}))
|
||||
.await?;
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,4 +1,5 @@
|
||||
use axum_typed_multipart::FieldData;
|
||||
use bytes;
|
||||
use mime_guess::from_path;
|
||||
use object_store::Error as ObjectStoreError;
|
||||
use sha2::{Digest, Sha256};
|
||||
@@ -13,9 +14,8 @@ use uuid::Uuid;
|
||||
|
||||
use crate::{
|
||||
error::AppError,
|
||||
storage::{db::SurrealDbClient, store},
|
||||
storage::{db::SurrealDbClient, store, store::StorageManager},
|
||||
stored_object,
|
||||
utils::config::AppConfig,
|
||||
};
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@@ -51,54 +51,6 @@ stored_object!(FileInfo, "file", {
|
||||
});
|
||||
|
||||
impl FileInfo {
|
||||
pub async fn new(
|
||||
field_data: FieldData<NamedTempFile>,
|
||||
db_client: &SurrealDbClient,
|
||||
user_id: &str,
|
||||
config: &AppConfig,
|
||||
) -> Result<Self, FileError> {
|
||||
let file = field_data.contents;
|
||||
let file_name = field_data
|
||||
.metadata
|
||||
.file_name
|
||||
.ok_or(FileError::MissingFileName)?;
|
||||
|
||||
// Calculate SHA256
|
||||
let sha256 = Self::get_sha(&file).await?;
|
||||
|
||||
// Early return if file already exists
|
||||
match Self::get_by_sha(&sha256, db_client).await {
|
||||
Ok(existing_file) => {
|
||||
info!("File already exists with SHA256: {}", sha256);
|
||||
return Ok(existing_file);
|
||||
}
|
||||
Err(FileError::FileNotFound(_)) => (), // Expected case for new files
|
||||
Err(e) => return Err(e), // Propagate unexpected errors
|
||||
}
|
||||
|
||||
// Generate UUID and prepare paths
|
||||
let uuid = Uuid::new_v4();
|
||||
let sanitized_file_name = Self::sanitize_file_name(&file_name);
|
||||
|
||||
let now = Utc::now();
|
||||
// Create new FileInfo instance
|
||||
let file_info = Self {
|
||||
id: uuid.to_string(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
file_name,
|
||||
sha256,
|
||||
path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id, config).await?,
|
||||
mime_type: Self::guess_mime_type(Path::new(&sanitized_file_name)),
|
||||
user_id: user_id.to_string(),
|
||||
};
|
||||
|
||||
// Store in database
|
||||
db_client.store_item(file_info.clone()).await?;
|
||||
|
||||
Ok(file_info)
|
||||
}
|
||||
|
||||
/// Guesses the MIME type based on the file extension.
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -167,36 +119,6 @@ impl FileInfo {
|
||||
}
|
||||
}
|
||||
|
||||
/// Persists the file under the logical location `{user_id}/{uuid}/{file_name}`.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `uuid` - The UUID of the file.
|
||||
/// * `file` - The temporary file to persist.
|
||||
/// * `file_name` - The sanitized file name.
|
||||
/// * `user-id` - User id
|
||||
/// * `config` - Application configuration containing data directory path
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<String, FileError>` - The logical object location or an error.
|
||||
async fn persist_file(
|
||||
uuid: &Uuid,
|
||||
file: NamedTempFile,
|
||||
file_name: &str,
|
||||
user_id: &str,
|
||||
config: &AppConfig,
|
||||
) -> Result<String, FileError> {
|
||||
// Logical object location relative to the store root
|
||||
let location = format!("{}/{}/{}", user_id, uuid, file_name);
|
||||
info!("Persisting to object location: {}", location);
|
||||
|
||||
let bytes = tokio::fs::read(file.path()).await?;
|
||||
store::put_bytes_at(&location, bytes.into(), config)
|
||||
.await
|
||||
.map_err(FileError::from)?;
|
||||
|
||||
Ok(location)
|
||||
}
|
||||
|
||||
/// Retrieves a `FileInfo` by SHA256.
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -215,41 +137,6 @@ impl FileInfo {
|
||||
.ok_or(FileError::FileNotFound(sha256.to_string()))
|
||||
}
|
||||
|
||||
/// Removes FileInfo from database and file from disk
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `id` - Id of the FileInfo
|
||||
/// * `db_client` - Reference to SurrealDbClient
|
||||
///
|
||||
/// # Returns
|
||||
/// `Result<(), FileError>`
|
||||
pub async fn delete_by_id(
|
||||
id: &str,
|
||||
db_client: &SurrealDbClient,
|
||||
config: &AppConfig,
|
||||
) -> Result<(), AppError> {
|
||||
// Get the FileInfo from the database
|
||||
let Some(file_info) = db_client.get_item::<FileInfo>(id).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Remove the object's parent prefix in the object store
|
||||
let (parent_prefix, _file_name) = store::split_object_path(&file_info.path)
|
||||
.map_err(|e| AppError::from(anyhow::anyhow!(e)))?;
|
||||
store::delete_prefix_at(&parent_prefix, config)
|
||||
.await
|
||||
.map_err(|e| AppError::from(anyhow::anyhow!(e)))?;
|
||||
info!(
|
||||
"Removed object prefix {} and its contents via object_store",
|
||||
parent_prefix
|
||||
);
|
||||
|
||||
// Delete the FileInfo from the database
|
||||
db_client.delete_item::<FileInfo>(id).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves a `FileInfo` by its ID.
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -265,34 +152,168 @@ impl FileInfo {
|
||||
Err(e) => Err(FileError::SurrealError(e)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new FileInfo using StorageManager for persistent storage operations.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `field_data` - The uploaded file data
|
||||
/// * `db_client` - Reference to the SurrealDbClient
|
||||
/// * `user_id` - The user ID
|
||||
/// * `storage` - A StorageManager instance for storage operations
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<Self, FileError>` - The created FileInfo or an error
|
||||
pub async fn new_with_storage(
|
||||
field_data: FieldData<NamedTempFile>,
|
||||
db_client: &SurrealDbClient,
|
||||
user_id: &str,
|
||||
storage: &StorageManager,
|
||||
) -> Result<Self, FileError> {
|
||||
let file = field_data.contents;
|
||||
let file_name = field_data
|
||||
.metadata
|
||||
.file_name
|
||||
.ok_or(FileError::MissingFileName)?;
|
||||
let original_file_name = file_name.clone();
|
||||
|
||||
// Calculate SHA256
|
||||
let sha256 = Self::get_sha(&file).await?;
|
||||
|
||||
// Early return if file already exists
|
||||
match Self::get_by_sha(&sha256, db_client).await {
|
||||
Ok(existing_file) => {
|
||||
info!("File already exists with SHA256: {}", sha256);
|
||||
return Ok(existing_file);
|
||||
}
|
||||
Err(FileError::FileNotFound(_)) => (), // Expected case for new files
|
||||
Err(e) => return Err(e), // Propagate unexpected errors
|
||||
}
|
||||
|
||||
// Generate UUID and prepare paths
|
||||
let uuid = Uuid::new_v4();
|
||||
let sanitized_file_name = Self::sanitize_file_name(&file_name);
|
||||
let now = Utc::now();
|
||||
|
||||
let path =
|
||||
Self::persist_file_with_storage(&uuid, file, &sanitized_file_name, user_id, storage)
|
||||
.await?;
|
||||
|
||||
// Create FileInfo struct
|
||||
let file_info = FileInfo {
|
||||
id: uuid.to_string(),
|
||||
user_id: user_id.to_string(),
|
||||
sha256,
|
||||
file_name: original_file_name,
|
||||
path,
|
||||
mime_type: Self::guess_mime_type(Path::new(&file_name)),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
};
|
||||
|
||||
// Store in database
|
||||
db_client
|
||||
.store_item(file_info.clone())
|
||||
.await
|
||||
.map_err(FileError::SurrealError)?;
|
||||
|
||||
Ok(file_info)
|
||||
}
|
||||
|
||||
/// Delete a FileInfo by ID using StorageManager for storage operations.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `id` - ID of the FileInfo
|
||||
/// * `db_client` - Reference to SurrealDbClient
|
||||
/// * `storage` - A StorageManager instance for storage operations
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<(), AppError>` - Success or error
|
||||
pub async fn delete_by_id_with_storage(
|
||||
id: &str,
|
||||
db_client: &SurrealDbClient,
|
||||
storage: &StorageManager,
|
||||
) -> Result<(), AppError> {
|
||||
// Get the FileInfo from the database
|
||||
let Some(file_info) = db_client.get_item::<FileInfo>(id).await? else {
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
// Remove the object's parent prefix in the object store
|
||||
let (parent_prefix, _file_name) = store::split_object_path(&file_info.path)
|
||||
.map_err(|e| AppError::from(anyhow::anyhow!(e)))?;
|
||||
storage
|
||||
.delete_prefix(&parent_prefix)
|
||||
.await
|
||||
.map_err(|e| AppError::from(anyhow::anyhow!(e)))?;
|
||||
info!(
|
||||
"Removed object prefix {} and its contents via StorageManager",
|
||||
parent_prefix
|
||||
);
|
||||
|
||||
// Delete the FileInfo from the database
|
||||
db_client.delete_item::<FileInfo>(id).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieve file content using StorageManager for storage operations.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `storage` - A StorageManager instance for storage operations
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<bytes::Bytes, AppError>` - The file content or an error
|
||||
pub async fn get_content_with_storage(
|
||||
&self,
|
||||
storage: &StorageManager,
|
||||
) -> Result<bytes::Bytes, AppError> {
|
||||
storage
|
||||
.get(&self.path)
|
||||
.await
|
||||
.map_err(|e: object_store::Error| AppError::from(anyhow::anyhow!(e)))
|
||||
}
|
||||
|
||||
/// Persist file to storage using StorageManager.
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `uuid` - The UUID for the file
|
||||
/// * `file` - The temporary file to persist
|
||||
/// * `file_name` - The name of the file
|
||||
/// * `user_id` - The user ID
|
||||
/// * `storage` - A StorageManager instance for storage operations
|
||||
///
|
||||
/// # Returns
|
||||
/// * `Result<String, FileError>` - The logical object location or an error.
|
||||
async fn persist_file_with_storage(
|
||||
uuid: &Uuid,
|
||||
file: NamedTempFile,
|
||||
file_name: &str,
|
||||
user_id: &str,
|
||||
storage: &StorageManager,
|
||||
) -> Result<String, FileError> {
|
||||
// Logical object location relative to the store root
|
||||
let location = format!("{}/{}/{}", user_id, uuid, file_name);
|
||||
info!("Persisting to object location: {}", location);
|
||||
|
||||
let bytes = tokio::fs::read(file.path()).await?;
|
||||
storage
|
||||
.put(&location, bytes.into())
|
||||
.await
|
||||
.map_err(FileError::from)?;
|
||||
|
||||
Ok(location)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::utils::config::{AppConfig, PdfIngestMode::LlmFirst, StorageKind};
|
||||
use crate::storage::store::testing::TestStorageManager;
|
||||
use axum::http::HeaderMap;
|
||||
use axum_typed_multipart::FieldMetadata;
|
||||
use std::io::Write;
|
||||
use std::{io::Write, path::Path};
|
||||
use tempfile::NamedTempFile;
|
||||
|
||||
fn test_config(data_dir: &str) -> AppConfig {
|
||||
AppConfig {
|
||||
data_dir: data_dir.to_string(),
|
||||
openai_api_key: "test_key".to_string(),
|
||||
surrealdb_address: "test_address".to_string(),
|
||||
surrealdb_username: "test_user".to_string(),
|
||||
surrealdb_password: "test_pass".to_string(),
|
||||
surrealdb_namespace: "test_ns".to_string(),
|
||||
surrealdb_database: "test_db".to_string(),
|
||||
http_port: 3000,
|
||||
openai_base_url: "..".to_string(),
|
||||
storage: StorageKind::Local,
|
||||
pdf_ingest_mode: LlmFirst,
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates a test temporary file with the given content
|
||||
fn create_test_file(content: &[u8], file_name: &str) -> FieldData<NamedTempFile> {
|
||||
let mut temp_file = NamedTempFile::new().expect("Failed to create temp file");
|
||||
@@ -316,33 +337,39 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fileinfo_create_read_delete() {
|
||||
// Setup in-memory database for testing
|
||||
async fn test_fileinfo_create_read_delete_with_storage_manager() {
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
db.apply_migrations().await.unwrap();
|
||||
|
||||
// Create a test file
|
||||
let content = b"This is a test file for cross-filesystem operations";
|
||||
let file_name = "cross_fs_test.txt";
|
||||
let content = b"This is a test file for StorageManager operations";
|
||||
let file_name = "storage_manager_test.txt";
|
||||
let field_data = create_test_file(content, file_name);
|
||||
|
||||
// Create a FileInfo instance with data_dir in /tmp
|
||||
// Create test storage manager (memory backend)
|
||||
let test_storage = store::testing::TestStorageManager::new_memory()
|
||||
.await
|
||||
.expect("Failed to create test storage manager");
|
||||
|
||||
// Create a FileInfo instance with storage manager
|
||||
let user_id = "test_user";
|
||||
let config = test_config("/tmp/minne_test_data");
|
||||
|
||||
// Test file creation
|
||||
let file_info = FileInfo::new(field_data, &db, user_id, &config)
|
||||
.await
|
||||
.expect("Failed to create file across filesystems");
|
||||
// Test file creation with StorageManager
|
||||
let file_info =
|
||||
FileInfo::new_with_storage(field_data, &db, user_id, test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to create file with StorageManager");
|
||||
assert_eq!(file_info.file_name, file_name);
|
||||
|
||||
// Verify the file exists via object_store and has correct content
|
||||
let bytes = store::get_bytes_at(&file_info.path, &config)
|
||||
// Verify the file exists via StorageManager and has correct content
|
||||
let bytes = file_info
|
||||
.get_content_with_storage(test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to read file content via object_store");
|
||||
assert_eq!(bytes, content.as_slice());
|
||||
.expect("Failed to read file content via StorageManager");
|
||||
assert_eq!(bytes.as_ref(), content);
|
||||
|
||||
// Test file reading
|
||||
let retrieved = FileInfo::get_by_id(&file_info.id, &db)
|
||||
@@ -350,51 +377,89 @@ mod tests {
|
||||
.expect("Failed to retrieve file info");
|
||||
assert_eq!(retrieved.id, file_info.id);
|
||||
assert_eq!(retrieved.sha256, file_info.sha256);
|
||||
assert_eq!(retrieved.file_name, file_name);
|
||||
|
||||
// Test file deletion
|
||||
FileInfo::delete_by_id(&file_info.id, &db, &config)
|
||||
// Test file deletion with StorageManager
|
||||
FileInfo::delete_by_id_with_storage(&file_info.id, &db, test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to delete file");
|
||||
assert!(
|
||||
store::get_bytes_at(&file_info.path, &config).await.is_err(),
|
||||
"File should be deleted"
|
||||
);
|
||||
.expect("Failed to delete file with StorageManager");
|
||||
|
||||
// Clean up the test directory
|
||||
let _ = tokio::fs::remove_dir_all(&config.data_dir).await;
|
||||
let deleted_result = file_info
|
||||
.get_content_with_storage(test_storage.storage())
|
||||
.await;
|
||||
assert!(deleted_result.is_err(), "File should be deleted");
|
||||
|
||||
// No cleanup needed - TestStorageManager handles it automatically
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fileinfo_duplicate_detection() {
|
||||
// Setup in-memory database for testing
|
||||
async fn test_fileinfo_preserves_original_filename_and_sanitizes_path() {
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
db.apply_migrations().await.unwrap();
|
||||
|
||||
// Create a test file
|
||||
let content = b"This is a test file for cross-filesystem duplicate detection";
|
||||
let file_name = "cross_fs_duplicate.txt";
|
||||
let content = b"filename sanitization";
|
||||
let original_name = "Complex name (1).txt";
|
||||
let expected_sanitized = "Complex_name__1_.txt";
|
||||
let field_data = create_test_file(content, original_name);
|
||||
|
||||
let test_storage = store::testing::TestStorageManager::new_memory()
|
||||
.await
|
||||
.expect("Failed to create test storage manager");
|
||||
|
||||
let file_info =
|
||||
FileInfo::new_with_storage(field_data, &db, "sanitized_user", test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to create file via storage manager");
|
||||
|
||||
assert_eq!(file_info.file_name, original_name);
|
||||
|
||||
let stored_name = Path::new(&file_info.path)
|
||||
.file_name()
|
||||
.and_then(|name| name.to_str())
|
||||
.expect("stored name");
|
||||
assert_eq!(stored_name, expected_sanitized);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fileinfo_duplicate_detection_with_storage_manager() {
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
db.apply_migrations().await.unwrap();
|
||||
|
||||
let content = b"This is a test file for StorageManager duplicate detection";
|
||||
let file_name = "storage_manager_duplicate.txt";
|
||||
let field_data = create_test_file(content, file_name);
|
||||
|
||||
// Create a FileInfo instance with data_dir in /tmp
|
||||
// Create test storage manager
|
||||
let test_storage = store::testing::TestStorageManager::new_memory()
|
||||
.await
|
||||
.expect("Failed to create test storage manager");
|
||||
|
||||
// Create a FileInfo instance with storage manager
|
||||
let user_id = "test_user";
|
||||
let config = test_config("/tmp/minne_test_data");
|
||||
|
||||
// Store the original file
|
||||
let original_file_info = FileInfo::new(field_data, &db, user_id, &config)
|
||||
.await
|
||||
.expect("Failed to create original file");
|
||||
let original_file_info =
|
||||
FileInfo::new_with_storage(field_data, &db, user_id, test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to create original file with StorageManager");
|
||||
|
||||
// Create another file with the same content but different name
|
||||
let duplicate_name = "cross_fs_duplicate_2.txt";
|
||||
let duplicate_name = "storage_manager_duplicate_2.txt";
|
||||
let field_data2 = create_test_file(content, duplicate_name);
|
||||
|
||||
// The system should detect it's the same file and return the original FileInfo
|
||||
let duplicate_file_info = FileInfo::new(field_data2, &db, user_id, &config)
|
||||
.await
|
||||
.expect("Failed to process duplicate file");
|
||||
let duplicate_file_info =
|
||||
FileInfo::new_with_storage(field_data2, &db, user_id, test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to process duplicate file with StorageManager");
|
||||
|
||||
// Verify duplicate detection worked
|
||||
assert_eq!(duplicate_file_info.id, original_file_info.id);
|
||||
@@ -402,34 +467,48 @@ mod tests {
|
||||
assert_eq!(duplicate_file_info.file_name, file_name);
|
||||
assert_ne!(duplicate_file_info.file_name, duplicate_name);
|
||||
|
||||
// Clean up
|
||||
FileInfo::delete_by_id(&original_file_info.id, &db, &config)
|
||||
// Verify both files have the same content (they should point to the same file)
|
||||
let original_content = original_file_info
|
||||
.get_content_with_storage(test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to delete file");
|
||||
let _ = tokio::fs::remove_dir_all(&config.data_dir).await;
|
||||
.unwrap();
|
||||
let duplicate_content = duplicate_file_info
|
||||
.get_content_with_storage(test_storage.storage())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(original_content.as_ref(), content);
|
||||
assert_eq!(duplicate_content.as_ref(), content);
|
||||
|
||||
// Clean up
|
||||
FileInfo::delete_by_id_with_storage(&original_file_info.id, &db, test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to delete original file with StorageManager");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_file_creation() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
db.apply_migrations()
|
||||
.await
|
||||
.expect("Failed to apply migrations");
|
||||
|
||||
// Create a test file
|
||||
let content = b"This is a test file content";
|
||||
let file_name = "test_file.txt";
|
||||
let field_data = create_test_file(content, file_name);
|
||||
|
||||
// Create a FileInfo instance
|
||||
// Create a FileInfo instance with StorageManager
|
||||
let user_id = "test_user";
|
||||
let config = test_config("./data");
|
||||
let file_info = FileInfo::new(field_data, &db, user_id, &config).await;
|
||||
let test_storage = TestStorageManager::new_memory()
|
||||
.await
|
||||
.expect("create test storage manager");
|
||||
let file_info =
|
||||
FileInfo::new_with_storage(field_data, &db, user_id, test_storage.storage()).await;
|
||||
|
||||
// We can't fully test persistence to disk in unit tests,
|
||||
// but we can verify the database record was created
|
||||
// Verify the FileInfo was created successfully
|
||||
assert!(file_info.is_ok());
|
||||
let file_info = file_info.unwrap();
|
||||
|
||||
@@ -459,33 +538,39 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_file_duplicate_detection() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
db.apply_migrations()
|
||||
.await
|
||||
.expect("Failed to apply migrations");
|
||||
|
||||
// First, store a file with known content
|
||||
let content = b"This is a test file for duplicate detection";
|
||||
let file_name = "original.txt";
|
||||
let user_id = "test_user";
|
||||
|
||||
let config = test_config("./data");
|
||||
let test_storage = TestStorageManager::new_memory()
|
||||
.await
|
||||
.expect("create test storage manager");
|
||||
|
||||
let field_data1 = create_test_file(content, file_name);
|
||||
let original_file_info = FileInfo::new(field_data1, &db, user_id, &config)
|
||||
.await
|
||||
.expect("Failed to create original file");
|
||||
let original_file_info =
|
||||
FileInfo::new_with_storage(field_data1, &db, user_id, test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to create original file");
|
||||
|
||||
// Now try to store another file with the same content but different name
|
||||
let duplicate_name = "duplicate.txt";
|
||||
let field_data2 = create_test_file(content, duplicate_name);
|
||||
|
||||
// The system should detect it's the same file and return the original FileInfo
|
||||
let duplicate_file_info = FileInfo::new(field_data2, &db, user_id, &config)
|
||||
.await
|
||||
.expect("Failed to process duplicate file");
|
||||
let duplicate_file_info =
|
||||
FileInfo::new_with_storage(field_data2, &db, user_id, test_storage.storage())
|
||||
.await
|
||||
.expect("Failed to process duplicate file");
|
||||
|
||||
// The returned FileInfo should match the original
|
||||
assert_eq!(duplicate_file_info.id, original_file_info.id);
|
||||
@@ -553,7 +638,6 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_by_sha_not_found() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
@@ -574,7 +658,6 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_manual_file_info_creation() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
@@ -615,23 +698,28 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_by_id() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
db.apply_migrations()
|
||||
.await
|
||||
.expect("Failed to apply migrations");
|
||||
|
||||
// Create and persist a test file via FileInfo::new
|
||||
// Create and persist a test file via FileInfo::new_with_storage
|
||||
let user_id = "user123";
|
||||
let cfg = test_config("./data");
|
||||
let test_storage = TestStorageManager::new_memory()
|
||||
.await
|
||||
.expect("create test storage manager");
|
||||
let temp = create_test_file(b"test content", "test_file.txt");
|
||||
let file_info = FileInfo::new(temp, &db, user_id, &cfg)
|
||||
let file_info = FileInfo::new_with_storage(temp, &db, user_id, test_storage.storage())
|
||||
.await
|
||||
.expect("create file");
|
||||
|
||||
// Delete the file
|
||||
let delete_result = FileInfo::delete_by_id(&file_info.id, &db, &cfg).await;
|
||||
// Delete the file using StorageManager
|
||||
let delete_result =
|
||||
FileInfo::delete_by_id_with_storage(&file_info.id, &db, test_storage.storage()).await;
|
||||
|
||||
// Delete should be successful
|
||||
assert!(
|
||||
@@ -650,13 +738,12 @@ mod tests {
|
||||
"FileInfo should be deleted from the database"
|
||||
);
|
||||
|
||||
// Verify content no longer retrievable
|
||||
assert!(store::get_bytes_at(&file_info.path, &cfg).await.is_err());
|
||||
// Verify content no longer retrievable from storage
|
||||
assert!(test_storage.storage().get(&file_info.path).await.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_delete_by_id_not_found() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
@@ -664,19 +751,16 @@ mod tests {
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
|
||||
// Try to delete a file that doesn't exist
|
||||
let result = FileInfo::delete_by_id(
|
||||
"nonexistent_id",
|
||||
&db,
|
||||
&test_config("./data"),
|
||||
)
|
||||
.await;
|
||||
let test_storage = TestStorageManager::new_memory().await.unwrap();
|
||||
let result =
|
||||
FileInfo::delete_by_id_with_storage("nonexistent_id", &db, test_storage.storage())
|
||||
.await;
|
||||
|
||||
// Should succeed even if the file record does not exist
|
||||
assert!(result.is_ok());
|
||||
}
|
||||
#[tokio::test]
|
||||
async fn test_get_by_id() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
@@ -717,7 +801,6 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_get_by_id_not_found() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
@@ -741,43 +824,197 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
// StorageManager-based tests
|
||||
#[tokio::test]
|
||||
async fn test_fileinfo_persist_with_custom_root() {
|
||||
// Setup in-memory database for testing
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
async fn test_file_info_new_with_storage_memory() {
|
||||
// Setup
|
||||
let db = SurrealDbClient::memory("test_ns", "test_file_storage_memory")
|
||||
.await
|
||||
.expect("Failed to start in-memory surrealdb");
|
||||
.unwrap();
|
||||
db.apply_migrations().await.unwrap();
|
||||
|
||||
// Create a test file
|
||||
let content = b"This is a test file for data directory configuration";
|
||||
let file_name = "data_dir_test.txt";
|
||||
let field_data = create_test_file(content, file_name);
|
||||
|
||||
// Create a FileInfo instance with a custom data directory
|
||||
let content = b"This is a test file for StorageManager";
|
||||
let field_data = create_test_file(content, "test_storage.txt");
|
||||
let user_id = "test_user";
|
||||
let custom_data_dir = "/tmp/minne_custom_data_dir";
|
||||
let config = test_config(custom_data_dir);
|
||||
|
||||
// Test file creation
|
||||
let file_info = FileInfo::new(field_data, &db, user_id, &config)
|
||||
// Create test storage manager
|
||||
let storage = store::testing::TestStorageManager::new_memory()
|
||||
.await
|
||||
.expect("Failed to create file in custom data directory");
|
||||
.unwrap();
|
||||
|
||||
// Verify the file has the correct content via object_store
|
||||
let file_content = store::get_bytes_at(&file_info.path, &config)
|
||||
// Test file creation with StorageManager
|
||||
let file_info = FileInfo::new_with_storage(field_data, &db, user_id, storage.storage())
|
||||
.await
|
||||
.expect("Failed to read file content");
|
||||
assert_eq!(file_content.as_ref(), content);
|
||||
.expect("Failed to create file with StorageManager");
|
||||
|
||||
// Test file deletion
|
||||
FileInfo::delete_by_id(&file_info.id, &db, &config)
|
||||
// Verify the file was created correctly
|
||||
assert_eq!(file_info.user_id, user_id);
|
||||
assert_eq!(file_info.file_name, "test_storage.txt");
|
||||
assert!(!file_info.sha256.is_empty());
|
||||
assert!(!file_info.path.is_empty());
|
||||
|
||||
// Test content retrieval with StorageManager
|
||||
let retrieved_content = file_info
|
||||
.get_content_with_storage(storage.storage())
|
||||
.await
|
||||
.expect("Failed to delete file");
|
||||
assert!(store::get_bytes_at(&file_info.path, &config).await.is_err());
|
||||
.expect("Failed to get file content with StorageManager");
|
||||
assert_eq!(retrieved_content.as_ref(), content);
|
||||
|
||||
// Clean up the test directory
|
||||
let _ = tokio::fs::remove_dir_all(custom_data_dir).await;
|
||||
// Test file deletion with StorageManager
|
||||
FileInfo::delete_by_id_with_storage(&file_info.id, &db, storage.storage())
|
||||
.await
|
||||
.expect("Failed to delete file with StorageManager");
|
||||
|
||||
// Verify file is deleted
|
||||
let deleted_content_result = file_info.get_content_with_storage(storage.storage()).await;
|
||||
assert!(deleted_content_result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_file_info_new_with_storage_local() {
|
||||
// Setup
|
||||
let db = SurrealDbClient::memory("test_ns", "test_file_storage_local")
|
||||
.await
|
||||
.unwrap();
|
||||
db.apply_migrations().await.unwrap();
|
||||
|
||||
let content = b"This is a test file for StorageManager with local storage";
|
||||
let field_data = create_test_file(content, "test_local.txt");
|
||||
let user_id = "test_user";
|
||||
|
||||
// Create test storage manager with local backend
|
||||
let storage = store::testing::TestStorageManager::new_local()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Test file creation with StorageManager
|
||||
let file_info = FileInfo::new_with_storage(field_data, &db, user_id, storage.storage())
|
||||
.await
|
||||
.expect("Failed to create file with StorageManager");
|
||||
|
||||
// Verify the file was created correctly
|
||||
assert_eq!(file_info.user_id, user_id);
|
||||
assert_eq!(file_info.file_name, "test_local.txt");
|
||||
assert!(!file_info.sha256.is_empty());
|
||||
assert!(!file_info.path.is_empty());
|
||||
|
||||
// Test content retrieval with StorageManager
|
||||
let retrieved_content = file_info
|
||||
.get_content_with_storage(storage.storage())
|
||||
.await
|
||||
.expect("Failed to get file content with StorageManager");
|
||||
assert_eq!(retrieved_content.as_ref(), content);
|
||||
|
||||
// Test file deletion with StorageManager
|
||||
FileInfo::delete_by_id_with_storage(&file_info.id, &db, storage.storage())
|
||||
.await
|
||||
.expect("Failed to delete file with StorageManager");
|
||||
|
||||
// Verify file is deleted
|
||||
let deleted_content_result = file_info.get_content_with_storage(storage.storage()).await;
|
||||
assert!(deleted_content_result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_file_info_storage_manager_persistence() {
|
||||
// Setup
|
||||
let db = SurrealDbClient::memory("test_ns", "test_file_persistence")
|
||||
.await
|
||||
.unwrap();
|
||||
db.apply_migrations().await.unwrap();
|
||||
|
||||
let content = b"Test content for persistence";
|
||||
let field_data = create_test_file(content, "persistence_test.txt");
|
||||
let user_id = "test_user";
|
||||
|
||||
// Create test storage manager
|
||||
let storage = store::testing::TestStorageManager::new_memory()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Create file
|
||||
let file_info = FileInfo::new_with_storage(field_data, &db, user_id, storage.storage())
|
||||
.await
|
||||
.expect("Failed to create file");
|
||||
|
||||
// Test that data persists across multiple operations with the same StorageManager
|
||||
let retrieved_content_1 = file_info
|
||||
.get_content_with_storage(storage.storage())
|
||||
.await
|
||||
.unwrap();
|
||||
let retrieved_content_2 = file_info
|
||||
.get_content_with_storage(storage.storage())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(retrieved_content_1.as_ref(), content);
|
||||
assert_eq!(retrieved_content_2.as_ref(), content);
|
||||
|
||||
// Test that different StorageManager instances don't share data (memory storage isolation)
|
||||
let storage2 = store::testing::TestStorageManager::new_memory()
|
||||
.await
|
||||
.unwrap();
|
||||
let isolated_content_result = file_info.get_content_with_storage(storage2.storage()).await;
|
||||
assert!(
|
||||
isolated_content_result.is_err(),
|
||||
"Different StorageManager should not have access to same data"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_file_info_storage_manager_equivalence() {
|
||||
// Setup
|
||||
let db = SurrealDbClient::memory("test_ns", "test_file_equivalence")
|
||||
.await
|
||||
.unwrap();
|
||||
db.apply_migrations().await.unwrap();
|
||||
|
||||
let content = b"Test content for equivalence testing";
|
||||
let field_data1 = create_test_file(content, "equivalence_test_1.txt");
|
||||
let field_data2 = create_test_file(content, "equivalence_test_2.txt");
|
||||
let user_id = "test_user";
|
||||
|
||||
// Create single storage manager and reuse it
|
||||
let storage_manager = store::testing::TestStorageManager::new_memory()
|
||||
.await
|
||||
.unwrap();
|
||||
let storage = storage_manager.storage();
|
||||
|
||||
// Create multiple files with the same storage manager
|
||||
let file_info_1 = FileInfo::new_with_storage(field_data1, &db, user_id, &storage)
|
||||
.await
|
||||
.expect("Failed to create file 1");
|
||||
|
||||
let file_info_2 = FileInfo::new_with_storage(field_data2, &db, user_id, &storage)
|
||||
.await
|
||||
.expect("Failed to create file 2");
|
||||
|
||||
// Test that both files can be retrieved with the same storage backend
|
||||
let content_1 = file_info_1
|
||||
.get_content_with_storage(&storage)
|
||||
.await
|
||||
.unwrap();
|
||||
let content_2 = file_info_2
|
||||
.get_content_with_storage(&storage)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(content_1.as_ref(), content);
|
||||
assert_eq!(content_2.as_ref(), content);
|
||||
|
||||
// Test that files can be deleted with the same storage manager
|
||||
FileInfo::delete_by_id_with_storage(&file_info_1.id, &db, &storage)
|
||||
.await
|
||||
.unwrap();
|
||||
FileInfo::delete_by_id_with_storage(&file_info_2.id, &db, &storage)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Verify files are deleted
|
||||
let deleted_content_1 = file_info_1.get_content_with_storage(&storage).await;
|
||||
let deleted_content_2 = file_info_2.get_content_with_storage(&storage).await;
|
||||
|
||||
assert!(deleted_content_1.is_err());
|
||||
assert!(deleted_content_2.is_err());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -471,12 +471,15 @@ mod tests {
|
||||
.expect("Failed to apply migrations");
|
||||
|
||||
let user_id = "test_user_123";
|
||||
let scratchpad = Scratchpad::new(user_id.to_string(), "Test Timezone Scratchpad".to_string());
|
||||
let scratchpad =
|
||||
Scratchpad::new(user_id.to_string(), "Test Timezone Scratchpad".to_string());
|
||||
let scratchpad_id = scratchpad.id.clone();
|
||||
|
||||
db.store_item(scratchpad).await.unwrap();
|
||||
|
||||
let retrieved = Scratchpad::get_by_id(&scratchpad_id, user_id, &db).await.unwrap();
|
||||
let retrieved = Scratchpad::get_by_id(&scratchpad_id, user_id, &db)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Test that datetime fields are preserved and can be used for timezone formatting
|
||||
assert!(retrieved.created_at.timestamp() > 0);
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use config::{Config, ConfigError, Environment, File};
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Clone, Deserialize, Debug)]
|
||||
#[derive(Clone, Deserialize, Debug, PartialEq)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum StorageKind {
|
||||
Local,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use common::storage::db::SurrealDbClient;
|
||||
use common::storage::{db::SurrealDbClient, store::StorageManager};
|
||||
use common::utils::template_engine::{ProvidesTemplateEngine, TemplateEngine};
|
||||
use common::{create_template_engine, storage::db::ProvidesDb, utils::config::AppConfig};
|
||||
use composite_retrieval::reranking::RerankerPool;
|
||||
@@ -14,14 +14,16 @@ pub struct HtmlState {
|
||||
pub templates: Arc<TemplateEngine>,
|
||||
pub session_store: Arc<SessionStoreType>,
|
||||
pub config: AppConfig,
|
||||
pub storage: StorageManager,
|
||||
pub reranker_pool: Option<Arc<RerankerPool>>,
|
||||
}
|
||||
|
||||
impl HtmlState {
|
||||
pub fn new_with_resources(
|
||||
pub async fn new_with_resources(
|
||||
db: Arc<SurrealDbClient>,
|
||||
openai_client: Arc<OpenAIClientType>,
|
||||
session_store: Arc<SessionStoreType>,
|
||||
storage: StorageManager,
|
||||
config: AppConfig,
|
||||
reranker_pool: Option<Arc<RerankerPool>>,
|
||||
) -> Result<Self, Box<dyn std::error::Error>> {
|
||||
@@ -34,6 +36,7 @@ impl HtmlState {
|
||||
session_store,
|
||||
templates: Arc::new(template_engine),
|
||||
config,
|
||||
storage,
|
||||
reranker_pool,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -190,7 +190,7 @@ pub async fn delete_text_content(
|
||||
TextContent::has_other_with_file(&file_info.id, &text_content.id, &state.db).await?;
|
||||
|
||||
if !file_in_use {
|
||||
FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await?;
|
||||
FileInfo::delete_by_id_with_storage(&file_info.id, &state.db, &state.storage).await?;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -17,7 +17,6 @@ use crate::{
|
||||
utils::text_content_preview::truncate_text_contents,
|
||||
AuthSessionType,
|
||||
};
|
||||
use common::storage::store;
|
||||
use common::storage::types::user::DashboardStats;
|
||||
use common::{
|
||||
error::AppError,
|
||||
@@ -86,7 +85,7 @@ pub async fn delete_text_content(
|
||||
TextContent::has_other_with_file(&file_info.id, &text_content.id, &state.db).await?;
|
||||
|
||||
if !file_in_use {
|
||||
FileInfo::delete_by_id(&file_info.id, &state.db, &state.config).await?;
|
||||
FileInfo::delete_by_id_with_storage(&file_info.id, &state.db, &state.storage).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -278,7 +277,7 @@ pub async fn serve_file(
|
||||
return Ok(TemplateResponse::unauthorized().into_response());
|
||||
}
|
||||
|
||||
let stream = match store::get_stream_at(&file_info.path, &state.config).await {
|
||||
let stream = match state.storage.get_stream(&file_info.path).await {
|
||||
Ok(s) => s,
|
||||
Err(_) => return Ok(TemplateResponse::server_error().into_response()),
|
||||
};
|
||||
|
||||
@@ -98,7 +98,8 @@ pub async fn process_ingress_form(
|
||||
info!("{:?}", input);
|
||||
|
||||
let file_infos = try_join_all(input.files.into_iter().map(|file| {
|
||||
FileInfo::new(file, &state.db, &user.id, &state.config).map_err(AppError::from)
|
||||
FileInfo::new_with_storage(file, &state.db, &user.id, &state.storage)
|
||||
.map_err(AppError::from)
|
||||
}))
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -18,7 +18,8 @@ async-openai = { workspace = true }
|
||||
surrealdb = { workspace = true }
|
||||
dom_smoothie = { workspace = true }
|
||||
tempfile = { workspace = true }
|
||||
axum_typed_multipart = { workspace = true}
|
||||
axum_typed_multipart = { workspace = true}
|
||||
anyhow = { workspace = true }
|
||||
reqwest = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
text-splitter = { workspace = true }
|
||||
@@ -28,6 +29,7 @@ headless_chrome = { workspace = true }
|
||||
base64 = { workspace = true }
|
||||
pdf-extract = "0.9"
|
||||
lopdf = "0.32"
|
||||
bytes = { workspace = true }
|
||||
|
||||
common = { path = "../common" }
|
||||
composite-retrieval = { path = "../composite-retrieval" }
|
||||
|
||||
@@ -19,6 +19,7 @@ use common::{
|
||||
error::AppError,
|
||||
storage::{
|
||||
db::SurrealDbClient,
|
||||
store::StorageManager,
|
||||
types::{
|
||||
ingestion_payload::IngestionPayload,
|
||||
ingestion_task::{IngestionTask, TaskErrorInfo},
|
||||
@@ -47,12 +48,14 @@ impl IngestionPipeline {
|
||||
openai_client: Arc<Client<async_openai::config::OpenAIConfig>>,
|
||||
config: AppConfig,
|
||||
reranker_pool: Option<Arc<RerankerPool>>,
|
||||
storage: StorageManager,
|
||||
) -> Result<Self, AppError> {
|
||||
let services = DefaultPipelineServices::new(
|
||||
db.clone(),
|
||||
openai_client.clone(),
|
||||
config.clone(),
|
||||
reranker_pool,
|
||||
storage,
|
||||
);
|
||||
|
||||
Self::with_services(db, IngestionConfig::default(), Arc::new(services))
|
||||
|
||||
@@ -2,6 +2,7 @@ use common::{
|
||||
error::AppError,
|
||||
storage::{
|
||||
db::SurrealDbClient,
|
||||
store::StorageManager,
|
||||
types::{
|
||||
ingestion_payload::IngestionPayload,
|
||||
text_content::{TextContent, UrlInfo},
|
||||
@@ -19,6 +20,7 @@ pub(crate) async fn to_text_content(
|
||||
db: &SurrealDbClient,
|
||||
config: &AppConfig,
|
||||
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
|
||||
storage: &StorageManager,
|
||||
) -> Result<TextContent, AppError> {
|
||||
match ingestion_payload {
|
||||
IngestionPayload::Url {
|
||||
@@ -27,7 +29,7 @@ pub(crate) async fn to_text_content(
|
||||
category,
|
||||
user_id,
|
||||
} => {
|
||||
let (article, file_info) = extract_text_from_url(&url, db, &user_id, config).await?;
|
||||
let (article, file_info) = extract_text_from_url(&url, db, &user_id, storage).await?;
|
||||
Ok(TextContent::new(
|
||||
article.text_content.into(),
|
||||
Some(context),
|
||||
@@ -60,7 +62,8 @@ pub(crate) async fn to_text_content(
|
||||
category,
|
||||
user_id,
|
||||
} => {
|
||||
let text = extract_text_from_file(&file_info, db, openai_client, config).await?;
|
||||
let text =
|
||||
extract_text_from_file(&file_info, db, openai_client, config, storage).await?;
|
||||
Ok(TextContent::new(
|
||||
text,
|
||||
Some(context),
|
||||
|
||||
@@ -10,6 +10,7 @@ use common::{
|
||||
error::AppError,
|
||||
storage::{
|
||||
db::SurrealDbClient,
|
||||
store::StorageManager,
|
||||
types::{
|
||||
ingestion_payload::IngestionPayload, knowledge_entity::KnowledgeEntity,
|
||||
knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings,
|
||||
@@ -65,6 +66,7 @@ pub struct DefaultPipelineServices {
|
||||
openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
config: AppConfig,
|
||||
reranker_pool: Option<Arc<RerankerPool>>,
|
||||
storage: StorageManager,
|
||||
}
|
||||
|
||||
impl DefaultPipelineServices {
|
||||
@@ -73,12 +75,14 @@ impl DefaultPipelineServices {
|
||||
openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
|
||||
config: AppConfig,
|
||||
reranker_pool: Option<Arc<RerankerPool>>,
|
||||
storage: StorageManager,
|
||||
) -> Self {
|
||||
Self {
|
||||
db,
|
||||
openai_client,
|
||||
config,
|
||||
reranker_pool,
|
||||
storage,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -144,7 +148,14 @@ impl PipelineServices for DefaultPipelineServices {
|
||||
&self,
|
||||
payload: IngestionPayload,
|
||||
) -> Result<TextContent, AppError> {
|
||||
to_text_content(payload, &self.db, &self.config, &self.openai_client).await
|
||||
to_text_content(
|
||||
payload,
|
||||
&self.db,
|
||||
&self.config,
|
||||
&self.openai_client,
|
||||
&self.storage,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
async fn retrieve_similar_entities(
|
||||
|
||||
@@ -1,63 +1,200 @@
|
||||
use anyhow::anyhow;
|
||||
use common::{
|
||||
error::AppError,
|
||||
storage::{db::SurrealDbClient, store, types::file_info::FileInfo},
|
||||
storage::{db::SurrealDbClient, store::StorageManager, types::file_info::FileInfo},
|
||||
utils::config::AppConfig,
|
||||
};
|
||||
use std::{
|
||||
env,
|
||||
io::{Error as IoError, ErrorKind},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::{
|
||||
audio_transcription::transcribe_audio_file, image_parsing::extract_text_from_image,
|
||||
pdf_ingestion::extract_pdf_content,
|
||||
};
|
||||
|
||||
struct TempPathGuard {
|
||||
path: PathBuf,
|
||||
}
|
||||
|
||||
impl TempPathGuard {
|
||||
fn as_path(&self) -> &Path {
|
||||
&self.path
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for TempPathGuard {
|
||||
fn drop(&mut self) {
|
||||
let _ = std::fs::remove_file(&self.path);
|
||||
}
|
||||
}
|
||||
|
||||
async fn materialize_temp_file(
|
||||
bytes: &[u8],
|
||||
extension: Option<&str>,
|
||||
) -> Result<TempPathGuard, AppError> {
|
||||
let mut path = env::temp_dir();
|
||||
let mut file_name = format!("minne-ingest-{}", Uuid::new_v4());
|
||||
|
||||
if let Some(ext) = extension {
|
||||
if !ext.is_empty() {
|
||||
file_name.push('.');
|
||||
file_name.push_str(ext);
|
||||
}
|
||||
}
|
||||
|
||||
path.push(file_name);
|
||||
|
||||
tokio::fs::write(&path, bytes).await?;
|
||||
|
||||
Ok(TempPathGuard { path })
|
||||
}
|
||||
|
||||
async fn resolve_existing_local_path(storage: &StorageManager, location: &str) -> Option<PathBuf> {
|
||||
let path = storage.resolve_local_path(location)?;
|
||||
match tokio::fs::metadata(&path).await {
|
||||
Ok(_) => Some(path),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn infer_extension(file_info: &FileInfo) -> Option<String> {
|
||||
Path::new(&file_info.path)
|
||||
.extension()
|
||||
.and_then(|ext| ext.to_str())
|
||||
.map(|ext| ext.to_string())
|
||||
}
|
||||
|
||||
pub async fn extract_text_from_file(
|
||||
file_info: &FileInfo,
|
||||
db_client: &SurrealDbClient,
|
||||
openai_client: &async_openai::Client<async_openai::config::OpenAIConfig>,
|
||||
config: &AppConfig,
|
||||
storage: &StorageManager,
|
||||
) -> Result<String, AppError> {
|
||||
let base_path = store::resolve_base_dir(config);
|
||||
let absolute_path = base_path.join(&file_info.path);
|
||||
let file_bytes = storage
|
||||
.get(&file_info.path)
|
||||
.await
|
||||
.map_err(|e| AppError::from(anyhow!(e)))?;
|
||||
let local_path = resolve_existing_local_path(storage, &file_info.path).await;
|
||||
|
||||
match file_info.mime_type.as_str() {
|
||||
"text/plain" | "text/markdown" | "application/octet-stream" | "text/x-rust" => {
|
||||
let content = tokio::fs::read_to_string(&absolute_path).await?;
|
||||
let content = String::from_utf8(file_bytes.to_vec())
|
||||
.map_err(|err| AppError::Io(IoError::new(ErrorKind::InvalidData, err)))?;
|
||||
Ok(content)
|
||||
}
|
||||
"application/pdf" => {
|
||||
extract_pdf_content(
|
||||
&absolute_path,
|
||||
if let Some(path) = local_path.as_ref() {
|
||||
return extract_pdf_content(
|
||||
path,
|
||||
db_client,
|
||||
openai_client,
|
||||
&config.pdf_ingest_mode,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
let temp_guard = materialize_temp_file(file_bytes.as_ref(), Some("pdf")).await?;
|
||||
let result = extract_pdf_content(
|
||||
temp_guard.as_path(),
|
||||
db_client,
|
||||
openai_client,
|
||||
&config.pdf_ingest_mode,
|
||||
)
|
||||
.await
|
||||
.await;
|
||||
drop(temp_guard);
|
||||
result
|
||||
}
|
||||
"image/png" | "image/jpeg" => {
|
||||
let path_str = absolute_path
|
||||
.to_str()
|
||||
.ok_or_else(|| {
|
||||
AppError::Processing(format!(
|
||||
"Encountered a non-UTF8 path while reading image {}",
|
||||
file_info.id
|
||||
))
|
||||
})?
|
||||
.to_string();
|
||||
let content = extract_text_from_image(&path_str, db_client, openai_client).await?;
|
||||
let content =
|
||||
extract_text_from_image(file_bytes.as_ref(), db_client, openai_client).await?;
|
||||
Ok(content)
|
||||
}
|
||||
"audio/mpeg" | "audio/mp3" | "audio/wav" | "audio/x-wav" | "audio/webm" | "audio/mp4"
|
||||
| "audio/ogg" | "audio/flac" => {
|
||||
let path_str = absolute_path
|
||||
.to_str()
|
||||
.ok_or_else(|| {
|
||||
if let Some(path) = local_path.as_ref() {
|
||||
let path_str = path.to_str().ok_or_else(|| {
|
||||
AppError::Processing(format!(
|
||||
"Encountered a non-UTF8 path while reading audio {}",
|
||||
file_info.id
|
||||
))
|
||||
})?
|
||||
.to_string();
|
||||
transcribe_audio_file(&path_str, db_client, openai_client).await
|
||||
})?;
|
||||
return transcribe_audio_file(path_str, db_client, openai_client).await;
|
||||
}
|
||||
|
||||
let extension = infer_extension(file_info);
|
||||
let temp_guard =
|
||||
materialize_temp_file(file_bytes.as_ref(), extension.as_deref()).await?;
|
||||
let path_str = temp_guard.as_path().to_str().ok_or_else(|| {
|
||||
AppError::Processing(format!(
|
||||
"Encountered a non-UTF8 path while reading audio {}",
|
||||
file_info.id
|
||||
))
|
||||
})?;
|
||||
let result = transcribe_audio_file(path_str, db_client, openai_client).await;
|
||||
drop(temp_guard);
|
||||
result
|
||||
}
|
||||
_ => Err(AppError::NotFound(file_info.mime_type.clone())),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use async_openai::{config::OpenAIConfig, Client};
|
||||
use bytes::Bytes;
|
||||
use chrono::Utc;
|
||||
use common::{
|
||||
storage::{db::SurrealDbClient, store::StorageManager},
|
||||
utils::config::{AppConfig, StorageKind},
|
||||
};
|
||||
|
||||
#[tokio::test]
|
||||
async fn extracts_text_using_memory_storage_backend() {
|
||||
let mut config = AppConfig::default();
|
||||
config.storage = StorageKind::Memory;
|
||||
|
||||
let storage = StorageManager::new(&config)
|
||||
.await
|
||||
.expect("create storage manager");
|
||||
|
||||
let location = "user/test/file.txt";
|
||||
let contents = b"hello from memory storage";
|
||||
|
||||
storage
|
||||
.put(location, Bytes::from(contents.as_slice().to_vec()))
|
||||
.await
|
||||
.expect("write object");
|
||||
|
||||
let now = Utc::now();
|
||||
let file_info = FileInfo {
|
||||
id: "file".into(),
|
||||
created_at: now,
|
||||
updated_at: now,
|
||||
sha256: "sha256".into(),
|
||||
path: location.to_string(),
|
||||
file_name: "file.txt".into(),
|
||||
mime_type: "text/plain".into(),
|
||||
user_id: "user".into(),
|
||||
};
|
||||
|
||||
let namespace = "test_ns";
|
||||
let database = &Uuid::new_v4().to_string();
|
||||
let db = SurrealDbClient::memory(namespace, database)
|
||||
.await
|
||||
.expect("create surreal memory");
|
||||
|
||||
let openai_client = Client::with_config(OpenAIConfig::default());
|
||||
|
||||
let text = extract_text_from_file(&file_info, &db, &openai_client, &config, &storage)
|
||||
.await
|
||||
.expect("extract text");
|
||||
|
||||
assert_eq!(text, String::from_utf8_lossy(contents));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,14 +10,13 @@ use common::{
|
||||
};
|
||||
|
||||
pub async fn extract_text_from_image(
|
||||
path: &str,
|
||||
image_bytes: &[u8],
|
||||
db: &SurrealDbClient,
|
||||
client: &async_openai::Client<async_openai::config::OpenAIConfig>,
|
||||
) -> Result<String, AppError> {
|
||||
let system_settings = SystemSettings::get_current(db).await?;
|
||||
let image_bytes = tokio::fs::read(&path).await?;
|
||||
|
||||
let base64_image = STANDARD.encode(&image_bytes);
|
||||
let base64_image = STANDARD.encode(image_bytes);
|
||||
|
||||
let image_url = format!("data:image/png;base64,{base64_image}");
|
||||
|
||||
|
||||
@@ -3,8 +3,7 @@ use axum_typed_multipart::{FieldData, FieldMetadata};
|
||||
use chrono::Utc;
|
||||
use common::{
|
||||
error::AppError,
|
||||
storage::{db::SurrealDbClient, types::file_info::FileInfo},
|
||||
utils::config::AppConfig,
|
||||
storage::{db::SurrealDbClient, store::StorageManager, types::file_info::FileInfo},
|
||||
};
|
||||
use dom_smoothie::{Article, Readability, TextMode};
|
||||
use headless_chrome::Browser;
|
||||
@@ -19,7 +18,7 @@ pub async fn extract_text_from_url(
|
||||
url: &str,
|
||||
db: &SurrealDbClient,
|
||||
user_id: &str,
|
||||
config: &AppConfig,
|
||||
storage: &StorageManager,
|
||||
) -> Result<(Article, FileInfo), AppError> {
|
||||
info!("Fetching URL: {}", url);
|
||||
let now = Instant::now();
|
||||
@@ -81,7 +80,7 @@ pub async fn extract_text_from_url(
|
||||
metadata,
|
||||
};
|
||||
|
||||
let file_info = FileInfo::new(field_data, db, user_id, config).await?;
|
||||
let file_info = FileInfo::new_with_storage(field_data, db, user_id, storage).await?;
|
||||
|
||||
let config = dom_smoothie::Config {
|
||||
text_mode: TextMode::Markdown,
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use api_router::{api_routes_v1, api_state::ApiState};
|
||||
use axum::{extract::FromRef, Router};
|
||||
use common::{storage::db::SurrealDbClient, utils::config::get_config};
|
||||
use common::{
|
||||
storage::db::SurrealDbClient, storage::store::StorageManager, utils::config::get_config,
|
||||
};
|
||||
use composite_retrieval::reranking::RerankerPool;
|
||||
use html_router::{html_routes, html_state::HtmlState};
|
||||
use ingestion_pipeline::{pipeline::IngestionPipeline, run_worker_loop};
|
||||
@@ -46,18 +48,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
let reranker_pool = RerankerPool::maybe_from_config(&config)?;
|
||||
|
||||
// Create global storage manager
|
||||
let storage = StorageManager::new(&config).await?;
|
||||
|
||||
let html_state = HtmlState::new_with_resources(
|
||||
db,
|
||||
openai_client,
|
||||
session_store,
|
||||
storage.clone(),
|
||||
config.clone(),
|
||||
reranker_pool.clone(),
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
let api_state = ApiState {
|
||||
db: html_state.db.clone(),
|
||||
config: config.clone(),
|
||||
};
|
||||
let api_state = ApiState::new(&config, storage.clone()).await?;
|
||||
|
||||
// Create Axum router
|
||||
let app = Router::new()
|
||||
@@ -115,6 +119,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
openai_client.clone(),
|
||||
config.clone(),
|
||||
reranker_pool.clone(),
|
||||
storage.clone(), // Use the global storage manager
|
||||
)
|
||||
.await
|
||||
.unwrap(),
|
||||
@@ -147,6 +152,7 @@ struct AppState {
|
||||
mod tests {
|
||||
use super::*;
|
||||
use axum::{body::Body, http::Request, http::StatusCode, Router};
|
||||
use common::storage::store::StorageManager;
|
||||
use common::utils::config::{AppConfig, PdfIngestMode, StorageKind};
|
||||
use std::{path::Path, sync::Arc};
|
||||
use tower::ServiceExt;
|
||||
@@ -195,18 +201,25 @@ mod tests {
|
||||
.with_api_base(&config.openai_base_url),
|
||||
));
|
||||
|
||||
let storage = StorageManager::new(&config)
|
||||
.await
|
||||
.expect("failed to build storage manager");
|
||||
|
||||
let html_state = HtmlState::new_with_resources(
|
||||
db.clone(),
|
||||
openai_client,
|
||||
session_store,
|
||||
storage.clone(),
|
||||
config.clone(),
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.expect("failed to build html state");
|
||||
|
||||
let api_state = ApiState {
|
||||
db: html_state.db.clone(),
|
||||
db: db.clone(),
|
||||
config: config.clone(),
|
||||
storage,
|
||||
};
|
||||
|
||||
let app = Router::new()
|
||||
|
||||
@@ -2,7 +2,9 @@ use std::sync::Arc;
|
||||
|
||||
use api_router::{api_routes_v1, api_state::ApiState};
|
||||
use axum::{extract::FromRef, Router};
|
||||
use common::{storage::db::SurrealDbClient, utils::config::get_config};
|
||||
use common::{
|
||||
storage::db::SurrealDbClient, storage::store::StorageManager, utils::config::get_config,
|
||||
};
|
||||
use composite_retrieval::reranking::RerankerPool;
|
||||
use html_router::{html_routes, html_state::HtmlState};
|
||||
use tracing::info;
|
||||
@@ -44,18 +46,20 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
let reranker_pool = RerankerPool::maybe_from_config(&config)?;
|
||||
|
||||
// Create global storage manager
|
||||
let storage = StorageManager::new(&config).await?;
|
||||
|
||||
let html_state = HtmlState::new_with_resources(
|
||||
db,
|
||||
openai_client,
|
||||
session_store,
|
||||
storage.clone(),
|
||||
config.clone(),
|
||||
reranker_pool,
|
||||
)?;
|
||||
)
|
||||
.await?;
|
||||
|
||||
let api_state = ApiState {
|
||||
db: html_state.db.clone(),
|
||||
config: config.clone(),
|
||||
};
|
||||
let api_state = ApiState::new(&config, storage).await?;
|
||||
|
||||
// Create Axum router
|
||||
let app = Router::new()
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use common::{storage::db::SurrealDbClient, utils::config::get_config};
|
||||
use common::{
|
||||
storage::db::SurrealDbClient, storage::store::StorageManager, utils::config::get_config,
|
||||
};
|
||||
use composite_retrieval::reranking::RerankerPool;
|
||||
use ingestion_pipeline::{pipeline::IngestionPipeline, run_worker_loop};
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
@@ -35,8 +37,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
|
||||
let reranker_pool = RerankerPool::maybe_from_config(&config)?;
|
||||
|
||||
// Create global storage manager
|
||||
let storage = StorageManager::new(&config).await?;
|
||||
|
||||
let ingestion_pipeline = Arc::new(
|
||||
IngestionPipeline::new(db.clone(), openai_client.clone(), config, reranker_pool).await?,
|
||||
IngestionPipeline::new(
|
||||
db.clone(),
|
||||
openai_client.clone(),
|
||||
config,
|
||||
reranker_pool,
|
||||
storage,
|
||||
)
|
||||
.await?,
|
||||
);
|
||||
|
||||
run_worker_loop(db, ingestion_pipeline).await
|
||||
|
||||
Reference in New Issue
Block a user