Fix recursive plugin call locking

This commit is contained in:
Gregory Schier
2024-08-23 13:20:48 -07:00
parent 7e194b9148
commit 8d3260f394
10 changed files with 57 additions and 53 deletions

View File

@@ -1,4 +1,4 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { CallTemplateFunctionPurpose } from "./CallTemplateFunctionPurpose"; import type { RenderPurpose } from "./RenderPurpose";
export type CallTemplateFunctionArgs = { purpose: CallTemplateFunctionPurpose, values: { [key: string]: string }, }; export type CallTemplateFunctionArgs = { purpose: RenderPurpose, values: { [key: string]: string }, };

View File

@@ -1,4 +1,5 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
import type { HttpRequest } from "./HttpRequest"; import type { HttpRequest } from "./HttpRequest";
import type { RenderPurpose } from "./RenderPurpose";
export type RenderHttpRequestRequest = { httpRequest: HttpRequest, }; export type RenderHttpRequestRequest = { httpRequest: HttpRequest, purpose: RenderPurpose, };

View File

@@ -1,3 +1,3 @@
// This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually. // This file was generated by [ts-rs](https://github.com/Aleph-Alpha/ts-rs). Do not edit this file manually.
export type CallTemplateFunctionPurpose = "send" | "preview"; export type RenderPurpose = "send" | "preview";

View File

@@ -5,7 +5,6 @@ export type * from './themes';
export * from './gen/BootRequest'; export * from './gen/BootRequest';
export * from './gen/BootResponse'; export * from './gen/BootResponse';
export * from './gen/CallHttpRequestActionArgs'; export * from './gen/CallHttpRequestActionArgs';
export * from './gen/CallTemplateFunctionPurpose';
export * from './gen/CallHttpRequestActionRequest'; export * from './gen/CallHttpRequestActionRequest';
export * from './gen/CallTemplateFunctionRequest'; export * from './gen/CallTemplateFunctionRequest';
export * from './gen/CallTemplateFunctionResponse'; export * from './gen/CallTemplateFunctionResponse';
@@ -48,6 +47,7 @@ export * from './gen/KeyValue';
export * from './gen/Model'; export * from './gen/Model';
export * from './gen/RenderHttpRequestRequest'; export * from './gen/RenderHttpRequestRequest';
export * from './gen/RenderHttpRequestResponse'; export * from './gen/RenderHttpRequestResponse';
export * from './gen/RenderPurpose';
export * from './gen/SendHttpRequestRequest'; export * from './gen/SendHttpRequestRequest';
export * from './gen/SendHttpRequestResponse'; export * from './gen/SendHttpRequestResponse';
export * from './gen/SendHttpRequestResponse'; export * from './gen/SendHttpRequestResponse';

View File

