chore: dep updates & kv-mem separation to test feature

docker builder update
This commit is contained in:
Per Stark
2026-02-15 08:22:54 +01:00
parent b0b01182d7
commit 1490852a09
15 changed files with 1737 additions and 1664 deletions

View File

@@ -41,5 +41,8 @@ common = { path = "../common" }
retrieval-pipeline = { path = "../retrieval-pipeline" }
json-stream-parser = { path = "../json-stream-parser" }
[dev-dependencies]
common = { path = "../common", features = ["test-utils"] }
[build-dependencies]
minijinja-embed = { version = "2.8.0" }

View File

@@ -1,4 +1,4 @@
/*! tailwindcss v4.1.16 | MIT License | https://tailwindcss.com */
/*! tailwindcss v4.1.18 | MIT License | https://tailwindcss.com */
@layer properties;
@layer theme, base, components, utilities;
@layer theme {
@@ -44,6 +44,7 @@
--leading-snug: 1.375;
--leading-relaxed: 1.625;
--ease-out: cubic-bezier(0, 0, 0.2, 1);
--ease-in-out: cubic-bezier(0.4, 0, 0.2, 1);
--animate-pulse: pulse 2s cubic-bezier(0.4, 0, 0.6, 1) infinite;
--default-transition-duration: 150ms;
--default-transition-timing-function: cubic-bezier(0.4, 0, 0.2, 1);

View File

@@ -6,7 +6,7 @@ use async_stream::stream;
use axum::{
extract::{Query, State},
response::{
sse::{Event, KeepAlive},
sse::{Event, KeepAlive, KeepAliveStream},
Sse,
},
};
@@ -42,10 +42,19 @@ use crate::{html_state::HtmlState, AuthSessionType};
use super::reference_validation::{collect_reference_ids_from_retrieval, validate_references};
type EventStream = Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>>;
type SseResponse = Sse<KeepAliveStream<EventStream>>;
fn sse_with_keep_alive(stream: EventStream) -> SseResponse {
Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive"),
)
}
// Error handling function
fn create_error_stream(
message: impl Into<String>,
) -> Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>> {
fn create_error_stream(message: impl Into<String>) -> EventStream {
let message = message.into();
stream::once(async move { Ok(Event::default().event("error").data(message)) }).boxed()
}
@@ -55,13 +64,10 @@ async fn get_message_and_user(
db: &SurrealDbClient,
current_user: Option<User>,
message_id: &str,
) -> Result<
(Message, User, Conversation, Vec<Message>, Option<Message>),
Sse<Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>>>,
> {
) -> Result<(Message, User, Conversation, Vec<Message>, Option<Message>), SseResponse> {
// Check authentication
let Some(user) = current_user else {
return Err(Sse::new(create_error_stream(
return Err(sse_with_keep_alive(create_error_stream(
"You must be signed in to use this feature",
)));
};
@@ -70,13 +76,13 @@ async fn get_message_and_user(
let message = match db.get_item::<Message>(message_id).await {
Ok(Some(message)) => message,
Ok(None) => {
return Err(Sse::new(create_error_stream(
return Err(sse_with_keep_alive(create_error_stream(
"Message not found: the specified message does not exist",
)))
}
Err(e) => {
error!("Database error retrieving message {}: {:?}", message_id, e);
return Err(Sse::new(create_error_stream(
return Err(sse_with_keep_alive(create_error_stream(
"Failed to retrieve message: database error",
)));
}
@@ -88,7 +94,7 @@ async fn get_message_and_user(
{
Err(e) => {
error!("Database error retrieving message {}: {:?}", message_id, e);
return Err(Sse::new(create_error_stream(
return Err(sse_with_keep_alive(create_error_stream(
"Failed to retrieve message: database error",
)));
}
@@ -96,19 +102,19 @@ async fn get_message_and_user(
};
let Some(message_index) = find_message_index(&history, message_id) else {
return Err(Sse::new(create_error_stream(
return Err(sse_with_keep_alive(create_error_stream(
"Message not found in conversation history",
)));
};
let Some(message_from_history) = history.get(message_index) else {
return Err(Sse::new(create_error_stream(
return Err(sse_with_keep_alive(create_error_stream(
"Message not found in conversation history",
)));
};
if message_from_history.role != MessageRole::User {
return Err(Sse::new(create_error_stream(
return Err(sse_with_keep_alive(create_error_stream(
"Only user messages can be used to generate a response",
)));
}
@@ -144,10 +150,7 @@ fn history_before_message(messages: &[Message], message_index: usize) -> Vec<Mes
messages.iter().take(message_index).cloned().collect()
}
fn create_replayed_response_stream(
state: &HtmlState,
existing_ai_message: Message,
) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>>> {
fn create_replayed_response_stream(state: &HtmlState, existing_ai_message: Message) -> SseResponse {
let references_event = if existing_ai_message
.references
.as_ref()
@@ -179,11 +182,7 @@ fn create_replayed_response_stream(
yield Ok(Event::default().event("close_stream").data("Stream complete"));
};
Sse::new(event_stream.boxed()).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive"),
)
sse_with_keep_alive(event_stream.boxed())
}
#[derive(Deserialize)]
@@ -209,7 +208,7 @@ pub async fn get_response_stream(
State(state): State<HtmlState>,
auth: AuthSessionType,
Query(params): Query<QueryParams>,
) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>>> {
) -> SseResponse {
// 1. Authentication and initial data validation
let (user_message, user, _conversation, history, existing_ai_response) =
match get_message_and_user(&state.db, auth.current_user, &params.message_id).await {
@@ -249,7 +248,7 @@ pub async fn get_response_stream(
{
Ok(result) => result,
Err(_e) => {
return Sse::new(create_error_stream("Failed to retrieve knowledge"));
return sse_with_keep_alive(create_error_stream("Failed to retrieve knowledge"));
}
};
@@ -269,17 +268,17 @@ pub async fn get_response_stream(
let formatted_user_message =
create_user_message_with_history(&context_json, &history, &user_message.content);
let Ok(settings) = SystemSettings::get_current(&state.db).await else {
return Sse::new(create_error_stream("Failed to retrieve system settings"));
return sse_with_keep_alive(create_error_stream("Failed to retrieve system settings"));
};
let Ok(request) = create_chat_request(formatted_user_message, &settings) else {
return Sse::new(create_error_stream("Failed to create chat request"));
return sse_with_keep_alive(create_error_stream("Failed to create chat request"));
};
// 4. Set up the OpenAI stream
let openai_stream = match state.openai_client.chat().create_stream(request).await {
Ok(stream) => stream,
Err(_e) => {
return Sse::new(create_error_stream("Failed to create OpenAI stream"));
return sse_with_keep_alive(create_error_stream("Failed to create OpenAI stream"));
}
};
@@ -460,11 +459,7 @@ pub async fn get_response_stream(
.data("Stream complete"))
}));
Sse::new(event_stream.boxed()).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive"),
)
sse_with_keep_alive(event_stream.boxed())
}
struct StreamParserState {

View File

@@ -4,7 +4,7 @@ use axum::{
extract::{Query, State},
http::StatusCode,
response::{
sse::{Event, KeepAlive},
sse::{Event, KeepAlive, KeepAliveStream},
IntoResponse, Response, Sse,
},
};
@@ -36,6 +36,17 @@ use crate::{
AuthSessionType,
};
type EventStream = Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>>;
type TaskSse = Sse<KeepAliveStream<EventStream>>;
fn sse_with_keep_alive(stream: EventStream) -> TaskSse {
Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive-ping"),
)
}
pub async fn show_ingest_form(
State(state): State<HtmlState>,
RequireUser(user): RequireUser,
@@ -158,9 +169,7 @@ pub struct QueryParams {
task_id: String,
}
fn create_error_stream(
message: impl Into<String>,
) -> Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>> {
fn create_error_stream(message: impl Into<String>) -> EventStream {
let message = message.into();
stream::once(async move { Ok(Event::default().event("error").data(message)) }).boxed()
}
@@ -169,13 +178,13 @@ pub async fn get_task_updates_stream(
State(state): State<HtmlState>,
auth: AuthSessionType,
Query(params): Query<QueryParams>,
) -> Sse<Pin<Box<dyn Stream<Item = Result<Event, axum::Error>> + Send>>> {
) -> TaskSse {
let task_id = params.task_id.clone();
let db = state.db.clone();
// 1. Check for authenticated user
let Some(current_user) = auth.current_user else {
return Sse::new(create_error_stream("User not authenticated"));
return sse_with_keep_alive(create_error_stream("User not authenticated"));
};
// 2. Fetch task for initial authorization and to ensure it exists
@@ -183,7 +192,7 @@ pub async fn get_task_updates_stream(
Ok(Some(task)) => {
// 3. Validate user ownership
if task.user_id != current_user.id {
return Sse::new(create_error_stream(
return sse_with_keep_alive(create_error_stream(
"Access denied: You do not have permission to view updates for this task.",
));
}
@@ -269,18 +278,14 @@ pub async fn get_task_updates_stream(
}
};
Sse::new(sse_stream.boxed()).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive-ping"),
)
sse_with_keep_alive(sse_stream.boxed())
}
Ok(None) => Sse::new(create_error_stream(format!(
Ok(None) => sse_with_keep_alive(create_error_stream(format!(
"Task with ID '{task_id}' not found."
))),
Err(e) => {
error!("Failed to fetch task '{task_id}' for authorization: {e:?}");
Sse::new(create_error_stream(
sse_with_keep_alive(create_error_stream(
"An error occurred while retrieving task details. Please try again later.",
))
}