mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-24 02:11:10 +01:00
Plugin runtime v2 (#62)
This commit is contained in:
40
src-tauri/yaak_plugin_runtime/src/error.rs
Normal file
40
src-tauri/yaak_plugin_runtime/src/error.rs
Normal file
@@ -0,0 +1,40 @@
|
||||
use thiserror::Error;
|
||||
use tokio::io;
|
||||
use tokio::sync::mpsc::error::SendError;
|
||||
use crate::server::plugin_runtime::EventStreamEvent;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("IO error")]
|
||||
IoErr(#[from] io::Error),
|
||||
#[error("Tauri error")]
|
||||
TauriErr(#[from] tauri::Error),
|
||||
#[error("Tauri shell error")]
|
||||
TauriShellErr(#[from] tauri_plugin_shell::Error),
|
||||
#[error("Grpc transport error")]
|
||||
GrpcTransportErr(#[from] tonic::transport::Error),
|
||||
#[error("Grpc send error")]
|
||||
GrpcSendErr(#[from] SendError<tonic::Result<EventStreamEvent>>),
|
||||
#[error("JSON error")]
|
||||
JsonErr(#[from] serde_json::Error),
|
||||
#[error("Plugin not found error")]
|
||||
PluginNotFoundErr(String),
|
||||
#[error("unknown error")]
|
||||
MissingCallbackIdErr(String),
|
||||
#[error("Missing callback ID error")]
|
||||
MissingCallbackErr(String),
|
||||
#[error("No plugins found")]
|
||||
NoPluginsErr(String),
|
||||
#[error("Plugin error")]
|
||||
PluginErr(String),
|
||||
#[error("Unknown error")]
|
||||
UnknownErr(String),
|
||||
}
|
||||
|
||||
impl Into<String> for Error {
|
||||
fn into(self) -> String {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
pub type Result<T> = std::result::Result<T, Error>;
|
||||
132
src-tauri/yaak_plugin_runtime/src/events.rs
Normal file
132
src-tauri/yaak_plugin_runtime/src/events.rs
Normal file
@@ -0,0 +1,132 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use ts_rs::TS;
|
||||
|
||||
use yaak_models::models::{Environment, Folder, GrpcRequest, HttpRequest, HttpResponse, Workspace};
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct InternalEvent {
|
||||
pub id: String,
|
||||
pub plugin_ref_id: String,
|
||||
pub reply_id: Option<String>,
|
||||
pub payload: InternalEventPayload,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||
#[serde(tag = "type")]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[ts(export)]
|
||||
pub enum InternalEventPayload {
|
||||
BootRequest(BootRequest),
|
||||
BootResponse(BootResponse),
|
||||
ImportRequest(ImportRequest),
|
||||
ImportResponse(ImportResponse),
|
||||
FilterRequest(FilterRequest),
|
||||
FilterResponse(FilterResponse),
|
||||
ExportHttpRequestRequest(ExportHttpRequestRequest),
|
||||
ExportHttpRequestResponse(ExportHttpRequestResponse),
|
||||
/// Returned when a plugin doesn't get run, just so the server
|
||||
/// has something to listen for
|
||||
EmptyResponse(EmptyResponse),
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default)]
|
||||
#[ts(export, type = "{}")]
|
||||
pub struct EmptyResponse {}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct BootRequest {
|
||||
pub dir: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct BootResponse {
|
||||
pub name: String,
|
||||
pub version: String,
|
||||
pub capabilities: Vec<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct ImportRequest {
|
||||
pub content: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct ImportResponse {
|
||||
pub resources: ImportResources,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct FilterRequest {
|
||||
pub content: String,
|
||||
pub filter: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct FilterResponse {
|
||||
pub items: Vec<Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct ExportHttpRequestRequest {
|
||||
pub http_request: HttpRequest,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct ExportHttpRequestResponse {
|
||||
pub content: String,
|
||||
}
|
||||
|
||||
// TODO: Migrate plugins to return this type
|
||||
// #[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||
// #[serde(rename_all = "camelCase", untagged)]
|
||||
// #[ts(export)]
|
||||
// pub enum ExportableModel {
|
||||
// Workspace(Workspace),
|
||||
// Environment(Environment),
|
||||
// Folder(Folder),
|
||||
// HttpRequest(HttpRequest),
|
||||
// GrpcRequest(GrpcRequest),
|
||||
// }
|
||||
|
||||
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
|
||||
#[serde(default, rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub struct ImportResources {
|
||||
pub workspaces: Vec<Workspace>,
|
||||
pub environments: Vec<Environment>,
|
||||
pub folders: Vec<Folder>,
|
||||
pub http_requests: Vec<HttpRequest>,
|
||||
pub grpc_requests: Vec<GrpcRequest>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
#[ts(export)]
|
||||
pub enum Model {
|
||||
Workspace(Workspace),
|
||||
Environment(Environment),
|
||||
Folder(Folder),
|
||||
HttpRequest(HttpRequest),
|
||||
HttpResponse(HttpResponse),
|
||||
GrpcRequest(GrpcRequest),
|
||||
}
|
||||
@@ -1,41 +1,6 @@
|
||||
extern crate core;
|
||||
|
||||
use crate::manager::PluginManager;
|
||||
use log::info;
|
||||
use std::process::exit;
|
||||
use tauri::plugin::{Builder, TauriPlugin};
|
||||
use tauri::{Manager, RunEvent, Runtime, State};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
pub mod error;
|
||||
mod events;
|
||||
pub mod manager;
|
||||
mod nodejs;
|
||||
|
||||
pub mod plugin_runtime {
|
||||
tonic::include_proto!("yaak.plugins.runtime");
|
||||
}
|
||||
|
||||
pub fn init<R: Runtime>() -> TauriPlugin<R> {
|
||||
Builder::new("yaak_plugin_runtime")
|
||||
.setup(|app, _| {
|
||||
tauri::async_runtime::block_on(async move {
|
||||
let manager = PluginManager::new(&app).await;
|
||||
let manager_state = Mutex::new(manager);
|
||||
app.manage(manager_state);
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.on_event(|app, e| match e {
|
||||
// TODO: Also exit when app is force-quit (eg. cmd+r in IntelliJ runner)
|
||||
RunEvent::ExitRequested { api, .. } => {
|
||||
api.prevent_exit();
|
||||
tauri::async_runtime::block_on(async move {
|
||||
info!("Exiting plugin runtime due to app exit");
|
||||
let manager: State<Mutex<PluginManager>> = app.state();
|
||||
manager.lock().await.cleanup().await;
|
||||
exit(0);
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
})
|
||||
.build()
|
||||
}
|
||||
pub mod plugin;
|
||||
mod server;
|
||||
|
||||
26
src-tauri/yaak_plugin_runtime/src/main.rs
Normal file
26
src-tauri/yaak_plugin_runtime/src/main.rs
Normal file
@@ -0,0 +1,26 @@
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
// let dir = env::var("YAAK_PLUGINS_DIR").expect("YAAK_PLUGINS_DIR not set");
|
||||
//
|
||||
// let plugin_dirs: Vec<String> = match read_dir(dir) {
|
||||
// Ok(result) => {
|
||||
// let mut dirs: Vec<String> = vec![];
|
||||
// for entry_result in result {
|
||||
// match entry_result {
|
||||
// Ok(entry) => {
|
||||
// if entry.path().is_dir() {
|
||||
// dirs.push(entry.path().to_string_lossy().to_string())
|
||||
// }
|
||||
// }
|
||||
// Err(_) => {
|
||||
// continue;
|
||||
// }
|
||||
// }
|
||||
// };
|
||||
// dirs
|
||||
// }
|
||||
// Err(_) => vec![],
|
||||
// };
|
||||
// start_server(plugin_dirs).await.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,34 +1,38 @@
|
||||
use log::{debug, info};
|
||||
use std::time::Duration;
|
||||
use tauri::{AppHandle, Manager, Runtime};
|
||||
use tokio::sync::watch::Sender;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
use crate::nodejs::node_start;
|
||||
use crate::plugin_runtime::plugin_runtime_client::PluginRuntimeClient;
|
||||
use crate::plugin_runtime::{
|
||||
HookExportRequest, HookImportRequest, HookResponse, HookResponseFilterRequest,
|
||||
use crate::error::Result;
|
||||
use crate::events::{
|
||||
ExportHttpRequestRequest, ExportHttpRequestResponse, FilterRequest, FilterResponse,
|
||||
ImportRequest, ImportResponse, InternalEventPayload,
|
||||
};
|
||||
|
||||
use crate::error::Error::PluginErr;
|
||||
use crate::nodejs::start_nodejs_plugin_runtime;
|
||||
use crate::plugin::start_server;
|
||||
use crate::server::PluginRuntimeGrpcServer;
|
||||
use std::time::Duration;
|
||||
use tauri::{AppHandle, Runtime};
|
||||
use tokio::sync::watch::Sender;
|
||||
use yaak_models::models::HttpRequest;
|
||||
|
||||
pub struct PluginManager {
|
||||
client: PluginRuntimeClient<Channel>,
|
||||
kill_tx: Sender<bool>,
|
||||
server: PluginRuntimeGrpcServer,
|
||||
}
|
||||
|
||||
impl PluginManager {
|
||||
pub async fn new<R: Runtime>(app_handle: &AppHandle<R>) -> PluginManager {
|
||||
let temp_dir = app_handle.path().temp_dir().unwrap();
|
||||
pub async fn new<R: Runtime>(
|
||||
app_handle: &AppHandle<R>,
|
||||
plugin_dirs: Vec<String>,
|
||||
) -> PluginManager {
|
||||
let (server, addr) = start_server(plugin_dirs)
|
||||
.await
|
||||
.expect("Failed to start plugin runtime server");
|
||||
|
||||
let (kill_tx, kill_rx) = tokio::sync::watch::channel(false);
|
||||
let start_resp = node_start(app_handle, &temp_dir, &kill_rx).await;
|
||||
info!("Connecting to gRPC client at {}", start_resp.addr);
|
||||
start_nodejs_plugin_runtime(app_handle, addr, &kill_rx)
|
||||
.await
|
||||
.expect("Failed to start plugin runtime");
|
||||
|
||||
let client = match PluginRuntimeClient::connect(start_resp.addr.clone()).await {
|
||||
Ok(v) => v,
|
||||
Err(err) => panic!("{}", err.to_string()),
|
||||
};
|
||||
|
||||
PluginManager { client, kill_tx }
|
||||
PluginManager { kill_tx, server }
|
||||
}
|
||||
|
||||
pub async fn cleanup(&mut self) {
|
||||
@@ -38,49 +42,81 @@ impl PluginManager {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
|
||||
pub async fn run_import(&mut self, data: &str) -> Result<HookResponse, String> {
|
||||
let response = self
|
||||
.client
|
||||
.hook_import(tonic::Request::new(HookImportRequest {
|
||||
data: data.to_string(),
|
||||
pub async fn run_import(&mut self, content: &str) -> Result<(ImportResponse, String)> {
|
||||
let reply_events = self
|
||||
.server
|
||||
.send_and_wait(&InternalEventPayload::ImportRequest(ImportRequest {
|
||||
content: content.to_string(),
|
||||
}))
|
||||
.await
|
||||
.map_err(|e| e.message().to_string())?;
|
||||
.await?;
|
||||
|
||||
Ok(response.into_inner())
|
||||
// TODO: Don't just return the first valid response
|
||||
for event in reply_events {
|
||||
match event.payload {
|
||||
InternalEventPayload::ImportResponse(resp) => {
|
||||
let ref_id = event.plugin_ref_id.as_str();
|
||||
let plugin = self.server.plugin_by_ref_id(ref_id).await?;
|
||||
let plugin_name = plugin.name().await;
|
||||
return Ok((resp, plugin_name));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
Err(PluginErr("No import responses found".to_string()))
|
||||
}
|
||||
|
||||
pub async fn run_export_curl(&mut self, request: &str) -> Result<HookResponse, String> {
|
||||
let response = self
|
||||
.client
|
||||
.hook_export(tonic::Request::new(HookExportRequest {
|
||||
request: request.to_string(),
|
||||
}))
|
||||
.await
|
||||
.map_err(|e| e.message().to_string())?;
|
||||
pub async fn run_export_curl(
|
||||
&mut self,
|
||||
request: &HttpRequest,
|
||||
) -> Result<ExportHttpRequestResponse> {
|
||||
let event = self
|
||||
.server
|
||||
.send_to_plugin_and_wait(
|
||||
"exporter-curl",
|
||||
&InternalEventPayload::ExportHttpRequestRequest(ExportHttpRequestRequest {
|
||||
http_request: request.to_owned(),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(response.into_inner())
|
||||
match event.payload {
|
||||
InternalEventPayload::ExportHttpRequestResponse(resp) => Ok(resp),
|
||||
InternalEventPayload::EmptyResponse(_) => {
|
||||
Err(PluginErr("Export returned empty".to_string()))
|
||||
}
|
||||
e => Err(PluginErr(format!("Export returned invalid event {:?}", e))),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_response_filter(
|
||||
pub async fn run_filter(
|
||||
&mut self,
|
||||
filter: &str,
|
||||
body: &str,
|
||||
content: &str,
|
||||
content_type: &str,
|
||||
) -> Result<HookResponse, String> {
|
||||
debug!("Running plugin filter");
|
||||
let response = self
|
||||
.client
|
||||
.hook_response_filter(tonic::Request::new(HookResponseFilterRequest {
|
||||
filter: filter.to_string(),
|
||||
body: body.to_string(),
|
||||
content_type: content_type.to_string(),
|
||||
}))
|
||||
.await
|
||||
.map_err(|e| e.message().to_string())?;
|
||||
) -> Result<FilterResponse> {
|
||||
let plugin_name = match content_type {
|
||||
"application/json" => "filter-jsonpath",
|
||||
_ => "filter-xpath",
|
||||
};
|
||||
|
||||
let result = response.into_inner();
|
||||
debug!("Ran plugin response filter {}", result.data);
|
||||
Ok(result)
|
||||
let event = self
|
||||
.server
|
||||
.send_to_plugin_and_wait(
|
||||
plugin_name,
|
||||
&InternalEventPayload::FilterRequest(FilterRequest {
|
||||
filter: filter.to_string(),
|
||||
content: content.to_string(),
|
||||
}),
|
||||
)
|
||||
.await?;
|
||||
|
||||
match event.payload {
|
||||
InternalEventPayload::FilterResponse(resp) => Ok(resp),
|
||||
InternalEventPayload::EmptyResponse(_) => {
|
||||
Err(PluginErr("Filter returned empty".to_string()))
|
||||
}
|
||||
e => Err(PluginErr(format!("Export returned invalid event {:?}", e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1,12 @@
|
||||
use std::path::PathBuf;
|
||||
use std::time::Duration;
|
||||
|
||||
use std::net::SocketAddr;
|
||||
use crate::error::Result;
|
||||
use log::info;
|
||||
use rand::distributions::{Alphanumeric, DistString};
|
||||
use serde;
|
||||
use serde::Deserialize;
|
||||
use tauri::path::BaseDirectory;
|
||||
use tauri::{AppHandle, Manager, Runtime};
|
||||
use tauri_plugin_shell::process::CommandEvent;
|
||||
use tauri_plugin_shell::ShellExt;
|
||||
use tokio::fs;
|
||||
use tokio::sync::watch::Receiver;
|
||||
|
||||
#[derive(Deserialize, Default)]
|
||||
@@ -17,57 +15,48 @@ struct PortFile {
|
||||
port: i32,
|
||||
}
|
||||
|
||||
pub struct StartResp {
|
||||
pub addr: String,
|
||||
}
|
||||
|
||||
pub async fn node_start<R: Runtime>(
|
||||
pub async fn start_nodejs_plugin_runtime<R: Runtime>(
|
||||
app: &AppHandle<R>,
|
||||
temp_dir: &PathBuf,
|
||||
addr: SocketAddr,
|
||||
kill_rx: &Receiver<bool>,
|
||||
) -> StartResp {
|
||||
let port_file_path = temp_dir.join(Alphanumeric.sample_string(&mut rand::thread_rng(), 10));
|
||||
|
||||
let plugins_dir = app
|
||||
.path()
|
||||
.resolve("plugins", BaseDirectory::Resource)
|
||||
.expect("failed to resolve plugin directory resource");
|
||||
|
||||
) -> Result<()> {
|
||||
let plugin_runtime_main = app
|
||||
.path()
|
||||
.resolve("plugin-runtime", BaseDirectory::Resource)
|
||||
.expect("failed to resolve plugin runtime resource")
|
||||
.resolve("plugin-runtime", BaseDirectory::Resource)?
|
||||
.join("index.cjs");
|
||||
|
||||
// HACK: Remove UNC prefix for Windows paths to pass to sidecar
|
||||
|
||||
let plugins_dir = dunce::simplified(plugins_dir.as_path())
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
let plugin_runtime_main = dunce::simplified(plugin_runtime_main.as_path())
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
|
||||
info!(
|
||||
"Starting plugin runtime\n → port_file={}\n → plugins_dir={}\n → runtime_dir={}",
|
||||
port_file_path.to_string_lossy(),
|
||||
plugins_dir,
|
||||
plugin_runtime_main,
|
||||
);
|
||||
info!("Starting plugin runtime main={}", plugin_runtime_main);
|
||||
|
||||
let cmd = app
|
||||
.shell()
|
||||
.sidecar("yaaknode")
|
||||
.expect("yaaknode not found")
|
||||
.env("YAAK_GRPC_PORT_FILE_PATH", port_file_path.clone())
|
||||
.env("YAAK_PLUGINS_DIR", plugins_dir)
|
||||
.sidecar("yaaknode")?
|
||||
.env("PORT", addr.port().to_string())
|
||||
.args(&[plugin_runtime_main]);
|
||||
|
||||
println!("Waiting on plugin runtime");
|
||||
let (_, child) = cmd.spawn().expect("yaaknode failed to start");
|
||||
let (mut child_rx, child) = cmd.spawn()?;
|
||||
println!("Spawned plugin runtime");
|
||||
|
||||
let mut kill_rx = kill_rx.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
while let Some(event) = child_rx.recv().await {
|
||||
match event {
|
||||
CommandEvent::Stderr(line) => {
|
||||
print!("{}", String::from_utf8(line).unwrap());
|
||||
}
|
||||
CommandEvent::Stdout(line) => {
|
||||
print!("{}", String::from_utf8(line).unwrap());
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
// Check on child
|
||||
tokio::spawn(async move {
|
||||
kill_rx
|
||||
@@ -77,26 +66,7 @@ pub async fn node_start<R: Runtime>(
|
||||
info!("Killing plugin runtime");
|
||||
child.kill().expect("Failed to kill plugin runtime");
|
||||
info!("Killed plugin runtime");
|
||||
return;
|
||||
});
|
||||
|
||||
let start = std::time::Instant::now();
|
||||
let port_file_contents = loop {
|
||||
if start.elapsed().as_millis() > 30000 {
|
||||
panic!("Failed to read port file in time");
|
||||
}
|
||||
|
||||
match fs::read_to_string(port_file_path.clone()).await {
|
||||
Ok(s) => break s,
|
||||
Err(_) => {
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let port_file: PortFile = serde_json::from_str(port_file_contents.as_str()).unwrap();
|
||||
info!("Started plugin runtime on :{}", port_file.port);
|
||||
let addr = format!("http://localhost:{}", port_file.port);
|
||||
|
||||
StartResp { addr }
|
||||
Ok(())
|
||||
}
|
||||
|
||||
112
src-tauri/yaak_plugin_runtime/src/plugin.rs
Normal file
112
src-tauri/yaak_plugin_runtime/src/plugin.rs
Normal file
@@ -0,0 +1,112 @@
|
||||
use std::net::SocketAddr;
|
||||
use std::path::PathBuf;
|
||||
use std::process::exit;
|
||||
use std::time::Duration;
|
||||
|
||||
use log::info;
|
||||
use tauri::path::BaseDirectory;
|
||||
use tauri::plugin::{Builder, TauriPlugin};
|
||||
use tauri::{Manager, RunEvent, Runtime, State};
|
||||
use tokio::fs::read_dir;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Mutex;
|
||||
use tonic::codegen::tokio_stream;
|
||||
use tonic::transport::Server;
|
||||
|
||||
use crate::error::Result;
|
||||
use crate::events::{InternalEvent, InternalEventPayload};
|
||||
use crate::manager::PluginManager;
|
||||
use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntimeServer;
|
||||
use crate::server::PluginRuntimeGrpcServer;
|
||||
|
||||
pub fn init<R: Runtime>() -> TauriPlugin<R> {
|
||||
Builder::new("yaak_plugin_runtime")
|
||||
.setup(|app, _| {
|
||||
let plugins_dir = app
|
||||
.path()
|
||||
.resolve("plugins", BaseDirectory::Resource)
|
||||
.expect("failed to resolve plugin directory resource");
|
||||
|
||||
tauri::async_runtime::block_on(async move {
|
||||
let plugin_dirs = read_plugins_dir(&plugins_dir)
|
||||
.await
|
||||
.expect("Failed to read plugins dir");
|
||||
let manager = PluginManager::new(&app, plugin_dirs).await;
|
||||
let manager_state = Mutex::new(manager);
|
||||
app.manage(manager_state);
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
.on_event(|app, e| match e {
|
||||
// TODO: Also exit when app is force-quit (eg. cmd+r in IntelliJ runner)
|
||||
RunEvent::ExitRequested { api, .. } => {
|
||||
api.prevent_exit();
|
||||
tauri::async_runtime::block_on(async move {
|
||||
info!("Exiting plugin runtime due to app exit");
|
||||
let manager: State<Mutex<PluginManager>> = app.state();
|
||||
manager.lock().await.cleanup().await;
|
||||
exit(0);
|
||||
});
|
||||
}
|
||||
_ => {}
|
||||
})
|
||||
.build()
|
||||
}
|
||||
|
||||
pub async fn start_server(
|
||||
plugin_dirs: Vec<String>,
|
||||
) -> Result<(PluginRuntimeGrpcServer, SocketAddr)> {
|
||||
println!("Starting plugin server with {plugin_dirs:?}");
|
||||
let server = PluginRuntimeGrpcServer::new(plugin_dirs);
|
||||
|
||||
let svc = PluginRuntimeServer::new(server.clone());
|
||||
let listen_addr = match option_env!("PORT") {
|
||||
None => "localhost:0".to_string(),
|
||||
Some(port) => format!("localhost:{port}"),
|
||||
};
|
||||
|
||||
{
|
||||
let server = server.clone();
|
||||
tokio::spawn(async move {
|
||||
let (rx_id, mut rx) = server.subscribe().await;
|
||||
while let Some(event) = rx.recv().await {
|
||||
match event.clone() {
|
||||
InternalEvent {
|
||||
payload: InternalEventPayload::BootResponse(resp),
|
||||
plugin_ref_id,
|
||||
..
|
||||
} => {
|
||||
server.boot_plugin(plugin_ref_id.as_str(), &resp).await;
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
}
|
||||
server.unsubscribe(rx_id).await;
|
||||
});
|
||||
};
|
||||
|
||||
let listener = TcpListener::bind(listen_addr).await?;
|
||||
let addr = listener.local_addr()?;
|
||||
println!("Starting gRPC plugin server on {addr}");
|
||||
tokio::spawn(async move {
|
||||
Server::builder()
|
||||
.timeout(Duration::from_secs(10))
|
||||
.add_service(svc)
|
||||
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
|
||||
.await
|
||||
.expect("grpc plugin runtime server failed to start");
|
||||
});
|
||||
|
||||
Ok((server, addr))
|
||||
}
|
||||
|
||||
async fn read_plugins_dir(dir: &PathBuf) -> Result<Vec<String>> {
|
||||
let mut result = read_dir(dir).await?;
|
||||
let mut dirs: Vec<String> = vec![];
|
||||
while let Ok(Some(entry)) = result.next_entry().await {
|
||||
if entry.path().is_dir() {
|
||||
dirs.push(entry.path().to_string_lossy().to_string())
|
||||
}
|
||||
}
|
||||
Ok(dirs)
|
||||
}
|
||||
448
src-tauri/yaak_plugin_runtime/src/server.rs
Normal file
448
src-tauri/yaak_plugin_runtime/src/server.rs
Normal file
@@ -0,0 +1,448 @@
|
||||
use log::info;
|
||||
use rand::distributions::{Alphanumeric, DistString};
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use tokio::sync::mpsc::Receiver;
|
||||
use tokio::sync::{mpsc, Mutex};
|
||||
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::codegen::tokio_stream::{Stream, StreamExt};
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
use crate::error::Error::{NoPluginsErr, PluginNotFoundErr};
|
||||
use crate::error::Result;
|
||||
use crate::events::{BootRequest, BootResponse, InternalEvent, InternalEventPayload};
|
||||
use crate::server::plugin_runtime::plugin_runtime_server::PluginRuntime;
|
||||
use plugin_runtime::EventStreamEvent;
|
||||
use yaak_models::queries::generate_id;
|
||||
|
||||
pub mod plugin_runtime {
|
||||
tonic::include_proto!("yaak.plugins.runtime");
|
||||
}
|
||||
|
||||
type ResponseStream =
|
||||
Pin<Box<dyn Stream<Item = std::result::Result<EventStreamEvent, Status>> + Send>>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PluginHandle {
|
||||
dir: String,
|
||||
to_plugin_tx: Arc<Mutex<mpsc::Sender<tonic::Result<EventStreamEvent>>>>,
|
||||
ref_id: String,
|
||||
boot_resp: Arc<Mutex<Option<BootResponse>>>,
|
||||
}
|
||||
|
||||
impl PluginHandle {
|
||||
pub async fn name(&self) -> String {
|
||||
match &*self.boot_resp.lock().await {
|
||||
None => "__NOT_BOOTED__".to_string(),
|
||||
Some(r) => r.name.to_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn build_event_to_send(
|
||||
&self,
|
||||
payload: &InternalEventPayload,
|
||||
reply_id: Option<String>,
|
||||
) -> InternalEvent {
|
||||
InternalEvent {
|
||||
id: gen_id(),
|
||||
plugin_ref_id: self.ref_id.clone(),
|
||||
reply_id,
|
||||
payload: payload.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send(&self, event: &InternalEvent) -> Result<()> {
|
||||
info!("Sending event {} {:?}", event.id, self.name().await);
|
||||
self.to_plugin_tx
|
||||
.lock()
|
||||
.await
|
||||
.send(Ok(EventStreamEvent {
|
||||
event: serde_json::to_string(&event)?,
|
||||
}))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn boot(&self, resp: &BootResponse) {
|
||||
let mut boot_resp = self.boot_resp.lock().await;
|
||||
*boot_resp = Some(resp.clone());
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct PluginRuntimeGrpcServer {
|
||||
plugin_ref_to_plugin: Arc<Mutex<HashMap<String, PluginHandle>>>,
|
||||
callback_to_plugin_ref: Arc<Mutex<HashMap<String, String>>>,
|
||||
subscribers: Arc<Mutex<HashMap<String, mpsc::Sender<InternalEvent>>>>,
|
||||
plugin_dirs: Vec<String>,
|
||||
}
|
||||
|
||||
impl PluginRuntimeGrpcServer {
|
||||
pub fn new(plugin_dirs: Vec<String>) -> Self {
|
||||
PluginRuntimeGrpcServer {
|
||||
plugin_ref_to_plugin: Arc::new(Mutex::new(HashMap::new())),
|
||||
callback_to_plugin_ref: Arc::new(Mutex::new(HashMap::new())),
|
||||
subscribers: Arc::new(Mutex::new(HashMap::new())),
|
||||
plugin_dirs,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn subscribe(&self) -> (String, Receiver<InternalEvent>) {
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
let id = generate_id();
|
||||
self.subscribers.lock().await.insert(id.clone(), tx);
|
||||
(id, rx)
|
||||
}
|
||||
|
||||
pub async fn unsubscribe(&self, rx_id: String) {
|
||||
self.subscribers.lock().await.remove(rx_id.as_str());
|
||||
}
|
||||
|
||||
pub async fn remove_plugins(&self, plugin_ids: Vec<String>) {
|
||||
for plugin_id in plugin_ids {
|
||||
self.remove_plugin(plugin_id.as_str()).await;
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn remove_plugin(&self, id: &str) {
|
||||
match self.plugin_ref_to_plugin.lock().await.remove(id) {
|
||||
None => {
|
||||
println!("Tried to remove non-existing plugin {}", id);
|
||||
}
|
||||
Some(plugin) => {
|
||||
println!("Removed plugin {} {}", id, plugin.name().await);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
pub async fn boot_plugin(&self, id: &str, resp: &BootResponse) {
|
||||
match self.plugin_ref_to_plugin.lock().await.get(id) {
|
||||
None => {
|
||||
println!("Tried booting non-existing plugin {}", id);
|
||||
}
|
||||
Some(plugin) => {
|
||||
plugin.clone().boot(resp).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn add_plugin(
|
||||
&self,
|
||||
dir: &str,
|
||||
tx: mpsc::Sender<tonic::Result<EventStreamEvent>>,
|
||||
) -> PluginHandle {
|
||||
let ref_id = gen_id();
|
||||
let plugin_handle = PluginHandle {
|
||||
ref_id: ref_id.clone(),
|
||||
dir: dir.to_string(),
|
||||
to_plugin_tx: Arc::new(Mutex::new(tx)),
|
||||
boot_resp: Arc::new(Mutex::new(None)),
|
||||
};
|
||||
let _ = self
|
||||
.plugin_ref_to_plugin
|
||||
.lock()
|
||||
.await
|
||||
.insert(ref_id, plugin_handle.clone());
|
||||
plugin_handle
|
||||
}
|
||||
|
||||
// pub async fn callback(
|
||||
// &self,
|
||||
// source_event: InternalEvent,
|
||||
// payload: InternalEventPayload,
|
||||
// ) -> Result<InternalEvent> {
|
||||
// let reply_id = match source_event.clone().reply_id {
|
||||
// None => {
|
||||
// let msg = format!("Source event missing reply Id {:?}", source_event.clone());
|
||||
// return Err(MissingCallbackIdErr(msg));
|
||||
// }
|
||||
// Some(id) => id,
|
||||
// };
|
||||
//
|
||||
// let callbacks = self.callbacks.lock().await;
|
||||
// let plugin_name = match callbacks.get(reply_id.as_str()) {
|
||||
// None => {
|
||||
// let msg = format!("Callback not found {:?}", source_event);
|
||||
// return Err(MissingCallbackErr(msg));
|
||||
// }
|
||||
// Some(n) => n,
|
||||
// };
|
||||
//
|
||||
// let plugins = self.plugins.lock().await;
|
||||
// let plugin = match plugins.get(plugin_name) {
|
||||
// None => {
|
||||
// let msg = format!(
|
||||
// "Plugin not found {plugin_name}. Choices were {:?}",
|
||||
// plugins.keys()
|
||||
// );
|
||||
// return Err(UnknownPluginErr(msg));
|
||||
// }
|
||||
// Some(n) => n,
|
||||
// };
|
||||
//
|
||||
// plugin.send(&payload, Some(reply_id)).await
|
||||
// }
|
||||
|
||||
pub async fn plugin_by_ref_id(&self, ref_id: &str) -> Result<PluginHandle> {
|
||||
let plugins = self.plugin_ref_to_plugin.lock().await;
|
||||
if plugins.is_empty() {
|
||||
return Err(NoPluginsErr("Send failed because no plugins exist".into()));
|
||||
}
|
||||
|
||||
match plugins.get(ref_id) {
|
||||
None => {
|
||||
let msg = format!("Failed to find plugin for id {ref_id}");
|
||||
Err(PluginNotFoundErr(msg))
|
||||
}
|
||||
Some(p) => Ok(p.to_owned()),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn plugin_by_name(&self, plugin_name: &str) -> Result<PluginHandle> {
|
||||
let plugins = self.plugin_ref_to_plugin.lock().await;
|
||||
if plugins.is_empty() {
|
||||
return Err(NoPluginsErr("Send failed because no plugins exist".into()));
|
||||
}
|
||||
|
||||
for p in plugins.values() {
|
||||
if p.name().await == plugin_name {
|
||||
return Ok(p.to_owned());
|
||||
}
|
||||
}
|
||||
|
||||
let msg = format!("Failed to find plugin for {plugin_name}");
|
||||
Err(PluginNotFoundErr(msg))
|
||||
}
|
||||
|
||||
pub async fn send_to_plugin(
|
||||
&self,
|
||||
plugin_name: &str,
|
||||
payload: InternalEventPayload,
|
||||
) -> Result<InternalEvent> {
|
||||
let plugins = self.plugin_ref_to_plugin.lock().await;
|
||||
if plugins.is_empty() {
|
||||
return Err(NoPluginsErr("Send failed because no plugins exist".into()));
|
||||
}
|
||||
|
||||
let mut plugin = None;
|
||||
for p in plugins.values() {
|
||||
if p.name().await == plugin_name {
|
||||
plugin = Some(p);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
match plugin {
|
||||
Some(plugin) => {
|
||||
let event = plugin.build_event_to_send(&payload, None);
|
||||
plugin.send(&event).await?;
|
||||
Ok(event)
|
||||
}
|
||||
None => {
|
||||
let msg = format!("Failed to find plugin for {plugin_name}");
|
||||
Err(PluginNotFoundErr(msg))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn send_to_plugin_and_wait(
|
||||
&self,
|
||||
plugin_name: &str,
|
||||
payload: &InternalEventPayload,
|
||||
) -> Result<InternalEvent> {
|
||||
let plugin = self.plugin_by_name(plugin_name).await?;
|
||||
let events = self.send_to_plugins_and_wait(payload, vec![plugin]).await?;
|
||||
Ok(events.first().unwrap().to_owned())
|
||||
}
|
||||
|
||||
pub async fn send_and_wait(
|
||||
&self,
|
||||
payload: &InternalEventPayload,
|
||||
) -> Result<Vec<InternalEvent>> {
|
||||
let plugins = self
|
||||
.plugin_ref_to_plugin
|
||||
.lock()
|
||||
.await
|
||||
.values()
|
||||
.cloned()
|
||||
.collect();
|
||||
self.send_to_plugins_and_wait(payload, plugins).await
|
||||
}
|
||||
|
||||
async fn send_to_plugins_and_wait(
|
||||
&self,
|
||||
payload: &InternalEventPayload,
|
||||
plugins: Vec<PluginHandle>,
|
||||
) -> Result<Vec<InternalEvent>> {
|
||||
// 1. Build the events with IDs and everything
|
||||
let events_to_send = plugins
|
||||
.iter()
|
||||
.map(|p| p.build_event_to_send(payload, None))
|
||||
.collect::<Vec<InternalEvent>>();
|
||||
|
||||
// 2. Spawn thread to subscribe to incoming events and check reply ids
|
||||
let server = self.clone();
|
||||
let send_events_fut = {
|
||||
let events_to_send = events_to_send.clone();
|
||||
tokio::spawn(async move {
|
||||
let (rx_id, mut rx) = server.subscribe().await;
|
||||
let mut found_events = Vec::new();
|
||||
|
||||
while let Some(event) = rx.recv().await {
|
||||
if events_to_send
|
||||
.iter()
|
||||
.find(|e| Some(e.id.to_owned()) == event.reply_id)
|
||||
.is_some()
|
||||
{
|
||||
found_events.push(event.clone());
|
||||
};
|
||||
if found_events.len() == events_to_send.len() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
server.unsubscribe(rx_id).await;
|
||||
|
||||
found_events
|
||||
})
|
||||
};
|
||||
|
||||
// 3. Send the events
|
||||
for event in events_to_send {
|
||||
let plugin = plugins
|
||||
.iter()
|
||||
.find(|p| p.ref_id == event.plugin_ref_id)
|
||||
.expect("Didn't find plugin in list");
|
||||
plugin.send(&event).await?
|
||||
}
|
||||
|
||||
// 4. Join on the spawned thread
|
||||
let events = send_events_fut.await.expect("Thread didn't succeed");
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
pub async fn send(&self, payload: InternalEventPayload) -> Result<Vec<InternalEvent>> {
|
||||
let mut events: Vec<InternalEvent> = Vec::new();
|
||||
let plugins = self.plugin_ref_to_plugin.lock().await;
|
||||
if plugins.is_empty() {
|
||||
return Err(NoPluginsErr("Send failed because no plugins exist".into()));
|
||||
}
|
||||
|
||||
for ph in plugins.values() {
|
||||
let event = ph.build_event_to_send(&payload, None);
|
||||
self.send_to_plugin_handle(ph, &event).await?;
|
||||
events.push(event);
|
||||
}
|
||||
|
||||
Ok(events)
|
||||
}
|
||||
|
||||
async fn send_to_plugin_handle(
|
||||
&self,
|
||||
plugin: &PluginHandle,
|
||||
event: &InternalEvent,
|
||||
) -> Result<()> {
|
||||
plugin.send(event).await
|
||||
}
|
||||
|
||||
async fn load_plugins(
|
||||
&self,
|
||||
to_plugin_tx: mpsc::Sender<tonic::Result<EventStreamEvent>>,
|
||||
plugin_dirs: Vec<String>,
|
||||
) -> Vec<String> {
|
||||
let mut plugin_ids = Vec::new();
|
||||
|
||||
for dir in plugin_dirs {
|
||||
let plugin = self.add_plugin(dir.as_str(), to_plugin_tx.clone()).await;
|
||||
plugin_ids.push(plugin.clone().ref_id);
|
||||
|
||||
let event = plugin.build_event_to_send(
|
||||
&InternalEventPayload::BootRequest(BootRequest {
|
||||
dir: dir.to_string(),
|
||||
}),
|
||||
None,
|
||||
);
|
||||
if let Err(e) = plugin.send(&event).await {
|
||||
// TODO: Error handling
|
||||
println!(
|
||||
"Failed boot plugin {} at {} -> {}",
|
||||
plugin.ref_id, plugin.dir, e
|
||||
)
|
||||
} else {
|
||||
println!("Loaded plugin {} at {}", plugin.ref_id, plugin.dir)
|
||||
}
|
||||
}
|
||||
|
||||
plugin_ids
|
||||
}
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl PluginRuntime for PluginRuntimeGrpcServer {
|
||||
type EventStreamStream = ResponseStream;
|
||||
|
||||
async fn event_stream(
|
||||
&self,
|
||||
req: Request<Streaming<EventStreamEvent>>,
|
||||
) -> tonic::Result<Response<Self::EventStreamStream>> {
|
||||
let mut in_stream = req.into_inner();
|
||||
|
||||
let (to_plugin_tx, to_plugin_rx) = mpsc::channel(128);
|
||||
|
||||
let plugin_ids = self
|
||||
.load_plugins(to_plugin_tx, self.plugin_dirs.clone())
|
||||
.await;
|
||||
|
||||
let callbacks = self.callback_to_plugin_ref.clone();
|
||||
let server = self.clone();
|
||||
tokio::spawn(async move {
|
||||
while let Some(result) = in_stream.next().await {
|
||||
match result {
|
||||
Ok(v) => {
|
||||
let event: InternalEvent = match serde_json::from_str(v.event.as_str()) {
|
||||
Ok(pe) => pe,
|
||||
Err(e) => {
|
||||
println!("Failed to deserialize event {e:?} -> {}", v.event);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let plugin_ref_id = event.plugin_ref_id.clone();
|
||||
let reply_id = event.reply_id.clone();
|
||||
|
||||
let subscribers = server.subscribers.lock().await;
|
||||
for tx in subscribers.values() {
|
||||
// Emit event to the channel for server to handle
|
||||
if let Err(e) = tx.try_send(event.clone()) {
|
||||
println!("Failed to send to server channel. Receiver probably isn't listening: {:?}", e);
|
||||
}
|
||||
}
|
||||
|
||||
// Add to callbacks if there's a reply_id
|
||||
if let Some(reply_id) = reply_id {
|
||||
callbacks.lock().await.insert(reply_id, plugin_ref_id);
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
// TODO: Better error handling
|
||||
println!("gRPC server error {err}");
|
||||
break;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
server.remove_plugins(plugin_ids).await;
|
||||
});
|
||||
|
||||
// Write the same data that was received
|
||||
let out_stream = ReceiverStream::new(to_plugin_rx);
|
||||
|
||||
Ok(Response::new(
|
||||
Box::pin(out_stream) as Self::EventStreamStream
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
fn gen_id() -> String {
|
||||
Alphanumeric.sample_string(&mut rand::thread_rng(), 5)
|
||||
}
|
||||
Reference in New Issue
Block a user