mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-21 00:49:45 +01:00
NodeJS Plugin Runtime (#53)
This commit is contained in:
90
plugin-runtime/src/PluginHandle.ts
Normal file
90
plugin-runtime/src/PluginHandle.ts
Normal file
@@ -0,0 +1,90 @@
|
||||
import { randomUUID } from 'node:crypto';
|
||||
import { Worker } from 'node:worker_threads';
|
||||
import { PluginInfo } from './plugins';
|
||||
|
||||
export interface ParentToWorkerEvent<T = any> {
|
||||
callbackId: string;
|
||||
name: string;
|
||||
payload: T;
|
||||
}
|
||||
|
||||
export type WorkerToParentSuccessEvent<T> = {
|
||||
callbackId: string;
|
||||
payload: T;
|
||||
};
|
||||
|
||||
export type WorkerToParentErrorEvent = {
|
||||
callbackId: string;
|
||||
error: string;
|
||||
};
|
||||
|
||||
export type WorkerToParentEvent<T = any> = WorkerToParentErrorEvent | WorkerToParentSuccessEvent<T>;
|
||||
|
||||
export class PluginHandle {
|
||||
readonly pluginDir: string;
|
||||
readonly #worker: Worker;
|
||||
|
||||
constructor({ pluginDir, workerJsPath }: { pluginDir: string; workerJsPath: string }) {
|
||||
this.pluginDir = pluginDir;
|
||||
|
||||
this.#worker = new Worker(workerJsPath, {
|
||||
workerData: {
|
||||
pluginDir: this.pluginDir,
|
||||
},
|
||||
});
|
||||
|
||||
this.#worker.on('error', this.#handleError.bind(this));
|
||||
this.#worker.on('exit', this.#handleExit.bind(this));
|
||||
}
|
||||
|
||||
async getInfo(): Promise<PluginInfo> {
|
||||
return this.#callPlugin('info', null);
|
||||
}
|
||||
|
||||
async runResponseFilter({ filter, body }: { filter: string; body: string }): Promise<string> {
|
||||
return this.#callPlugin('run-filter', { filter, body });
|
||||
}
|
||||
|
||||
async runExport(request: any): Promise<string> {
|
||||
return this.#callPlugin('run-export', request);
|
||||
}
|
||||
|
||||
async runImport(data: string): Promise<string> {
|
||||
const result = await this.#callPlugin('run-import', data);
|
||||
|
||||
// Plugin returns object, but we convert to string
|
||||
return JSON.stringify(result, null, 2);
|
||||
}
|
||||
|
||||
#callPlugin<P, R>(name: string, payload: P): Promise<R> {
|
||||
const callbackId = `cb_${randomUUID().replaceAll('-', '')}`;
|
||||
return new Promise((resolve, reject) => {
|
||||
const cb = (e: WorkerToParentEvent<R>) => {
|
||||
if (e.callbackId !== callbackId) return;
|
||||
|
||||
if ('error' in e) {
|
||||
reject(e.error);
|
||||
} else {
|
||||
resolve(e.payload as R);
|
||||
}
|
||||
|
||||
this.#worker.removeListener('message', cb);
|
||||
};
|
||||
|
||||
this.#worker.addListener('message', cb);
|
||||
this.#worker.postMessage({ callbackId, name, payload });
|
||||
});
|
||||
}
|
||||
|
||||
async #handleError(err: Error) {
|
||||
console.error('PLUGIN ERROR', this.pluginDir, err);
|
||||
}
|
||||
|
||||
async #handleExit(code: number) {
|
||||
if (code === 0) {
|
||||
console.log('PLUGIN EXITED SUCCESSFULLY');
|
||||
} else {
|
||||
console.log('PLUGIN EXITED CODE', code);
|
||||
}
|
||||
}
|
||||
}
|
||||
61
plugin-runtime/src/PluginManager.ts
Normal file
61
plugin-runtime/src/PluginManager.ts
Normal file
@@ -0,0 +1,61 @@
|
||||
import { existsSync, writeFileSync } from 'node:fs';
|
||||
import { tmpdir } from 'node:os';
|
||||
import path from 'node:path';
|
||||
import { getAsset } from 'node:sea';
|
||||
import { PluginHandle } from './PluginHandle';
|
||||
import { loadPlugins, PluginInfo } from './plugins';
|
||||
|
||||
export class PluginManager {
|
||||
#handles: PluginHandle[] | null = null;
|
||||
static #instance: PluginManager | null = null;
|
||||
static #workerPath = path.join(tmpdir(), `index.${Math.random()}.worker.js`);
|
||||
|
||||
public static instance(): PluginManager {
|
||||
if (PluginManager.#instance == null) {
|
||||
PluginManager.#instance = new PluginManager();
|
||||
PluginManager.#instance.plugins(); // Trigger workers to boot, as it takes a few seconds
|
||||
}
|
||||
return PluginManager.#instance;
|
||||
}
|
||||
|
||||
async plugins(): Promise<PluginHandle[]> {
|
||||
await this.#ensureWorkerForSea();
|
||||
this.#handles = this.#handles ?? loadPlugins(PluginManager.#workerPath);
|
||||
return this.#handles;
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy worker JS asset to filesystem if we're in single-executable-application (SEA)
|
||||
* @private
|
||||
*/
|
||||
async #ensureWorkerForSea() {
|
||||
if (existsSync(PluginManager.#workerPath)) return;
|
||||
|
||||
console.log('Writing worker file to', PluginManager.#workerPath);
|
||||
writeFileSync(PluginManager.#workerPath, getAsset('worker', 'utf8'));
|
||||
}
|
||||
|
||||
async #pluginsWithInfo(): Promise<{ plugin: PluginHandle; info: PluginInfo }[]> {
|
||||
const plugins = await this.plugins();
|
||||
return Promise.all(plugins.map(async (plugin) => ({ plugin, info: await plugin.getInfo() })));
|
||||
}
|
||||
|
||||
async pluginsWith(capability: PluginInfo['capabilities'][0]): Promise<PluginHandle[]> {
|
||||
return (await this.#pluginsWithInfo())
|
||||
.filter((v) => v.info.capabilities.includes(capability))
|
||||
.map((v) => v.plugin);
|
||||
}
|
||||
|
||||
async plugin(name: string): Promise<PluginHandle | null> {
|
||||
return (await this.#pluginsWithInfo()).find((v) => v.info.name === name)?.plugin ?? null;
|
||||
}
|
||||
|
||||
async pluginOrThrow(name: string): Promise<PluginHandle> {
|
||||
const plugin = await this.plugin(name);
|
||||
if (plugin == null) {
|
||||
throw new Error(`Failed to find plugin by ${name}`);
|
||||
}
|
||||
|
||||
return plugin;
|
||||
}
|
||||
}
|
||||
432
plugin-runtime/src/gen/plugins/runtime.ts
Normal file
432
plugin-runtime/src/gen/plugins/runtime.ts
Normal file
@@ -0,0 +1,432 @@
|
||||
// Code generated by protoc-gen-ts_proto. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-ts_proto v1.180.0
|
||||
// protoc v3.19.1
|
||||
// source: plugins/runtime.proto
|
||||
|
||||
/* eslint-disable */
|
||||
import { type CallContext, type CallOptions } from "nice-grpc-common";
|
||||
import * as _m0 from "protobufjs/minimal";
|
||||
|
||||
export const protobufPackage = "yaak.plugins.runtime";
|
||||
|
||||
export interface PluginInfo {
|
||||
plugin: string;
|
||||
}
|
||||
|
||||
export interface HookResponse {
|
||||
info: PluginInfo | undefined;
|
||||
data: string;
|
||||
}
|
||||
|
||||
export interface HookImportRequest {
|
||||
data: string;
|
||||
}
|
||||
|
||||
export interface HookResponseFilterRequest {
|
||||
filter: string;
|
||||
body: string;
|
||||
contentType: string;
|
||||
}
|
||||
|
||||
export interface HookExportRequest {
|
||||
request: string;
|
||||
}
|
||||
|
||||
function createBasePluginInfo(): PluginInfo {
|
||||
return { plugin: "" };
|
||||
}
|
||||
|
||||
export const PluginInfo = {
|
||||
encode(message: PluginInfo, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
|
||||
if (message.plugin !== "") {
|
||||
writer.uint32(10).string(message.plugin);
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): PluginInfo {
|
||||
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = createBasePluginInfo();
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
if (tag !== 10) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.plugin = reader.string();
|
||||
continue;
|
||||
}
|
||||
if ((tag & 7) === 4 || tag === 0) {
|
||||
break;
|
||||
}
|
||||
reader.skipType(tag & 7);
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): PluginInfo {
|
||||
return { plugin: isSet(object.plugin) ? globalThis.String(object.plugin) : "" };
|
||||
},
|
||||
|
||||
toJSON(message: PluginInfo): unknown {
|
||||
const obj: any = {};
|
||||
if (message.plugin !== "") {
|
||||
obj.plugin = message.plugin;
|
||||
}
|
||||
return obj;
|
||||
},
|
||||
|
||||
create(base?: DeepPartial<PluginInfo>): PluginInfo {
|
||||
return PluginInfo.fromPartial(base ?? {});
|
||||
},
|
||||
fromPartial(object: DeepPartial<PluginInfo>): PluginInfo {
|
||||
const message = createBasePluginInfo();
|
||||
message.plugin = object.plugin ?? "";
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
function createBaseHookResponse(): HookResponse {
|
||||
return { info: undefined, data: "" };
|
||||
}
|
||||
|
||||
export const HookResponse = {
|
||||
encode(message: HookResponse, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
|
||||
if (message.info !== undefined) {
|
||||
PluginInfo.encode(message.info, writer.uint32(10).fork()).ldelim();
|
||||
}
|
||||
if (message.data !== "") {
|
||||
writer.uint32(18).string(message.data);
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): HookResponse {
|
||||
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = createBaseHookResponse();
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
if (tag !== 10) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.info = PluginInfo.decode(reader, reader.uint32());
|
||||
continue;
|
||||
case 2:
|
||||
if (tag !== 18) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.data = reader.string();
|
||||
continue;
|
||||
}
|
||||
if ((tag & 7) === 4 || tag === 0) {
|
||||
break;
|
||||
}
|
||||
reader.skipType(tag & 7);
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): HookResponse {
|
||||
return {
|
||||
info: isSet(object.info) ? PluginInfo.fromJSON(object.info) : undefined,
|
||||
data: isSet(object.data) ? globalThis.String(object.data) : "",
|
||||
};
|
||||
},
|
||||
|
||||
toJSON(message: HookResponse): unknown {
|
||||
const obj: any = {};
|
||||
if (message.info !== undefined) {
|
||||
obj.info = PluginInfo.toJSON(message.info);
|
||||
}
|
||||
if (message.data !== "") {
|
||||
obj.data = message.data;
|
||||
}
|
||||
return obj;
|
||||
},
|
||||
|
||||
create(base?: DeepPartial<HookResponse>): HookResponse {
|
||||
return HookResponse.fromPartial(base ?? {});
|
||||
},
|
||||
fromPartial(object: DeepPartial<HookResponse>): HookResponse {
|
||||
const message = createBaseHookResponse();
|
||||
message.info = (object.info !== undefined && object.info !== null)
|
||||
? PluginInfo.fromPartial(object.info)
|
||||
: undefined;
|
||||
message.data = object.data ?? "";
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
function createBaseHookImportRequest(): HookImportRequest {
|
||||
return { data: "" };
|
||||
}
|
||||
|
||||
export const HookImportRequest = {
|
||||
encode(message: HookImportRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
|
||||
if (message.data !== "") {
|
||||
writer.uint32(10).string(message.data);
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): HookImportRequest {
|
||||
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = createBaseHookImportRequest();
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
if (tag !== 10) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.data = reader.string();
|
||||
continue;
|
||||
}
|
||||
if ((tag & 7) === 4 || tag === 0) {
|
||||
break;
|
||||
}
|
||||
reader.skipType(tag & 7);
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): HookImportRequest {
|
||||
return { data: isSet(object.data) ? globalThis.String(object.data) : "" };
|
||||
},
|
||||
|
||||
toJSON(message: HookImportRequest): unknown {
|
||||
const obj: any = {};
|
||||
if (message.data !== "") {
|
||||
obj.data = message.data;
|
||||
}
|
||||
return obj;
|
||||
},
|
||||
|
||||
create(base?: DeepPartial<HookImportRequest>): HookImportRequest {
|
||||
return HookImportRequest.fromPartial(base ?? {});
|
||||
},
|
||||
fromPartial(object: DeepPartial<HookImportRequest>): HookImportRequest {
|
||||
const message = createBaseHookImportRequest();
|
||||
message.data = object.data ?? "";
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
function createBaseHookResponseFilterRequest(): HookResponseFilterRequest {
|
||||
return { filter: "", body: "", contentType: "" };
|
||||
}
|
||||
|
||||
export const HookResponseFilterRequest = {
|
||||
encode(message: HookResponseFilterRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
|
||||
if (message.filter !== "") {
|
||||
writer.uint32(10).string(message.filter);
|
||||
}
|
||||
if (message.body !== "") {
|
||||
writer.uint32(18).string(message.body);
|
||||
}
|
||||
if (message.contentType !== "") {
|
||||
writer.uint32(26).string(message.contentType);
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): HookResponseFilterRequest {
|
||||
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = createBaseHookResponseFilterRequest();
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
if (tag !== 10) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.filter = reader.string();
|
||||
continue;
|
||||
case 2:
|
||||
if (tag !== 18) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.body = reader.string();
|
||||
continue;
|
||||
case 3:
|
||||
if (tag !== 26) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.contentType = reader.string();
|
||||
continue;
|
||||
}
|
||||
if ((tag & 7) === 4 || tag === 0) {
|
||||
break;
|
||||
}
|
||||
reader.skipType(tag & 7);
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): HookResponseFilterRequest {
|
||||
return {
|
||||
filter: isSet(object.filter) ? globalThis.String(object.filter) : "",
|
||||
body: isSet(object.body) ? globalThis.String(object.body) : "",
|
||||
contentType: isSet(object.contentType) ? globalThis.String(object.contentType) : "",
|
||||
};
|
||||
},
|
||||
|
||||
toJSON(message: HookResponseFilterRequest): unknown {
|
||||
const obj: any = {};
|
||||
if (message.filter !== "") {
|
||||
obj.filter = message.filter;
|
||||
}
|
||||
if (message.body !== "") {
|
||||
obj.body = message.body;
|
||||
}
|
||||
if (message.contentType !== "") {
|
||||
obj.contentType = message.contentType;
|
||||
}
|
||||
return obj;
|
||||
},
|
||||
|
||||
create(base?: DeepPartial<HookResponseFilterRequest>): HookResponseFilterRequest {
|
||||
return HookResponseFilterRequest.fromPartial(base ?? {});
|
||||
},
|
||||
fromPartial(object: DeepPartial<HookResponseFilterRequest>): HookResponseFilterRequest {
|
||||
const message = createBaseHookResponseFilterRequest();
|
||||
message.filter = object.filter ?? "";
|
||||
message.body = object.body ?? "";
|
||||
message.contentType = object.contentType ?? "";
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
function createBaseHookExportRequest(): HookExportRequest {
|
||||
return { request: "" };
|
||||
}
|
||||
|
||||
export const HookExportRequest = {
|
||||
encode(message: HookExportRequest, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer {
|
||||
if (message.request !== "") {
|
||||
writer.uint32(10).string(message.request);
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): HookExportRequest {
|
||||
const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = createBaseHookExportRequest();
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
if (tag !== 10) {
|
||||
break;
|
||||
}
|
||||
|
||||
message.request = reader.string();
|
||||
continue;
|
||||
}
|
||||
if ((tag & 7) === 4 || tag === 0) {
|
||||
break;
|
||||
}
|
||||
reader.skipType(tag & 7);
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): HookExportRequest {
|
||||
return { request: isSet(object.request) ? globalThis.String(object.request) : "" };
|
||||
},
|
||||
|
||||
toJSON(message: HookExportRequest): unknown {
|
||||
const obj: any = {};
|
||||
if (message.request !== "") {
|
||||
obj.request = message.request;
|
||||
}
|
||||
return obj;
|
||||
},
|
||||
|
||||
create(base?: DeepPartial<HookExportRequest>): HookExportRequest {
|
||||
return HookExportRequest.fromPartial(base ?? {});
|
||||
},
|
||||
fromPartial(object: DeepPartial<HookExportRequest>): HookExportRequest {
|
||||
const message = createBaseHookExportRequest();
|
||||
message.request = object.request ?? "";
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
export type PluginRuntimeDefinition = typeof PluginRuntimeDefinition;
|
||||
export const PluginRuntimeDefinition = {
|
||||
name: "PluginRuntime",
|
||||
fullName: "yaak.plugins.runtime.PluginRuntime",
|
||||
methods: {
|
||||
hookImport: {
|
||||
name: "hookImport",
|
||||
requestType: HookImportRequest,
|
||||
requestStream: false,
|
||||
responseType: HookResponse,
|
||||
responseStream: false,
|
||||
options: {},
|
||||
},
|
||||
hookResponseFilter: {
|
||||
name: "hookResponseFilter",
|
||||
requestType: HookResponseFilterRequest,
|
||||
requestStream: false,
|
||||
responseType: HookResponse,
|
||||
responseStream: false,
|
||||
options: {},
|
||||
},
|
||||
hookExport: {
|
||||
name: "hookExport",
|
||||
requestType: HookExportRequest,
|
||||
requestStream: false,
|
||||
responseType: HookResponse,
|
||||
responseStream: false,
|
||||
options: {},
|
||||
},
|
||||
},
|
||||
} as const;
|
||||
|
||||
export interface PluginRuntimeServiceImplementation<CallContextExt = {}> {
|
||||
hookImport(request: HookImportRequest, context: CallContext & CallContextExt): Promise<DeepPartial<HookResponse>>;
|
||||
hookResponseFilter(
|
||||
request: HookResponseFilterRequest,
|
||||
context: CallContext & CallContextExt,
|
||||
): Promise<DeepPartial<HookResponse>>;
|
||||
hookExport(request: HookExportRequest, context: CallContext & CallContextExt): Promise<DeepPartial<HookResponse>>;
|
||||
}
|
||||
|
||||
export interface PluginRuntimeClient<CallOptionsExt = {}> {
|
||||
hookImport(request: DeepPartial<HookImportRequest>, options?: CallOptions & CallOptionsExt): Promise<HookResponse>;
|
||||
hookResponseFilter(
|
||||
request: DeepPartial<HookResponseFilterRequest>,
|
||||
options?: CallOptions & CallOptionsExt,
|
||||
): Promise<HookResponse>;
|
||||
hookExport(request: DeepPartial<HookExportRequest>, options?: CallOptions & CallOptionsExt): Promise<HookResponse>;
|
||||
}
|
||||
|
||||
type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined;
|
||||
|
||||
export type DeepPartial<T> = T extends Builtin ? T
|
||||
: T extends globalThis.Array<infer U> ? globalThis.Array<DeepPartial<U>>
|
||||
: T extends ReadonlyArray<infer U> ? ReadonlyArray<DeepPartial<U>>
|
||||
: T extends {} ? { [K in keyof T]?: DeepPartial<T[K]> }
|
||||
: Partial<T>;
|
||||
|
||||
function isSet(value: any): boolean {
|
||||
return value !== null && value !== undefined;
|
||||
}
|
||||
87
plugin-runtime/src/index.ts
Normal file
87
plugin-runtime/src/index.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
import { isAbortError } from 'abort-controller-x';
|
||||
import { createServer, ServerError, ServerMiddlewareCall, Status } from 'nice-grpc';
|
||||
import { CallContext } from 'nice-grpc-common';
|
||||
import * as fs from 'node:fs';
|
||||
import {
|
||||
DeepPartial,
|
||||
HookExportRequest,
|
||||
HookImportRequest,
|
||||
HookResponse,
|
||||
HookResponseFilterRequest,
|
||||
PluginRuntimeDefinition,
|
||||
PluginRuntimeServiceImplementation,
|
||||
} from './gen/plugins/runtime';
|
||||
import { PluginManager } from './PluginManager';
|
||||
|
||||
class PluginRuntimeService implements PluginRuntimeServiceImplementation {
|
||||
#manager: PluginManager;
|
||||
|
||||
constructor() {
|
||||
this.#manager = PluginManager.instance();
|
||||
}
|
||||
|
||||
async hookExport(request: HookExportRequest): Promise<DeepPartial<HookResponse>> {
|
||||
const plugin = await this.#manager.pluginOrThrow('exporter-curl');
|
||||
const data = await plugin.runExport(JSON.parse(request.request));
|
||||
const info = { plugin: (await plugin.getInfo()).name };
|
||||
return { info, data };
|
||||
}
|
||||
|
||||
async hookImport(request: HookImportRequest): Promise<DeepPartial<HookResponse>> {
|
||||
const plugins = await this.#manager.pluginsWith('import');
|
||||
for (const p of plugins) {
|
||||
const data = await p.runImport(request.data);
|
||||
if (data != 'null') {
|
||||
const info = { plugin: (await p.getInfo()).name };
|
||||
return { info, data };
|
||||
}
|
||||
}
|
||||
|
||||
throw new ServerError(Status.UNKNOWN, 'No importers found for data');
|
||||
}
|
||||
|
||||
async hookResponseFilter(request: HookResponseFilterRequest): Promise<DeepPartial<HookResponse>> {
|
||||
const pluginName = request.contentType.includes('json') ? 'filter-jsonpath' : 'filter-xpath';
|
||||
const plugin = await this.#manager.pluginOrThrow(pluginName);
|
||||
const data = await plugin.runResponseFilter(request);
|
||||
const info = { plugin: (await plugin.getInfo()).name };
|
||||
return { info, data };
|
||||
}
|
||||
}
|
||||
|
||||
let server = createServer();
|
||||
|
||||
async function* errorHandlingMiddleware<Request, Response>(
|
||||
call: ServerMiddlewareCall<Request, Response>,
|
||||
context: CallContext,
|
||||
) {
|
||||
try {
|
||||
return yield* call.next(call.request, context);
|
||||
} catch (error: unknown) {
|
||||
if (error instanceof ServerError || isAbortError(error)) {
|
||||
throw error;
|
||||
}
|
||||
|
||||
let details = String(error);
|
||||
|
||||
if (process.env.NODE_ENV === 'development') {
|
||||
// @ts-ignore
|
||||
details += `: ${error.stack}`;
|
||||
}
|
||||
|
||||
throw new ServerError(Status.UNKNOWN, details);
|
||||
}
|
||||
}
|
||||
|
||||
server = server.use(errorHandlingMiddleware);
|
||||
server.add(PluginRuntimeDefinition, new PluginRuntimeService());
|
||||
|
||||
// Start on random port if GRPC_PORT_FILE_PATH is set, or :4000
|
||||
const addr = process.env.GRPC_PORT_FILE_PATH ? 'localhost:0' : 'localhost:4000';
|
||||
server.listen(addr).then((port) => {
|
||||
console.log('gRPC server listening on', `http://localhost:${port}`);
|
||||
if (process.env.GRPC_PORT_FILE_PATH) {
|
||||
console.log('Wrote port file to', process.env.GRPC_PORT_FILE_PATH);
|
||||
fs.writeFileSync(process.env.GRPC_PORT_FILE_PATH, JSON.stringify({ port }, null, 2));
|
||||
}
|
||||
});
|
||||
64
plugin-runtime/src/index.worker.ts
Normal file
64
plugin-runtime/src/index.worker.ts
Normal file
@@ -0,0 +1,64 @@
|
||||
import { readFileSync } from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { parentPort, workerData } from 'node:worker_threads';
|
||||
import { ParentToWorkerEvent } from './PluginHandle';
|
||||
import { PluginInfo } from './plugins';
|
||||
|
||||
new Promise(async () => {
|
||||
const { pluginDir } = workerData;
|
||||
const pathMod = path.join(pluginDir, 'build/index.js');
|
||||
const pathPkg = path.join(pluginDir, 'package.json');
|
||||
|
||||
const pkg = JSON.parse(readFileSync(pathPkg, 'utf8'));
|
||||
const mod = (await import(pathMod)).default ?? {};
|
||||
|
||||
const info: PluginInfo = {
|
||||
capabilities: [],
|
||||
name: pkg['name'] ?? 'n/a',
|
||||
dir: pluginDir,
|
||||
};
|
||||
|
||||
if (typeof mod['pluginHookImport'] === 'function') {
|
||||
info.capabilities.push('import');
|
||||
}
|
||||
|
||||
if (typeof mod['pluginHookExport'] === 'function') {
|
||||
info.capabilities.push('export');
|
||||
}
|
||||
|
||||
if (typeof mod['pluginHookResponseFilter'] === 'function') {
|
||||
info.capabilities.push('filter');
|
||||
}
|
||||
|
||||
console.log('Loaded plugin', info.name, info.capabilities, info.dir);
|
||||
|
||||
function reply<T>(originalMsg: ParentToWorkerEvent, payload: T) {
|
||||
parentPort!.postMessage({ payload, callbackId: originalMsg.callbackId });
|
||||
}
|
||||
|
||||
function replyErr(originalMsg: ParentToWorkerEvent, error: unknown) {
|
||||
parentPort!.postMessage({
|
||||
error: String(error),
|
||||
callbackId: originalMsg.callbackId,
|
||||
});
|
||||
}
|
||||
|
||||
parentPort!.on('message', (msg: ParentToWorkerEvent) => {
|
||||
try {
|
||||
const ctx = { todo: 'implement me' };
|
||||
if (msg.name === 'run-import') {
|
||||
reply(msg, mod.pluginHookImport(ctx, msg.payload));
|
||||
} else if (msg.name === 'run-filter') {
|
||||
reply(msg, mod.pluginHookResponseFilter(ctx, msg.payload));
|
||||
} else if (msg.name === 'run-export') {
|
||||
reply(msg, mod.pluginHookExport(ctx, msg.payload));
|
||||
} else if (msg.name === 'info') {
|
||||
reply(msg, info);
|
||||
} else {
|
||||
console.log('Unknown message', msg);
|
||||
}
|
||||
} catch (err: unknown) {
|
||||
replyErr(msg, err);
|
||||
}
|
||||
});
|
||||
}).catch((err) => console.log('failed to boot plugin', err));
|
||||
18
plugin-runtime/src/plugins.ts
Normal file
18
plugin-runtime/src/plugins.ts
Normal file
@@ -0,0 +1,18 @@
|
||||
import * as fs from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { PluginHandle } from './PluginHandle';
|
||||
|
||||
export interface PluginInfo {
|
||||
name: string;
|
||||
dir: string;
|
||||
capabilities: ('import' | 'export' | 'filter')[];
|
||||
}
|
||||
|
||||
export function loadPlugins(workerJsPath: string): PluginHandle[] {
|
||||
const pluginsDir = process.env.PLUGINS_DIR;
|
||||
if (!pluginsDir) throw new Error('PLUGINS_DIR is not set');
|
||||
console.log('Loading plugins from', pluginsDir);
|
||||
|
||||
const pluginDirs = fs.readdirSync(pluginsDir).map((p) => path.join(pluginsDir, p));
|
||||
return pluginDirs.map((pluginDir) => new PluginHandle({ pluginDir, workerJsPath }));
|
||||
}
|
||||
Reference in New Issue
Block a user