mirror of
https://github.com/perstarkse/minne.git
synced 2026-01-18 15:56:55 +01:00
wip file upload and storage to disk
This commit is contained in:
64
Cargo.lock
generated
64
Cargo.lock
generated
@@ -106,6 +106,12 @@ version = "1.0.89"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
|
||||
|
||||
[[package]]
|
||||
name = "arc-swap"
|
||||
version = "1.7.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
|
||||
|
||||
[[package]]
|
||||
name = "asn1-rs"
|
||||
version = "0.6.2"
|
||||
@@ -548,6 +554,20 @@ dependencies = [
|
||||
"x509-cert",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "combine"
|
||||
version = "4.6.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"memchr",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "concurrent-queue"
|
||||
version = "2.5.0"
|
||||
@@ -917,6 +937,7 @@ checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
|
||||
dependencies = [
|
||||
"futures-core",
|
||||
"futures-macro",
|
||||
"futures-sink",
|
||||
"futures-task",
|
||||
"pin-project-lite",
|
||||
"pin-utils",
|
||||
@@ -1680,6 +1701,29 @@ dependencies = [
|
||||
"futures-io",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redis"
|
||||
version = "0.27.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a7e86f5670bd8b028edfb240f0616cad620705b31ec389d55e4f3da2c38dcd48"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
"bytes",
|
||||
"combine",
|
||||
"futures-util",
|
||||
"itoa",
|
||||
"num-bigint",
|
||||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"ryu",
|
||||
"sha1_smol",
|
||||
"socket2 0.5.7",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "redox_syscall"
|
||||
version = "0.5.3"
|
||||
@@ -1992,6 +2036,12 @@ dependencies = [
|
||||
"digest",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sha1_smol"
|
||||
version = "1.0.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d"
|
||||
|
||||
[[package]]
|
||||
name = "sha2"
|
||||
version = "0.10.8"
|
||||
@@ -2267,6 +2317,19 @@ dependencies = [
|
||||
"syn 2.0.77",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-util"
|
||||
version = "0.7.12"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"futures-core",
|
||||
"futures-sink",
|
||||
"pin-project-lite",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tower"
|
||||
version = "0.4.13"
|
||||
@@ -2751,6 +2814,7 @@ dependencies = [
|
||||
"lapin",
|
||||
"mime",
|
||||
"mime_guess",
|
||||
"redis",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
|
||||
@@ -13,6 +13,7 @@ futures-lite = "2.3.0"
|
||||
lapin = { version = "2.5.0", features = ["serde_json"] }
|
||||
mime = "0.3.17"
|
||||
mime_guess = "2.0.5"
|
||||
redis = { version = "0.27.2", features = ["aio", "tokio-comp"] }
|
||||
serde = { version = "1.0.210", features = ["derive"] }
|
||||
serde_json = "1.0.128"
|
||||
sha2 = "0.10.8"
|
||||
|
||||
@@ -41,6 +41,9 @@
|
||||
languages.rust.enable = true;
|
||||
|
||||
services = {
|
||||
redis = {
|
||||
enable = true;
|
||||
};
|
||||
rabbitmq = {
|
||||
enable = true;
|
||||
plugins = ["tracing"];
|
||||
|
||||
1
hello_world.txt
Normal file
1
hello_world.txt
Normal file
@@ -0,0 +1 @@
|
||||
Hello World
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod models;
|
||||
pub mod rabbitmq;
|
||||
pub mod redis;
|
||||
pub mod routes;
|
||||
pub mod utils;
|
||||
|
||||
357
src/models/file_info.rs
Normal file
357
src/models/file_info.rs
Normal file
@@ -0,0 +1,357 @@
|
||||
use axum::{http::StatusCode, response::{IntoResponse, Response}, Json};
|
||||
use axum_typed_multipart::FieldData;
|
||||
use mime_guess::from_path;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{
|
||||
io::{BufReader, Read},
|
||||
path::{Path, PathBuf},
|
||||
};
|
||||
use tempfile::NamedTempFile;
|
||||
use thiserror::Error;
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::redis::client::RedisClient;
|
||||
|
||||
/// Represents metadata and storage information for a file.
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct FileInfo {
|
||||
pub uuid: Uuid,
|
||||
pub sha256: String,
|
||||
pub path: String,
|
||||
pub mime_type: String,
|
||||
}
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum FileError {
|
||||
#[error("IO error occurred: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
#[error("UTF-8 conversion error: {0}")]
|
||||
Utf8(#[from] std::string::FromUtf8Error),
|
||||
|
||||
#[error("MIME type detection failed for input: {0}")]
|
||||
MimeDetection(String),
|
||||
|
||||
#[error("Unsupported MIME type: {0}")]
|
||||
UnsupportedMime(String),
|
||||
|
||||
#[error("Redis error: {0}")]
|
||||
RedisError(#[from] crate::redis::client::RedisError),
|
||||
|
||||
#[error("File not found for UUID: {0}")]
|
||||
FileNotFound(Uuid),
|
||||
|
||||
#[error("Duplicate file detected with SHA256: {0}")]
|
||||
DuplicateFile(String),
|
||||
|
||||
#[error("Hash collision detected")]
|
||||
HashCollision,
|
||||
|
||||
#[error("Invalid UUID format: {0}")]
|
||||
InvalidUuid(String),
|
||||
|
||||
#[error("File name missing in metadata")]
|
||||
MissingFileName,
|
||||
|
||||
#[error("Failed to persist file: {0}")]
|
||||
PersistError(String),
|
||||
|
||||
#[error("Serialization error: {0}")]
|
||||
SerializationError(String),
|
||||
|
||||
#[error("Deserialization error: {0}")]
|
||||
DeserializationError(String),
|
||||
|
||||
// Add more error variants as needed.
|
||||
}
|
||||
|
||||
impl IntoResponse for FileError {
|
||||
fn into_response(self) -> Response {
|
||||
let (status, error_message) = match self {
|
||||
FileError::Io(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error"),
|
||||
FileError::Utf8(_) => (StatusCode::BAD_REQUEST, "Invalid UTF-8 data"),
|
||||
FileError::MimeDetection(_) => (StatusCode::BAD_REQUEST, "MIME type detection failed"),
|
||||
FileError::UnsupportedMime(_) => (StatusCode::UNSUPPORTED_MEDIA_TYPE, "Unsupported MIME type"),
|
||||
FileError::RedisError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Redis error"),
|
||||
FileError::FileNotFound(_) => (StatusCode::NOT_FOUND, "File not found"),
|
||||
FileError::DuplicateFile(_) => (StatusCode::CONFLICT, "Duplicate file detected"),
|
||||
FileError::HashCollision => (StatusCode::INTERNAL_SERVER_ERROR, "Hash collision detected"),
|
||||
FileError::InvalidUuid(_) => (StatusCode::BAD_REQUEST, "Invalid UUID format"),
|
||||
FileError::MissingFileName => (StatusCode::BAD_REQUEST, "Missing file name in metadata"),
|
||||
FileError::PersistError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Failed to persist file"),
|
||||
FileError::SerializationError(_) => (StatusCode::INTERNAL_SERVER_ERROR, "Serialization error"),
|
||||
FileError::DeserializationError(_) => (StatusCode::BAD_REQUEST, "Deserialization error"),
|
||||
};
|
||||
|
||||
let body = Json(json!({
|
||||
"error": error_message,
|
||||
}));
|
||||
|
||||
(status, body).into_response()
|
||||
}
|
||||
}
|
||||
|
||||
impl FileInfo {
|
||||
/// Creates a new `FileInfo` instance from uploaded field data.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `field_data` - The uploaded file data.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<FileInfo, FileError>` - The created `FileInfo` or an error.
|
||||
pub async fn new(field_data: FieldData<NamedTempFile>, redis_client: &RedisClient) -> Result<FileInfo, FileError> {
|
||||
let file = field_data.contents; // NamedTempFile
|
||||
let metadata = field_data.metadata;
|
||||
|
||||
// Extract file name from metadata
|
||||
let file_name = metadata.file_name.ok_or(FileError::MissingFileName)?;
|
||||
info!("File name: {:?}", file_name);
|
||||
|
||||
// Calculate SHA256 hash of the file
|
||||
let sha = Self::get_sha(&file).await?;
|
||||
info!("SHA256: {:?}", sha);
|
||||
|
||||
// Check if SHA exists in Redis
|
||||
if let Some(existing_file_info) = redis_client.get_file_info_by_sha(&sha).await? {
|
||||
info!("Duplicate file detected with SHA256: {}", sha);
|
||||
return Ok(existing_file_info);
|
||||
}
|
||||
|
||||
// Generate a new UUID
|
||||
let uuid = Uuid::new_v4();
|
||||
info!("UUID: {:?}", uuid);
|
||||
|
||||
// Sanitize file name
|
||||
let sanitized_file_name = sanitize_file_name(&file_name);
|
||||
info!("Sanitized file name: {:?}", sanitized_file_name);
|
||||
|
||||
// Persist the file to the filesystem
|
||||
let persisted_path = Self::persist_file(&uuid, file, &sanitized_file_name).await?;
|
||||
|
||||
// Guess the MIME type
|
||||
let mime_type = Self::guess_mime_type(&persisted_path);
|
||||
info!("Mime type: {:?}", mime_type);
|
||||
|
||||
// Construct the FileInfo object
|
||||
let file_info = FileInfo {
|
||||
uuid,
|
||||
sha256: sha.clone(),
|
||||
path: persisted_path.to_string_lossy().to_string(),
|
||||
mime_type,
|
||||
};
|
||||
|
||||
// Store FileInfo in Redis with SHA256 as key
|
||||
redis_client.set_file_info(&sha, &file_info).await?;
|
||||
|
||||
// Map UUID to SHA256 in Redis
|
||||
redis_client.set_sha_uuid_mapping(&uuid, &sha).await?;
|
||||
|
||||
Ok(file_info)
|
||||
}
|
||||
|
||||
/// Retrieves `FileInfo` based on UUID.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `uuid` - The UUID of the file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<FileInfo, FileError>` - The `FileInfo` or an error.
|
||||
pub async fn get(uuid: Uuid, redis_client: &RedisClient) -> Result<FileInfo, FileError> {
|
||||
// Fetch SHA256 from UUID mapping
|
||||
let sha = redis_client.get_sha_by_uuid(&uuid).await?
|
||||
.ok_or(FileError::FileNotFound(uuid))?;
|
||||
|
||||
// Retrieve FileInfo by SHA256
|
||||
let file_info = redis_client.get_file_info_by_sha(&sha).await?
|
||||
.ok_or(FileError::FileNotFound(uuid))?;
|
||||
|
||||
Ok(file_info)
|
||||
}
|
||||
|
||||
/// Updates an existing file identified by UUID with new file data.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `uuid` - The UUID of the file to update.
|
||||
/// * `new_field_data` - The new file data.
|
||||
/// * `redis_client` - Reference to the RedisClient.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<FileInfo, FileError>` - The updated `FileInfo` or an error.
|
||||
pub async fn update(uuid: Uuid, new_field_data: FieldData<NamedTempFile>, redis_client: &RedisClient) -> Result<FileInfo, FileError> {
|
||||
let new_file = new_field_data.contents;
|
||||
let new_metadata = new_field_data.metadata;
|
||||
|
||||
// Extract new file name
|
||||
let new_file_name = new_metadata.file_name.ok_or(FileError::MissingFileName)?;
|
||||
|
||||
// Calculate SHA256 of the new file
|
||||
let new_sha = Self::get_sha(&new_file).await?;
|
||||
|
||||
// Check if the new SHA already exists
|
||||
if let Some(existing_file_info) = redis_client.get_file_info_by_sha(&new_sha).await? {
|
||||
info!("Duplicate file detected with SHA256: {}", new_sha);
|
||||
return Ok(existing_file_info);
|
||||
}
|
||||
|
||||
// Sanitize new file name
|
||||
let sanitized_new_file_name = sanitize_file_name(&new_file_name);
|
||||
|
||||
// Persist the new file
|
||||
let new_persisted_path = Self::persist_file(&uuid, new_file, &sanitized_new_file_name).await?;
|
||||
|
||||
// Guess the new MIME type
|
||||
let new_mime_type = Self::guess_mime_type(&new_persisted_path);
|
||||
|
||||
// Retrieve existing FileInfo to get old SHA
|
||||
let old_file_info = Self::get(uuid, redis_client).await?;
|
||||
|
||||
// Update FileInfo
|
||||
let updated_file_info = FileInfo {
|
||||
uuid,
|
||||
sha256: new_sha.clone(),
|
||||
path: new_persisted_path.to_string_lossy().to_string(),
|
||||
mime_type: new_mime_type,
|
||||
};
|
||||
|
||||
// Update Redis: Remove old SHA entry and add new SHA entry
|
||||
redis_client.delete_file_info(&old_file_info.sha256).await?;
|
||||
redis_client.set_file_info(&new_sha, &updated_file_info).await?;
|
||||
redis_client.set_sha_uuid_mapping(&uuid, &new_sha).await?;
|
||||
|
||||
// Optionally, delete the old file from the filesystem if it's no longer referenced
|
||||
// This requires reference counting or checking if other FileInfo entries point to the same SHA
|
||||
// For simplicity, this step is omitted.
|
||||
|
||||
Ok(updated_file_info)
|
||||
}
|
||||
|
||||
/// Deletes a file and its corresponding metadata based on UUID.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `uuid` - The UUID of the file to delete.
|
||||
/// * `redis_client` - Reference to the RedisClient.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<(), FileError>` - Empty result or an error.
|
||||
pub async fn delete(uuid: Uuid, redis_client: &RedisClient) -> Result<(), FileError> {
|
||||
// Retrieve FileInfo to get SHA256 and path
|
||||
let file_info = Self::get(uuid, redis_client).await?;
|
||||
|
||||
// Delete the file from the filesystem
|
||||
let file_path = Path::new(&file_info.path);
|
||||
if file_path.exists() {
|
||||
tokio::fs::remove_file(file_path).await.map_err(FileError::Io)?;
|
||||
info!("Deleted file at path: {}", file_info.path);
|
||||
} else {
|
||||
info!("File path does not exist, skipping deletion: {}", file_info.path);
|
||||
}
|
||||
|
||||
// Delete the FileInfo from Redis
|
||||
redis_client.delete_file_info(&file_info.sha256).await?;
|
||||
|
||||
// Delete the UUID to SHA mapping
|
||||
redis_client.delete_sha_uuid_mapping(&uuid).await?;
|
||||
|
||||
// Remove the UUID directory if empty
|
||||
let uuid_dir = file_path.parent().ok_or(FileError::FileNotFound(uuid))?;
|
||||
if uuid_dir.exists() {
|
||||
let mut entries = tokio::fs::read_dir(uuid_dir).await.map_err(FileError::Io)?;
|
||||
if entries.next_entry().await?.is_none() {
|
||||
tokio::fs::remove_dir(uuid_dir).await.map_err(FileError::Io)?;
|
||||
info!("Deleted empty UUID directory: {:?}", uuid_dir);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Persists the file to the filesystem under `./data/{uuid}/{file_name}`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `uuid` - The UUID of the file.
|
||||
/// * `file` - The temporary file to persist.
|
||||
/// * `file_name` - The sanitized file name.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<PathBuf, FileError>` - The persisted file path or an error.
|
||||
async fn persist_file(uuid: &Uuid, file: NamedTempFile, file_name: &str) -> Result<PathBuf, FileError> {
|
||||
let base_dir = Path::new("./data");
|
||||
let uuid_dir = base_dir.join(uuid.to_string());
|
||||
|
||||
// Create the UUID directory if it doesn't exist
|
||||
tokio::fs::create_dir_all(&uuid_dir).await.map_err(FileError::Io)?;
|
||||
|
||||
// Define the final file path
|
||||
let final_path = uuid_dir.join(file_name);
|
||||
info!("Final path: {:?}", final_path);
|
||||
|
||||
// Persist the temporary file to the final path
|
||||
file.persist(&final_path).map_err(|e| FileError::PersistError(e.to_string()))?;
|
||||
|
||||
info!("Persisted file to {:?}", final_path);
|
||||
|
||||
Ok(final_path)
|
||||
}
|
||||
|
||||
/// Calculates the SHA256 hash of the given file.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `file` - The file to hash.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<String, FileError>` - The SHA256 hash as a hex string or an error.
|
||||
async fn get_sha(file: &NamedTempFile) -> Result<String, FileError> {
|
||||
let mut reader = BufReader::new(file.as_file());
|
||||
let mut hasher = Sha256::new();
|
||||
let mut buffer = [0u8; 8192]; // 8KB buffer
|
||||
|
||||
loop {
|
||||
let n = reader.read(&mut buffer)?;
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
hasher.update(&buffer[..n]);
|
||||
}
|
||||
|
||||
let digest = hasher.finalize();
|
||||
Ok(format!("{:x}", digest))
|
||||
}
|
||||
|
||||
/// Guesses the MIME type based on the file extension.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `path` - The path to the file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `String` - The guessed MIME type as a string.
|
||||
fn guess_mime_type(path: &Path) -> String {
|
||||
from_path(path)
|
||||
.first_or(mime::APPLICATION_OCTET_STREAM)
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
/// Sanitizes the file name to prevent security vulnerabilities like directory traversal.
|
||||
/// Replaces any non-alphanumeric characters (excluding '.' and '_') with underscores.
|
||||
fn sanitize_file_name(file_name: &str) -> String {
|
||||
file_name.chars()
|
||||
.map(|c| if c.is_ascii_alphanumeric() || c == '.' || c == '_' { c } else { '_' })
|
||||
.collect()
|
||||
}
|
||||
@@ -1,109 +0,0 @@
|
||||
use axum_typed_multipart::{FieldData, TryFromMultipart};
|
||||
use mime_guess::from_path;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::{io::{BufReader, Read}, path::Path};
|
||||
use tempfile::NamedTempFile;
|
||||
use thiserror::Error;
|
||||
use tracing::info;
|
||||
use url::Url;
|
||||
use uuid::Uuid;
|
||||
|
||||
/// Error types for file and content handling.
|
||||
#[derive(Error, Debug)]
|
||||
pub enum FileError{
|
||||
#[error("IO error occurred: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
|
||||
#[error("MIME type detection failed for input: {0}")]
|
||||
MimeDetection(String),
|
||||
|
||||
#[error("Unsupported MIME type: {0}")]
|
||||
UnsupportedMime(String),
|
||||
}
|
||||
|
||||
#[derive(Debug, TryFromMultipart)]
|
||||
pub struct FileUploadRequest {
|
||||
#[form_data(limit = "unlimited")]
|
||||
pub file: FieldData<NamedTempFile>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
pub struct FileInfo {
|
||||
pub uuid: Uuid,
|
||||
pub sha256: String,
|
||||
pub path: String,
|
||||
pub mime_type: String,
|
||||
}
|
||||
|
||||
impl FileInfo {
|
||||
pub async fn new(file: NamedTempFile) -> Result<FileInfo, FileError> {
|
||||
// Calculate SHA based on file
|
||||
let sha = Self::get_sha(&file).await?;
|
||||
// Check if SHA exists in redis db
|
||||
// If so, return existing FileInfo
|
||||
// Generate UUID
|
||||
// Persist file with uuid as path
|
||||
// Guess the mime_type
|
||||
// Construct the object
|
||||
// file.persist("./data/{id}");
|
||||
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: sha, path:String::new(), mime_type:String::new() })
|
||||
}
|
||||
|
||||
pub async fn get(id: String) -> Result<FileInfo, FileError> {
|
||||
// Get the SHA based on file in uuid path
|
||||
// Check if SHA exists in redis
|
||||
// If so, return FileInfo
|
||||
// Else return error
|
||||
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: String::new() , path:String::new(), mime_type:String::new() })
|
||||
}
|
||||
|
||||
pub async fn update(id: String, file: NamedTempFile) -> Result<FileInfo, FileError> {
|
||||
// Calculate SHA based on file
|
||||
// Check if SHA exists in redis
|
||||
// If so, return existing FileInfo
|
||||
// Use the submitted UUID
|
||||
// Replace the old file with uuid as path
|
||||
// Guess the mime_type
|
||||
// Construct the object
|
||||
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: String::new() , path:String::new(), mime_type:String::new() })
|
||||
}
|
||||
|
||||
pub async fn delete(id: String) -> Result<(), FileError> {
|
||||
// Get the SHA based on file in uuid path
|
||||
// Remove the entry from redis db
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_sha(file: &NamedTempFile) -> Result<String, FileError> {
|
||||
let input = file.as_file();
|
||||
let mut reader = BufReader::new(input);
|
||||
let digest = {
|
||||
let mut hasher = Sha256::new();
|
||||
let mut buffer = [0; 1024];
|
||||
loop {
|
||||
let count = reader.read(&mut buffer)?;
|
||||
if count == 0 { break }
|
||||
hasher.update(&buffer[..count]);
|
||||
}
|
||||
hasher.finalize()
|
||||
};
|
||||
|
||||
Ok(format!("{:X}", digest))
|
||||
}
|
||||
}
|
||||
// let input = File::open(path)?;
|
||||
// let mut reader = BufReader::new(input);
|
||||
|
||||
// let digest = {
|
||||
// let mut hasher = Sha256::new();
|
||||
// let mut buffer = [0; 1024];
|
||||
// loop {
|
||||
// let count = reader.read(&mut buffer)?;
|
||||
// if count == 0 { break }
|
||||
// hasher.update(&buffer[..count]);
|
||||
// }
|
||||
// hasher.finalize()
|
||||
// };
|
||||
// Ok(HEXLOWER.encode(digest.as_ref()))
|
||||
|
||||
@@ -2,7 +2,7 @@ use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
use tracing::info;
|
||||
use url::Url;
|
||||
use super::files::FileInfo;
|
||||
use super::file_info::FileInfo;
|
||||
|
||||
#[derive(Debug, Deserialize, Serialize)]
|
||||
pub enum Content {
|
||||
|
||||
@@ -1,2 +1,2 @@
|
||||
pub mod files;
|
||||
pub mod file_info;
|
||||
pub mod ingress;
|
||||
|
||||
166
src/redis/client.rs
Normal file
166
src/redis/client.rs
Normal file
@@ -0,0 +1,166 @@
|
||||
use redis::AsyncCommands;
|
||||
use thiserror::Error;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::models::file_info::FileInfo;
|
||||
|
||||
/// Represents errors that can occur during Redis operations.
|
||||
#[derive(Error, Debug)]
|
||||
pub enum RedisError {
|
||||
#[error("Redis connection error: {0}")]
|
||||
ConnectionError(String),
|
||||
|
||||
#[error("Redis command error: {0}")]
|
||||
CommandError(String),
|
||||
|
||||
// Add more error variants as needed.
|
||||
}
|
||||
|
||||
/// Provides Redis-related operations for `FileInfo`.
|
||||
pub struct RedisClient {
|
||||
redis_url: String,
|
||||
}
|
||||
|
||||
impl RedisClient {
|
||||
/// Creates a new instance of `RedisClient`.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `redis_url` - The Redis server URL (e.g., "redis://127.0.0.1/").
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Self` - A new `RedisClient` instance.
|
||||
pub fn new(redis_url: &str) -> Self {
|
||||
Self {
|
||||
redis_url: redis_url.to_string(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Establishes a new multiplexed asynchronous connection to Redis.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `MultiplexedConnection` - The established connection.
|
||||
pub async fn get_connection(&self) -> Result<redis::aio::MultiplexedConnection, RedisError> {
|
||||
let client = redis::Client::open(self.redis_url.clone())
|
||||
.map_err(|e| RedisError::ConnectionError(e.to_string()))?;
|
||||
let conn = client
|
||||
.get_multiplexed_async_connection()
|
||||
.await
|
||||
.map_err(|e| RedisError::ConnectionError(e.to_string()))?;
|
||||
Ok(conn)
|
||||
}
|
||||
|
||||
/// Stores `FileInfo` in Redis using SHA256 as the key.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `sha256` - The SHA256 hash of the file.
|
||||
/// * `file_info` - The `FileInfo` object to store.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<(), RedisError>` - Empty result or an error.
|
||||
pub async fn set_file_info(&self, sha256: &str, file_info: &FileInfo) -> Result<(), RedisError> {
|
||||
let mut conn = self.get_connection().await?;
|
||||
let key = format!("file_info:{}", sha256);
|
||||
let value = serde_json::to_string(file_info)
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
conn.set(key, value).await
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves `FileInfo` from Redis using SHA256 as the key.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `sha256` - The SHA256 hash of the file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<Option<FileInfo>, RedisError>` - The `FileInfo` if found, otherwise `None`, or an error.
|
||||
pub async fn get_file_info_by_sha(&self, sha256: &str) -> Result<Option<FileInfo>, RedisError> {
|
||||
let mut conn = self.get_connection().await?;
|
||||
let key = format!("file_info:{}", sha256);
|
||||
let value: Option<String> = conn.get(key).await
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
if let Some(json_str) = value {
|
||||
let file_info: FileInfo = serde_json::from_str(&json_str)
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
Ok(Some(file_info))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
|
||||
/// Deletes `FileInfo` from Redis using SHA256 as the key.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `sha256` - The SHA256 hash of the file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<(), RedisError>` - Empty result or an error.
|
||||
pub async fn delete_file_info(&self, sha256: &str) -> Result<(), RedisError> {
|
||||
let mut conn = self.get_connection().await?;
|
||||
let key = format!("file_info:{}", sha256);
|
||||
conn.del(key).await
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Sets a mapping from UUID to SHA256.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `uuid` - The UUID of the file.
|
||||
/// * `sha256` - The SHA256 hash of the file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<(), RedisError>` - Empty result or an error.
|
||||
pub async fn set_sha_uuid_mapping(&self, uuid: &Uuid, sha256: &str) -> Result<(), RedisError> {
|
||||
let mut conn = self.get_connection().await?;
|
||||
let key = format!("uuid_sha:{}", uuid);
|
||||
conn.set(key, sha256).await
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Retrieves the SHA256 hash associated with a given UUID.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `uuid` - The UUID of the file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<Option<String>, RedisError>` - The SHA256 hash if found, otherwise `None`, or an error.
|
||||
pub async fn get_sha_by_uuid(&self, uuid: &Uuid) -> Result<Option<String>, RedisError> {
|
||||
let mut conn = self.get_connection().await?;
|
||||
let key = format!("uuid_sha:{}", uuid);
|
||||
let sha: Option<String> = conn.get(key).await
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
Ok(sha)
|
||||
}
|
||||
|
||||
/// Deletes the UUID to SHA256 mapping from Redis.
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `uuid` - The UUID of the file.
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Result<(), RedisError>` - Empty result or an error.
|
||||
pub async fn delete_sha_uuid_mapping(&self, uuid: &Uuid) -> Result<(), RedisError> {
|
||||
let mut conn = self.get_connection().await?;
|
||||
let key = format!("uuid_sha:{}", uuid);
|
||||
conn.del(key).await
|
||||
.map_err(|e| RedisError::CommandError(e.to_string()))?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
1
src/redis/mod.rs
Normal file
1
src/redis/mod.rs
Normal file
@@ -0,0 +1 @@
|
||||
pub mod client;
|
||||
@@ -1,22 +1,138 @@
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum_typed_multipart::TypedMultipart;
|
||||
// === Contents of ./routes/file.rs ===
|
||||
|
||||
use crate::models::files::FileUploadRequest;
|
||||
use axum::{
|
||||
extract::Path,
|
||||
response::IntoResponse,
|
||||
Json,
|
||||
|
||||
};
|
||||
use axum_typed_multipart::{TypedMultipart, FieldData, TryFromMultipart};
|
||||
use serde_json::json;
|
||||
use tempfile::NamedTempFile;
|
||||
use tracing::{error, info};
|
||||
use uuid::Uuid;
|
||||
|
||||
// async fn upload_asset(
|
||||
// TypedMultipart(UploadAssetRequest { image, author }): TypedMultipart<UploadAssetRequest>,
|
||||
// ) -> StatusCode {
|
||||
// let file_name = image.metadata.file_name.unwrap_or(String::from("data.bin"));
|
||||
// let path = Path::new("/tmp").join(author).join(file_name);
|
||||
use crate::{
|
||||
models::file_info::{FileError, FileInfo},
|
||||
redis::client::RedisClient,
|
||||
rabbitmq::publisher::RabbitMQProducer,
|
||||
};
|
||||
|
||||
// match image.contents.persist(path) {
|
||||
// Ok(_) => StatusCode::CREATED,
|
||||
// Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
// }
|
||||
// }
|
||||
pub async fn upload_handler(TypedMultipart(input): TypedMultipart<FileUploadRequest>) -> Response {
|
||||
// let file_name = input.file.metadata.file_name.unwrap_or("newstring".to_string());
|
||||
//
|
||||
#[derive(Debug, TryFromMultipart)]
|
||||
pub struct FileUploadRequest {
|
||||
#[form_data(limit = "100000")] // Example limit: ~100 KB
|
||||
pub file: FieldData<NamedTempFile>,
|
||||
}
|
||||
|
||||
"Successfully processed".to_string().into_response()
|
||||
}
|
||||
/// Handler to upload a new file.
|
||||
///
|
||||
/// Route: POST /file
|
||||
pub async fn upload_handler(
|
||||
TypedMultipart(input): TypedMultipart<FileUploadRequest>,
|
||||
) -> Result<impl IntoResponse, FileError> {
|
||||
info!("Received an upload request");
|
||||
|
||||
// Initialize a new RedisClient instance
|
||||
let redis_client = RedisClient::new("redis://127.0.0.1/");
|
||||
|
||||
// Process the file upload
|
||||
let file_info = FileInfo::new(input.file, &redis_client).await?;
|
||||
|
||||
// Prepare the response JSON
|
||||
let response = json!({
|
||||
"uuid": file_info.uuid,
|
||||
"sha256": file_info.sha256,
|
||||
"path": file_info.path,
|
||||
"mime_type": file_info.mime_type,
|
||||
});
|
||||
|
||||
info!("File uploaded successfully: {:?}", file_info);
|
||||
|
||||
// Return the response with HTTP 200
|
||||
Ok((axum::http::StatusCode::OK, Json(response)))
|
||||
}
|
||||
|
||||
/// Handler to retrieve file information by UUID.
|
||||
///
|
||||
/// Route: GET /file/:uuid
|
||||
pub async fn get_file_handler(
|
||||
Path(uuid_str): Path<String>,
|
||||
) -> Result<impl IntoResponse, FileError> {
|
||||
// Parse UUID
|
||||
let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?;
|
||||
|
||||
// Initialize RedisClient
|
||||
let redis_client = RedisClient::new("redis://127.0.0.1/");
|
||||
|
||||
// Retrieve FileInfo
|
||||
let file_info = FileInfo::get(uuid, &redis_client).await?;
|
||||
|
||||
// Prepare the response JSON
|
||||
let response = json!({
|
||||
"uuid": file_info.uuid,
|
||||
"sha256": file_info.sha256,
|
||||
"path": file_info.path,
|
||||
"mime_type": file_info.mime_type,
|
||||
});
|
||||
|
||||
info!("Retrieved FileInfo: {:?}", file_info);
|
||||
|
||||
// Return the response with HTTP 200
|
||||
Ok((axum::http::StatusCode::OK, Json(response)))
|
||||
}
|
||||
|
||||
/// Handler to update an existing file by UUID.
|
||||
///
|
||||
/// Route: PUT /file/:uuid
|
||||
pub async fn update_file_handler(
|
||||
Path(uuid_str): Path<String>,
|
||||
TypedMultipart(input): TypedMultipart<FileUploadRequest>,
|
||||
) -> Result<impl IntoResponse, FileError> {
|
||||
// Parse UUID
|
||||
let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?;
|
||||
|
||||
// Initialize RedisClient
|
||||
let redis_client = RedisClient::new("redis://127.0.0.1/");
|
||||
|
||||
// Update the file
|
||||
let updated_file_info = FileInfo::update(uuid, input.file, &redis_client).await?;
|
||||
|
||||
// Prepare the response JSON
|
||||
let response = json!({
|
||||
"uuid": updated_file_info.uuid,
|
||||
"sha256": updated_file_info.sha256,
|
||||
"path": updated_file_info.path,
|
||||
"mime_type": updated_file_info.mime_type,
|
||||
});
|
||||
|
||||
info!("File updated successfully: {:?}", updated_file_info);
|
||||
|
||||
// Return the response with HTTP 200
|
||||
Ok((axum::http::StatusCode::OK, Json(response)))
|
||||
}
|
||||
|
||||
/// Handler to delete a file by UUID.
|
||||
///
|
||||
/// Route: DELETE /file/:uuid
|
||||
pub async fn delete_file_handler(
|
||||
Path(uuid_str): Path<String>,
|
||||
) -> Result<impl IntoResponse, FileError> {
|
||||
// Parse UUID
|
||||
let uuid = Uuid::parse_str(&uuid_str).map_err(|_| FileError::InvalidUuid(uuid_str.clone()))?;
|
||||
|
||||
// Initialize RedisClient
|
||||
let redis_client = RedisClient::new("redis://127.0.0.1/");
|
||||
|
||||
// Delete the file
|
||||
FileInfo::delete(uuid, &redis_client).await?;
|
||||
|
||||
info!("Deleted file with UUID: {}", uuid);
|
||||
|
||||
// Prepare the response JSON
|
||||
let response = json!({
|
||||
"message": "File deleted successfully",
|
||||
});
|
||||
|
||||
// Return the response with HTTP 204 No Content
|
||||
Ok((axum::http::StatusCode::NO_CONTENT, Json(response)))
|
||||
}
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use axum::{
|
||||
extract::DefaultBodyLimit, routing::{get, post}, Extension, Router
|
||||
extract::DefaultBodyLimit, routing::{delete, get, post, put}, Extension, Router
|
||||
};
|
||||
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
|
||||
use zettle_db::{rabbitmq::{publisher::RabbitMQProducer, RabbitMQConfig}, routes::{file::upload_handler, ingress::ingress_handler, queue_length::queue_length_handler}};
|
||||
use zettle_db::{rabbitmq::{publisher::RabbitMQProducer, RabbitMQConfig}, routes::{file::{delete_file_handler, get_file_handler, update_file_handler, upload_handler}, ingress::ingress_handler, queue_length::queue_length_handler}};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
|
||||
@@ -30,8 +30,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
.route("/message_count", get(queue_length_handler))
|
||||
.layer(Extension(producer))
|
||||
.route("/file", post(upload_handler))
|
||||
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024));
|
||||
|
||||
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024))
|
||||
.route("/file/:uuid", get(get_file_handler))
|
||||
.route("/file/:uuid", put(update_file_handler))
|
||||
.route("/file/:uuid", delete(delete_file_handler));
|
||||
|
||||
tracing::info!("Listening on 0.0.0.0:3000");
|
||||
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
|
||||
axum::serve(listener, app).await?;
|
||||
|
||||
Reference in New Issue
Block a user