From 9d5e7cd7949ade22475d8e51d70ba121448f0128 Mon Sep 17 00:00:00 2001 From: Per Stark Date: Thu, 28 May 2026 19:58:14 +0200 Subject: [PATCH] chore: improved error handling --- .../src/middlewares/analytics_middleware.rs | 9 ++- .../routes/chat/message_response_stream.rs | 61 ++++++++++++++----- .../src/routes/chat/reference_validation.rs | 28 ++++++--- retrieval-pipeline/src/pipeline/config.rs | 2 - 4 files changed, 72 insertions(+), 28 deletions(-) diff --git a/html-router/src/middlewares/analytics_middleware.rs b/html-router/src/middlewares/analytics_middleware.rs index 9f817a0..668f016 100644 --- a/html-router/src/middlewares/analytics_middleware.rs +++ b/html-router/src/middlewares/analytics_middleware.rs @@ -4,6 +4,7 @@ use axum::{ middleware::Next, response::Response, }; +use tracing::warn; use common::storage::{db::ProvidesDb, types::analytics::Analytics}; @@ -23,10 +24,14 @@ where // Only count visits/page loads for GET requests to non-asset, non-static paths if request.method() == Method::GET && !path.starts_with("/assets") && !path.contains('.') { if !session.get::("counted_visitor").unwrap_or(false) { - let _ = Analytics::increment_visitors(state.db()).await; + if let Err(e) = Analytics::increment_visitors(state.db()).await { + warn!("failed to increment visitor count: {e}"); + } session.set("counted_visitor", true); } - let _ = Analytics::increment_page_loads(state.db()).await; + if let Err(e) = Analytics::increment_page_loads(state.db()).await { + warn!("failed to increment page load count: {e}"); + } } next.run(request).await } diff --git a/html-router/src/routes/chat/message_response_stream.rs b/html-router/src/routes/chat/message_response_stream.rs index e871a23..e05cf41 100644 --- a/html-router/src/routes/chat/message_response_stream.rs +++ b/html-router/src/routes/chat/message_response_stream.rs @@ -25,7 +25,8 @@ use retrieval_pipeline::{ }; use serde::{Deserialize, Serialize}; use serde_json::from_str; -use tokio::sync::{mpsc::channel, Mutex}; +use std::sync::Mutex; +use tokio::sync::mpsc::channel; use tracing::{debug, error, info}; use common::storage::{ @@ -220,10 +221,11 @@ pub async fn get_response_stream( return create_replayed_response_stream(&state, existing_ai_message); } - let (request, allowed_reference_ids) = match prepare_chat_request(&state, &user_message, &user, &history).await { - Ok(result) => result, - Err(sse) => return sse, - }; + let (request, allowed_reference_ids) = + match prepare_chat_request(&state, &user_message, &user, &history).await { + Ok(result) => result, + Err(sse) => return sse, + }; let openai_stream = match state.openai_client.chat().create_stream(request).await { Ok(stream) => stream, @@ -232,12 +234,24 @@ pub async fn get_response_stream( } }; - build_chat_event_stream(state, openai_stream, &user_message, user.id.clone(), allowed_reference_ids) + build_chat_event_stream( + state, + openai_stream, + &user_message, + user.id.clone(), + allowed_reference_ids, + ) } fn build_chat_event_stream( state: HtmlState, - openai_stream: impl Stream> + Send + 'static, + openai_stream: impl Stream< + Item = Result< + async_openai::types::CreateChatCompletionStreamResponse, + async_openai::error::OpenAIError, + >, + > + Send + + 'static, user_message: &Message, user_id: String, allowed_reference_ids: Vec, @@ -245,7 +259,14 @@ fn build_chat_event_stream( let (tx, rx) = channel::(1000); let (tx_final, mut rx_final) = channel::(1); - spawn_storage_task(Arc::clone(&state.db), rx, tx_final, user_message, user_id, allowed_reference_ids); + spawn_storage_task( + Arc::clone(&state.db), + rx, + tx_final, + user_message, + user_id, + allowed_reference_ids, + ); let json_state = Arc::new(Mutex::new(StreamParserState::new())); @@ -267,9 +288,10 @@ fn build_chat_event_stream( if !content.is_empty() { let _ = tx_storage.send(content.clone()).await; - let mut state = json_state.lock().await; - let display_content = state.process_chunk(&content); - drop(state); + let display_content = { + let mut state = json_state.lock().expect("json parser mutex poisoned"); + state.process_chunk(&content) + }; if !display_content.is_empty() { yield Ok(Event::default() .event("chat_message") @@ -332,7 +354,10 @@ async fn prepare_chat_request( user: &User, history: &[Message], ) -> Result< - (async_openai::types::CreateChatCompletionRequest, Vec), + ( + async_openai::types::CreateChatCompletionRequest, + Vec, + ), SseResponse, > { let rerank_lease = match state.reranker_pool.as_ref() { @@ -356,7 +381,9 @@ async fn prepare_chat_request( { Ok(result) => result, Err(_e) => { - return Err(sse_with_keep_alive(create_error_stream("Failed to retrieve knowledge"))); + return Err(sse_with_keep_alive(create_error_stream( + "Failed to retrieve knowledge", + ))); } }; @@ -374,10 +401,14 @@ async fn prepare_chat_request( let formatted_user_message = create_user_message_with_history(&context_json, history, &user_message.content); let Ok(settings) = SystemSettings::get_current(&state.db).await else { - return Err(sse_with_keep_alive(create_error_stream("Failed to retrieve system settings"))); + return Err(sse_with_keep_alive(create_error_stream( + "Failed to retrieve system settings", + ))); }; let Ok(request) = create_chat_request(formatted_user_message, &settings) else { - return Err(sse_with_keep_alive(create_error_stream("Failed to create chat request"))); + return Err(sse_with_keep_alive(create_error_stream( + "Failed to create chat request", + ))); }; Ok((request, allowed_reference_ids)) diff --git a/html-router/src/routes/chat/reference_validation.rs b/html-router/src/routes/chat/reference_validation.rs index c0e94df..b4c9903 100644 --- a/html-router/src/routes/chat/reference_validation.rs +++ b/html-router/src/routes/chat/reference_validation.rs @@ -1,4 +1,4 @@ -#![allow(clippy::arithmetic_side_effects, clippy::missing_docs_in_private_items)] +#![allow(clippy::missing_docs_in_private_items)] use std::collections::HashSet; @@ -49,14 +49,24 @@ pub(crate) struct ReferenceReasonStats { impl ReferenceReasonStats { fn record(&mut self, reason: &InvalidReferenceReason) { match reason { - InvalidReferenceReason::Empty => self.empty += 1, - InvalidReferenceReason::UnsupportedPrefix => self.unsupported_prefix += 1, - InvalidReferenceReason::MalformedUuid => self.malformed_uuid += 1, - InvalidReferenceReason::Duplicate => self.duplicate += 1, - InvalidReferenceReason::NotInContext => self.not_in_context += 1, - InvalidReferenceReason::NotFound => self.not_found += 1, - InvalidReferenceReason::WrongUser => self.wrong_user += 1, - InvalidReferenceReason::OverLimit => self.over_limit += 1, + InvalidReferenceReason::Empty => self.empty = self.empty.saturating_add(1), + InvalidReferenceReason::UnsupportedPrefix => { + self.unsupported_prefix = self.unsupported_prefix.saturating_add(1) + } + InvalidReferenceReason::MalformedUuid => { + self.malformed_uuid = self.malformed_uuid.saturating_add(1) + } + InvalidReferenceReason::Duplicate => self.duplicate = self.duplicate.saturating_add(1), + InvalidReferenceReason::NotInContext => { + self.not_in_context = self.not_in_context.saturating_add(1) + } + InvalidReferenceReason::NotFound => self.not_found = self.not_found.saturating_add(1), + InvalidReferenceReason::WrongUser => { + self.wrong_user = self.wrong_user.saturating_add(1) + } + InvalidReferenceReason::OverLimit => { + self.over_limit = self.over_limit.saturating_add(1) + } } } } diff --git a/retrieval-pipeline/src/pipeline/config.rs b/retrieval-pipeline/src/pipeline/config.rs index 733c2d5..cee50eb 100644 --- a/retrieval-pipeline/src/pipeline/config.rs +++ b/retrieval-pipeline/src/pipeline/config.rs @@ -285,5 +285,3 @@ const fn default_chunk_rrf_vector_weight() -> f32 { const fn default_chunk_rrf_fts_weight() -> f32 { 1.0 } - -