revised approach

split file operation and ingress functionality
This commit is contained in:
Per Stark
2024-09-24 16:21:27 +02:00
parent eed07c31f9
commit f18bce3c45
18 changed files with 264 additions and 36464 deletions

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod rabbitmq;
pub mod routes;
pub mod utils;

109
src/models/files.rs Normal file
View File

@@ -0,0 +1,109 @@
use axum_typed_multipart::{FieldData, TryFromMultipart};
use mime_guess::from_path;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::{io::{BufReader, Read}, path::Path};
use tempfile::NamedTempFile;
use thiserror::Error;
use tracing::info;
use url::Url;
use uuid::Uuid;
/// Error types for file and content handling.
#[derive(Error, Debug)]
pub enum FileError{
#[error("IO error occurred: {0}")]
Io(#[from] std::io::Error),
#[error("MIME type detection failed for input: {0}")]
MimeDetection(String),
#[error("Unsupported MIME type: {0}")]
UnsupportedMime(String),
}
#[derive(Debug, TryFromMultipart)]
pub struct FileUploadRequest {
#[form_data(limit = "unlimited")]
pub file: FieldData<NamedTempFile>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileInfo {
pub uuid: Uuid,
pub sha256: String,
pub path: String,
pub mime_type: String,
}
impl FileInfo {
pub async fn new(file: NamedTempFile) -> Result<FileInfo, FileError> {
// Calculate SHA based on file
let sha = Self::get_sha(&file).await?;
// Check if SHA exists in redis db
// If so, return existing FileInfo
// Generate UUID
// Persist file with uuid as path
// Guess the mime_type
// Construct the object
// file.persist("./data/{id}");
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: sha, path:String::new(), mime_type:String::new() })
}
pub async fn get(id: String) -> Result<FileInfo, FileError> {
// Get the SHA based on file in uuid path
// Check if SHA exists in redis
// If so, return FileInfo
// Else return error
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: String::new() , path:String::new(), mime_type:String::new() })
}
pub async fn update(id: String, file: NamedTempFile) -> Result<FileInfo, FileError> {
// Calculate SHA based on file
// Check if SHA exists in redis
// If so, return existing FileInfo
// Use the submitted UUID
// Replace the old file with uuid as path
// Guess the mime_type
// Construct the object
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: String::new() , path:String::new(), mime_type:String::new() })
}
pub async fn delete(id: String) -> Result<(), FileError> {
// Get the SHA based on file in uuid path
// Remove the entry from redis db
Ok(())
}
async fn get_sha(file: &NamedTempFile) -> Result<String, FileError> {
let input = file.as_file();
let mut reader = BufReader::new(input);
let digest = {
let mut hasher = Sha256::new();
let mut buffer = [0; 1024];
loop {
let count = reader.read(&mut buffer)?;
if count == 0 { break }
hasher.update(&buffer[..count]);
}
hasher.finalize()
};
Ok(format!("{:X}", digest))
}
}
// let input = File::open(path)?;
// let mut reader = BufReader::new(input);
// let digest = {
// let mut hasher = Sha256::new();
// let mut buffer = [0; 1024];
// loop {
// let count = reader.read(&mut buffer)?;
// if count == 0 { break }
// hasher.update(&buffer[..count]);
// }
// hasher.finalize()
// };
// Ok(HEXLOWER.encode(digest.as_ref()))

View File

