mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-27 03:41:11 +01:00
Improve plugin source modeling and runtime dedup (#414)
This commit is contained in:
@@ -7,7 +7,7 @@ use serde::{Deserialize, Serialize};
|
||||
use std::path::Path;
|
||||
use std::str::FromStr;
|
||||
use ts_rs::TS;
|
||||
use yaak_models::models::Plugin;
|
||||
use yaak_models::models::{Plugin, PluginSource};
|
||||
|
||||
/// Get plugin info from the registry.
|
||||
pub async fn get_plugin(
|
||||
@@ -58,7 +58,7 @@ pub async fn check_plugin_updates(
|
||||
) -> Result<PluginUpdatesResponse> {
|
||||
let name_versions: Vec<PluginNameVersion> = plugins
|
||||
.into_iter()
|
||||
.filter(|p| p.url.is_some()) // Only check plugins with URLs (from registry)
|
||||
.filter(|p| matches!(p.source, PluginSource::Registry)) // Only check registry-installed plugins
|
||||
.filter_map(|p| match get_plugin_meta(&Path::new(&p.directory)) {
|
||||
Ok(m) => Some(PluginNameVersion { name: m.name, version: m.version }),
|
||||
Err(e) => {
|
||||
|
||||
@@ -9,7 +9,7 @@ use log::info;
|
||||
use std::fs::{create_dir_all, remove_dir_all};
|
||||
use std::io::Cursor;
|
||||
use std::sync::Arc;
|
||||
use yaak_models::models::Plugin;
|
||||
use yaak_models::models::{Plugin, PluginSource};
|
||||
use yaak_models::query_manager::QueryManager;
|
||||
use yaak_models::util::UpdateSource;
|
||||
|
||||
@@ -78,6 +78,7 @@ pub async fn download_and_install(
|
||||
directory: plugin_dir_str.clone(),
|
||||
enabled: true,
|
||||
url: Some(plugin_version.url.clone()),
|
||||
source: PluginSource::Registry,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Background,
|
||||
|
||||
@@ -21,9 +21,10 @@ use crate::events::{
|
||||
use crate::native_template_functions::{template_function_keyring, template_function_secure};
|
||||
use crate::nodejs::start_nodejs_plugin_runtime;
|
||||
use crate::plugin_handle::PluginHandle;
|
||||
use crate::plugin_meta::get_plugin_meta;
|
||||
use crate::server_ws::PluginRuntimeServerWebsocket;
|
||||
use log::{error, info, warn};
|
||||
use std::collections::HashMap;
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::env;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::sync::Arc;
|
||||
@@ -33,7 +34,7 @@ use tokio::net::TcpListener;
|
||||
use tokio::sync::mpsc::error::TrySendError;
|
||||
use tokio::sync::{Mutex, mpsc, oneshot};
|
||||
use tokio::time::{Instant, timeout};
|
||||
use yaak_models::models::Plugin;
|
||||
use yaak_models::models::{Plugin, PluginSource};
|
||||
use yaak_models::query_manager::QueryManager;
|
||||
use yaak_models::util::{UpdateSource, generate_id};
|
||||
use yaak_templates::error::Error::RenderError;
|
||||
@@ -162,13 +163,14 @@ impl PluginManager {
|
||||
|
||||
let bundled_dirs = plugin_manager.list_bundled_plugin_dirs().await?;
|
||||
let db = query_manager.connect();
|
||||
for dir in bundled_dirs {
|
||||
if db.get_plugin_by_directory(&dir).is_none() {
|
||||
for dir in &bundled_dirs {
|
||||
if db.get_plugin_by_directory(dir).is_none() {
|
||||
db.upsert_plugin(
|
||||
&Plugin {
|
||||
directory: dir,
|
||||
directory: dir.clone(),
|
||||
enabled: true,
|
||||
url: None,
|
||||
source: PluginSource::Bundled,
|
||||
..Default::default()
|
||||
},
|
||||
&UpdateSource::Background,
|
||||
@@ -213,6 +215,57 @@ impl PluginManager {
|
||||
read_plugins_dir(&plugins_dir).await
|
||||
}
|
||||
|
||||
pub async fn resolve_plugins_for_runtime_from_db(&self, plugins: Vec<Plugin>) -> Vec<Plugin> {
|
||||
let bundled_dirs = match self.list_bundled_plugin_dirs().await {
|
||||
Ok(dirs) => dirs,
|
||||
Err(err) => {
|
||||
warn!("Failed to read bundled plugin dirs for resolution: {err:?}");
|
||||
Vec::new()
|
||||
}
|
||||
};
|
||||
self.resolve_plugins_for_runtime(plugins, bundled_dirs)
|
||||
}
|
||||
|
||||
/// Resolve the plugin set for the current runtime instance.
|
||||
///
|
||||
/// Rules:
|
||||
/// - Drop bundled rows that are not present in this instance's bundled directory list.
|
||||
/// - Deduplicate by plugin metadata name (fallback to directory key when metadata is unreadable).
|
||||
/// - Prefer sources in this order: filesystem > registry > bundled.
|
||||
/// - For same-source conflicts, prefer the most recently installed row (`created_at`).
|
||||
fn resolve_plugins_for_runtime(
|
||||
&self,
|
||||
plugins: Vec<Plugin>,
|
||||
bundled_dirs: Vec<String>,
|
||||
) -> Vec<Plugin> {
|
||||
let bundled_dir_set: HashSet<String> = bundled_dirs.into_iter().collect();
|
||||
let mut selected: HashMap<String, Plugin> = HashMap::new();
|
||||
|
||||
for plugin in plugins {
|
||||
if matches!(plugin.source, PluginSource::Bundled)
|
||||
&& !bundled_dir_set.contains(&plugin.directory)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
let key = match get_plugin_meta(Path::new(&plugin.directory)) {
|
||||
Ok(meta) => meta.name,
|
||||
Err(_) => format!("__dir__{}", plugin.directory),
|
||||
};
|
||||
|
||||
match selected.get(&key) {
|
||||
Some(existing) if !prefer_plugin(&plugin, existing) => {}
|
||||
_ => {
|
||||
selected.insert(key, plugin);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut resolved = selected.into_values().collect::<Vec<_>>();
|
||||
resolved.sort_by(|a, b| b.created_at.cmp(&a.created_at));
|
||||
resolved
|
||||
}
|
||||
|
||||
pub async fn uninstall(&self, plugin_context: &PluginContext, dir: &str) -> Result<()> {
|
||||
let plugin = self.get_plugin_by_dir(dir).await.ok_or(PluginNotFoundErr(dir.to_string()))?;
|
||||
self.remove_plugin(plugin_context, &plugin).await
|
||||
@@ -287,7 +340,8 @@ impl PluginManager {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize all plugins from the provided list.
|
||||
/// Initialize all plugins from the provided DB list.
|
||||
/// Plugin candidates are resolved for this runtime instance before initialization.
|
||||
/// Returns a list of (plugin_directory, error_message) for any plugins that failed to initialize.
|
||||
pub async fn initialize_all_plugins(
|
||||
&self,
|
||||
@@ -297,15 +351,18 @@ impl PluginManager {
|
||||
info!("Initializing all plugins");
|
||||
let start = Instant::now();
|
||||
let mut errors = Vec::new();
|
||||
let plugins = self.resolve_plugins_for_runtime_from_db(plugins).await;
|
||||
|
||||
// Rebuild runtime handles from scratch to avoid stale/duplicate handles.
|
||||
let existing_handles = { self.plugin_handles.lock().await.clone() };
|
||||
for plugin_handle in existing_handles {
|
||||
if let Err(e) = self.remove_plugin(plugin_context, &plugin_handle).await {
|
||||
error!("Failed to remove plugin {} {e:?}", plugin_handle.dir);
|
||||
errors.push((plugin_handle.dir.clone(), e.to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
for plugin in plugins {
|
||||
// First remove the plugin if it exists and is enabled
|
||||
if let Some(plugin_handle) = self.get_plugin_by_dir(&plugin.directory).await {
|
||||
if let Err(e) = self.remove_plugin(plugin_context, &plugin_handle).await {
|
||||
error!("Failed to remove plugin {} {e:?}", plugin.directory);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if let Err(e) = self.add_plugin(plugin_context, &plugin).await {
|
||||
warn!("Failed to add plugin {} {e:?}", plugin.directory);
|
||||
errors.push((plugin.directory.clone(), e.to_string()));
|
||||
@@ -1063,6 +1120,24 @@ impl PluginManager {
|
||||
}
|
||||
}
|
||||
|
||||
fn source_priority(source: &PluginSource) -> i32 {
|
||||
match source {
|
||||
PluginSource::Filesystem => 3,
|
||||
PluginSource::Registry => 2,
|
||||
PluginSource::Bundled => 1,
|
||||
}
|
||||
}
|
||||
|
||||
fn prefer_plugin(candidate: &Plugin, existing: &Plugin) -> bool {
|
||||
let candidate_priority = source_priority(&candidate.source);
|
||||
let existing_priority = source_priority(&existing.source);
|
||||
if candidate_priority != existing_priority {
|
||||
return candidate_priority > existing_priority;
|
||||
}
|
||||
|
||||
candidate.created_at > existing.created_at
|
||||
}
|
||||
|
||||
async fn read_plugins_dir(dir: &PathBuf) -> Result<Vec<String>> {
|
||||
let mut result = read_dir(dir).await?;
|
||||
let mut dirs: Vec<String> = vec![];
|
||||
|
||||
Reference in New Issue
Block a user