revised approach

split file operation and ingress functionality
This commit is contained in:
Per Stark
2024-09-24 16:21:27 +02:00
parent 1bd2eee8e1
commit 18c84f5c88
18 changed files with 264 additions and 36464 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,81 +0,0 @@
# 2024-09-18 12:16:15.162114
+/help
# 2024-09-18 12:16:20.233100
+/models
# 2024-09-18 12:16:24.809720
+/models gpt
# 2024-09-18 12:16:48.569734
+/models sonett
# 2024-09-18 12:16:51.577488
+/models sonnett
# 2024-09-18 12:17:00.056944
+/models openrouter/anth
# 2024-09-18 12:17:14.809931
+/model openrouter/anthropic/claude-3.5-sonnet
# 2024-09-18 12:17:33.881262
+/map-refresh
# 2024-09-18 12:17:41.641177
+/help
# 2024-09-18 12:17:57.961398
+/map
# 2024-09-18 12:18:05.401158
+/git add .
# 2024-09-18 12:18:07.192985
+/map
# 2024-09-18 12:18:28.009751
+/add src/server.rs src/rabbitmq/mod.rs
# 2024-09-18 12:19:07.145078
+/git add .
# 2024-09-18 12:19:10.360827
+/add src/consumer.rs
# 2024-09-18 12:19:21.032773
+/help
# 2024-09-18 12:19:29.945442
+/web https://github.com/gftea/amqprs/blob/main/examples/src/basic_pub_sub.rs
# 2024-09-18 12:20:07.212864
+/run cargo run --bin server
# 2024-09-18 12:20:36.696824
+/undo
# 2024-09-18 12:22:06.040133
+/ask Welcome to the project, were setting up a rust server running axum and a ingress route that takes a string for now. The ingress route will publish a message containing the string to the rabbitmq connection. The consumer consumes messages and prints them for now. We should have a rabbitmq module that is usable and testable from both the server and consumer
# 2024-09-18 12:23:21.895428
+/code implement the changes
# 2024-09-18 12:23:59.290161
+/run cargo run --bin server
# 2024-09-18 12:24:03.149084
+/run cargo run --bin consumer
# 2024-09-18 12:24:12.199239
+/code fix the errors
# 2024-09-18 12:24:56.459326
+/run cargo run --bin server
# 2024-09-18 12:25:06.791601
+/undo
# 2024-09-18 12:25:14.774649
+/exit

Binary file not shown.

33
changes.md Normal file
View File

@@ -0,0 +1,33 @@
Your proposed structure for the API sounds solid and modular, making it easier to manage files and their relationships with other data. Here's a breakdown of how this can work and how it could be used with an iOS shortcut or a custom application:
API Structure
File Management Endpoints:
POST /file: Accepts a file upload and returns a unique identifier (ID) for that file.
PUT /file/{id}: Updates the metadata of the file identified by {id}.
DELETE /file/{id}: Deletes the file and its associated metadata from the database.
Data Ingress Endpoint:
POST /ingress: Accepts a JSON body containing references (IDs) to files and other necessary data, linking them in the database as needed.
Using with iOS Shortcuts
You can create shortcuts to interact with your API endpoints without the need for a full-fledged application. Here's how:
File Upload Shortcut:
Use the "Get File" action to select a file from the device.
Use the "Post" action to send a multipart/form-data request to the /file endpoint.
Parse the response to get the returned file ID.
Data Ingress Shortcut:
Use the "Ask for Input" action to gather the necessary fields for the ingress (like instructions, category, etc.) and the file ID(s).
Use another "Post" action to send this data to the /ingress endpoint as JSON.
Developing a CLI Tool
A CLI tool could also be developed for easier interaction with your API. This tool could:
Upload Files: Handle file uploads and return file IDs.
Link Data: Accept user input for instructions, category, and linked file IDs, then submit this data to the /ingress endpoint.
Additional Considerations
Error Handling: Ensure that both the upload and ingress endpoints handle errors gracefully and provide meaningful messages.
Documentation: Create clear API documentation to guide users in constructing requests correctly, whether they are using a shortcut, CLI, or custom application.
Authentication: Consider adding authentication to your API endpoints for security, especially if sensitive data is being handled.
This approach gives you the flexibility to support various clients, streamline interactions, and keep the server-side implementation clean and manageable. Would you like more specifics on any part of this setup?