@@ -1,35 +1,8 @@
use axum::extract::Multipart;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs;
use tracing::info;
use url::Url;
use uuid::Uuid;
use sha2::{Digest, Sha256};
use std::path::Path;
use mime_guess::from_path;
use axum_typed_multipart::{FieldData, TryFromMultipart };
use tempfile::NamedTempFile;
#[derive(Debug, TryFromMultipart)]
pub struct IngressMultipart {
/// JSON content field
pub content: Option<String>,
pub instructions: String,
pub category: String,
/// Optional file
#[form_data(limit = "unlimited")]
pub file: Option<FieldData<NamedTempFile>>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileInfo {
pub uuid: Uuid,
pub sha256: String,
pub path: String,
pub mime_type: String,
}
use super::files::FileInfo;
#[derive(Debug, Deserialize, Serialize)]
pub enum Content {
@@ -37,6 +10,14 @@ pub enum Content {
Text(String),
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressInput {
pub content: Option<String>,
pub instructions: String,
pub category: String,
pub files: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressContent {
pub content: Option<Content>,
@@ -65,29 +46,32 @@ pub enum IngressContentError {
}
impl IngressContent {
/// Create a new `IngressContent` from `IngressMultipart`.
/// Create a new `IngressContent` from `IngressInput`.
pub async fn new(
content: Option<String>, instructions: String, category: String,
file: Option<FileInfo>
input: IngressInput
) -> Result<IngressContent, IngressContentError> {
let content = if let Some(content_str) = content {
let content = if let Some(input_content) = input.content {
// Check if the content is a URL
if let Ok(url) = Url::parse(&content_str) {
if let Ok(url) = Url::parse(&input_content) {
info!("Detected URL: {}", url);
Some(Content::Url(url.to_string()))
} else {
info!("Treating input as plain text");
Some(Content::Text(content_str))
Some(Content::Text(input_content))
}
} else {
None
};
// Check if there is files
// Use FileInfo.get(id) with all the ids in files vec
// Return a vec<FileInfo> as file_info_list
Ok(IngressContent {
content,
instructions,
category,
files: file.map(|f| vec![f]), // Single file wrapped in a Vec
instructions: input.instructions,
category: input.category,
files: None,
})
}
}

View File

@@ -1 +1,2 @@
pub mod files;
pub mod ingress;

22
src/routes/file.rs Normal file
View File

@@ -0,0 +1,22 @@
use axum::response::{IntoResponse, Response};
use axum_typed_multipart::TypedMultipart;
use crate::models::files::FileUploadRequest;
// async fn upload_asset(
// TypedMultipart(UploadAssetRequest { image, author }): TypedMultipart<UploadAssetRequest>,
// ) -> StatusCode {
// let file_name = image.metadata.file_name.unwrap_or(String::from("data.bin"));
// let path = Path::new("/tmp").join(author).join(file_name);
// match image.contents.persist(path) {
// Ok(_) => StatusCode::CREATED,
// Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
// }
// }
pub async fn upload_handler(TypedMultipart(input): TypedMultipart<FileUploadRequest>) -> Response {
// let file_name = input.file.metadata.file_name.unwrap_or("newstring".to_string());
//
"Successfully processed".to_string().into_response()
}

33
src/routes/ingress.rs Normal file
View File

@@ -0,0 +1,33 @@
use std::sync::Arc;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use tracing::{error, info};
use crate::{models::ingress::{IngressContent, IngressInput}, rabbitmq::publisher::RabbitMQProducer};
pub async fn ingress_handler(
Extension(producer): Extension<Arc<RabbitMQProducer>>,
Json(input): Json<IngressInput>,
) -> impl IntoResponse {
info!("Recieved input: {:?}", input);
if let Ok(content) = IngressContent::new(input).await {
// Publish content to RabbitMQ (or other system)
match producer.publish(&content).await {
Ok(_) => {
info!("Message published successfully");
"Successfully processed".to_string().into_response()
}
Err(e) => {
error!("Failed to publish message: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
}
}
}
else {
error!("Failed to create IngressContent object" );
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to create object").into_response()
}
}

3
src/routes/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod file;
pub mod ingress;
pub mod queue_length;

View File

@@ -0,0 +1,36 @@
use axum::{http::StatusCode, response::{IntoResponse, Response}};
use tracing::{error, info};
use crate::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig};
pub async fn queue_length_handler() -> Response {
info!("Getting queue length");
// Set up RabbitMQ config
let config = RabbitMQConfig {
amqp_addr: "amqp://localhost".to_string(),
exchange: "my_exchange".to_string(),
queue: "my_queue".to_string(),
routing_key: "my_key".to_string(),
};
// Create a new consumer
match RabbitMQConsumer::new(&config).await {
Ok(consumer) => {
info!("Consumer connected to RabbitMQ");
// Get the queue length
let queue_length = consumer.queue.message_count();
info!("Queue length: {}", queue_length);
// Return the queue length with a 200 OK status
(StatusCode::OK, queue_length.to_string()).into_response()
},
Err(e) => {
error!("Failed to create consumer: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to connect to RabbitMQ".to_string()).into_response()
}
}
}

View File

@@ -1,107 +1,9 @@
use axum::{
extract::Multipart, http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, Router
extract::DefaultBodyLimit, routing::{get, post}, Extension, Router
};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use uuid::Uuid;
use zettle_db::{models::ingress::{FileInfo, IngressContent, IngressMultipart }, rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig}};
use zettle_db::rabbitmq::publisher::RabbitMQProducer;
use zettle_db::{rabbitmq::{publisher::RabbitMQProducer, RabbitMQConfig}, routes::{file::upload_handler, ingress::ingress_handler, queue_length::queue_length_handler}};
use std::sync::Arc;
use axum_typed_multipart::TypedMultipart;
use axum::debug_handler;
use tracing::{info, error};
pub async fn ingress_handler(
Extension(producer): Extension<Arc<RabbitMQProducer>>,
TypedMultipart(multipart_data): TypedMultipart<IngressMultipart>, // Parse form data
) -> impl IntoResponse {
info!("Received multipart data: {:?}", &multipart_data);
let file_info = if let Some(file) = multipart_data.file {
// File name or default to "data.bin" if none is provided
let file_name = file.metadata.file_name.unwrap_or(String::from("data.bin"));
let mime_type = mime_guess::from_path(&file_name)
.first_or_octet_stream()
.to_string();
let uuid = Uuid::new_v4();
let path = std::path::Path::new("/tmp").join(uuid.to_string()).join(&file_name);
// Persist the file
match file.contents.persist(&path) {
Ok(_) => {
info!("File saved at: {:?}", path);
// Generate FileInfo
let file_info = FileInfo {
uuid,
sha256: "sha-12412".to_string(),
path: path.to_string_lossy().to_string(),
mime_type,
};
Some(file_info)
}
Err(e) => {
error!("Failed to save file: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to store file").into_response();
}
}
} else {
None // No file was uploaded
};
// Convert `IngressMultipart` to `IngressContent`
let content = match IngressContent::new(multipart_data.content, multipart_data.instructions,multipart_data.category, file_info).await {
Ok(content) => content,
Err(e) => {
error!("Error creating IngressContent: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create content").into_response();
}
};
// Publish content to RabbitMQ (or other system)
match producer.publish(&content).await {
Ok(_) => {
info!("Message published successfully");
"Successfully processed".to_string().into_response()
}
Err(e) => {
error!("Failed to publish message: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
}
}
}
async fn queue_length_handler() -> Response {
info!("Getting queue length");
// Set up RabbitMQ config
let config = RabbitMQConfig {
amqp_addr: "amqp://localhost".to_string(),
exchange: "my_exchange".to_string(),
queue: "my_queue".to_string(),
routing_key: "my_key".to_string(),
};
// Create a new consumer
match RabbitMQConsumer::new(&config).await {
Ok(consumer) => {
info!("Consumer connected to RabbitMQ");
// Get the queue length
let queue_length = consumer.queue.message_count();
info!("Queue length: {}", queue_length);
// Return the queue length with a 200 OK status
(StatusCode::OK, queue_length.to_string()).into_response()
},
Err(e) => {
error!("Failed to create consumer: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to connect to RabbitMQ".to_string()).into_response()
}
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -126,7 +28,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let app = Router::new()
.route("/ingress", post(ingress_handler))
.route("/message_count", get(queue_length_handler))
.layer(Extension(producer));
.layer(Extension(producer))
.route("/file", post(upload_handler))
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024));
tracing::info!("Listening on 0.0.0.0:3000");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;