Async connection management

This commit is contained in:
Gregory Schier
2024-02-05 11:29:27 -08:00
parent a7f2a86d71
commit ef7f942a8f
9 changed files with 102 additions and 84 deletions

View File

@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n UPDATE grpc_connections\n SET (elapsed) = (-1)\n WHERE elapsed = 0;\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
},
"hash": "a690a04cd1ebe8c3dbfd0cd98ae4ef093a1696d7b7ecaf694d12e5fafd62b685"
}

View File

@@ -29,7 +29,7 @@ pub struct MethodDefinition {
pub server_streaming: bool, pub server_streaming: bool,
} }
pub async fn callable(uri: &Uri) -> Vec<ServiceDefinition> { pub async fn reflect(uri: &Uri) -> Vec<ServiceDefinition> {
let (pool, _) = fill_pool(uri).await; let (pool, _) = fill_pool(uri).await;
pool.services() pool.services()

View File

@@ -41,18 +41,18 @@ use window_ext::TrafficLightWindowExt;
use crate::analytics::{AnalyticsAction, AnalyticsResource}; use crate::analytics::{AnalyticsAction, AnalyticsResource};
use crate::http::send_http_request; use crate::http::send_http_request;
use crate::models::{ use crate::models::{
cancel_pending_responses, create_response, delete_all_grpc_connections, cancel_pending_grpc_connections, cancel_pending_responses, create_response,
delete_all_http_responses, delete_cookie_jar, delete_environment, delete_folder, delete_all_grpc_connections, delete_all_http_responses, delete_cookie_jar, delete_environment,
delete_grpc_connection, delete_http_request, delete_http_response, delete_workspace, delete_folder, delete_grpc_connection, delete_http_request, delete_http_response,
duplicate_grpc_request, duplicate_http_request, get_cookie_jar, get_environment, get_folder, delete_workspace, duplicate_grpc_request, duplicate_http_request, get_cookie_jar,
get_grpc_request, get_http_request, get_http_response, get_key_value_raw, get_environment, get_folder, get_grpc_request, get_http_request, get_http_response,
get_or_create_settings, get_workspace, get_workspace_export_resources, list_cookie_jars, get_key_value_raw, get_or_create_settings, get_workspace, get_workspace_export_resources,
list_environments, list_folders, list_grpc_connections, list_grpc_messages, list_grpc_requests, list_cookie_jars, list_environments, list_folders, list_grpc_connections, list_grpc_messages,
list_requests, list_responses, list_workspaces, set_key_value_raw, update_response_if_id, list_grpc_requests, list_requests, list_responses, list_workspaces, set_key_value_raw,
update_settings, upsert_cookie_jar, upsert_environment, upsert_folder, upsert_grpc_connection, update_response_if_id, update_settings, upsert_cookie_jar, upsert_environment, upsert_folder,
upsert_grpc_message, upsert_grpc_request, upsert_http_request, upsert_workspace, CookieJar, upsert_grpc_connection, upsert_grpc_message, upsert_grpc_request, upsert_http_request,
Environment, EnvironmentVariable, Folder, GrpcConnection, GrpcMessage, GrpcRequest, upsert_workspace, CookieJar, Environment, EnvironmentVariable, Folder, GrpcConnection,
HttpRequest, HttpResponse, KeyValue, Settings, Workspace, GrpcMessage, GrpcRequest, HttpRequest, HttpResponse, KeyValue, Settings, Workspace,
}; };
use crate::plugin::{ImportResources, ImportResult}; use crate::plugin::{ImportResources, ImportResult};
use crate::updates::{update_mode_from_str, UpdateMode, YaakUpdater}; use crate::updates::{update_mode_from_str, UpdateMode, YaakUpdater};
@@ -92,9 +92,15 @@ async fn migrate_db(app_handle: AppHandle, db: &Mutex<Pool<Sqlite>>) -> Result<(
} }
#[tauri::command] #[tauri::command]
async fn cmd_grpc_reflect(endpoint: &str) -> Result<Vec<ServiceDefinition>, String> { async fn cmd_grpc_reflect(
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?; request_id: &str,
Ok(grpc::callable(&uri).await) app_handle: AppHandle,
) -> Result<Vec<ServiceDefinition>, String> {
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
Ok(grpc::reflect(&uri).await)
} }
#[tauri::command] #[tauri::command]
@@ -1636,6 +1642,7 @@ fn main() {
app.manage(m); app.manage(m);
let h = app.handle(); let h = app.handle();
let _ = cancel_pending_responses(&h).await; let _ = cancel_pending_responses(&h).await;
let _ = cancel_pending_grpc_connections(&h).await;
}); });
Ok(()) Ok(())