103
doc.rs
View File

@@ -1,103 +0,0 @@
use amqprs::{
callbacks::{DefaultChannelCallback, DefaultConnectionCallback},
channel::{
BasicConsumeArguments, BasicPublishArguments, QueueBindArguments, QueueDeclareArguments,
},
connection::{Connection, OpenConnectionArguments},
consumer::DefaultConsumer,
BasicProperties,
};
use tokio::time;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() {
    // Construct a global tracing subscriber that prints formatted traces to
    // stdout; the log level is taken from RUST_LOG via EnvFilter.
    // `.ok()` ignores the error if a subscriber was already installed.
    tracing_subscriber::registry()
        .with(fmt::layer())
        .with(EnvFilter::from_default_env())
        .try_init()
        .ok();
    // Open a connection to the RabbitMQ server.
    // NOTE(review): host/port/credentials are hard-coded ("user"/"bitnami"
    // matches the Bitnami RabbitMQ image defaults) — fine for an example,
    // but they should come from configuration in real code.
    let connection = Connection::open(&OpenConnectionArguments::new(
        "localhost",
        5672,
        "user",
        "bitnami",
    ))
    .await
    .unwrap();
    connection
        .register_callback(DefaultConnectionCallback)
        .await
        .unwrap();
    // Open a channel on the connection and register the default callback.
    let channel = connection.open_channel(None).await.unwrap();
    channel
        .register_callback(DefaultChannelCallback)
        .await
        .unwrap();
    // Declare a durable queue; the double `.unwrap()` peels the Result and
    // then the Option returned by `queue_declare` (None only in no-wait mode).
    let (queue_name, _, _) = channel
        .queue_declare(QueueDeclareArguments::durable_client_named(
            "amqprs.examples.basic",
        ))
        .await
        .unwrap()
        .unwrap();
    // Bind the queue to the built-in topic exchange under a fixed routing key.
    let routing_key = "amqprs.example";
    let exchange_name = "amq.topic";
    channel
        .queue_bind(QueueBindArguments::new(
            &queue_name,
            exchange_name,
            routing_key,
        ))
        .await
        .unwrap();
    //////////////////////////////////////////////////////////////////////////////
    // Start a consumer with the given consumer tag; DefaultConsumer just logs
    // the deliveries it receives.
    let args = BasicConsumeArguments::new(&queue_name, "example_basic_pub_sub");
    channel
        .basic_consume(DefaultConsumer::new(args.no_ack), args)
        .await
        .unwrap();
    //////////////////////////////////////////////////////////////////////////////
    // Publish one message. NOTE(review): this payload is not valid JSON — a
    // comma is missing after the "publisher" field. Harmless here because the
    // consumer only prints the bytes, but misleading as an example.
    let content = String::from(
        r#"
{
"publisher": "example"
"data": "Hello, amqprs!"
}
"#,
    )
    .into_bytes();
    // Create arguments for basic_publish targeting the same exchange/key the
    // queue was bound with, so the message is routed back to our consumer.
    let args = BasicPublishArguments::new(exchange_name, routing_key);
    channel
        .basic_publish(BasicProperties::default(), content, args)
        .await
        .unwrap();
    // Keep the `channel` and `connection` objects from dropping before the
    // pub/sub round-trip is done; both would be closed on drop.
    time::sleep(time::Duration::from_secs(1)).await;
    // Explicitly close in reverse order of creation.
    channel.close().await.unwrap();
    connection.close().await.unwrap();
}

File diff suppressed because one or more lines are too long

13
log
View File

