mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-19 16:21:25 +01:00
(feat) Add ability to disable plugins and show bundled plugins (#337)
This commit is contained in:
@@ -1351,8 +1351,8 @@ pub struct ListHttpRequestsResponse {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export, export_to = "gen_events.ts")]
|
||||
#[serde(default)]
|
||||
#[ts(export, type = "{}", export_to = "gen_events.ts")]
|
||||
pub struct ListFoldersRequest {}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
|
||||
@@ -55,7 +55,7 @@ pub async fn download_and_install<R: Runtime>(
|
||||
zip_extract::extract(Cursor::new(&bytes), &plugin_dir, true)?;
|
||||
info!("Extracted plugin {} to {}", plugin_version.id, plugin_dir_str);
|
||||
|
||||
plugin_manager.add_plugin_by_dir(&PluginContext::new(&window), &plugin_dir_str).await?;
|
||||
plugin_manager.add_plugin_by_dir(&PluginContext::new(&window), &plugin_dir_str, true).await?;
|
||||
|
||||
window.db().upsert_plugin(
|
||||
&Plugin {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
use crate::error::Error::{
|
||||
AuthPluginNotFound, ClientNotInitializedErr, PluginErr, PluginNotFoundErr, UnknownEventErr,
|
||||
self, AuthPluginNotFound, ClientNotInitializedErr, PluginErr, PluginNotFoundErr,
|
||||
UnknownEventErr,
|
||||
};
|
||||
use crate::error::Result;
|
||||
use crate::events::{
|
||||
@@ -35,10 +36,10 @@ use tokio::net::TcpListener;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::sync::{Mutex, mpsc};
|
||||
use tokio::time::{Instant, timeout};
|
||||
use yaak_models::models::Environment;
|
||||
use yaak_models::models::{Environment, Plugin};
|
||||
use yaak_models::query_manager::QueryManagerExt;
|
||||
use yaak_models::render::make_vars_hashmap;
|
||||
use yaak_models::util::generate_id;
|
||||
use yaak_models::util::{UpdateSource, generate_id};
|
||||
use yaak_templates::error::Error::RenderError;
|
||||
use yaak_templates::error::Result as TemplateResult;
|
||||
use yaak_templates::{RenderErrorBehavior, RenderOptions, render_json_value_raw};
|
||||
@@ -46,18 +47,13 @@ use yaak_templates::{RenderErrorBehavior, RenderOptions, render_json_value_raw};
|
||||
#[derive(Clone)]
|
||||
pub struct PluginManager {
|
||||
subscribers: Arc<Mutex<HashMap<String, mpsc::Sender<InternalEvent>>>>,
|
||||
plugins: Arc<Mutex<Vec<PluginHandle>>>,
|
||||
plugin_handles: Arc<Mutex<Vec<PluginHandle>>>,
|
||||
kill_tx: tokio::sync::watch::Sender<bool>,
|
||||
ws_service: Arc<PluginRuntimeServerWebsocket>,
|
||||
vendored_plugin_dir: PathBuf,
|
||||
pub(crate) installed_plugin_dir: PathBuf,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct PluginCandidate {
|
||||
dir: String,
|
||||
}
|
||||
|
||||
impl PluginManager {
|
||||
pub fn new<R: Runtime>(app_handle: AppHandle<R>) -> PluginManager {
|
||||
let (events_tx, mut events_rx) = mpsc::channel(128);
|
||||
@@ -80,7 +76,7 @@ impl PluginManager {
|
||||
.join("installed-plugins");
|
||||
|
||||
let plugin_manager = PluginManager {
|
||||
plugins: Default::default(),
|
||||
plugin_handles: Default::default(),
|
||||
subscribers: Default::default(),
|
||||
ws_service: Arc::new(ws_service.clone()),
|
||||
kill_tx: kill_server_tx,
|
||||
@@ -109,7 +105,7 @@ impl PluginManager {
|
||||
|
||||
// Handle when client plugin runtime disconnects
|
||||
tauri::async_runtime::spawn(async move {
|
||||
while let Some(_) = client_disconnect_rx.recv().await {
|
||||
while (client_disconnect_rx.recv().await).is_some() {
|
||||
// Happens when the app is closed
|
||||
info!("Plugin runtime client disconnected");
|
||||
}
|
||||
@@ -163,10 +159,10 @@ impl PluginManager {
|
||||
plugin_manager
|
||||
}
|
||||
|
||||
async fn list_plugin_dirs<R: Runtime>(
|
||||
async fn list_available_plugins<R: Runtime>(
|
||||
&self,
|
||||
app_handle: &AppHandle<R>,
|
||||
) -> Vec<PluginCandidate> {
|
||||
) -> Result<Vec<Plugin>> {
|
||||
let plugins_dir = if is_dev() {
|
||||
// Use plugins directly for easy development
|
||||
env::current_dir()
|
||||
@@ -178,18 +174,27 @@ impl PluginManager {
|
||||
|
||||
info!("Loading bundled plugins from {plugins_dir:?}");
|
||||
|
||||
let bundled_plugin_dirs: Vec<PluginCandidate> = read_plugins_dir(&plugins_dir)
|
||||
// Read bundled plugin directories from disk
|
||||
let bundled_plugin_dirs: Vec<String> = read_plugins_dir(&plugins_dir)
|
||||
.await
|
||||
.expect(format!("Failed to read plugins dir: {:?}", plugins_dir).as_str())
|
||||
.iter()
|
||||
.map(|d| PluginCandidate { dir: d.into() })
|
||||
.collect();
|
||||
.expect(&format!("Failed to read plugins dir: {:?}", plugins_dir));
|
||||
|
||||
let plugins = app_handle.db().list_plugins().unwrap_or_default();
|
||||
let installed_plugin_dirs: Vec<PluginCandidate> =
|
||||
plugins.iter().map(|p| PluginCandidate { dir: p.directory.to_owned() }).collect();
|
||||
// Ensure all bundled plugins make it into the database
|
||||
for dir in &bundled_plugin_dirs {
|
||||
if app_handle.db().get_plugin_by_directory(dir).is_none() {
|
||||
app_handle.db().upsert_plugin(
|
||||
&Plugin {
|
||||
directory: dir.clone(),
|
||||
enabled: true,
|
||||
url: None,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Background,
|
||||
)?;
|
||||
}
|
||||
}
|
||||
|
||||
[bundled_plugin_dirs, installed_plugin_dirs].concat()
|
||||
Ok(app_handle.db().list_plugins()?)
|
||||
}
|
||||
|
||||
pub async fn uninstall(&self, plugin_context: &PluginContext, dir: &str) -> Result<()> {
|
||||
@@ -202,16 +207,18 @@ impl PluginManager {
|
||||
plugin_context: &PluginContext,
|
||||
plugin: &PluginHandle,
|
||||
) -> Result<()> {
|
||||
// Terminate the plugin
|
||||
self.send_to_plugin_and_wait(
|
||||
plugin_context,
|
||||
plugin,
|
||||
&InternalEventPayload::TerminateRequest,
|
||||
)
|
||||
.await?;
|
||||
// Terminate the plugin if it's enabled
|
||||
if plugin.enabled {
|
||||
self.send_to_plugin_and_wait(
|
||||
plugin_context,
|
||||
plugin,
|
||||
&InternalEventPayload::TerminateRequest,
|
||||
)
|
||||
.await?;
|
||||
}
|
||||
|
||||
// Remove the plugin from the list
|
||||
let mut plugins = self.plugins.lock().await;
|
||||
let mut plugins = self.plugin_handles.lock().await;
|
||||
let pos = plugins.iter().position(|p| p.ref_id == plugin.ref_id);
|
||||
if let Some(pos) = pos {
|
||||
plugins.remove(pos);
|
||||
@@ -220,7 +227,12 @@ impl PluginManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn add_plugin_by_dir(&self, plugin_context: &PluginContext, dir: &str) -> Result<()> {
|
||||
pub async fn add_plugin_by_dir(
|
||||
&self,
|
||||
plugin_context: &PluginContext,
|
||||
dir: &str,
|
||||
enabled: bool,
|
||||
) -> Result<()> {
|
||||
info!("Adding plugin by dir {dir}");
|
||||
|
||||
let maybe_tx = self.ws_service.app_to_plugin_events_tx.lock().await;
|
||||
@@ -228,32 +240,32 @@ impl PluginManager {
|
||||
None => return Err(ClientNotInitializedErr),
|
||||
Some(tx) => tx,
|
||||
};
|
||||
let plugin_handle = PluginHandle::new(dir, tx.clone())?;
|
||||
let plugin_handle = PluginHandle::new(dir, enabled, tx.clone())?;
|
||||
let dir_path = Path::new(dir);
|
||||
let is_vendored = dir_path.starts_with(self.vendored_plugin_dir.as_path());
|
||||
let is_installed = dir_path.starts_with(self.installed_plugin_dir.as_path());
|
||||
|
||||
// Boot the plugin
|
||||
let event = timeout(
|
||||
Duration::from_secs(5),
|
||||
self.send_to_plugin_and_wait(
|
||||
plugin_context,
|
||||
&plugin_handle,
|
||||
&InternalEventPayload::BootRequest(BootRequest {
|
||||
dir: dir.to_string(),
|
||||
watch: !is_vendored && !is_installed,
|
||||
}),
|
||||
),
|
||||
)
|
||||
.await??;
|
||||
// Boot the plugin if it's enabled
|
||||
if enabled {
|
||||
let event = self
|
||||
.send_to_plugin_and_wait(
|
||||
plugin_context,
|
||||
&plugin_handle,
|
||||
&InternalEventPayload::BootRequest(BootRequest {
|
||||
dir: dir.to_string(),
|
||||
watch: !is_vendored && !is_installed,
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if !matches!(event.payload, InternalEventPayload::BootResponse) {
|
||||
return Err(UnknownEventErr);
|
||||
if !matches!(event.payload, InternalEventPayload::BootResponse) {
|
||||
return Err(UnknownEventErr);
|
||||
}
|
||||
}
|
||||
|
||||
let mut plugins = self.plugins.lock().await;
|
||||
plugins.retain(|p| p.dir != dir);
|
||||
plugins.push(plugin_handle.clone());
|
||||
let mut plugin_handles = self.plugin_handles.lock().await;
|
||||
plugin_handles.retain(|p| p.dir != dir);
|
||||
plugin_handles.push(plugin_handle.clone());
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -263,22 +275,24 @@ impl PluginManager {
|
||||
app_handle: &AppHandle<R>,
|
||||
plugin_context: &PluginContext,
|
||||
) -> Result<()> {
|
||||
info!("Initializing all plugins");
|
||||
let start = Instant::now();
|
||||
let candidates = self.list_plugin_dirs(app_handle).await;
|
||||
for candidate in candidates.clone() {
|
||||
// First remove the plugin if it exists
|
||||
if let Some(plugin) = self.get_plugin_by_dir(candidate.dir.as_str()).await {
|
||||
if let Err(e) = self.remove_plugin(plugin_context, &plugin).await {
|
||||
error!("Failed to remove plugin {} {e:?}", candidate.dir);
|
||||
for plugin in self.list_available_plugins(app_handle).await?.clone() {
|
||||
// First remove the plugin if it exists and is enabled
|
||||
if let Some(plugin_handle) = self.get_plugin_by_dir(&plugin.directory).await {
|
||||
if let Err(e) = self.remove_plugin(plugin_context, &plugin_handle).await {
|
||||
error!("Failed to remove plugin {} {e:?}", plugin.directory);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Err(e) = self.add_plugin_by_dir(plugin_context, candidate.dir.as_str()).await {
|
||||
warn!("Failed to add plugin {} {e:?}", candidate.dir);
|
||||
if let Err(e) =
|
||||
self.add_plugin_by_dir(plugin_context, &plugin.directory, plugin.enabled).await
|
||||
{
|
||||
warn!("Failed to add plugin {} {e:?}", plugin.directory);
|
||||
}
|
||||
}
|
||||
|
||||
let plugins = self.plugins.lock().await;
|
||||
let plugins = self.plugin_handles.lock().await;
|
||||
let names = plugins.iter().map(|p| p.dir.to_string()).collect::<Vec<String>>();
|
||||
info!(
|
||||
"Initialized {} plugins in {:?}:\n - {}",
|
||||
@@ -324,15 +338,15 @@ impl PluginManager {
|
||||
}
|
||||
|
||||
pub async fn get_plugin_by_ref_id(&self, ref_id: &str) -> Option<PluginHandle> {
|
||||
self.plugins.lock().await.iter().find(|p| p.ref_id == ref_id).cloned()
|
||||
self.plugin_handles.lock().await.iter().find(|p| p.ref_id == ref_id).cloned()
|
||||
}
|
||||
|
||||
pub async fn get_plugin_by_dir(&self, dir: &str) -> Option<PluginHandle> {
|
||||
self.plugins.lock().await.iter().find(|p| p.dir == dir).cloned()
|
||||
self.plugin_handles.lock().await.iter().find(|p| p.dir == dir).cloned()
|
||||
}
|
||||
|
||||
pub async fn get_plugin_by_name(&self, name: &str) -> Option<PluginHandle> {
|
||||
for plugin in self.plugins.lock().await.iter().cloned() {
|
||||
for plugin in self.plugin_handles.lock().await.iter().cloned() {
|
||||
let info = plugin.info();
|
||||
if info.name == name {
|
||||
return Some(plugin);
|
||||
@@ -347,9 +361,19 @@ impl PluginManager {
|
||||
plugin: &PluginHandle,
|
||||
payload: &InternalEventPayload,
|
||||
) -> Result<InternalEvent> {
|
||||
if !plugin.enabled {
|
||||
return Err(Error::PluginErr(format!("Plugin {} is disabled", plugin.metadata.name)));
|
||||
}
|
||||
|
||||
let events =
|
||||
self.send_to_plugins_and_wait(plugin_context, payload, vec![plugin.to_owned()]).await?;
|
||||
Ok(events.first().unwrap().to_owned())
|
||||
Ok(events
|
||||
.first()
|
||||
.ok_or(Error::PluginErr(format!(
|
||||
"No plugin events returned for: {}",
|
||||
plugin.metadata.name
|
||||
)))?
|
||||
.to_owned())
|
||||
}
|
||||
|
||||
async fn send_and_wait(
|
||||
@@ -357,7 +381,7 @@ impl PluginManager {
|
||||
plugin_context: &PluginContext,
|
||||
payload: &InternalEventPayload,
|
||||
) -> Result<Vec<InternalEvent>> {
|
||||
let plugins = { self.plugins.lock().await.clone() };
|
||||
let plugins = { self.plugin_handles.lock().await.clone() };
|
||||
self.send_to_plugins_and_wait(plugin_context, payload, plugins).await
|
||||
}
|
||||
|
||||
@@ -373,6 +397,7 @@ impl PluginManager {
|
||||
// 1. Build the events with IDs and everything
|
||||
let events_to_send = plugins
|
||||
.iter()
|
||||
.filter(|p| p.enabled)
|
||||
.map(|p| p.build_event_to_send(plugin_context, payload, None))
|
||||
.collect::<Vec<InternalEvent>>();
|
||||
|
||||
@@ -383,19 +408,28 @@ impl PluginManager {
|
||||
tokio::spawn(async move {
|
||||
let mut found_events = Vec::new();
|
||||
|
||||
while let Some(event) = rx.recv().await {
|
||||
let matched_sent_event = events_to_send
|
||||
.iter()
|
||||
.find(|e| Some(e.id.to_owned()) == event.reply_id)
|
||||
.is_some();
|
||||
if matched_sent_event {
|
||||
found_events.push(event.clone());
|
||||
};
|
||||
let collect_events = async {
|
||||
while let Some(event) = rx.recv().await {
|
||||
let matched_sent_event =
|
||||
events_to_send.iter().any(|e| Some(e.id.to_owned()) == event.reply_id);
|
||||
if matched_sent_event {
|
||||
found_events.push(event.clone());
|
||||
};
|
||||
|
||||
let found_them_all = found_events.len() == events_to_send.len();
|
||||
if found_them_all {
|
||||
break;
|
||||
let found_them_all = found_events.len() == events_to_send.len();
|
||||
if found_them_all {
|
||||
break;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Timeout after 10 seconds to prevent hanging forever if plugin doesn't respond
|
||||
if timeout(Duration::from_secs(5), collect_events).await.is_err() {
|
||||
warn!(
|
||||
"Timeout waiting for plugin responses. Got {}/{} responses",
|
||||
found_events.len(),
|
||||
events_to_send.len()
|
||||
);
|
||||
}
|
||||
|
||||
found_events
|
||||
@@ -586,7 +620,7 @@ impl PluginManager {
|
||||
// We don't want to fail for this op because the UI will not be able to list any auth types then
|
||||
let render_opt = RenderOptions { error_behavior: RenderErrorBehavior::ReturnEmpty };
|
||||
let rendered_values = render_json_value_raw(json!(values), vars, &cb, &render_opt).await?;
|
||||
let context_id = format!("{:x}", md5::compute(model_id.to_string()));
|
||||
let context_id = format!("{:x}", md5::compute(model_id));
|
||||
|
||||
let event = self
|
||||
.send_to_plugin_and_wait(
|
||||
@@ -754,7 +788,7 @@ impl PluginManager {
|
||||
// We don't want to fail for this op because the UI will not be able to list any auth types then
|
||||
let render_opt = RenderOptions { error_behavior: RenderErrorBehavior::ReturnEmpty };
|
||||
let rendered_values = render_json_value_raw(json!(values), vars, &cb, &render_opt).await?;
|
||||
let context_id = format!("{:x}", md5::compute(model_id.to_string()));
|
||||
let context_id = format!("{:x}", md5::compute(model_id));
|
||||
let event = self
|
||||
.send_to_plugin_and_wait(
|
||||
&PluginContext::new(window),
|
||||
@@ -804,7 +838,7 @@ impl PluginManager {
|
||||
.find_map(|(p, r)| if r.name == auth_name { Some(p) } else { None })
|
||||
.ok_or(PluginNotFoundErr(auth_name.into()))?;
|
||||
|
||||
let context_id = format!("{:x}", md5::compute(model_id.to_string()));
|
||||
let context_id = format!("{:x}", md5::compute(model_id));
|
||||
self.send_to_plugin_and_wait(
|
||||
&PluginContext::new(window),
|
||||
&plugin,
|
||||
@@ -831,7 +865,7 @@ impl PluginManager {
|
||||
plugin_context: &PluginContext,
|
||||
) -> Result<CallHttpAuthenticationResponse> {
|
||||
let disabled = match req.values.get("disabled") {
|
||||
Some(JsonPrimitive::Boolean(v)) => v.clone(),
|
||||
Some(JsonPrimitive::Boolean(v)) => *v,
|
||||
_ => false,
|
||||
};
|
||||
|
||||
|
||||
@@ -10,12 +10,13 @@ use tokio::sync::{Mutex, mpsc};
|
||||
pub struct PluginHandle {
|
||||
pub ref_id: String,
|
||||
pub dir: String,
|
||||
pub enabled: bool,
|
||||
pub(crate) to_plugin_tx: Arc<Mutex<mpsc::Sender<InternalEvent>>>,
|
||||
pub(crate) metadata: PluginMetadata,
|
||||
}
|
||||
|
||||
impl PluginHandle {
|
||||
pub fn new(dir: &str, tx: mpsc::Sender<InternalEvent>) -> Result<Self> {
|
||||
pub fn new(dir: &str, enabled: bool, tx: mpsc::Sender<InternalEvent>) -> Result<Self> {
|
||||
let ref_id = gen_id();
|
||||
let metadata = get_plugin_meta(&Path::new(dir))?;
|
||||
|
||||
@@ -23,6 +24,7 @@ impl PluginHandle {
|
||||
ref_id: ref_id.clone(),
|
||||
dir: dir.to_string(),
|
||||
to_plugin_tx: Arc::new(Mutex::new(tx)),
|
||||
enabled,
|
||||
metadata,
|
||||
})
|
||||
}
|
||||
|
||||
@@ -73,10 +73,17 @@ impl PluginRuntimeServerWebsocket {
|
||||
|
||||
// Skip non-text messages
|
||||
if !msg.is_text() {
|
||||
return;
|
||||
warn!("Received non-text message from plugin runtime");
|
||||
continue;
|
||||
}
|
||||
|
||||
let msg_text = msg.into_text().unwrap();
|
||||
let msg_text = match msg.into_text() {
|
||||
Ok(text) => text,
|
||||
Err(e) => {
|
||||
error!("Failed to convert message to text: {e:?}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let event = match serde_json::from_str::<InternalEventRawPayload>(&msg_text) {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
@@ -117,9 +124,18 @@ impl PluginRuntimeServerWebsocket {
|
||||
return;
|
||||
},
|
||||
Some(event) => {
|
||||
let event_bytes = serde_json::to_string(&event).unwrap();
|
||||
let event_bytes = match serde_json::to_string(&event) {
|
||||
Ok(bytes) => bytes,
|
||||
Err(e) => {
|
||||
error!("Failed to serialize event: {:?}", e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
let msg = Message::text(event_bytes);
|
||||
ws_sender.send(msg).await.unwrap();
|
||||
if let Err(e) = ws_sender.send(msg).await {
|
||||
error!("Failed to send message to plugin runtime: {:?}", e);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user