Use process group for node runtime

This commit is contained in:
Gregory Schier
2024-07-24 10:14:40 -07:00
parent 9d9b855c04
commit 6ffb0015c6
9 changed files with 214 additions and 99 deletions

View File

@@ -4,8 +4,8 @@ package yaak.plugins.runtime;
service PluginRuntime {
rpc hookImport (HookImportRequest) returns (HookResponse);
rpc hookResponseFilter (HookResponseFilterRequest) returns (HookResponse);
rpc hookExport (HookExportRequest) returns (HookResponse);
rpc hookResponseFilter (HookResponseFilterRequest) returns (HookResponse);
}
message PluginInfo {

167
src-tauri/Cargo.lock generated
View File

@@ -367,7 +367,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf"
dependencies = [
"async-trait",
"axum-core",
"axum-core 0.3.4",
"bitflags 1.3.2",
"bytes",
"futures-util",
@@ -388,6 +388,33 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum"
version = "0.7.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf"
dependencies = [
"async-trait",
"axum-core 0.4.3",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"itoa 1.0.11",
"matchit",
"memchr",
"mime",
"percent-encoding",
"pin-project-lite",
"rustversion",
"serde",
"sync_wrapper 1.0.1",
"tower",
"tower-layer",
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.3.4"
@@ -405,6 +432,26 @@ dependencies = [
"tower-service",
]
[[package]]
name = "axum-core"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3"
dependencies = [
"async-trait",
"bytes",
"futures-util",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"mime",
"pin-project-lite",
"rustversion",
"sync_wrapper 0.1.2",
"tower-layer",
"tower-service",
]
[[package]]
name = "backtrace"
version = "0.3.72"
@@ -833,6 +880,16 @@ dependencies = [
"memchr",
]
[[package]]
name = "command-group"
version = "5.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a68fa787550392a9d58f44c21a3022cfb3ea3e2458b7f85d3b399d0ceeccf409"
dependencies = [
"nix",
"winapi",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
@@ -1980,9 +2037,9 @@ dependencies = [
"hyper-rustls 0.24.2",
"log",
"md5",
"prost",
"prost 0.12.6",
"prost-reflect",
"prost-types",
"prost-types 0.12.6",
"serde",
"serde_json",
"tauri",
@@ -2310,6 +2367,7 @@ dependencies = [
"http 1.1.0",
"http-body 1.0.0",
"httparse",
"httpdate",
"itoa 1.0.11",
"pin-project-lite",
"smallvec",
@@ -2363,6 +2421,19 @@ dependencies = [
"tokio-io-timeout",
]
[[package]]
name = "hyper-timeout"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793"
dependencies = [
"hyper 1.3.1",
"hyper-util",
"pin-project-lite",
"tokio",
"tower-service",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
@@ -3802,8 +3873,9 @@ name = "plugin_runtime"
version = "0.1.0"
dependencies = [
"anyhow",
"command-group",
"log",
"prost",
"prost 0.13.1",
"rand 0.8.5",
"reqwest",
"serde",
@@ -3811,7 +3883,7 @@ dependencies = [
"tauri",
"tauri-plugin-shell",
"tokio",
"tonic 0.11.0",
"tonic 0.12.1",
"tonic-build",
]
@@ -3937,14 +4009,24 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "deb1435c188b76130da55f17a466d252ff7b1418b2ad3e037d127b94e3411f29"
dependencies = [
"bytes",
"prost-derive",
"prost-derive 0.12.6",
]
[[package]]
name = "prost"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e13db3d3fde688c61e2446b4d843bc27a7e8af269a69440c0308021dc92333cc"
dependencies = [
"bytes",
"prost-derive 0.13.1",
]
[[package]]
name = "prost-build"
version = "0.12.6"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4"
checksum = "5bb182580f71dd070f88d01ce3de9f4da5021db7115d2e1c3605a754153b77c1"
dependencies = [
"bytes",
"heck 0.5.0",
@@ -3954,8 +4036,8 @@ dependencies = [
"once_cell",
"petgraph",
"prettyplease",
"prost",
"prost-types",
"prost 0.13.1",
"prost-types 0.13.1",
"regex",
"syn 2.0.66",
"tempfile",
@@ -3974,6 +4056,19 @@ dependencies = [
"syn 2.0.66",
]
[[package]]
name = "prost-derive"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "18bec9b0adc4eba778b33684b7ba3e7137789434769ee3ce3930463ef904cfca"
dependencies = [
"anyhow",
"itertools",
"proc-macro2",
"quote",
"syn 2.0.66",
]
[[package]]
name = "prost-reflect"
version = "0.12.0"
@@ -3982,9 +4077,9 @@ checksum = "057237efdb71cf4b3f9396302a3d6599a92fa94063ba537b66130980ea9909f3"
dependencies = [
"base64 0.21.7",
"once_cell",
"prost",
"prost 0.12.6",
"prost-reflect-derive",
"prost-types",
"prost-types 0.12.6",
"serde",
"serde-value",
]
@@ -4006,7 +4101,16 @@ version = "0.12.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9091c90b0a32608e984ff2fa4091273cbdd755d54935c51d520887f4a1dbd5b0"
dependencies = [
"prost",
"prost 0.12.6",
]
[[package]]
name = "prost-types"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cee5168b05f49d4b0ca581206eb14a7b22fafd963efe729ac48eb03266e25cc2"
dependencies = [
"prost 0.13.1",
]
[[package]]
@@ -6204,17 +6308,17 @@ checksum = "d560933a0de61cf715926b9cac824d4c883c2c43142f787595e48280c40a1d0e"
dependencies = [
"async-stream",
"async-trait",
"axum",
"axum 0.6.20",
"base64 0.21.7",
"bytes",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.29",
"hyper-timeout",
"hyper-timeout 0.4.1",
"percent-encoding",
"pin-project",
"prost",
"prost 0.12.6",
"tokio",
"tokio-stream",
"tower",
@@ -6225,23 +6329,26 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.11.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76c4eb7a4e9ef9d4763600161f12f5070b92a578e1b634db88a6887844c91a13"
checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401"
dependencies = [
"async-stream",
"async-trait",
"axum",
"base64 0.21.7",
"axum 0.7.5",
"base64 0.22.1",
"bytes",
"h2 0.3.26",
"http 0.2.12",
"http-body 0.4.6",
"hyper 0.14.29",
"hyper-timeout",
"h2 0.4.5",
"http 1.1.0",
"http-body 1.0.0",
"http-body-util",
"hyper 1.3.1",
"hyper-timeout 0.5.1",
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"prost 0.13.1",
"socket2",
"tokio",
"tokio-stream",
"tower",
@@ -6252,9 +6359,9 @@ dependencies = [
[[package]]
name = "tonic-build"
version = "0.11.0"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2"
checksum = "568392c5a2bd0020723e3f387891176aabafe36fd9fcd074ad309dfa0c8eb964"
dependencies = [
"prettyplease",
"proc-macro2",
@@ -6269,8 +6376,8 @@ version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fa37c513df1339d197f4ba21d28c918b9ef1ac1768265f11ecb6b7f1cba1b76"
dependencies = [
"prost",
"prost-types",
"prost 0.12.6",
"prost-types 0.12.6",
"tokio",
"tokio-stream",
"tonic 0.10.2",

View File

@@ -29,16 +29,6 @@
]
},
"shell:allow-open",
{
"identifier": "shell:allow-execute",
"allow": [
{
"name": "protoc",
"sidecar": true,
"args": true
}
]
},
"webview:allow-set-webview-zoom",
"window:allow-close",
"window:allow-is-fullscreen",

View File

@@ -1 +1 @@
{"main":{"identifier":"main","description":"Main permissions","local":true,"windows":["*"],"permissions":["os:allow-os-type","event:allow-emit","clipboard-manager:allow-write-text","clipboard-manager:allow-read-text","dialog:allow-open","dialog:allow-save","event:allow-listen","event:allow-unlisten","fs:allow-read-file","fs:allow-read-text-file",{"identifier":"fs:scope","allow":[{"path":"$APPDATA"},{"path":"$APPDATA/**"}]},"shell:allow-open",{"identifier":"shell:allow-execute","allow":[{"args":true,"name":"protoc","sidecar":true}]},"webview:allow-set-webview-zoom","window:allow-close","window:allow-is-fullscreen","window:allow-maximize","window:allow-minimize","window:allow-toggle-maximize","window:allow-set-decorations","window:allow-set-title","window:allow-start-dragging","window:allow-unmaximize","window:allow-theme","clipboard-manager:allow-read-text","clipboard-manager:allow-write-text"]}}
{"main":{"identifier":"main","description":"Main permissions","local":true,"windows":["*"],"permissions":["os:allow-os-type","event:allow-emit","clipboard-manager:allow-write-text","clipboard-manager:allow-read-text","dialog:allow-open","dialog:allow-save","event:allow-listen","event:allow-unlisten","fs:allow-read-file","fs:allow-read-text-file",{"identifier":"fs:scope","allow":[{"path":"$APPDATA"},{"path":"$APPDATA/**"}]},"shell:allow-open","webview:allow-set-webview-zoom","window:allow-close","window:allow-is-fullscreen","window:allow-maximize","window:allow-minimize","window:allow-toggle-maximize","window:allow-set-decorations","window:allow-set-title","window:allow-start-dragging","window:allow-unmaximize","window:allow-theme","clipboard-manager:allow-read-text","clipboard-manager:allow-write-text"]}}

View File

@@ -30,10 +30,6 @@ use tauri_plugin_log::{fern, Target, TargetKind};
use tauri_plugin_shell::ShellExt;
use tokio::sync::Mutex;
use ::grpc::manager::{DynamicMessage, GrpcHandle};
use ::grpc::{deserialize_message, serialize_message, Code, ServiceDefinition};
use plugin_runtime::manager::PluginManager;
use crate::analytics::{AnalyticsAction, AnalyticsResource};
use crate::grpc::metadata_to_map;
use crate::http_request::send_http_request;
@@ -58,6 +54,9 @@ use crate::notifications::YaakNotifier;
use crate::render::{render_request, variables_from_environment};
use crate::updates::{UpdateMode, YaakUpdater};
use crate::window_menu::app_menu;
use ::grpc::manager::{DynamicMessage, GrpcHandle};
use ::grpc::{deserialize_message, serialize_message, Code, ServiceDefinition};
use plugin_runtime::manager::PluginManager;
mod analytics;
mod grpc;
@@ -717,6 +716,7 @@ async fn cmd_send_ephemeral_request(
async fn cmd_filter_response(
w: WebviewWindow,
response_id: &str,
plugin_manager: State<'_, Mutex<PluginManager>>,
filter: &str,
) -> Result<String, String> {
let response = get_http_response(&w, response_id)
@@ -738,9 +738,9 @@ async fn cmd_filter_response(
let body = read_to_string(response.body_path.unwrap()).unwrap();
// TODO: Have plugins register their own content type (regex?)
let manager: State<PluginManager> = w.app_handle().state();
manager
.inner()
plugin_manager
.lock()
.await
.run_response_filter(filter, &body, &content_type)
.await
.map(|r| r.data)
@@ -749,14 +749,18 @@ async fn cmd_filter_response(
#[tauri::command]
async fn cmd_import_data(
w: WebviewWindow,
plugin_manager: State<'_, Mutex<PluginManager>>,
file_path: &str,
_workspace_id: &str,
) -> Result<WorkspaceExportResources, String> {
let file =
read_to_string(file_path).unwrap_or_else(|_| panic!("Unable to read file {}", file_path));
let file_contents = file.as_str();
let manager: State<PluginManager> = w.app_handle().state();
let import_response = manager.inner().run_import(file_contents).await?;
let import_response = plugin_manager
.lock()
.await
.run_import(file_contents)
.await?;
let import_result: ImportResult =
serde_json::from_str(import_response.data.as_str()).map_err(|e| e.to_string())?;
@@ -881,6 +885,7 @@ async fn cmd_import_data(
async fn cmd_request_to_curl(
app: AppHandle,
request_id: &str,
plugin_manager: State<'_, Mutex<PluginManager>>,
environment_id: Option<&str>,
) -> Result<String, String> {
let request = get_http_request(&app, request_id)
@@ -896,9 +901,9 @@ async fn cmd_request_to_curl(
let rendered = render_request(&request, &workspace, environment.as_ref());
let request_json = serde_json::to_string(&rendered).map_err(|e| e.to_string())?;
let manager: State<PluginManager> = app.state();
let import_response = manager
.inner()
let import_response = plugin_manager
.lock()
.await
.run_export_curl(request_json.as_str())
.await?;
Ok(import_response.data)
@@ -906,12 +911,11 @@ async fn cmd_request_to_curl(
#[tauri::command]
async fn cmd_curl_to_request(
app_handle: AppHandle,
command: &str,
plugin_manager: State<'_, Mutex<PluginManager>>,
workspace_id: &str,
) -> Result<HttpRequest, String> {
let manager: State<PluginManager> = app_handle.state();
let import_response = manager.inner().run_import(command).await?;
let import_response = plugin_manager.lock().await.run_import(command).await?;
let import_result: ImportResult =
serde_json::from_str(import_response.data.as_str()).map_err(|e| e.to_string())?;
import_result
@@ -1759,7 +1763,6 @@ pub fn run() {
});
}
RunEvent::WindowEvent {
label: _label,
event: WindowEvent::Focused(true),
..
} => {

View File

@@ -5,8 +5,9 @@ edition = "2021"
[dependencies]
anyhow = "1.0.86"
command-group = "5.0.1"
log = "0.4.21"
prost = "0.12"
prost = "0.13.1"
rand = "0.8.5"
reqwest = { version = "0.12.5", features = ["stream"] }
serde = { version = "1.0.198", features = ["derive"] }
@@ -14,7 +15,7 @@ serde_json = "1.0.113"
tauri = { version = "2.0.0-beta" }
tauri-plugin-shell = "2.0.0-beta"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "process"] }
tonic = "0.11"
tonic = "0.12.1"
[build-dependencies]
tonic-build = "0.11"
tonic-build = "0.12.1"

View File

@@ -1,9 +1,10 @@
extern crate core;
use tauri::plugin::{Builder, TauriPlugin};
use tauri::{Manager, Runtime};
use log::info;
use crate::manager::PluginManager;
use tauri::plugin::{Builder, TauriPlugin};
use tauri::{Manager, RunEvent, Runtime, State};
use tokio::sync::Mutex;
pub mod manager;
mod nodejs;
@@ -17,9 +18,20 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
.setup(|app, _| {
tauri::async_runtime::block_on(async move {
let manager = PluginManager::new(&app).await;
app.manage(manager);
let manager_state = Mutex::new(manager);
app.manage(manager_state);
Ok(())
})
})
.on_event(|app, e| match e {
RunEvent::ExitRequested { code, .. } => {
tauri::async_runtime::block_on(async move {
info!("Exiting plugin runtime because of app exit {:?}", code);
let manager: State<Mutex<PluginManager>> = app.state();
manager.lock().await.cleanup();
});
}
_ => {}
})
.build()
}

View File

@@ -1,24 +1,27 @@
use command_group::GroupChild;
use log::{debug, info};
use tauri::{AppHandle, Manager, Runtime};
use tokio::sync::Mutex;
use tonic::transport::Channel;
use crate::nodejs::node_start;
use crate::plugin_runtime::plugin_runtime_client::PluginRuntimeClient;
use crate::plugin_runtime::{HookExportRequest, HookImportRequest, HookResponse, HookResponseFilterRequest};
use crate::plugin_runtime::{
HookExportRequest, HookImportRequest, HookResponse, HookResponseFilterRequest,
};
pub struct PluginManager {
client: Mutex<PluginRuntimeClient<Channel>>,
client: PluginRuntimeClient<Channel>,
child: GroupChild,
}
impl PluginManager {
pub async fn new<R: Runtime>(app_handle: &AppHandle<R>) -> PluginManager {
let temp_dir = app_handle.path().temp_dir().unwrap();
let addr = node_start(app_handle, &temp_dir).await;
info!("Connecting to gRPC client at {addr}");
let start_resp = node_start(app_handle, &temp_dir).await;
info!("Connecting to gRPC client at {}", start_resp.addr);
let client = match PluginRuntimeClient::connect(addr.clone()).await {
let client = match PluginRuntimeClient::connect(start_resp.addr.clone()).await {
Ok(v) => v,
Err(err) => {
panic!("{}", err.to_string());
@@ -26,15 +29,19 @@ impl PluginManager {
};
PluginManager {
client: Mutex::new(client),
client,
child: start_resp.child,
}
}
pub async fn run_import(&self, data: &str) -> Result<HookResponse, String> {
pub fn cleanup(&mut self) {
info!("Cleaning up NodeJS process");
self.child.kill().unwrap();
}
pub async fn run_import(&mut self, data: &str) -> Result<HookResponse, String> {
let response = self
.client
.lock()
.await
.hook_import(tonic::Request::new(HookImportRequest {
data: data.to_string(),
}))
@@ -44,11 +51,9 @@ impl PluginManager {
Ok(response.into_inner())
}
pub async fn run_export_curl(&self, request: &str) -> Result<HookResponse, String> {
pub async fn run_export_curl(&mut self, request: &str) -> Result<HookResponse, String> {
let response = self
.client
.lock()
.await
.hook_export(tonic::Request::new(HookExportRequest {
request: request.to_string(),
}))
@@ -59,7 +64,7 @@ impl PluginManager {
}
pub async fn run_response_filter(
&self,
&mut self,
filter: &str,
body: &str,
content_type: &str,
@@ -67,8 +72,6 @@ impl PluginManager {
debug!("Running plugin filter");
let response = self
.client
.lock()
.await
.hook_response_filter(tonic::Request::new(HookResponseFilterRequest {
filter: filter.to_string(),
body: body.to_string(),

View File

@@ -1,13 +1,14 @@
use std::path::PathBuf;
use std::process::Command;
use std::time::Duration;
use command_group::{CommandGroup, GroupChild};
use log::{debug, info};
use rand::distributions::{Alphanumeric, DistString};
use serde;
use serde::Deserialize;
use tauri::path::BaseDirectory;
use tauri::{AppHandle, Manager, Runtime};
use tauri_plugin_shell::process::CommandEvent;
use tauri::path::BaseDirectory;
use tauri_plugin_shell::ShellExt;
use tokio::fs;
@@ -17,7 +18,12 @@ struct PortFile {
port: i32,
}
pub async fn node_start<R: Runtime>(app: &AppHandle<R>, temp_dir: &PathBuf) -> String {
pub struct StartResp {
pub addr: String,
pub child: GroupChild,
}
pub async fn node_start<R: Runtime>(app: &AppHandle<R>, temp_dir: &PathBuf) -> StartResp {
let port_file_path = temp_dir.join(Alphanumeric.sample_string(&mut rand::thread_rng(), 10));
let plugins_dir = app
@@ -42,26 +48,17 @@ pub async fn node_start<R: Runtime>(app: &AppHandle<R>, temp_dir: &PathBuf) -> S
plugin_runtime_dir.to_string_lossy(),
);
let (mut rx, _child) = app
let cmd = app
.shell()
.sidecar("yaaknode")
.expect("yaaknode not found")
.env("YAAK_GRPC_PORT_FILE_PATH", port_file_path.clone())
.env("YAAK_PLUGINS_DIR", plugins_dir)
.args(&[plugin_runtime_dir.join("index.cjs")])
.spawn()
.expect("yaaknode failed to start");
.args(&[plugin_runtime_dir.join("index.cjs")]);
tauri::async_runtime::spawn(async move {
// read events such as stdout
while let Some(event) = rx.recv().await {
if let CommandEvent::Stdout(line) = event {
print!("{}", String::from_utf8_lossy(line.as_slice()));
} else if let CommandEvent::Stderr(line) = event {
print!("{}", String::from_utf8_lossy(line.as_slice()));
}
}
});
let child = Command::from(cmd)
.group_spawn()
.expect("yaaknode failed to start");
let start = std::time::Instant::now();
let port_file_contents = loop {
@@ -80,5 +77,7 @@ pub async fn node_start<R: Runtime>(app: &AppHandle<R>, temp_dir: &PathBuf) -> S
let port_file: PortFile = serde_json::from_str(port_file_contents.as_str()).unwrap();
info!("Started plugin runtime on :{}", port_file.port);
format!("http://localhost:{}", port_file.port)
let addr = format!("http://localhost:{}", port_file.port);
StartResp { addr, child }
}