@@ -1,13 +0,0 @@
2024-09-20T08:47:35.861748Z INFO server: Listening on 0.0.0.0:3000
2024-09-20T08:47:40.074036Z INFO server: Received payload: "hello"
2024-09-20T08:47:40.116592Z INFO amqprs::api::connection: open connection AMQPRS000@localhost:5672/
2024-09-20T08:47:40.117117Z INFO amqprs::api::connection: open channel 1 [open] of connection 'AMQPRS000@localhost:5672/ [open]'
2024-09-20T08:47:41.120340Z INFO server: Message published successfully
2024-09-20T08:47:41.120554Z INFO amqprs::api::connection: try to close connection AMQPRS000@localhost:5672/ at drop
2024-09-20T08:47:41.120634Z INFO amqprs::api::channel: try to close channel 1 at drop
2024-09-20T08:47:41.122283Z WARN amqprs::net::reader_handler: CloseOk responder not found, probably connection 'AMQPRS000@localhost:5672/ [closed]' has dropped
2024-09-20T08:47:41.122309Z INFO amqprs::net::reader_handler: close connection 'AMQPRS000@localhost:5672/ [closed]' OK
2024-09-20T08:47:41.122362Z INFO amqprs::net::reader_handler: connection 'AMQPRS000@localhost:5672/ [closed]' is closed, shutting down socket I/O handlers
2024-09-20T08:47:41.122424Z INFO amqprs::api::channel::dispatcher: exit dispatcher of channel 1 [closed] of connection 'AMQPRS000@localhost:5672/ [closed]'
2024-09-20T08:47:41.122450Z INFO amqprs::net::writer_handler: received shutdown notification for connection 'AMQPRS000@localhost:5672/ [closed]'
2024-09-20T08:47:41.122472Z ERROR amqprs::api::channel: failed to gracefully close channel 1 at drop, cause: 'AMQP internal communication error: channel closed'

View File

@@ -1,3 +1,4 @@
pub mod models;
pub mod rabbitmq;
pub mod routes;
pub mod utils;

109
src/models/files.rs Normal file
View File

@@ -0,0 +1,109 @@
use axum_typed_multipart::{FieldData, TryFromMultipart};
use mime_guess::from_path;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::{io::{BufReader, Read, Seek, SeekFrom}, path::Path};
use tempfile::NamedTempFile;
use thiserror::Error;
use tracing::info;
use url::Url;
use uuid::Uuid;
/// Error types for file and content handling.
#[derive(Error, Debug)]
pub enum FileError{
    // Wraps any I/O failure while reading/hashing/persisting a file.
    #[error("IO error occurred: {0}")]
    Io(#[from] std::io::Error),
    // The MIME type could not be determined for the given input.
    #[error("MIME type detection failed for input: {0}")]
    MimeDetection(String),
    // A MIME type was detected but is not accepted by this service.
    #[error("Unsupported MIME type: {0}")]
    UnsupportedMime(String),
}
/// Multipart form payload for `POST /file`: a single `file` field.
/// The upload is spooled to a named temp file on disk rather than
/// buffered in memory; `limit = "unlimited"` defers size limiting to
/// the router's `DefaultBodyLimit` layer.
#[derive(Debug, TryFromMultipart)]
pub struct FileUploadRequest {
    #[form_data(limit = "unlimited")]
    pub file: FieldData<NamedTempFile>,
}
/// Metadata record for a stored file.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileInfo {
    // Server-assigned identifier for the file.
    pub uuid: Uuid,
    // Hex-encoded SHA-256 of the file contents (uppercase — see `get_sha`).
    pub sha256: String,
    // Storage location; currently left empty by the stub constructors.
    pub path: String,
    // Detected MIME type; currently left empty by the stub constructors.
    pub mime_type: String,
}
impl FileInfo {
pub async fn new(file: NamedTempFile) -> Result<FileInfo, FileError> {
// Calculate SHA based on file
let sha = Self::get_sha(&file).await?;
// Check if SHA exists in redis db
// If so, return existing FileInfo
// Generate UUID
// Persist file with uuid as path
// Guess the mime_type
// Construct the object
// file.persist("./data/{id}");
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: sha, path:String::new(), mime_type:String::new() })
}
pub async fn get(id: String) -> Result<FileInfo, FileError> {
// Get the SHA based on file in uuid path
// Check if SHA exists in redis
// If so, return FileInfo
// Else return error
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: String::new() , path:String::new(), mime_type:String::new() })
}
pub async fn update(id: String, file: NamedTempFile) -> Result<FileInfo, FileError> {
// Calculate SHA based on file
// Check if SHA exists in redis
// If so, return existing FileInfo
// Use the submitted UUID
// Replace the old file with uuid as path
// Guess the mime_type
// Construct the object
Ok(FileInfo { uuid: Uuid::new_v4(), sha256: String::new() , path:String::new(), mime_type:String::new() })
}
pub async fn delete(id: String) -> Result<(), FileError> {
// Get the SHA based on file in uuid path
// Remove the entry from redis db
Ok(())
}
async fn get_sha(file: &NamedTempFile) -> Result<String, FileError> {
let input = file.as_file();
let mut reader = BufReader::new(input);
let digest = {
let mut hasher = Sha256::new();
let mut buffer = [0; 1024];
loop {
let count = reader.read(&mut buffer)?;
if count == 0 { break }
hasher.update(&buffer[..count]);
}
hasher.finalize()
};
Ok(format!("{:X}", digest))
}
}
// let input = File::open(path)?;
// let mut reader = BufReader::new(input);
// let digest = {
// let mut hasher = Sha256::new();
// let mut buffer = [0; 1024];
// loop {
// let count = reader.read(&mut buffer)?;
// if count == 0 { break }
// hasher.update(&buffer[..count]);
// }
// hasher.finalize()
// };
// Ok(HEXLOWER.encode(digest.as_ref()))

