diff --git a/html-router/src/middlewares/analytics_middleware.rs b/html-router/src/middlewares/analytics_middleware.rs
index 9f817a0..668f016 100644
--- a/html-router/src/middlewares/analytics_middleware.rs
+++ b/html-router/src/middlewares/analytics_middleware.rs
@@ -4,6 +4,7 @@ use axum::{
middleware::Next,
response::Response,
};
+use tracing::warn;
use common::storage::{db::ProvidesDb, types::analytics::Analytics};
@@ -23,10 +24,14 @@ where
// Only count visits/page loads for GET requests to non-asset, non-static paths
if request.method() == Method::GET && !path.starts_with("/assets") && !path.contains('.') {
if !session.get::("counted_visitor").unwrap_or(false) {
- let _ = Analytics::increment_visitors(state.db()).await;
+ if let Err(e) = Analytics::increment_visitors(state.db()).await {
+ warn!("failed to increment visitor count: {e}");
+ }
session.set("counted_visitor", true);
}
- let _ = Analytics::increment_page_loads(state.db()).await;
+ if let Err(e) = Analytics::increment_page_loads(state.db()).await {
+ warn!("failed to increment page load count: {e}");
+ }
}
next.run(request).await
}
diff --git a/html-router/src/routes/chat/message_response_stream.rs b/html-router/src/routes/chat/message_response_stream.rs
index e871a23..e05cf41 100644
--- a/html-router/src/routes/chat/message_response_stream.rs
+++ b/html-router/src/routes/chat/message_response_stream.rs
@@ -25,7 +25,8 @@ use retrieval_pipeline::{
};
use serde::{Deserialize, Serialize};
use serde_json::from_str;
-use tokio::sync::{mpsc::channel, Mutex};
+use std::sync::Mutex;
+use tokio::sync::mpsc::channel;
use tracing::{debug, error, info};
use common::storage::{
@@ -220,10 +221,11 @@ pub async fn get_response_stream(
return create_replayed_response_stream(&state, existing_ai_message);
}
- let (request, allowed_reference_ids) = match prepare_chat_request(&state, &user_message, &user, &history).await {
- Ok(result) => result,
- Err(sse) => return sse,
- };
+ let (request, allowed_reference_ids) =
+ match prepare_chat_request(&state, &user_message, &user, &history).await {
+ Ok(result) => result,
+ Err(sse) => return sse,
+ };
let openai_stream = match state.openai_client.chat().create_stream(request).await {
Ok(stream) => stream,
@@ -232,12 +234,24 @@ pub async fn get_response_stream(
}
};
- build_chat_event_stream(state, openai_stream, &user_message, user.id.clone(), allowed_reference_ids)
+ build_chat_event_stream(
+ state,
+ openai_stream,
+ &user_message,
+ user.id.clone(),
+ allowed_reference_ids,
+ )
}
fn build_chat_event_stream(
state: HtmlState,
- openai_stream: impl Stream- > + Send + 'static,
+ openai_stream: impl Stream<
+ Item = Result<
+ async_openai::types::CreateChatCompletionStreamResponse,
+ async_openai::error::OpenAIError,
+ >,
+ > + Send
+ + 'static,
user_message: &Message,
user_id: String,
allowed_reference_ids: Vec,
@@ -245,7 +259,14 @@ fn build_chat_event_stream(
let (tx, rx) = channel::(1000);
let (tx_final, mut rx_final) = channel::(1);
- spawn_storage_task(Arc::clone(&state.db), rx, tx_final, user_message, user_id, allowed_reference_ids);
+ spawn_storage_task(
+ Arc::clone(&state.db),
+ rx,
+ tx_final,
+ user_message,
+ user_id,
+ allowed_reference_ids,
+ );
let json_state = Arc::new(Mutex::new(StreamParserState::new()));
@@ -267,9 +288,10 @@ fn build_chat_event_stream(
if !content.is_empty() {
let _ = tx_storage.send(content.clone()).await;
- let mut state = json_state.lock().await;
- let display_content = state.process_chunk(&content);
- drop(state);
+ let display_content = {
+ let mut state = json_state.lock().expect("json parser mutex poisoned");
+ state.process_chunk(&content)
+ };
if !display_content.is_empty() {
yield Ok(Event::default()
.event("chat_message")
@@ -332,7 +354,10 @@ async fn prepare_chat_request(
user: &User,
history: &[Message],
) -> Result<
- (async_openai::types::CreateChatCompletionRequest, Vec),
+ (
+ async_openai::types::CreateChatCompletionRequest,
+ Vec,
+ ),
SseResponse,
> {
let rerank_lease = match state.reranker_pool.as_ref() {
@@ -356,7 +381,9 @@ async fn prepare_chat_request(
{
Ok(result) => result,
Err(_e) => {
- return Err(sse_with_keep_alive(create_error_stream("Failed to retrieve knowledge")));
+ return Err(sse_with_keep_alive(create_error_stream(
+ "Failed to retrieve knowledge",
+ )));
}
};
@@ -374,10 +401,14 @@ async fn prepare_chat_request(
let formatted_user_message =
create_user_message_with_history(&context_json, history, &user_message.content);
let Ok(settings) = SystemSettings::get_current(&state.db).await else {
- return Err(sse_with_keep_alive(create_error_stream("Failed to retrieve system settings")));
+ return Err(sse_with_keep_alive(create_error_stream(
+ "Failed to retrieve system settings",
+ )));
};
let Ok(request) = create_chat_request(formatted_user_message, &settings) else {
- return Err(sse_with_keep_alive(create_error_stream("Failed to create chat request")));
+ return Err(sse_with_keep_alive(create_error_stream(
+ "Failed to create chat request",
+ )));
};
Ok((request, allowed_reference_ids))
diff --git a/html-router/src/routes/chat/reference_validation.rs b/html-router/src/routes/chat/reference_validation.rs
index c0e94df..b4c9903 100644
--- a/html-router/src/routes/chat/reference_validation.rs
+++ b/html-router/src/routes/chat/reference_validation.rs
@@ -1,4 +1,4 @@
-#![allow(clippy::arithmetic_side_effects, clippy::missing_docs_in_private_items)]
+#![allow(clippy::missing_docs_in_private_items)]
use std::collections::HashSet;
@@ -49,14 +49,24 @@ pub(crate) struct ReferenceReasonStats {
impl ReferenceReasonStats {
fn record(&mut self, reason: &InvalidReferenceReason) {
match reason {
- InvalidReferenceReason::Empty => self.empty += 1,
- InvalidReferenceReason::UnsupportedPrefix => self.unsupported_prefix += 1,
- InvalidReferenceReason::MalformedUuid => self.malformed_uuid += 1,
- InvalidReferenceReason::Duplicate => self.duplicate += 1,
- InvalidReferenceReason::NotInContext => self.not_in_context += 1,
- InvalidReferenceReason::NotFound => self.not_found += 1,
- InvalidReferenceReason::WrongUser => self.wrong_user += 1,
- InvalidReferenceReason::OverLimit => self.over_limit += 1,
+ InvalidReferenceReason::Empty => self.empty = self.empty.saturating_add(1),
+ InvalidReferenceReason::UnsupportedPrefix => {
+ self.unsupported_prefix = self.unsupported_prefix.saturating_add(1)
+ }
+ InvalidReferenceReason::MalformedUuid => {
+ self.malformed_uuid = self.malformed_uuid.saturating_add(1)
+ }
+ InvalidReferenceReason::Duplicate => self.duplicate = self.duplicate.saturating_add(1),
+ InvalidReferenceReason::NotInContext => {
+ self.not_in_context = self.not_in_context.saturating_add(1)
+ }
+ InvalidReferenceReason::NotFound => self.not_found = self.not_found.saturating_add(1),
+ InvalidReferenceReason::WrongUser => {
+ self.wrong_user = self.wrong_user.saturating_add(1)
+ }
+ InvalidReferenceReason::OverLimit => {
+ self.over_limit = self.over_limit.saturating_add(1)
+ }
}
}
}
diff --git a/retrieval-pipeline/src/pipeline/config.rs b/retrieval-pipeline/src/pipeline/config.rs
index 733c2d5..cee50eb 100644
--- a/retrieval-pipeline/src/pipeline/config.rs
+++ b/retrieval-pipeline/src/pipeline/config.rs
@@ -285,5 +285,3 @@ const fn default_chunk_rrf_vector_weight() -> f32 {
const fn default_chunk_rrf_fts_weight() -> f32 {
1.0
}
-
-