Fix race condition where streamed events could be lost

Events stream in via model_write listener while also being fetched
from the database. If the DB fetch completed before all events were
persisted, replaceModelsInStore would wipe out events that came in
via model_write.

Added mergeModelsInStore that adds fetched events without removing
existing ones. Applied to HTTP, gRPC, and WebSocket event hooks.
This commit is contained in:
Gregory Schier
2026-01-11 07:42:04 -08:00
parent 2a5587c128
commit bbcae34575
6 changed files with 45 additions and 9 deletions

View File

@@ -342,7 +342,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_transaction_single_redirect() { async fn test_transaction_single_redirect() {
let redirect_headers = vec![("Location".to_string(), "https://example.com/new".to_string())]; let redirect_headers =
vec![("Location".to_string(), "https://example.com/new".to_string())];
let responses = vec![ let responses = vec![
MockResponse { status: 302, headers: redirect_headers, body: vec![] }, MockResponse { status: 302, headers: redirect_headers, body: vec![] },
@@ -373,7 +374,8 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_transaction_max_redirects_exceeded() { async fn test_transaction_max_redirects_exceeded() {
let redirect_headers = vec![("Location".to_string(), "https://example.com/loop".to_string())]; let redirect_headers =
vec![("Location".to_string(), "https://example.com/loop".to_string())];
// Create more redirects than allowed // Create more redirects than allowed
let responses: Vec<MockResponse> = (0..12) let responses: Vec<MockResponse> = (0..12)
@@ -525,7 +527,8 @@ mod tests {
_request: SendableHttpRequest, _request: SendableHttpRequest,
_event_tx: mpsc::Sender<HttpResponseEvent>, _event_tx: mpsc::Sender<HttpResponseEvent>,
) -> Result<HttpResponse> { ) -> Result<HttpResponse> {
let headers = vec![("set-cookie".to_string(), "session=xyz789; Path=/".to_string())]; let headers =
vec![("set-cookie".to_string(), "session=xyz789; Path=/".to_string())];
let body_stream: Pin<Box<dyn AsyncRead + Send>> = let body_stream: Pin<Box<dyn AsyncRead + Send>> =
Box::pin(std::io::Cursor::new(vec![])); Box::pin(std::io::Cursor::new(vec![]));
@@ -584,7 +587,10 @@ mod tests {
let headers = vec![ let headers = vec![
("set-cookie".to_string(), "session=abc123; Path=/".to_string()), ("set-cookie".to_string(), "session=abc123; Path=/".to_string()),
("set-cookie".to_string(), "user_id=42; Path=/".to_string()), ("set-cookie".to_string(), "user_id=42; Path=/".to_string()),
("set-cookie".to_string(), "preferences=dark; Path=/; Max-Age=86400".to_string()), (
"set-cookie".to_string(),
"preferences=dark; Path=/; Max-Age=86400".to_string(),
),
]; ];
let body_stream: Pin<Box<dyn AsyncRead + Send>> = let body_stream: Pin<Box<dyn AsyncRead + Send>> =

View File

@@ -206,6 +206,22 @@ export function replaceModelsInStore<
}); });
} }
export function mergeModelsInStore<
M extends AnyModel['model'],
T extends Extract<AnyModel, { model: M }>,
>(model: M, models: T[]) {
mustStore().set(modelStoreDataAtom, (prev: ModelStoreData) => {
const existingModels = { ...prev[model] } as Record<string, T>;
for (const m of models) {
existingModels[m.id] = m;
}
return {
...prev,
[model]: existingModels,
};
});
}
function shouldIgnoreModel({ model, updateSource }: ModelPayload) { function shouldIgnoreModel({ model, updateSource }: ModelPayload) {
// Never ignore updates from non-user sources // Never ignore updates from non-user sources
if (updateSource.type !== 'window') { if (updateSource.type !== 'window') {

View File

@@ -1,6 +1,10 @@
import { invoke } from '@tauri-apps/api/core'; import { invoke } from '@tauri-apps/api/core';
import type { HttpResponse, HttpResponseEvent } from '@yaakapp-internal/models'; import type { HttpResponse, HttpResponseEvent } from '@yaakapp-internal/models';
import { httpResponseEventsAtom, replaceModelsInStore } from '@yaakapp-internal/models'; import {
httpResponseEventsAtom,
mergeModelsInStore,
replaceModelsInStore,
} from '@yaakapp-internal/models';
import { useAtomValue } from 'jotai'; import { useAtomValue } from 'jotai';
import { useEffect, useMemo } from 'react'; import { useEffect, useMemo } from 'react';
@@ -13,8 +17,10 @@ export function useHttpResponseEvents(response: HttpResponse | null) {
return; return;
} }
// Use merge instead of replace to preserve events that came in via model_write
// while we were fetching from the database
invoke<HttpResponseEvent[]>('cmd_get_http_response_events', { responseId: response.id }).then( invoke<HttpResponseEvent[]>('cmd_get_http_response_events', { responseId: response.id }).then(
(events) => replaceModelsInStore('http_response_event', events), (events) => mergeModelsInStore('http_response_event', events),
); );
}, [response?.id]); }, [response?.id]);

View File

@@ -3,6 +3,7 @@ import type { GrpcConnection, GrpcEvent } from '@yaakapp-internal/models';
import { import {
grpcConnectionsAtom, grpcConnectionsAtom,
grpcEventsAtom, grpcEventsAtom,
mergeModelsInStore,
replaceModelsInStore, replaceModelsInStore,
} from '@yaakapp-internal/models'; } from '@yaakapp-internal/models';
import { atom, useAtomValue } from 'jotai'; import { atom, useAtomValue } from 'jotai';
@@ -67,8 +68,10 @@ export function useGrpcEvents(connectionId: string | null) {
return; return;
} }
// Use merge instead of replace to preserve events that came in via model_write
// while we were fetching from the database
invoke<GrpcEvent[]>('models_grpc_events', { connectionId }).then((events) => { invoke<GrpcEvent[]>('models_grpc_events', { connectionId }).then((events) => {
replaceModelsInStore('grpc_event', events); mergeModelsInStore('grpc_event', events);
}); });
}, [connectionId]); }, [connectionId]);

