Websockets for plugin runtime communication (#156)

This commit is contained in:
Gregory Schier
2025-01-20 10:55:53 -08:00
committed by GitHub
parent 095aaa5e92
commit b698a56549
54 changed files with 841 additions and 1185 deletions

View File

@@ -1,21 +1,14 @@
import type { InternalEvent } from '@yaakapp/api';
import EventEmitter from 'node:events';
import type { EventStreamEvent } from './gen/plugins/runtime';
import type { InternalEvent } from "@yaakapp/api";
import EventEmitter from "node:events";
export class EventChannel {
emitter: EventEmitter = new EventEmitter();
emit(e: InternalEvent) {
this.emitter.emit('__plugin_event__', { event: JSON.stringify(e) });
this.emitter.emit("__plugin_event__", e);
}
async *listen(): AsyncGenerator<EventStreamEvent> {
while (true) {
yield new Promise<EventStreamEvent>((resolve) => {
this.emitter.once('__plugin_event__', (event: EventStreamEvent) => {
resolve(event);
});
});
}
listen(cb: (e: InternalEvent) => void) {
this.emitter.on("__plugin_event__", cb);
}
}

View File

@@ -1,4 +1,4 @@
import type { BootRequest, InternalEvent } from '@yaakapp-internal/plugins';
import type { BootRequest, InternalEvent } from '@yaakapp/api';
import path from 'node:path';
import { Worker } from 'node:worker_threads';
import type { EventChannel } from './EventChannel';

View File

@@ -1,126 +0,0 @@
// Code generated by protoc-gen-ts_proto. DO NOT EDIT.
// versions:
// protoc-gen-ts_proto v2.2.3
// protoc v3.19.1
// source: plugins/runtime.proto
/* eslint-disable */
import { BinaryReader, BinaryWriter } from "@bufbuild/protobuf/wire";
import { type CallContext, type CallOptions } from "nice-grpc-common";
export const protobufPackage = "yaak.plugins.runtime";
export interface EventStreamEvent {
event: string;
}
function createBaseEventStreamEvent(): EventStreamEvent {
return { event: "" };
}
export const EventStreamEvent: MessageFns<EventStreamEvent> = {
encode(message: EventStreamEvent, writer: BinaryWriter = new BinaryWriter()): BinaryWriter {
if (message.event !== "") {
writer.uint32(10).string(message.event);
}
return writer;
},
decode(input: BinaryReader | Uint8Array, length?: number): EventStreamEvent {
const reader = input instanceof BinaryReader ? input : new BinaryReader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = createBaseEventStreamEvent();
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1: {
if (tag !== 10) {
break;
}
message.event = reader.string();
continue;
}
}
if ((tag & 7) === 4 || tag === 0) {
break;
}
reader.skip(tag & 7);
}
return message;
},
fromJSON(object: any): EventStreamEvent {
return { event: isSet(object.event) ? globalThis.String(object.event) : "" };
},
toJSON(message: EventStreamEvent): unknown {
const obj: any = {};
if (message.event !== "") {
obj.event = message.event;
}
return obj;
},
create(base?: DeepPartial<EventStreamEvent>): EventStreamEvent {
return EventStreamEvent.fromPartial(base ?? {});
},
fromPartial(object: DeepPartial<EventStreamEvent>): EventStreamEvent {
const message = createBaseEventStreamEvent();
message.event = object.event ?? "";
return message;
},
};
export type PluginRuntimeDefinition = typeof PluginRuntimeDefinition;
export const PluginRuntimeDefinition = {
name: "PluginRuntime",
fullName: "yaak.plugins.runtime.PluginRuntime",
methods: {
eventStream: {
name: "EventStream",
requestType: EventStreamEvent,
requestStream: true,
responseType: EventStreamEvent,
responseStream: true,
options: {},
},
},
} as const;
export interface PluginRuntimeServiceImplementation<CallContextExt = {}> {
eventStream(
request: AsyncIterable<EventStreamEvent>,
context: CallContext & CallContextExt,
): ServerStreamingMethodResult<DeepPartial<EventStreamEvent>>;
}
export interface PluginRuntimeClient<CallOptionsExt = {}> {
eventStream(
request: AsyncIterable<DeepPartial<EventStreamEvent>>,
options?: CallOptions & CallOptionsExt,
): AsyncIterable<EventStreamEvent>;
}
type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
export type DeepPartial<T> = T extends Builtin ? T
: T extends globalThis.Array<infer U> ? globalThis.Array<DeepPartial<U>>
: T extends ReadonlyArray<infer U> ? ReadonlyArray<DeepPartial<U>>
: T extends {} ? { [K in keyof T]?: DeepPartial<T[K]> }
: Partial<T>;
function isSet(value: any): boolean {
return value !== null && value !== undefined;
}
export type ServerStreamingMethodResult<Response> = { [Symbol.asyncIterator](): AsyncIterator<Response, void> };
export interface MessageFns<T> {
encode(message: T, writer?: BinaryWriter): BinaryWriter;
decode(input: BinaryReader | Uint8Array, length?: number): T;
fromJSON(object: any): T;
toJSON(message: T): unknown;
create(base?: DeepPartial<T>): T;
fromPartial(object: DeepPartial<T>): T;
}

