More messages

This commit is contained in:
Gregory Schier
2024-02-04 19:08:31 -08:00
parent 07cad2e337
commit acb01cf086
5 changed files with 178 additions and 106 deletions

View File

@@ -39,11 +39,6 @@ impl GrpcConnection {
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
println!(
"\n---------- SENDING -----------------\n{}",
serde_json::to_string_pretty(&req_message).expect("json")
);
let req = req_message.into_request();
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -52,7 +47,6 @@ impl GrpcConnection {
let resp = client.unary(req, path, codec).await.unwrap();
let msg = resp.into_inner();
let response_json = serde_json::to_string_pretty(&msg).expect("json to string");
println!("\n---------- RECEIVING ---------------\n{}", response_json,);
Ok(response_json)
}
@@ -140,11 +134,6 @@ impl GrpcConnection {
let mut client = tonic::client::Grpc::with_origin(self.conn.clone(), self.uri.clone());
println!(
"\n---------- SENDING -----------------\n{}",
serde_json::to_string_pretty(&req_message).expect("json")
);
let req = req_message.into_request();
let path = method_desc_to_path(method);
let codec = DynamicCodec::new(method.clone());
@@ -184,7 +173,6 @@ impl GrpcManager {
method: &str,
message: &str,
) -> Result<Streaming<DynamicMessage>> {
println!("Server streaming {}", id);
self.connect(id, uri)
.await
.server_streaming(service, method, message)

View File

@@ -182,7 +182,6 @@ async fn cmd_grpc_client_streaming(
app_handle: AppHandle<Wry>,
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<GrpcConnection, String> {
println!("CLIENT STREAMING");
let db = &*db_state.lock().await;
let req = get_grpc_request(db, request_id)
.await
@@ -200,7 +199,6 @@ async fn cmd_grpc_client_streaming(
.await
.map_err(|e| e.to_string())?
};
println!("CREATED CONN: {}", conn.clone().id);
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
@@ -300,12 +298,9 @@ async fn cmd_grpc_client_streaming(
});
}
Ok(IncomingMsg::Commit) => {
println!("Received commit");
maybe_in_msg_tx.take();
// in_msg_stream.close();
}
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
Err(e) => {
@@ -324,7 +319,6 @@ async fn cmd_grpc_client_streaming(
async move {
let grpc_handle = app_handle.state::<Mutex<GrpcManager>>();
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
println!("STARTING CLIENT STREAM");
let msg = grpc_handle
.lock()
.await
@@ -352,18 +346,34 @@ async fn cmd_grpc_client_streaming(
}
};
println!("ENDED CLIENT STREAM");
{
let conn = conn.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
@@ -440,7 +450,6 @@ async fn cmd_grpc_streaming(
};
let (in_msg_tx, in_msg_rx) = tauri::async_runtime::channel::<String>(16);
let _maybe_in_msg_tx = Mutex::new(Some(in_msg_tx.clone()));
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let uri = safe_uri(&req.url).map_err(|e| e.to_string())?;
@@ -465,7 +474,6 @@ async fn cmd_grpc_streaming(
#[derive(serde::Deserialize)]
enum IncomingMsg {
Message(String),
Commit,
Cancel,
}
@@ -508,12 +516,7 @@ async fn cmd_grpc_streaming(
);
});
}
Ok(IncomingMsg::Commit) => {
println!("Received commit");
// TODO: Commit client streaming stream
}
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
Err(e) => {
@@ -574,12 +577,29 @@ async fn cmd_grpc_streaming(
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
@@ -617,6 +637,7 @@ async fn cmd_grpc_server_streaming(
let req = get_grpc_request(db, request_id)
.await
.map_err(|e| e.to_string())?;
let conn = {
let req = req.clone();
upsert_grpc_connection(
@@ -632,6 +653,28 @@ async fn cmd_grpc_server_streaming(
};
emit_side_effect(app_handle.clone(), "created_model", conn.clone());
{
let req = req.clone();
let conn = conn.clone();
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Initiating connection".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
}
let (cancelled_tx, mut cancelled_rx) = tokio::sync::watch::channel(false);
let (service, method) = match (&req.service, &req.method) {
@@ -649,8 +692,6 @@ async fn cmd_grpc_server_streaming(
#[derive(serde::Deserialize)]
enum IncomingMsg {
Message(String),
Commit,
Cancel,
}
@@ -664,15 +705,7 @@ async fn cmd_grpc_server_streaming(
}
match serde_json::from_str::<IncomingMsg>(ev.payload().unwrap()) {
Ok(IncomingMsg::Message(msg)) => {
println!("Received message: {}", msg);
}
Ok(IncomingMsg::Commit) => {
println!("Received commit");
// TODO: Commit client streaming stream
}
Ok(IncomingMsg::Cancel) => {
println!("Received cancel");
cancelled_tx.send_replace(true);
}
Err(e) => {
@@ -688,6 +721,7 @@ async fn cmd_grpc_server_streaming(
let db = db.clone();
let conn_id = conn.clone().id;
let app_handle = app_handle.clone();
let req = req.clone();
async move {
loop {
let req = req.clone();
@@ -724,17 +758,57 @@ async fn cmd_grpc_server_streaming(
}
};
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
debug!("gRPC listen finished");
},
_ = cancelled_rx.changed() => {
debug!("gRPC connection cancelled");
},
}
app_handle.unlisten(event_handler);
});
{
let conn = conn.clone();
let req = req.clone();
tauri::async_runtime::spawn(async move {
tokio::select! {
_ = grpc_listen => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection completed".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
_ = cancelled_rx.changed() => {
let db_state = app_handle.state::<Mutex<Pool<Sqlite>>>();
let db = &*db_state.lock().await;
emit_side_effect(
app_handle.clone(),
"created_model",
upsert_grpc_message(
&db,
&GrpcMessage {
message: "Connection cancelled".to_string(),
workspace_id: req.workspace_id,
request_id: req.id,
connection_id: conn.id,
is_info: true,
..Default::default()
},
)
.await
.expect("Failed to upsert message"),
);
},
}
app_handle.unlisten(event_handler);
});
}
Ok(conn)
}
@@ -1087,7 +1161,6 @@ async fn cmd_update_cookie_jar(
db_state: State<'_, Mutex<Pool<Sqlite>>>,
) -> Result<CookieJar, String> {
let db = &*db_state.lock().await;
println!("Updating cookie jar {}", cookie_jar.cookies.len());
let updated = upsert_cookie_jar(db, &cookie_jar)
.await

View File

@@ -44,14 +44,6 @@ export function GrpcConnectionLayout({ style }: Props) {
return s.methods.find((m) => m.name === activeRequest.method);
}, [activeRequest, grpc.services]);
const handleCancel = useCallback(() => {
grpc.cancel.mutateAsync().catch(console.error);
}, [grpc.cancel]);
const handleCommit = useCallback(() => {
grpc.commit.mutateAsync().catch(console.error);
}, [grpc.commit]);
const handleConnect = useCallback(
async (e: FormEvent) => {
e.preventDefault();
@@ -71,7 +63,8 @@ export function GrpcConnectionLayout({ style }: Props) {
} else if (activeMethod.clientStreaming && !activeMethod.serverStreaming) {
await grpc.clientStreaming.mutateAsync(activeRequest.id);
} else {
await grpc.unary.mutateAsync(activeRequest.id);
const msg = await grpc.unary.mutateAsync(activeRequest.id);
setActiveMessageId(msg.id);
}
},
[
@@ -148,6 +141,15 @@ export function GrpcConnectionLayout({ style }: Props) {
[activeMessageId, messages],
);
const messageType: 'unary' | 'server_streaming' | 'client_streaming' | 'streaming' =
useMemo(() => {
if (activeMethod == null) return 'unary'; // Good enough
if (activeMethod.clientStreaming && activeMethod.serverStreaming) return 'streaming';
if (activeMethod.clientStreaming) return 'client_streaming';
if (activeMethod.serverStreaming) return 'server_streaming';
return 'unary';
}, [activeMethod]);
if (activeRequest == null) {
return null;
}
@@ -201,21 +203,40 @@ export function GrpcConnectionLayout({ style }: Props) {
<IconButton
className="border border-highlight"
size="sm"
title="to-do"
title={messageType === 'unary' ? 'Send' : 'Connect'}
hotkeyAction={grpc.isStreaming ? undefined : 'http_request.send'}
onClick={grpc.isStreaming ? handleCancel : handleConnect}
onClick={grpc.isStreaming ? () => grpc.cancel.mutateAsync() : handleConnect}
disabled={grpc.isStreaming}
spin={grpc.isStreaming || grpc.unary.isLoading}
icon={
grpc.isStreaming
? 'x'
: !activeMethod?.clientStreaming && activeMethod?.serverStreaming
? 'arrowDownToDot'
: activeMethod?.clientStreaming && !activeMethod?.serverStreaming
? 'arrowUpFromDot'
: activeMethod?.clientStreaming && activeMethod?.serverStreaming
? 'arrowUpDown'
: 'sendHorizontal'
? 'refresh'
: messageType === 'unary'
? 'sendHorizontal'
: 'arrowUpDown'
}
/>
{grpc.isStreaming && (
<IconButton
className="border border-highlight"
size="sm"
title="Cancel"
onClick={() => grpc.cancel.mutateAsync()}
icon="x"
disabled={!grpc.isStreaming}
/>
)}
{activeMethod?.clientStreaming &&
!activeMethod.serverStreaming &&
grpc.isStreaming && (
<IconButton
className="border border-highlight"
size="sm"
title="to-do"
onClick={() => grpc.commit.mutateAsync()}
icon="check"
/>
)}
{activeMethod?.clientStreaming && grpc.isStreaming && (
<IconButton
className="border border-highlight"
@@ -226,17 +247,6 @@ export function GrpcConnectionLayout({ style }: Props) {
icon="sendHorizontal"
/>
)}
{activeMethod?.clientStreaming &&
!activeMethod.serverStreaming &&
grpc.isStreaming && (
<IconButton
className="border border-highlight"
size="sm"
title="to-do"
onClick={handleCommit}
icon="check"
/>
)}
</HStack>
</div>
<GrpcEditor
@@ -265,11 +275,17 @@ export function GrpcConnectionLayout({ style }: Props) {
</Banner>
) : messages.length >= 0 ? (
<SplitLayout
name="grpc_messages"
name={
!activeMethod?.clientStreaming && !activeMethod?.serverStreaming
? 'grpc_messages_unary'
: 'grpc_messages_streaming'
}
defaultRatio={
!activeMethod?.clientStreaming && !activeMethod?.serverStreaming ? 0.75 : 0.3
}
minHeightPx={20}
defaultRatio={0.25}
firstSlot={() => (
<div className="overflow-y-auto">
<div className="overflow-y-auto w-full">
{...messages.map((m) => (
<HStack
key={m.id}

View File

@@ -5,24 +5,32 @@ import { memo } from 'react';
const icons = {
archive: lucide.ArchiveIcon,
arrowBigDownDash: lucide.ArrowBigDownDashIcon,
arrowBigUpDash: lucide.ArrowBigUpDashIcon,
arrowDown: lucide.ArrowDownIcon,
arrowDownToDot: lucide.ArrowDownToDotIcon,
arrowUp: lucide.ArrowUpIcon,
arrowUpDown: lucide.ArrowUpDownIcon,
arrowUpFromDot: lucide.ArrowUpFromDotIcon,
box: lucide.BoxIcon,
cake: lucide.CakeIcon,
chat: lucide.MessageSquare,
check: lucide.CheckIcon,
chevronDown: lucide.ChevronDownIcon,
chevronRight: lucide.ChevronRightIcon,
cookie: lucide.CookieIcon,
code: lucide.CodeIcon,
cookie: lucide.CookieIcon,
copy: lucide.CopyIcon,
download: lucide.DownloadIcon,
folderInput: lucide.FolderInputIcon,
folderOutput: lucide.FolderOutputIcon,
externalLink: lucide.ExternalLinkIcon,
eye: lucide.EyeIcon,
eyeClosed: lucide.EyeOffIcon,
filter: lucide.FilterIcon,
flask: lucide.FlaskConicalIcon,
folderInput: lucide.FolderInputIcon,
folderOutput: lucide.FolderOutputIcon,
gripVertical: lucide.GripVerticalIcon,
info: lucide.InfoIcon,
keyboard: lucide.KeyboardIcon,
leftPanelHidden: lucide.PanelLeftOpenIcon,
leftPanelVisible: lucide.PanelLeftCloseIcon,
@@ -33,6 +41,7 @@ const icons = {
plus: lucide.PlusIcon,
plusCircle: lucide.PlusCircleIcon,
question: lucide.ShieldQuestionIcon,
refresh: lucide.RefreshCwIcon,
sendHorizontal: lucide.SendHorizonalIcon,
settings2: lucide.Settings2Icon,
settings: lucide.SettingsIcon,
@@ -40,14 +49,6 @@ const icons = {
trash: lucide.TrashIcon,
update: lucide.RefreshCcwIcon,
upload: lucide.UploadIcon,
arrowUpFromDot: lucide.ArrowUpFromDotIcon,
arrowDownToDot: lucide.ArrowDownToDotIcon,
arrowUpDown: lucide.ArrowUpDownIcon,
arrowDown: lucide.ArrowDownIcon,
arrowUp: lucide.ArrowUpIcon,
arrowBigDownDash: lucide.ArrowBigDownDashIcon,
arrowBigUpDash: lucide.ArrowBigUpDashIcon,
info: lucide.InfoIcon,
x: lucide.XIcon,
empty: (props: HTMLAttributes<HTMLSpanElement>) => <span {...props} />,

View File

@@ -29,7 +29,6 @@ export function useGrpc(url: string | null, requestId: string | null) {
requestId: id,
})) as GrpcMessage;
await messages.set([message]);
console.log('MESSAGE', message);
return message;
},
});
@@ -40,7 +39,6 @@ export function useGrpc(url: string | null, requestId: string | null) {
if (url === null) throw new Error('No URL provided');
await messages.set([]);
const c = (await invoke('cmd_grpc_client_streaming', { requestId })) as GrpcConnection;
console.log('GOT CONNECTION', c);
setActiveConnectionId(c.id);
},
});
@@ -72,10 +70,6 @@ export function useGrpc(url: string | null, requestId: string | null) {
mutationFn: async ({ message }: { message: string }) => {
if (activeConnectionId == null) throw new Error('No active connection');
await messages.set([]);
// await messages.set((m) => {
// return [...m, { type: 'client', message, timestamp: new Date().toISOString() }];
// });
console.log('SENDING', activeConnectionId);
await emit(`grpc_client_msg_${activeConnectionId}`, { Message: message });
},
});