View File

@@ -1,6 +1,7 @@
import { invoke } from '@tauri-apps/api/core'; import { invoke } from '@tauri-apps/api/core';
import type { WebsocketConnection, WebsocketEvent } from '@yaakapp-internal/models'; import type { WebsocketConnection, WebsocketEvent } from '@yaakapp-internal/models';
import { import {
mergeModelsInStore,
replaceModelsInStore, replaceModelsInStore,
websocketConnectionsAtom, websocketConnectionsAtom,
websocketEventsAtom, websocketEventsAtom,
@@ -54,8 +55,10 @@ export function useWebsocketEvents(connectionId: string | null) {
return; return;
} }
// Use merge instead of replace to preserve events that came in via model_write
// while we were fetching from the database
invoke<WebsocketEvent[]>('models_websocket_events', { connectionId }).then( invoke<WebsocketEvent[]>('models_websocket_events', { connectionId }).then(
(events) => replaceModelsInStore('websocket_event', events), (events) => mergeModelsInStore('websocket_event', events),
); );
}, [connectionId]); }, [connectionId]);

View File

@@ -11,7 +11,9 @@ function makeEvent(
id: 'test', id: 'test',
model: 'http_response_event', model: 'http_response_event',
responseId: 'resp', responseId: 'resp',
createdAt: Date.now(), workspaceId: 'ws',
createdAt: new Date().toISOString(),
updatedAt: new Date().toISOString(),
event: { type, name, value } as HttpResponseEvent['event'], event: { type, name, value } as HttpResponseEvent['event'],
}; };
} }