design: neobrutalist_theme into main

This commit is contained in:
Per Stark
2025-09-17 10:00:55 +02:00
parent 62d909bb7e
commit 6ea51095e8
57 changed files with 1791 additions and 951 deletions

View File

@@ -1,3 +1,3 @@
pub mod db;
pub mod types;
pub mod store;
pub mod types;

View File

@@ -3,8 +3,8 @@ use std::sync::Arc;
use anyhow::{anyhow, Result as AnyResult};
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use futures::stream::BoxStream;
use futures::{StreamExt, TryStreamExt};
use object_store::local::LocalFileSystem;
use object_store::{path::Path as ObjPath, ObjectStore};
@@ -26,12 +26,12 @@ pub async fn build_store(prefix: &Path, cfg: &AppConfig) -> object_store::Result
match cfg.storage {
StorageKind::Local => {
if !prefix.exists() {
tokio::fs::create_dir_all(prefix)
.await
.map_err(|e| object_store::Error::Generic {
tokio::fs::create_dir_all(prefix).await.map_err(|e| {
object_store::Error::Generic {
store: "LocalFileSystem",
source: e.into(),
})?;
}
})?;
}
let store = LocalFileSystem::new_with_prefix(prefix)?;
Ok(Arc::new(store))
@@ -46,7 +46,8 @@ pub fn resolve_base_dir(cfg: &AppConfig) -> PathBuf {
if cfg.data_dir.starts_with('/') {
PathBuf::from(&cfg.data_dir)
} else {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
std::env::current_dir()
.unwrap_or_else(|_| PathBuf::from("."))
.join(&cfg.data_dir)
}
}
@@ -64,7 +65,12 @@ pub async fn build_store_root(cfg: &AppConfig) -> object_store::Result<DynStore>
///
/// Prefer [`put_bytes_at`] for location-based writes that do not need to compute
/// a separate filesystem prefix.
pub async fn put_bytes(prefix: &Path, file_name: &str, data: Bytes, cfg: &AppConfig) -> object_store::Result<()> {
pub async fn put_bytes(
prefix: &Path,
file_name: &str,
data: Bytes,
cfg: &AppConfig,
) -> object_store::Result<()> {
let store = build_store(prefix, cfg).await?;
let payload = object_store::PutPayload::from_bytes(data);
store.put(&ObjPath::from(file_name), payload).await?;
@@ -75,7 +81,11 @@ pub async fn put_bytes(prefix: &Path, file_name: &str, data: Bytes, cfg: &AppCon
///
/// The store root is taken from `AppConfig::data_dir` for the local backend.
/// This performs an atomic write as guaranteed by `object_store`.
pub async fn put_bytes_at(location: &str, data: Bytes, cfg: &AppConfig) -> object_store::Result<()> {
pub async fn put_bytes_at(
location: &str,
data: Bytes,
cfg: &AppConfig,
) -> object_store::Result<()> {
let store = build_store_root(cfg).await?;
let payload = object_store::PutPayload::from_bytes(data);
store.put(&ObjPath::from(location), payload).await?;
@@ -85,7 +95,11 @@ pub async fn put_bytes_at(location: &str, data: Bytes, cfg: &AppConfig) -> objec
/// Read bytes from `file_name` within a filesystem `prefix` using the configured store.
///
/// Prefer [`get_bytes_at`] for location-based reads.
pub async fn get_bytes(prefix: &Path, file_name: &str, cfg: &AppConfig) -> object_store::Result<Bytes> {
pub async fn get_bytes(
prefix: &Path,
file_name: &str,
cfg: &AppConfig,
) -> object_store::Result<Bytes> {
let store = build_store(prefix, cfg).await?;
let r = store.get(&ObjPath::from(file_name)).await?;
let b = r.bytes().await?;
@@ -105,7 +119,10 @@ pub async fn get_bytes_at(location: &str, cfg: &AppConfig) -> object_store::Resu
///
/// Returns a fallible `BoxStream` of `Bytes`, suitable for use with
/// `axum::body::Body::from_stream` to stream responses without buffering.
pub async fn get_stream_at(location: &str, cfg: &AppConfig) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
pub async fn get_stream_at(
location: &str,
cfg: &AppConfig,
) -> object_store::Result<BoxStream<'static, object_store::Result<Bytes>>> {
let store = build_store_root(cfg).await?;
let r = store.get(&ObjPath::from(location)).await?;
Ok(r.into_stream())
@@ -119,7 +136,10 @@ pub async fn delete_prefix(prefix: &Path, cfg: &AppConfig) -> object_store::Resu
let store = build_store(prefix, cfg).await?;
// list everything and delete
let locations = store.list(None).map_ok(|m| m.location).boxed();
store.delete_stream(locations).try_collect::<Vec<_>>().await?;
store
.delete_stream(locations)
.try_collect::<Vec<_>>()
.await?;
// Best effort remove the directory itself
if tokio::fs::try_exists(prefix).await.unwrap_or(false) {
let _ = tokio::fs::remove_dir_all(prefix).await;
@@ -134,8 +154,14 @@ pub async fn delete_prefix(prefix: &Path, cfg: &AppConfig) -> object_store::Resu
pub async fn delete_prefix_at(prefix: &str, cfg: &AppConfig) -> object_store::Result<()> {
let store = build_store_root(cfg).await?;
let prefix_path = ObjPath::from(prefix);
let locations = store.list(Some(&prefix_path)).map_ok(|m| m.location).boxed();
store.delete_stream(locations).try_collect::<Vec<_>>().await?;
let locations = store
.list(Some(&prefix_path))
.map_ok(|m| m.location)
.boxed();
store
.delete_stream(locations)
.try_collect::<Vec<_>>()
.await?;
// Best effort remove empty directory on disk for local storage
let base_dir = resolve_base_dir(cfg).join(prefix);
if tokio::fs::try_exists(&base_dir).await.unwrap_or(false) {
@@ -209,12 +235,16 @@ mod tests {
let location = format!("{}/{}", &location_prefix, file_name);
let payload = Bytes::from_static(b"hello world");
put_bytes_at(&location, payload.clone(), &cfg).await.expect("put");
put_bytes_at(&location, payload.clone(), &cfg)
.await
.expect("put");
let got = get_bytes_at(&location, &cfg).await.expect("get");
assert_eq!(got.as_ref(), payload.as_ref());
// Delete the whole prefix and ensure retrieval fails
delete_prefix_at(&location_prefix, &cfg).await.expect("delete prefix");
delete_prefix_at(&location_prefix, &cfg)
.await
.expect("delete prefix");
assert!(get_bytes_at(&location, &cfg).await.is_err());
let _ = tokio::fs::remove_dir_all(&base).await;
@@ -244,12 +274,9 @@ mod tests {
assert_eq!(combined, content);
delete_prefix_at(
&split_object_path(&location).unwrap().0,
&cfg,
)
.await
.ok();
delete_prefix_at(&split_object_path(&location).unwrap().0, &cfg)
.await
.ok();
let _ = tokio::fs::remove_dir_all(&base).await;
}

View File

@@ -1,5 +1,6 @@
use axum_typed_multipart::FieldData;
use mime_guess::from_path;
use object_store::Error as ObjectStoreError;
use sha2::{Digest, Sha256};
use std::{
io::{BufReader, Read},
@@ -7,7 +8,6 @@ use std::{
};
use tempfile::NamedTempFile;
use thiserror::Error;
use object_store::Error as ObjectStoreError;
// futures imports no longer needed here after abstraction
use tracing::info;
use uuid::Uuid;
@@ -90,8 +90,7 @@ impl FileInfo {
updated_at: now,
file_name,
sha256,
path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id, config)
.await?,
path: Self::persist_file(&uuid, file, &sanitized_file_name, user_id, config).await?,
mime_type: Self::guess_mime_type(Path::new(&sanitized_file_name)),
user_id: user_id.to_string(),
};
@@ -248,7 +247,10 @@ impl FileInfo {
store::delete_prefix_at(&parent_prefix, config)
.await
.map_err(|e| AppError::from(anyhow::anyhow!(e)))?;
info!("Removed object prefix {} and its contents via object_store", parent_prefix);
info!(
"Removed object prefix {} and its contents via object_store",
parent_prefix
);
// Delete the FileInfo from the database
db_client.delete_item::<FileInfo>(id).await?;
@@ -276,9 +278,9 @@ impl FileInfo {
#[cfg(test)]
mod tests {
use super::*;
use crate::utils::config::StorageKind;
use axum::http::HeaderMap;
use axum_typed_multipart::FieldMetadata;
use crate::utils::config::StorageKind;
use std::io::Write;
use tempfile::NamedTempFile;
@@ -657,9 +659,22 @@ mod tests {
// Create and persist a test file via FileInfo::new
let user_id = "user123";
let cfg = AppConfig { data_dir: "./data".to_string(), openai_api_key: "".to_string(), surrealdb_address: "".to_string(), surrealdb_username: "".to_string(), surrealdb_password: "".to_string(), surrealdb_namespace: "".to_string(), surrealdb_database: "".to_string(), http_port: 0, openai_base_url: "".to_string(), storage: crate::utils::config::StorageKind::Local };
let cfg = AppConfig {
data_dir: "./data".to_string(),
openai_api_key: "".to_string(),
surrealdb_address: "".to_string(),
surrealdb_username: "".to_string(),
surrealdb_password: "".to_string(),
surrealdb_namespace: "".to_string(),
surrealdb_database: "".to_string(),
http_port: 0,
openai_base_url: "".to_string(),
storage: crate::utils::config::StorageKind::Local,
};
let temp = create_test_file(b"test content", "test_file.txt");
let file_info = FileInfo::new(temp, &db, user_id, &cfg).await.expect("create file");
let file_info = FileInfo::new(temp, &db, user_id, &cfg)
.await
.expect("create file");
// Delete the file
let delete_result = FileInfo::delete_by_id(&file_info.id, &db, &cfg).await;
@@ -695,7 +710,23 @@ mod tests {
.expect("Failed to start in-memory surrealdb");
// Try to delete a file that doesn't exist
let result = FileInfo::delete_by_id("nonexistent_id", &db, &AppConfig { data_dir: "./data".to_string(), openai_api_key: "".to_string(), surrealdb_address: "".to_string(), surrealdb_username: "".to_string(), surrealdb_password: "".to_string(), surrealdb_namespace: "".to_string(), surrealdb_database: "".to_string(), http_port: 0, openai_base_url: "".to_string(), storage: crate::utils::config::StorageKind::Local }).await;
let result = FileInfo::delete_by_id(
"nonexistent_id",
&db,
&AppConfig {
data_dir: "./data".to_string(),
openai_api_key: "".to_string(),
surrealdb_address: "".to_string(),
surrealdb_username: "".to_string(),
surrealdb_password: "".to_string(),
surrealdb_namespace: "".to_string(),
surrealdb_database: "".to_string(),
http_port: 0,
openai_base_url: "".to_string(),
storage: crate::utils::config::StorageKind::Local,
},
)
.await;
// Should fail with FileNotFound error
assert!(result.is_err());

View File

@@ -4,11 +4,17 @@ use axum_session_auth::Authentication;
use surrealdb::{engine::any::Any, Surreal};
use uuid::Uuid;
use super::text_chunk::TextChunk;
use super::{
conversation::Conversation, ingestion_task::IngestionTask, knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship, system_settings::SystemSettings,
conversation::Conversation,
ingestion_task::{IngestionTask, MAX_ATTEMPTS},
knowledge_entity::KnowledgeEntity,
knowledge_relationship::KnowledgeRelationship,
system_settings::SystemSettings,
text_content::TextContent,
};
use chrono::Duration;
use futures::try_join;
#[derive(Deserialize)]
pub struct CategoryResponse {
@@ -61,7 +67,93 @@ fn validate_timezone(input: &str) -> String {
}
}
/// Aggregate per-user counters shown on the dashboard.
///
/// Each `total_*` field is the all-time count for the user; the paired
/// `new_*_week` field is the count of rows created in the last 7 days
/// (see `User::get_dashboard_stats`, which fills this struct).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DashboardStats {
// "documents" are backed by the TextContent table.
pub total_documents: i64,
pub new_documents_week: i64,
// KnowledgeEntity rows.
pub total_entities: i64,
pub new_entities_week: i64,
// Conversation rows.
pub total_conversations: i64,
pub new_conversations_week: i64,
// TextChunk rows.
pub total_text_chunks: i64,
pub new_text_chunks_week: i64,
}
/// Shape of the single aggregate row returned by the
/// `SELECT count() as count ... GROUP ALL` queries below.
#[derive(Deserialize)]
struct CountResult {
// Total number of matching rows.
count: i64,
}
impl User {
/// Count every row in `T`'s table that belongs to `user_id`.
///
/// `GROUP ALL` collapses the result set into a single aggregate row;
/// when the user has no rows at all the query returns no row, which is
/// mapped to `0` here.
async fn count_total<T: crate::storage::types::StoredObject>(
    db: &SurrealDbClient,
    user_id: &str,
) -> Result<i64, AppError> {
    let aggregate: Option<CountResult> = db
        .client
        .query("SELECT count() as count FROM type::table($table) WHERE user_id = $user_id GROUP ALL")
        .bind(("table", T::table_name()))
        .bind(("user_id", user_id.to_string()))
        .await?
        .take(0)?;
    // No aggregate row means zero matches, not an error.
    Ok(aggregate.map_or(0, |row| row.count))
}
/// Count rows in `T`'s table owned by `user_id` whose `created_at` is at
/// or after `since`.
///
/// Mirrors [`count_total`] with an extra time filter; an absent aggregate
/// row (no matches) is mapped to `0`.
async fn count_since<T: crate::storage::types::StoredObject>(
    db: &SurrealDbClient,
    user_id: &str,
    since: chrono::DateTime<chrono::Utc>,
) -> Result<i64, AppError> {
    let aggregate: Option<CountResult> = db
        .client
        .query(
            "SELECT count() as count FROM type::table($table) WHERE user_id = $user_id AND created_at >= $since GROUP ALL",
        )
        .bind(("table", T::table_name()))
        .bind(("user_id", user_id.to_string()))
        .bind(("since", since))
        .await?
        .take(0)?;
    // No aggregate row means zero matches, not an error.
    Ok(aggregate.map_or(0, |row| row.count))
}
/// Build the per-user [`DashboardStats`] by running all eight count
/// queries concurrently.
///
/// "New this week" means `created_at >= now - 7 days`. `try_join!`
/// short-circuits on the first query error, so a single failed count
/// fails the whole call.
///
/// NOTE: the tuple destructuring below is positional — each binding must
/// stay aligned with the corresponding `count_*` call in the `try_join!`
/// argument list, in order.
pub async fn get_dashboard_stats(
user_id: &str,
db: &SurrealDbClient,
) -> Result<DashboardStats, AppError> {
let since = chrono::Utc::now() - Duration::days(7);
let (
total_documents,
new_documents_week,
total_entities,
new_entities_week,
total_conversations,
new_conversations_week,
total_text_chunks,
new_text_chunks_week,
) = try_join!(
// "documents" on the dashboard are TextContent rows.
Self::count_total::<TextContent>(db, user_id),
Self::count_since::<TextContent>(db, user_id, since),
Self::count_total::<KnowledgeEntity>(db, user_id),
Self::count_since::<KnowledgeEntity>(db, user_id, since),
Self::count_total::<Conversation>(db, user_id),
Self::count_since::<Conversation>(db, user_id, since),
Self::count_total::<TextChunk>(db, user_id),
Self::count_since::<TextChunk>(db, user_id, since)
)?;
Ok(DashboardStats {
total_documents,
new_documents_week,
total_entities,
new_entities_week,
total_conversations,
new_conversations_week,
total_text_chunks,
new_text_chunks_week,
})
}
pub async fn create_new(
email: String,
password: String,
@@ -444,17 +536,17 @@ impl User {
"SELECT * FROM type::table($table)
WHERE user_id = $user_id
AND (
status = 'Created'
status.name = 'Created'
OR (
status.InProgress != NONE
AND status.InProgress.attempts < $max_attempts
status.name = 'InProgress'
AND status.attempts < $max_attempts
)
)
ORDER BY created_at DESC",
)
.bind(("table", IngestionTask::table_name()))
.bind(("user_id", user_id.to_owned()))
.bind(("max_attempts", 3))
.bind(("max_attempts", MAX_ATTEMPTS))
.await?
.take(0)?;
@@ -511,6 +603,9 @@ impl User {
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::types::ingestion_payload::IngestionPayload;
use crate::storage::types::ingestion_task::{IngestionTask, IngestionTaskStatus, MAX_ATTEMPTS};
use std::collections::HashSet;
// Helper function to set up a test database with SystemSettings
async fn setup_test_db() -> SurrealDbClient {
@@ -596,6 +691,75 @@ mod tests {
assert!(nonexistent.is_err());
}
#[tokio::test]
async fn test_get_unfinished_ingestion_tasks_filters_correctly() {
// Verifies that get_unfinished_ingestion_tasks returns only this user's
// tasks that are still actionable: freshly Created, or InProgress with
// attempts below MAX_ATTEMPTS. Exhausted, Completed, and other-user
// tasks must be excluded.
let db = setup_test_db().await;
let user_id = "unfinished_user";
let other_user_id = "other_user";
let payload = IngestionPayload::Text {
text: "Test".to_string(),
context: "Context".to_string(),
category: "Category".to_string(),
user_id: user_id.to_string(),
};
// Case 1: task still in the Created state — should be returned.
let created_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
db.store_item(created_task.clone())
.await
.expect("Failed to store created task");
// Case 2: InProgress with attempts < MAX_ATTEMPTS — should be returned.
let mut in_progress_allowed =
IngestionTask::new(payload.clone(), user_id.to_string()).await;
in_progress_allowed.status = IngestionTaskStatus::InProgress {
attempts: 1,
last_attempt: chrono::Utc::now(),
};
db.store_item(in_progress_allowed.clone())
.await
.expect("Failed to store in-progress task");
// Case 3: InProgress with attempts == MAX_ATTEMPTS (exhausted) — must be
// filtered out by the `attempts < $max_attempts` predicate.
let mut in_progress_blocked =
IngestionTask::new(payload.clone(), user_id.to_string()).await;
in_progress_blocked.status = IngestionTaskStatus::InProgress {
attempts: MAX_ATTEMPTS,
last_attempt: chrono::Utc::now(),
};
db.store_item(in_progress_blocked.clone())
.await
.expect("Failed to store blocked task");
// Case 4: Completed task — must be filtered out.
let mut completed_task = IngestionTask::new(payload.clone(), user_id.to_string()).await;
completed_task.status = IngestionTaskStatus::Completed;
db.store_item(completed_task.clone())
.await
.expect("Failed to store completed task");
// Case 5: a Created task belonging to a different user — must be
// excluded by the user_id filter even though its status qualifies.
let other_payload = IngestionPayload::Text {
text: "Other".to_string(),
context: "Context".to_string(),
category: "Category".to_string(),
user_id: other_user_id.to_string(),
};
let other_task = IngestionTask::new(other_payload, other_user_id.to_string()).await;
db.store_item(other_task)
.await
.expect("Failed to store other user task");
let unfinished = User::get_unfinished_ingestion_tasks(user_id, &db)
.await
.expect("Failed to fetch unfinished tasks");
// Compare by id set so result ordering does not matter.
let unfinished_ids: HashSet<String> =
unfinished.iter().map(|task| task.id.clone()).collect();
assert!(unfinished_ids.contains(&created_task.id));
assert!(unfinished_ids.contains(&in_progress_allowed.id));
assert!(!unfinished_ids.contains(&in_progress_blocked.id));
assert!(!unfinished_ids.contains(&completed_task.id));
// Exactly the two actionable tasks, nothing else.
assert_eq!(unfinished_ids.len(), 2);
}
#[tokio::test]
async fn test_find_by_email() {
// Setup test database