diff --git a/package-lock.json b/package-lock.json index 183b2662..8fb32b2d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26,7 +26,7 @@ "@tauri-apps/plugin-fs": "^2.0.0-rc.0", "@tauri-apps/plugin-os": "^2.0.0-rc.0", "@tauri-apps/plugin-shell": "^2.0.0-rc.0", - "@yaakapp/api": "^0.1.0-beta.8", + "@yaakapp/api": "^0.1.0-beta.9", "buffer": "^6.0.3", "classnames": "^2.3.2", "cm6-graphql": "^0.0.9", @@ -2981,9 +2981,9 @@ "integrity": "sha512-N8tkAACJx2ww8vFMneJmaAgmjAG1tnVBZJRLRcx061tmsLRZHSEZSLuGWnwPtunsSLvSqXQ2wfp7Mgqg1I+2dQ==" }, "node_modules/@yaakapp/api": { - "version": "0.1.0-beta.8", - "resolved": "https://registry.npmjs.org/@yaakapp/api/-/api-0.1.0-beta.8.tgz", - "integrity": "sha512-kR3c179QBlq/h29R/DWGJq7crAt9FZOFuhapdCYVFR1FsNGLP3+3PfyLc6rTj/SrqbH1lkIR4c2DmZKPE6yPPA==", + "version": "0.1.0-beta.9", + "resolved": "https://registry.npmjs.org/@yaakapp/api/-/api-0.1.0-beta.9.tgz", + "integrity": "sha512-aNh1e2mUOV7GlIiVjDu6GSdvPPH1mz0k3coJs0jP1UhDr2V63RUC/phf00ei4odAO1VGlS7ISrtXCRJ7rK93aQ==", "dependencies": { "@types/node": "^22.0.0" } diff --git a/package.json b/package.json index a9c15c5f..095d8b56 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,7 @@ "@tauri-apps/plugin-fs": "^2.0.0-rc.0", "@tauri-apps/plugin-os": "^2.0.0-rc.0", "@tauri-apps/plugin-shell": "^2.0.0-rc.0", - "@yaakapp/api": "^0.1.0-beta.8", + "@yaakapp/api": "^0.1.0-beta.9", "buffer": "^6.0.3", "classnames": "^2.3.2", "cm6-graphql": "^0.0.9", diff --git a/plugin-runtime-types/package.json b/plugin-runtime-types/package.json index f8f82cb0..c194cf3f 100644 --- a/plugin-runtime-types/package.json +++ b/plugin-runtime-types/package.json @@ -1,6 +1,6 @@ { "name": "@yaakapp/api", - "version": "0.1.0-beta.8", + "version": "0.1.0-beta.9", "main": "lib/index.js", "typings": "./lib/index.d.ts", "files": [ diff --git a/plugin-runtime-types/src/gen/BootRequest.ts b/plugin-runtime-types/src/gen/BootRequest.ts new file mode 100644 index 00000000..398ca72c --- /dev/null +++ b/plugin-runtime-types/src/gen/BootRequest.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type BootRequest = { dir: string, }; diff --git a/plugin-runtime-types/src/gen/BootResponse.ts b/plugin-runtime-types/src/gen/BootResponse.ts new file mode 100644 index 00000000..1c1654de --- /dev/null +++ b/plugin-runtime-types/src/gen/BootResponse.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type BootResponse = { name: string, version: string, capabilities: Array, }; diff --git a/plugin-runtime-types/src/gen/EmptyResponse.ts b/plugin-runtime-types/src/gen/EmptyResponse.ts new file mode 100644 index 00000000..7fd56b5a --- /dev/null +++ b/plugin-runtime-types/src/gen/EmptyResponse.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type EmptyResponse = {}; diff --git a/plugin-runtime-types/src/gen/ExportHttpRequestRequest.ts b/plugin-runtime-types/src/gen/ExportHttpRequestRequest.ts new file mode 100644 index 00000000..994628bc --- /dev/null +++ b/plugin-runtime-types/src/gen/ExportHttpRequestRequest.ts @@ -0,0 +1,4 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { HttpRequest } from "./HttpRequest"; + +export type ExportHttpRequestRequest = { httpRequest: HttpRequest, }; diff --git a/plugin-runtime-types/src/gen/ExportHttpRequestResponse.ts b/plugin-runtime-types/src/gen/ExportHttpRequestResponse.ts new file mode 100644 index 00000000..c9953827 --- /dev/null +++ b/plugin-runtime-types/src/gen/ExportHttpRequestResponse.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ExportHttpRequestResponse = { content: string, }; diff --git a/plugin-runtime-types/src/gen/FilterRequest.ts b/plugin-runtime-types/src/gen/FilterRequest.ts new file mode 100644 index 00000000..8fc8df60 --- /dev/null +++ b/plugin-runtime-types/src/gen/FilterRequest.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type FilterRequest = { content: string, filter: string, }; diff --git a/plugin-runtime-types/src/gen/FilterResponse.ts b/plugin-runtime-types/src/gen/FilterResponse.ts new file mode 100644 index 00000000..9298ab34 --- /dev/null +++ b/plugin-runtime-types/src/gen/FilterResponse.ts @@ -0,0 +1,4 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { JsonValue } from "./serde_json/JsonValue"; + +export type FilterResponse = { items: Array, }; diff --git a/plugin-runtime-types/src/gen/GrpcConnection.ts b/plugin-runtime-types/src/gen/GrpcConnection.ts deleted file mode 100644 index 595b5d00..00000000 --- a/plugin-runtime-types/src/gen/GrpcConnection.ts +++ /dev/null @@ -1,3 +0,0 @@ -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. - -export type GrpcConnection = { id: string, model: "grpc_connection", workspaceId: string, requestId: string, createdAt: string, updatedAt: string, service: string, method: string, elapsed: number, status: number, url: string, error: string | null, trailers: { [key: string]: string }, }; diff --git a/plugin-runtime-types/src/gen/GrpcEvent.ts b/plugin-runtime-types/src/gen/GrpcEvent.ts deleted file mode 100644 index 67028185..00000000 --- a/plugin-runtime-types/src/gen/GrpcEvent.ts +++ /dev/null @@ -1,4 +0,0 @@ -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -import type { GrpcEventType } from "./GrpcEventType"; - -export type GrpcEvent = { id: string, model: "grpc_event", workspaceId: string, requestId: string, connectionId: string, createdAt: string, content: string, eventType: GrpcEventType, metadata: { [key: string]: string }, status: number | null, error: string | null, }; diff --git a/plugin-runtime-types/src/gen/GrpcEventType.ts b/plugin-runtime-types/src/gen/GrpcEventType.ts deleted file mode 100644 index 00d8db9b..00000000 --- a/plugin-runtime-types/src/gen/GrpcEventType.ts +++ /dev/null @@ -1,3 +0,0 @@ -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. - -export type GrpcEventType = "info" | "error" | "client_message" | "server_message" | "connection_start" | "connection_end"; diff --git a/plugin-runtime-types/src/gen/ImportRequest.ts b/plugin-runtime-types/src/gen/ImportRequest.ts new file mode 100644 index 00000000..c86eb371 --- /dev/null +++ b/plugin-runtime-types/src/gen/ImportRequest.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type ImportRequest = { content: string, }; diff --git a/plugin-runtime-types/src/gen/ImportResources.ts b/plugin-runtime-types/src/gen/ImportResources.ts new file mode 100644 index 00000000..f4cb87d7 --- /dev/null +++ b/plugin-runtime-types/src/gen/ImportResources.ts @@ -0,0 +1,8 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { Environment } from "./Environment"; +import type { Folder } from "./Folder"; +import type { GrpcRequest } from "./GrpcRequest"; +import type { HttpRequest } from "./HttpRequest"; +import type { Workspace } from "./Workspace"; + +export type ImportResources = { workspaces: Array, environments: Array, folders: Array, httpRequests: Array, grpcRequests: Array, }; diff --git a/plugin-runtime-types/src/gen/ImportResponse.ts b/plugin-runtime-types/src/gen/ImportResponse.ts new file mode 100644 index 00000000..421aac82 --- /dev/null +++ b/plugin-runtime-types/src/gen/ImportResponse.ts @@ -0,0 +1,4 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { ImportResources } from "./ImportResources"; + +export type ImportResponse = { resources: ImportResources, }; diff --git a/plugin-runtime-types/src/gen/InternalEvent.ts b/plugin-runtime-types/src/gen/InternalEvent.ts new file mode 100644 index 00000000..74977b54 --- /dev/null +++ b/plugin-runtime-types/src/gen/InternalEvent.ts @@ -0,0 +1,4 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { InternalEventPayload } from "./InternalEventPayload"; + +export type InternalEvent = { id: string, pluginRefId: string, replyId: string | null, payload: InternalEventPayload, }; diff --git a/plugin-runtime-types/src/gen/InternalEventPayload.ts b/plugin-runtime-types/src/gen/InternalEventPayload.ts new file mode 100644 index 00000000..05b4a71c --- /dev/null +++ b/plugin-runtime-types/src/gen/InternalEventPayload.ts @@ -0,0 +1,12 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { BootRequest } from "./BootRequest"; +import type { BootResponse } from "./BootResponse"; +import type { EmptyResponse } from "./EmptyResponse"; +import type { ExportHttpRequestRequest } from "./ExportHttpRequestRequest"; +import type { ExportHttpRequestResponse } from "./ExportHttpRequestResponse"; +import type { FilterRequest } from "./FilterRequest"; +import type { FilterResponse } from "./FilterResponse"; +import type { ImportRequest } from "./ImportRequest"; +import type { ImportResponse } from "./ImportResponse"; + +export type InternalEventPayload = { "type": "boot_request" } & BootRequest | { "type": "boot_response" } & BootResponse | { "type": "import_request" } & ImportRequest | { "type": "import_response" } & ImportResponse | { "type": "filter_request" } & FilterRequest | { "type": "filter_response" } & FilterResponse | { "type": "export_http_request_request" } & ExportHttpRequestRequest | { "type": "export_http_request_response" } & ExportHttpRequestResponse | { "type": "empty_response" } & EmptyResponse; diff --git a/plugin-runtime-types/src/gen/KeyValue.ts b/plugin-runtime-types/src/gen/KeyValue.ts deleted file mode 100644 index 301df19c..00000000 --- a/plugin-runtime-types/src/gen/KeyValue.ts +++ /dev/null @@ -1,3 +0,0 @@ -// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. - -export type KeyValue = { model: "key_value", createdAt: string, updatedAt: string, namespace: string, key: string, value: string, }; diff --git a/plugin-runtime-types/src/gen/Model.ts b/plugin-runtime-types/src/gen/Model.ts new file mode 100644 index 00000000..1e9c82d5 --- /dev/null +++ b/plugin-runtime-types/src/gen/Model.ts @@ -0,0 +1,9 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. +import type { Environment } from "./Environment"; +import type { Folder } from "./Folder"; +import type { GrpcRequest } from "./GrpcRequest"; +import type { HttpRequest } from "./HttpRequest"; +import type { HttpResponse } from "./HttpResponse"; +import type { Workspace } from "./Workspace"; + +export type Model = { "workspace": Workspace } | { "environment": Environment } | { "folder": Folder } | { "httpRequest": HttpRequest } | { "httpResponse": HttpResponse } | { "grpcRequest": GrpcRequest }; diff --git a/plugin-runtime-types/src/gen/serde_json/JsonValue.ts b/plugin-runtime-types/src/gen/serde_json/JsonValue.ts new file mode 100644 index 00000000..1af2d6e0 --- /dev/null +++ b/plugin-runtime-types/src/gen/serde_json/JsonValue.ts @@ -0,0 +1,3 @@ +// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. + +export type JsonValue = number | string | Array | { [key: string]: JsonValue }; diff --git a/plugin-runtime-types/src/index.ts b/plugin-runtime-types/src/index.ts index e329c582..c76addc2 100644 --- a/plugin-runtime-types/src/index.ts +++ b/plugin-runtime-types/src/index.ts @@ -4,15 +4,22 @@ export type * from './themes'; export * from './gen/Environment'; export * from './gen/EnvironmentVariable'; export * from './gen/Folder'; -export * from './gen/GrpcConnection'; -export * from './gen/GrpcEvent'; -export * from './gen/GrpcEventType'; export * from './gen/GrpcMetadataEntry'; export * from './gen/GrpcRequest'; export * from './gen/HttpRequest'; export * from './gen/HttpRequestHeader'; export * from './gen/HttpResponse'; -export * from './gen/HttpResponseHeader'; export * from './gen/HttpUrlParameter'; -export * from './gen/KeyValue'; +export * from './gen/BootRequest'; +export * from './gen/BootResponse'; +export * from './gen/EmptyResponse'; +export * from './gen/ExportHttpRequestRequest'; +export * from './gen/ExportHttpRequestResponse'; +export * from './gen/FilterRequest'; +export * from './gen/FilterResponse'; +export * from './gen/ImportRequest'; +export * from './gen/InternalEvent'; +export * from './gen/InternalEventPayload'; +export * from './gen/ImportResources'; +export * from './gen/ImportResponse'; export * from './gen/Workspace'; diff --git a/plugin-runtime/package-lock.json b/plugin-runtime/package-lock.json index ef045f27..fb1925fe 100644 --- a/plugin-runtime/package-lock.json +++ b/plugin-runtime/package-lock.json @@ -6,11 +6,13 @@ "": { "name": "@yaak/plugin-runtime", "dependencies": { + "intercept-stdout": "^0.1.2", "long": "^5.2.3", "nice-grpc": "^2.1.9", "protobufjs": "^7.3.2" }, "devDependencies": { + "@types/intercept-stdout": "^0.1.3", "grpc-tools": "^1.12.4", "nodemon": "^3.1.4", "npm-run-all": "^4.1.5", @@ -192,6 +194,12 @@ "integrity": "sha512-vxhUy4J8lyeyinH7Azl1pdd43GJhZH/tP2weN8TntQblOY+A0XbT8DJk1/oCPuOOyg/Ja757rG0CgHcWC8OfMA==", "dev": true }, + "node_modules/@types/intercept-stdout": { + "version": "0.1.3", + "resolved": "https://registry.npmjs.org/@types/intercept-stdout/-/intercept-stdout-0.1.3.tgz", + "integrity": "sha512-5qWSvqohM5rRKsF58LBWJeyu+lUlZwYKSnTcnXGfvFyMYIjvhpfniQRJNiyE/Gcru3jwVr2pHedsKTGLtzZqNA==", + "dev": true + }, "node_modules/@types/node": { "version": "20.14.7", "resolved": "https://registry.npmjs.org/@types/node/-/node-20.14.7.tgz", @@ -1304,6 +1312,14 @@ "integrity": "sha512-k/vGaX4/Yla3WzyMCvTQOXYeIHvqOKtnqBduzTHpzpQZzAskKMhZ2K+EnBiSM9zGSoIFeMpXKxa4dYeZIQqewQ==", "dev": true }, + "node_modules/intercept-stdout": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/intercept-stdout/-/intercept-stdout-0.1.2.tgz", + "integrity": "sha512-Umb41Ryp5FzLurfCRAWx+jjNAk8jsw2RTk2XPIwus+86h/Y2Eb4DfOWof/mZ6FBww8SoO45rJSlg25054/Di9w==", + "dependencies": { + "lodash.toarray": "^3.0.0" + } + }, "node_modules/internal-slot": { "version": "1.0.7", "resolved": "https://registry.npmjs.org/internal-slot/-/internal-slot-1.0.7.tgz", @@ -1644,11 +1660,56 @@ "node": ">=4" } }, + "node_modules/lodash._arraycopy": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/lodash._arraycopy/-/lodash._arraycopy-3.0.0.tgz", + "integrity": "sha512-RHShTDnPKP7aWxlvXKiDT6IX2jCs6YZLCtNhOru/OX2Q/tzX295vVBK5oX1ECtN+2r86S0Ogy8ykP1sgCZAN0A==" + }, + "node_modules/lodash._basevalues": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/lodash._basevalues/-/lodash._basevalues-3.0.0.tgz", + "integrity": "sha512-H94wl5P13uEqlCg7OcNNhMQ8KvWSIyqXzOPusRgHC9DK3o54P6P3xtbXlVbRABG4q5gSmp7EDdJ0MSuW9HX6Mg==" + }, + "node_modules/lodash._getnative": { + "version": "3.9.1", + "resolved": "https://registry.npmjs.org/lodash._getnative/-/lodash._getnative-3.9.1.tgz", + "integrity": "sha512-RrL9VxMEPyDMHOd9uFbvMe8X55X16/cGM5IgOKgRElQZutpX89iS6vwl64duTV1/16w5JY7tuFNXqoekmh1EmA==" + }, "node_modules/lodash.camelcase": { "version": "4.3.0", "resolved": "https://registry.npmjs.org/lodash.camelcase/-/lodash.camelcase-4.3.0.tgz", "integrity": "sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==" }, + "node_modules/lodash.isarguments": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz", + "integrity": "sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==" + }, + "node_modules/lodash.isarray": { + "version": "3.0.4", + "resolved": "https://registry.npmjs.org/lodash.isarray/-/lodash.isarray-3.0.4.tgz", + "integrity": "sha512-JwObCrNJuT0Nnbuecmqr5DgtuBppuCvGD9lxjFpAzwnVtdGoDQ1zig+5W8k5/6Gcn0gZ3936HDAlGd28i7sOGQ==" + }, + "node_modules/lodash.keys": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/lodash.keys/-/lodash.keys-3.1.2.tgz", + "integrity": "sha512-CuBsapFjcubOGMn3VD+24HOAPxM79tH+V6ivJL3CHYjtrawauDJHUk//Yew9Hvc6e9rbCrURGk8z6PC+8WJBfQ==", + "dependencies": { + "lodash._getnative": "^3.0.0", + "lodash.isarguments": "^3.0.0", + "lodash.isarray": "^3.0.0" + } + }, + "node_modules/lodash.toarray": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/lodash.toarray/-/lodash.toarray-3.0.2.tgz", + "integrity": "sha512-ptkjUqvuHjTuMJJxiktJpZhxM5l60bEkfntJx+NFzdQd1bZVxfpTF1bhFYFqBrT4F0wZ1qx9KbVmHJV3Rfc7Tw==", + "dependencies": { + "lodash._arraycopy": "^3.0.0", + "lodash._basevalues": "^3.0.0", + "lodash.keys": "^3.0.0" + } + }, "node_modules/long": { "version": "5.2.3", "resolved": "https://registry.npmjs.org/long/-/long-5.2.3.tgz", diff --git a/plugin-runtime/package.json b/plugin-runtime/package.json index 74056824..19dfdd8c 100644 --- a/plugin-runtime/package.json +++ b/plugin-runtime/package.json @@ -8,11 +8,13 @@ "build:proto": "grpc_tools_node_protoc --ts_proto_out=src/gen --ts_proto_opt=outputServices=nice-grpc,outputServices=generic-definitions,useExactTypes=false --proto_path=../proto ../proto/plugins/*.proto" }, "dependencies": { + "intercept-stdout": "^0.1.2", "long": "^5.2.3", "nice-grpc": "^2.1.9", "protobufjs": "^7.3.2" }, "devDependencies": { + "@types/intercept-stdout": "^0.1.3", "grpc-tools": "^1.12.4", "nodemon": "^3.1.4", "npm-run-all": "^4.1.5", diff --git a/plugin-runtime/src/EventChannel.ts b/plugin-runtime/src/EventChannel.ts new file mode 100644 index 00000000..9e51623f --- /dev/null +++ b/plugin-runtime/src/EventChannel.ts @@ -0,0 +1,21 @@ +import { InternalEvent } from '@yaakapp/api'; +import EventEmitter from 'node:events'; +import { EventStreamEvent } from './gen/plugins/runtime'; + +export class EventChannel { + emitter: EventEmitter = new EventEmitter(); + + emit(e: InternalEvent) { + this.emitter.emit('__plugin_event__', { event: JSON.stringify(e) }); + } + + async *listen(): AsyncGenerator { + while (true) { + yield new Promise((resolve) => { + this.emitter.once('__plugin_event__', (event: EventStreamEvent) => { + resolve(event); + }); + }); + } + } +} diff --git a/plugin-runtime/src/PluginHandle.ts b/plugin-runtime/src/PluginHandle.ts index fdfb87c7..912bec4f 100644 --- a/plugin-runtime/src/PluginHandle.ts +++ b/plugin-runtime/src/PluginHandle.ts @@ -1,81 +1,31 @@ -import { randomUUID } from 'node:crypto'; +import { InternalEvent } from '@yaakapp/api'; import path from 'node:path'; import { Worker } from 'node:worker_threads'; -import { PluginInfo } from './plugins'; - -export interface ParentToWorkerEvent { - callbackId: string; - name: string; - payload: T; -} - -export type WorkerToParentSuccessEvent = { - callbackId: string; - payload: T; -}; - -export type WorkerToParentErrorEvent = { - callbackId: string; - error: string; -}; - -export type WorkerToParentEvent = WorkerToParentErrorEvent | WorkerToParentSuccessEvent; +import { EventChannel } from './EventChannel'; export class PluginHandle { - readonly pluginDir: string; readonly #worker: Worker; - constructor(pluginDir: string) { - this.pluginDir = pluginDir; - - const workerPath = path.join(__dirname, 'index.worker.cjs'); + constructor( + readonly pluginDir: string, + readonly pluginRefId: string, + readonly events: EventChannel, + ) { + const workerPath = process.env.YAAK_WORKER_PATH ?? path.join(__dirname, 'index.worker.cjs'); this.#worker = new Worker(workerPath, { workerData: { - pluginDir: this.pluginDir, + pluginDir, + pluginRefId, }, }); + this.#worker.on('message', (e) => this.events.emit(e)); this.#worker.on('error', this.#handleError.bind(this)); this.#worker.on('exit', this.#handleExit.bind(this)); } - async getInfo(): Promise { - return this.#callPlugin('info', null); - } - - async runResponseFilter({ filter, body }: { filter: string; body: string }): Promise { - return this.#callPlugin('run-filter', { filter, body }); - } - - async runExport(request: any): Promise { - return this.#callPlugin('run-export', request); - } - - async runImport(data: string): Promise { - const result = await this.#callPlugin('run-import', data); - - // Plugin returns object, but we convert to string - return JSON.stringify(result, null, 2); - } - - #callPlugin(name: string, payload: P): Promise { - const callbackId = `cb_${randomUUID().replaceAll('-', '')}`; - return new Promise((resolve, reject) => { - const cb = (e: WorkerToParentEvent) => { - if (e.callbackId !== callbackId) return; - - if ('error' in e) { - reject(e.error); - } else { - resolve(e.payload as R); - } - - this.#worker.removeListener('message', cb); - }; - - this.#worker.addListener('message', cb); - this.#worker.postMessage({ callbackId, name, payload }); - }); + sendToWorker(event: InternalEvent) { + this.#worker.postMessage(event); } async #handleError(err: Error) { diff --git a/plugin-runtime/src/PluginManager.ts b/plugin-runtime/src/PluginManager.ts deleted file mode 100644 index fc0ff75f..00000000 --- a/plugin-runtime/src/PluginManager.ts +++ /dev/null @@ -1,44 +0,0 @@ -import { PluginHandle } from './PluginHandle'; -import { loadPlugins, PluginInfo } from './plugins'; - -export class PluginManager { - #handles: PluginHandle[] | null = null; - static #instance: PluginManager | null = null; - - public static instance(): PluginManager { - if (PluginManager.#instance == null) { - PluginManager.#instance = new PluginManager(); - PluginManager.#instance.plugins(); // Trigger workers to boot, as it takes a few seconds - } - return PluginManager.#instance; - } - - async plugins(): Promise { - this.#handles = this.#handles ?? loadPlugins(); - return this.#handles; - } - - async #pluginsWithInfo(): Promise<{ plugin: PluginHandle; info: PluginInfo }[]> { - const plugins = await this.plugins(); - return Promise.all(plugins.map(async (plugin) => ({ plugin, info: await plugin.getInfo() }))); - } - - async pluginsWith(capability: PluginInfo['capabilities'][0]): Promise { - return (await this.#pluginsWithInfo()) - .filter((v) => v.info.capabilities.includes(capability)) - .map((v) => v.plugin); - } - - async plugin(name: string): Promise { - return (await this.#pluginsWithInfo()).find((v) => v.info.name === name)?.plugin ?? null; - } - - async pluginOrThrow(name: string): Promise { - const plugin = await this.plugin(name); - if (plugin == null) { - throw new Error(`Failed to find plugin by ${name}`); - } - - return plugin; - } -} diff --git a/plugin-runtime/src/gen/plugins/runtime.ts b/plugin-runtime/src/gen/plugins/runtime.ts index 31f5f13a..ba57cb5f 100644 --- a/plugin-runtime/src/gen/plugins/runtime.ts +++ b/plugin-runtime/src/gen/plugins/runtime.ts @@ -33,6 +33,10 @@ export interface HookExportRequest { request: string; } +export interface EventStreamEvent { + event: string; +} + function createBasePluginInfo(): PluginInfo { return { plugin: "" }; } @@ -369,54 +373,91 @@ export const HookExportRequest = { }, }; +function createBaseEventStreamEvent(): EventStreamEvent { + return { event: "" }; +} + +export const EventStreamEvent = { + encode(message: EventStreamEvent, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.event !== "") { + writer.uint32(10).string(message.event); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): EventStreamEvent { + const reader = input instanceof _m0.Reader ? input : _m0.Reader.create(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseEventStreamEvent(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + if (tag !== 10) { + break; + } + + message.event = reader.string(); + continue; + } + if ((tag & 7) === 4 || tag === 0) { + break; + } + reader.skipType(tag & 7); + } + return message; + }, + + fromJSON(object: any): EventStreamEvent { + return { event: isSet(object.event) ? globalThis.String(object.event) : "" }; + }, + + toJSON(message: EventStreamEvent): unknown { + const obj: any = {}; + if (message.event !== "") { + obj.event = message.event; + } + return obj; + }, + + create(base?: DeepPartial): EventStreamEvent { + return EventStreamEvent.fromPartial(base ?? {}); + }, + fromPartial(object: DeepPartial): EventStreamEvent { + const message = createBaseEventStreamEvent(); + message.event = object.event ?? ""; + return message; + }, +}; + export type PluginRuntimeDefinition = typeof PluginRuntimeDefinition; export const PluginRuntimeDefinition = { name: "PluginRuntime", fullName: "yaak.plugins.runtime.PluginRuntime", methods: { - hookImport: { - name: "hookImport", - requestType: HookImportRequest, - requestStream: false, - responseType: HookResponse, - responseStream: false, - options: {}, - }, - hookExport: { - name: "hookExport", - requestType: HookExportRequest, - requestStream: false, - responseType: HookResponse, - responseStream: false, - options: {}, - }, - hookResponseFilter: { - name: "hookResponseFilter", - requestType: HookResponseFilterRequest, - requestStream: false, - responseType: HookResponse, - responseStream: false, + eventStream: { + name: "EventStream", + requestType: EventStreamEvent, + requestStream: true, + responseType: EventStreamEvent, + responseStream: true, options: {}, }, }, } as const; export interface PluginRuntimeServiceImplementation { - hookImport(request: HookImportRequest, context: CallContext & CallContextExt): Promise>; - hookExport(request: HookExportRequest, context: CallContext & CallContextExt): Promise>; - hookResponseFilter( - request: HookResponseFilterRequest, + eventStream( + request: AsyncIterable, context: CallContext & CallContextExt, - ): Promise>; + ): ServerStreamingMethodResult>; } export interface PluginRuntimeClient { - hookImport(request: DeepPartial, options?: CallOptions & CallOptionsExt): Promise; - hookExport(request: DeepPartial, options?: CallOptions & CallOptionsExt): Promise; - hookResponseFilter( - request: DeepPartial, + eventStream( + request: AsyncIterable>, options?: CallOptions & CallOptionsExt, - ): Promise; + ): AsyncIterable; } type Builtin = Date | Function | Uint8Array | string | number | boolean | undefined; @@ -430,3 +471,5 @@ export type DeepPartial = T extends Builtin ? T function isSet(value: any): boolean { return value !== null && value !== undefined; } + +export type ServerStreamingMethodResult = { [Symbol.asyncIterator](): AsyncIterator }; diff --git a/plugin-runtime/src/index.ts b/plugin-runtime/src/index.ts index e1eed6c5..96cc90cd 100644 --- a/plugin-runtime/src/index.ts +++ b/plugin-runtime/src/index.ts @@ -1,87 +1,42 @@ -import { isAbortError } from 'abort-controller-x'; -import { createServer, ServerError, ServerMiddlewareCall, Status } from 'nice-grpc'; -import { CallContext } from 'nice-grpc-common'; -import * as fs from 'node:fs'; -import { - DeepPartial, - HookExportRequest, - HookImportRequest, - HookResponse, - HookResponseFilterRequest, - PluginRuntimeDefinition, - PluginRuntimeServiceImplementation, -} from './gen/plugins/runtime'; -import { PluginManager } from './PluginManager'; +import { InternalEvent } from '@yaakapp/api'; +import { createChannel, createClient, Status } from 'nice-grpc'; +import { EventChannel } from './EventChannel'; +import { PluginRuntimeClient, PluginRuntimeDefinition } from './gen/plugins/runtime'; +import { PluginHandle } from './PluginHandle'; -class PluginRuntimeService implements PluginRuntimeServiceImplementation { - #manager: PluginManager; +const port = process.env.PORT || '50051'; - constructor() { - this.#manager = PluginManager.instance(); - } +const channel = createChannel(`localhost:${port}`); +const client: PluginRuntimeClient = createClient(PluginRuntimeDefinition, channel); - async hookExport(request: HookExportRequest): Promise> { - const plugin = await this.#manager.pluginOrThrow('exporter-curl'); - const data = await plugin.runExport(JSON.parse(request.request)); - const info = { plugin: (await plugin.getInfo()).name }; - return { info, data }; - } +const events = new EventChannel(); +const plugins: Record = {}; - async hookImport(request: HookImportRequest): Promise> { - const plugins = await this.#manager.pluginsWith('import'); - for (const p of plugins) { - const data = await p.runImport(request.data); - if (data != null && data !== 'null') { - const info = { plugin: (await p.getInfo()).name }; - return { info, data }; - } - } - - throw new ServerError(Status.UNKNOWN, 'No importers found for data'); - } - - async hookResponseFilter(request: HookResponseFilterRequest): Promise> { - const pluginName = request.contentType.includes('json') ? 'filter-jsonpath' : 'filter-xpath'; - const plugin = await this.#manager.pluginOrThrow(pluginName); - const data = await plugin.runResponseFilter(request); - const info = { plugin: (await plugin.getInfo()).name }; - return { info, data }; - } -} - -let server = createServer(); - -async function* errorHandlingMiddleware( - call: ServerMiddlewareCall, - context: CallContext, -) { +(async () => { try { - return yield* call.next(call.request, context); - } catch (error: unknown) { - if (error instanceof ServerError || isAbortError(error)) { - throw error; + for await (const e of client.eventStream(events.listen())) { + const pluginEvent: InternalEvent = JSON.parse(e.event); + // Handle special event to bootstrap plugin + if (pluginEvent.payload.type === 'boot_request') { + const plugin = new PluginHandle(pluginEvent.payload.dir, pluginEvent.pluginRefId, events); + plugins[pluginEvent.pluginRefId] = plugin; + } + + // Once booted, forward all events to plugin's worker + const plugin = plugins[pluginEvent.pluginRefId]; + if (!plugin) { + console.warn('Failed to get plugin for event by', pluginEvent.pluginRefId); + continue; + } + + plugin.sendToWorker(pluginEvent); } - - let details = String(error); - - if (process.env.NODE_ENV === 'development') { - // @ts-ignore - details += `: ${error.stack}`; + console.log('Stream ended'); + } catch (err: any) { + if (err.code === Status.CANCELLED) { + console.log('Stream was cancelled by server'); + } else { + console.log('Client stream errored', err); } - - throw new ServerError(Status.UNKNOWN, details); } -} - -server = server.use(errorHandlingMiddleware); -server.add(PluginRuntimeDefinition, new PluginRuntimeService()); - -// Start on random port if YAAK_GRPC_PORT_FILE_PATH is set, or :4000 -const addr = process.env.YAAK_GRPC_PORT_FILE_PATH ? 'localhost:0' : 'localhost:4000'; -server.listen(addr).then((port) => { - console.log('gRPC server listening on', `http://localhost:${port}`); - if (process.env.YAAK_GRPC_PORT_FILE_PATH) { - console.log('Wrote port file to', process.env.YAAK_GRPC_PORT_FILE_PATH); - fs.writeFileSync(process.env.YAAK_GRPC_PORT_FILE_PATH, JSON.stringify({ port }, null, 2)); - } -}); +})(); diff --git a/plugin-runtime/src/index.worker.ts b/plugin-runtime/src/index.worker.ts index 97d03dba..8f7a9cc4 100644 --- a/plugin-runtime/src/index.worker.ts +++ b/plugin-runtime/src/index.worker.ts @@ -1,11 +1,13 @@ +import { ImportResponse, InternalEvent, InternalEventPayload } from '@yaakapp/api'; +import interceptStdout from 'intercept-stdout'; +import * as console from 'node:console'; import { readFileSync } from 'node:fs'; import path from 'node:path'; +import * as util from 'node:util'; import { parentPort, workerData } from 'node:worker_threads'; -import { ParentToWorkerEvent } from './PluginHandle'; -import { PluginInfo } from './plugins'; new Promise(async (resolve, reject) => { - const { pluginDir } = workerData; + const { pluginDir /*, pluginRefId*/ } = workerData; const pathMod = path.join(pluginDir, 'build/index.js'); const pathPkg = path.join(pluginDir, 'package.json'); @@ -18,59 +20,112 @@ new Promise(async (resolve, reject) => { return; } - const mod = (await import(`file://${pathMod}`)).default ?? {}; + prefixStdout(`[plugin][${pkg.name}] %s`); - const info: PluginInfo = { - capabilities: [], - name: pkg['name'] ?? 'n/a', - dir: pluginDir, - }; + const mod = (await import(pathMod)).default ?? {}; - if (typeof mod['pluginHookImport'] === 'function') { - info.capabilities.push('import'); - } + const capabilities: string[] = []; + if (typeof mod.pluginHookExport === 'function') capabilities.push('export'); + if (typeof mod.pluginHookImport === 'function') capabilities.push('import'); + if (typeof mod.pluginHookResponseFilter === 'function') capabilities.push('filter'); - if (typeof mod['pluginHookExport'] === 'function') { - info.capabilities.push('export'); - } + console.log('Plugin initialized', pkg.name, capabilities, Object.keys(mod)); - if (typeof mod['pluginHookResponseFilter'] === 'function') { - info.capabilities.push('filter'); - } + // Message comes into the plugin to be processed + parentPort!.on('message', async ({ payload, pluginRefId, id: replyId }: InternalEvent) => { + console.log(`Received ${payload.type}`); - console.log('Loaded plugin', info.name, info.capabilities, info.dir); - - function reply(originalMsg: ParentToWorkerEvent, payload: T) { - parentPort!.postMessage({ payload, callbackId: originalMsg.callbackId }); - } - - function replyErr(originalMsg: ParentToWorkerEvent, error: unknown) { - parentPort!.postMessage({ - error: String(error), - callbackId: originalMsg.callbackId, - }); - } - - parentPort!.on('message', async (msg: ParentToWorkerEvent) => { try { - const ctx = { todo: 'implement me' }; - if (msg.name === 'run-import') { - reply(msg, await mod.pluginHookImport(ctx, msg.payload)); - } else if (msg.name === 'run-filter') { - reply(msg, await mod.pluginHookResponseFilter(ctx, msg.payload)); - } else if (msg.name === 'run-export') { - reply(msg, await mod.pluginHookExport(ctx, msg.payload)); - } else if (msg.name === 'info') { - reply(msg, info); - } else { - console.log('Unknown message', msg); + if (payload.type === 'boot_request') { + const payload: InternalEventPayload = { + type: 'boot_response', + name: pkg.name, + version: pkg.version, + capabilities, + }; + sendToServer({ id: genId(), pluginRefId, replyId, payload }); + return; } - } catch (err: unknown) { - replyErr(msg, err); + + if (payload.type === 'import_request' && typeof mod.pluginHookImport === 'function') { + const reply: ImportResponse | null = await mod.pluginHookImport({}, payload.content); + if (reply != null) { + const replyPayload: InternalEventPayload = { + type: 'import_response', + resources: reply?.resources, + }; + sendToServer({ id: genId(), pluginRefId, replyId, payload: replyPayload }); + return; + } else { + // Continue, to send back an empty reply + } + } + + if ( + payload.type === 'export_http_request_request' && + typeof mod.pluginHookExport === 'function' + ) { + const reply: string = await mod.pluginHookExport({}, payload.httpRequest); + const replyPayload: InternalEventPayload = { + type: 'export_http_request_response', + content: reply, + }; + sendToServer({ id: genId(), pluginRefId, replyId, payload: replyPayload }); + return; + } + + if (payload.type === 'filter_request' && typeof mod.pluginHookResponseFilter === 'function') { + const reply: string = await mod.pluginHookResponseFilter( + {}, + { filter: payload.filter, body: payload.content }, + ); + const replyPayload: InternalEventPayload = { + type: 'filter_response', + items: JSON.parse(reply), + }; + sendToServer({ id: genId(), pluginRefId, replyId, payload: replyPayload }); + return; + } + } catch (err) { + console.log('Plugin call threw exception', payload.type, err); + // TODO: Return errors to server } + + // No matches, so send back an empty response so the caller doesn't block forever + const id = genId(); + console.log('Sending nothing back to', id, { replyId }); + sendToServer({ id, pluginRefId, replyId, payload: { type: 'empty_response' } }); }); resolve(); }).catch((err) => { console.log('failed to boot plugin', err); }); + +function sendToServer(e: InternalEvent) { + parentPort!.postMessage(e); +} + +function genId(len = 5): string { + const alphabet = '01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; + let id = ''; + for (let i = 0; i < len; i++) { + id += alphabet[Math.floor(Math.random() * alphabet.length)]; + } + return id; +} + +function prefixStdout(s: string) { + if (!s.includes('%s')) { + throw new Error('Console prefix must contain a "%s" replacer'); + } + interceptStdout((text) => { + const lines = text.split(/\n/); + let newText = ''; + for (let i = 0; i < lines.length; i++) { + if (lines[i] == '') continue; + newText += util.format(s, lines[i]) + '\n'; + } + return newText.trimEnd(); + }); +} diff --git a/plugin-runtime/src/plugins.ts b/plugin-runtime/src/plugins.ts deleted file mode 100644 index 13eee780..00000000 --- a/plugin-runtime/src/plugins.ts +++ /dev/null @@ -1,18 +0,0 @@ -import * as fs from 'node:fs'; -import path from 'node:path'; -import { PluginHandle } from './PluginHandle'; - -export interface PluginInfo { - name: string; - dir: string; - capabilities: ('import' | 'export' | 'filter')[]; -} - -export function loadPlugins(): PluginHandle[] { - const pluginsDir = process.env.YAAK_PLUGINS_DIR; - if (!pluginsDir) throw new Error('YAAK_PLUGINS_DIR is not set'); - console.log('Loading plugins from', pluginsDir); - - const pluginDirs = fs.readdirSync(pluginsDir).map((p) => path.join(pluginsDir, p)); - return pluginDirs.map((pluginDir) => new PluginHandle(pluginDir)); -} diff --git a/proto/plugins/runtime.proto b/proto/plugins/runtime.proto index 4ec5e9c0..3c4439f5 100644 --- a/proto/plugins/runtime.proto +++ b/proto/plugins/runtime.proto @@ -3,10 +3,7 @@ syntax = "proto3"; package yaak.plugins.runtime; service PluginRuntime { - rpc hookImport (HookImportRequest) returns (HookResponse); - rpc hookExport (HookExportRequest) returns (HookResponse); - rpc hookResponseFilter (HookResponseFilterRequest) returns (HookResponse); -} + rpc EventStream (stream EventStreamEvent) returns (stream EventStreamEvent);} message PluginInfo { string plugin = 1; @@ -30,3 +27,7 @@ message HookResponseFilterRequest { message HookExportRequest { string request = 1; } + +message EventStreamEvent { + string event = 1; +} diff --git a/src-tauri/Cargo.lock b/src-tauri/Cargo.lock index 6d19523d..10c80d62 100644 --- a/src-tauri/Cargo.lock +++ b/src-tauri/Cargo.lock @@ -7632,9 +7632,12 @@ dependencies = [ "serde_json", "tauri", "tauri-plugin-shell", + "thiserror", "tokio", "tonic 0.12.1", "tonic-build", + "ts-rs", + "yaak_models", ] [[package]] diff --git a/src-tauri/Cargo.toml b/src-tauri/Cargo.toml index 4af9a4b7..01372bbd 100644 --- a/src-tauri/Cargo.toml +++ b/src-tauri/Cargo.toml @@ -29,7 +29,6 @@ openssl-sys = { version = "0.9", features = ["vendored"] } # For Ubuntu installa grpc = { path = "./grpc" } templates = { path = "./templates" } yaak_plugin_runtime = { path = "yaak_plugin_runtime" } -yaak_models = { path = "yaak_models" } anyhow = "1.0.86" base64 = "0.22.0" chrono = { version = "0.4.31", features = ["serde"] } @@ -56,10 +55,12 @@ tauri-plugin-updater = "2.0.0-rc.0" tauri-plugin-window-state = "2.0.0-rc.0" tokio = { version = "1.36.0", features = ["sync"] } tokio-stream = "0.1.15" +yaak_models = {workspace = true} uuid = "1.7.0" thiserror = "1.0.61" mime_guess = "2.0.5" [workspace.dependencies] +yaak_models = { path = "yaak_models" } tauri = { version = "2.0.0-rc.0", features = ["devtools", "protocol-asset"] } tauri-plugin-shell = "2.0.0-rc.0" diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 05444430..0f94ca1c 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -35,9 +35,9 @@ use ::grpc::{deserialize_message, serialize_message, Code, ServiceDefinition}; use yaak_plugin_runtime::manager::PluginManager; use crate::analytics::{AnalyticsAction, AnalyticsResource}; +use crate::export_resources::{get_workspace_export_resources, WorkspaceExportResources}; use crate::grpc::metadata_to_map; use crate::http_request::send_http_request; -use crate::export_resources::{get_workspace_export_resources, ImportResult, WorkspaceExportResources}; use crate::notifications::YaakNotifier; use crate::render::{render_request, variables_from_environment}; use crate::updates::{UpdateMode, YaakUpdater}; @@ -61,9 +61,9 @@ use yaak_models::queries::{ }; mod analytics; +mod export_resources; mod grpc; mod http_request; -mod export_resources; mod notifications; mod render; #[cfg(target_os = "macos")] @@ -102,13 +102,13 @@ struct AppMetaData { async fn cmd_metadata(app_handle: AppHandle) -> Result { let app_data_dir = app_handle.path().app_data_dir().unwrap(); let app_log_dir = app_handle.path().app_log_dir().unwrap(); - return Ok(AppMetaData { + Ok(AppMetaData { is_dev: is_dev(), version: app_handle.package_info().version.to_string(), name: app_handle.package_info().name.to_string(), app_data_dir: app_data_dir.to_string_lossy().to_string(), app_log_dir: app_log_dir.to_string_lossy().to_string(), - }); + }) } #[tauri::command] @@ -720,7 +720,8 @@ async fn cmd_filter_response( response_id: &str, plugin_manager: State<'_, Mutex>, filter: &str, -) -> Result { +) -> Result, String> { + println!("FILTERING? {filter}"); let response = get_http_response(&w, response_id) .await .expect("Failed to get response"); @@ -743,9 +744,10 @@ async fn cmd_filter_response( plugin_manager .lock() .await - .run_response_filter(filter, &body, &content_type) + .run_filter(filter, &body, &content_type) .await - .map(|r| r.data) + .map(|r| r.items) + .map_err(|e| e.to_string()) } #[tauri::command] @@ -753,29 +755,16 @@ async fn cmd_import_data( w: WebviewWindow, plugin_manager: State<'_, Mutex>, file_path: &str, - _workspace_id: &str, ) -> Result { let file = read_to_string(file_path).unwrap_or_else(|_| panic!("Unable to read file {}", file_path)); let file_contents = file.as_str(); - let import_response = plugin_manager + let (import_result, plugin_name) = plugin_manager .lock() .await .run_import(file_contents) - .await?; - let import_result: ImportResult = - serde_json::from_str(import_response.data.as_str()).map_err(|e| e.to_string())?; - - // TODO: Track the plugin that ran, maybe return the run info in the plugin response? - let plugin_name = import_response.info.unwrap_or_default().plugin; - info!("Imported data using {}", plugin_name); - analytics::track_event( - &w.app_handle(), - AnalyticsResource::App, - AnalyticsAction::Import, - Some(json!({ "plugin": plugin_name })), - ) - .await; + .await + .map_err(|e| e.to_string())?; let mut imported_resources = WorkspaceExportResources::default(); let mut id_map: HashMap = HashMap::new(); @@ -806,7 +795,9 @@ async fn cmd_import_data( } } - for mut v in import_result.resources.workspaces { + let resources = import_result.resources; + + for mut v in resources.workspaces { v.id = maybe_gen_id(v.id.as_str(), ModelType::TypeWorkspace, &mut id_map); let x = upsert_workspace(&w, v).await.map_err(|e| e.to_string())?; imported_resources.workspaces.push(x.clone()); @@ -816,7 +807,7 @@ async fn cmd_import_data( imported_resources.workspaces.len() ); - for mut v in import_result.resources.environments { + for mut v in resources.environments { v.id = maybe_gen_id(v.id.as_str(), ModelType::TypeEnvironment, &mut id_map); v.workspace_id = maybe_gen_id( v.workspace_id.as_str(), @@ -831,7 +822,7 @@ async fn cmd_import_data( imported_resources.environments.len() ); - for mut v in import_result.resources.folders { + for mut v in resources.folders { v.id = maybe_gen_id(v.id.as_str(), ModelType::TypeFolder, &mut id_map); v.workspace_id = maybe_gen_id( v.workspace_id.as_str(), @@ -844,7 +835,7 @@ async fn cmd_import_data( } info!("Imported {} folders", imported_resources.folders.len()); - for mut v in import_result.resources.http_requests { + for mut v in resources.http_requests { v.id = maybe_gen_id(v.id.as_str(), ModelType::TypeHttpRequest, &mut id_map); v.workspace_id = maybe_gen_id( v.workspace_id.as_str(), @@ -862,7 +853,7 @@ async fn cmd_import_data( imported_resources.http_requests.len() ); - for mut v in import_result.resources.grpc_requests { + for mut v in resources.grpc_requests { v.id = maybe_gen_id(v.id.as_str(), ModelType::TypeGrpcRequest, &mut id_map); v.workspace_id = maybe_gen_id( v.workspace_id.as_str(), @@ -880,6 +871,14 @@ async fn cmd_import_data( imported_resources.grpc_requests.len() ); + analytics::track_event( + &w.app_handle(), + AnalyticsResource::App, + AnalyticsAction::Import, + Some(json!({ "plugin": plugin_name })), + ) + .await; + Ok(imported_resources) } @@ -901,14 +900,14 @@ async fn cmd_request_to_curl( .await .map_err(|e| e.to_string())?; let rendered = render_request(&request, &workspace, environment.as_ref()); - let request_json = serde_json::to_string(&rendered).map_err(|e| e.to_string())?; let import_response = plugin_manager .lock() .await - .run_export_curl(request_json.as_str()) - .await?; - Ok(import_response.data) + .run_export_curl(&rendered) + .await + .map_err(|e| e.to_string())?; + Ok(import_response.content) } #[tauri::command] @@ -916,10 +915,23 @@ async fn cmd_curl_to_request( command: &str, plugin_manager: State<'_, Mutex>, workspace_id: &str, + w: WebviewWindow, ) -> Result { - let import_response = plugin_manager.lock().await.run_import(command).await?; - let import_result: ImportResult = - serde_json::from_str(import_response.data.as_str()).map_err(|e| e.to_string())?; + let (import_result, plugin_name) = plugin_manager + .lock() + .await + .run_import(command) + .await + .map_err(|e| e.to_string())?; + + analytics::track_event( + &w.app_handle(), + AnalyticsResource::App, + AnalyticsAction::Import, + Some(json!({ "plugin": plugin_name })), + ) + .await; + import_result .resources .http_requests @@ -946,6 +958,7 @@ async fn cmd_export_data( .write(true) .open(export_path) .expect("Unable to create file"); + serde_json::to_writer_pretty(&f, &export_data) .map_err(|e| e.to_string()) .expect("Failed to write"); @@ -1590,6 +1603,7 @@ pub fn run() { .level_for("cookie_store", log::LevelFilter::Info) .level_for("h2", log::LevelFilter::Info) .level_for("hyper", log::LevelFilter::Info) + .level_for("hyper_util", log::LevelFilter::Info) .level_for("hyper_rustls", log::LevelFilter::Info) .level_for("reqwest", log::LevelFilter::Info) .level_for("sqlx", log::LevelFilter::Warn) @@ -1615,8 +1629,8 @@ pub fn run() { .plugin(tauri_plugin_dialog::init()) .plugin(tauri_plugin_os::init()) .plugin(tauri_plugin_fs::init()) - .plugin(yaak_models::Builder::default().build()) - .plugin(yaak_plugin_runtime::init()); + .plugin(yaak_models::plugin::Builder::default().build()) + .plugin(yaak_plugin_runtime::plugin::init()); #[cfg(target_os = "macos")] { diff --git a/src-tauri/yaak_models/src/error.rs b/src-tauri/yaak_models/src/error.rs new file mode 100644 index 00000000..7839da8a --- /dev/null +++ b/src-tauri/yaak_models/src/error.rs @@ -0,0 +1,13 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("SQL error")] + SqlError(#[from] rusqlite::Error), + #[error("JSON error")] + JsonError(#[from] serde_json::Error), + #[error("unknown error")] + Unknown, +} + +pub type Result = std::result::Result; \ No newline at end of file diff --git a/src-tauri/yaak_models/src/lib.rs b/src-tauri/yaak_models/src/lib.rs index ede1fcb9..d6b3e6cd 100644 --- a/src-tauri/yaak_models/src/lib.rs +++ b/src-tauri/yaak_models/src/lib.rs @@ -1,57 +1,5 @@ -use std::env::current_dir; -use std::fs::create_dir_all; -use r2d2; -use r2d2_sqlite; - -use log::info; -use r2d2::Pool; -use r2d2_sqlite::SqliteConnectionManager; -use serde::Deserialize; -use tauri::async_runtime::Mutex; -use tauri::plugin::TauriPlugin; -use tauri::{is_dev, plugin, Manager, Runtime}; - pub mod models; pub mod queries; +mod error; -pub struct SqliteConnection(Mutex>); - -#[derive(Default, Deserialize)] -pub struct PluginConfig { - // Nothing yet (will be configurable in tauri.conf.json -} - -/// Tauri SQL plugin builder. -#[derive(Default)] -pub struct Builder { - // Nothing Yet -} - -impl Builder { - pub fn new() -> Self { - Self::default() - } - - pub fn build(&self) -> TauriPlugin> { - plugin::Builder::>::new("yaak_models") - .setup(|app, _api| { - let app_path = match is_dev() { - true => current_dir().unwrap(), - false => app.path().app_data_dir().unwrap(), - }; - - create_dir_all(app_path.clone()).expect("Problem creating App directory!"); - - let db_file_path = app_path.join("db.sqlite"); - info!("Opening SQLite DB at {db_file_path:?}"); - - let manager = SqliteConnectionManager::file(db_file_path); - let pool = Pool::new(manager).unwrap(); - - app.manage(SqliteConnection(Mutex::new(pool))); - - Ok(()) - }) - .build() - } -} +pub mod plugin; \ No newline at end of file diff --git a/src-tauri/yaak_models/src/models.rs b/src-tauri/yaak_models/src/models.rs index 52342590..ceaca243 100644 --- a/src-tauri/yaak_models/src/models.rs +++ b/src-tauri/yaak_models/src/models.rs @@ -8,7 +8,7 @@ use ts_rs::TS; #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../src-web/lib/gen/")] +#[ts(export)] pub struct Settings { pub id: String, #[ts(type = "\"settings\"")] @@ -72,7 +72,7 @@ impl<'s> TryFrom<&Row<'s>> for Settings { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct Workspace { pub id: String, #[ts(type = "\"workspace\"")] @@ -140,7 +140,7 @@ impl Workspace { } #[derive(Debug, Clone, Serialize, Deserialize, TS)] -#[ts(export, export_to = "../../../src-web/lib/gen/")] +#[ts(export)] enum CookieDomain { HostOnly(String), Suffix(String), @@ -149,14 +149,14 @@ enum CookieDomain { } #[derive(Debug, Clone, Serialize, Deserialize, TS)] -#[ts(export, export_to = "../../../src-web/lib/gen/")] +#[ts(export)] enum CookieExpires { AtUtc(String), SessionEnd, } #[derive(Debug, Clone, Serialize, Deserialize, TS)] -#[ts(export, export_to = "../../../src-web/lib/gen/")] +#[ts(export)] pub struct Cookie { raw_cookie: String, domain: CookieDomain, @@ -166,7 +166,7 @@ pub struct Cookie { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../src-web/lib/gen/")] +#[ts(export)] pub struct CookieJar { pub id: String, #[ts(type = "\"cookie_jar\"")] @@ -210,7 +210,7 @@ impl<'s> TryFrom<&Row<'s>> for CookieJar { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct Environment { pub id: String, pub workspace_id: String, @@ -254,7 +254,7 @@ impl<'s> TryFrom<&Row<'s>> for Environment { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct EnvironmentVariable { #[serde(default = "default_true")] #[ts(optional, as = "Option")] @@ -265,7 +265,7 @@ pub struct EnvironmentVariable { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct Folder { pub created_at: NaiveDateTime, pub updated_at: NaiveDateTime, @@ -311,7 +311,7 @@ impl<'s> TryFrom<&Row<'s>> for Folder { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct HttpRequestHeader { #[serde(default = "default_true")] #[ts(optional, as = "Option")] @@ -322,7 +322,7 @@ pub struct HttpRequestHeader { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct HttpUrlParameter { #[serde(default = "default_true")] #[ts(optional, as = "Option")] @@ -333,7 +333,7 @@ pub struct HttpUrlParameter { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct HttpRequest { pub created_at: NaiveDateTime, pub updated_at: NaiveDateTime, @@ -348,7 +348,7 @@ pub struct HttpRequest { pub url_parameters: Vec, #[serde(default = "default_http_request_method")] pub method: String, - #[ts(type = "Record")] + #[ts(type = "Record")] pub body: HashMap, pub body_type: Option, #[ts(type = "Record")] @@ -410,7 +410,7 @@ impl<'s> TryFrom<&Row<'s>> for HttpRequest { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct HttpResponseHeader { pub name: String, pub value: String, @@ -418,7 +418,7 @@ pub struct HttpResponseHeader { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct HttpResponse { pub id: String, #[ts(type = "\"http_response\"")] @@ -501,7 +501,7 @@ impl HttpResponse { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct GrpcMetadataEntry { #[serde(default = "default_true")] #[ts(optional, as = "Option")] @@ -512,7 +512,7 @@ pub struct GrpcMetadataEntry { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct GrpcRequest { pub id: String, #[ts(type = "\"grpc_request\"")] @@ -582,7 +582,7 @@ impl<'s> TryFrom<&Row<'s>> for GrpcRequest { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct GrpcConnection { pub id: String, #[ts(type = "\"grpc_connection\"")] @@ -644,7 +644,7 @@ impl<'s> TryFrom<&Row<'s>> for GrpcConnection { #[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq, TS)] #[serde(rename_all = "snake_case")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub enum GrpcEventType { Info, Error, @@ -662,7 +662,7 @@ impl Default for GrpcEventType { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct GrpcEvent { pub id: String, #[ts(type = "\"grpc_event\"")] @@ -720,7 +720,7 @@ impl<'s> TryFrom<&Row<'s>> for GrpcEvent { #[derive(Debug, Clone, Serialize, Deserialize, Default, TS)] #[serde(default, rename_all = "camelCase")] -#[ts(export, export_to = "../../../plugin-runtime-types/src/gen/")] +#[ts(export)] pub struct KeyValue { #[ts(type = "\"key_value\"")] pub model: String, diff --git a/src-tauri/yaak_models/src/plugin.rs b/src-tauri/yaak_models/src/plugin.rs new file mode 100644 index 00000000..c5907733 --- /dev/null +++ b/src-tauri/yaak_models/src/plugin.rs @@ -0,0 +1,56 @@ +use log::info; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; +use serde::Deserialize; +use std::env::current_dir; +use std::fs::create_dir_all; +use std::time::Duration; +use tauri::async_runtime::Mutex; +use tauri::plugin::TauriPlugin; +use tauri::{is_dev, plugin, Manager, Runtime}; + +pub struct SqliteConnection(pub Mutex>); + +#[derive(Default, Deserialize)] +pub struct PluginConfig { + // Nothing yet (will be configurable in tauri.conf.json +} + +/// Tauri SQL plugin builder. +#[derive(Default)] +pub struct Builder { + // Nothing Yet +} + +impl Builder { + pub fn new() -> Self { + Self::default() + } + + pub fn build(&self) -> TauriPlugin> { + plugin::Builder::>::new("yaak_models") + .setup(|app, _api| { + let app_path = match is_dev() { + true => current_dir().unwrap(), + false => app.path().app_data_dir().unwrap(), + }; + + create_dir_all(app_path.clone()).expect("Problem creating App directory!"); + + let db_file_path = app_path.join("db.sqlite"); + info!("Opening SQLite DB at {db_file_path:?}"); + + let manager = SqliteConnectionManager::file(db_file_path); + let pool = Pool::builder() + .max_size(1000) // Up from 10 (just in case) + .connection_timeout(Duration::from_secs(10)) + .build(manager) + .unwrap(); + + app.manage(SqliteConnection(Mutex::new(pool))); + + Ok(()) + }) + .build() + } +} diff --git a/src-tauri/yaak_models/src/queries.rs b/src-tauri/yaak_models/src/queries.rs index 66d0833a..0fabacf8 100644 --- a/src-tauri/yaak_models/src/queries.rs +++ b/src-tauri/yaak_models/src/queries.rs @@ -1,6 +1,14 @@ use std::fs; -use log::error; +use crate::error::Result; +use crate::models::{ + CookieJar, CookieJarIden, Environment, EnvironmentIden, Folder, FolderIden, GrpcConnection, + GrpcConnectionIden, GrpcEvent, GrpcEventIden, GrpcRequest, GrpcRequestIden, HttpRequest, + HttpRequestIden, HttpResponse, HttpResponseHeader, HttpResponseIden, KeyValue, KeyValueIden, + ModelType, Settings, SettingsIden, Workspace, WorkspaceIden, +}; +use crate::plugin::SqliteConnection; +use log::{debug, error}; use rand::distributions::{Alphanumeric, DistString}; use sea_query::ColumnRef::Asterisk; use sea_query::Keyword::CurrentTimestamp; @@ -8,25 +16,6 @@ use sea_query::{Cond, Expr, OnConflict, Order, Query, SqliteQueryBuilder}; use sea_query_rusqlite::RusqliteBinder; use serde::Serialize; use tauri::{AppHandle, Emitter, Manager, WebviewWindow, Wry}; -use thiserror::Error; - -use crate::models::{ - CookieJar, CookieJarIden, Environment, EnvironmentIden, Folder, FolderIden, GrpcConnection, - GrpcConnectionIden, GrpcEvent, GrpcEventIden, GrpcRequest, GrpcRequestIden, HttpRequest, - HttpRequestIden, HttpResponse, HttpResponseHeader, HttpResponseIden, KeyValue, KeyValueIden, - ModelType, Settings, SettingsIden, Workspace, WorkspaceIden, -}; -use crate::SqliteConnection; - -#[derive(Error, Debug)] -pub enum DBError { - #[error("SQL error")] - SqlError(#[from] rusqlite::Error), - #[error("JSON error")] - JsonError(#[from] serde_json::Error), - #[error("unknown error")] - Unknown, -} pub async fn set_key_value_string( mgr: &impl Manager, @@ -96,9 +85,10 @@ pub async fn set_key_value_raw( key: &str, value: &str, ) -> (KeyValue, bool) { + let existing = get_key_value_raw(mgr, namespace, key).await; + let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); - let existing = get_key_value_raw(mgr, namespace, key).await; let (sql, params) = Query::insert() .into_table(KeyValueIden::Table) .columns([ @@ -153,7 +143,7 @@ pub async fn get_key_value_raw( .ok() } -pub async fn list_workspaces(mgr: &impl Manager) -> Result, DBError> { +pub async fn list_workspaces(mgr: &impl Manager) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -165,7 +155,7 @@ pub async fn list_workspaces(mgr: &impl Manager) -> Result, Ok(items.map(|v| v.unwrap()).collect()) } -pub async fn get_workspace(mgr: &impl Manager, id: &str) -> Result { +pub async fn get_workspace(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -177,10 +167,7 @@ pub async fn get_workspace(mgr: &impl Manager, id: &str) -> Result Result { +pub async fn upsert_workspace(window: &WebviewWindow, workspace: Workspace) -> Result { let id = match workspace.id.as_str() { "" => generate_model_id(ModelType::TypeWorkspace), _ => workspace.id.to_string(), @@ -235,10 +222,11 @@ pub async fn upsert_workspace( Ok(emit_upserted_model(window, m)) } -pub async fn delete_workspace(window: &WebviewWindow, id: &str) -> Result { +pub async fn delete_workspace(window: &WebviewWindow, id: &str) -> Result { + let workspace = get_workspace(window, id).await?; + let dbm = &*window.app_handle().state::(); let db = dbm.0.lock().await.get().unwrap(); - let workspace = get_workspace(window, id).await?; let (sql, params) = Query::delete() .from_table(WorkspaceIden::Table) @@ -253,7 +241,7 @@ pub async fn delete_workspace(window: &WebviewWindow, id: &str) -> Result, id: &str) -> Result { +pub async fn get_cookie_jar(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -269,7 +257,7 @@ pub async fn get_cookie_jar(mgr: &impl Manager, id: &str) -> Result, workspace_id: &str, -) -> Result, DBError> { +) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -282,7 +270,7 @@ pub async fn list_cookie_jars( Ok(items.map(|v| v.unwrap()).collect()) } -pub async fn delete_cookie_jar(window: &WebviewWindow, id: &str) -> Result { +pub async fn delete_cookie_jar(window: &WebviewWindow, id: &str) -> Result { let cookie_jar = get_cookie_jar(window, id).await?; let dbm = &*window.app_handle().state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -296,16 +284,13 @@ pub async fn delete_cookie_jar(window: &WebviewWindow, id: &str) -> Result Result { +pub async fn duplicate_grpc_request(window: &WebviewWindow, id: &str) -> Result { let mut request = get_grpc_request(window, id).await?.clone(); request.id = "".to_string(); upsert_grpc_request(window, &request).await } -pub async fn delete_grpc_request(window: &WebviewWindow, id: &str) -> Result { +pub async fn delete_grpc_request(window: &WebviewWindow, id: &str) -> Result { let req = get_grpc_request(window, id).await?; let dbm = &*window.app_handle().state::(); @@ -322,15 +307,15 @@ pub async fn delete_grpc_request(window: &WebviewWindow, id: &str) -> Result Result { - let dbm = &*window.app_handle().state::(); - let db = dbm.0.lock().await.get().unwrap(); +) -> Result { let id = match request.id.as_str() { "" => generate_model_id(ModelType::TypeGrpcRequest), _ => request.id.to_string(), }; let trimmed_name = request.name.trim(); + let dbm = &*window.app_handle().state::(); + let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::insert() .into_table(GrpcRequestIden::Table) .columns([ @@ -396,7 +381,7 @@ pub async fn upsert_grpc_request( Ok(emit_upserted_model(window, m)) } -pub async fn get_grpc_request(mgr: &impl Manager, id: &str) -> Result { +pub async fn get_grpc_request(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -412,7 +397,7 @@ pub async fn get_grpc_request(mgr: &impl Manager, id: &str) -> Result, workspace_id: &str, -) -> Result, DBError> { +) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -428,13 +413,13 @@ pub async fn list_grpc_requests( pub async fn upsert_grpc_connection( window: &WebviewWindow, connection: &GrpcConnection, -) -> Result { - let dbm = &*window.app_handle().state::(); - let db = dbm.0.lock().await.get().unwrap(); +) -> Result { let id = match connection.id.as_str() { "" => generate_model_id(ModelType::TypeGrpcConnection), _ => connection.id.to_string(), }; + let dbm = &*window.app_handle().state::(); + let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::insert() .into_table(GrpcConnectionIden::Table) .columns([ @@ -487,10 +472,7 @@ pub async fn upsert_grpc_connection( Ok(emit_upserted_model(window, m)) } -pub async fn get_grpc_connection( - mgr: &impl Manager, - id: &str, -) -> Result { +pub async fn get_grpc_connection(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -505,7 +487,7 @@ pub async fn get_grpc_connection( pub async fn list_grpc_connections( mgr: &impl Manager, request_id: &str, -) -> Result, DBError> { +) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -520,10 +502,7 @@ pub async fn list_grpc_connections( Ok(items.map(|v| v.unwrap()).collect()) } -pub async fn delete_grpc_connection( - window: &WebviewWindow, - id: &str, -) -> Result { +pub async fn delete_grpc_connection(window: &WebviewWindow, id: &str) -> Result { let resp = get_grpc_connection(window, id).await?; let dbm = &*window.app_handle().state::(); @@ -538,27 +517,21 @@ pub async fn delete_grpc_connection( emit_deleted_model(window, resp) } -pub async fn delete_all_grpc_connections( - window: &WebviewWindow, - request_id: &str, -) -> Result<(), DBError> { +pub async fn delete_all_grpc_connections(window: &WebviewWindow, request_id: &str) -> Result<()> { for r in list_grpc_connections(window, request_id).await? { delete_grpc_connection(window, &r.id).await?; } Ok(()) } -pub async fn upsert_grpc_event( - window: &WebviewWindow, - event: &GrpcEvent, -) -> Result { - let dbm = &*window.app_handle().state::(); - let db = dbm.0.lock().await.get().unwrap(); +pub async fn upsert_grpc_event(window: &WebviewWindow, event: &GrpcEvent) -> Result { let id = match event.id.as_str() { "" => generate_model_id(ModelType::TypeGrpcEvent), _ => event.id.to_string(), }; + let dbm = &*window.app_handle().state::(); + let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::insert() .into_table(GrpcEventIden::Table) .columns([ @@ -607,7 +580,7 @@ pub async fn upsert_grpc_event( Ok(emit_upserted_model(window, m)) } -pub async fn get_grpc_event(mgr: &impl Manager, id: &str) -> Result { +pub async fn get_grpc_event(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -622,7 +595,7 @@ pub async fn get_grpc_event(mgr: &impl Manager, id: &str) -> Result, connection_id: &str, -) -> Result, DBError> { +) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -640,7 +613,7 @@ pub async fn list_grpc_events( pub async fn upsert_cookie_jar( window: &WebviewWindow, cookie_jar: &CookieJar, -) -> Result { +) -> Result { let id = match cookie_jar.id.as_str() { "" => generate_model_id(ModelType::TypeCookieJar), _ => cookie_jar.id.to_string(), @@ -688,7 +661,7 @@ pub async fn upsert_cookie_jar( pub async fn list_environments( mgr: &impl Manager, workspace_id: &str, -) -> Result, DBError> { +) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -703,11 +676,12 @@ pub async fn list_environments( Ok(items.map(|v| v.unwrap()).collect()) } -pub async fn delete_environment(window: &WebviewWindow, id: &str) -> Result { - let dbm = &*window.app_handle().state::(); - let db = dbm.0.lock().await.get().unwrap(); +pub async fn delete_environment(window: &WebviewWindow, id: &str) -> Result { let env = get_environment(window, id).await?; + let dbm = &*window.app_handle().state::(); + let db = dbm.0.lock().await.get().unwrap(); + let (sql, params) = Query::delete() .from_table(EnvironmentIden::Table) .cond_where(Expr::col(EnvironmentIden::Id).eq(id)) @@ -717,7 +691,7 @@ pub async fn delete_environment(window: &WebviewWindow, id: &str) -> Result) -> Result { +async fn get_settings(mgr: &impl Manager) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -752,10 +726,7 @@ pub async fn get_or_create_settings(mgr: &impl Manager) -> Settings { .expect("Failed to insert Settings") } -pub async fn update_settings( - window: &WebviewWindow, - settings: Settings, -) -> Result { +pub async fn update_settings(window: &WebviewWindow, settings: Settings) -> Result { let dbm = &*window.app_handle().state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -810,7 +781,7 @@ pub async fn update_settings( pub async fn upsert_environment( window: &WebviewWindow, environment: Environment, -) -> Result { +) -> Result { let id = match environment.id.as_str() { "" => generate_model_id(ModelType::TypeEnvironment), _ => environment.id.to_string(), @@ -857,7 +828,7 @@ pub async fn upsert_environment( Ok(emit_upserted_model(window, m)) } -pub async fn get_environment(mgr: &impl Manager, id: &str) -> Result { +pub async fn get_environment(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -870,7 +841,7 @@ pub async fn get_environment(mgr: &impl Manager, id: &str) -> Result, id: &str) -> Result { +pub async fn get_folder(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -883,10 +854,7 @@ pub async fn get_folder(mgr: &impl Manager, id: &str) -> Result, - workspace_id: &str, -) -> Result, DBError> { +pub async fn list_folders(mgr: &impl Manager, workspace_id: &str) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -901,8 +869,9 @@ pub async fn list_folders( Ok(items.map(|v| v.unwrap()).collect()) } -pub async fn delete_folder(window: &WebviewWindow, id: &str) -> Result { +pub async fn delete_folder(window: &WebviewWindow, id: &str) -> Result { let folder = get_folder(window, id).await?; + let dbm = &*window.app_handle().state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -915,7 +884,7 @@ pub async fn delete_folder(window: &WebviewWindow, id: &str) -> Result Result { +pub async fn upsert_folder(window: &WebviewWindow, r: Folder) -> Result { let id = match r.id.as_str() { "" => generate_model_id(ModelType::TypeFolder), _ => r.id.to_string(), @@ -941,6 +910,7 @@ pub async fn upsert_folder(window: &WebviewWindow, r: Folder) -> Result Result Result { +pub async fn duplicate_http_request(window: &WebviewWindow, id: &str) -> Result { let mut request = get_http_request(window, id).await?.clone(); request.id = "".to_string(); upsert_http_request(window, request).await } -pub async fn upsert_http_request( - window: &WebviewWindow, - r: HttpRequest, -) -> Result { +pub async fn upsert_http_request(window: &WebviewWindow, r: HttpRequest) -> Result { let id = match r.id.as_str() { "" => generate_model_id(ModelType::TypeHttpRequest), _ => r.id.to_string(), @@ -1050,7 +1014,7 @@ pub async fn upsert_http_request( pub async fn list_http_requests( mgr: &impl Manager, workspace_id: &str, -) -> Result, DBError> { +) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -1064,7 +1028,7 @@ pub async fn list_http_requests( Ok(items.map(|v| v.unwrap()).collect()) } -pub async fn get_http_request(mgr: &impl Manager, id: &str) -> Result { +pub async fn get_http_request(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -1077,7 +1041,7 @@ pub async fn get_http_request(mgr: &impl Manager, id: &str) -> Result Result { +pub async fn delete_http_request(window: &WebviewWindow, id: &str) -> Result { let req = get_http_request(window, id).await?; // DB deletes will cascade but this will delete the files @@ -1108,7 +1072,7 @@ pub async fn create_http_response( headers: Vec, version: Option<&str>, remote_addr: Option<&str>, -) -> Result { +) -> Result { let req = get_http_request(window, request_id).await?; let id = generate_model_id(ModelType::TypeHttpResponse); let dbm = &*window.app_handle().state::(); @@ -1158,7 +1122,7 @@ pub async fn create_http_response( Ok(emit_upserted_model(window, m)) } -pub async fn cancel_pending_grpc_connections(app: &AppHandle) -> Result<(), DBError> { +pub async fn cancel_pending_grpc_connections(app: &AppHandle) -> Result<()> { let dbm = &*app.app_handle().state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -1172,7 +1136,7 @@ pub async fn cancel_pending_grpc_connections(app: &AppHandle) -> Result<(), DBEr Ok(()) } -pub async fn cancel_pending_responses(app: &AppHandle) -> Result<(), DBError> { +pub async fn cancel_pending_responses(app: &AppHandle) -> Result<()> { let dbm = &*app.app_handle().state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -1192,7 +1156,7 @@ pub async fn cancel_pending_responses(app: &AppHandle) -> Result<(), DBError> { pub async fn update_response_if_id( window: &WebviewWindow, response: &HttpResponse, -) -> Result { +) -> Result { if response.id.is_empty() { Ok(response.clone()) } else { @@ -1203,7 +1167,7 @@ pub async fn update_response_if_id( pub async fn update_response( window: &WebviewWindow, response: &HttpResponse, -) -> Result { +) -> Result { let dbm = &*window.app_handle().state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -1254,7 +1218,7 @@ pub async fn update_response( Ok(emit_upserted_model(window, m)) } -pub async fn get_http_response(mgr: &impl Manager, id: &str) -> Result { +pub async fn get_http_response(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -1266,10 +1230,7 @@ pub async fn get_http_response(mgr: &impl Manager, id: &str) -> Result Result { +pub async fn delete_http_response(window: &WebviewWindow, id: &str) -> Result { let resp = get_http_response(window, id).await?; // Delete the body file if it exists @@ -1290,10 +1251,7 @@ pub async fn delete_http_response( emit_deleted_model(window, resp) } -pub async fn delete_all_http_responses( - window: &WebviewWindow, - request_id: &str, -) -> Result<(), DBError> { +pub async fn delete_all_http_responses(window: &WebviewWindow, request_id: &str) -> Result<()> { for r in list_responses(window, request_id, None).await? { delete_http_response(window, &r.id).await?; } @@ -1304,7 +1262,7 @@ pub async fn list_responses( mgr: &impl Manager, request_id: &str, limit: Option, -) -> Result, DBError> { +) -> Result> { let limit_unwrapped = limit.unwrap_or_else(|| i64::MAX); let dbm = mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); @@ -1323,7 +1281,7 @@ pub async fn list_responses( pub async fn list_responses_by_workspace_id( mgr: &impl Manager, workspace_id: &str, -) -> Result, DBError> { +) -> Result> { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); let (sql, params) = Query::select() @@ -1337,6 +1295,12 @@ pub async fn list_responses_by_workspace_id( Ok(items.map(|v| v.unwrap()).collect()) } +pub async fn debug_pool(mgr: &impl Manager) { + let dbm = &*mgr.state::(); + let db = dbm.0.lock().await; + debug!("Debug database state: {:?}", db.state()); +} + pub fn generate_model_id(model: ModelType) -> String { let id = generate_id(); format!("{}_{}", model.id_prefix(), id) @@ -1363,7 +1327,7 @@ fn emit_upserted_model(window: &WebviewWindow, model: M) - model } -fn emit_deleted_model(window: &WebviewWindow, model: M) -> Result { +fn emit_deleted_model(window: &WebviewWindow, model: M) -> Result { let payload = ModelPayload { model: model.clone(), window_label: window.label().to_string(), diff --git a/src-tauri/yaak_plugin_runtime/Cargo.toml b/src-tauri/yaak_plugin_runtime/Cargo.toml index 55e391c2..9be8ad7f 100644 --- a/src-tauri/yaak_plugin_runtime/Cargo.toml +++ b/src-tauri/yaak_plugin_runtime/Cargo.toml @@ -17,6 +17,9 @@ tauri = { workspace = true } tauri-plugin-shell = { workspace = true } tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "process"] } tonic = "0.12.1" +ts-rs = "9.0.1" +thiserror = "1.0.63" +yaak_models = {workspace = true} [build-dependencies] tonic-build = "0.12.1" diff --git a/src-tauri/yaak_plugin_runtime/build.rs b/src-tauri/yaak_plugin_runtime/build.rs index 83dd8a03..6c56dcf1 100644 --- a/src-tauri/yaak_plugin_runtime/build.rs +++ b/src-tauri/yaak_plugin_runtime/build.rs @@ -1,4 +1,9 @@ fn main() -> Result<(), Box> { + // Tell ts-rs where to generate types to + println!("cargo:rustc-env=TS_RS_EXPORT_DIR=../../plugin-runtime-types/src/gen"); + + // Compile protobuf types tonic_build::compile_protos("../../proto/plugins/runtime.proto")?; + Ok(()) } diff --git a/src-tauri/yaak_plugin_runtime/src/error.rs b/src-tauri/yaak_plugin_runtime/src/error.rs new file mode 100644 index 00000000..02def35e --- /dev/null +++ b/src-tauri/yaak_plugin_runtime/src/error.rs @@ -0,0 +1,40 @@ +use thiserror::Error; +use tokio::io; +use tokio::sync::mpsc::error::SendError; +use crate::server::plugin_runtime::EventStreamEvent; + +#[derive(Error, Debug)] +pub enum Error { + #[error("IO error")] + IoErr(#[from] io::Error), + #[error("Tauri error")] + TauriErr(#[from] tauri::Error), + #[error("Tauri shell error")] + TauriShellErr(#[from] tauri_plugin_shell::Error), + #[error("Grpc transport error")] + GrpcTransportErr(#[from] tonic::transport::Error), + #[error("Grpc send error")] + GrpcSendErr(#[from] SendError>), + #[error("JSON error")] + JsonErr(#[from] serde_json::Error), + #[error("Plugin not found error")] + PluginNotFoundErr(String), + #[error("unknown error")] + MissingCallbackIdErr(String), + #[error("Missing callback ID error")] + MissingCallbackErr(String), + #[error("No plugins found")] + NoPluginsErr(String), + #[error("Plugin error")] + PluginErr(String), + #[error("Unknown error")] + UnknownErr(String), +} + +impl Into for Error { + fn into(self) -> String { + todo!() + } +} + +pub type Result = std::result::Result; diff --git a/src-tauri/yaak_plugin_runtime/src/events.rs b/src-tauri/yaak_plugin_runtime/src/events.rs new file mode 100644 index 00000000..cc00dc01 --- /dev/null +++ b/src-tauri/yaak_plugin_runtime/src/events.rs @@ -0,0 +1,132 @@ +use serde::{Deserialize, Serialize}; +use serde_json::Value; +use ts_rs::TS; + +use yaak_models::models::{Environment, Folder, GrpcRequest, HttpRequest, HttpResponse, Workspace}; + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export)] +pub struct InternalEvent { + pub id: String, + pub plugin_ref_id: String, + pub reply_id: Option, + pub payload: InternalEventPayload, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(tag = "type")] +#[serde(rename_all = "snake_case")] +#[ts(export)] +pub enum InternalEventPayload { + BootRequest(BootRequest), + BootResponse(BootResponse), + ImportRequest(ImportRequest), + ImportResponse(ImportResponse), + FilterRequest(FilterRequest), + FilterResponse(FilterResponse), + ExportHttpRequestRequest(ExportHttpRequestRequest), + ExportHttpRequestResponse(ExportHttpRequestResponse), + /// Returned when a plugin doesn't get run, just so the server + /// has something to listen for + EmptyResponse(EmptyResponse), +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default)] +#[ts(export, type = "{}")] +pub struct EmptyResponse {} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct BootRequest { + pub dir: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct BootResponse { + pub name: String, + pub version: String, + pub capabilities: Vec, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct ImportRequest { + pub content: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct ImportResponse { + pub resources: ImportResources, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct FilterRequest { + pub content: String, + pub filter: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct FilterResponse { + pub items: Vec, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct ExportHttpRequestRequest { + pub http_request: HttpRequest, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct ExportHttpRequestResponse { + pub content: String, +} + +// TODO: Migrate plugins to return this type +// #[derive(Debug, Clone, Serialize, Deserialize, TS)] +// #[serde(rename_all = "camelCase", untagged)] +// #[ts(export)] +// pub enum ExportableModel { +// Workspace(Workspace), +// Environment(Environment), +// Folder(Folder), +// HttpRequest(HttpRequest), +// GrpcRequest(GrpcRequest), +// } + +#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] +#[serde(default, rename_all = "camelCase")] +#[ts(export)] +pub struct ImportResources { + pub workspaces: Vec, + pub environments: Vec, + pub folders: Vec, + pub http_requests: Vec, + pub grpc_requests: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export)] +pub enum Model { + Workspace(Workspace), + Environment(Environment), + Folder(Folder), + HttpRequest(HttpRequest), + HttpResponse(HttpResponse), + GrpcRequest(GrpcRequest), +} diff --git a/src-tauri/yaak_plugin_runtime/src/lib.rs b/src-tauri/yaak_plugin_runtime/src/lib.rs index da5b5b2d..9edeff09 100644 --- a/src-tauri/yaak_plugin_runtime/src/lib.rs +++ b/src-tauri/yaak_plugin_runtime/src/lib.rs @@ -1,41 +1,6 @@ -extern crate core; - -use crate::manager::PluginManager; -use log::info; -use std::process::exit; -use tauri::plugin::{Builder, TauriPlugin}; -use tauri::{Manager, RunEvent, Runtime, State}; -use tokio::sync::Mutex; - +pub mod error; +mod events; pub mod manager; mod nodejs; - -pub mod plugin_runtime { - tonic::include_proto!("yaak.plugins.runtime"); -} - -pub fn init() -> TauriPlugin { - Builder::new("yaak_plugin_runtime") - .setup(|app, _| { - tauri::async_runtime::block_on(async move { - let manager = PluginManager::new(&app).await; - let manager_state = Mutex::new(manager); - app.manage(manager_state); - Ok(()) - }) - }) - .on_event(|app, e| match e { - // TODO: Also exit when app is force-quit (eg. cmd+r in IntelliJ runner) - RunEvent::ExitRequested { api, .. } => { - api.prevent_exit(); - tauri::async_runtime::block_on(async move { - info!("Exiting plugin runtime due to app exit"); - let manager: State> = app.state(); - manager.lock().await.cleanup().await; - exit(0); - }); - } - _ => {} - }) - .build() -} +pub mod plugin; +mod server; diff --git a/src-tauri/yaak_plugin_runtime/src/main.rs b/src-tauri/yaak_plugin_runtime/src/main.rs new file mode 100644 index 00000000..2382c9e8 --- /dev/null +++ b/src-tauri/yaak_plugin_runtime/src/main.rs @@ -0,0 +1,26 @@ +#[tokio::main] +async fn main() -> Result<(), Box> { + // let dir = env::var("YAAK_PLUGINS_DIR").expect("YAAK_PLUGINS_DIR not set"); + // + // let plugin_dirs: Vec = match read_dir(dir) { + // Ok(result) => { + // let mut dirs: Vec = vec![]; + // for entry_result in result { + // match entry_result { + // Ok(entry) => { + // if entry.path().is_dir() { + // dirs.push(entry.path().to_string_lossy().to_string()) + // } + // } + // Err(_) => { + // continue; + // } + // } + // }; + // dirs + // } + // Err(_) => vec![], + // }; + // start_server(plugin_dirs).await.unwrap(); + Ok(()) +} diff --git a/src-tauri/yaak_plugin_runtime/src/manager.rs b/src-tauri/yaak_plugin_runtime/src/manager.rs index 03ee3ee4..0372f16d 100644 --- a/src-tauri/yaak_plugin_runtime/src/manager.rs +++ b/src-tauri/yaak_plugin_runtime/src/manager.rs @@ -1,34 +1,38 @@ -use log::{debug, info}; -use std::time::Duration; -use tauri::{AppHandle, Manager, Runtime}; -use tokio::sync::watch::Sender; -use tonic::transport::Channel; - -use crate::nodejs::node_start; -use crate::plugin_runtime::plugin_runtime_client::PluginRuntimeClient; -use crate::plugin_runtime::{ - HookExportRequest, HookImportRequest, HookResponse, HookResponseFilterRequest, +use crate::error::Result; +use crate::events::{ + ExportHttpRequestRequest, ExportHttpRequestResponse, FilterRequest, FilterResponse, + ImportRequest, ImportResponse, InternalEventPayload, }; +use crate::error::Error::PluginErr; +use crate::nodejs::start_nodejs_plugin_runtime; +use crate::plugin::start_server; +use crate::server::PluginRuntimeGrpcServer; +use std::time::Duration; +use tauri::{AppHandle, Runtime}; +use tokio::sync::watch::Sender; +use yaak_models::models::HttpRequest; + pub struct PluginManager { - client: PluginRuntimeClient, kill_tx: Sender, + server: PluginRuntimeGrpcServer, } impl PluginManager { - pub async fn new(app_handle: &AppHandle) -> PluginManager { - let temp_dir = app_handle.path().temp_dir().unwrap(); + pub async fn new( + app_handle: &AppHandle, + plugin_dirs: Vec, + ) -> PluginManager { + let (server, addr) = start_server(plugin_dirs) + .await + .expect("Failed to start plugin runtime server"); let (kill_tx, kill_rx) = tokio::sync::watch::channel(false); - let start_resp = node_start(app_handle, &temp_dir, &kill_rx).await; - info!("Connecting to gRPC client at {}", start_resp.addr); + start_nodejs_plugin_runtime(app_handle, addr, &kill_rx) + .await + .expect("Failed to start plugin runtime"); - let client = match PluginRuntimeClient::connect(start_resp.addr.clone()).await { - Ok(v) => v, - Err(err) => panic!("{}", err.to_string()), - }; - - PluginManager { client, kill_tx } + PluginManager { kill_tx, server } } pub async fn cleanup(&mut self) { @@ -38,49 +42,81 @@ impl PluginManager { tokio::time::sleep(Duration::from_millis(500)).await; } - pub async fn run_import(&mut self, data: &str) -> Result { - let response = self - .client - .hook_import(tonic::Request::new(HookImportRequest { - data: data.to_string(), + pub async fn run_import(&mut self, content: &str) -> Result<(ImportResponse, String)> { + let reply_events = self + .server + .send_and_wait(&InternalEventPayload::ImportRequest(ImportRequest { + content: content.to_string(), })) - .await - .map_err(|e| e.message().to_string())?; + .await?; - Ok(response.into_inner()) + // TODO: Don't just return the first valid response + for event in reply_events { + match event.payload { + InternalEventPayload::ImportResponse(resp) => { + let ref_id = event.plugin_ref_id.as_str(); + let plugin = self.server.plugin_by_ref_id(ref_id).await?; + let plugin_name = plugin.name().await; + return Ok((resp, plugin_name)); + } + _ => {} + } + } + + Err(PluginErr("No import responses found".to_string())) } - pub async fn run_export_curl(&mut self, request: &str) -> Result { - let response = self - .client - .hook_export(tonic::Request::new(HookExportRequest { - request: request.to_string(), - })) - .await - .map_err(|e| e.message().to_string())?; + pub async fn run_export_curl( + &mut self, + request: &HttpRequest, + ) -> Result { + let event = self + .server + .send_to_plugin_and_wait( + "exporter-curl", + &InternalEventPayload::ExportHttpRequestRequest(ExportHttpRequestRequest { + http_request: request.to_owned(), + }), + ) + .await?; - Ok(response.into_inner()) + match event.payload { + InternalEventPayload::ExportHttpRequestResponse(resp) => Ok(resp), + InternalEventPayload::EmptyResponse(_) => { + Err(PluginErr("Export returned empty".to_string())) + } + e => Err(PluginErr(format!("Export returned invalid event {:?}", e))), + } } - pub async fn run_response_filter( + pub async fn run_filter( &mut self, filter: &str, - body: &str, + content: &str, content_type: &str, - ) -> Result { - debug!("Running plugin filter"); - let response = self - .client - .hook_response_filter(tonic::Request::new(HookResponseFilterRequest { - filter: filter.to_string(), - body: body.to_string(), - content_type: content_type.to_string(), - })) - .await - .map_err(|e| e.message().to_string())?; + ) -> Result { + let plugin_name = match content_type { + "application/json" => "filter-jsonpath", + _ => "filter-xpath", + }; - let result = response.into_inner(); - debug!("Ran plugin response filter {}", result.data); - Ok(result) + let event = self + .server + .send_to_plugin_and_wait( + plugin_name, + &InternalEventPayload::FilterRequest(FilterRequest { + filter: filter.to_string(), + content: content.to_string(), + }), + ) + .await?; + + match event.payload { + InternalEventPayload::FilterResponse(resp) => Ok(resp), + InternalEventPayload::EmptyResponse(_) => { + Err(PluginErr("Filter returned empty".to_string())) + } + e => Err(PluginErr(format!("Export returned invalid event {:?}", e))), + } } } diff --git a/src-tauri/yaak_plugin_runtime/src/nodejs.rs b/src-tauri/yaak_plugin_runtime/src/nodejs.rs index ada51094..cf903ec5 100644 --- a/src-tauri/yaak_plugin_runtime/src/nodejs.rs +++ b/src-tauri/yaak_plugin_runtime/src/nodejs.rs @@ -1,14 +1,12 @@ -use std::path::PathBuf; -use std::time::Duration; - +use std::net::SocketAddr; +use crate::error::Result; use log::info; -use rand::distributions::{Alphanumeric, DistString}; use serde; use serde::Deserialize; use tauri::path::BaseDirectory; use tauri::{AppHandle, Manager, Runtime}; +use tauri_plugin_shell::process::CommandEvent; use tauri_plugin_shell::ShellExt; -use tokio::fs; use tokio::sync::watch::Receiver; #[derive(Deserialize, Default)] @@ -17,57 +15,48 @@ struct PortFile { port: i32, } -pub struct StartResp { - pub addr: String, -} - -pub async fn node_start( +pub async fn start_nodejs_plugin_runtime( app: &AppHandle, - temp_dir: &PathBuf, + addr: SocketAddr, kill_rx: &Receiver, -) -> StartResp { - let port_file_path = temp_dir.join(Alphanumeric.sample_string(&mut rand::thread_rng(), 10)); - - let plugins_dir = app - .path() - .resolve("plugins", BaseDirectory::Resource) - .expect("failed to resolve plugin directory resource"); - +) -> Result<()> { let plugin_runtime_main = app .path() - .resolve("plugin-runtime", BaseDirectory::Resource) - .expect("failed to resolve plugin runtime resource") + .resolve("plugin-runtime", BaseDirectory::Resource)? .join("index.cjs"); // HACK: Remove UNC prefix for Windows paths to pass to sidecar - - let plugins_dir = dunce::simplified(plugins_dir.as_path()) - .to_string_lossy() - .to_string(); let plugin_runtime_main = dunce::simplified(plugin_runtime_main.as_path()) .to_string_lossy() .to_string(); - info!( - "Starting plugin runtime\n → port_file={}\n → plugins_dir={}\n → runtime_dir={}", - port_file_path.to_string_lossy(), - plugins_dir, - plugin_runtime_main, - ); + info!("Starting plugin runtime main={}", plugin_runtime_main); let cmd = app .shell() - .sidecar("yaaknode") - .expect("yaaknode not found") - .env("YAAK_GRPC_PORT_FILE_PATH", port_file_path.clone()) - .env("YAAK_PLUGINS_DIR", plugins_dir) + .sidecar("yaaknode")? + .env("PORT", addr.port().to_string()) .args(&[plugin_runtime_main]); - println!("Waiting on plugin runtime"); - let (_, child) = cmd.spawn().expect("yaaknode failed to start"); + let (mut child_rx, child) = cmd.spawn()?; + println!("Spawned plugin runtime"); let mut kill_rx = kill_rx.clone(); + tokio::spawn(async move { + while let Some(event) = child_rx.recv().await { + match event { + CommandEvent::Stderr(line) => { + print!("{}", String::from_utf8(line).unwrap()); + } + CommandEvent::Stdout(line) => { + print!("{}", String::from_utf8(line).unwrap()); + } + _ => {} + } + } + }); + // Check on child tokio::spawn(async move { kill_rx @@ -77,26 +66,7 @@ pub async fn node_start( info!("Killing plugin runtime"); child.kill().expect("Failed to kill plugin runtime"); info!("Killed plugin runtime"); - return; }); - let start = std::time::Instant::now(); - let port_file_contents = loop { - if start.elapsed().as_millis() > 30000 { - panic!("Failed to read port file in time"); - } - - match fs::read_to_string(port_file_path.clone()).await { - Ok(s) => break s, - Err(_) => { - tokio::time::sleep(Duration::from_millis(500)).await; - } - } - }; - - let port_file: PortFile = serde_json::from_str(port_file_contents.as_str()).unwrap(); - info!("Started plugin runtime on :{}", port_file.port); - let addr = format!("http://localhost:{}", port_file.port); - - StartResp { addr } + Ok(()) } diff --git a/src-tauri/yaak_plugin_runtime/src/plugin.rs b/src-tauri/yaak_plugin_runtime/src/plugin.rs new file mode 100644 index 00000000..67bd8695 --- /dev/null +++ b/src-tauri/yaak_plugin_runtime/src/plugin.rs @@ -0,0 +1,112 @@ +use std::net::SocketAddr; +use std::path::PathBuf; +use std::process::exit; +use std::time::Duration; + +use log::info; +use tauri::path::BaseDirectory; +use tauri::plugin::{Builder, TauriPlugin}; +use tauri::{Manager, RunEvent, Runtime, State}; +use tokio::fs::read_dir; +use tokio::net::TcpListener; +use tokio::sync::Mutex; +use tonic::codegen::tokio_stream; +use tonic::transport::Server; + +use crate::error::Result; +use crate::events::{InternalEvent, InternalEventPayload}; +use crate::manager::PluginManager; +use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntimeServer; +use crate::server::PluginRuntimeGrpcServer; + +pub fn init() -> TauriPlugin { + Builder::new("yaak_plugin_runtime") + .setup(|app, _| { + let plugins_dir = app + .path() + .resolve("plugins", BaseDirectory::Resource) + .expect("failed to resolve plugin directory resource"); + + tauri::async_runtime::block_on(async move { + let plugin_dirs = read_plugins_dir(&plugins_dir) + .await + .expect("Failed to read plugins dir"); + let manager = PluginManager::new(&app, plugin_dirs).await; + let manager_state = Mutex::new(manager); + app.manage(manager_state); + Ok(()) + }) + }) + .on_event(|app, e| match e { + // TODO: Also exit when app is force-quit (eg. cmd+r in IntelliJ runner) + RunEvent::ExitRequested { api, .. } => { + api.prevent_exit(); + tauri::async_runtime::block_on(async move { + info!("Exiting plugin runtime due to app exit"); + let manager: State> = app.state(); + manager.lock().await.cleanup().await; + exit(0); + }); + } + _ => {} + }) + .build() +} + +pub async fn start_server( + plugin_dirs: Vec, +) -> Result<(PluginRuntimeGrpcServer, SocketAddr)> { + println!("Starting plugin server with {plugin_dirs:?}"); + let server = PluginRuntimeGrpcServer::new(plugin_dirs); + + let svc = PluginRuntimeServer::new(server.clone()); + let listen_addr = match option_env!("PORT") { + None => "localhost:0".to_string(), + Some(port) => format!("localhost:{port}"), + }; + + { + let server = server.clone(); + tokio::spawn(async move { + let (rx_id, mut rx) = server.subscribe().await; + while let Some(event) = rx.recv().await { + match event.clone() { + InternalEvent { + payload: InternalEventPayload::BootResponse(resp), + plugin_ref_id, + .. + } => { + server.boot_plugin(plugin_ref_id.as_str(), &resp).await; + } + _ => {} + }; + } + server.unsubscribe(rx_id).await; + }); + }; + + let listener = TcpListener::bind(listen_addr).await?; + let addr = listener.local_addr()?; + println!("Starting gRPC plugin server on {addr}"); + tokio::spawn(async move { + Server::builder() + .timeout(Duration::from_secs(10)) + .add_service(svc) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await + .expect("grpc plugin runtime server failed to start"); + }); + + Ok((server, addr)) +} + +async fn read_plugins_dir(dir: &PathBuf) -> Result> { + let mut result = read_dir(dir).await?; + let mut dirs: Vec = vec![]; + while let Ok(Some(entry)) = result.next_entry().await { + if entry.path().is_dir() { + dirs.push(entry.path().to_string_lossy().to_string()) + } + } + Ok(dirs) +} diff --git a/src-tauri/yaak_plugin_runtime/src/server.rs b/src-tauri/yaak_plugin_runtime/src/server.rs new file mode 100644 index 00000000..ac526f10 --- /dev/null +++ b/src-tauri/yaak_plugin_runtime/src/server.rs @@ -0,0 +1,448 @@ +use log::info; +use rand::distributions::{Alphanumeric, DistString}; +use std::collections::HashMap; +use std::pin::Pin; +use std::sync::Arc; +use tokio::sync::mpsc::Receiver; +use tokio::sync::{mpsc, Mutex}; +use tonic::codegen::tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::tokio_stream::{Stream, StreamExt}; +use tonic::{Request, Response, Status, Streaming}; + +use crate::error::Error::{NoPluginsErr, PluginNotFoundErr}; +use crate::error::Result; +use crate::events::{BootRequest, BootResponse, InternalEvent, InternalEventPayload}; +use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntime; +use plugin_runtime::EventStreamEvent; +use yaak_models::queries::generate_id; + +pub mod plugin_runtime { + tonic::include_proto!("yaak.plugins.runtime"); +} + +type ResponseStream = + Pin> + Send>>; + +#[derive(Clone)] +pub struct PluginHandle { + dir: String, + to_plugin_tx: Arc>>>, + ref_id: String, + boot_resp: Arc>>, +} + +impl PluginHandle { + pub async fn name(&self) -> String { + match &*self.boot_resp.lock().await { + None => "__NOT_BOOTED__".to_string(), + Some(r) => r.name.to_owned(), + } + } + + pub fn build_event_to_send( + &self, + payload: &InternalEventPayload, + reply_id: Option, + ) -> InternalEvent { + InternalEvent { + id: gen_id(), + plugin_ref_id: self.ref_id.clone(), + reply_id, + payload: payload.clone(), + } + } + + pub async fn send(&self, event: &InternalEvent) -> Result<()> { + info!("Sending event {} {:?}", event.id, self.name().await); + self.to_plugin_tx + .lock() + .await + .send(Ok(EventStreamEvent { + event: serde_json::to_string(&event)?, + })) + .await?; + Ok(()) + } + + pub async fn boot(&self, resp: &BootResponse) { + let mut boot_resp = self.boot_resp.lock().await; + *boot_resp = Some(resp.clone()); + } +} + +#[derive(Clone)] +pub struct PluginRuntimeGrpcServer { + plugin_ref_to_plugin: Arc>>, + callback_to_plugin_ref: Arc>>, + subscribers: Arc>>>, + plugin_dirs: Vec, +} + +impl PluginRuntimeGrpcServer { + pub fn new(plugin_dirs: Vec) -> Self { + PluginRuntimeGrpcServer { + plugin_ref_to_plugin: Arc::new(Mutex::new(HashMap::new())), + callback_to_plugin_ref: Arc::new(Mutex::new(HashMap::new())), + subscribers: Arc::new(Mutex::new(HashMap::new())), + plugin_dirs, + } + } + + pub async fn subscribe(&self) -> (String, Receiver) { + let (tx, rx) = mpsc::channel(128); + let id = generate_id(); + self.subscribers.lock().await.insert(id.clone(), tx); + (id, rx) + } + + pub async fn unsubscribe(&self, rx_id: String) { + self.subscribers.lock().await.remove(rx_id.as_str()); + } + + pub async fn remove_plugins(&self, plugin_ids: Vec) { + for plugin_id in plugin_ids { + self.remove_plugin(plugin_id.as_str()).await; + } + } + + pub async fn remove_plugin(&self, id: &str) { + match self.plugin_ref_to_plugin.lock().await.remove(id) { + None => { + println!("Tried to remove non-existing plugin {}", id); + } + Some(plugin) => { + println!("Removed plugin {} {}", id, plugin.name().await); + } + }; + } + + pub async fn boot_plugin(&self, id: &str, resp: &BootResponse) { + match self.plugin_ref_to_plugin.lock().await.get(id) { + None => { + println!("Tried booting non-existing plugin {}", id); + } + Some(plugin) => { + plugin.clone().boot(resp).await; + } + } + } + + pub async fn add_plugin( + &self, + dir: &str, + tx: mpsc::Sender>, + ) -> PluginHandle { + let ref_id = gen_id(); + let plugin_handle = PluginHandle { + ref_id: ref_id.clone(), + dir: dir.to_string(), + to_plugin_tx: Arc::new(Mutex::new(tx)), + boot_resp: Arc::new(Mutex::new(None)), + }; + let _ = self + .plugin_ref_to_plugin + .lock() + .await + .insert(ref_id, plugin_handle.clone()); + plugin_handle + } + + // pub async fn callback( + // &self, + // source_event: InternalEvent, + // payload: InternalEventPayload, + // ) -> Result { + // let reply_id = match source_event.clone().reply_id { + // None => { + // let msg = format!("Source event missing reply Id {:?}", source_event.clone()); + // return Err(MissingCallbackIdErr(msg)); + // } + // Some(id) => id, + // }; + // + // let callbacks = self.callbacks.lock().await; + // let plugin_name = match callbacks.get(reply_id.as_str()) { + // None => { + // let msg = format!("Callback not found {:?}", source_event); + // return Err(MissingCallbackErr(msg)); + // } + // Some(n) => n, + // }; + // + // let plugins = self.plugins.lock().await; + // let plugin = match plugins.get(plugin_name) { + // None => { + // let msg = format!( + // "Plugin not found {plugin_name}. Choices were {:?}", + // plugins.keys() + // ); + // return Err(UnknownPluginErr(msg)); + // } + // Some(n) => n, + // }; + // + // plugin.send(&payload, Some(reply_id)).await + // } + + pub async fn plugin_by_ref_id(&self, ref_id: &str) -> Result { + let plugins = self.plugin_ref_to_plugin.lock().await; + if plugins.is_empty() { + return Err(NoPluginsErr("Send failed because no plugins exist".into())); + } + + match plugins.get(ref_id) { + None => { + let msg = format!("Failed to find plugin for id {ref_id}"); + Err(PluginNotFoundErr(msg)) + } + Some(p) => Ok(p.to_owned()), + } + } + + pub async fn plugin_by_name(&self, plugin_name: &str) -> Result { + let plugins = self.plugin_ref_to_plugin.lock().await; + if plugins.is_empty() { + return Err(NoPluginsErr("Send failed because no plugins exist".into())); + } + + for p in plugins.values() { + if p.name().await == plugin_name { + return Ok(p.to_owned()); + } + } + + let msg = format!("Failed to find plugin for {plugin_name}"); + Err(PluginNotFoundErr(msg)) + } + + pub async fn send_to_plugin( + &self, + plugin_name: &str, + payload: InternalEventPayload, + ) -> Result { + let plugins = self.plugin_ref_to_plugin.lock().await; + if plugins.is_empty() { + return Err(NoPluginsErr("Send failed because no plugins exist".into())); + } + + let mut plugin = None; + for p in plugins.values() { + if p.name().await == plugin_name { + plugin = Some(p); + break; + } + } + + match plugin { + Some(plugin) => { + let event = plugin.build_event_to_send(&payload, None); + plugin.send(&event).await?; + Ok(event) + } + None => { + let msg = format!("Failed to find plugin for {plugin_name}"); + Err(PluginNotFoundErr(msg)) + } + } + } + + pub async fn send_to_plugin_and_wait( + &self, + plugin_name: &str, + payload: &InternalEventPayload, + ) -> Result { + let plugin = self.plugin_by_name(plugin_name).await?; + let events = self.send_to_plugins_and_wait(payload, vec![plugin]).await?; + Ok(events.first().unwrap().to_owned()) + } + + pub async fn send_and_wait( + &self, + payload: &InternalEventPayload, + ) -> Result> { + let plugins = self + .plugin_ref_to_plugin + .lock() + .await + .values() + .cloned() + .collect(); + self.send_to_plugins_and_wait(payload, plugins).await + } + + async fn send_to_plugins_and_wait( + &self, + payload: &InternalEventPayload, + plugins: Vec, + ) -> Result> { + // 1. Build the events with IDs and everything + let events_to_send = plugins + .iter() + .map(|p| p.build_event_to_send(payload, None)) + .collect::>(); + + // 2. Spawn thread to subscribe to incoming events and check reply ids + let server = self.clone(); + let send_events_fut = { + let events_to_send = events_to_send.clone(); + tokio::spawn(async move { + let (rx_id, mut rx) = server.subscribe().await; + let mut found_events = Vec::new(); + + while let Some(event) = rx.recv().await { + if events_to_send + .iter() + .find(|e| Some(e.id.to_owned()) == event.reply_id) + .is_some() + { + found_events.push(event.clone()); + }; + if found_events.len() == events_to_send.len() { + break; + } + } + server.unsubscribe(rx_id).await; + + found_events + }) + }; + + // 3. Send the events + for event in events_to_send { + let plugin = plugins + .iter() + .find(|p| p.ref_id == event.plugin_ref_id) + .expect("Didn't find plugin in list"); + plugin.send(&event).await? + } + + // 4. Join on the spawned thread + let events = send_events_fut.await.expect("Thread didn't succeed"); + Ok(events) + } + + pub async fn send(&self, payload: InternalEventPayload) -> Result> { + let mut events: Vec = Vec::new(); + let plugins = self.plugin_ref_to_plugin.lock().await; + if plugins.is_empty() { + return Err(NoPluginsErr("Send failed because no plugins exist".into())); + } + + for ph in plugins.values() { + let event = ph.build_event_to_send(&payload, None); + self.send_to_plugin_handle(ph, &event).await?; + events.push(event); + } + + Ok(events) + } + + async fn send_to_plugin_handle( + &self, + plugin: &PluginHandle, + event: &InternalEvent, + ) -> Result<()> { + plugin.send(event).await + } + + async fn load_plugins( + &self, + to_plugin_tx: mpsc::Sender>, + plugin_dirs: Vec, + ) -> Vec { + let mut plugin_ids = Vec::new(); + + for dir in plugin_dirs { + let plugin = self.add_plugin(dir.as_str(), to_plugin_tx.clone()).await; + plugin_ids.push(plugin.clone().ref_id); + + let event = plugin.build_event_to_send( + &InternalEventPayload::BootRequest(BootRequest { + dir: dir.to_string(), + }), + None, + ); + if let Err(e) = plugin.send(&event).await { + // TODO: Error handling + println!( + "Failed boot plugin {} at {} -> {}", + plugin.ref_id, plugin.dir, e + ) + } else { + println!("Loaded plugin {} at {}", plugin.ref_id, plugin.dir) + } + } + + plugin_ids + } +} + +#[tonic::async_trait] +impl PluginRuntime for PluginRuntimeGrpcServer { + type EventStreamStream = ResponseStream; + + async fn event_stream( + &self, + req: Request>, + ) -> tonic::Result> { + let mut in_stream = req.into_inner(); + + let (to_plugin_tx, to_plugin_rx) = mpsc::channel(128); + + let plugin_ids = self + .load_plugins(to_plugin_tx, self.plugin_dirs.clone()) + .await; + + let callbacks = self.callback_to_plugin_ref.clone(); + let server = self.clone(); + tokio::spawn(async move { + while let Some(result) = in_stream.next().await { + match result { + Ok(v) => { + let event: InternalEvent = match serde_json::from_str(v.event.as_str()) { + Ok(pe) => pe, + Err(e) => { + println!("Failed to deserialize event {e:?} -> {}", v.event); + continue; + } + }; + + let plugin_ref_id = event.plugin_ref_id.clone(); + let reply_id = event.reply_id.clone(); + + let subscribers = server.subscribers.lock().await; + for tx in subscribers.values() { + // Emit event to the channel for server to handle + if let Err(e) = tx.try_send(event.clone()) { + println!("Failed to send to server channel. Receiver probably isn't listening: {:?}", e); + } + } + + // Add to callbacks if there's a reply_id + if let Some(reply_id) = reply_id { + callbacks.lock().await.insert(reply_id, plugin_ref_id); + } + } + Err(err) => { + // TODO: Better error handling + println!("gRPC server error {err}"); + break; + } + }; + } + + server.remove_plugins(plugin_ids).await; + }); + + // Write the same data that was received + let out_stream = ReceiverStream::new(to_plugin_rx); + + Ok(Response::new( + Box::pin(out_stream) as Self::EventStreamStream + )) + } +} + +fn gen_id() -> String { + Alphanumeric.sample_string(&mut rand::thread_rng(), 5) +} diff --git a/src-web/components/CookieDialog.tsx b/src-web/components/CookieDialog.tsx index 7e53f0ae..3711e160 100644 --- a/src-web/components/CookieDialog.tsx +++ b/src-web/components/CookieDialog.tsx @@ -1,6 +1,6 @@ import { useCookieJars } from '../hooks/useCookieJars'; import { useUpdateCookieJar } from '../hooks/useUpdateCookieJar'; -import type { Cookie } from '../lib/gen/Cookie'; +import type { Cookie } from '../lib/models/Cookie'; import { cookieDomain } from '../lib/models'; import { Banner } from './core/Banner'; import { IconButton } from './core/IconButton'; diff --git a/src-web/components/ImportDataDialog.tsx b/src-web/components/ImportDataDialog.tsx index 0c779984..ede0cbce 100644 --- a/src-web/components/ImportDataDialog.tsx +++ b/src-web/components/ImportDataDialog.tsx @@ -1,4 +1,5 @@ import React, { useState } from 'react'; +import { useLocalStorage } from 'react-use'; import { Button } from './core/Button'; import { VStack } from './core/Stacks'; import { SelectFile } from './SelectFile'; @@ -9,7 +10,7 @@ interface Props { export function ImportDataDialog({ importData }: Props) { const [isLoading, setIsLoading] = useState(false); - const [filePath, setFilePath] = useState(null); + const [filePath, setFilePath] = useLocalStorage('importFilePath', null); return ( @@ -24,7 +25,10 @@ export function ImportDataDialog({ importData }: Props) { - setFilePath(filePath)} /> + setFilePath(filePath)} + /> {filePath && (