View File

@@ -1,52 +1,52 @@
import type { InternalEvent } from '@yaakapp/api';
import { createChannel, createClient, Status } from 'nice-grpc';
import { EventChannel } from './EventChannel';
import type { PluginRuntimeClient} from './gen/plugins/runtime';
import { PluginRuntimeDefinition } from './gen/plugins/runtime';
import { PluginHandle } from './PluginHandle';
import WebSocket from 'ws';
const port = process.env.PORT || '50051';
const channel = createChannel(`localhost:${port}`, undefined, {
'grpc.max_receive_message_length': Number.MAX_SAFE_INTEGER,
'grpc.max_send_message_length': Number.MAX_SAFE_INTEGER,
});
const client: PluginRuntimeClient = createClient(PluginRuntimeDefinition, channel);
const port = process.env.YAAK_PLUGIN_SERVER_PORT || '9442';
const events = new EventChannel();
const plugins: Record<string, PluginHandle> = {};
(async () => {
const ws = new WebSocket(`ws://localhost:${port}`);
ws.on('message', async (e: Buffer) => {
try {
for await (const e of client.eventStream(events.listen())) {
const pluginEvent: InternalEvent = JSON.parse(e.event);
// Handle special event to bootstrap plugin
if (pluginEvent.payload.type === 'boot_request') {
const plugin = new PluginHandle(pluginEvent.pluginRefId, pluginEvent.payload, events);
plugins[pluginEvent.pluginRefId] = plugin;
}
// Once booted, forward all events to the plugin worker
const plugin = plugins[pluginEvent.pluginRefId];
if (!plugin) {
console.warn('Failed to get plugin for event by', pluginEvent.pluginRefId);
continue;
}
if (pluginEvent.payload.type === 'terminate_request') {
await plugin.terminate();
console.log('Terminated plugin worker', pluginEvent.pluginRefId);
delete plugins[pluginEvent.pluginRefId];
}
plugin.sendToWorker(pluginEvent);
}
console.log('Stream ended');
} catch (err: any) {
if (err.code === Status.CANCELLED) {
console.log('Stream was cancelled by server');
} else {
console.log('Client stream errored', err);
}
await handleIncoming(e.toString());
} catch (err) {
console.log('Failed to handle incoming plugin event', err);
}
})();
});
ws.on('open', (e) => console.log('Plugin runtime connected to websocket', e));
ws.on('error', (e) => console.error('Plugin runtime websocket error', e));
ws.on('close', (e) => console.log('Plugin runtime websocket closed', e));
// Listen for incoming events from plugins
events.listen((e) => {
const eventStr = JSON.stringify(e);
ws.send(eventStr);
});
async function handleIncoming(msg: string) {
const pluginEvent: InternalEvent = JSON.parse(msg);
// Handle special event to bootstrap plugin
if (pluginEvent.payload.type === 'boot_request') {
const plugin = new PluginHandle(pluginEvent.pluginRefId, pluginEvent.payload, events);
plugins[pluginEvent.pluginRefId] = plugin;
}
// Once booted, forward all events to the plugin worker
const plugin = plugins[pluginEvent.pluginRefId];
if (!plugin) {
console.warn('Failed to get plugin for event by', pluginEvent.pluginRefId);
return;
}
if (pluginEvent.payload.type === 'terminate_request') {
await plugin.terminate();
console.log('Terminated plugin worker', pluginEvent.pluginRefId);
delete plugins[pluginEvent.pluginRefId];
}
plugin.sendToWorker(pluginEvent);
}