@@ -64,6 +64,9 @@ new Promise<void>(async (resolve, reject) => {
} }
function sendEvent(event: InternalEvent) { function sendEvent(event: InternalEvent) {
if (event.payload.type !== 'empty_response') {
console.log('Sending event to app', event.id, event.payload.type);
}
parentPort!.postMessage(event); parentPort!.postMessage(event);
} }
@@ -77,8 +80,8 @@ new Promise<void>(async (resolve, reject) => {
const promise = new Promise<InternalEventPayload>(async (resolve) => { const promise = new Promise<InternalEventPayload>(async (resolve) => {
const cb = (event: InternalEvent) => { const cb = (event: InternalEvent) => {
if (event.replyId === eventToSend.id) { if (event.replyId === eventToSend.id) {
resolve(event.payload); // Not type-safe but oh well
parentPort!.off('message', cb); // Unlisten, now that we're done parentPort!.off('message', cb); // Unlisten, now that we're done
resolve(event.payload); // Not type-safe but oh well
} }
}; };
parentPort!.on('message', cb); parentPort!.on('message', cb);
@@ -110,18 +113,18 @@ new Promise<void>(async (resolve, reject) => {
}, },
}, },
httpRequest: { httpRequest: {
async getById({ id }) { async getById(args) {
const payload = { type: 'get_http_request_by_id_request', id } as const; const payload = { type: 'get_http_request_by_id_request', ...args } as const;
const { httpRequest } = await sendAndWaitForReply<GetHttpRequestByIdResponse>(payload); const { httpRequest } = await sendAndWaitForReply<GetHttpRequestByIdResponse>(payload);
return httpRequest; return httpRequest;
}, },
async send({ httpRequest }) { async send(args) {
const payload = { type: 'send_http_request_request', httpRequest } as const; const payload = { type: 'send_http_request_request', ...args } as const;
const { httpResponse } = await sendAndWaitForReply<SendHttpRequestResponse>(payload); const { httpResponse } = await sendAndWaitForReply<SendHttpRequestResponse>(payload);
return httpResponse; return httpResponse;
}, },
async render({ httpRequest }) { async render(args) {
const payload = { type: 'render_http_request_request', httpRequest } as const; const payload = { type: 'render_http_request_request', ...args } as const;
const result = await sendAndWaitForReply<RenderHttpRequestResponse>(payload); const result = await sendAndWaitForReply<RenderHttpRequestResponse>(payload);
return result.httpRequest; return result.httpRequest;
}, },
@@ -130,8 +133,6 @@ new Promise<void>(async (resolve, reject) => {
// Message comes into the plugin to be processed // Message comes into the plugin to be processed
parentPort!.on('message', async ({ payload, id: replyId }: InternalEvent) => { parentPort!.on('message', async ({ payload, id: replyId }: InternalEvent) => {
console.log(`Received ${payload.type}`);
try { try {
if (payload.type === 'boot_request') { if (payload.type === 'boot_request') {
const payload: InternalEventPayload = { const payload: InternalEventPayload = {

View File

@@ -1942,24 +1942,21 @@ fn monitor_plugin_events<R: Runtime>(app_handle: &AppHandle<R>) {
let plugin_manager: State<'_, PluginManager> = app_handle.state(); let plugin_manager: State<'_, PluginManager> = app_handle.state();
let (_rx_id, mut rx) = plugin_manager.subscribe().await; let (_rx_id, mut rx) = plugin_manager.subscribe().await;
let app_handle = app_handle.clone();
while let Some(event) = rx.recv().await { while let Some(event) = rx.recv().await {
let payload = match handle_plugin_event(&app_handle, &event).await { let app_handle = app_handle.clone();
Some(e) => e,
None => continue, // We might have recursive back-and-forth calls between app and plugin, so we don't
}; // want to block here
if let Err(e) = plugin_manager.reply(&event, &payload).await { tauri::async_runtime::spawn(async move {
warn!("Failed to reply to plugin manager: {}", e) handle_plugin_event(&app_handle, &event).await;
} });
} }
}); });
} }
async fn handle_plugin_event<R: Runtime>( async fn handle_plugin_event<R: Runtime>(app_handle: &AppHandle<R>, event: &InternalEvent) {
app_handle: &AppHandle<R>, info!("Got event to app {}", event.id);
event: &InternalEvent, let response_event: Option<InternalEventPayload> = match event.clone().payload {
) -> Option<InternalEventPayload> {
let event = match event.clone().payload {
InternalEventPayload::CopyTextRequest(req) => { InternalEventPayload::CopyTextRequest(req) => {
app_handle app_handle
.clipboard() .clipboard()
@@ -1992,7 +1989,7 @@ async fn handle_plugin_event<R: Runtime>(
)) ))
} }
InternalEventPayload::RenderHttpRequestRequest(req) => { InternalEventPayload::RenderHttpRequestRequest(req) => {
let w = get_focused_window_no_lock(app_handle)?; let w = get_focused_window_no_lock(app_handle).expect("No focused window");
let workspace = get_workspace(app_handle, req.http_request.workspace_id.as_str()) let workspace = get_workspace(app_handle, req.http_request.workspace_id.as_str())
.await .await
.expect("Failed to get workspace for request"); .expect("Failed to get workspace for request");
@@ -2007,13 +2004,8 @@ async fn handle_plugin_event<R: Runtime>(
Some(id) => get_environment(&w, id.as_str()).await.ok(), Some(id) => get_environment(&w, id.as_str()).await.ok(),
}; };
let cb = &*app_handle.state::<PluginTemplateCallback>(); let cb = &*app_handle.state::<PluginTemplateCallback>();
let rendered_http_request = render_http_request( let rendered_http_request =
&req.http_request, render_http_request(&req.http_request, &workspace, environment.as_ref(), cb).await;
&workspace,
environment.as_ref(),
cb,
)
.await;
Some(InternalEventPayload::RenderHttpRequestResponse( Some(InternalEventPayload::RenderHttpRequestResponse(
RenderHttpRequestResponse { RenderHttpRequestResponse {
http_request: rendered_http_request, http_request: rendered_http_request,
@@ -2021,7 +2013,7 @@ async fn handle_plugin_event<R: Runtime>(
)) ))
} }
InternalEventPayload::SendHttpRequestRequest(req) => { InternalEventPayload::SendHttpRequestRequest(req) => {
let w = get_focused_window_no_lock(app_handle)?; let w = get_focused_window_no_lock(app_handle).expect("No focused window");
let url = w.url().unwrap(); let url = w.url().unwrap();
let mut query_pairs = url.query_pairs(); let mut query_pairs = url.query_pairs();
@@ -2057,7 +2049,7 @@ async fn handle_plugin_event<R: Runtime>(
let http_response = match result { let http_response = match result {
Ok(r) => r, Ok(r) => r,
Err(_e) => return None, Err(_e) => return,
}; };
Some(InternalEventPayload::SendHttpRequestResponse( Some(InternalEventPayload::SendHttpRequestResponse(
@@ -2067,7 +2059,12 @@ async fn handle_plugin_event<R: Runtime>(
_ => None, _ => None,
}; };
event if let Some(e) = response_event {
let plugin_manager: State<'_, PluginManager> = app_handle.state();
if let Err(e) = plugin_manager.reply(&event, &e).await {
warn!("Failed to reply to plugin manager: {}", e)
}
}
} }
// app_handle.get_focused_window locks, so this one is a non-locking version, safe for use in async context // app_handle.get_focused_window locks, so this one is a non-locking version, safe for use in async context

View File

@@ -1,23 +1,23 @@
use std::collections::HashMap; use std::collections::HashMap;
use tauri::{AppHandle, Manager}; use tauri::{AppHandle, Manager};
use yaak_plugin_runtime::events::CallTemplateFunctionPurpose; use yaak_plugin_runtime::events::RenderPurpose;
use yaak_plugin_runtime::manager::PluginManager; use yaak_plugin_runtime::manager::PluginManager;
use yaak_templates::TemplateCallback; use yaak_templates::TemplateCallback;
#[derive(Clone)] #[derive(Clone)]
pub struct PluginTemplateCallback { pub struct PluginTemplateCallback {
app_handle: AppHandle, app_handle: AppHandle,
purpose: CallTemplateFunctionPurpose, purpose: RenderPurpose,
} }
impl PluginTemplateCallback { impl PluginTemplateCallback {
pub fn new(app_handle: AppHandle) -> PluginTemplateCallback { pub fn new(app_handle: AppHandle) -> PluginTemplateCallback {
PluginTemplateCallback { app_handle, purpose: CallTemplateFunctionPurpose::Preview } PluginTemplateCallback { app_handle, purpose: RenderPurpose::Preview }
} }
pub fn for_send(&self) -> PluginTemplateCallback { pub fn for_send(&self) -> PluginTemplateCallback {
let mut v = self.clone(); let mut v = self.clone();
v.purpose = CallTemplateFunctionPurpose::Send; v.purpose = RenderPurpose::Send;
v v
} }
} }

View File

@@ -153,6 +153,7 @@ pub struct CopyTextRequest {
#[ts(export)] #[ts(export)]
pub struct RenderHttpRequestRequest { pub struct RenderHttpRequestRequest {
pub http_request: HttpRequest, pub http_request: HttpRequest,
pub purpose: RenderPurpose,
} }
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)] #[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
@@ -289,21 +290,21 @@ pub struct CallTemplateFunctionResponse {
#[serde(default, rename_all = "camelCase")] #[serde(default, rename_all = "camelCase")]
#[ts(export)] #[ts(export)]
pub struct CallTemplateFunctionArgs { pub struct CallTemplateFunctionArgs {
pub purpose: CallTemplateFunctionPurpose, pub purpose: RenderPurpose,
pub values: HashMap<String, String>, pub values: HashMap<String, String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, TS)] #[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
#[ts(export)] #[ts(export)]
pub enum CallTemplateFunctionPurpose { pub enum RenderPurpose {
Send, Send,
Preview, Preview,
} }
impl Default for CallTemplateFunctionPurpose { impl Default for RenderPurpose {
fn default() -> Self { fn default() -> Self {
CallTemplateFunctionPurpose::Preview RenderPurpose::Preview
} }
} }

View File

@@ -1,6 +1,6 @@
use crate::error::Result; use crate::error::Result;
use crate::events::{ use crate::events::{
CallHttpRequestActionRequest, CallTemplateFunctionArgs, CallTemplateFunctionPurpose, CallHttpRequestActionRequest, CallTemplateFunctionArgs, RenderPurpose,
CallTemplateFunctionRequest, CallTemplateFunctionResponse, FilterRequest, FilterResponse, CallTemplateFunctionRequest, CallTemplateFunctionResponse, FilterRequest, FilterResponse,
GetHttpRequestActionsRequest, GetHttpRequestActionsResponse, GetTemplateFunctionsResponse, GetHttpRequestActionsRequest, GetHttpRequestActionsResponse, GetTemplateFunctionsResponse,
ImportRequest, ImportResponse, InternalEvent, InternalEventPayload, ImportRequest, ImportResponse, InternalEvent, InternalEventPayload,
@@ -115,7 +115,7 @@ impl PluginManager {
&self, &self,
fn_name: &str, fn_name: &str,
args: HashMap<String, String>, args: HashMap<String, String>,
purpose: CallTemplateFunctionPurpose, purpose: RenderPurpose,
) -> Result<Option<String>> { ) -> Result<Option<String>> {
let req = CallTemplateFunctionRequest { let req = CallTemplateFunctionRequest {
name: fn_name.to_string(), name: fn_name.to_string(),

View File

@@ -53,7 +53,11 @@ impl PluginHandle {
} }
pub async fn send(&self, event: &InternalEvent) -> Result<()> { pub async fn send(&self, event: &InternalEvent) -> Result<()> {
info!("Sending event {} {:?}", event.id, self.name().await); info!(
"Sending event to plugin {} {:?}",
event.id,
self.name().await
);
self.to_plugin_tx self.to_plugin_tx
.lock() .lock()
.await .await
@@ -90,9 +94,9 @@ impl PluginRuntimeGrpcServer {
pub async fn subscribe(&self) -> (String, Receiver<InternalEvent>) { pub async fn subscribe(&self) -> (String, Receiver<InternalEvent>) {
let (tx, rx) = mpsc::channel(128); let (tx, rx) = mpsc::channel(128);
let id = generate_id(); let rx_id = generate_id();
self.subscribers.lock().await.insert(id.clone(), tx); self.subscribers.lock().await.insert(rx_id.clone(), tx);
(id, rx) (rx_id, rx)
} }
pub async fn unsubscribe(&self, rx_id: &str) { pub async fn unsubscribe(&self, rx_id: &str) {
@@ -394,7 +398,7 @@ impl PluginRuntime for PluginRuntimeGrpcServer {
for tx in subscribers.values() { for tx in subscribers.values() {
// Emit event to the channel for server to handle // Emit event to the channel for server to handle
if let Err(e) = tx.try_send(event.clone()) { if let Err(e) = tx.try_send(event.clone()) {
println!("Failed to send to server channel. Receiver probably isn't listening: {:?}", e); println!("Failed to send to server channel (n={}). Receiver probably isn't listening: {:?}", subscribers.len(), e);
} }
} }