email wip

This commit is contained in:
Per Stark
2024-12-22 19:55:47 +01:00
parent 3d941d948d
commit 9f23005210
23 changed files with 674 additions and 189 deletions

View File

@@ -32,7 +32,7 @@ use zettle_db::{
AppState,
},
storage::{db::SurrealDbClient, types::user::User},
utils::mailer::Mailer,
utils::{config::get_config, mailer::Mailer},
};
#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
@@ -44,12 +44,16 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.try_init()
.ok();
let config = get_config()?;
info!("{:?}", config);
// Set up RabbitMQ
let config = RabbitMQConfig {
amqp_addr: "amqp://localhost".to_string(),
exchange: "my_exchange".to_string(),
queue: "my_queue".to_string(),
routing_key: "my_key".to_string(),
let rabbitmq_config = RabbitMQConfig {
amqp_addr: config.rabbitmq_address,
exchange: config.rabbitmq_exchange,
queue: config.rabbitmq_queue,
routing_key: config.rabbitmq_routing_key,
};
let reloader = AutoReloader::new(move |notifier| {
@@ -62,14 +66,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(env)
});
let mailer = Mailer::new();
let app_state = AppState {
rabbitmq_producer: Arc::new(RabbitMQProducer::new(&config).await?),
rabbitmq_consumer: Arc::new(RabbitMQConsumer::new(&config, false).await?),
surreal_db_client: Arc::new(SurrealDbClient::new().await?),
rabbitmq_producer: Arc::new(RabbitMQProducer::new(&rabbitmq_config).await?),
rabbitmq_consumer: Arc::new(RabbitMQConsumer::new(&rabbitmq_config, false).await?),
surreal_db_client: Arc::new(
SurrealDbClient::new(
&config.surrealdb_address,
&config.surrealdb_username,
&config.surrealdb_password,
&config.surrealdb_namespace,
&config.surrealdb_database,
)
.await?,
),
openai_client: Arc::new(async_openai::Client::new()),
templates: Arc::new(reloader),
mailer: Arc::new(Mailer::new(
config.smtp_username,
config.smtp_relayer,
config.smtp_password,
)?),
};
// setup_auth(&app_state.surreal_db_client).await?;

View File

@@ -1,6 +1,10 @@
use tracing::info;
use tracing::{error, info};
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
use zettle_db::rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig};
use zettle_db::{
ingress::content_processor::ContentProcessor,
rabbitmq::{consumer::RabbitMQConsumer, RabbitMQConfig, RabbitMQError},
utils::config::get_config,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -13,19 +17,47 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
info!("Starting RabbitMQ consumer");
let config = get_config()?;
// Set up RabbitMQ config
let config = RabbitMQConfig {
amqp_addr: "amqp://localhost".to_string(),
exchange: "my_exchange".to_string(),
queue: "my_queue".to_string(),
routing_key: "my_key".to_string(),
let rabbitmq_config = RabbitMQConfig {
amqp_addr: config.rabbitmq_address.clone(),
exchange: config.rabbitmq_exchange.clone(),
queue: config.rabbitmq_queue.clone(),
routing_key: config.rabbitmq_routing_key.clone(),
};
// Create a RabbitMQ consumer
let consumer = RabbitMQConsumer::new(&config, true).await?;
let consumer = RabbitMQConsumer::new(&rabbitmq_config, true).await?;
// Start consuming messages
consumer.process_messages().await?;
loop {
match consumer.consume().await {
Ok((ingress, delivery)) => {
info!("Received IngressObject: {:?}", ingress);
// Get the TextContent
let text_content = ingress.to_text_content().await?;
// Initialize ContentProcessor which handles LLM analysis and storage
let content_processor = ContentProcessor::new(&config).await?;
// Begin processing of TextContent
content_processor.process(&text_content).await?;
// Remove from queue
consumer.ack_delivery(delivery).await?;
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
}
}
}
Ok(())
}

View File

@@ -6,7 +6,7 @@ use tokio::task::JoinError;
use crate::{
ingress::types::ingress_input::IngressContentError, rabbitmq::RabbitMQError,
storage::types::file_info::FileError,
storage::types::file_info::FileError, utils::mailer::EmailError,
};
#[derive(Error, Debug)]
@@ -70,6 +70,8 @@ pub enum ApiError {
AuthRequired,
#[error("Templating error: {0}")]
TemplatingError(#[from] minijinja::Error),
#[error("Mail error: {0}")]
EmailError(#[from] EmailError),
}
impl IntoResponse for ApiError {
@@ -90,6 +92,7 @@ impl IntoResponse for ApiError {
ApiError::RabbitMQError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
ApiError::FileError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
ApiError::TemplatingError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
ApiError::EmailError(_) => (StatusCode::INTERNAL_SERVER_ERROR, self.to_string()),
};
(

View File

@@ -12,7 +12,7 @@ use crate::{
text_chunk::TextChunk, text_content::TextContent,
},
},
utils::embedding::generate_embedding,
utils::{config::AppConfig, embedding::generate_embedding},
};
use super::analysis::{
@@ -25,9 +25,16 @@ pub struct ContentProcessor {
}
impl ContentProcessor {
pub async fn new() -> Result<Self, ProcessingError> {
pub async fn new(app_config: &AppConfig) -> Result<Self, ProcessingError> {
Ok(Self {
db_client: SurrealDbClient::new().await?,
db_client: SurrealDbClient::new(
&app_config.surrealdb_address,
&app_config.surrealdb_username,
&app_config.surrealdb_password,
&app_config.surrealdb_namespace,
&app_config.surrealdb_database,
)
.await?,
openai_client: async_openai::Client::new(),
})
}

View File

@@ -1,13 +1,9 @@
use futures::StreamExt;
use lapin::{message::Delivery, options::*, types::FieldTable, Channel, Consumer, Queue};
use crate::{
error::IngressConsumerError,
ingress::{content_processor::ContentProcessor, types::ingress_object::IngressObject},
};
use crate::ingress::types::ingress_object::IngressObject;
use super::{RabbitMQCommon, RabbitMQCommonTrait, RabbitMQConfig, RabbitMQError};
use tracing::{error, info};
/// Struct to consume messages from RabbitMQ.
pub struct RabbitMQConsumer {
@@ -193,38 +189,6 @@ impl RabbitMQConsumer {
.await
.map_err(|e| RabbitMQError::ConsumeError(e.to_string()))?;
Ok(())
}
/// Function to continually consume messages as they come in
pub async fn process_messages(&self) -> Result<(), IngressConsumerError> {
loop {
match self.consume().await {
Ok((ingress, delivery)) => {
info!("Received IngressObject: {:?}", ingress);
// Get the TextContent
let text_content = ingress.to_text_content().await?;
// Initialize ContentProcessor which handles LLM analysis and storage
let content_processor = ContentProcessor::new().await?;
// Begin processing of TextContent
content_processor.process(&text_content).await?;
// Remove from queue
self.ack_delivery(delivery).await?;
}
Err(RabbitMQError::ConsumeError(e)) => {
error!("Error consuming message: {}", e);
// Optionally add a delay before trying again
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
Err(e) => {
error!("Unexpected error: {}", e);
break;
}
}
}
Ok(())
}
}

View File

@@ -1,6 +1,7 @@
use crate::rabbitmq::consumer::RabbitMQConsumer;
use crate::rabbitmq::publisher::RabbitMQProducer;
use crate::storage::db::SurrealDbClient;
use crate::utils::mailer::Mailer;
use minijinja_autoreload::AutoReloader;
use std::sync::Arc;
@@ -14,4 +15,5 @@ pub struct AppState {
pub surreal_db_client: Arc<SurrealDbClient>,
pub openai_client: Arc<async_openai::Client<async_openai::config::OpenAIConfig>>,
pub templates: Arc<AutoReloader>,
pub mailer: Arc<Mailer>,
}

View File

@@ -1,5 +1,6 @@
use axum::{extract::State, http::StatusCode, response::IntoResponse};
use tracing::info;
use minijinja::context;
use tracing::{info, Instrument};
use crate::{error::ApiError, server::AppState};
@@ -12,6 +13,10 @@ pub async fn queue_length_handler(
info!("Queue length: {}", queue_length);
state
.mailer
.send_email_verification("per@starks.cloud", "1001010", &state.templates)?;
// Return the queue length with a 200 OK status
Ok((StatusCode::OK, queue_length.to_string()))
}

View File

@@ -11,7 +11,7 @@ use crate::{
storage::types::user::User,
};
page_data!(IndexData, {
page_data!(IndexData, "index.html", {
queue_length: u32,
});
@@ -25,7 +25,9 @@ pub async fn index_handler(
let queue_length = state.rabbitmq_consumer.get_queue_length().await?;
let output = render_template("index.html", IndexData { queue_length }, state.templates)?;
let data = IndexData { queue_length };
let output = render_template(IndexData::template_name(), data, state.templates)?;
Ok(output)
}

View File

@@ -7,6 +7,10 @@ pub mod auth;
pub mod index;
pub mod search_result;
pub trait PageData {
fn template_name() -> &'static str;
}
pub fn render_template<T>(
template_name: &str,
context: T,
@@ -44,12 +48,19 @@ where
#[macro_export]
macro_rules! page_data {
($name:ident, {$($(#[$attr:meta])* $field:ident: $ty:ty),*$(,)?}) => {
($name:ident, $template_name:expr, {$($(#[$attr:meta])* $field:ident: $ty:ty),*$(,)?}) => {
use serde::{Serialize, Deserialize};
use $crate::server::routes::html::PageData;
#[derive(Debug, Deserialize, Serialize)]
pub struct $name {
$($(#[$attr])* pub $field: $ty),*
}
impl PageData for $name {
fn template_name() -> &'static str {
$template_name
}
}
};
}

View File

@@ -18,18 +18,20 @@ impl SurrealDbClient {
///
/// # Returns
/// * `SurrealDbClient` initialized
pub async fn new() -> Result<Self, Error> {
let db = connect("ws://127.0.0.1:8000").await?;
pub async fn new(
address: &str,
username: &str,
password: &str,
namespace: &str,
database: &str,
) -> Result<Self, Error> {
let db = connect(address).await?;
// Sign in to database
db.signin(Root {
username: "root_user",
password: "root_password",
})
.await?;
db.signin(Root { username, password }).await?;
// Set namespace
db.use_ns("test").use_db("test").await?;
db.use_ns(namespace).use_db(database).await?;
Ok(SurrealDbClient { client: db })
}

38
src/utils/config.rs Normal file
View File

@@ -0,0 +1,38 @@
use config::{Config, ConfigError, File};
#[derive(Clone, Debug)]
pub struct AppConfig {
pub smtp_username: String,
pub smtp_password: String,
pub smtp_relayer: String,
pub rabbitmq_address: String,
pub rabbitmq_exchange: String,
pub rabbitmq_queue: String,
pub rabbitmq_routing_key: String,
pub surrealdb_address: String,
pub surrealdb_username: String,
pub surrealdb_password: String,
pub surrealdb_namespace: String,
pub surrealdb_database: String,
}
pub fn get_config() -> Result<AppConfig, ConfigError> {
let config = Config::builder()
.add_source(File::with_name("config"))
.build()?;
Ok(AppConfig {
smtp_username: config.get_string("SMTP_USERNAME")?,
smtp_password: config.get_string("SMTP_PASSWORD")?,
smtp_relayer: config.get_string("SMTP_RELAYER")?,
rabbitmq_address: config.get_string("RABBITMQ_ADDRESS")?,
rabbitmq_exchange: config.get_string("RABBITMQ_EXCHANGE")?,
rabbitmq_queue: config.get_string("RABBITMQ_QUEUE")?,
rabbitmq_routing_key: config.get_string("RABBITMQ_ROUTING_KEY")?,
surrealdb_address: config.get_string("SURREALDB_ADDRESS")?,
surrealdb_username: config.get_string("SURREALDB_USERNAME")?,
surrealdb_password: config.get_string("SURREALDB_PASSWORD")?,
surrealdb_namespace: config.get_string("SURREALDB_NAMESPACE")?,
surrealdb_database: config.get_string("SURREALDB_DATABASE")?,
})
}

View File

@@ -1,23 +1,81 @@
use std::env;
use lettre::address::AddressError;
use lettre::message::MultiPart;
use lettre::{transport::smtp::authentication::Credentials, SmtpTransport};
use lettre::{Message, Transport};
use minijinja::context;
use minijinja_autoreload::AutoReloader;
use thiserror::Error;
use tracing::info;
pub struct Mailer {
pub mailer: SmtpTransport,
}
#[derive(Error, Debug)]
pub enum EmailError {
#[error("Email construction error: {0}")]
EmailParsingError(#[from] AddressError),
#[error("Email sending error: {0}")]
SendingError(#[from] lettre::transport::smtp::Error),
#[error("Body constructing error: {0}")]
BodyError(#[from] lettre::error::Error),
#[error("Templating error: {0}")]
TemplatingError(#[from] minijinja::Error),
}
impl Mailer {
pub fn new() -> Self {
let creds = Credentials::new(
env::var("SMTP_USERNAME").unwrap().to_owned(),
env::var("SMTP_PASSWORD").unwrap().to_owned(),
);
pub fn new(
username: String,
relayer: String,
password: String,
) -> Result<Self, lettre::transport::smtp::Error> {
let creds = Credentials::new(username, password);
let mailer = SmtpTransport::relay(env::var("SMTP_RELAYER").unwrap().as_str())
.unwrap()
.credentials(creds)
.build();
let mailer = SmtpTransport::relay(&relayer)?.credentials(creds).build();
Mailer { mailer }
Ok(Mailer { mailer })
}
pub fn send_email_verification(
&self,
email_to: &str,
verification_code: &str,
templates: &AutoReloader,
) -> Result<(), EmailError> {
let name = email_to
.split('@')
.next()
.unwrap_or("User")
.chars()
.enumerate()
.map(|(i, c)| if i == 0 { c.to_ascii_uppercase() } else { c })
.collect::<String>();
let context = context! {
name => name,
verification_code => verification_code
};
let env = templates.acquire_env()?;
let html = env
.get_template("email/email_verification.html")?
.render(&context)?;
let plain = env
.get_template("email/email_verification.txt")?
.render(&context)?;
let email = Message::builder()
.from("Admin <minne@starks.cloud>".parse()?)
.reply_to("Admin <minne@starks.cloud>".parse()?)
.to(format!("{} <{}>", name, email_to).parse()?)
.subject("Verify Your Email Address")
.multipart(MultiPart::alternative_plain_html(plain, html))?;
info!("Sending email to: {}", email_to);
self.mailer.send(&email)?;
Ok(())
}
}

View File

@@ -1,2 +1,3 @@
pub mod config;
pub mod embedding;
pub mod mailer;