View File

@@ -1,27 +1,25 @@
import type {
BootRequest,
Context,
FindHttpResponsesResponse,
GetHttpRequestByIdResponse,
HttpRequestAction,
ImportResponse,
InternalEvent,
InternalEventPayload,
PluginDefinition,
PromptTextResponse,
RenderHttpRequestResponse,
SendHttpRequestResponse,
TemplateFunction,
TemplateRenderResponse,
WindowContext,
} from '@yaakapp-internal/plugins';
import type { Context } from '@yaakapp/api';
import type { HttpRequestActionPlugin } from '@yaakapp/api/lib/plugins/HttpRequestActionPlugin';
import type { TemplateFunctionPlugin } from '@yaakapp/api/lib/plugins/TemplateFunctionPlugin';
import interceptStdout from 'intercept-stdout';
} from '@yaakapp/api';
import * as console from 'node:console';
import type { Stats } from 'node:fs';
import { readFileSync, statSync, watch } from 'node:fs';
import path from 'node:path';
import * as util from 'node:util';
import { interceptStdout } from './interceptStdout';
import { parentPort, workerData } from 'node:worker_threads';
export interface PluginWorkerData {
@@ -29,40 +27,32 @@ export interface PluginWorkerData {
pluginRefId: string;
}
async function initialize() {
function initialize(workerData: PluginWorkerData) {
const {
bootRequest: { dir: pluginDir, watch: enableWatch },
pluginRefId,
}: PluginWorkerData = workerData;
const pathPkg = path.join(pluginDir, 'package.json');
const pathMod = path.posix.join(pluginDir, 'build', 'index.js');
async function importModule() {
const id = require.resolve(pathMod);
delete require.cache[id];
return require(id);
}
const pkg = JSON.parse(readFileSync(pathPkg, 'utf8'));
prefixStdout(`[plugin][${pkg.name}] %s`);
let mod = await importModule();
const capabilities: string[] = [];
if (typeof mod.pluginHookExport === 'function') capabilities.push('export');
if (typeof mod.pluginHookImport === 'function') capabilities.push('import');
if (typeof mod.pluginHookResponseFilter === 'function') capabilities.push('filter');
console.log('Plugin initialized', pkg.name, { capabilities, enableWatch });
function buildEventToSend(
windowContext: WindowContext,
payload: InternalEventPayload,
replyId: string | null = null,
): InternalEvent {
return { pluginRefId, id: genId(), replyId, payload, windowContext };
return {
pluginRefId,
pluginName: path.basename(pluginDir),
id: genId(),
replyId,
payload,
windowContext,
};
}
function sendEmpty(windowContext: WindowContext, replyId: string | null = null): string {
@@ -86,7 +76,7 @@ async function initialize() {
parentPort!.postMessage(event);
}
async function sendAndWaitForReply<T extends Omit<InternalEventPayload, 'type'>>(
function sendAndWaitForReply<T extends Omit<InternalEventPayload, 'type'>>(
windowContext: WindowContext,
payload: InternalEventPayload,
): Promise<T> {
@@ -94,7 +84,7 @@ async function initialize() {
const eventToSend = buildEventToSend(windowContext, payload, null);
// 2. Spawn listener in background
const promise = new Promise<InternalEventPayload>(async (resolve) => {
const promise = new Promise<InternalEventPayload>((resolve) => {
const cb = (event: InternalEvent) => {
if (event.replyId === eventToSend.id) {
parentPort!.off('message', cb); // Unlisten, now that we're done
@@ -111,31 +101,33 @@ async function initialize() {
return promise as unknown as Promise<T>;
}
async function reloadModule() {
mod = await importModule();
}
// Reload plugin if JS or package.json changes
// Reload plugin if the JS or package.json changes
const windowContextNone: WindowContext = { type: 'none' };
const cb = async () => {
await reloadModule();
const fileChangeCallback = async () => {
await importModule();
return sendPayload(windowContextNone, { type: 'reload_response' }, null);
};
if (enableWatch) {
watchFile(pathMod, cb);
watchFile(pathPkg, cb);
watchFile(pathMod, fileChangeCallback);
watchFile(pathPkg, fileChangeCallback);
}
const newCtx = (event: InternalEvent): Context => ({
clipboard: {
async copyText(text) {
await sendAndWaitForReply(event.windowContext, { type: 'copy_text_request', text });
await sendAndWaitForReply(event.windowContext, {
type: 'copy_text_request',
text,
});
},
},
toast: {
async show(args) {
await sendAndWaitForReply(event.windowContext, { type: 'show_toast_request', ...args });
await sendAndWaitForReply(event.windowContext, {
type: 'show_toast_request',
...args,
});
},
},
prompt: {
@@ -149,7 +141,10 @@ async function initialize() {
},
httpResponse: {
async find(args) {
const payload = { type: 'find_http_responses_request', ...args } as const;
const payload = {
type: 'find_http_responses_request',
...args,
} as const;
const { httpResponses } = await sendAndWaitForReply<FindHttpResponsesResponse>(
event.windowContext,
payload,
@@ -159,7 +154,10 @@ async function initialize() {
},
httpRequest: {
async getById(args) {
const payload = { type: 'get_http_request_by_id_request', ...args } as const;
const payload = {
type: 'get_http_request_by_id_request',
...args,
} as const;
const { httpRequest } = await sendAndWaitForReply<GetHttpRequestByIdResponse>(
event.windowContext,
payload,
@@ -167,7 +165,10 @@ async function initialize() {
return httpRequest;
},
async send(args) {
const payload = { type: 'send_http_request_request', ...args } as const;
const payload = {
type: 'send_http_request_request',
...args,
} as const;
const { httpResponse } = await sendAndWaitForReply<SendHttpRequestResponse>(
event.windowContext,
payload,
@@ -175,7 +176,10 @@ async function initialize() {
return httpResponse;
},
async render(args) {
const payload = { type: 'render_http_request_request', ...args } as const;
const payload = {
type: 'render_http_request_request',
...args,
} as const;
const { httpRequest } = await sendAndWaitForReply<RenderHttpRequestResponse>(
event.windowContext,
payload,
@@ -187,7 +191,7 @@ async function initialize() {
/**
* Invoke Yaak's template engine to render a value. If the value is a nested type
* (eg. object), it will be recursively rendered.
* */
*/
async render(args) {
const payload = { type: 'template_render_request', ...args } as const;
const result = await sendAndWaitForReply<TemplateRenderResponse>(
@@ -199,17 +203,35 @@ async function initialize() {
},
});
let plug: PluginDefinition | null = null;
function importModule() {
const id = require.resolve(pathMod);
delete require.cache[id];
plug = require(id).plugin;
}
importModule();
if (pkg.name?.includes('yaak-faker')) {
sendPayload(
{ type: 'none' },
{ type: 'error_response', error: 'Failed to initialize Faker plugin' },
null,
);
return;
}
// Message comes into the plugin to be processed
parentPort!.on('message', async (event: InternalEvent) => {
const { windowContext, payload, id: replyId } = event;
const ctx = newCtx(event);
const { windowContext, payload, id: replyId } = event;
try {
if (payload.type === 'boot_request') {
// console.log('Plugin initialized', pkg.name, { capabilities, enableWatch });
const payload: InternalEventPayload = {
type: 'boot_response',
name: pkg.name,
version: pkg.version,
capabilities,
};
sendPayload(windowContext, payload, replyId);
return;
@@ -223,12 +245,15 @@ async function initialize() {
return;
}
if (payload.type === 'import_request' && typeof mod.pluginHookImport === 'function') {
const reply: ImportResponse | null = await mod.pluginHookImport(ctx, payload.content);
if (payload.type === 'import_request' && typeof plug?.importer?.onImport === 'function') {
const reply = await plug.importer.onImport(ctx, {
text: payload.content,
});
if (reply != null) {
const replyPayload: InternalEventPayload = {
type: 'import_response',
resources: reply?.resources,
// deno-lint-ignore no-explicit-any
resources: reply.resources as any,
};
sendPayload(windowContext, replyPayload, replyId);
return;
@@ -237,27 +262,15 @@ async function initialize() {
}
}
if (
payload.type === 'export_http_request_request' &&
typeof mod.pluginHookExport === 'function'
) {
const reply: string = await mod.pluginHookExport(ctx, payload.httpRequest);
const replyPayload: InternalEventPayload = {
type: 'export_http_request_response',
content: reply,
};
sendPayload(windowContext, replyPayload, replyId);
return;
}
if (payload.type === 'filter_request' && typeof mod.pluginHookResponseFilter === 'function') {
const reply: string = await mod.pluginHookResponseFilter(ctx, {
if (payload.type === 'filter_request' && typeof plug?.filter?.onFilter === 'function') {
const reply = await plug.filter.onFilter(ctx, {
filter: payload.filter,
body: payload.content,
payload: payload.content,
mimeType: payload.type,
});
const replyPayload: InternalEventPayload = {
type: 'filter_response',
content: reply,
content: reply.filtered,
};
sendPayload(windowContext, replyPayload, replyId);
return;
@@ -265,15 +278,13 @@ async function initialize() {
if (
payload.type === 'get_http_request_actions_request' &&
Array.isArray(mod.plugin?.httpRequestActions)
Array.isArray(plug?.httpRequestActions)
) {
const reply: HttpRequestAction[] = mod.plugin.httpRequestActions.map(
(a: HttpRequestActionPlugin) => ({
...a,
// Add everything except onSelect
onSelect: undefined,
}),
);
const reply: HttpRequestAction[] = plug.httpRequestActions.map((a) => ({
...a,
// Add everything except onSelect
onSelect: undefined,
}));
const replyPayload: InternalEventPayload = {
type: 'get_http_request_actions_response',
pluginRefId,
@@ -285,15 +296,13 @@ async function initialize() {
if (
payload.type === 'get_template_functions_request' &&
Array.isArray(mod.plugin?.templateFunctions)
Array.isArray(plug?.templateFunctions)
) {
const reply: TemplateFunction[] = mod.plugin.templateFunctions.map(
(a: TemplateFunctionPlugin) => ({
...a,
// Add everything except render
onRender: undefined,
}),
);
const reply: TemplateFunction[] = plug.templateFunctions.map((a) => ({
...a,
// Add everything except render
onRender: undefined,
}));
const replyPayload: InternalEventPayload = {
type: 'get_template_functions_response',
pluginRefId,
@@ -303,21 +312,19 @@ async function initialize() {
return;
}
if (payload.type === 'get_http_authentication_request' && mod.plugin?.authentication) {
const auth = mod.plugin.authentication;
if (payload.type === 'get_http_authentication_request' && plug?.authentication) {
const { onApply: _, ...auth } = plug.authentication;
const replyPayload: InternalEventPayload = {
...auth,
type: 'get_http_authentication_response',
// Remove unneeded attrs
onApply: undefined,
};
sendPayload(windowContext, replyPayload, replyId);
return;
}
if (payload.type === 'call_http_authentication_request' && mod.plugin?.authentication) {
const auth = mod.plugin.authentication;
if (payload.type === 'call_http_authentication_request' && plug?.authentication) {
const auth = plug.authentication;
if (typeof auth?.onApply === 'function') {
const result = await auth.onApply(ctx, payload);
sendPayload(
@@ -334,11 +341,9 @@ async function initialize() {
if (
payload.type === 'call_http_request_action_request' &&
Array.isArray(mod.plugin?.httpRequestActions)
Array.isArray(plug?.httpRequestActions)
) {
const action = mod.plugin.httpRequestActions.find(
(a: HttpRequestActionPlugin) => a.key === payload.key,
);
const action = plug.httpRequestActions.find((a) => a.key === payload.key);
if (typeof action?.onSelect === 'function') {
await action.onSelect(ctx, payload.args);
sendEmpty(windowContext, replyId);
@@ -348,11 +353,9 @@ async function initialize() {
if (
payload.type === 'call_template_function_request' &&
Array.isArray(mod.plugin?.templateFunctions)
Array.isArray(plug?.templateFunctions)
) {
const action = mod.plugin.templateFunctions.find(
(a: TemplateFunctionPlugin) => a.name === payload.name,
);
const action = plug.templateFunctions.find((a) => a.name === payload.name);
if (typeof action?.onRender === 'function') {
const result = await action.onRender(ctx, payload.args);
sendPayload(
@@ -368,10 +371,18 @@ async function initialize() {
}
if (payload.type === 'reload_request') {
await reloadModule();
await importModule();
}
} catch (err) {
console.log('Plugin call threw exception', payload.type, err);
sendPayload(
windowContext,
{
type: 'error_response',
error: `${err}`,
},
replyId,
);
// TODO: Return errors to server
}
@@ -380,9 +391,7 @@ async function initialize() {
});
}
initialize().catch((err) => {
console.log('failed to boot plugin', err);
});
initialize(workerData);
function genId(len = 5): string {
const alphabet = '01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
@@ -397,7 +406,7 @@ function prefixStdout(s: string) {
if (!s.includes('%s')) {
throw new Error('Console prefix must contain a "%s" replacer');
}
interceptStdout((text) => {
interceptStdout((text: string) => {
const lines = text.split(/\n/);
let newText = '';
for (let i = 0; i < lines.length; i++) {
@@ -413,11 +422,11 @@ const watchedFiles: Record<string, Stats> = {};
/**
* Watch a file and trigger callback on change.
*
* We also track the stat for each file because fs.watch will
* We also track the stat for each file because fs.watch() will
* trigger a "change" event when the access date changes
*/
function watchFile(filepath: string, cb: (filepath: string) => void) {
watch(filepath, (_event, _name) => {
watch(filepath, () => {
const stat = statSync(filepath);
if (stat.mtimeMs !== watchedFiles[filepath]?.mtimeMs) {
cb(filepath);

View File

@@ -0,0 +1,37 @@
import process from "node:process";
export function interceptStdout(
intercept: (text: string) => string,
) {
const old_stdout_write = process.stdout.write;
const old_stderr_write = process.stderr.write;
process.stdout.write = (function (write) {
return function (text: string) {
arguments[0] = interceptor(text, intercept);
// deno-lint-ignore no-explicit-any
write.apply(process.stdout, arguments as any);
return true;
};
})(process.stdout.write);
process.stderr.write = (function (write) {
return function (text: string) {
arguments[0] = interceptor(text, intercept);
// deno-lint-ignore no-explicit-any
write.apply(process.stderr, arguments as any);
return true;
};
})(process.stderr.write);
// puts back to original
return function unhook() {
process.stdout.write = old_stdout_write;
process.stderr.write = old_stderr_write;
};
}
function interceptor(text: string, fn: (text: string) => string) {
return fn(text).replace(/\n$/, "") +
(fn(text) && /\n$/.test(text) ? "\n" : "");
}