multipart wip

This commit is contained in:
Per Stark
2024-09-24 14:02:38 +02:00
parent 7fbfa0fdbc
commit 1bd2eee8e1
15 changed files with 871 additions and 97 deletions

394
Cargo.lock generated
View File

@@ -85,6 +85,27 @@ dependencies = [
"url",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anyhow"
version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6"
[[package]]
name = "asn1-rs"
version = "0.6.2"
@@ -109,7 +130,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
"synstructure",
]
@@ -121,7 +142,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -260,7 +281,7 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -283,6 +304,7 @@ checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
dependencies = [
"async-trait",
"axum-core",
"axum-macros",
"bytes",
"futures-util",
"http",
@@ -294,6 +316,7 @@ dependencies = [
"matchit",
"memchr",
"mime",
"multer",
"percent-encoding",
"pin-project-lite",
"rustversion",
@@ -330,6 +353,50 @@ dependencies = [
"tracing",
]
[[package]]
name = "axum-macros"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
]
[[package]]
name = "axum_typed_multipart"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d097f2fcf54695aea6617d0c1dcc50bf2b7ed50e880fd992e4baf65ab334230e"
dependencies = [
"anyhow",
"axum",
"axum_typed_multipart_macros",
"bytes",
"chrono",
"futures-core",
"futures-util",
"tempfile",
"thiserror",
"tokio",
"uuid",
]
[[package]]
name = "axum_typed_multipart_macros"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fbe748f9a46f680070920e4a3ef57939503c8d30fe30ca5525b6c63c96b7abfe"
dependencies = [
"darling",
"heck",
"proc-macro-error",
"quote",
"syn 2.0.77",
"ubyte",
]
[[package]]
name = "backtrace"
version = "0.3.74"
@@ -400,6 +467,12 @@ dependencies = [
"piper",
]
[[package]]
name = "bumpalo"
version = "3.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c"
[[package]]
name = "byteorder"
version = "1.5.0"
@@ -411,6 +484,9 @@ name = "bytes"
version = "1.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3"
dependencies = [
"serde",
]
[[package]]
name = "cbc"
@@ -436,6 +512,20 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "chrono"
version = "0.4.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets 0.52.6",
]
[[package]]
name = "cipher"
version = "0.4.4"
@@ -520,6 +610,41 @@ dependencies = [
"typenum",
]
[[package]]
name = "darling"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989"
dependencies = [
"darling_core",
"darling_macro",
]
[[package]]
name = "darling_core"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim",
"syn 2.0.77",
]
[[package]]
name = "darling_macro"
version = "0.20.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806"
dependencies = [
"darling_core",
"quote",
"syn 2.0.77",
]
[[package]]
name = "data-encoding"
version = "2.6.0"
@@ -561,7 +686,7 @@ checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -601,7 +726,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -610,6 +735,15 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "encoding_rs"
version = "0.8.34"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59"
dependencies = [
"cfg-if",
]
[[package]]
name = "errno"
version = "0.3.9"
@@ -752,6 +886,17 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "futures-macro"
version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
]
[[package]]
name = "futures-sink"
version = "0.3.30"
@@ -771,9 +916,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [
"futures-core",
"futures-macro",
"futures-task",
"pin-project-lite",
"pin-utils",
"slab",
]
[[package]]
@@ -803,6 +950,12 @@ version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64"
[[package]]
name = "heck"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "hermit-abi"
version = "0.3.9"
@@ -910,6 +1063,35 @@ dependencies = [
"tokio",
]
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "ident_case"
version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39"
[[package]]
name = "idna"
version = "0.5.0"
@@ -956,6 +1138,15 @@ version = "1.0.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b"
[[package]]
name = "js-sys"
version = "0.3.70"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a"
dependencies = [
"wasm-bindgen",
]
[[package]]
name = "lapin"
version = "2.5.0"
@@ -1046,6 +1237,16 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "mime_guess"
version = "2.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e"
dependencies = [
"mime",
"unicase",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@@ -1073,6 +1274,23 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "multer"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b"
dependencies = [
"bytes",
"encoding_rs",
"futures-util",
"http",
"httparse",
"memchr",
"mime",
"spin",
"version_check",
]
[[package]]
name = "nom"
version = "7.1.3"
@@ -1256,7 +1474,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -1370,6 +1588,30 @@ dependencies = [
"zerocopy",
]
[[package]]
name = "proc-macro-error"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c"
dependencies = [
"proc-macro-error-attr",
"proc-macro2",
"quote",
"syn 1.0.109",
"version_check",
]
[[package]]
name = "proc-macro-error-attr"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869"
dependencies = [
"proc-macro2",
"quote",
"version_check",
]
[[package]]
name = "proc-macro2"
version = "1.0.86"
@@ -1702,7 +1944,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -1839,12 +2081,28 @@ dependencies = [
"der",
]
[[package]]
name = "strsim"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "subtle"
version = "2.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.77"
@@ -1876,7 +2134,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -1891,6 +2149,19 @@ dependencies = [
"rustls-pemfile",
]
[[package]]
name = "tempfile"
version = "3.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64"
dependencies = [
"cfg-if",
"fastrand 2.1.1",
"once_cell",
"rustix 0.38.37",
"windows-sys 0.59.0",
]
[[package]]
name = "thiserror"
version = "1.0.63"
@@ -1908,7 +2179,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -1993,7 +2264,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -2044,7 +2315,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -2092,6 +2363,21 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825"
[[package]]
name = "ubyte"
version = "0.10.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f720def6ce1ee2fc44d40ac9ed6d3a59c361c80a75a7aa8e75bb9baed31cf2ea"
[[package]]
name = "unicase"
version = "2.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89"
dependencies = [
"version_check",
]
[[package]]
name = "unicode-bidi"
version = "0.3.15"
@@ -2128,6 +2414,17 @@ dependencies = [
"form_urlencoded",
"idna",
"percent-encoding",
"serde",
]
[[package]]
name = "uuid"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314"
dependencies = [
"getrandom",
"serde",
]
[[package]]
@@ -2154,6 +2451,61 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasm-bindgen"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5"
dependencies = [
"cfg-if",
"once_cell",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b"
dependencies = [
"bumpalo",
"log",
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.77",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.77",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.93"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484"
[[package]]
name = "winapi"
version = "0.3.9"
@@ -2176,6 +2528,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets 0.52.6",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
@@ -2370,7 +2731,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.77",
]
[[package]]
@@ -2384,11 +2745,20 @@ name = "zettle_db"
version = "0.1.0"
dependencies = [
"axum",
"axum_typed_multipart",
"bytes",
"futures-lite 2.3.0",
"lapin",
"mime",
"mime_guess",
"serde",
"serde_json",
"sha2",
"tempfile",
"thiserror",
"tokio",
"tracing",
"tracing-subscriber",
"url",
"uuid",
]

View File

@@ -6,14 +6,23 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
axum = "0.7.5"
axum = { version = "0.7.5", features = ["multipart", "macros"] }
axum_typed_multipart = "0.12.1"
bytes = { version = "1.7.2", features = ["serde"] }
futures-lite = "2.3.0"
lapin = { version = "2.5.0", features = ["serde_json"] }
mime = "0.3.17"
mime_guess = "2.0.5"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
sha2 = "0.10.8"
tempfile = "3.12.0"
thiserror = "1.0.63"
tokio = { version = "1.40.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.2", features = ["serde"] }
uuid = { version = "1.10.0", features = ["v4", "serde"] }
[[bin]]

View File

@@ -29,28 +29,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Consumer connected to RabbitMQ");
// Start consuming messages
loop {
match consumer.consume().await {
Ok((message, delivery)) => {
info!("Received message: {}", message);
// Process the message here
// For example, you could insert it into a database
// process_message(&message).await?;
consumer.process_messages().await?;
// loop {
// match consumer.consume().await {
// Ok((message, delivery)) => {
// info!("Received message: {}", message);
// // Process the message here
// // For example, you could insert it into a database
// // process_message(&message).await?;
info!("Done processing, acking");
consumer.ack_delivery(delivery).await?
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
}
}
}
// info!("Done processing, acking");
// consumer.ack_delivery(delivery).await?
// }
// Err(RabbitMQError::ConsumeError(e)) => {
// error!("Error consuming message: {}", e);
// // Optionally add a delay before trying again
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
// }
// Err(e) => {
// error!("Unexpected error: {}", e);
// break;
// }
// }
// }
Ok(())
}

View File

@@ -1 +1,3 @@
pub mod models;
pub mod rabbitmq;
pub mod utils;

93
src/models/ingress.rs Normal file
View File

@@ -0,0 +1,93 @@
use axum::extract::Multipart;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::fs;
use tracing::info;
use url::Url;
use uuid::Uuid;
use sha2::{Digest, Sha256};
use std::path::Path;
use mime_guess::from_path;
use axum_typed_multipart::{FieldData, TryFromMultipart };
use tempfile::NamedTempFile;
#[derive(Debug, TryFromMultipart)]
pub struct IngressMultipart {
/// JSON content field
pub content: Option<String>,
pub instructions: String,
pub category: String,
/// Optional file
#[form_data(limit = "unlimited")]
pub file: Option<FieldData<NamedTempFile>>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct FileInfo {
pub uuid: Uuid,
pub sha256: String,
pub path: String,
pub mime_type: String,
}
#[derive(Debug, Deserialize, Serialize)]
pub enum Content {
Url(String),
Text(String),
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressContent {
pub content: Option<Content>,
pub instructions: String,
pub category: String,
pub files: Option<Vec<FileInfo>>,
}
/// Error types for file and content handling.
#[derive(Error, Debug)]
pub enum IngressContentError {
#[error("IO error occurred: {0}")]
Io(#[from] std::io::Error),
#[error("UTF-8 conversion error: {0}")]
Utf8(#[from] std::string::FromUtf8Error),
#[error("MIME type detection failed for input: {0}")]
MimeDetection(String),
#[error("Unsupported MIME type: {0}")]
UnsupportedMime(String),
#[error("URL parse error: {0}")]
UrlParse(#[from] url::ParseError),
}
impl IngressContent {
/// Create a new `IngressContent` from `IngressMultipart`.
pub async fn new(
content: Option<String>, instructions: String, category: String,
file: Option<FileInfo>
) -> Result<IngressContent, IngressContentError> {
let content = if let Some(content_str) = content {
// Check if the content is a URL
if let Ok(url) = Url::parse(&content_str) {
info!("Detected URL: {}", url);
Some(Content::Url(url.to_string()))
} else {
info!("Treating input as plain text");
Some(Content::Text(content_str))
}
} else {
None
};
Ok(IngressContent {
content,
instructions,
category,
files: file.map(|f| vec![f]), // Single file wrapped in a Vec
})
}
}

1
src/models/mod.rs Normal file
View File

@@ -0,0 +1 @@
pub mod ingress;

View File

@@ -2,10 +2,13 @@ use lapin::{
message::Delivery, options::*, types::FieldTable, Channel, Consumer, Queue
};
use futures_lite::stream::StreamExt;
use tracing::info;
use crate::models::ingress::IngressContent;
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
use tracing::{info, error};
use tokio::fs;
/// Struct to consume messages from RabbitMQ.
pub struct RabbitMQConsumer {
common: RabbitMQCommon,
pub queue: Queue,
@@ -26,7 +29,7 @@ impl RabbitMQConsumer {
// Initialize the consumer
let consumer = Self::initialize_consumer(&common.channel, &config).await?;
Ok(Self { common, queue, consumer})
Ok(Self { common, queue, consumer })
}
async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result<Consumer, RabbitMQError> {
@@ -67,16 +70,21 @@ impl RabbitMQConsumer {
.map_err(|e| RabbitMQError::QueueError(e.to_string()))
}
pub async fn consume(&self) -> Result<(String, Delivery), RabbitMQError> {
/// Consumes a message and returns the deserialized IngressContent along with the Delivery
pub async fn consume(&self) -> Result<(IngressContent, Delivery), RabbitMQError> {
// Receive the next message
let delivery = self.consumer.clone().next().await
.ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))?
.map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?;
let message = String::from_utf8_lossy(&delivery.data).to_string();
// Deserialize the message payload into IngressContent
let ingress: IngressContent = serde_json::from_slice(&delivery.data)
.map_err(|e| RabbitMQError::ConsumeError(format!("Deserialization Error: {}", e)))?;
Ok((message, delivery))
Ok((ingress, delivery))
}
/// Acknowledges the message after processing
pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> {
self.common.channel
.basic_ack(delivery.delivery_tag, BasicAckOptions::default())
@@ -85,5 +93,37 @@ impl RabbitMQConsumer {
Ok(())
}
}
/// Processes messages in a loop
pub async fn process_messages(&self) -> Result<(), RabbitMQError> {
loop {
match self.consume().await {
Ok((ingress, delivery)) => {
info!("Received ingress object: {:?}", ingress);
// Process the ingress object
self.handle_ingress_content(&ingress).await;
info!("Processing done, acknowledging message");
self.ack_delivery(delivery).await?;
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
}
}
}
Ok(())
}
/// Handles the IngressContent based on its type
async fn handle_ingress_content(&self, ingress: &IngressContent) {
info!("Processing content: {:?}", ingress);
}
}

View File

@@ -1,8 +1,8 @@
pub mod producer;
pub mod publisher;
pub mod consumer;
use lapin::{
options::{ExchangeDeclareOptions, QueueDeclareOptions}, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind, Queue
options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind
};
use thiserror::Error;

View File

@@ -1,39 +0,0 @@
use lapin::{
options::*, publisher_confirm::Confirmation, BasicProperties,
};
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
pub struct RabbitMQProducer {
common: RabbitMQCommon,
exchange_name: String,
routing_key: String,
}
impl RabbitMQProducer {
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?;
common.declare_exchange(config, false).await?;
Ok(Self {
common,
exchange_name: config.exchange.clone(),
routing_key: config.routing_key.clone(),
})
}
pub async fn publish(&self, payload: &[u8]) -> Result<Confirmation, RabbitMQError> {
self.common.channel
.basic_publish(
&self.exchange_name,
&self.routing_key,
BasicPublishOptions::default(),
payload,
BasicProperties::default(),
)
.await
.map_err(|e| RabbitMQError::PublishError(e.to_string()))?
.await
.map_err(|e| RabbitMQError::PublishError(e.to_string()))
}
}

60
src/rabbitmq/publisher.rs Normal file
View File

@@ -0,0 +1,60 @@
use lapin::{
options::*, publisher_confirm::Confirmation, BasicProperties,
};
use crate::models::ingress::IngressContent;
use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError};
use tracing::{info, error};
pub struct RabbitMQProducer {
common: RabbitMQCommon,
exchange_name: String,
routing_key: String,
}
impl RabbitMQProducer {
pub async fn new(config: &RabbitMQConfig) -> Result<Self, RabbitMQError> {
let common = RabbitMQCommon::new(config).await?;
common.declare_exchange(config, false).await?;
Ok(Self {
common,
exchange_name: config.exchange.clone(),
routing_key: config.routing_key.clone(),
})
}
/// Publishes an IngressContent object to RabbitMQ after serializing it to JSON.
pub async fn publish(&self, ingress: &IngressContent) -> Result<Confirmation, RabbitMQError> {
// Serialize IngressContent to JSON
let payload = serde_json::to_vec(ingress)
.map_err(|e| {
error!("Serialization Error: {}", e);
RabbitMQError::PublishError(format!("Serialization Error: {}", e))
})?;
// Publish the serialized payload to RabbitMQ
let confirmation = self.common.channel
.basic_publish(
&self.exchange_name,
&self.routing_key,
BasicPublishOptions::default(),
&payload,
BasicProperties::default(),
)
.await
.map_err(|e| {
error!("Publish Error: {}", e);
RabbitMQError::PublishError(format!("Publish Error: {}", e))
})?
.await
.map_err(|e| {
error!("Publish Confirmation Error: {}", e);
RabbitMQError::PublishError(format!("Publish Confirmation Error: {}", e))
})?;
info!("Published message to exchange '{}' with routing key '{}'", self.exchange_name, self.routing_key);
Ok(confirmation)
}
}

View File

@@ -1,35 +1,76 @@
use axum::{
http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, Router
extract::Multipart, http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, Router
};
use serde::Deserialize;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, producer::RabbitMQProducer, RabbitMQConfig};
use uuid::Uuid;
use zettle_db::{models::ingress::{FileInfo, IngressContent, IngressMultipart }, rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig}};
use zettle_db::rabbitmq::publisher::RabbitMQProducer;
use std::sync::Arc;
#[derive(Deserialize)]
struct IngressPayload {
payload: String,
}
use axum_typed_multipart::TypedMultipart;
use axum::debug_handler;
use tracing::{info, error};
async fn ingress_handler(
pub async fn ingress_handler(
Extension(producer): Extension<Arc<RabbitMQProducer>>,
Json(payload): Json<IngressPayload>
) -> Response {
info!("Received payload: {:?}", payload.payload);
match producer.publish(&payload.payload.into_bytes().to_vec()).await {
TypedMultipart(multipart_data): TypedMultipart<IngressMultipart>, // Parse form data
) -> impl IntoResponse {
info!("Received multipart data: {:?}", &multipart_data);
let file_info = if let Some(file) = multipart_data.file {
// File name or default to "data.bin" if none is provided
let file_name = file.metadata.file_name.unwrap_or(String::from("data.bin"));
let mime_type = mime_guess::from_path(&file_name)
.first_or_octet_stream()
.to_string();
let uuid = Uuid::new_v4();
let path = std::path::Path::new("/tmp").join(uuid.to_string()).join(&file_name);
// Persist the file
match file.contents.persist(&path) {
Ok(_) => {
info!("File saved at: {:?}", path);
// Generate FileInfo
let file_info = FileInfo {
uuid,
sha256: "sha-12412".to_string(),
path: path.to_string_lossy().to_string(),
mime_type,
};
Some(file_info)
}
Err(e) => {
error!("Failed to save file: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to store file").into_response();
}
}
} else {
None // No file was uploaded
};
// Convert `IngressMultipart` to `IngressContent`
let content = match IngressContent::new(multipart_data.content, multipart_data.instructions,multipart_data.category, file_info).await {
Ok(content) => content,
Err(e) => {
error!("Error creating IngressContent: {:?}", e);
return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create content").into_response();
}
};
// Publish content to RabbitMQ (or other system)
match producer.publish(&content).await {
Ok(_) => {
info!("Message published successfully");
"thank you".to_string().into_response()
},
"Successfully processed".to_string().into_response()
}
Err(e) => {
error!("Failed to publish message: {:?}", e);
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
(StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response()
}
}
}
async fn queue_length_handler() -> Response {
info!("Getting queue length");
@@ -61,6 +102,7 @@ async fn queue_length_handler() -> Response {
}
}
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Set up tracing

192
src/utils/mime.rs Normal file
View File

@@ -0,0 +1,192 @@
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::info;
use url::Url;
use uuid::Uuid;
use std::path::Path;
use tokio::fs;
/// Struct to reference stored files.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Reference {
pub uuid: Uuid,
pub path: String,
}
impl Reference {
/// Creates a new Reference with a generated UUID.
pub fn new(path: String) -> Self {
Self {
uuid: Uuid::new_v4(),
path,
}
}
}
/// Enum representing different types of content.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum Content {
Text(String),
Url(String),
Document(Reference),
Video(Reference),
Audio(Reference),
// Extend with more variants as needed
}
impl Content {
/// Retrieves the path from a reference if the content is a Reference variant.
pub fn get_path(&self) -> Option<&str> {
match self {
Content::Document(ref r) | Content::Video(ref r) | Content::Audio(ref r) => Some(&r.path),
_ => None,
}
}
}
#[derive(Error, Debug)]
pub enum IngressContentError {
#[error("IO error occurred: {0}")]
Io(#[from] std::io::Error),
#[error("UTF-8 conversion error: {0}")]
Utf8(#[from] std::string::FromUtf8Error),
#[error("MIME type detection failed for input: {0}")]
MimeDetection(String),
#[error("Unsupported MIME type: {0}")]
UnsupportedMime(String),
#[error("URL parse error: {0}")]
UrlParse(#[from] url::ParseError),
// Add more error variants as needed.
}
#[derive(Serialize, Deserialize, Debug)]
pub struct IngressContent {
pub content: Content,
pub category: String,
pub instructions: String,
}
impl IngressContent {
/// Creates a new IngressContent instance from the given input.
///
/// # Arguments
///
/// * `input` - A string slice that holds the input content, which can be text, a file path, or a URL.
/// * `category` - A string slice representing the category of the content.
/// * `instructions` - A string slice containing instructions for processing the content.
///
/// # Returns
///
/// * `Result<IngressContent, IngressContentError>` - The result containing either the IngressContent instance or an error.
pub async fn new(
input: &str,
category: &str,
instructions: &str,
) -> Result<IngressContent, IngressContentError> {
// Check if the input is a valid URL
if let Ok(url) = Url::parse(input) {
info!("Detected URL: {}", url);
return Ok(IngressContent {
content: Content::Url(url.to_string()),
category: category.to_string(),
instructions: instructions.to_string(),
});
}
// Attempt to treat the input as a file path
if let Ok(metadata) = tokio::fs::metadata(input).await {
if metadata.is_file() {
info!("Processing as file path: {}", input);
let mime = mime_guess::from_path(input).first_or(mime::TEXT_PLAIN);
let reference = Self::store_file(input, &mime).await?;
let content = match mime.type_() {
mime::TEXT | mime::APPLICATION => Content::Document(reference),
mime::VIDEO => Content::Video(reference),
mime::AUDIO => Content::Audio(reference),
other => {
info!("Detected unsupported MIME type: {}", other);
return Err(IngressContentError::UnsupportedMime(mime.to_string()));
}
};
return Ok(IngressContent {
content,
category: category.to_string(),
instructions: instructions.to_string(),
});
}
}
// Treat the input as plain text if it's neither a URL nor a file path
info!("Treating input as plain text");
Ok(IngressContent {
content: Content::Text(input.to_string()),
category: category.to_string(),
instructions: instructions.to_string(),
})
}
/// Stores the file into 'data/' directory and returns a Reference.
async fn store_file(input_path: &str, mime: &mime::Mime) -> Result<Reference, IngressContentError> {
return Ok(Reference::new(input_path.to_string()));
// Define the data directory
let data_dir = Path::new("data/");
// Ensure 'data/' directory exists; create it if it doesn't
fs::create_dir_all(data_dir).await.map_err(IngressContentError::Io)?;
// Generate a UUID for the file
let uuid = Uuid::new_v4();
// Determine the file extension based on MIME type
// let extension = Some(mime_guess::get_mime_extensions(mime)).unwrap_or("bin");
// Create a unique filename using UUID and extension
let file_name = format!("{}.{}", uuid, extension);
// Define the full file path
let file_path = data_dir.join(&file_name);
// Copy the original file to the 'data/' directory with the new filename
fs::copy(input_path, &file_path).await.map_err(IngressContentError::Io)?;
// Return a new Reference
Ok(Reference::new(file_path.to_string_lossy().to_string()))
}
/// Example method to handle content. Implement your actual logic here.
pub fn handle_content(&self) {
match &self.content {
Content::Text(text) => {
// Handle text content
println!("Text: {}", text);
}
Content::Url(url) => {
// Handle URL content
println!("URL: {}", url);
}
Content::Document(ref reference) => {
// Handle Document content via reference
println!("Document Reference: UUID: {}, Path: {}", reference.uuid, reference.path);
// Optionally, read the file from reference.path
}
Content::Video(ref reference) => {
// Handle Video content via reference
println!("Video Reference: UUID: {}, Path: {}", reference.uuid, reference.path);
// Optionally, read the file from reference.path
}
Content::Audio(ref reference) => {
// Handle Audio content via reference
println!("Audio Reference: UUID: {}, Path: {}", reference.uuid, reference.path);
// Optionally, read the file from reference.path
}
// Handle additional content types
}
}
}

1
src/utils/mod.rs Normal file
View File

@@ -0,0 +1 @@
// pub mod mime;

1
test.txt Normal file
View File

@@ -0,0 +1 @@
test

1
test.unsupported Normal file
View File

@@ -0,0 +1 @@
Unsupported content