diff --git a/Cargo.lock b/Cargo.lock index ac187a8..9b978d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -85,6 +85,27 @@ dependencies = [ "url", ] +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] +name = "anyhow" +version = "1.0.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" + [[package]] name = "asn1-rs" version = "0.6.2" @@ -109,7 +130,7 @@ checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", "synstructure", ] @@ -121,7 +142,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -260,7 +281,7 @@ checksum = "a27b8a3a6e1a44fa4c8baf1f653e4172e81486d4941f2237e20dc2d0cf4ddff1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -283,6 +304,7 @@ checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" dependencies = [ "async-trait", "axum-core", + "axum-macros", "bytes", "futures-util", "http", @@ -294,6 +316,7 @@ dependencies = [ "matchit", "memchr", "mime", + "multer", "percent-encoding", "pin-project-lite", "rustversion", @@ -330,6 +353,50 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-macros" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57d123550fa8d071b7255cb0cc04dc302baa6c8c4a79f55701552684d8399bce" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + +[[package]] +name = "axum_typed_multipart" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d097f2fcf54695aea6617d0c1dcc50bf2b7ed50e880fd992e4baf65ab334230e" +dependencies = [ + "anyhow", + "axum", + "axum_typed_multipart_macros", + "bytes", + "chrono", + "futures-core", + "futures-util", + "tempfile", + "thiserror", + "tokio", + "uuid", +] + +[[package]] +name = "axum_typed_multipart_macros" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbe748f9a46f680070920e4a3ef57939503c8d30fe30ca5525b6c63c96b7abfe" +dependencies = [ + "darling", + "heck", + "proc-macro-error", + "quote", + "syn 2.0.77", + "ubyte", +] + [[package]] name = "backtrace" version = "0.3.74" @@ -400,6 +467,12 @@ dependencies = [ "piper", ] +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + [[package]] name = "byteorder" version = "1.5.0" @@ -411,6 +484,9 @@ name = "bytes" version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +dependencies = [ + "serde", +] [[package]] name = "cbc" @@ -436,6 +512,20 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chrono" +version = "0.4.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a21f936df1771bf62b77f047b726c4625ff2e8aa607c01ec06e5a05bd8463401" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-targets 0.52.6", +] + [[package]] name = "cipher" version = "0.4.4" @@ -520,6 +610,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f63b86c8a8826a49b8c21f08a2d07338eec8d900540f8630dc76284be802989" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95133861a8032aaea082871032f5815eb9e98cef03fa916ab4500513994df9e5" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 2.0.77", +] + +[[package]] +name = "darling_macro" +version = "0.20.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.77", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -561,7 +686,7 @@ checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -601,7 +726,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -610,6 +735,15 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" +[[package]] +name = "encoding_rs" +version = "0.8.34" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +dependencies = [ + "cfg-if", +] + [[package]] name = "errno" version = "0.3.9" @@ -752,6 +886,17 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "futures-sink" version = "0.3.30" @@ -771,9 +916,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-core", + "futures-macro", "futures-task", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -803,6 +950,12 @@ version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32085ea23f3234fc7846555e85283ba4de91e21016dc0455a16286d87a292d64" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -910,6 +1063,35 @@ dependencies = [ "tokio", ] +[[package]] +name = "iana-time-zone" +version = "0.1.61" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.5.0" @@ -956,6 +1138,15 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "js-sys" +version = "0.3.70" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1868808506b929d7b0cfa8f75951347aa71bb21144b7791bae35d9bccfcfe37a" +dependencies = [ + "wasm-bindgen", +] + [[package]] name = "lapin" version = "2.5.0" @@ -1046,6 +1237,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7c44f8e672c00fe5308fa235f821cb4198414e1c77935c1ab6948d3fd78550e" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1073,6 +1274,23 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http", + "httparse", + "memchr", + "mime", + "spin", + "version_check", +] + [[package]] name = "nom" version = "7.1.3" @@ -1256,7 +1474,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -1370,6 +1588,30 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn 1.0.109", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + [[package]] name = "proc-macro2" version = "1.0.86" @@ -1702,7 +1944,7 @@ checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -1839,12 +2081,28 @@ dependencies = [ "der", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.77" @@ -1876,7 +2134,7 @@ checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -1891,6 +2149,19 @@ dependencies = [ "rustls-pemfile", ] +[[package]] +name = "tempfile" +version = "3.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04cbcdd0c794ebb0d4cf35e88edd2f7d2c4c3e9a5a6dab322839b321c6a87a64" +dependencies = [ + "cfg-if", + "fastrand 2.1.1", + "once_cell", + "rustix 0.38.37", + "windows-sys 0.59.0", +] + [[package]] name = "thiserror" version = "1.0.63" @@ -1908,7 +2179,7 @@ checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -1993,7 +2264,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2044,7 +2315,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2092,6 +2363,21 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ubyte" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f720def6ce1ee2fc44d40ac9ed6d3a59c361c80a75a7aa8e75bb9baed31cf2ea" + +[[package]] +name = "unicase" +version = "2.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.15" @@ -2128,6 +2414,17 @@ dependencies = [ "form_urlencoded", "idna", "percent-encoding", + "serde", +] + +[[package]] +name = "uuid" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +dependencies = [ + "getrandom", + "serde", ] [[package]] @@ -2154,6 +2451,61 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasm-bindgen" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a82edfc16a6c469f5f44dc7b571814045d60404b55a0ee849f9bcfa2e63dd9b5" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9de396da306523044d3302746f1208fa71d7532227f15e347e2d93e4145dd77b" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.77", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "585c4c91a46b072c92e908d99cb1dcdf95c5218eeb6f3bf1efa991ee7a68cccf" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afc340c74d9005395cf9dd098506f7f44e38f2b4a21c6aaacf9a105ea5e1e836" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.77", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.93" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c62a0a307cb4a311d3a07867860911ca130c3494e8c2719593806c08bc5d0484" + [[package]] name = "winapi" version = "0.3.9" @@ -2176,6 +2528,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.6", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -2370,7 +2731,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.77", ] [[package]] @@ -2384,11 +2745,20 @@ name = "zettle_db" version = "0.1.0" dependencies = [ "axum", + "axum_typed_multipart", + "bytes", "futures-lite 2.3.0", "lapin", + "mime", + "mime_guess", "serde", + "serde_json", + "sha2", + "tempfile", "thiserror", "tokio", "tracing", "tracing-subscriber", + "url", + "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index b3934b8..9dbebd0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,14 +6,23 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -axum = "0.7.5" +axum = { version = "0.7.5", features = ["multipart", "macros"] } +axum_typed_multipart = "0.12.1" +bytes = { version = "1.7.2", features = ["serde"] } futures-lite = "2.3.0" lapin = { version = "2.5.0", features = ["serde_json"] } +mime = "0.3.17" +mime_guess = "2.0.5" serde = { version = "1.0.210", features = ["derive"] } +serde_json = "1.0.128" +sha2 = "0.10.8" +tempfile = "3.12.0" thiserror = "1.0.63" tokio = { version = "1.40.0", features = ["full"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } +url = { version = "2.5.2", features = ["serde"] } +uuid = { version = "1.10.0", features = ["v4", "serde"] } [[bin]] diff --git a/src/consumer.rs b/src/consumer.rs index 0d2ef69..da87f92 100644 --- a/src/consumer.rs +++ b/src/consumer.rs @@ -29,28 +29,29 @@ async fn main() -> Result<(), Box> { info!("Consumer connected to RabbitMQ"); // Start consuming messages - loop { - match consumer.consume().await { - Ok((message, delivery)) => { - info!("Received message: {}", message); - // Process the message here - // For example, you could insert it into a database - // process_message(&message).await?; + consumer.process_messages().await?; + // loop { + // match consumer.consume().await { + // Ok((message, delivery)) => { + // info!("Received message: {}", message); + // // Process the message here + // // For example, you could insert it into a database + // // process_message(&message).await?; - info!("Done processing, acking"); - consumer.ack_delivery(delivery).await? - } - Err(RabbitMQError::ConsumeError(e)) => { - error!("Error consuming message: {}", e); - // Optionally add a delay before trying again - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - } - Err(e) => { - error!("Unexpected error: {}", e); - break; - } - } - } + // info!("Done processing, acking"); + // consumer.ack_delivery(delivery).await? + // } + // Err(RabbitMQError::ConsumeError(e)) => { + // error!("Error consuming message: {}", e); + // // Optionally add a delay before trying again + // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + // } + // Err(e) => { + // error!("Unexpected error: {}", e); + // break; + // } + // } + // } Ok(()) } diff --git a/src/lib.rs b/src/lib.rs index c9bc93a..8750f18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,3 @@ +pub mod models; pub mod rabbitmq; +pub mod utils; diff --git a/src/models/ingress.rs b/src/models/ingress.rs new file mode 100644 index 0000000..bf381e6 --- /dev/null +++ b/src/models/ingress.rs @@ -0,0 +1,93 @@ +use axum::extract::Multipart; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tokio::fs; +use tracing::info; +use url::Url; +use uuid::Uuid; +use sha2::{Digest, Sha256}; +use std::path::Path; +use mime_guess::from_path; +use axum_typed_multipart::{FieldData, TryFromMultipart }; +use tempfile::NamedTempFile; + +#[derive(Debug, TryFromMultipart)] +pub struct IngressMultipart { + /// JSON content field + pub content: Option, + pub instructions: String, + pub category: String, + + /// Optional file + #[form_data(limit = "unlimited")] + pub file: Option>, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct FileInfo { + pub uuid: Uuid, + pub sha256: String, + pub path: String, + pub mime_type: String, +} + +#[derive(Debug, Deserialize, Serialize)] +pub enum Content { + Url(String), + Text(String), +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct IngressContent { + pub content: Option, + pub instructions: String, + pub category: String, + pub files: Option>, +} + +/// Error types for file and content handling. +#[derive(Error, Debug)] +pub enum IngressContentError { + #[error("IO error occurred: {0}")] + Io(#[from] std::io::Error), + + #[error("UTF-8 conversion error: {0}")] + Utf8(#[from] std::string::FromUtf8Error), + + #[error("MIME type detection failed for input: {0}")] + MimeDetection(String), + + #[error("Unsupported MIME type: {0}")] + UnsupportedMime(String), + + #[error("URL parse error: {0}")] + UrlParse(#[from] url::ParseError), +} + +impl IngressContent { + /// Create a new `IngressContent` from `IngressMultipart`. + pub async fn new( + content: Option, instructions: String, category: String, + file: Option + ) -> Result { + let content = if let Some(content_str) = content { + // Check if the content is a URL + if let Ok(url) = Url::parse(&content_str) { + info!("Detected URL: {}", url); + Some(Content::Url(url.to_string())) + } else { + info!("Treating input as plain text"); + Some(Content::Text(content_str)) + } + } else { + None + }; + + Ok(IngressContent { + content, + instructions, + category, + files: file.map(|f| vec![f]), // Single file wrapped in a Vec + }) + } +} diff --git a/src/models/mod.rs b/src/models/mod.rs new file mode 100644 index 0000000..d8d3864 --- /dev/null +++ b/src/models/mod.rs @@ -0,0 +1 @@ +pub mod ingress; diff --git a/src/rabbitmq/consumer.rs b/src/rabbitmq/consumer.rs index 4ff9874..4c062f1 100644 --- a/src/rabbitmq/consumer.rs +++ b/src/rabbitmq/consumer.rs @@ -2,10 +2,13 @@ use lapin::{ message::Delivery, options::*, types::FieldTable, Channel, Consumer, Queue }; use futures_lite::stream::StreamExt; -use tracing::info; +use crate::models::ingress::IngressContent; use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; +use tracing::{info, error}; +use tokio::fs; +/// Struct to consume messages from RabbitMQ. pub struct RabbitMQConsumer { common: RabbitMQCommon, pub queue: Queue, @@ -26,7 +29,7 @@ impl RabbitMQConsumer { // Initialize the consumer let consumer = Self::initialize_consumer(&common.channel, &config).await?; - Ok(Self { common, queue, consumer}) + Ok(Self { common, queue, consumer }) } async fn initialize_consumer(channel: &Channel, config: &RabbitMQConfig) -> Result { @@ -67,16 +70,21 @@ impl RabbitMQConsumer { .map_err(|e| RabbitMQError::QueueError(e.to_string())) } - pub async fn consume(&self) -> Result<(String, Delivery), RabbitMQError> { + /// Consumes a message and returns the deserialized IngressContent along with the Delivery + pub async fn consume(&self) -> Result<(IngressContent, Delivery), RabbitMQError> { + // Receive the next message let delivery = self.consumer.clone().next().await .ok_or_else(|| RabbitMQError::ConsumeError("No message received".to_string()))? .map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?; - let message = String::from_utf8_lossy(&delivery.data).to_string(); + // Deserialize the message payload into IngressContent + let ingress: IngressContent = serde_json::from_slice(&delivery.data) + .map_err(|e| RabbitMQError::ConsumeError(format!("Deserialization Error: {}", e)))?; - Ok((message, delivery)) + Ok((ingress, delivery)) } + /// Acknowledges the message after processing pub async fn ack_delivery(&self, delivery: Delivery) -> Result<(), RabbitMQError> { self.common.channel .basic_ack(delivery.delivery_tag, BasicAckOptions::default()) @@ -85,5 +93,37 @@ impl RabbitMQConsumer { Ok(()) } -} + /// Processes messages in a loop + pub async fn process_messages(&self) -> Result<(), RabbitMQError> { + loop { + match self.consume().await { + Ok((ingress, delivery)) => { + info!("Received ingress object: {:?}", ingress); + + // Process the ingress object + self.handle_ingress_content(&ingress).await; + + info!("Processing done, acknowledging message"); + self.ack_delivery(delivery).await?; + } + Err(RabbitMQError::ConsumeError(e)) => { + error!("Error consuming message: {}", e); + // Optionally add a delay before trying again + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + } + Err(e) => { + error!("Unexpected error: {}", e); + break; + } + } + } + + Ok(()) + } + + /// Handles the IngressContent based on its type + async fn handle_ingress_content(&self, ingress: &IngressContent) { + info!("Processing content: {:?}", ingress); + } +} diff --git a/src/rabbitmq/mod.rs b/src/rabbitmq/mod.rs index 3959fe7..8a8f1ea 100644 --- a/src/rabbitmq/mod.rs +++ b/src/rabbitmq/mod.rs @@ -1,8 +1,8 @@ -pub mod producer; +pub mod publisher; pub mod consumer; use lapin::{ - options::{ExchangeDeclareOptions, QueueDeclareOptions}, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind, Queue + options::ExchangeDeclareOptions, types::FieldTable, Channel, Connection, ConnectionProperties, ExchangeKind }; use thiserror::Error; diff --git a/src/rabbitmq/producer.rs b/src/rabbitmq/producer.rs deleted file mode 100644 index 8e44690..0000000 --- a/src/rabbitmq/producer.rs +++ /dev/null @@ -1,39 +0,0 @@ -use lapin::{ - options::*, publisher_confirm::Confirmation, BasicProperties, -}; - -use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; - -pub struct RabbitMQProducer { - common: RabbitMQCommon, - exchange_name: String, - routing_key: String, -} - -impl RabbitMQProducer { - pub async fn new(config: &RabbitMQConfig) -> Result { - let common = RabbitMQCommon::new(config).await?; - common.declare_exchange(config, false).await?; - - Ok(Self { - common, - exchange_name: config.exchange.clone(), - routing_key: config.routing_key.clone(), - }) - } - - pub async fn publish(&self, payload: &[u8]) -> Result { - self.common.channel - .basic_publish( - &self.exchange_name, - &self.routing_key, - BasicPublishOptions::default(), - payload, - BasicProperties::default(), - ) - .await - .map_err(|e| RabbitMQError::PublishError(e.to_string()))? - .await - .map_err(|e| RabbitMQError::PublishError(e.to_string())) - } -} diff --git a/src/rabbitmq/publisher.rs b/src/rabbitmq/publisher.rs new file mode 100644 index 0000000..70033a2 --- /dev/null +++ b/src/rabbitmq/publisher.rs @@ -0,0 +1,60 @@ +use lapin::{ + options::*, publisher_confirm::Confirmation, BasicProperties, +}; +use crate::models::ingress::IngressContent; + +use super::{RabbitMQCommon, RabbitMQConfig, RabbitMQError}; +use tracing::{info, error}; + +pub struct RabbitMQProducer { + common: RabbitMQCommon, + exchange_name: String, + routing_key: String, +} + +impl RabbitMQProducer { + pub async fn new(config: &RabbitMQConfig) -> Result { + let common = RabbitMQCommon::new(config).await?; + common.declare_exchange(config, false).await?; + + Ok(Self { + common, + exchange_name: config.exchange.clone(), + routing_key: config.routing_key.clone(), + }) + } + + /// Publishes an IngressContent object to RabbitMQ after serializing it to JSON. + pub async fn publish(&self, ingress: &IngressContent) -> Result { + // Serialize IngressContent to JSON + let payload = serde_json::to_vec(ingress) + .map_err(|e| { + error!("Serialization Error: {}", e); + RabbitMQError::PublishError(format!("Serialization Error: {}", e)) + })?; + + // Publish the serialized payload to RabbitMQ + let confirmation = self.common.channel + .basic_publish( + &self.exchange_name, + &self.routing_key, + BasicPublishOptions::default(), + &payload, + BasicProperties::default(), + ) + .await + .map_err(|e| { + error!("Publish Error: {}", e); + RabbitMQError::PublishError(format!("Publish Error: {}", e)) + })? + .await + .map_err(|e| { + error!("Publish Confirmation Error: {}", e); + RabbitMQError::PublishError(format!("Publish Confirmation Error: {}", e)) + })?; + + info!("Published message to exchange '{}' with routing key '{}'", self.exchange_name, self.routing_key); + + Ok(confirmation) + } +} diff --git a/src/server.rs b/src/server.rs index 21c4c82..ad7deae 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,35 +1,76 @@ use axum::{ - http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, Router + extract::Multipart, http::StatusCode, response::{IntoResponse, Response}, routing::{get, post}, Extension, Json, Router }; -use serde::Deserialize; use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, producer::RabbitMQProducer, RabbitMQConfig}; +use uuid::Uuid; +use zettle_db::{models::ingress::{FileInfo, IngressContent, IngressMultipart }, rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig}}; +use zettle_db::rabbitmq::publisher::RabbitMQProducer; use std::sync::Arc; - -#[derive(Deserialize)] -struct IngressPayload { - payload: String, -} +use axum_typed_multipart::TypedMultipart; +use axum::debug_handler; use tracing::{info, error}; -async fn ingress_handler( +pub async fn ingress_handler( Extension(producer): Extension>, - Json(payload): Json -) -> Response { - info!("Received payload: {:?}", payload.payload); - match producer.publish(&payload.payload.into_bytes().to_vec()).await { + TypedMultipart(multipart_data): TypedMultipart, // Parse form data +) -> impl IntoResponse { + info!("Received multipart data: {:?}", &multipart_data); + + let file_info = if let Some(file) = multipart_data.file { + // File name or default to "data.bin" if none is provided + let file_name = file.metadata.file_name.unwrap_or(String::from("data.bin")); + let mime_type = mime_guess::from_path(&file_name) + .first_or_octet_stream() + .to_string(); + let uuid = Uuid::new_v4(); + let path = std::path::Path::new("/tmp").join(uuid.to_string()).join(&file_name); + + // Persist the file + match file.contents.persist(&path) { + Ok(_) => { + info!("File saved at: {:?}", path); + // Generate FileInfo + let file_info = FileInfo { + uuid, + sha256: "sha-12412".to_string(), + path: path.to_string_lossy().to_string(), + mime_type, + }; + Some(file_info) + } + Err(e) => { + error!("Failed to save file: {:?}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to store file").into_response(); + } + } + } else { + None // No file was uploaded + }; + + // Convert `IngressMultipart` to `IngressContent` + let content = match IngressContent::new(multipart_data.content, multipart_data.instructions,multipart_data.category, file_info).await { + Ok(content) => content, + Err(e) => { + error!("Error creating IngressContent: {:?}", e); + return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to create content").into_response(); + } + }; + + // Publish content to RabbitMQ (or other system) + match producer.publish(&content).await { Ok(_) => { info!("Message published successfully"); - "thank you".to_string().into_response() - }, + "Successfully processed".to_string().into_response() + } Err(e) => { error!("Failed to publish message: {:?}", e); - (axum::http::StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() + (StatusCode::INTERNAL_SERVER_ERROR, "Failed to publish message").into_response() } } } + async fn queue_length_handler() -> Response { info!("Getting queue length"); @@ -61,6 +102,7 @@ async fn queue_length_handler() -> Response { } } + #[tokio::main(flavor = "multi_thread", worker_threads = 2)] async fn main() -> Result<(), Box> { // Set up tracing diff --git a/src/utils/mime.rs b/src/utils/mime.rs new file mode 100644 index 0000000..8ec3a98 --- /dev/null +++ b/src/utils/mime.rs @@ -0,0 +1,192 @@ +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use tracing::info; +use url::Url; +use uuid::Uuid; +use std::path::Path; +use tokio::fs; + + +/// Struct to reference stored files. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Reference { + pub uuid: Uuid, + pub path: String, +} + +impl Reference { + /// Creates a new Reference with a generated UUID. + pub fn new(path: String) -> Self { + Self { + uuid: Uuid::new_v4(), + path, + } + } +} + +/// Enum representing different types of content. +#[derive(Serialize, Deserialize, Debug, Clone)] +pub enum Content { + Text(String), + Url(String), + Document(Reference), + Video(Reference), + Audio(Reference), + // Extend with more variants as needed +} + +impl Content { + /// Retrieves the path from a reference if the content is a Reference variant. + pub fn get_path(&self) -> Option<&str> { + match self { + Content::Document(ref r) | Content::Video(ref r) | Content::Audio(ref r) => Some(&r.path), + _ => None, + } + } +} +#[derive(Error, Debug)] +pub enum IngressContentError { + #[error("IO error occurred: {0}")] + Io(#[from] std::io::Error), + + #[error("UTF-8 conversion error: {0}")] + Utf8(#[from] std::string::FromUtf8Error), + + #[error("MIME type detection failed for input: {0}")] + MimeDetection(String), + + #[error("Unsupported MIME type: {0}")] + UnsupportedMime(String), + + #[error("URL parse error: {0}")] + UrlParse(#[from] url::ParseError), + + // Add more error variants as needed. +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct IngressContent { + pub content: Content, + pub category: String, + pub instructions: String, +} + +impl IngressContent { + /// Creates a new IngressContent instance from the given input. + /// + /// # Arguments + /// + /// * `input` - A string slice that holds the input content, which can be text, a file path, or a URL. + /// * `category` - A string slice representing the category of the content. + /// * `instructions` - A string slice containing instructions for processing the content. + /// + /// # Returns + /// + /// * `Result` - The result containing either the IngressContent instance or an error. + pub async fn new( + input: &str, + category: &str, + instructions: &str, + ) -> Result { + // Check if the input is a valid URL + if let Ok(url) = Url::parse(input) { + info!("Detected URL: {}", url); + return Ok(IngressContent { + content: Content::Url(url.to_string()), + category: category.to_string(), + instructions: instructions.to_string(), + }); + } + + // Attempt to treat the input as a file path + if let Ok(metadata) = tokio::fs::metadata(input).await { + if metadata.is_file() { + info!("Processing as file path: {}", input); + let mime = mime_guess::from_path(input).first_or(mime::TEXT_PLAIN); + let reference = Self::store_file(input, &mime).await?; + let content = match mime.type_() { + mime::TEXT | mime::APPLICATION => Content::Document(reference), + mime::VIDEO => Content::Video(reference), + mime::AUDIO => Content::Audio(reference), + other => { + info!("Detected unsupported MIME type: {}", other); + return Err(IngressContentError::UnsupportedMime(mime.to_string())); + } + }; + return Ok(IngressContent { + content, + category: category.to_string(), + instructions: instructions.to_string(), + }); + } + } + + // Treat the input as plain text if it's neither a URL nor a file path + info!("Treating input as plain text"); + Ok(IngressContent { + content: Content::Text(input.to_string()), + category: category.to_string(), + instructions: instructions.to_string(), + }) + } + + /// Stores the file into 'data/' directory and returns a Reference. + async fn store_file(input_path: &str, mime: &mime::Mime) -> Result { + + return Ok(Reference::new(input_path.to_string())); + + // Define the data directory + let data_dir = Path::new("data/"); + + // Ensure 'data/' directory exists; create it if it doesn't + fs::create_dir_all(data_dir).await.map_err(IngressContentError::Io)?; + + // Generate a UUID for the file + let uuid = Uuid::new_v4(); + + // Determine the file extension based on MIME type + // let extension = Some(mime_guess::get_mime_extensions(mime)).unwrap_or("bin"); + + // Create a unique filename using UUID and extension + let file_name = format!("{}.{}", uuid, extension); + + // Define the full file path + let file_path = data_dir.join(&file_name); + + // Copy the original file to the 'data/' directory with the new filename + fs::copy(input_path, &file_path).await.map_err(IngressContentError::Io)?; + + // Return a new Reference + Ok(Reference::new(file_path.to_string_lossy().to_string())) + } + + /// Example method to handle content. Implement your actual logic here. + pub fn handle_content(&self) { + match &self.content { + Content::Text(text) => { + // Handle text content + println!("Text: {}", text); + } + Content::Url(url) => { + // Handle URL content + println!("URL: {}", url); + } + Content::Document(ref reference) => { + // Handle Document content via reference + println!("Document Reference: UUID: {}, Path: {}", reference.uuid, reference.path); + // Optionally, read the file from reference.path + } + Content::Video(ref reference) => { + // Handle Video content via reference + println!("Video Reference: UUID: {}, Path: {}", reference.uuid, reference.path); + // Optionally, read the file from reference.path + } + Content::Audio(ref reference) => { + // Handle Audio content via reference + println!("Audio Reference: UUID: {}, Path: {}", reference.uuid, reference.path); + // Optionally, read the file from reference.path + } + // Handle additional content types + } + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs new file mode 100644 index 0000000..701e689 --- /dev/null +++ b/src/utils/mod.rs @@ -0,0 +1 @@ +// pub mod mime; diff --git a/test.txt b/test.txt new file mode 100644 index 0000000..9daeafb --- /dev/null +++ b/test.txt @@ -0,0 +1 @@ +test diff --git a/test.unsupported b/test.unsupported new file mode 100644 index 0000000..5d33aab --- /dev/null +++ b/test.unsupported @@ -0,0 +1 @@ +Unsupported content \ No newline at end of file