View File

@@ -1,35 +1,8 @@
use axum::extract::Multipart;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs;
use tracing::info;
use url::Url;
use uuid::Uuid;
use sha2::{Digest, Sha256};
use std::path::Path;
use mime_guess::from_path;
use axum_typed_multipart::{FieldData, TryFromMultipart };
use tempfile::NamedTempFile;
#[derive(Debug, TryFromMultipart)]
pub struct IngressMultipart {
/// JSON content field
pub content: Option<String>,
pub instructions: String,
pub category: String,
/// Optional file
#[form_data(limit = "unlimited")]
pub file: Option<FieldData<NamedTempFile>>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileInfo {
pub uuid: Uuid,
pub sha256: String,
pub path: String,
pub mime_type: String,
}
use super::files::FileInfo;
#[derive(Debug, Deserialize, Serialize)]
pub enum Content {
@@ -37,6 +10,14 @@ pub enum Content {
Text(String),
}
/// JSON body accepted by `POST /ingress`.
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressInput {
    // Raw content: either a URL or plain text (disambiguated downstream).
    pub content: Option<String>,
    pub instructions: String,
    pub category: String,
    // IDs of previously uploaded files — presumably the ids returned by
    // `POST /file`; TODO confirm once FileInfo::get is implemented.
    pub files: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressContent {
pub content: Option<Content>,
@@ -65,29 +46,32 @@ pub enum IngressContentError {
}
impl IngressContent {
/// Create a new `IngressContent` from `IngressMultipart`.
/// Create a new `IngressContent` from `IngressInput`.
pub async fn new(
content: Option<String>, instructions: String, category: String,
file: Option<FileInfo>
input: IngressInput
) -> Result<IngressContent, IngressContentError> {
let content = if let Some(content_str) = content {
let content = if let Some(input_content) = input.content {
// Check if the content is a URL
if let Ok(url) = Url::parse(&content_str) {
if let Ok(url) = Url::parse(&input_content) {
info!("Detected URL: {}", url);
Some(Content::Url(url.to_string()))
} else {
info!("Treating input as plain text");
Some(Content::Text(content_str))
Some(Content::Text(input_content))
}
} else {
None
};
// Check if there is files
// Use FileInfo.get(id) with all the ids in files vec
// Return a vec<FileInfo> as file_info_list
Ok(IngressContent {
content,
instructions,
category,
files: file.map(|f| vec![f]), // Single file wrapped in a Vec
instructions: input.instructions,
category: input.category,
files: None,
})
}
}

View File

@@ -1 +1,2 @@
pub mod files;
pub mod ingress;

22
src/routes/file.rs Normal file
View File

@@ -0,0 +1,22 @@
use axum::response::{IntoResponse, Response};
use axum_typed_multipart::TypedMultipart;
use crate::models::files::FileUploadRequest;
// async fn upload_asset(
// TypedMultipart(UploadAssetRequest { image, author }): TypedMultipart<UploadAssetRequest>,
// ) -> StatusCode {
// let file_name = image.metadata.file_name.unwrap_or(String::from("data.bin"));
// let path = Path::new("/tmp").join(author).join(file_name);
// match image.contents.persist(path) {
// Ok(_) => StatusCode::CREATED,
// Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
// }
// }
/// Handle `POST /file`: receive a multipart file upload.
///
/// Stub implementation — the parsed upload in `input` is not yet persisted;
/// the handler always answers with a fixed success message.
pub async fn upload_handler(TypedMultipart(input): TypedMultipart<FileUploadRequest>) -> Response {
    // TODO: persist `input.file` and return the generated file id instead.
    let ack = String::from("Successfully processed");
    ack.into_response()
}

33
src/routes/ingress.rs Normal file
View File

@@ -0,0 +1,33 @@
use std::sync::Arc;
use axum::{http::StatusCode, response::IntoResponse, Extension, Json};
use tracing::{error, info};
use crate::{models::ingress::{IngressContent, IngressInput}, rabbitmq::publisher::RabbitMQProducer};
pub async fn ingress_handler(
Extension(producer): Extension<Arc<RabbitMQProducer>>,
Json(input): Json<IngressInput>,
) -> impl IntoResponse {
info!("Recieved input: {:?}", input);
if let Ok(content) = IngressContent::new(input).await {
// Publish content to RabbitMQ (or other system)
match producer.publish(&content).await {
Ok(_) => {
info!("Message published successfully");
"Successfully processed".to_string().into_response()
}
Err(e) => {
error!("Failed to publish message: {:?}", e);
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
}
}
}
else {
error!("Failed to create IngressContent object" );
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to create object").into_response()
}
}

3
src/routes/mod.rs Normal file
View File

@@ -0,0 +1,3 @@
pub mod file;
pub mod ingress;
pub mod queue_length;

View File

@@ -0,0 +1,36 @@
use axum::{http::StatusCode, response::{IntoResponse, Response}};
use tracing::{error, info};
use crate::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig};
/// Handle `GET /message_count`: report how many messages are waiting in the
/// queue, as a plain-text integer.
///
/// NOTE(review): the RabbitMQ config is hard-coded here and differs from the
/// producer's configuration elsewhere — it should come from shared app config.
/// NOTE(review): a brand-new consumer/connection is opened on every request,
/// which is expensive; consider sharing a connection via an Extension.
pub async fn queue_length_handler() -> Response {
    info!("Getting queue length");
    // Set up RabbitMQ config
    let config = RabbitMQConfig {
        amqp_addr: "amqp://localhost".to_string(),
        exchange: "my_exchange".to_string(),
        queue: "my_queue".to_string(),
        routing_key: "my_key".to_string(),
    };
    // Create a new consumer
    match RabbitMQConsumer::new(&config).await {
        Ok(consumer) => {
            info!("Consumer connected to RabbitMQ");
            // Get the queue length — presumably the count captured when the
            // queue was declared at connect time, not a live value; confirm
            // against RabbitMQConsumer's implementation.
            let queue_length = consumer.queue.message_count();
            info!("Queue length: {}", queue_length);
            // Return the queue length with a 200 OK status
            (StatusCode::OK, queue_length.to_string()).into_response()
        },
        Err(e) => {
            error!("Failed to create consumer: {:?}", e);
            (StatusCode::INTERNAL_SERVER_ERROR, "Failed to connect to RabbitMQ".to_string()).into_response()
        }
    }
}

View File

@@ -1,107 +1,9 @@
use axum::{
extract::Multipart, http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, Router
extract::DefaultBodyLimit, routing::{get, post}, Extension, Router
};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use uuid::Uuid;
use zettle_db::{models::ingress::{FileInfo, IngressContent, IngressMultipart }, rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig}};
use zettle_db::rabbitmq::publisher::RabbitMQProducer;
use zettle_db::{rabbitmq::{publisher::RabbitMQProducer, RabbitMQConfig}, routes::{file::upload_handler, ingress::ingress_handler, queue_length::queue_length_handler}};
use std::sync::Arc;
use axum_typed_multipart::TypedMultipart;
use axum::debug_handler;
use tracing::{info, error};
/// (Removed in this commit.) Old multipart `POST /ingress` handler: optionally
/// persists an uploaded file, wraps the form data in an `IngressContent`, and
/// publishes it to RabbitMQ.
pub async fn ingress_handler(
    Extension(producer): Extension<Arc<RabbitMQProducer>>,
    TypedMultipart(multipart_data): TypedMultipart<IngressMultipart>, // Parse form data
) -> impl IntoResponse {
    info!("Received multipart data: {:?}", &multipart_data);
    let file_info = if let Some(file) = multipart_data.file {
        // File name or default to "data.bin" if none is provided
        let file_name = file.metadata.file_name.unwrap_or(String::from("data.bin"));
        let mime_type = mime_guess::from_path(&file_name)
            .first_or_octet_stream()
            .to_string();
        let uuid = Uuid::new_v4();
        // NOTE(review): the `/tmp/{uuid}/` directory is never created before
        // persisting; persist (a rename) likely fails with NotFound — confirm.
        let path = std::path::Path::new("/tmp").join(uuid.to_string()).join(&file_name);
        // Persist the file
        match file.contents.persist(&path) {
            Ok(_) => {
                info!("File saved at: {:?}", path);
                // Generate FileInfo
                // NOTE(review): sha256 is a hard-coded placeholder, not a real
                // digest of the file contents.
                let file_info = FileInfo {
                    uuid,
                    sha256: "sha-12412".to_string(),
                    path: path.to_string_lossy().to_string(),
                    mime_type,
                };
                Some(file_info)
            }
            Err(e) => {
                error!("Failed to save file: {:?}", e);
                return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to store file").into_response();
            }
        }
    } else {
        None // No file was uploaded
    };
    // Convert `IngressMultipart` to `IngressContent`
    let content = match IngressContent::new(multipart_data.content, multipart_data.instructions,multipart_data.category, file_info).await {
        Ok(content) => content,
        Err(e) => {
            error!("Error creating IngressContent: {:?}", e);
            return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create content").into_response();
        }
    };
    // Publish content to RabbitMQ (or other system)
    match producer.publish(&content).await {
        Ok(_) => {
            info!("Message published successfully");
            "Successfully processed".to_string().into_response()
        }
        Err(e) => {
            error!("Failed to publish message: {:?}", e);
            (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
        }
    }
}
/// (Removed in this commit; relocated to src/routes/queue_length.rs.)
/// Report the number of waiting messages in the queue as plain text.
/// NOTE(review): config is hard-coded and a fresh connection is opened per
/// request — same concerns as the relocated version.
async fn queue_length_handler() -> Response {
    info!("Getting queue length");
    // Set up RabbitMQ config
    let config = RabbitMQConfig {
        amqp_addr: "amqp://localhost".to_string(),
        exchange: "my_exchange".to_string(),
        queue: "my_queue".to_string(),
        routing_key: "my_key".to_string(),
    };
    // Create a new consumer
    match RabbitMQConsumer::new(&config).await {
        Ok(consumer) => {
            info!("Consumer connected to RabbitMQ");
            // Get the queue length
            let queue_length = consumer.queue.message_count();
            info!("Queue length: {}", queue_length);
            // Return the queue length with a 200 OK status
            (StatusCode::OK, queue_length.to_string()).into_response()
        },
        Err(e) => {
            error!("Failed to create consumer: {:?}", e);
            (StatusCode::INTERNAL_SERVER_ERROR, "Failed to connect to RabbitMQ".to_string()).into_response()
        }
    }
}
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -126,7 +28,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let app = Router::new()
.route("/ingress", post(ingress_handler))
.route("/message_count", get(queue_length_handler))
.layer(Extension(producer));
.layer(Extension(producer))
.route("/file", post(upload_handler))
.layer(DefaultBodyLimit::max(1024 * 1024 * 1024));
tracing::info!("Listening on 0.0.0.0:3000");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;

View File

@@ -1 +0,0 @@
test

View File

@@ -1 +0,0 @@
Unsupported content