diff --git a/package-lock.json b/package-lock.json index 2a6b47f8..836a9d19 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27,7 +27,7 @@ "@tauri-apps/plugin-log": "^2.0.0-rc.1", "@tauri-apps/plugin-os": "^2.0.0-rc.1", "@tauri-apps/plugin-shell": "^2.0.0-rc.1", - "@yaakapp/api": "^0.2.0", + "@yaakapp/api": "^0.2.3", "buffer": "^6.0.3", "classnames": "^2.5.1", "cm6-graphql": "^0.0.9", @@ -3322,9 +3322,9 @@ "license": "MIT" }, "node_modules/@yaakapp/api": { - "version": "0.2.0", - "resolved": "https://registry.npmjs.org/@yaakapp/api/-/api-0.2.0.tgz", - "integrity": "sha512-DskPYRQ0Hk3KcOIi8O5drbpK0wUwvpUcMvsYfPPz90jfwb9tSpprfFKFUiCVartrA/VO6R0skKscWz74QTKaSA==", + "version": "0.2.3", + "resolved": "https://registry.npmjs.org/@yaakapp/api/-/api-0.2.3.tgz", + "integrity": "sha512-LKLk1EErWF0LyFj70yhZZzk2ZwwpC7xT3y3zPofgxUqKis9gW7lwevsTdyb1Acv18BY6IL2u8as7dzIN2p85ew==", "dependencies": { "@types/node": "^22.5.4" } diff --git a/package.json b/package.json index 511f2bf5..0a60b059 100644 --- a/package.json +++ b/package.json @@ -46,7 +46,7 @@ "@tauri-apps/plugin-log": "^2.0.0-rc.1", "@tauri-apps/plugin-os": "^2.0.0-rc.1", "@tauri-apps/plugin-shell": "^2.0.0-rc.1", - "@yaakapp/api": "^0.2.0", + "@yaakapp/api": "^0.2.3", "buffer": "^6.0.3", "classnames": "^2.5.1", "cm6-graphql": "^0.0.9", diff --git a/plugin-runtime-types/package.json b/plugin-runtime-types/package.json index fdf26b4b..612ef824 100644 --- a/plugin-runtime-types/package.json +++ b/plugin-runtime-types/package.json @@ -1,6 +1,6 @@ { "name": "@yaakapp/api", - "version": "0.2.0", + "version": "0.2.3", "main": "lib/index.js", "typings": "./lib/index.d.ts", "files": [ diff --git a/plugin-runtime-types/src/gen/InternalEventPayload.ts b/plugin-runtime-types/src/gen/InternalEventPayload.ts index 10dca75c..bf2237a9 100644 --- a/plugin-runtime-types/src/gen/InternalEventPayload.ts +++ b/plugin-runtime-types/src/gen/InternalEventPayload.ts @@ -1,55 +1,27 @@ // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. -import type { BootRequest } from './BootRequest'; -import type { BootResponse } from './BootResponse'; -import type { CallHttpRequestActionRequest } from './CallHttpRequestActionRequest'; -import type { CallTemplateFunctionRequest } from './CallTemplateFunctionRequest'; -import type { CallTemplateFunctionResponse } from './CallTemplateFunctionResponse'; -import type { CopyTextRequest } from './CopyTextRequest'; -import type { ExportHttpRequestRequest } from './ExportHttpRequestRequest'; -import type { ExportHttpRequestResponse } from './ExportHttpRequestResponse'; -import type { FilterRequest } from './FilterRequest'; -import type { FilterResponse } from './FilterResponse'; -import type { FindHttpResponsesRequest } from './FindHttpResponsesRequest'; -import type { FindHttpResponsesResponse } from './FindHttpResponsesResponse'; -import type { GetHttpRequestActionsRequest } from './GetHttpRequestActionsRequest'; -import type { GetHttpRequestActionsResponse } from './GetHttpRequestActionsResponse'; -import type { GetHttpRequestByIdRequest } from './GetHttpRequestByIdRequest'; -import type { GetHttpRequestByIdResponse } from './GetHttpRequestByIdResponse'; -import type { GetTemplateFunctionsResponse } from './GetTemplateFunctionsResponse'; -import type { ImportRequest } from './ImportRequest'; -import type { ImportResponse } from './ImportResponse'; -import type { RenderHttpRequestRequest } from './RenderHttpRequestRequest'; -import type { RenderHttpRequestResponse } from './RenderHttpRequestResponse'; -import type { SendHttpRequestRequest } from './SendHttpRequestRequest'; -import type { SendHttpRequestResponse } from './SendHttpRequestResponse'; -import type { ShowToastRequest } from './ShowToastRequest'; +import type { BootRequest } from "./BootRequest"; +import type { BootResponse } from "./BootResponse"; +import type { CallHttpRequestActionRequest } from "./CallHttpRequestActionRequest"; +import type { CallTemplateFunctionRequest } from "./CallTemplateFunctionRequest"; +import type { CallTemplateFunctionResponse } from "./CallTemplateFunctionResponse"; +import type { CopyTextRequest } from "./CopyTextRequest"; +import type { ExportHttpRequestRequest } from "./ExportHttpRequestRequest"; +import type { ExportHttpRequestResponse } from "./ExportHttpRequestResponse"; +import type { FilterRequest } from "./FilterRequest"; +import type { FilterResponse } from "./FilterResponse"; +import type { FindHttpResponsesRequest } from "./FindHttpResponsesRequest"; +import type { FindHttpResponsesResponse } from "./FindHttpResponsesResponse"; +import type { GetHttpRequestActionsRequest } from "./GetHttpRequestActionsRequest"; +import type { GetHttpRequestActionsResponse } from "./GetHttpRequestActionsResponse"; +import type { GetHttpRequestByIdRequest } from "./GetHttpRequestByIdRequest"; +import type { GetHttpRequestByIdResponse } from "./GetHttpRequestByIdResponse"; +import type { GetTemplateFunctionsResponse } from "./GetTemplateFunctionsResponse"; +import type { ImportRequest } from "./ImportRequest"; +import type { ImportResponse } from "./ImportResponse"; +import type { RenderHttpRequestRequest } from "./RenderHttpRequestRequest"; +import type { RenderHttpRequestResponse } from "./RenderHttpRequestResponse"; +import type { SendHttpRequestRequest } from "./SendHttpRequestRequest"; +import type { SendHttpRequestResponse } from "./SendHttpRequestResponse"; +import type { ShowToastRequest } from "./ShowToastRequest"; -export type InternalEventPayload = - | ({ type: 'boot_request' } & BootRequest) - | ({ type: 'boot_response' } & BootResponse) - | { type: 'reload_request' } - | { type: 'reload_response' } - | ({ type: 'import_request' } & ImportRequest) - | ({ type: 'import_response' } & ImportResponse) - | ({ type: 'filter_request' } & FilterRequest) - | ({ type: 'filter_response' } & FilterResponse) - | ({ type: 'export_http_request_request' } & ExportHttpRequestRequest) - | ({ type: 'export_http_request_response' } & ExportHttpRequestResponse) - | ({ type: 'send_http_request_request' } & SendHttpRequestRequest) - | ({ type: 'send_http_request_response' } & SendHttpRequestResponse) - | ({ type: 'get_http_request_actions_request' } & GetHttpRequestActionsRequest) - | ({ type: 'get_http_request_actions_response' } & GetHttpRequestActionsResponse) - | ({ type: 'call_http_request_action_request' } & CallHttpRequestActionRequest) - | { type: 'get_template_functions_request' } - | ({ type: 'get_template_functions_response' } & GetTemplateFunctionsResponse) - | ({ type: 'call_template_function_request' } & CallTemplateFunctionRequest) - | ({ type: 'call_template_function_response' } & CallTemplateFunctionResponse) - | ({ type: 'copy_text_request' } & CopyTextRequest) - | ({ type: 'render_http_request_request' } & RenderHttpRequestRequest) - | ({ type: 'render_http_request_response' } & RenderHttpRequestResponse) - | ({ type: 'show_toast_request' } & ShowToastRequest) - | ({ type: 'get_http_request_by_id_request' } & GetHttpRequestByIdRequest) - | ({ type: 'get_http_request_by_id_response' } & GetHttpRequestByIdResponse) - | ({ type: 'find_http_responses_request' } & FindHttpResponsesRequest) - | ({ type: 'find_http_responses_response' } & FindHttpResponsesResponse) - | { type: 'empty_response' }; +export type InternalEventPayload = { "type": "boot_request" } & BootRequest | { "type": "boot_response" } & BootResponse | { "type": "reload_request" } | { "type": "reload_response" } | { "type": "terminate_request" } | { "type": "terminate_response" } | { "type": "import_request" } & ImportRequest | { "type": "import_response" } & ImportResponse | { "type": "filter_request" } & FilterRequest | { "type": "filter_response" } & FilterResponse | { "type": "export_http_request_request" } & ExportHttpRequestRequest | { "type": "export_http_request_response" } & ExportHttpRequestResponse | { "type": "send_http_request_request" } & SendHttpRequestRequest | { "type": "send_http_request_response" } & SendHttpRequestResponse | { "type": "get_http_request_actions_request" } & GetHttpRequestActionsRequest | { "type": "get_http_request_actions_response" } & GetHttpRequestActionsResponse | { "type": "call_http_request_action_request" } & CallHttpRequestActionRequest | { "type": "get_template_functions_request" } | { "type": "get_template_functions_response" } & GetTemplateFunctionsResponse | { "type": "call_template_function_request" } & CallTemplateFunctionRequest | { "type": "call_template_function_response" } & CallTemplateFunctionResponse | { "type": "copy_text_request" } & CopyTextRequest | { "type": "render_http_request_request" } & RenderHttpRequestRequest | { "type": "render_http_request_response" } & RenderHttpRequestResponse | { "type": "show_toast_request" } & ShowToastRequest | { "type": "get_http_request_by_id_request" } & GetHttpRequestByIdRequest | { "type": "get_http_request_by_id_response" } & GetHttpRequestByIdResponse | { "type": "find_http_responses_request" } & FindHttpResponsesRequest | { "type": "find_http_responses_response" } & FindHttpResponsesResponse | { "type": "empty_response" }; diff --git a/plugin-runtime/src/PluginHandle.ts b/plugin-runtime/src/PluginHandle.ts index d3c08e19..49c91e86 100644 --- a/plugin-runtime/src/PluginHandle.ts +++ b/plugin-runtime/src/PluginHandle.ts @@ -18,6 +18,10 @@ export class PluginHandle { this.#worker.postMessage(event); } + async terminate() { + await this.#worker.terminate(); + } + #createWorker(): Worker { const workerPath = process.env.YAAK_WORKER_PATH ?? path.join(__dirname, 'index.worker.cjs'); const worker = new Worker(workerPath, { diff --git a/plugin-runtime/src/index.ts b/plugin-runtime/src/index.ts index 96cc90cd..dc60a2b0 100644 --- a/plugin-runtime/src/index.ts +++ b/plugin-runtime/src/index.ts @@ -22,13 +22,19 @@ const plugins: Record = {}; plugins[pluginEvent.pluginRefId] = plugin; } - // Once booted, forward all events to plugin's worker + // Once booted, forward all events to the plugin worker const plugin = plugins[pluginEvent.pluginRefId]; if (!plugin) { console.warn('Failed to get plugin for event by', pluginEvent.pluginRefId); continue; } + if (pluginEvent.payload.type === 'terminate_request') { + await plugin.terminate(); + console.log('Terminated plugin worker', pluginEvent.pluginRefId); + delete plugins[pluginEvent.pluginRefId]; + } + plugin.sendToWorker(pluginEvent); } console.log('Stream ended'); diff --git a/plugin-runtime/src/index.worker.ts b/plugin-runtime/src/index.worker.ts index 863a74e8..8e83f47b 100644 --- a/plugin-runtime/src/index.worker.ts +++ b/plugin-runtime/src/index.worker.ts @@ -155,6 +155,14 @@ async function initialize() { return; } + if (payload.type === 'terminate_request') { + const payload: InternalEventPayload = { + type: 'terminate_response', + }; + sendPayload(payload, replyId); + return; + } + if (payload.type === 'import_request' && typeof mod.pluginHookImport === 'function') { const reply: ImportResponse | null = await mod.pluginHookImport(ctx, payload.content); if (reply != null) { diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index c6356964..76dbd016 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -47,9 +47,9 @@ use yaak_models::queries::{ cancel_pending_grpc_connections, cancel_pending_responses, create_default_http_response, delete_all_grpc_connections, delete_all_http_responses, delete_cookie_jar, delete_environment, delete_folder, delete_grpc_connection, delete_grpc_request, delete_http_request, - delete_http_response, delete_workspace, duplicate_grpc_request, duplicate_http_request, - generate_model_id, get_cookie_jar, get_environment, get_folder, get_grpc_connection, - get_grpc_request, get_http_request, get_http_response, get_key_value_raw, + delete_http_response, delete_plugin, delete_workspace, duplicate_grpc_request, + duplicate_http_request, generate_model_id, get_cookie_jar, get_environment, get_folder, + get_grpc_connection, get_grpc_request, get_http_request, get_http_response, get_key_value_raw, get_or_create_settings, get_plugin, get_workspace, list_cookie_jars, list_environments, list_folders, list_grpc_connections, list_grpc_events, list_grpc_requests, list_http_requests, list_http_responses, list_plugins, list_workspaces, set_key_value_raw, update_response_if_id, @@ -57,12 +57,12 @@ use yaak_models::queries::{ upsert_grpc_event, upsert_grpc_request, upsert_http_request, upsert_plugin, upsert_workspace, }; use yaak_plugin_runtime::events::{ - CallHttpRequestActionRequest, FilterResponse, FindHttpResponsesResponse, + BootResponse, CallHttpRequestActionRequest, FilterResponse, FindHttpResponsesResponse, GetHttpRequestActionsResponse, GetHttpRequestByIdResponse, GetTemplateFunctionsResponse, - InternalEvent, InternalEventPayload, BootResponse, RenderHttpRequestResponse, - SendHttpRequestResponse, ShowToastRequest, ToastVariant, + InternalEvent, InternalEventPayload, RenderHttpRequestResponse, SendHttpRequestResponse, + ShowToastRequest, ToastVariant, }; -use yaak_plugin_runtime::handle::PluginHandle; +use yaak_plugin_runtime::plugin_handle::PluginHandle; use yaak_templates::{Parser, Tokens}; mod analytics; @@ -82,6 +82,7 @@ const DEFAULT_WINDOW_HEIGHT: f64 = 600.0; const MIN_WINDOW_WIDTH: f64 = 300.0; const MIN_WINDOW_HEIGHT: f64 = 300.0; +const MAIN_WINDOW_PREFIX: &str = "main_"; #[derive(serde::Serialize)] #[serde(default, rename_all = "camelCase")] @@ -1172,12 +1173,18 @@ async fn cmd_create_workspace(name: &str, w: WebviewWindow) -> Result, + plugin_manager: State<'_, PluginManager>, w: WebviewWindow, ) -> Result { - upsert_plugin( + plugin_manager + .add_plugin_by_dir(&directory) + .await + .map_err(|e| e.to_string())?; + + let plugin = upsert_plugin( &w, Plugin { directory: directory.into(), @@ -1186,7 +1193,27 @@ async fn cmd_create_plugin( }, ) .await - .map_err(|e| e.to_string()) + .map_err(|e| e.to_string())?; + + Ok(plugin) +} + +#[tauri::command] +async fn cmd_uninstall_plugin( + plugin_id: &str, + plugin_manager: State<'_, PluginManager>, + w: WebviewWindow, +) -> Result { + let plugin = delete_plugin(&w, plugin_id) + .await + .map_err(|e| e.to_string())?; + + plugin_manager + .uninstall(plugin.directory.as_str()) + .await + .map_err(|e| e.to_string())?; + + Ok(plugin) } #[tauri::command] @@ -1463,8 +1490,14 @@ async fn cmd_list_plugins(w: WebviewWindow) -> Result, String> { } #[tauri::command] -async fn cmd_reload_plugins(plugin_manager: State<'_, PluginManager>) -> Result<(), String> { - plugin_manager.reload_all().await; +async fn cmd_reload_plugins( + app_handle: AppHandle, + plugin_manager: State<'_, PluginManager>, +) -> Result<(), String> { + plugin_manager + .initialize_all_plugins(&app_handle) + .await + .map_err(|e| e.to_string())?; Ok(()) } @@ -1476,9 +1509,12 @@ async fn cmd_plugin_info( ) -> Result { let plugin = get_plugin(&w, id).await.map_err(|e| e.to_string())?; plugin_manager - .get_plugin_info(plugin.directory.as_str()) + .get_plugin_by_dir(plugin.directory.as_str()) .await - .ok_or("Failed to find plugin info".to_string()) + .ok_or("Failed to find plugin info".to_string())? + .info() + .await + .ok_or("Failed to find plugin".to_string()) } #[tauri::command] @@ -1719,8 +1755,7 @@ pub fn run() { let plugin_cb = PluginTemplateCallback::new(app.app_handle().clone()); app.manage(plugin_cb); - let app_handle = app.app_handle().clone(); - monitor_plugin_events(&app_handle); + monitor_plugin_events(&app.app_handle().clone()); Ok(()) }) @@ -1732,7 +1767,7 @@ pub fn run() { cmd_create_folder, cmd_create_grpc_request, cmd_create_http_request, - cmd_create_plugin, + cmd_install_plugin, cmd_create_workspace, cmd_curl_to_request, cmd_delete_all_grpc_connections, @@ -1744,6 +1779,7 @@ pub fn run() { cmd_delete_grpc_request, cmd_delete_http_request, cmd_delete_http_response, + cmd_uninstall_plugin, cmd_delete_workspace, cmd_dismiss_notification, cmd_duplicate_grpc_request, @@ -1909,7 +1945,7 @@ fn create_window(handle: &AppHandle, url: &str) -> WebviewWindow { handle.set_menu(menu).expect("Failed to set app menu"); let window_num = handle.webview_windows().len(); - let label = format!("main_{}", window_num); + let label = format!("{MAIN_WINDOW_PREFIX}{window_num}"); info!("Create new window label={label}"); let mut win_builder = tauri::WebviewWindowBuilder::new(handle, label, WebviewUrl::App(url.into())) @@ -2005,14 +2041,20 @@ fn monitor_plugin_events(app_handle: &AppHandle) { let app_handle = app_handle.clone(); tauri::async_runtime::spawn(async move { let plugin_manager: State<'_, PluginManager> = app_handle.state(); - let (_rx_id, mut rx) = plugin_manager.subscribe().await; + let (rx_id, mut rx) = plugin_manager.subscribe().await; while let Some(event) = rx.recv().await { let app_handle = app_handle.clone(); - let plugin = plugin_manager - .get_plugin(event.plugin_ref_id.as_str()) + let plugin = match plugin_manager + .get_plugin_by_ref_id(event.plugin_ref_id.as_str()) .await - .unwrap(); + { + None => { + warn!("Failed to get plugin for event {:?}", event); + continue; + } + Some(p) => p, + }; // We might have recursive back-and-forth calls between app and plugin, so we don't // want to block here @@ -2020,6 +2062,7 @@ fn monitor_plugin_events(app_handle: &AppHandle) { handle_plugin_event(&app_handle, &event, &plugin).await; }); } + plugin_manager.unsubscribe(rx_id.as_str()).await; }); } @@ -2062,19 +2105,19 @@ async fn handle_plugin_event( )) } InternalEventPayload::RenderHttpRequestRequest(req) => { - let w = get_focused_window_no_lock(app_handle).expect("No focused window"); + let window = get_focused_window_no_lock(app_handle).expect("No focused window"); let workspace = get_workspace(app_handle, req.http_request.workspace_id.as_str()) .await .expect("Failed to get workspace for request"); - let url = w.url().unwrap(); + let url = window.url().unwrap(); let mut query_pairs = url.query_pairs(); let environment_id = query_pairs .find(|(k, _v)| k == "environment_id") .map(|(_k, v)| v.to_string()); let environment = match environment_id { None => None, - Some(id) => get_environment(&w, id.as_str()).await.ok(), + Some(id) => get_environment(&window, id.as_str()).await.ok(), }; let cb = &*app_handle.state::(); let rendered_http_request = @@ -2086,28 +2129,22 @@ async fn handle_plugin_event( )) } InternalEventPayload::ReloadResponse => { - let w = get_focused_window_no_lock(app_handle).expect("No focused window"); - let plugins = list_plugins(&w).await.unwrap(); + let window = get_focused_window_no_lock(app_handle).expect("No focused window"); + let plugins = list_plugins(&window).await.unwrap(); for plugin in plugins { if plugin.directory != plugin_handle.dir { continue; } - upsert_plugin( - &w, - Plugin { - // TODO: Add reloaded_at field to use instead - updated_at: Utc::now().naive_utc(), - ..plugin - }, - ) - .await - .unwrap(); + let new_plugin = Plugin { + updated_at: Utc::now().naive_utc(), // TODO: Add reloaded_at field to use instead + ..plugin + }; + upsert_plugin(&window, new_plugin).await.unwrap(); } - let plugin_name = plugin_handle.info().await.unwrap().name; let toast_event = plugin_handle.build_event_to_send( &InternalEventPayload::ShowToastRequest(ShowToastRequest { - message: format!("Reloaded plugin {}", plugin_name), + message: format!("Reloaded plugin {}", plugin_handle.dir), variant: ToastVariant::Info, }), None, @@ -2174,18 +2211,25 @@ async fn handle_plugin_event( fn get_focused_window_no_lock(app_handle: &AppHandle) -> Option> { // TODO: Getting the focused window doesn't seem to work on Windows, so // we'll need to pass the window label into plugin events instead. - if app_handle.webview_windows().len() == 1 { - let w = app_handle - .webview_windows() - .iter() - .next() - .map(|w| w.1.clone()); - return w; - } - - app_handle + let main_windows = app_handle .webview_windows() .iter() - .find(|w| w.1.is_focused().unwrap_or(false)) - .map(|w| w.1.clone()) + .filter_map(|(_, w)| { + if w.label().starts_with(MAIN_WINDOW_PREFIX) { + Some(w.to_owned()) + } else { + None + } + }) + .collect::>>(); + + if main_windows.len() == 1 { + return main_windows.iter().next().map(|w| w.clone()) + } + + main_windows + .iter() + .cloned() + .find(|w| w.is_focused().unwrap_or(false)) + .map(|w| w.clone()) } diff --git a/src-tauri/src/template_callback.rs b/src-tauri/src/template_callback.rs index b0f68e2f..d797d2eb 100644 --- a/src-tauri/src/template_callback.rs +++ b/src-tauri/src/template_callback.rs @@ -34,7 +34,7 @@ impl TemplateCallback for PluginTemplateCallback { } else { fn_name }; - + let plugin_manager = self.app_handle.state::(); let function = plugin_manager .get_template_functions() @@ -46,7 +46,7 @@ impl TemplateCallback for PluginTemplateCallback { .ok_or("")?; let mut args_with_defaults = args.clone(); - + // Fill in default values for all args for a_def in function.args { let base = match a_def { diff --git a/src-tauri/src/updates.rs b/src-tauri/src/updates.rs index 298c5cdc..20250432 100644 --- a/src-tauri/src/updates.rs +++ b/src-tauri/src/updates.rs @@ -74,7 +74,7 @@ impl YaakUpdater { tauri::async_runtime::block_on(async move { info!("Shutting down plugin manager before update"); let plugin_manager = h.state::(); - plugin_manager.cleanup().await; + plugin_manager.terminate().await; }); }); }) diff --git a/src-tauri/yaak_models/src/queries.rs b/src-tauri/yaak_models/src/queries.rs index eec3acfd..b9f2e7f0 100644 --- a/src-tauri/yaak_models/src/queries.rs +++ b/src-tauri/yaak_models/src/queries.rs @@ -954,6 +954,21 @@ pub async fn upsert_plugin( Ok(emit_upserted_model(window, m)) } +pub async fn delete_plugin(window: &WebviewWindow, id: &str) -> Result { + let plugin = get_plugin(window, id).await?; + + let dbm = &*window.app_handle().state::(); + let db = dbm.0.lock().await.get().unwrap(); + + let (sql, params) = Query::delete() + .from_table(PluginIden::Table) + .cond_where(Expr::col(PluginIden::Id).eq(id)) + .build_rusqlite(SqliteQueryBuilder); + db.execute(sql.as_str(), &*params.as_params())?; + + emit_deleted_model(window, plugin) +} + pub async fn get_folder(mgr: &impl Manager, id: &str) -> Result { let dbm = &*mgr.state::(); let db = dbm.0.lock().await.get().unwrap(); diff --git a/src-tauri/yaak_plugin_runtime/src/error.rs b/src-tauri/yaak_plugin_runtime/src/error.rs index 32729731..710189a9 100644 --- a/src-tauri/yaak_plugin_runtime/src/error.rs +++ b/src-tauri/yaak_plugin_runtime/src/error.rs @@ -5,22 +5,35 @@ use crate::server::plugin_runtime::EventStreamEvent; #[derive(Error, Debug)] pub enum Error { - #[error("IO error")] + #[error("IO error: {0}")] IoErr(#[from] io::Error), - #[error("Tauri error")] + + #[error("Tauri error: {0}")] TauriErr(#[from] tauri::Error), - #[error("Tauri shell error")] + + #[error("Tauri shell error: {0}")] TauriShellErr(#[from] tauri_plugin_shell::Error), - #[error("Grpc transport error")] + + #[error("Grpc transport error: {0}")] GrpcTransportErr(#[from] tonic::transport::Error), - #[error("Grpc send error")] + + #[error("Grpc send error: {0}")] GrpcSendErr(#[from] SendError>), - #[error("JSON error")] + + #[error("JSON error: {0}")] JsonErr(#[from] serde_json::Error), + #[error("Plugin not found: {0}")] PluginNotFoundErr(String), + #[error("Plugin error: {0}")] PluginErr(String), + + #[error("Client not initialized error")] + ClientNotInitializedErr, + + #[error("Unknown event received")] + UnknownEventErr, } impl Into for Error { diff --git a/src-tauri/yaak_plugin_runtime/src/events.rs b/src-tauri/yaak_plugin_runtime/src/events.rs index 32d0b6dc..ceab0057 100644 --- a/src-tauri/yaak_plugin_runtime/src/events.rs +++ b/src-tauri/yaak_plugin_runtime/src/events.rs @@ -27,6 +27,9 @@ pub enum InternalEventPayload { ReloadRequest, ReloadResponse, + TerminateRequest, + TerminateResponse, + ImportRequest(ImportRequest), ImportResponse(ImportResponse), diff --git a/src-tauri/yaak_plugin_runtime/src/lib.rs b/src-tauri/yaak_plugin_runtime/src/lib.rs index 9b636f0c..60f76772 100644 --- a/src-tauri/yaak_plugin_runtime/src/lib.rs +++ b/src-tauri/yaak_plugin_runtime/src/lib.rs @@ -4,5 +4,5 @@ pub mod manager; mod nodejs; pub mod plugin; mod server; -pub mod handle; +pub mod plugin_handle; mod util; diff --git a/src-tauri/yaak_plugin_runtime/src/manager.rs b/src-tauri/yaak_plugin_runtime/src/manager.rs index 1e885612..6ffe181d 100644 --- a/src-tauri/yaak_plugin_runtime/src/manager.rs +++ b/src-tauri/yaak_plugin_runtime/src/manager.rs @@ -1,57 +1,236 @@ +use crate::error::Error::{ClientNotInitializedErr, PluginErr, PluginNotFoundErr, UnknownEventErr}; use crate::error::Result; use crate::events::{ - BootResponse, CallHttpRequestActionRequest, CallTemplateFunctionArgs, + BootRequest, CallHttpRequestActionRequest, CallTemplateFunctionArgs, CallTemplateFunctionRequest, CallTemplateFunctionResponse, FilterRequest, FilterResponse, GetHttpRequestActionsRequest, GetHttpRequestActionsResponse, GetTemplateFunctionsResponse, ImportRequest, ImportResponse, InternalEvent, InternalEventPayload, RenderPurpose, }; -use std::collections::HashMap; - -use crate::error::Error::PluginErr; use crate::nodejs::start_nodejs_plugin_runtime; -use crate::plugin::start_server; -use crate::server::PluginRuntimeGrpcServer; +use crate::plugin_handle::PluginHandle; +use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntimeServer; +use crate::server::PluginRuntimeServerImpl; +use log::{info, warn}; +use std::collections::HashMap; +use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; -use tauri::{AppHandle, Runtime}; -use tokio::sync::mpsc; -use tokio::sync::watch::Sender; -use crate::handle::PluginHandle; +use tauri::path::BaseDirectory; +use tauri::{AppHandle, Manager, Runtime}; +use tokio::fs::read_dir; +use tokio::net::TcpListener; +use tokio::sync::{mpsc, Mutex}; +use tonic::codegen::tokio_stream; +use tonic::transport::Server; +use yaak_models::queries::{generate_id, list_plugins}; +#[derive(Clone)] pub struct PluginManager { - kill_tx: Sender, - server: PluginRuntimeGrpcServer, + subscribers: Arc>>>, + plugins: Arc>>, + kill_tx: tokio::sync::watch::Sender, + server: Arc, } impl PluginManager { - pub async fn new( - app_handle: &AppHandle, - plugin_dirs: Vec, - ) -> PluginManager { - let (server, addr) = start_server(plugin_dirs) - .await - .expect("Failed to start plugin runtime server"); + pub fn new(app_handle: AppHandle) -> PluginManager { + let (events_tx, mut events_rx) = mpsc::channel(128); + let (kill_server_tx, kill_server_rx) = tokio::sync::watch::channel(false); - let (kill_tx, kill_rx) = tokio::sync::watch::channel(false); - start_nodejs_plugin_runtime(app_handle, addr, &kill_rx) - .await - .expect("Failed to start plugin runtime"); + let (client_disconnect_tx, mut client_disconnect_rx) = mpsc::channel(128); + let (client_connect_tx, mut client_connect_rx) = tokio::sync::watch::channel(false); + let server = + PluginRuntimeServerImpl::new(events_tx, client_disconnect_tx, client_connect_tx); - PluginManager { kill_tx, server } + let plugin_manager = PluginManager { + plugins: Arc::new(Mutex::new(Vec::new())), + subscribers: Arc::new(Mutex::new(HashMap::new())), + server: Arc::new(server.clone()), + kill_tx: kill_server_tx, + }; + + // Forward events to subscribers + let subscribers = plugin_manager.subscribers.clone(); + tauri::async_runtime::spawn(async move { + while let Some(event) = events_rx.recv().await { + for (tx_id, tx) in subscribers.lock().await.iter_mut() { + if let Err(e) = tx.try_send(event.clone()) { + warn!("Failed to send event to subscriber {tx_id} {e:?}"); + } + } + } + }); + + // Handle when client plugin runtime disconnects + tauri::async_runtime::spawn(async move { + while let Some(_) = client_disconnect_rx.recv().await { + info!("Plugin runtime client disconnected! TODO: Handle this case"); + } + }); + + info!("Starting plugin server"); + + let svc = PluginRuntimeServer::new(server.to_owned()); + let listen_addr = match option_env!("PORT") { + None => "localhost:0".to_string(), + Some(port) => format!("localhost:{port}"), + }; + let listener = tauri::async_runtime::block_on(async move { + TcpListener::bind(listen_addr) + .await + .expect("Failed to bind TCP listener") + }); + let addr = listener.local_addr().expect("Failed to get local address"); + + // 1. Reload all plugins when the Node.js runtime connects + { + let plugin_manager = plugin_manager.clone(); + let app_handle = app_handle.clone(); + tauri::async_runtime::spawn(async move { + match client_connect_rx.changed().await { + Ok(_) => { + info!("Plugin runtime client connected!"); + plugin_manager + .initialize_all_plugins(&app_handle) + .await + .expect("Failed to reload plugins"); + } + Err(e) => { + warn!("Failed to receive from client connection rx {e:?}"); + } + } + }); + }; + + // 1. Spawn server in the background + info!("Starting gRPC plugin server on {addr}"); + tauri::async_runtime::spawn(async move { + Server::builder() + .timeout(Duration::from_secs(10)) + .add_service(svc) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await + .expect("grpc plugin runtime server failed to start"); + }); + + // 2. Start Node.js runtime and initialize plugins + tauri::async_runtime::block_on(async move { + start_nodejs_plugin_runtime(&app_handle, addr, &kill_server_rx) + .await + .unwrap(); + }); + + plugin_manager } - pub async fn reload_all(&self) { - self.server.reload_plugins().await + pub async fn list_plugin_dirs(&self, app_handle: &AppHandle) -> Vec { + let plugins_dir = app_handle + .path() + .resolve("plugins", BaseDirectory::Resource) + .expect("failed to resolve plugin directory resource"); + + let bundled_plugin_dirs = read_plugins_dir(&plugins_dir) + .await + .expect(format!("Failed to read plugins dir: {:?}", plugins_dir).as_str()); + + let plugins = list_plugins(app_handle).await.unwrap_or_default(); + let installed_plugin_dirs = plugins + .iter() + .map(|p| p.directory.to_owned()) + .collect::>(); + + let plugin_dirs = [bundled_plugin_dirs, installed_plugin_dirs].concat(); + plugin_dirs + } + + pub async fn uninstall(&self, dir: &str) -> Result<()> { + let plugin = self + .get_plugin_by_dir(dir) + .await + .ok_or(PluginNotFoundErr(dir.to_string()))?; + self.remove_plugin(&plugin).await + } + + async fn remove_plugin(&self, plugin: &PluginHandle) -> Result<()> { + let mut plugins = self.plugins.lock().await; + + // Terminate the plugin + plugin.terminate().await?; + + // Remove the plugin from the list + let pos = plugins.iter().position(|p| p.ref_id == plugin.ref_id); + if let Some(pos) = pos { + plugins.remove(pos); + } + + Ok(()) + } + + pub async fn add_plugin_by_dir(&self, dir: &str) -> Result<()> { + info!("Adding plugin by dir {dir}"); + let maybe_tx = self.server.app_to_plugin_events_tx.lock().await; + let tx = match &*maybe_tx { + None => return Err(ClientNotInitializedErr), + Some(tx) => tx, + }; + let ph = PluginHandle::new(dir, tx.clone()); + self.plugins.lock().await.push(ph.clone()); + let plugin = self + .get_plugin_by_dir(dir) + .await + .ok_or(PluginNotFoundErr(dir.to_string()))?; + + // Boot the plugin + let event = self + .send_to_plugin_and_wait( + &plugin, + &InternalEventPayload::BootRequest(BootRequest { + dir: dir.to_string(), + }), + ) + .await?; + + let resp = match event.payload { + InternalEventPayload::BootResponse(resp) => resp, + _ => return Err(UnknownEventErr), + }; + + plugin.set_boot_response(&resp).await; + + Ok(()) + } + + pub async fn initialize_all_plugins( + &self, + app_handle: &AppHandle, + ) -> Result<()> { + for dir in self.list_plugin_dirs(app_handle).await { + // First remove the plugin if it exists + if let Some(plugin) = self.get_plugin_by_dir(dir.as_str()).await { + if let Err(e) = self.remove_plugin(&plugin).await { + warn!("Failed to remove plugin {dir} {e:?}"); + } + } + if let Err(e) = self.add_plugin_by_dir(dir.as_str()).await { + warn!("Failed to add plugin {dir} {e:?}"); + } + } + + Ok(()) } pub async fn subscribe(&self) -> (String, mpsc::Receiver) { - self.server.subscribe().await + let (tx, rx) = mpsc::channel(128); + let rx_id = generate_id(); + self.subscribers.lock().await.insert(rx_id.clone(), tx); + (rx_id, rx) } pub async fn unsubscribe(&self, rx_id: &str) { - self.server.unsubscribe(rx_id).await + self.subscribers.lock().await.remove(rx_id); } - pub async fn cleanup(&self) { + pub async fn terminate(&self) { self.kill_tx.send_replace(true); // Give it a bit of time to kill @@ -64,22 +243,115 @@ impl PluginManager { payload: &InternalEventPayload, ) -> Result<()> { let reply_id = Some(source_event.clone().id); - self.server - .send(&payload, source_event.plugin_ref_id.as_str(), reply_id) + let plugin = self + .get_plugin_by_ref_id(source_event.plugin_ref_id.as_str()) + .await + .ok_or(PluginNotFoundErr(source_event.plugin_ref_id.to_string()))?; + let event = plugin.build_event_to_send(&payload, reply_id); + plugin.send(&event).await + } + + pub async fn get_plugin_by_ref_id(&self, ref_id: &str) -> Option { + self.plugins + .lock() + .await + .iter() + .find(|p| p.ref_id == ref_id) + .cloned() + } + + pub async fn get_plugin_by_dir(&self, dir: &str) -> Option { + self.plugins + .lock() + .await + .iter() + .find(|p| p.dir == dir) + .cloned() + } + + pub async fn get_plugin_by_name(&self, name: &str) -> Option { + for plugin in self.plugins.lock().await.iter().cloned() { + let info = plugin.info().await?; + if info.name == name { + return Some(plugin); + } + } + None + } + + async fn send_to_plugin_and_wait( + &self, + plugin: &PluginHandle, + payload: &InternalEventPayload, + ) -> Result { + let events = self + .send_to_plugins_and_wait(payload, vec![plugin.to_owned()]) + .await?; + Ok(events.first().unwrap().to_owned()) + } + + async fn send_and_wait(&self, payload: &InternalEventPayload) -> Result> { + self.send_to_plugins_and_wait(payload, self.plugins.lock().await.clone()) .await } - pub async fn get_plugin_info(&self, dir: &str) -> Option { - self.server.plugin_by_dir(dir).await.ok()?.info().await - } + async fn send_to_plugins_and_wait( + &self, + payload: &InternalEventPayload, + plugins: Vec, + ) -> Result> { + let (rx_id, mut rx) = self.subscribe().await; - pub async fn get_plugin(&self, ref_id: &str) -> Result { - self.server.plugin_by_ref_id(ref_id).await + // 1. Build the events with IDs and everything + let events_to_send = plugins + .iter() + .map(|p| p.build_event_to_send(payload, None)) + .collect::>(); + + // 2. Spawn thread to subscribe to incoming events and check reply ids + let send_events_fut = { + let events_to_send = events_to_send.clone(); + + tokio::spawn(async move { + let mut found_events = Vec::new(); + + while let Some(event) = rx.recv().await { + if events_to_send + .iter() + .find(|e| Some(e.id.to_owned()) == event.reply_id) + .is_some() + { + found_events.push(event.clone()); + }; + if found_events.len() == events_to_send.len() { + break; + } + } + + found_events + }) + }; + + // 3. Send the events + for event in events_to_send { + let plugin = plugins + .iter() + .find(|p| p.ref_id == event.plugin_ref_id) + .expect("Didn't find plugin in list"); + plugin.send(&event).await? + } + + // 4. Join on the spawned thread + let events = send_events_fut.await.expect("Thread didn't succeed"); + + // 5. Unsubscribe + self.unsubscribe(rx_id.as_str()).await; + + Ok(events) } pub async fn get_http_request_actions(&self) -> Result> { let reply_events = self - .server .send_and_wait(&InternalEventPayload::GetHttpRequestActionsRequest( GetHttpRequestActionsRequest {}, )) @@ -97,7 +369,6 @@ impl PluginManager { pub async fn get_template_functions(&self) -> Result> { let reply_events = self - .server .send_and_wait(&InternalEventPayload::GetTemplateFunctionsRequest) .await?; @@ -112,10 +383,11 @@ impl PluginManager { } pub async fn call_http_request_action(&self, req: CallHttpRequestActionRequest) -> Result<()> { + let ref_id = req.plugin_ref_id.clone(); let plugin = self - .server - .plugin_by_ref_id(req.plugin_ref_id.as_str()) - .await?; + .get_plugin_by_ref_id(ref_id.as_str()) + .await + .ok_or(PluginNotFoundErr(ref_id))?; let event = plugin.build_event_to_send( &InternalEventPayload::CallHttpRequestActionRequest(req), None, @@ -139,7 +411,6 @@ impl PluginManager { }; let events = self - .server .send_and_wait(&InternalEventPayload::CallTemplateFunctionRequest(req)) .await?; @@ -155,7 +426,6 @@ impl PluginManager { pub async fn import_data(&self, content: &str) -> Result<(ImportResponse, String)> { let reply_events = self - .server .send_and_wait(&InternalEventPayload::ImportRequest(ImportRequest { content: content.to_string(), })) @@ -172,9 +442,12 @@ impl PluginManager { "No importers found for file contents".to_string(), )), Some((resp, ref_id)) => { - let plugin = self.server.plugin_by_ref_id(ref_id.as_str()).await?; - let plugin_name = plugin.name().await; - Ok((resp, plugin_name)) + let plugin = self + .get_plugin_by_ref_id(ref_id.as_str()) + .await + .ok_or(PluginNotFoundErr(ref_id))?; + let info = plugin.info().await.unwrap(); + Ok((resp, info.name)) } } } @@ -191,10 +464,14 @@ impl PluginManager { "filter-xpath" }; + let plugin = self + .get_plugin_by_dir(plugin_name) + .await + .ok_or(PluginNotFoundErr(plugin_name.to_string()))?; + let event = self - .server .send_to_plugin_and_wait( - plugin_name, + &plugin, &InternalEventPayload::FilterRequest(FilterRequest { filter: filter.to_string(), content: content.to_string(), @@ -211,3 +488,39 @@ impl PluginManager { } } } + +async fn read_plugins_dir(dir: &PathBuf) -> Result> { + let mut result = read_dir(dir).await?; + let mut dirs: Vec = vec![]; + while let Ok(Some(entry)) = result.next_entry().await { + if entry.path().is_dir() { + #[cfg(target_os = "windows")] + dirs.push(fix_windows_paths(&entry.path())); + #[cfg(not(target_os = "windows"))] + dirs.push(entry.path().to_string_lossy().to_string()); + } + } + Ok(dirs) +} + +#[cfg(target_os = "windows")] +fn fix_windows_paths(p: &PathBuf) -> String { + use dunce; + use path_slash::PathBufExt; + use regex::Regex; + + // 1. Remove UNC prefix for Windows paths to pass to sidecar + let safe_path = dunce::simplified(p.as_path()).to_string_lossy().to_string(); + + // 2. Remove the drive letter + let safe_path = Regex::new("^[a-zA-Z]:") + .unwrap() + .replace(safe_path.as_str(), ""); + + // 3. Convert backslashes to forward + let safe_path = PathBuf::from(safe_path.to_string()) + .to_slash_lossy() + .to_string(); + + safe_path +} diff --git a/src-tauri/yaak_plugin_runtime/src/nodejs.rs b/src-tauri/yaak_plugin_runtime/src/nodejs.rs index cf903ec5..4c9a3488 100644 --- a/src-tauri/yaak_plugin_runtime/src/nodejs.rs +++ b/src-tauri/yaak_plugin_runtime/src/nodejs.rs @@ -39,7 +39,7 @@ pub async fn start_nodejs_plugin_runtime( .args(&[plugin_runtime_main]); let (mut child_rx, child) = cmd.spawn()?; - println!("Spawned plugin runtime"); + info!("Spawned plugin runtime"); let mut kill_rx = kill_rx.clone(); diff --git a/src-tauri/yaak_plugin_runtime/src/plugin.rs b/src-tauri/yaak_plugin_runtime/src/plugin.rs index ead60212..811b79f2 100644 --- a/src-tauri/yaak_plugin_runtime/src/plugin.rs +++ b/src-tauri/yaak_plugin_runtime/src/plugin.rs @@ -1,46 +1,16 @@ -use crate::error::Result; -use crate::events::{InternalEvent, InternalEventPayload}; use crate::manager::PluginManager; -use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntimeServer; -use crate::server::PluginRuntimeGrpcServer; use log::info; -use std::net::SocketAddr; -use std::path::PathBuf; use std::process::exit; -use std::time::Duration; -use tauri::path::BaseDirectory; use tauri::plugin::{Builder, TauriPlugin}; use tauri::{Manager, RunEvent, Runtime, State}; -use tokio::fs::read_dir; -use tokio::net::TcpListener; -use tonic::codegen::tokio_stream; -use tonic::transport::Server; -use yaak_models::queries::list_plugins; pub fn init() -> TauriPlugin { Builder::new("yaak_plugin_runtime") .setup(|app_handle, _| { - let plugins_dir = app_handle - .path() - .resolve("plugins", BaseDirectory::Resource) - .expect("failed to resolve plugin directory resource"); + let manager = PluginManager::new(app_handle.clone()); + app_handle.manage(manager.clone()); - tauri::async_runtime::block_on(async move { - let bundled_plugin_dirs = read_plugins_dir(&plugins_dir) - .await - .expect(format!("Failed to read plugins dir: {:?}", plugins_dir).as_str()); - - let plugins = list_plugins(app_handle).await.unwrap_or_default(); - let installed_plugin_dirs = plugins - .iter() - .map(|p| p.directory.to_owned()) - .collect::>(); - - let plugin_dirs = [installed_plugin_dirs, bundled_plugin_dirs].concat(); - let manager = PluginManager::new(&app_handle, plugin_dirs).await; - app_handle.manage(manager); - Ok(()) - }) + Ok(()) }) .on_event(|app, e| match e { // TODO: Also exit when app is force-quit (eg. cmd+r in IntelliJ runner) @@ -49,94 +19,11 @@ pub fn init() -> TauriPlugin { tauri::async_runtime::block_on(async move { info!("Exiting plugin runtime due to app exit"); let manager: State = app.state(); - manager.cleanup().await; + manager.terminate().await; exit(0); }); } _ => {} }) .build() -} - -pub async fn start_server( - plugin_dirs: Vec, -) -> Result<(PluginRuntimeGrpcServer, SocketAddr)> { - println!("Starting plugin server with {plugin_dirs:?}"); - let server = PluginRuntimeGrpcServer::new(plugin_dirs); - - let svc = PluginRuntimeServer::new(server.clone()); - let listen_addr = match option_env!("PORT") { - None => "localhost:0".to_string(), - Some(port) => format!("localhost:{port}"), - }; - - { - let server = server.clone(); - tokio::spawn(async move { - let (rx_id, mut rx) = server.subscribe().await; - while let Some(event) = rx.recv().await { - match event.clone() { - InternalEvent { - payload: InternalEventPayload::BootResponse(resp), - plugin_ref_id, - .. - } => { - server.boot_plugin(plugin_ref_id.as_str(), &resp).await; - } - _ => {} - }; - } - server.unsubscribe(rx_id.as_str()).await; - }); - }; - - let listener = TcpListener::bind(listen_addr).await?; - let addr = listener.local_addr()?; - println!("Starting gRPC plugin server on {addr}"); - tokio::spawn(async move { - Server::builder() - .timeout(Duration::from_secs(10)) - .add_service(svc) - .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) - .await - .expect("grpc plugin runtime server failed to start"); - }); - - Ok((server, addr)) -} - -async fn read_plugins_dir(dir: &PathBuf) -> Result> { - let mut result = read_dir(dir).await?; - let mut dirs: Vec = vec![]; - while let Ok(Some(entry)) = result.next_entry().await { - if entry.path().is_dir() { - #[cfg(target_os = "windows")] - dirs.push(fix_windows_paths(&entry.path())); - #[cfg(not(target_os = "windows"))] - dirs.push(entry.path().to_string_lossy().to_string()); - } - } - Ok(dirs) -} - -#[cfg(target_os = "windows")] -fn fix_windows_paths(p: &PathBuf) -> String { - use dunce; - use path_slash::PathBufExt; - use regex::Regex; - - // 1. Remove UNC prefix for Windows paths to pass to sidecar - let safe_path = dunce::simplified(p.as_path()).to_string_lossy().to_string(); - - // 2. Remove the drive letter - let safe_path = Regex::new("^[a-zA-Z]:") - .unwrap() - .replace(safe_path.as_str(), ""); - - // 3. Convert backslashes to forward - let safe_path = PathBuf::from(safe_path.to_string()) - .to_slash_lossy() - .to_string(); - - safe_path -} +} \ No newline at end of file diff --git a/src-tauri/yaak_plugin_runtime/src/handle.rs b/src-tauri/yaak_plugin_runtime/src/plugin_handle.rs similarity index 58% rename from src-tauri/yaak_plugin_runtime/src/handle.rs rename to src-tauri/yaak_plugin_runtime/src/plugin_handle.rs index 915d5fd5..a4d5937e 100644 --- a/src-tauri/yaak_plugin_runtime/src/handle.rs +++ b/src-tauri/yaak_plugin_runtime/src/plugin_handle.rs @@ -1,7 +1,9 @@ +use crate::error::Result; use crate::events::{BootResponse, InternalEvent, InternalEventPayload}; use crate::server::plugin_runtime::EventStreamEvent; use crate::util::gen_id; use std::sync::Arc; +use log::info; use tokio::sync::{mpsc, Mutex}; #[derive(Clone)] @@ -13,10 +15,14 @@ pub struct PluginHandle { } impl PluginHandle { - pub async fn name(&self) -> String { - match &*self.boot_resp.lock().await { - None => "__NOT_BOOTED__".to_string(), - Some(r) => r.name.to_owned(), + pub fn new(dir: &str, tx: mpsc::Sender>) -> Self { + let ref_id = gen_id(); + + PluginHandle { + ref_id: ref_id.clone(), + dir: dir.to_string(), + to_plugin_tx: Arc::new(Mutex::new(tx)), + boot_resp: Arc::new(Mutex::new(None)), } } @@ -38,28 +44,33 @@ impl PluginHandle { } } - pub async fn reload(&self) -> crate::error::Result<()> { - let event = self.build_event_to_send(&InternalEventPayload::ReloadRequest, None); + pub async fn terminate(&self) -> Result<()> { + info!("Terminating plugin {}", self.dir); + let event = self.build_event_to_send(&InternalEventPayload::TerminateRequest, None); self.send(&event).await } - pub async fn send(&self, event: &InternalEvent) -> crate::error::Result<()> { - // info!( - // "Sending event to plugin {} {:?}", - // event.id, - // self.name().await - // ); + pub async fn send(&self, event: &InternalEvent) -> Result<()> { self.to_plugin_tx .lock() .await .send(Ok(EventStreamEvent { - event: serde_json::to_string(&event)?, + event: serde_json::to_string(event)?, })) .await?; Ok(()) } - pub async fn boot(&self, resp: &BootResponse) { + pub async fn send_payload( + &self, + payload: &InternalEventPayload, + reply_id: Option, + ) -> Result<()> { + let event = self.build_event_to_send(payload, reply_id); + self.send(&event).await + } + + pub async fn set_boot_response(&self, resp: &BootResponse) { let mut boot_resp = self.boot_resp.lock().await; *boot_resp = Some(resp.clone()); } diff --git a/src-tauri/yaak_plugin_runtime/src/server.rs b/src-tauri/yaak_plugin_runtime/src/server.rs index b40f1515..ccfba40c 100644 --- a/src-tauri/yaak_plugin_runtime/src/server.rs +++ b/src-tauri/yaak_plugin_runtime/src/server.rs @@ -1,293 +1,47 @@ -use std::collections::HashMap; +use log::warn; use std::pin::Pin; use std::sync::Arc; -use log::warn; -use tokio::sync::mpsc::Receiver; use tokio::sync::{mpsc, Mutex}; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; -use crate::error::Error::PluginNotFoundErr; -use crate::error::Result; -use crate::events::{InternalEvent, InternalEventPayload, BootRequest, BootResponse}; -use crate::handle::PluginHandle; +use crate::events::InternalEvent; use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntime; -use crate::util::gen_id; use plugin_runtime::EventStreamEvent; -use yaak_models::queries::generate_id; pub mod plugin_runtime { tonic::include_proto!("yaak.plugins.runtime"); } -type ResponseStream = - Pin> + Send>>; +type ResponseStream = Pin> + Send>>; #[derive(Clone)] -pub struct PluginRuntimeGrpcServer { - plugin_ref_to_plugin: Arc>>, - callback_to_plugin_ref: Arc>>, - subscribers: Arc>>>, - plugin_dirs: Vec, +pub(crate) struct PluginRuntimeServerImpl { + pub(crate) app_to_plugin_events_tx: + Arc>>>>, + client_disconnect_tx: mpsc::Sender, + client_connect_tx: tokio::sync::watch::Sender, + plugin_to_app_events_tx: mpsc::Sender, } -impl PluginRuntimeGrpcServer { - pub fn new(plugin_dirs: Vec) -> Self { - PluginRuntimeGrpcServer { - plugin_ref_to_plugin: Arc::new(Mutex::new(HashMap::new())), - callback_to_plugin_ref: Arc::new(Mutex::new(HashMap::new())), - subscribers: Arc::new(Mutex::new(HashMap::new())), - plugin_dirs, +impl PluginRuntimeServerImpl { + pub fn new( + events_tx: mpsc::Sender, + disconnect_tx: mpsc::Sender, + connect_tx: tokio::sync::watch::Sender, + ) -> Self { + PluginRuntimeServerImpl { + app_to_plugin_events_tx: Arc::new(Mutex::new(None)), + client_disconnect_tx: disconnect_tx, + client_connect_tx: connect_tx, + plugin_to_app_events_tx: events_tx, } } - - pub async fn plugins(&self) -> Vec { - self.plugin_ref_to_plugin - .lock() - .await - .iter() - .map(|p| p.1.to_owned()) - .collect::>() - } - - pub async fn subscribe(&self) -> (String, Receiver) { - let (tx, rx) = mpsc::channel(128); - let rx_id = generate_id(); - self.subscribers.lock().await.insert(rx_id.clone(), tx); - (rx_id, rx) - } - - pub async fn unsubscribe(&self, rx_id: &str) { - self.subscribers.lock().await.remove(rx_id); - } - - pub async fn remove_plugins(&self, plugin_ids: Vec) { - for plugin_id in plugin_ids { - self.remove_plugin(plugin_id.as_str()).await; - } - } - - pub async fn remove_plugin(&self, id: &str) { - match self.plugin_ref_to_plugin.lock().await.remove(id) { - None => println!("Tried to remove non-existing plugin {}", id), - Some(plugin) => println!("Removed plugin {} {}", id, plugin.name().await), - }; - } - - pub async fn boot_plugin(&self, id: &str, resp: &BootResponse) { - match self.plugin_ref_to_plugin.lock().await.get(id) { - None => println!("Tried booting non-existing plugin {}", id), - Some(plugin) => plugin.clone().boot(resp).await, - } - } - - pub async fn add_plugin( - &self, - dir: &str, - tx: mpsc::Sender>, - ) -> PluginHandle { - let ref_id = gen_id(); - let plugin_handle = PluginHandle { - ref_id: ref_id.clone(), - dir: dir.to_string(), - to_plugin_tx: Arc::new(Mutex::new(tx)), - boot_resp: Arc::new(Mutex::new(None)), - }; - let _ = self - .plugin_ref_to_plugin - .lock() - .await - .insert(ref_id, plugin_handle.clone()); - plugin_handle - } - - pub async fn plugin_by_ref_id(&self, ref_id: &str) -> Result { - let plugins = self.plugin_ref_to_plugin.lock().await; - match plugins.get(ref_id) { - None => Err(PluginNotFoundErr(ref_id.into())), - Some(p) => Ok(p.to_owned()), - } - } - - pub async fn plugin_by_dir(&self, dir: &str) -> Result { - let plugins = self.plugin_ref_to_plugin.lock().await; - for p in plugins.values() { - if p.dir == dir { - return Ok(p.to_owned()); - } - } - - Err(PluginNotFoundErr(dir.into())) - } - - pub async fn plugin_by_name(&self, plugin_name: &str) -> Result { - let plugins = self.plugin_ref_to_plugin.lock().await; - for p in plugins.values() { - if p.name().await == plugin_name { - return Ok(p.to_owned()); - } - } - - Err(PluginNotFoundErr(plugin_name.into())) - } - - pub async fn send( - &self, - payload: &InternalEventPayload, - plugin_ref_id: &str, - reply_id: Option, - ) -> Result<()> { - let plugin = self.plugin_by_ref_id(plugin_ref_id).await?; - let event = plugin.build_event_to_send(payload, reply_id); - plugin.send(&event).await - } - - pub async fn send_to_plugin( - &self, - plugin_name: &str, - payload: InternalEventPayload, - ) -> Result { - let plugins = self.plugin_ref_to_plugin.lock().await; - if plugins.is_empty() { - return Err(PluginNotFoundErr(plugin_name.into())); - } - - let mut plugin = None; - for p in plugins.values() { - if p.name().await == plugin_name { - plugin = Some(p); - break; - } - } - - match plugin { - Some(plugin) => { - let event = plugin.build_event_to_send(&payload, None); - plugin.send(&event).await?; - Ok(event) - } - None => Err(PluginNotFoundErr(plugin_name.into())), - } - } - - pub async fn send_to_plugin_and_wait( - &self, - plugin_name: &str, - payload: &InternalEventPayload, - ) -> Result { - let plugin = self.plugin_by_name(plugin_name).await?; - let events = self.send_to_plugins_and_wait(payload, vec![plugin]).await?; - Ok(events.first().unwrap().to_owned()) - } - - pub async fn send_and_wait( - &self, - payload: &InternalEventPayload, - ) -> Result> { - let plugins = self - .plugin_ref_to_plugin - .lock() - .await - .values() - .cloned() - .collect(); - self.send_to_plugins_and_wait(payload, plugins).await - } - - async fn send_to_plugins_and_wait( - &self, - payload: &InternalEventPayload, - plugins: Vec, - ) -> Result> { - // 1. Build the events with IDs and everything - let events_to_send = plugins - .iter() - .map(|p| p.build_event_to_send(payload, None)) - .collect::>(); - - // 2. Spawn thread to subscribe to incoming events and check reply ids - let server = self.clone(); - let send_events_fut = { - let events_to_send = events_to_send.clone(); - tokio::spawn(async move { - let (rx_id, mut rx) = server.subscribe().await; - let mut found_events = Vec::new(); - - while let Some(event) = rx.recv().await { - if events_to_send - .iter() - .find(|e| Some(e.id.to_owned()) == event.reply_id) - .is_some() - { - found_events.push(event.clone()); - }; - if found_events.len() == events_to_send.len() { - break; - } - } - server.unsubscribe(rx_id.as_str()).await; - - found_events - }) - }; - - // 3. Send the events - for event in events_to_send { - let plugin = plugins - .iter() - .find(|p| p.ref_id == event.plugin_ref_id) - .expect("Didn't find plugin in list"); - plugin.send(&event).await? - } - - // 4. Join on the spawned thread - let events = send_events_fut.await.expect("Thread didn't succeed"); - Ok(events) - } - - pub async fn reload_plugins(&self) { - for (_, plugin) in self.plugin_ref_to_plugin.lock().await.clone() { - if let Err(e) = plugin.reload().await { - warn!("Failed to reload plugin {} {}", plugin.dir, e) - } - } - } - - async fn load_plugins( - &self, - to_plugin_tx: mpsc::Sender>, - plugin_dirs: Vec, - ) -> Vec { - let mut plugin_ids = Vec::new(); - - for dir in plugin_dirs { - let plugin = self.add_plugin(dir.as_str(), to_plugin_tx.clone()).await; - plugin_ids.push(plugin.clone().ref_id); - - let event = plugin.build_event_to_send( - &InternalEventPayload::BootRequest(BootRequest { - dir: dir.to_string(), - }), - None, - ); - if let Err(e) = plugin.send(&event).await { - // TODO: Error handling - println!( - "Failed boot plugin {} at {} -> {}", - plugin.ref_id, plugin.dir, e - ) - } else { - println!("Loaded plugin {} at {}", plugin.ref_id, plugin.dir) - } - } - - plugin_ids - } } #[tonic::async_trait] -impl PluginRuntime for PluginRuntimeGrpcServer { +impl PluginRuntime for PluginRuntimeServerImpl { type EventStreamStream = ResponseStream; async fn event_stream( @@ -296,51 +50,48 @@ impl PluginRuntime for PluginRuntimeGrpcServer { ) -> tonic::Result> { let mut in_stream = req.into_inner(); - let (to_plugin_tx, to_plugin_rx) = mpsc::channel(128); + let (to_plugin_tx, to_plugin_rx) = mpsc::channel::>(128); + let mut app_to_plugin_events_tx = self.app_to_plugin_events_tx.lock().await; + *app_to_plugin_events_tx = Some(to_plugin_tx); + println!("GRPC CLIENT CONNECTED"); - let plugin_ids = self - .load_plugins(to_plugin_tx, self.plugin_dirs.clone()) - .await; + let plugin_to_app_events_tx = self.plugin_to_app_events_tx.clone(); + let client_disconnect_tx = self.client_disconnect_tx.clone(); + + self.client_connect_tx + .send(true) + .expect("Failed to send client ready event"); - let callbacks = self.callback_to_plugin_ref.clone(); - let server = self.clone(); tokio::spawn(async move { while let Some(result) = in_stream.next().await { + // Received event from plugin runtime match result { Ok(v) => { let event: InternalEvent = match serde_json::from_str(v.event.as_str()) { Ok(pe) => pe, Err(e) => { - println!("Failed to deserialize event {e:?} -> {}", v.event); + warn!("Failed to deserialize event {e:?} -> {}", v.event); continue; } }; - let plugin_ref_id = event.plugin_ref_id.clone(); - let reply_id = event.reply_id.clone(); - - let subscribers = server.subscribers.lock().await; - for tx in subscribers.values() { - // Emit event to the channel for server to handle - if let Err(e) = tx.try_send(event.clone()) { - println!("Failed to send to server channel (n={}). Receiver probably isn't listening: {:?}", subscribers.len(), e); - } - } - - // Add to callbacks if there's a reply_id - if let Some(reply_id) = reply_id { - callbacks.lock().await.insert(reply_id, plugin_ref_id); + // Send event to subscribers + // Emit event to the channel for server to handle + if let Err(e) = plugin_to_app_events_tx.try_send(event.clone()) { + warn!("Failed to send to channel. Receiver probably isn't listening: {:?}", e); } } Err(err) => { // TODO: Better error handling - println!("gRPC server error {err}"); + warn!("gRPC server error {err}"); break; } }; } - server.remove_plugins(plugin_ids).await; + if let Err(e) = client_disconnect_tx.send(true).await { + warn!("Failed to send killed event {:?}", e); + } }); // Write the same data that was received diff --git a/src-web/components/Settings/SettingsPlugins.tsx b/src-web/components/Settings/SettingsPlugins.tsx index 624314cb..e472e019 100644 --- a/src-web/components/Settings/SettingsPlugins.tsx +++ b/src-web/components/Settings/SettingsPlugins.tsx @@ -1,11 +1,11 @@ import type { Plugin } from '@yaakapp/api'; import { open } from '@tauri-apps/plugin-shell'; import React from 'react'; -import { useCreatePlugin } from '../../hooks/useCreatePlugin'; +import { useInstallPlugin } from '../../hooks/useInstallPlugin'; +import { useUninstallPlugin } from '../../hooks/useUninstallPlugin'; import { usePluginInfo } from '../../hooks/usePluginInfo'; import { usePlugins, useRefreshPlugins } from '../../hooks/usePlugins'; import { Button } from '../core/Button'; -import { Checkbox } from '../core/Checkbox'; import { IconButton } from '../core/IconButton'; import { InlineCode } from '../core/InlineCode'; import { HStack } from '../core/Stacks'; @@ -15,7 +15,7 @@ import { SelectFile } from '../SelectFile'; export function SettingsPlugins() { const [directory, setDirectory] = React.useState(null); const plugins = usePlugins(); - const createPlugin = useCreatePlugin(); + const createPlugin = useInstallPlugin(); const refreshPlugins = useRefreshPlugins(); return (
@@ -31,7 +31,6 @@ export function SettingsPlugins() { - @@ -88,14 +87,10 @@ export function SettingsPlugins() { function PluginInfo({ plugin }: { plugin: Plugin }) { const pluginInfo = usePluginInfo(plugin.id); + const deletePlugin = useUninstallPlugin(plugin.id); return ( - - + @@ -105,6 +100,7 @@ function PluginInfo({ plugin }: { plugin: Plugin }) { icon="trash" title="Uninstall plugin" className="text-text-subtlest" + onClick={() => deletePlugin.mutate()} /> diff --git a/src-web/hooks/useHttpRequestActions.ts b/src-web/hooks/useHttpRequestActions.ts index 3189e4f2..e8ee9bcc 100644 --- a/src-web/hooks/useHttpRequestActions.ts +++ b/src-web/hooks/useHttpRequestActions.ts @@ -5,12 +5,13 @@ import type { HttpRequest, } from '@yaakapp/api'; import { invokeCmd } from '../lib/tauri'; -import { usePlugins } from './usePlugins'; +import { usePluginsKey } from './usePlugins'; export function useHttpRequestActions() { - const plugins = usePlugins(); + const pluginsKey = usePluginsKey(); + const httpRequestActions = useQuery({ - queryKey: ['http_request_actions', plugins.map((p) => p.updatedAt)], + queryKey: ['http_request_actions', pluginsKey], refetchOnWindowFocus: false, queryFn: async () => { const responses = (await invokeCmd( diff --git a/src-web/hooks/useCreatePlugin.ts b/src-web/hooks/useInstallPlugin.ts similarity index 69% rename from src-web/hooks/useCreatePlugin.ts rename to src-web/hooks/useInstallPlugin.ts index cb845da1..31d030a8 100644 --- a/src-web/hooks/useCreatePlugin.ts +++ b/src-web/hooks/useInstallPlugin.ts @@ -2,11 +2,11 @@ import { useMutation } from '@tanstack/react-query'; import { trackEvent } from '../lib/analytics'; import { invokeCmd } from '../lib/tauri'; -export function useCreatePlugin() { +export function useInstallPlugin() { return useMutation({ - mutationKey: ['create_plugin'], + mutationKey: ['install_plugin'], mutationFn: async (directory: string) => { - await invokeCmd('cmd_create_plugin', { directory }); + await invokeCmd('cmd_install_plugin', { directory }); }, onSettled: () => trackEvent('plugin', 'create'), }); diff --git a/src-web/hooks/usePluginInfo.ts b/src-web/hooks/usePluginInfo.ts index 6a109f50..081e592b 100644 --- a/src-web/hooks/usePluginInfo.ts +++ b/src-web/hooks/usePluginInfo.ts @@ -1,12 +1,12 @@ import { useQuery } from '@tanstack/react-query'; -import type { PluginBootResponse } from '@yaakapp/api'; +import type { BootResponse } from '@yaakapp/api'; import { invokeCmd } from '../lib/tauri'; export function usePluginInfo(id: string) { return useQuery({ queryKey: ['plugin_info', id], queryFn: async () => { - const info = (await invokeCmd('cmd_plugin_info', { id })) as PluginBootResponse; + const info = (await invokeCmd('cmd_plugin_info', { id })) as BootResponse; return info; }, }); diff --git a/src-web/hooks/usePlugins.ts b/src-web/hooks/usePlugins.ts index 0cb9f2ab..beba38bc 100644 --- a/src-web/hooks/usePlugins.ts +++ b/src-web/hooks/usePlugins.ts @@ -12,6 +12,12 @@ export function usePlugins() { return useAtomValue(pluginsAtom); } +export function usePluginsKey() { + return useAtomValue(pluginsAtom) + .map((p) => p.id + p.updatedAt) + .join(','); +} + /** * Reload all plugins and refresh the list of plugins */ diff --git a/src-web/hooks/useTemplateFunctions.ts b/src-web/hooks/useTemplateFunctions.ts index d3e077cc..4b2d5116 100644 --- a/src-web/hooks/useTemplateFunctions.ts +++ b/src-web/hooks/useTemplateFunctions.ts @@ -1,10 +1,13 @@ import { useQuery } from '@tanstack/react-query'; import type { GetTemplateFunctionsResponse } from '@yaakapp/api'; import { invokeCmd } from '../lib/tauri'; +import { usePluginsKey } from './usePlugins'; export function useTemplateFunctions() { + const pluginsKey = usePluginsKey(); + const result = useQuery({ - queryKey: ['template_functions'], + queryKey: ['template_functions', pluginsKey], queryFn: async () => { const responses = (await invokeCmd( 'cmd_template_functions', @@ -13,6 +16,5 @@ export function useTemplateFunctions() { }, }); - const fns = result.data?.flatMap((r) => r.functions) ?? []; - return fns; + return result.data?.flatMap((r) => r.functions) ?? []; } diff --git a/src-web/hooks/useUninstallPlugin.ts b/src-web/hooks/useUninstallPlugin.ts new file mode 100644 index 00000000..bf48d958 --- /dev/null +++ b/src-web/hooks/useUninstallPlugin.ts @@ -0,0 +1,14 @@ +import { useMutation } from '@tanstack/react-query'; +import type { Plugin } from '@yaakapp/api'; +import { trackEvent } from '../lib/analytics'; +import { invokeCmd } from '../lib/tauri'; + +export function useUninstallPlugin(pluginId: string) { + return useMutation({ + mutationKey: ['uninstall_plugin'], + mutationFn: async () => { + return invokeCmd('cmd_uninstall_plugin', { pluginId }); + }, + onSettled: () => trackEvent('plugin', 'delete'), + }); +} diff --git a/src-web/lib/tauri.ts b/src-web/lib/tauri.ts index 2b12182e..efdd0e7f 100644 --- a/src-web/lib/tauri.ts +++ b/src-web/lib/tauri.ts @@ -6,7 +6,6 @@ type TauriCmd = | 'cmd_check_for_updates' | 'cmd_create_cookie_jar' | 'cmd_create_environment' - | 'cmd_create_plugin' | 'cmd_template_tokens_to_string' | 'cmd_create_folder' | 'cmd_create_grpc_request' @@ -40,6 +39,7 @@ type TauriCmd = | 'cmd_grpc_reflect' | 'cmd_http_request_actions' | 'cmd_import_data' + | 'cmd_install_plugin' | 'cmd_list_cookie_jars' | 'cmd_list_environments' | 'cmd_list_folders' @@ -64,6 +64,7 @@ type TauriCmd = | 'cmd_set_update_mode' | 'cmd_template_functions' | 'cmd_track_event' + | 'cmd_uninstall_plugin' | 'cmd_update_cookie_jar' | 'cmd_update_environment' | 'cmd_update_folder'
Plugin Version
- null} /> - - {pluginInfo.data?.name} - {pluginInfo.data?.name} {pluginInfo.data?.version}