chore: improved error handling

This commit is contained in:
Per Stark
2026-05-28 19:58:14 +02:00
parent 30bb59f243
commit 9d5e7cd794
4 changed files with 72 additions and 28 deletions
@@ -4,6 +4,7 @@ use axum::{
middleware::Next, middleware::Next,
response::Response, response::Response,
}; };
use tracing::warn;
use common::storage::{db::ProvidesDb, types::analytics::Analytics}; use common::storage::{db::ProvidesDb, types::analytics::Analytics};
@@ -23,10 +24,14 @@ where
// Only count visits/page loads for GET requests to non-asset, non-static paths // Only count visits/page loads for GET requests to non-asset, non-static paths
if request.method() == Method::GET && !path.starts_with("/assets") && !path.contains('.') { if request.method() == Method::GET && !path.starts_with("/assets") && !path.contains('.') {
if !session.get::<bool>("counted_visitor").unwrap_or(false) { if !session.get::<bool>("counted_visitor").unwrap_or(false) {
let _ = Analytics::increment_visitors(state.db()).await; if let Err(e) = Analytics::increment_visitors(state.db()).await {
warn!("failed to increment visitor count: {e}");
}
session.set("counted_visitor", true); session.set("counted_visitor", true);
} }
let _ = Analytics::increment_page_loads(state.db()).await; if let Err(e) = Analytics::increment_page_loads(state.db()).await {
warn!("failed to increment page load count: {e}");
}
} }
next.run(request).await next.run(request).await
} }
@@ -25,7 +25,8 @@ use retrieval_pipeline::{
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::from_str; use serde_json::from_str;
use tokio::sync::{mpsc::channel, Mutex}; use std::sync::Mutex;
use tokio::sync::mpsc::channel;
use tracing::{debug, error, info}; use tracing::{debug, error, info};
use common::storage::{ use common::storage::{
@@ -220,10 +221,11 @@ pub async fn get_response_stream(
return create_replayed_response_stream(&state, existing_ai_message); return create_replayed_response_stream(&state, existing_ai_message);
} }
let (request, allowed_reference_ids) = match prepare_chat_request(&state, &user_message, &user, &history).await { let (request, allowed_reference_ids) =
Ok(result) => result, match prepare_chat_request(&state, &user_message, &user, &history).await {
Err(sse) => return sse, Ok(result) => result,
}; Err(sse) => return sse,
};
let openai_stream = match state.openai_client.chat().create_stream(request).await { let openai_stream = match state.openai_client.chat().create_stream(request).await {
Ok(stream) => stream, Ok(stream) => stream,
@@ -232,12 +234,24 @@ pub async fn get_response_stream(
} }
}; };
build_chat_event_stream(state, openai_stream, &user_message, user.id.clone(), allowed_reference_ids) build_chat_event_stream(
state,
openai_stream,
&user_message,
user.id.clone(),
allowed_reference_ids,
)
} }
fn build_chat_event_stream( fn build_chat_event_stream(
state: HtmlState, state: HtmlState,
openai_stream: impl Stream<Item = Result<async_openai::types::CreateChatCompletionStreamResponse, async_openai::error::OpenAIError>> + Send + 'static, openai_stream: impl Stream<
Item = Result<
async_openai::types::CreateChatCompletionStreamResponse,
async_openai::error::OpenAIError,
>,
> + Send
+ 'static,
user_message: &Message, user_message: &Message,
user_id: String, user_id: String,
allowed_reference_ids: Vec<String>, allowed_reference_ids: Vec<String>,
@@ -245,7 +259,14 @@ fn build_chat_event_stream(
let (tx, rx) = channel::<String>(1000); let (tx, rx) = channel::<String>(1000);
let (tx_final, mut rx_final) = channel::<Message>(1); let (tx_final, mut rx_final) = channel::<Message>(1);
spawn_storage_task(Arc::clone(&state.db), rx, tx_final, user_message, user_id, allowed_reference_ids); spawn_storage_task(
Arc::clone(&state.db),
rx,
tx_final,
user_message,
user_id,
allowed_reference_ids,
);
let json_state = Arc::new(Mutex::new(StreamParserState::new())); let json_state = Arc::new(Mutex::new(StreamParserState::new()));
@@ -267,9 +288,10 @@ fn build_chat_event_stream(
if !content.is_empty() { if !content.is_empty() {
let _ = tx_storage.send(content.clone()).await; let _ = tx_storage.send(content.clone()).await;
let mut state = json_state.lock().await; let display_content = {
let display_content = state.process_chunk(&content); let mut state = json_state.lock().expect("json parser mutex poisoned");
drop(state); state.process_chunk(&content)
};
if !display_content.is_empty() { if !display_content.is_empty() {
yield Ok(Event::default() yield Ok(Event::default()
.event("chat_message") .event("chat_message")
@@ -332,7 +354,10 @@ async fn prepare_chat_request(
user: &User, user: &User,
history: &[Message], history: &[Message],
) -> Result< ) -> Result<
(async_openai::types::CreateChatCompletionRequest, Vec<String>), (
async_openai::types::CreateChatCompletionRequest,
Vec<String>,
),
SseResponse, SseResponse,
> { > {
let rerank_lease = match state.reranker_pool.as_ref() { let rerank_lease = match state.reranker_pool.as_ref() {
@@ -356,7 +381,9 @@ async fn prepare_chat_request(
{ {
Ok(result) => result, Ok(result) => result,
Err(_e) => { Err(_e) => {
return Err(sse_with_keep_alive(create_error_stream("Failed to retrieve knowledge"))); return Err(sse_with_keep_alive(create_error_stream(
"Failed to retrieve knowledge",
)));
} }
}; };
@@ -374,10 +401,14 @@ async fn prepare_chat_request(
let formatted_user_message = let formatted_user_message =
create_user_message_with_history(&context_json, history, &user_message.content); create_user_message_with_history(&context_json, history, &user_message.content);
let Ok(settings) = SystemSettings::get_current(&state.db).await else { let Ok(settings) = SystemSettings::get_current(&state.db).await else {
return Err(sse_with_keep_alive(create_error_stream("Failed to retrieve system settings"))); return Err(sse_with_keep_alive(create_error_stream(
"Failed to retrieve system settings",
)));
}; };
let Ok(request) = create_chat_request(formatted_user_message, &settings) else { let Ok(request) = create_chat_request(formatted_user_message, &settings) else {
return Err(sse_with_keep_alive(create_error_stream("Failed to create chat request"))); return Err(sse_with_keep_alive(create_error_stream(
"Failed to create chat request",
)));
}; };
Ok((request, allowed_reference_ids)) Ok((request, allowed_reference_ids))
@@ -1,4 +1,4 @@
#![allow(clippy::arithmetic_side_effects, clippy::missing_docs_in_private_items)] #![allow(clippy::missing_docs_in_private_items)]
use std::collections::HashSet; use std::collections::HashSet;
@@ -49,14 +49,24 @@ pub(crate) struct ReferenceReasonStats {
impl ReferenceReasonStats { impl ReferenceReasonStats {
fn record(&mut self, reason: &InvalidReferenceReason) { fn record(&mut self, reason: &InvalidReferenceReason) {
match reason { match reason {
InvalidReferenceReason::Empty => self.empty += 1, InvalidReferenceReason::Empty => self.empty = self.empty.saturating_add(1),
InvalidReferenceReason::UnsupportedPrefix => self.unsupported_prefix += 1, InvalidReferenceReason::UnsupportedPrefix => {
InvalidReferenceReason::MalformedUuid => self.malformed_uuid += 1, self.unsupported_prefix = self.unsupported_prefix.saturating_add(1)
InvalidReferenceReason::Duplicate => self.duplicate += 1, }
InvalidReferenceReason::NotInContext => self.not_in_context += 1, InvalidReferenceReason::MalformedUuid => {
InvalidReferenceReason::NotFound => self.not_found += 1, self.malformed_uuid = self.malformed_uuid.saturating_add(1)
InvalidReferenceReason::WrongUser => self.wrong_user += 1, }
InvalidReferenceReason::OverLimit => self.over_limit += 1, InvalidReferenceReason::Duplicate => self.duplicate = self.duplicate.saturating_add(1),
InvalidReferenceReason::NotInContext => {
self.not_in_context = self.not_in_context.saturating_add(1)
}
InvalidReferenceReason::NotFound => self.not_found = self.not_found.saturating_add(1),
InvalidReferenceReason::WrongUser => {
self.wrong_user = self.wrong_user.saturating_add(1)
}
InvalidReferenceReason::OverLimit => {
self.over_limit = self.over_limit.saturating_add(1)
}
} }
} }
} }
@@ -285,5 +285,3 @@ const fn default_chunk_rrf_vector_weight() -> f32 {
const fn default_chunk_rrf_fts_weight() -> f32 { const fn default_chunk_rrf_fts_weight() -> f32 {
1.0 1.0
} }