Reload plugins on change

This commit is contained in:
Gregory Schier
2024-09-09 11:34:52 -07:00
parent 3bf192953d
commit c0707bb246
18 changed files with 240 additions and 118 deletions

View File

@@ -2,7 +2,10 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use ts_rs::TS;
use yaak_models::models::{CookieJar, Environment, Folder, GrpcConnection, GrpcEvent, GrpcRequest, HttpRequest, HttpResponse, KeyValue, Plugin, Settings, Workspace};
use yaak_models::models::{
CookieJar, Environment, Folder, GrpcConnection, GrpcEvent, GrpcRequest, HttpRequest,
HttpResponse, KeyValue, Plugin, Settings, Workspace,
};
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
@@ -21,6 +24,9 @@ pub enum InternalEventPayload {
BootRequest(PluginBootRequest),
BootResponse(PluginBootResponse),
ReloadRequest(EmptyResponse),
ReloadResponse(EmptyResponse),
ImportRequest(ImportRequest),
ImportResponse(ImportResponse),

View File

@@ -0,0 +1,66 @@
use crate::events::{EmptyResponse, InternalEvent, InternalEventPayload, PluginBootResponse};
use crate::server::plugin_runtime::EventStreamEvent;
use crate::util::gen_id;
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
#[derive(Clone)]
pub struct PluginHandle {
pub ref_id: String,
pub dir: String,
pub(crate) to_plugin_tx: Arc<Mutex<mpsc::Sender<tonic::Result<EventStreamEvent>>>>,
pub(crate) boot_resp: Arc<Mutex<Option<PluginBootResponse>>>,
}
impl PluginHandle {
pub async fn name(&self) -> String {
match &*self.boot_resp.lock().await {
None => "__NOT_BOOTED__".to_string(),
Some(r) => r.name.to_owned(),
}
}
pub async fn info(&self) -> Option<PluginBootResponse> {
let resp = &*self.boot_resp.lock().await;
resp.clone()
}
pub fn build_event_to_send(
&self,
payload: &InternalEventPayload,
reply_id: Option<String>,
) -> InternalEvent {
InternalEvent {
id: gen_id(),
plugin_ref_id: self.ref_id.clone(),
reply_id,
payload: payload.clone(),
}
}
pub async fn reload(&self) -> crate::error::Result<()> {
let event = self.build_event_to_send(&InternalEventPayload::ReloadRequest(EmptyResponse{}), None);
self.send(&event).await
}
pub async fn send(&self, event: &InternalEvent) -> crate::error::Result<()> {
// info!(
// "Sending event to plugin {} {:?}",
// event.id,
// self.name().await
// );
self.to_plugin_tx
.lock()
.await
.send(Ok(EventStreamEvent {
event: serde_json::to_string(&event)?,
}))
.await?;
Ok(())
}
pub async fn boot(&self, resp: &PluginBootResponse) {
let mut boot_resp = self.boot_resp.lock().await;
*boot_resp = Some(resp.clone());
}
}

View File

@@ -4,3 +4,5 @@ pub mod manager;
mod nodejs;
pub mod plugin;
mod server;
pub mod handle;
mod util;

View File

@@ -15,6 +15,7 @@ use std::time::Duration;
use tauri::{AppHandle, Runtime};
use tokio::sync::mpsc;
use tokio::sync::watch::Sender;
use crate::handle::PluginHandle;
pub struct PluginManager {
kill_tx: Sender<bool>,
@@ -38,6 +39,10 @@ impl PluginManager {
PluginManager { kill_tx, server }
}
pub async fn reload_all(&self) {
self.server.reload_plugins().await
}
pub async fn subscribe(&self) -> (String, mpsc::Receiver<InternalEvent>) {
self.server.subscribe().await
}
@@ -68,6 +73,10 @@ impl PluginManager {
self.server.plugin_by_dir(dir).await.ok()?.info().await
}
pub async fn get_plugin(&self, ref_id: &str) -> Result<PluginHandle> {
self.server.plugin_by_ref_id(ref_id).await
}
pub async fn get_http_request_actions(&self) -> Result<Vec<GetHttpRequestActionsResponse>> {
let reply_events = self
.server

View File

@@ -1,7 +1,7 @@
use rand::distributions::{Alphanumeric, DistString};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use log::warn;
use tokio::sync::mpsc::Receiver;
use tokio::sync::{mpsc, Mutex};
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
@@ -10,8 +10,10 @@ use tonic::{Request, Response, Status, Streaming};
use crate::error::Error::PluginNotFoundErr;
use crate::error::Result;
use crate::events::{PluginBootRequest, PluginBootResponse, InternalEvent, InternalEventPayload};
use crate::events::{InternalEvent, InternalEventPayload, PluginBootRequest, PluginBootResponse};
use crate::handle::PluginHandle;
use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntime;
use crate::util::gen_id;
use plugin_runtime::EventStreamEvent;
use yaak_models::queries::generate_id;
@@ -22,62 +24,6 @@ pub mod plugin_runtime {
type ResponseStream =
Pin<Box<dyn Stream<Item = std::result::Result<EventStreamEvent, Status>> + Send>>;
#[derive(Clone)]
pub struct PluginHandle {
dir: String,
to_plugin_tx: Arc<Mutex<mpsc::Sender<tonic::Result<EventStreamEvent>>>>,
ref_id: String,
boot_resp: Arc<Mutex<Option<PluginBootResponse>>>,
}
impl PluginHandle {
pub async fn name(&self) -> String {
match &*self.boot_resp.lock().await {
None => "__NOT_BOOTED__".to_string(),
Some(r) => r.name.to_owned(),
}
}
pub async fn info(&self) -> Option<PluginBootResponse> {
let resp = &*self.boot_resp.lock().await;
resp.clone()
}
pub fn build_event_to_send(
&self,
payload: &InternalEventPayload,
reply_id: Option<String>,
) -> InternalEvent {
InternalEvent {
id: gen_id(),
plugin_ref_id: self.ref_id.clone(),
reply_id,
payload: payload.clone(),
}
}
pub async fn send(&self, event: &InternalEvent) -> Result<()> {
// info!(
// "Sending event to plugin {} {:?}",
// event.id,
// self.name().await
// );
self.to_plugin_tx
.lock()
.await
.send(Ok(EventStreamEvent {
event: serde_json::to_string(&event)?,
}))
.await?;
Ok(())
}
pub async fn boot(&self, resp: &PluginBootResponse) {
let mut boot_resp = self.boot_resp.lock().await;
*boot_resp = Some(resp.clone());
}
}
#[derive(Clone)]
pub struct PluginRuntimeGrpcServer {
plugin_ref_to_plugin: Arc<Mutex<HashMap<String, PluginHandle>>>,
@@ -96,6 +42,15 @@ impl PluginRuntimeGrpcServer {
}
}
pub async fn plugins(&self) -> Vec<PluginHandle> {
self.plugin_ref_to_plugin
.lock()
.await
.iter()
.map(|p| p.1.to_owned())
.collect::<Vec<PluginHandle>>()
}
pub async fn subscribe(&self) -> (String, Receiver<InternalEvent>) {
let (tx, rx) = mpsc::channel(128);
let rx_id = generate_id();
@@ -115,23 +70,15 @@ impl PluginRuntimeGrpcServer {
pub async fn remove_plugin(&self, id: &str) {
match self.plugin_ref_to_plugin.lock().await.remove(id) {
None => {
println!("Tried to remove non-existing plugin {}", id);
}
Some(plugin) => {
println!("Removed plugin {} {}", id, plugin.name().await);
}
None => println!("Tried to remove non-existing plugin {}", id),
Some(plugin) => println!("Removed plugin {} {}", id, plugin.name().await),
};
}
pub async fn boot_plugin(&self, id: &str, resp: &PluginBootResponse) {
match self.plugin_ref_to_plugin.lock().await.get(id) {
None => {
println!("Tried booting non-existing plugin {}", id);
}
Some(plugin) => {
plugin.clone().boot(resp).await;
}
None => println!("Tried booting non-existing plugin {}", id),
Some(plugin) => plugin.clone().boot(resp).await,
}
}
@@ -162,7 +109,7 @@ impl PluginRuntimeGrpcServer {
Some(p) => Ok(p.to_owned()),
}
}
pub async fn plugin_by_dir(&self, dir: &str) -> Result<PluginHandle> {
let plugins = self.plugin_ref_to_plugin.lock().await;
for p in plugins.values() {
@@ -299,6 +246,14 @@ impl PluginRuntimeGrpcServer {
Ok(events)
}
pub async fn reload_plugins(&self) {
for (_, plugin) in self.plugin_ref_to_plugin.lock().await.clone() {
if let Err(e) = plugin.reload().await {
warn!("Failed to reload plugin {} {}", plugin.dir, e)
}
}
}
async fn load_plugins(
&self,
to_plugin_tx: mpsc::Sender<tonic::Result<EventStreamEvent>>,
@@ -396,7 +351,3 @@ impl PluginRuntime for PluginRuntimeGrpcServer {
))
}
}
fn gen_id() -> String {
Alphanumeric.sample_string(&mut rand::thread_rng(), 5)
}

View File

@@ -0,0 +1,5 @@
use rand::distributions::{Alphanumeric, DistString};
pub fn gen_id() -> String {
Alphanumeric.sample_string(&mut rand::thread_rng(), 5)
}