From 091270b458a66afdd1891ea6bd3f4dfd0224b346 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Sat, 1 Mar 2025 22:43:20 +0100 Subject: [PATCH] uniform template for references, mvp chat interface --- assets/style.css | 85 ++ example_flake.nix | 72 -- prompt | 1117 ----------------- .../html/chat/message_response_stream.rs | 22 +- src/storage/types/message.rs | 17 - templates/chat/base.html | 4 +- templates/chat/drawer.html | 3 +- templates/chat/history.html | 19 +- templates/chat/new_message_form.html | 2 +- templates/chat/reference_list.html | 12 +- todo.md | 11 +- 11 files changed, 127 insertions(+), 1237 deletions(-) delete mode 100644 example_flake.nix delete mode 100644 prompt diff --git a/assets/style.css b/assets/style.css index c4fd15a..31b9fcb 100644 --- a/assets/style.css +++ b/assets/style.css @@ -2242,6 +2242,18 @@ .bottom-0 { bottom: calc(var(--spacing) * 0); } + .bottom-0\.5 { + bottom: calc(var(--spacing) * 0.5); + } + .bottom-1 { + bottom: calc(var(--spacing) * 1); + } + .bottom-2 { + bottom: calc(var(--spacing) * 2); + } + .bottom-4 { + bottom: calc(var(--spacing) * 4); + } .\!left-3 { left: calc(var(--spacing) * 3) !important; } @@ -3761,6 +3773,9 @@ color: var(--color-base-content); font-weight: 600; } + .\!mb-0 { + margin-bottom: calc(var(--spacing) * 0) !important; + } .mb-2 { margin-bottom: calc(var(--spacing) * 2); } @@ -4144,9 +4159,57 @@ .h-32 { height: calc(var(--spacing) * 32); } + .h-100 { + height: calc(var(--spacing) * 100); + } + .h-\[calc\(100vh-48px\)\] { + height: calc(100vh - 48px); + } + .h-\[calc\(100vh-52px\)\] { + height: calc(100vh - 52px); + } + .h-\[calc\(100vh-54px\)\] { + height: calc(100vh - 54px); + } + .h-\[calc\(100vh-60px\)\] { + height: calc(100vh - 60px); + } + .h-\[calc\(100vh-65px\)\] { + height: calc(100vh - 65px); + } + .h-\[calc\(100vh-80px\)\] { + height: calc(100vh - 80px); + } + .h-\[calc\(100vh-120px\)\] { + height: calc(100vh - 120px); + } + .h-\[calc\(100vh-140px\)\] { + height: calc(100vh - 140px); + } .h-\[calc\(100vh-160px\)\] { height: calc(100vh - 160px); } + .h-\[calc\(100vh-170px\)\] { + height: calc(100vh - 170px); + } + .h-\[calc\(100vh-175px\)\] { + height: calc(100vh - 175px); + } + .h-\[calc\(100vh-180px\)\] { + height: calc(100vh - 180px); + } + .h-\[calc\(100vh-200px\)\] { + height: calc(100vh - 200px); + } + .h-\[calc\(100vh-240px\)\] { + height: calc(100vh - 240px); + } + .h-full { + height: 100%; + } + .max-h-full { + max-height: 100%; + } .min-h-\[100dvh\] { min-height: 100dvh; } @@ -4389,12 +4452,18 @@ text-overflow: ellipsis; white-space: nowrap; } + .overflow-auto { + overflow: auto; + } .overflow-clip { overflow: clip; } .overflow-hidden { overflow: hidden; } + .overflow-visible { + overflow: visible; + } .overflow-x-auto { overflow-x: auto; } @@ -4593,12 +4662,18 @@ .pr-12 { padding-right: calc(var(--spacing) * 12); } + .\!pb-0 { + padding-bottom: calc(var(--spacing) * 0) !important; + } .pb-0 { padding-bottom: calc(var(--spacing) * 0); } .pb-10 { padding-bottom: calc(var(--spacing) * 10); } + .pb-24 { + padding-bottom: calc(var(--spacing) * 24); + } .pb-32 { padding-bottom: calc(var(--spacing) * 32); } @@ -4970,11 +5045,21 @@ padding-inline: calc(var(--spacing) * 0); } } + .sm\:pb-0 { + @media (width >= 40rem) { + padding-bottom: calc(var(--spacing) * 0); + } + } .sm\:pb-4 { @media (width >= 40rem) { padding-bottom: calc(var(--spacing) * 4); } } + .sm\:pb-32 { + @media (width >= 40rem) { + padding-bottom: calc(var(--spacing) * 32); + } + } .sm\:text-6xl { @media (width >= 40rem) { font-size: var(--text-6xl); 
diff --git a/example_flake.nix b/example_flake.nix deleted file mode 100644 index 5e117a9..0000000 --- a/example_flake.nix +++ /dev/null @@ -1,72 +0,0 @@ -{ - inputs = { - nixpkgs.url = "github:nixos/nixpkgs/nixos-unstable"; - systems.url = "github:nix-systems/default"; - devenv.url = "github:cachix/devenv"; - devenv.inputs.nixpkgs.follows = "nixpkgs"; - }; - - nixConfig = { - extra-trusted-public-keys = "devenv.cachix.org-1:w1cLUi8dv3hnoSPGAuibQv+f9TZLr6cv/Hm9XgU50cw="; - extra-substituters = "https://devenv.cachix.org"; - }; - - outputs = { - self, - nixpkgs, - devenv, - systems, - ... - } @ inputs: let - forEachSystem = nixpkgs.lib.genAttrs (import systems); - in { - packages = forEachSystem (system: { - devenv-up = self.devShells.${system}.default.config.procfileScript; - }); - - devShells = - forEachSystem - (system: let - pkgs = nixpkgs.legacyPackages.${system}; - in { - default = devenv.lib.mkShell { - inherit inputs pkgs; - modules = [ - { - # https://devenv.sh/reference/options/ - enterShell = '' - echo "Welcome to zettle_db project" - echo "----------------------------" - echo "run devenv up -d to start and monitor services" - ''; - - packages = [ - pkgs.vscode-langservers-extracted - pkgs.openssl - ]; - - languages.rust.enable = true; - - processes = { - surreal_db.exec = "docker run --rm --pull always -p 8000:8000 --user $(id -u) -v $(pwd)/database:/database surrealdb/surrealdb:latest-dev start rocksdb:/database/database.db --user root_user --pass root_password"; - # tailwind_css.exec = "npx tailwindcss --input src/server/assets/input.css --output src/server/assets/style.css -w"; - }; - - services = { - rabbitmq = { - enable = true; - # plugins = ["tracing"]; - }; - }; - - env = { - SMTP_RELAYER = "relay_address"; - SMTP_USERNAME = "relay_user"; - SMTP_PASSWORD = "relay_pass"; - }; - } - ]; - }; - }); - }; -} diff --git a/prompt b/prompt deleted file mode 100644 index 6ea2a8a..0000000 --- a/prompt +++ /dev/null @@ -1,1117 +0,0 @@ -I want your assistance refactoring a section of my Rust application. It's been a while since I set it up, and I feel that there is a more straightforward solution that would reduce complexity. There is some redundancy in certain places, and there is an intermediate struct that could be removed. I will detail the flow of the application, the relevant structs, and code snippets. Not having nested enums will be helpful when writing the frontend portion of the application (I use minijinja and SSR). - - -The user submits a multipart form containing files and/or content; the content can be plain text or a URL, which is then scraped and parsed into text content. The form is then split into several objects, duplicating the instructions, user id, and category, so that each file or content item becomes its own object. This is necessary for the next step, which processes the content and generates nodes and vector representations. Each input is saved to a queue by the server, and a worker picks up the jobs and processes the objects. A sketch of the flattened shape I have in mind follows below.
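For reference, the flattened shape could look something like this: a TextContent that records its origin in a flat `source` field instead of being derived from a nested enum. This is only an illustrative sketch; the `ContentSource` name and its fields are assumptions, not existing code.

use serde::{Deserialize, Serialize};

/// Illustrative sketch -- ContentSource and its fields are assumed names,
/// not part of the current codebase. One variant per ingress kind, with no
/// content nested inside the enum itself.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ContentSource {
    /// Scraped from a web page and cleaned up.
    Url { url: String },
    /// Submitted directly as text in the form.
    Text,
    /// Extracted from an uploaded file, referenced by its stored id.
    File { file_id: String },
}

/// The processed entity the frontend renders; its origin is explicit in
/// `source` rather than implied by which enum variant produced it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TextContent {
    pub id: String,
    pub text: String,
    pub instructions: String,
    pub category: String,
    pub user_id: String,
    pub source: ContentSource,
}

With a flat field like this, a minijinja template can branch on `source` directly when rendering instead of unwrapping nested variants.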
- - - -use crate::{ - error::{ApiError, AppError}, - ingress::types::ingress_input::{create_ingress_objects, IngressInput}, - server::AppState, - storage::types::{file_info::FileInfo, user::User}, -}; -use axum::{extract::State, http::StatusCode, response::IntoResponse, Extension}; -use axum_typed_multipart::{FieldData, TryFromMultipart, TypedMultipart}; -use futures::{future::try_join_all, TryFutureExt}; -use tempfile::NamedTempFile; -use tracing::{debug, info}; - -#[derive(Debug, TryFromMultipart)] -pub struct IngressParams { - pub content: Option<String>, - pub instructions: String, - pub category: String, - #[form_data(limit = "10000000")] // Adjust limit as needed - #[form_data(default)] - pub files: Vec<FieldData<NamedTempFile>>, -} - -pub async fn ingress_data( - State(state): State<AppState>, - Extension(user): Extension<User>, - TypedMultipart(input): TypedMultipart<IngressParams>, -) -> Result<impl IntoResponse, ApiError> { - info!("Received input: {:?}", input); - - let file_infos = try_join_all(input.files.into_iter().map(|file| { - FileInfo::new(file, &state.surreal_db_client, &user.id).map_err(AppError::from) - })) - .await?; - - debug!("Got file infos"); - - let ingress_objects = create_ingress_objects( - IngressInput { - content: input.content, - instructions: input.instructions, - category: input.category, - files: file_infos, - }, - user.id.as_str(), - )?; - debug!("Got ingress objects"); - - let futures: Vec<_> = ingress_objects - .into_iter() - .map(|object| state.job_queue.enqueue(object.clone(), user.id.clone())) - .collect(); - - try_join_all(futures).await.map_err(AppError::from)?; - - Ok(StatusCode::OK) -} - - - - -use axum_typed_multipart::FieldData; -use mime_guess::from_path; -use sha2::{Digest, Sha256}; -use std::{ - io::{BufReader, Read}, - path::{Path, PathBuf}, -}; -use tempfile::NamedTempFile; -use thiserror::Error; -use tracing::info; -use uuid::Uuid; - -use crate::{ - storage::db::{store_item, SurrealDbClient}, - stored_object, -}; - -#[derive(Error, Debug)] -pub enum FileError { - #[error("File not found for UUID: {0}")] - FileNotFound(String), - - #[error("IO error occurred: {0}")] - Io(#[from] std::io::Error), - - #[error("Duplicate file detected with SHA256: {0}")] - DuplicateFile(String), - - #[error("SurrealDB error: {0}")] - SurrealError(#[from] surrealdb::Error), - - #[error("Failed to persist file: {0}")] - PersistError(#[from] tempfile::PersistError), - - #[error("File name missing in metadata")] - MissingFileName, -} - -stored_object!(FileInfo, "file", { - sha256: String, - path: String, - mime_type: String -}); - -impl FileInfo { - pub async fn new( - field_data: FieldData<NamedTempFile>, - db_client: &SurrealDbClient, - user_id: &str, - ) -> Result<Self, FileError> { - let file = field_data.contents; - let file_name = field_data - .metadata - .file_name - .ok_or(FileError::MissingFileName)?; - - // Calculate SHA256 - let sha256 = Self::get_sha(&file).await?; - - // Early return if file already exists - match Self::get_by_sha(&sha256, db_client).await { - Ok(existing_file) => { - info!("File already exists with SHA256: {}", sha256); - return Ok(existing_file); - } - Err(FileError::FileNotFound(_)) => (), // Expected case for new files - Err(e) => return Err(e), // Propagate unexpected errors - } - - // Generate UUID and prepare paths - let uuid = Uuid::new_v4(); - let sanitized_file_name = Self::sanitize_file_name(&file_name); - - let now = Utc::now(); - // Create new FileInfo instance - let file_info = Self { - id: uuid.to_string(), - created_at: now, - updated_at: now, - sha256, - path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id) - .await?
- .to_string_lossy() - .into(), - mime_type: Self::guess_mime_type(Path::new(&sanitized_file_name)), - }; - - // Store in database - store_item(&db_client.client, file_info.clone()).await?; - - Ok(file_info) - } - - /// Guesses the MIME type based on the file extension. - /// - /// # Arguments - /// * `path` - The path to the file. - /// - /// # Returns - /// * `String` - The guessed MIME type as a string. - fn guess_mime_type(path: &Path) -> String { - from_path(path) - .first_or(mime::APPLICATION_OCTET_STREAM) - .to_string() - } - - /// Calculates the SHA256 hash of the given file. - /// - /// # Arguments - /// * `file` - The file to hash. - /// - /// # Returns - /// * `Result<String, FileError>` - The SHA256 hash as a hex string or an error. - async fn get_sha(file: &NamedTempFile) -> Result<String, FileError> { - let mut reader = BufReader::new(file.as_file()); - let mut hasher = Sha256::new(); - let mut buffer = [0u8; 8192]; // 8KB buffer - - loop { - let n = reader.read(&mut buffer)?; - if n == 0 { - break; - } - hasher.update(&buffer[..n]); - } - - let digest = hasher.finalize(); - Ok(format!("{:x}", digest)) - } - - /// Sanitizes the file name to prevent security vulnerabilities like directory traversal. - /// Replaces any non-alphanumeric characters (excluding '.' and '_') with underscores. - fn sanitize_file_name(file_name: &str) -> String { - file_name - .chars() - .map(|c| { - if c.is_ascii_alphanumeric() || c == '.' || c == '_' { - c - } else { - '_' - } - }) - .collect() - } - - /// Persists the file to the filesystem under `./data/{user_id}/{uuid}/{file_name}`. - /// - /// # Arguments - /// * `uuid` - The UUID of the file. - /// * `file` - The temporary file to persist. - /// * `file_name` - The sanitized file name. - /// * `user_id` - The user id. - /// - /// # Returns - /// * `Result<PathBuf, FileError>` - The persisted file path or an error. - async fn persist_file( - uuid: &Uuid, - file: NamedTempFile, - file_name: &str, - user_id: &str, - ) -> Result<PathBuf, FileError> { - let base_dir = Path::new("./data"); - let user_dir = base_dir.join(user_id); // Create the user directory - let uuid_dir = user_dir.join(uuid.to_string()); // Create the UUID directory under the user directory - - // Create the user and UUID directories if they don't exist - tokio::fs::create_dir_all(&uuid_dir) - .await - .map_err(FileError::Io)?; - - // Define the final file path - let final_path = uuid_dir.join(file_name); - info!("Final path: {:?}", final_path); - - // Persist the temporary file to the final path - file.persist(&final_path)?; - info!("Persisted file to {:?}", final_path); - - Ok(final_path) - } - - /// Retrieves a `FileInfo` by SHA256. - /// - /// # Arguments - /// * `sha256` - The SHA256 hash string. - /// * `db_client` - Reference to the SurrealDbClient. - /// - /// # Returns - /// * `Result<FileInfo, FileError>` - The `FileInfo`, or a `FileNotFound` error if no matching file exists.
- async fn get_by_sha(sha256: &str, db_client: &SurrealDbClient) -> Result<Self, FileError> { - let query = format!("SELECT * FROM file WHERE sha256 = '{}'", &sha256); - let response: Vec<FileInfo> = db_client.client.query(query).await?.take(0)?; - - response - .into_iter() - .next() - .ok_or(FileError::FileNotFound(sha256.to_string())) - } -} - - - -use chrono::Utc; -use futures::Stream; -use std::sync::Arc; -use surrealdb::{opt::PatchOp, Error, Notification}; -use tracing::{debug, error, info}; - -use crate::{ - error::AppError, - storage::{ - db::{delete_item, get_item, store_item, SurrealDbClient}, - types::{ - job::{Job, JobStatus}, - StoredObject, - }, - }, -}; - -use super::{content_processor::ContentProcessor, types::ingress_object::IngressObject}; - -pub struct JobQueue { - pub db: Arc<SurrealDbClient>, - pub openai_client: Arc<async_openai::Client<OpenAIConfig>>, -} - -pub const MAX_ATTEMPTS: u32 = 3; - -impl JobQueue { - pub fn new( - db: Arc<SurrealDbClient>, - openai_client: Arc<async_openai::Client<OpenAIConfig>>, - ) -> Self { - Self { db, openai_client } - } - - /// Creates a new job and stores it in the database - pub async fn enqueue(&self, content: IngressObject, user_id: String) -> Result<(), AppError> { - let job = Job::new(content, user_id).await; - - info!("{:?}", job); - - store_item(&self.db, job).await?; - - Ok(()) - } - - /// Gets all jobs for a specific user - pub async fn get_user_jobs(&self, user_id: &str) -> Result<Vec<Job>, AppError> { - let jobs: Vec<Job> = self - .db - .query("SELECT * FROM job WHERE user_id = $user_id ORDER BY created_at DESC") - .bind(("user_id", user_id.to_owned())) - .await?
- .take(0)?; - - debug!("{:?}", jobs); - - Ok(jobs) - } - - /// Gets all active jobs for a specific user - pub async fn get_unfinished_user_jobs(&self, user_id: &str) -> Result<Vec<Job>, AppError> { - let jobs: Vec<Job> = self - .db - .query( - "SELECT * FROM type::table($table) - WHERE user_id = $user_id - AND ( - status = 'Created' - OR ( - status.InProgress != NONE - AND status.InProgress.attempts < $max_attempts - ) - ) - ORDER BY created_at DESC", - ) - .bind(("table", Job::table_name())) - .bind(("user_id", user_id.to_owned())) - .bind(("max_attempts", MAX_ATTEMPTS)) - .await? - .take(0)?; - debug!("{:?}", jobs); - Ok(jobs) - } - - pub async fn delete_job(&self, id: &str, user_id: &str) -> Result<(), AppError> { - get_item::<Job>(&self.db.client, id) - .await? - .filter(|job| job.user_id == user_id) - .ok_or_else(|| { - error!("Unauthorized attempt to delete job {id} by user {user_id}"); - AppError::Auth("Not authorized to delete this job".into()) - })?; - - info!("Deleting job {id} for user {user_id}"); - delete_item::<Job>(&self.db.client, id) - .await - .map_err(AppError::Database)?; - - Ok(()) - } - - pub async fn update_status(&self, id: &str, status: JobStatus) -> Result<(), AppError> { - let _job: Option<Job> = self - .db - .update((Job::table_name(), id)) - .patch(PatchOp::replace("/status", status)) - .patch(PatchOp::replace( - "/updated_at", - surrealdb::sql::Datetime::default(), - )) - .await?; - - Ok(()) - } - - /// Listen for new jobs - pub async fn listen_for_jobs( - &self, - ) -> Result<impl Stream<Item = Result<Notification<Job>, Error>>, Error> { - self.db.select("job").live().await - } - - /// Get unfinished jobs, i.e. newly created jobs and in-progress jobs with fewer than the maximum number of attempts - pub async fn get_unfinished_jobs(&self) -> Result<Vec<Job>, AppError> { - let jobs: Vec<Job> = self - .db - .query( - "SELECT * FROM type::table($table) - WHERE - status = 'Created' - OR ( - status.InProgress != NONE - AND status.InProgress.attempts < $max_attempts - ) - ORDER BY created_at ASC", - ) - .bind(("table", Job::table_name())) - .bind(("max_attempts", MAX_ATTEMPTS)) - .await? - .take(0)?; - - Ok(jobs) - } - - // Method to process a single job - pub async fn process_job( - &self, - job: Job, - processor: &ContentProcessor, - ) -> Result<(), AppError> { - let current_attempts = match job.status { - JobStatus::InProgress { attempts, ..
} => attempts + 1, - _ => 1, - }; - - // Update status to InProgress with attempt count - self.update_status( - &job.id, - JobStatus::InProgress { - attempts: current_attempts, - last_attempt: Utc::now(), - }, - ) - .await?; - - let text_content = job.content.to_text_content(&self.openai_client).await?; - - match processor.process(&text_content).await { - Ok(_) => { - self.update_status(&job.id, JobStatus::Completed).await?; - Ok(()) - } - Err(e) => { - if current_attempts >= MAX_ATTEMPTS { - self.update_status( - &job.id, - JobStatus::Error(format!("Max attempts reached: {}", e)), - ) - .await?; - } - Err(AppError::Processing(e.to_string())) - } - } - } -} - - - - -use std::sync::Arc; - -use futures::StreamExt; -use surrealdb::Action; -use tracing::{error, info}; -use tracing_subscriber::{fmt, prelude::*, EnvFilter}; -use zettle_db::{ - ingress::{ - content_processor::ContentProcessor, - jobqueue::{JobQueue, MAX_ATTEMPTS}, - }, - storage::{ - db::{get_item, SurrealDbClient}, - types::job::{Job, JobStatus}, - }, - utils::config::get_config, -}; - -#[tokio::main] -async fn main() -> Result<(), Box<dyn std::error::Error>> { - // Set up tracing - tracing_subscriber::registry() - .with(fmt::layer()) - .with(EnvFilter::from_default_env()) - .try_init() - .ok(); - - let config = get_config()?; - - let surreal_db_client = Arc::new( - SurrealDbClient::new( - &config.surrealdb_address, - &config.surrealdb_username, - &config.surrealdb_password, - &config.surrealdb_namespace, - &config.surrealdb_database, - ) - .await?, - ); - - let openai_client = Arc::new(async_openai::Client::new()); - - let job_queue = JobQueue::new(surreal_db_client.clone(), openai_client.clone()); - - let content_processor = ContentProcessor::new(surreal_db_client, openai_client).await?; - - loop { - // First, check for any unfinished jobs - let unfinished_jobs = job_queue.get_unfinished_jobs().await?; - - if !unfinished_jobs.is_empty() { - info!("Found {} unfinished jobs", unfinished_jobs.len()); - - for job in unfinished_jobs { - job_queue.process_job(job, &content_processor).await?; - } - } - - // If no unfinished jobs, start listening for new ones - info!("Listening for new jobs..."); - let mut job_stream = job_queue.listen_for_jobs().await?; - - while let Some(notification) = job_stream.next().await { - match notification { - Ok(notification) => { - info!("Received notification: {:?}", notification); - - match notification.action { - Action::Create => { - if let Err(e) = job_queue - .process_job(notification.data, &content_processor) - .await - { - error!("Error processing job: {}", e); - } - } - Action::Update => { - match notification.data.status { - JobStatus::Completed - | JobStatus::Error(_) - | JobStatus::Cancelled => { - info!( - "Skipping already completed/error/cancelled job: {}", - notification.data.id - ); - continue; - } - JobStatus::InProgress { attempts, ..
} => { - // Only process if this is a retry after an error, not our own update - if let Ok(Some(current_job)) = - get_item::<Job>(&job_queue.db.client, &notification.data.id) - .await - { - match current_job.status { - JobStatus::Error(_) if attempts < MAX_ATTEMPTS => { - // This is a retry after an error - if let Err(e) = job_queue - .process_job(current_job, &content_processor) - .await - { - error!("Error processing job retry: {}", e); - } - } - _ => { - info!( - "Skipping in-progress update for job: {}", - notification.data.id - ); - continue; - } - } - } - } - JobStatus::Created => { - // Shouldn't happen with Update action, but process if it does - if let Err(e) = job_queue - .process_job(notification.data, &content_processor) - .await - { - error!("Error processing job: {}", e); - } - } - } - } - _ => {} // Ignore other actions - } - } - Err(e) => error!("Error in job notification: {}", e), - } - } - - // If we reach here, the stream has ended (connection lost?) - error!("Job stream ended unexpectedly, reconnecting..."); - tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; - } -} - - - - -use std::{sync::Arc, time::Duration}; - -use crate::{ - error::AppError, - storage::types::{file_info::FileInfo, text_content::TextContent}, -}; -use async_openai::types::{ - ChatCompletionRequestSystemMessage, ChatCompletionRequestUserMessage, - CreateChatCompletionRequestArgs, -}; -use reqwest; -use scraper::{Html, Selector}; -use serde::{Deserialize, Serialize}; -use std::fmt::Write; -use tiktoken_rs::{o200k_base, CoreBPE}; - -/// Knowledge object type, containing the content or a reference to it, as well as metadata -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum IngressObject { - Url { - url: String, - instructions: String, - category: String, - user_id: String, - }, - Text { - text: String, - instructions: String, - category: String, - user_id: String, - }, - File { - file_info: FileInfo, - instructions: String, - category: String, - user_id: String, - }, -} - -impl IngressObject { - /// Creates a new `TextContent` instance from an `IngressObject`. - /// - /// # Arguments - /// `&self` - A reference to the `IngressObject`. - /// - /// # Returns - /// `TextContent` - An object containing a text representation of the object, which could be a scraped URL, a parsed PDF, etc.
- pub async fn to_text_content( - &self, - openai_client: &Arc<async_openai::Client<OpenAIConfig>>, - ) -> Result<TextContent, AppError> { - match self { - IngressObject::Url { - url, - instructions, - category, - user_id, - } => { - let text = Self::fetch_text_from_url(url, openai_client).await?; - Ok(TextContent::new( - text, - instructions.into(), - category.into(), - None, - user_id.into(), - )) - } - IngressObject::Text { - text, - instructions, - category, - user_id, - } => Ok(TextContent::new( - text.into(), - instructions.into(), - category.into(), - None, - user_id.into(), - )), - IngressObject::File { - file_info, - instructions, - category, - user_id, - } => { - let text = Self::extract_text_from_file(file_info).await?; - Ok(TextContent::new( - text, - instructions.into(), - category.into(), - Some(file_info.to_owned()), - user_id.into(), - )) - } - } - } - - /// Get text from a URL and return it as a markdown-formatted string - async fn fetch_text_from_url( - url: &str, - openai_client: &Arc<async_openai::Client<OpenAIConfig>>, - ) -> Result<String, AppError> { - // Use a client with timeouts and reuse - let client = reqwest::ClientBuilder::new() - .timeout(Duration::from_secs(30)) - .build()?; - let response = client.get(url).send().await?.text().await?; - - // Preallocate string with capacity - let mut structured_content = String::with_capacity(response.len() / 2); - - let document = Html::parse_document(&response); - let main_selectors = Selector::parse( - "article, main, .article-content, .post-content, .entry-content, [role='main']", - ) - .unwrap(); - - let content_element = document - .select(&main_selectors) - .next() - .or_else(|| document.select(&Selector::parse("body").unwrap()).next()) - .ok_or(AppError::NotFound("No content found".into()))?; - - // Compile selectors once - let heading_selector = Selector::parse("h1, h2, h3").unwrap(); - let paragraph_selector = Selector::parse("p").unwrap(); - - // Process content in one pass - for element in content_element.select(&heading_selector) { - let _ = writeln!( - structured_content, - "{}", - element.text().collect::<String>().trim() - ); - } - for element in content_element.select(&paragraph_selector) { - let _ = writeln!( - structured_content, - "{}", - element.text().collect::<String>().trim() - ); - } - - let content = structured_content - .replace(|c: char| c.is_control(), " ") - .replace(" ", " "); - Self::process_web_content(content, openai_client).await - } - - pub async fn process_web_content( - content: String, - openai_client: &Arc<async_openai::Client<OpenAIConfig>>, - ) -> Result<String, AppError> { - const MAX_TOKENS: usize = 122000; - const SYSTEM_PROMPT: &str = r#" - You are a precise content extractor for web pages. Your task: - - 1. Extract ONLY the main article/content from the provided text - 2. Maintain the original content - do not summarize or modify the core information - 3.
Ignore peripheral content such as: - - Navigation elements - - Error messages (e.g., "JavaScript required") - - Related articles sections - - Comments - - Social media links - - Advertisement text - - FORMAT: - - Convert <h1>, <h2>, <h3> tags to markdown headings (#, ##, ###) - - Convert <p> tags to markdown paragraphs - - Preserve quotes and important formatting - - Remove duplicate content - - Remove any metadata or technical artifacts - - OUTPUT RULES: - - Output ONLY the cleaned content in markdown - - Do not add any explanations or meta-commentary - - Do not add summaries or conclusions - - Do not use any XML/HTML tags in the output - "#; - - let bpe = o200k_base()?; - - // Process content in chunks if needed - let truncated_content = if bpe.encode_with_special_tokens(&content).len() > MAX_TOKENS { - Self::truncate_content(&content, MAX_TOKENS, &bpe)? - } else { - content - }; - - let request = CreateChatCompletionRequestArgs::default() - .model("gpt-4o-mini") - .temperature(0.0) - .max_tokens(16200u32) - .messages([ - ChatCompletionRequestSystemMessage::from(SYSTEM_PROMPT).into(), - ChatCompletionRequestUserMessage::from(truncated_content).into(), - ]) - .build()?; - - let response = openai_client.chat().create(request).await?; - - response - .choices - .first() - .and_then(|choice| choice.message.content.as_ref()) - .map(|content| content.to_owned()) - .ok_or(AppError::LLMParsing("No content in response".into())) - } - - fn truncate_content( - content: &str, - max_tokens: usize, - tokenizer: &CoreBPE, - ) -> Result<String, AppError> { - // Pre-allocate with estimated size - let mut result = String::with_capacity(content.len() / 2); - let mut current_tokens = 0; - - // Process content by paragraph to maintain context - for paragraph in content.split("\n\n") { - let tokens = tokenizer.encode_with_special_tokens(paragraph).len(); - - // Check if adding paragraph exceeds limit - if current_tokens + tokens > max_tokens { - break; - } - - result.push_str(paragraph); - result.push_str("\n\n"); - current_tokens += tokens; - } - - // Ensure we return valid content - if result.is_empty() { - return Err(AppError::Processing("Content exceeds token limit".into())); - } - - Ok(result.trim_end().to_string()) - } - - /// Extracts text from a file based on its MIME type.
- async fn extract_text_from_file(file_info: &FileInfo) -> Result<String, AppError> { - match file_info.mime_type.as_str() { - "text/plain" => { - // Read the file and return its content - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - "text/markdown" => { - // Read the file and return its content - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - "application/pdf" => { - // TODO: Implement PDF text extraction using a crate like `pdf-extract` or `lopdf` - Err(AppError::NotFound(file_info.mime_type.clone())) - } - "image/png" | "image/jpeg" => { - // TODO: Implement OCR on image using a crate like `tesseract` - Err(AppError::NotFound(file_info.mime_type.clone())) - } - "application/octet-stream" => { - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - "text/x-rust" => { - let content = tokio::fs::read_to_string(&file_info.path).await?; - Ok(content) - } - // Handle other MIME types as needed - _ => Err(AppError::NotFound(file_info.mime_type.clone())), - } - } -} - - - - -use uuid::Uuid; - -use crate::stored_object; - -use super::file_info::FileInfo; - -stored_object!(TextContent, "text_content", { - text: String, - file_info: Option<FileInfo>, - instructions: String, - category: String, - user_id: String -}); - -impl TextContent { - pub fn new( - text: String, - instructions: String, - category: String, - file_info: Option<FileInfo>, - user_id: String, - ) -> Self { - let now = Utc::now(); - Self { - id: Uuid::new_v4().to_string(), - created_at: now, - updated_at: now, - text, - file_info, - instructions, - category, - user_id, - } - } -} - - -------- -Additional instructions: I'd like a clearer view in the TextContent of where the source came from. The TextContent is the logical entity that I then use in my frontend to present the data, and it acts as the main processed entity. diff --git a/src/server/routes/html/chat/message_response_stream.rs b/src/server/routes/html/chat/message_response_stream.rs index 1837bed..8f5b123 100644 --- a/src/server/routes/html/chat/message_response_stream.rs +++ b/src/server/routes/html/chat/message_response_stream.rs @@ -139,7 +139,7 @@ pub async fn get_response_stream( // 5. Create channel for collecting complete response let (tx, mut rx) = channel::(1000); let tx_clone = tx.clone(); - let (tx_final, mut rx_final) = channel::<Vec<Reference>>(1); + let (tx_final, mut rx_final) = channel::<Message>(1); // 6.
Set up the collection task for DB storage let db_client = state.surreal_db_client.clone(); @@ -160,8 +160,6 @@ .map(|r| r.reference) .collect(); - let _ = tx_final.send(references.clone()).await; - let ai_message = Message::new( user_message.conversation_id, MessageRole::AI, @@ -169,6 +167,8 @@ Some(references), ); + let _ = tx_final.send(ai_message.clone()).await; + match store_item(&db_client, ai_message).await { Ok(_) => info!("Successfully stored AI message with references"), Err(e) => error!("Failed to store AI message: {:?}", e), @@ -181,7 +181,7 @@ user_message.conversation_id, MessageRole::AI, full_json, - Some(vec![]), + None, ); let _ = store_item(&db_client, ai_message).await; @@ -234,31 +234,27 @@ }) .flatten() .chain(stream::once(async move { - if let Some(references) = rx_final.recv().await { + if let Some(message) = rx_final.recv().await { // Don't send any event if references is empty - if references.is_empty() { + if message.references.as_ref().is_some_and(|x| x.is_empty()) { return Ok(Event::default().event("empty")); // This event won't be sent } // Prepare data for template #[derive(Serialize)] struct ReferenceData { - references: Vec<Reference>, - user_message_id: String, + message: Message, } // Render template with references match render_template( "chat/reference_list.html", - ReferenceData { - references, - user_message_id: user_message.id, - }, + ReferenceData { message }, state.templates.clone(), ) { Ok(html) => { // Extract the String from Html - let html_string = html.0; // Convert Html to String + let html_string = html.0; // Return the rendered HTML Ok(Event::default().event("references").data(html_string)) diff --git a/src/storage/types/message.rs b/src/storage/types/message.rs index 546cfc9..ca8afbf 100644 --- a/src/storage/types/message.rs +++ b/src/storage/types/message.rs @@ -34,21 +34,4 @@ impl Message { references, } } - pub fn new_ai_message( - conversation_id: String, - id: String, - content: String, - references: Option<Vec<Reference>>, - ) -> Self { - let now = Utc::now(); - Self { - id, - created_at: now, - updated_at: now, - role: MessageRole::AI, - content, - references, - conversation_id, - } - } } diff --git a/templates/chat/base.html b/templates/chat/base.html index 96710ef..a1d7bf7 100644 --- a/templates/chat/base.html +++ b/templates/chat/base.html @@ -1,10 +1,10 @@ {% extends 'body_base.html' %} {% block main %} -
+
-
+
{% include "chat/history.html" %} diff --git a/templates/chat/drawer.html b/templates/chat/drawer.html index 893674e..5f58e8e 100644 --- a/templates/chat/drawer.html +++ b/templates/chat/drawer.html @@ -1,6 +1,6 @@ \ No newline at end of file diff --git a/templates/chat/history.html b/templates/chat/history.html index 0c367e4..607aade 100644 --- a/templates/chat/history.html +++ b/templates/chat/history.html @@ -1,4 +1,4 @@ -
+
{% for message in history %} {% if message.role == "AI" %}
@@ -31,11 +31,24 @@ } }); - // Also scroll when page loads window.addEventListener('load', function () { const chatContainer = document.getElementById('chat_container'); if (chatContainer) { chatContainer.scrollTop = chatContainer.scrollHeight; } }); - \ No newline at end of file + + \ No newline at end of file diff --git a/templates/chat/new_message_form.html b/templates/chat/new_message_form.html index 226d117..1f514d5 100644 --- a/templates/chat/new_message_form.html +++ b/templates/chat/new_message_form.html @@ -1,4 +1,4 @@ -
+