View File

@@ -1183,6 +1183,20 @@ pub async fn create_response(
get_http_response(app_handle, &id).await get_http_response(app_handle, &id).await
} }
pub async fn cancel_pending_grpc_connections(app_handle: &AppHandle) -> Result<(), sqlx::Error> {
let db = get_db(app_handle).await;
sqlx::query!(
r#"
UPDATE grpc_connections
SET (elapsed) = (-1)
WHERE elapsed = 0;
"#,
)
.execute(&db)
.await?;
Ok(())
}
pub async fn cancel_pending_responses(app_handle: &AppHandle) -> Result<(), sqlx::Error> { pub async fn cancel_pending_responses(app_handle: &AppHandle) -> Result<(), sqlx::Error> {
let db = get_db(app_handle).await; let db = get_db(app_handle).await;
sqlx::query!( sqlx::query!(

View File

@@ -9,7 +9,6 @@ import { useGrpc } from '../hooks/useGrpc';
import { useGrpcConnections } from '../hooks/useGrpcConnections'; import { useGrpcConnections } from '../hooks/useGrpcConnections';
import { useGrpcMessages } from '../hooks/useGrpcMessages'; import { useGrpcMessages } from '../hooks/useGrpcMessages';
import { useUpdateGrpcRequest } from '../hooks/useUpdateGrpcRequest'; import { useUpdateGrpcRequest } from '../hooks/useUpdateGrpcRequest';
import { count, pluralize } from '../lib/pluralize';
import { Banner } from './core/Banner'; import { Banner } from './core/Banner';
import { Button } from './core/Button'; import { Button } from './core/Button';
import { HotKeyList } from './core/HotKeyList'; import { HotKeyList } from './core/HotKeyList';
@@ -33,10 +32,10 @@ export function GrpcConnectionLayout({ style }: Props) {
const updateRequest = useUpdateGrpcRequest(activeRequest?.id ?? null); const updateRequest = useUpdateGrpcRequest(activeRequest?.id ?? null);
const alert = useAlert(); const alert = useAlert();
const [activeMessageId, setActiveMessageId] = useState<string | null>(null); const [activeMessageId, setActiveMessageId] = useState<string | null>(null);
const grpc = useGrpc(activeRequest?.url ?? null, activeRequest?.id ?? null);
const connections = useGrpcConnections(activeRequest?.id ?? null); const connections = useGrpcConnections(activeRequest?.id ?? null);
const activeConnection = connections[0] ?? null; const activeConnection = connections[0] ?? null;
const messages = useGrpcMessages(activeConnection?.id ?? null); const messages = useGrpcMessages(activeConnection?.id ?? null);
const grpc = useGrpc(activeRequest, activeConnection);
const activeMethod = useMemo(() => { const activeMethod = useMemo(() => {
if (grpc.services == null || activeRequest == null) return null; if (grpc.services == null || activeRequest == null) return null;
@@ -59,13 +58,13 @@ export function GrpcConnectionLayout({ style }: Props) {
}); });
} }
if (activeMethod.clientStreaming && activeMethod.serverStreaming) { if (activeMethod.clientStreaming && activeMethod.serverStreaming) {
await grpc.streaming.mutateAsync(activeRequest.id); await grpc.streaming.mutateAsync();
} else if (!activeMethod.clientStreaming && activeMethod.serverStreaming) { } else if (!activeMethod.clientStreaming && activeMethod.serverStreaming) {
await grpc.serverStreaming.mutateAsync(activeRequest.id); await grpc.serverStreaming.mutateAsync();
} else if (activeMethod.clientStreaming && !activeMethod.serverStreaming) { } else if (activeMethod.clientStreaming && !activeMethod.serverStreaming) {
await grpc.clientStreaming.mutateAsync(activeRequest.id); await grpc.clientStreaming.mutateAsync();
} else { } else {
const msg = await grpc.unary.mutateAsync(activeRequest.id); const msg = await grpc.unary.mutateAsync();
setActiveMessageId(msg.id); setActiveMessageId(msg.id);
} }
}, },
@@ -291,7 +290,7 @@ export function GrpcConnectionLayout({ style }: Props) {
<div className="w-full grid grid-rows-[auto_minmax(0,1fr)] items-center"> <div className="w-full grid grid-rows-[auto_minmax(0,1fr)] items-center">
<HStack className="pl-3 mb-1 font-mono" alignItems="center"> <HStack className="pl-3 mb-1 font-mono" alignItems="center">
<HStack alignItems="center" space={2}> <HStack alignItems="center" space={2}>
{count('message', messages.filter((m) => !m.isInfo).length)} <span>{messages.filter((m) => !m.isInfo).length} messages</span>
{grpc.isStreaming && ( {grpc.isStreaming && (
<Icon icon="refresh" size="sm" spin className="text-gray-600" /> <Icon icon="refresh" size="sm" spin className="text-gray-600" />
)} )}

View File

@@ -1,3 +1,4 @@
import { formatDistanceToNow } from 'date-fns';
import { useDeleteGrpcConnection } from '../hooks/useDeleteGrpcConnection'; import { useDeleteGrpcConnection } from '../hooks/useDeleteGrpcConnection';
import { useDeleteGrpcConnections } from '../hooks/useDeleteGrpcConnections'; import { useDeleteGrpcConnections } from '../hooks/useDeleteGrpcConnections';
import type { GrpcConnection } from '../lib/models'; import type { GrpcConnection } from '../lib/models';
@@ -42,6 +43,7 @@ export const RecentConnectionsDropdown = function ResponsePane({
key: c.id, key: c.id,
label: ( label: (
<HStack space={2} alignItems="center"> <HStack space={2} alignItems="center">
{formatDistanceToNow(c.createdAt)} &bull;{' '}
<span className="font-mono text-xs">{c.elapsed}ms</span> <span className="font-mono text-xs">{c.elapsed}ms</span>
</HStack> </HStack>
), ),

View File

@@ -21,6 +21,7 @@ import { useGrpcRequests } from '../hooks/useGrpcRequests';
import { useHotKey } from '../hooks/useHotKey'; import { useHotKey } from '../hooks/useHotKey';
import { useHttpRequests } from '../hooks/useHttpRequests'; import { useHttpRequests } from '../hooks/useHttpRequests';
import { useKeyValue } from '../hooks/useKeyValue'; import { useKeyValue } from '../hooks/useKeyValue';
import { useLatestGrpcConnection } from '../hooks/useLatestGrpcConnection';
import { useLatestHttpResponse } from '../hooks/useLatestHttpResponse'; import { useLatestHttpResponse } from '../hooks/useLatestHttpResponse';
import { usePrompt } from '../hooks/usePrompt'; import { usePrompt } from '../hooks/usePrompt';
import { useSendManyRequests } from '../hooks/useSendFolder'; import { useSendManyRequests } from '../hooks/useSendFolder';
@@ -558,7 +559,8 @@ const SidebarItem = forwardRef(function SidebarItem(
const duplicateGrpcRequest = useDuplicateGrpcRequest({ id: itemId, navigateAfter: true }); const duplicateGrpcRequest = useDuplicateGrpcRequest({ id: itemId, navigateAfter: true });
const sendRequest = useSendRequest(itemId); const sendRequest = useSendRequest(itemId);
const sendManyRequests = useSendManyRequests(); const sendManyRequests = useSendManyRequests();
const latestResponse = useLatestHttpResponse(itemId); const latestHttpResponse = useLatestHttpResponse(itemId);
const latestGrpcConnection = useLatestGrpcConnection(itemId);
const updateHttpRequest = useUpdateHttpRequest(itemId); const updateHttpRequest = useUpdateHttpRequest(itemId);
const updateGrpcRequest = useUpdateGrpcRequest(itemId); const updateGrpcRequest = useUpdateGrpcRequest(itemId);
const updateAnyFolder = useUpdateAnyFolder(); const updateAnyFolder = useUpdateAnyFolder();
@@ -751,15 +753,19 @@ const SidebarItem = forwardRef(function SidebarItem(
) : ( ) : (
<span className="truncate">{itemName || itemFallbackName}</span> <span className="truncate">{itemName || itemFallbackName}</span>
)} )}
{latestResponse && ( {latestGrpcConnection ? (
<div className="ml-auto"> <div className="ml-auto">
{isResponseLoading(latestResponse) ? ( {latestGrpcConnection.elapsed === 0 && <Icon spin size="sm" icon="update" />}
</div>
) : latestHttpResponse ? (
<div className="ml-auto">
{isResponseLoading(latestHttpResponse) ? (
<Icon spin size="sm" icon="update" /> <Icon spin size="sm" icon="update" />
) : ( ) : (
<StatusTag className="text-2xs dark:opacity-80" response={latestResponse} /> <StatusTag className="text-2xs dark:opacity-80" response={latestHttpResponse} />
)} )}
</div> </div>
)} ) : null}
</button> </button>
</div> </div>
{children} {children}

View File

@@ -1,100 +1,71 @@
import { useMutation, useQuery } from '@tanstack/react-query'; import { useMutation, useQuery } from '@tanstack/react-query';
import { invoke } from '@tauri-apps/api'; import { invoke } from '@tauri-apps/api';
import { emit } from '@tauri-apps/api/event'; import { emit } from '@tauri-apps/api/event';
import { useEffect, useState } from 'react'; import type { GrpcConnection, GrpcMessage, GrpcRequest } from '../lib/models';
import type { GrpcConnection, GrpcMessage } from '../lib/models';
import { useKeyValue } from './useKeyValue';
interface ReflectResponseService { interface ReflectResponseService {
name: string; name: string;
methods: { name: string; schema: string; serverStreaming: boolean; clientStreaming: boolean }[]; methods: { name: string; schema: string; serverStreaming: boolean; clientStreaming: boolean }[];
} }
export function useGrpc(url: string | null, requestId: string | null) { export function useGrpc(req: GrpcRequest | null, conn: GrpcConnection | null) {
const messages = useKeyValue<GrpcMessage[]>({ const requestId = req?.id ?? 'n/a';
namespace: 'debug',
key: ['grpc_msgs', requestId ?? 'n/a'],
defaultValue: [],
});
const [activeConnectionId, setActiveConnectionId] = useState<string | null>(null);
useEffect(() => { const unary = useMutation<GrpcMessage, string>({
setActiveConnectionId(null); mutationKey: ['grpc_unary', conn?.id ?? 'n/a'],
}, [requestId]); mutationFn: async () => {
const unary = useMutation<GrpcMessage, string, string>({
mutationKey: ['grpc_unary', url],
mutationFn: async (id) => {
const message = (await invoke('cmd_grpc_call_unary', { const message = (await invoke('cmd_grpc_call_unary', {
requestId: id, requestId,
})) as GrpcMessage; })) as GrpcMessage;
await messages.set([message]);
return message; return message;
}, },
}); });
const clientStreaming = useMutation<void, string, string>({ const clientStreaming = useMutation<void, string>({
mutationKey: ['grpc_client_streaming', url], mutationKey: ['grpc_client_streaming', conn?.id ?? 'n/a'],
mutationFn: async (requestId) => { mutationFn: async () => {
if (url === null) throw new Error('No URL provided'); await invoke('cmd_grpc_client_streaming', { requestId });
await messages.set([]);
const c = (await invoke('cmd_grpc_client_streaming', { requestId })) as GrpcConnection;
setActiveConnectionId(c.id);
}, },
}); });
const serverStreaming = useMutation<void, string, string>({ const serverStreaming = useMutation<void, string>({
mutationKey: ['grpc_server_streaming', url], mutationKey: ['grpc_server_streaming', conn?.id ?? 'n/a'],
mutationFn: async (requestId) => { mutationFn: async () => {
if (url === null) throw new Error('No URL provided'); await invoke('cmd_grpc_server_streaming', { requestId });
await messages.set([]);
const c = (await invoke('cmd_grpc_server_streaming', { requestId })) as GrpcConnection;
setActiveConnectionId(c.id);
}, },
}); });
const streaming = useMutation<void, string, string>({ const streaming = useMutation<void, string>({
mutationKey: ['grpc_streaming', url], mutationKey: ['grpc_streaming', conn?.id ?? 'n/a'],
mutationFn: async (requestId) => { mutationFn: async () => {
if (url === null) throw new Error('No URL provided'); await invoke('cmd_grpc_streaming', { requestId });
await messages.set([]);
const id: string = await invoke('cmd_grpc_streaming', {
requestId,
});
setActiveConnectionId(id);
}, },
}); });
const send = useMutation({ const send = useMutation({
mutationKey: ['grpc_send', url],
mutationFn: async ({ message }: { message: string }) => { mutationFn: async ({ message }: { message: string }) => {
if (activeConnectionId == null) throw new Error('No active connection'); await emit(`grpc_client_msg_${conn?.id ?? 'none'}`, { Message: message });
await messages.set([]);
await emit(`grpc_client_msg_${activeConnectionId}`, { Message: message });
}, },
}); });
const cancel = useMutation({ const cancel = useMutation({
mutationKey: ['grpc_cancel', url], mutationKey: ['grpc_cancel', conn?.id ?? 'n/a'],
mutationFn: async () => { mutationFn: async () => {
setActiveConnectionId(null); await emit(`grpc_client_msg_${conn?.id ?? 'none'}`, 'Cancel');
await emit(`grpc_client_msg_${activeConnectionId}`, 'Cancel');
}, },
}); });
const commit = useMutation({ const commit = useMutation({
mutationKey: ['grpc_commit', url], mutationKey: ['grpc_commit', conn?.id ?? 'n/a'],
mutationFn: async () => { mutationFn: async () => {
setActiveConnectionId(null); await emit(`grpc_client_msg_${conn?.id ?? 'none'}`, 'Commit');
await emit(`grpc_client_msg_${activeConnectionId}`, 'Commit');
}, },
}); });
const reflect = useQuery<ReflectResponseService[]>({ const reflect = useQuery<ReflectResponseService[]>({
queryKey: ['grpc_reflect', url ?? ''], queryKey: ['grpc_reflect', conn?.id ?? 'n/a'],
queryFn: async () => { queryFn: async () => {
if (url === null) return []; return (await invoke('cmd_grpc_reflect', { requestId })) as ReflectResponseService[];
return (await invoke('cmd_grpc_reflect', { endpoint: url })) as ReflectResponseService[];
}, },
}); });
@@ -106,7 +77,7 @@ export function useGrpc(url: string | null, requestId: string | null) {
services: reflect.data, services: reflect.data,
cancel, cancel,
commit, commit,
isStreaming: activeConnectionId !== null, isStreaming: conn?.elapsed === 0,
send, send,
}; };
} }

View File

@@ -0,0 +1,7 @@
import type { GrpcConnection } from '../lib/models';
import { useGrpcConnections } from './useGrpcConnections';
export function useLatestGrpcConnection(requestId: string | null): GrpcConnection | null {
const connections = useGrpcConnections(requestId);
return connections[0] ?? null;
}