Async connection management

This commit is contained in:
Gregory Schier
2024-02-05 11:29:27 -08:00
parent 3ed00c0955
commit 5ad13a61e6
9 changed files with 102 additions and 84 deletions

View File

@@ -0,0 +1,12 @@
{
"db_name": "SQLite",
"query": "\n UPDATE grpc_connections\n SET (elapsed) = (-1)\n WHERE elapsed = 0;\n ",
"describe": {
"columns": [],
"parameters": {
"Right": 0
},
"nullable": []
},
"hash": "a690a04cd1ebe8c3dbfd0cd98ae4ef093a1696d7b7ecaf694d12e5fafd62b685"
}

View File

@@ -29,7 +29,7 @@ pub struct MethodDefinition {
pub server_streaming: bool,
}
pub async fn callable(uri: &Uri) -> Vec<ServiceDefinition> {
pub async fn reflect(uri: &Uri) -> Vec<ServiceDefinition> {
let (pool, _) = fill_pool(uri).await;
pool.services()

View File

@@ -41,18 +41,18 @@ use window_ext::TrafficLightWindowExt;
use crate::analytics::{AnalyticsAction, AnalyticsResource};
use crate::http::send_http_request;
use crate::models::{
cancel_pending_responses, create_response, delete_all_grpc_connections,
delete_all_http_responses, delete_cookie_jar, delete_environment, delete_folder,
delete_grpc_connection, delete_http_request, delete_http_response, delete_workspace,
duplicate_grpc_request, duplicate_http_request, get_cookie_jar, get_environment, get_folder,
get_grpc_request, get_http_request, get_http_response, get_key_value_raw,
get_or_create_settings, get_workspace, get_workspace_export_resources, list_cookie_jars,
list_environments, list_folders, list_grpc_connections, list_grpc_messages, list_grpc_requests,
list_requests, list_responses, list_workspaces, set_key_value_raw, update_response_if_id,
update_settings, upsert_cookie_jar, upsert_environment, upsert_folder, upsert_grpc_connection,
upsert_grpc_message, upsert_grpc_request, upsert_http_request, upsert_workspace, CookieJar,
Environment, EnvironmentVariable, Folder, GrpcConnection, GrpcMessage, GrpcRequest,
HttpRequest, HttpResponse, KeyValue, Settings, Workspace,
cancel_pending_grpc_connections, cancel_pending_responses, create_response,
delete_all_grpc_connections, delete_all_http_responses, delete_cookie_jar, delete_environment,
delete_folder, delete_grpc_connection, delete_http_request, delete_http_response,
delete_workspace, duplicate_grpc_request, duplicate_http_request, get_cookie_jar,
get_environment, get_folder, get_grpc_request, get_http_request, get_http_response,
get_key_value_raw, get_or_create_settings, get_workspace, get_workspace_export_resources,
list_cookie_jars, list_environments, list_folders, list_grpc_connections, list_grpc_messages,
list_grpc_requests, list_requests, list_responses, list_workspaces, set_key_value_raw,
update_response_if_id, update_settings, upsert_cookie_jar, upsert_environment, upsert_folder,
upsert_grpc_connection, upsert_grpc_message, upsert_grpc_request, upsert_http_request,
upsert_workspace, CookieJar, Environment, EnvironmentVariable, Folder, GrpcConnection,
GrpcMessage, GrpcRequest, HttpRequest, HttpResponse, KeyValue, Settings, Workspace,
};
use crate::plugin::{ImportResources, ImportResult};
use crate::updates::{update_mode_from_str, UpdateMode, YaakUpdater};
@@ -92,9 +92,15 @@ async fn migrate_db(app_handle: AppHandle, db: &Mutex<Pool<Sqlite>>) -> Result<(
}
#[tauri::command]
async fn cmd_grpc_reflect(endpoint: &str) -> Result<Vec<ServiceDefinition>, String> {
let uri = safe_uri(endpoint).map_err(|e| e.to_string())?;
Ok(grpc::callable(&uri).await)
async fn cmd_grpc_reflect(
request_id: &str,
app_handle: AppHandle,
) -> Result<Vec<ServiceDefinition>, String> {
let req = get_grpc_request(&app_handle, request_id)
.await
.map_err(|e| e.to_string())?;
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
Ok(grpc::reflect(&uri).await)
}
#[tauri::command]
@@ -1636,6 +1642,7 @@ fn main() {
app.manage(m);
let h = app.handle();
let _ = cancel_pending_responses(&h).await;
let _ = cancel_pending_grpc_connections(&h).await;
});
Ok(())

View File

@@ -1183,6 +1183,20 @@ pub async fn create_response(
get_http_response(app_handle, &id).await
}
pub async fn cancel_pending_grpc_connections(app_handle: &AppHandle) -> Result<(), sqlx::Error> {
let db = get_db(app_handle).await;
sqlx::query!(
r#"
UPDATE grpc_connections
SET (elapsed) = (-1)
WHERE elapsed = 0;
"#,
)
.execute(&db)
.await?;
Ok(())
}
pub async fn cancel_pending_responses(app_handle: &AppHandle) -> Result<(), sqlx::Error> {
let db = get_db(app_handle).await;
sqlx::query!(

View File

@@ -9,7 +9,6 @@ import { useGrpc } from '../hooks/useGrpc';
import { useGrpcConnections } from '../hooks/useGrpcConnections';
import { useGrpcMessages } from '../hooks/useGrpcMessages';
import { useUpdateGrpcRequest } from '../hooks/useUpdateGrpcRequest';
import { count, pluralize } from '../lib/pluralize';
import { Banner } from './core/Banner';
import { Button } from './core/Button';
import { HotKeyList } from './core/HotKeyList';
@@ -33,10 +32,10 @@ export function GrpcConnectionLayout({ style }: Props) {
const updateRequest = useUpdateGrpcRequest(activeRequest?.id ?? null);
const alert = useAlert();
const [activeMessageId, setActiveMessageId] = useState<string | null>(null);
const grpc = useGrpc(activeRequest?.url ?? null, activeRequest?.id ?? null);
const connections = useGrpcConnections(activeRequest?.id ?? null);
const activeConnection = connections[0] ?? null;
const messages = useGrpcMessages(activeConnection?.id ?? null);
const grpc = useGrpc(activeRequest, activeConnection);
const activeMethod = useMemo(() => {
if (grpc.services == null || activeRequest == null) return null;
@@ -59,13 +58,13 @@ export function GrpcConnectionLayout({ style }: Props) {
});
}
if (activeMethod.clientStreaming && activeMethod.serverStreaming) {
await grpc.streaming.mutateAsync(activeRequest.id);
await grpc.streaming.mutateAsync();
} else if (!activeMethod.clientStreaming && activeMethod.serverStreaming) {
await grpc.serverStreaming.mutateAsync(activeRequest.id);
await grpc.serverStreaming.mutateAsync();
} else if (activeMethod.clientStreaming && !activeMethod.serverStreaming) {
await grpc.clientStreaming.mutateAsync(activeRequest.id);
await grpc.clientStreaming.mutateAsync();
} else {
const msg = await grpc.unary.mutateAsync(activeRequest.id);
const msg = await grpc.unary.mutateAsync();
setActiveMessageId(msg.id);
}
},
@@ -291,7 +290,7 @@ export function GrpcConnectionLayout({ style }: Props) {
<div className="w-full grid grid-rows-[auto_minmax(0,1fr)] items-center">
<HStack className="pl-3 mb-1 font-mono" alignItems="center">
<HStack alignItems="center" space={2}>
{count('message', messages.filter((m) => !m.isInfo).length)}
<span>{messages.filter((m) => !m.isInfo).length} messages</span>
{grpc.isStreaming && (
<Icon icon="refresh" size="sm" spin className="text-gray-600" />
)}

View File

@@ -1,3 +1,4 @@
import { formatDistanceToNow } from 'date-fns';
import { useDeleteGrpcConnection } from '../hooks/useDeleteGrpcConnection';
import { useDeleteGrpcConnections } from '../hooks/useDeleteGrpcConnections';
import type { GrpcConnection } from '../lib/models';
@@ -42,6 +43,7 @@ export const RecentConnectionsDropdown = function ResponsePane({
key: c.id,
label: (
<HStack space={2} alignItems="center">
{formatDistanceToNow(c.createdAt)} &bull;{' '}
<span className="font-mono text-xs">{c.elapsed}ms</span>
</HStack>
),

View File

@@ -21,6 +21,7 @@ import { useGrpcRequests } from '../hooks/useGrpcRequests';
import { useHotKey } from '../hooks/useHotKey';
import { useHttpRequests } from '../hooks/useHttpRequests';
import { useKeyValue } from '../hooks/useKeyValue';
import { useLatestGrpcConnection } from '../hooks/useLatestGrpcConnection';
import { useLatestHttpResponse } from '../hooks/useLatestHttpResponse';
import { usePrompt } from '../hooks/usePrompt';
import { useSendManyRequests } from '../hooks/useSendFolder';
@@ -558,7 +559,8 @@ const SidebarItem = forwardRef(function SidebarItem(
const duplicateGrpcRequest = useDuplicateGrpcRequest({ id: itemId, navigateAfter: true });
const sendRequest = useSendRequest(itemId);
const sendManyRequests = useSendManyRequests();
const latestResponse = useLatestHttpResponse(itemId);
const latestHttpResponse = useLatestHttpResponse(itemId);
const latestGrpcConnection = useLatestGrpcConnection(itemId);
const updateHttpRequest = useUpdateHttpRequest(itemId);
const updateGrpcRequest = useUpdateGrpcRequest(itemId);
const updateAnyFolder = useUpdateAnyFolder();
@@ -751,15 +753,19 @@ const SidebarItem = forwardRef(function SidebarItem(
) : (
<span className="truncate">{itemName || itemFallbackName}</span>
)}
{latestResponse && (
{latestGrpcConnection ? (
<div className="ml-auto">
{isResponseLoading(latestResponse) ? (
{latestGrpcConnection.elapsed === 0 && <Icon spin size="sm" icon="update" />}
</div>
) : latestHttpResponse ? (
<div className="ml-auto">
{isResponseLoading(latestHttpResponse) ? (
<Icon spin size="sm" icon="update" />
) : (
<StatusTag className="text-2xs dark:opacity-80" response={latestResponse} />
<StatusTag className="text-2xs dark:opacity-80" response={latestHttpResponse} />
)}
</div>
)}
) : null}
</button>
</div>
{children}

View File

@@ -1,100 +1,71 @@
import { useMutation, useQuery } from '@tanstack/react-query';
import { invoke } from '@tauri-apps/api';
import { emit } from '@tauri-apps/api/event';
import { useEffect, useState } from 'react';
import type { GrpcConnection, GrpcMessage } from '../lib/models';
import { useKeyValue } from './useKeyValue';
import type { GrpcConnection, GrpcMessage, GrpcRequest } from '../lib/models';
interface ReflectResponseService {
name: string;
methods: { name: string; schema: string; serverStreaming: boolean; clientStreaming: boolean }[];
}
export function useGrpc(url: string | null, requestId: string | null) {
const messages = useKeyValue<GrpcMessage[]>({
namespace: 'debug',
key: ['grpc_msgs', requestId ?? 'n/a'],
defaultValue: [],
});
const [activeConnectionId, setActiveConnectionId] = useState<string | null>(null);
export function useGrpc(req: GrpcRequest | null, conn: GrpcConnection | null) {
const requestId = req?.id ?? 'n/a';
useEffect(() => {
setActiveConnectionId(null);
}, [requestId]);
const unary = useMutation<GrpcMessage, string, string>({
mutationKey: ['grpc_unary', url],
mutationFn: async (id) => {
const unary = useMutation<GrpcMessage, string>({
mutationKey: ['grpc_unary', conn?.id ?? 'n/a'],
mutationFn: async () => {
const message = (await invoke('cmd_grpc_call_unary', {
requestId: id,
requestId,
})) as GrpcMessage;
await messages.set([message]);
return message;
},
});
const clientStreaming = useMutation<void, string, string>({
mutationKey: ['grpc_client_streaming', url],
mutationFn: async (requestId) => {
if (url === null) throw new Error('No URL provided');
await messages.set([]);
const c = (await invoke('cmd_grpc_client_streaming', { requestId })) as GrpcConnection;
setActiveConnectionId(c.id);
const clientStreaming = useMutation<void, string>({
mutationKey: ['grpc_client_streaming', conn?.id ?? 'n/a'],
mutationFn: async () => {
await invoke('cmd_grpc_client_streaming', { requestId });
},
});
const serverStreaming = useMutation<void, string, string>({
mutationKey: ['grpc_server_streaming', url],
mutationFn: async (requestId) => {
if (url === null) throw new Error('No URL provided');
await messages.set([]);
const c = (await invoke('cmd_grpc_server_streaming', { requestId })) as GrpcConnection;
setActiveConnectionId(c.id);
const serverStreaming = useMutation<void, string>({
mutationKey: ['grpc_server_streaming', conn?.id ?? 'n/a'],
mutationFn: async () => {
await invoke('cmd_grpc_server_streaming', { requestId });
},
});
const streaming = useMutation<void, string, string>({
mutationKey: ['grpc_streaming', url],
mutationFn: async (requestId) => {
if (url === null) throw new Error('No URL provided');
await messages.set([]);
const id: string = await invoke('cmd_grpc_streaming', {
requestId,
});
setActiveConnectionId(id);
const streaming = useMutation<void, string>({
mutationKey: ['grpc_streaming', conn?.id ?? 'n/a'],
mutationFn: async () => {
await invoke('cmd_grpc_streaming', { requestId });
},
});
const send = useMutation({
mutationKey: ['grpc_send', url],
mutationFn: async ({ message }: { message: string }) => {
if (activeConnectionId == null) throw new Error('No active connection');
await messages.set([]);
await emit(`grpc_client_msg_${activeConnectionId}`, { Message: message });
await emit(`grpc_client_msg_${conn?.id ?? 'none'}`, { Message: message });
},
});
const cancel = useMutation({
mutationKey: ['grpc_cancel', url],
mutationKey: ['grpc_cancel', conn?.id ?? 'n/a'],
mutationFn: async () => {
setActiveConnectionId(null);
await emit(`grpc_client_msg_${activeConnectionId}`, 'Cancel');
await emit(`grpc_client_msg_${conn?.id ?? 'none'}`, 'Cancel');
},
});
const commit = useMutation({
mutationKey: ['grpc_commit', url],
mutationKey: ['grpc_commit', conn?.id ?? 'n/a'],
mutationFn: async () => {
setActiveConnectionId(null);
await emit(`grpc_client_msg_${activeConnectionId}`, 'Commit');
await emit(`grpc_client_msg_${conn?.id ?? 'none'}`, 'Commit');
},
});
const reflect = useQuery<ReflectResponseService[]>({
queryKey: ['grpc_reflect', url ?? ''],
queryKey: ['grpc_reflect', conn?.id ?? 'n/a'],
queryFn: async () => {
if (url === null) return [];
return (await invoke('cmd_grpc_reflect', { endpoint: url })) as ReflectResponseService[];
return (await invoke('cmd_grpc_reflect', { requestId })) as ReflectResponseService[];
},
});
@@ -106,7 +77,7 @@ export function useGrpc(url: string | null, requestId: string | null) {
services: reflect.data,
cancel,
commit,
isStreaming: activeConnectionId !== null,
isStreaming: conn?.elapsed === 0,
send,
};
}

View File

@@ -0,0 +1,7 @@
import type { GrpcConnection } from '../lib/models';
import { useGrpcConnections } from './useGrpcConnections';
export function useLatestGrpcConnection(requestId: string | null): GrpcConnection | null {
const connections = useGrpcConnections(requestId);
return connections[0] ?? null;
}