Start on plugin ctx API (#64)

This commit is contained in:
Gregory Schier
2024-08-14 06:42:54 -07:00
committed by GitHub
parent e47a2c5fab
commit 12f4c2c668
106 changed files with 1086 additions and 1219 deletions

View File

@@ -1,7 +1,10 @@
use serde::{Deserialize, Serialize};
use ts_rs::TS;
use yaak_models::models::{CookieJar, Environment, Folder, GrpcConnection, GrpcEvent, GrpcRequest, HttpRequest, HttpResponse, KeyValue, Settings, Workspace};
use yaak_models::models::{
CookieJar, Environment, Folder, GrpcConnection, GrpcEvent, GrpcRequest, HttpRequest,
HttpResponse, KeyValue, Settings, Workspace,
};
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
@@ -20,12 +23,22 @@ pub struct InternalEvent {
pub enum InternalEventPayload {
BootRequest(BootRequest),
BootResponse(BootResponse),
ImportRequest(ImportRequest),
ImportResponse(ImportResponse),
FilterRequest(FilterRequest),
FilterResponse(FilterResponse),
ExportHttpRequestRequest(ExportHttpRequestRequest),
ExportHttpRequestResponse(ExportHttpRequestResponse),
SendHttpRequestRequest(SendHttpRequestRequest),
SendHttpRequestResponse(SendHttpRequestResponse),
GetHttpRequestByIdRequest(GetHttpRequestByIdRequest),
GetHttpRequestByIdResponse(GetHttpRequestByIdResponse),
/// Returned when a plugin doesn't get run, just so the server
/// has something to listen for
EmptyResponse(EmptyResponse),
@@ -95,17 +108,33 @@ pub struct ExportHttpRequestResponse {
pub content: String,
}
// TODO: Migrate plugins to return this type
// #[derive(Debug, Clone, Serialize, Deserialize, TS)]
// #[serde(rename_all = "camelCase", untagged)]
// #[ts(export)]
// pub enum ExportableModel {
// Workspace(Workspace),
// Environment(Environment),
// Folder(Folder),
// HttpRequest(HttpRequest),
// GrpcRequest(GrpcRequest),
// }
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
#[serde(default, rename_all = "camelCase")]
#[ts(export)]
pub struct SendHttpRequestRequest {
pub http_request: HttpRequest,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
#[serde(default, rename_all = "camelCase")]
#[ts(export)]
pub struct SendHttpRequestResponse {
pub http_response: HttpResponse,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
#[serde(default, rename_all = "camelCase")]
#[ts(export)]
pub struct GetHttpRequestByIdRequest {
pub id: String,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
#[serde(default, rename_all = "camelCase")]
#[ts(export)]
pub struct GetHttpRequestByIdResponse {
pub http_request: Option<HttpRequest>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize, TS)]
#[serde(default, rename_all = "camelCase")]

View File

@@ -1,7 +1,7 @@
use crate::error::Result;
use crate::events::{
ExportHttpRequestRequest, ExportHttpRequestResponse, FilterRequest, FilterResponse,
ImportRequest, ImportResponse, InternalEventPayload,
ExportHttpRequestRequest, ExportHttpRequestResponse, FilterRequest, FilterResponse
, ImportRequest, ImportResponse, InternalEvent, InternalEventPayload,
};
use crate::error::Error::PluginErr;
@@ -10,6 +10,7 @@ use crate::plugin::start_server;
use crate::server::PluginRuntimeGrpcServer;
use std::time::Duration;
use tauri::{AppHandle, Runtime};
use tokio::sync::mpsc;
use tokio::sync::watch::Sender;
use yaak_models::models::HttpRequest;
@@ -35,14 +36,33 @@ impl PluginManager {
PluginManager { kill_tx, server }
}
pub async fn cleanup(&mut self) {
pub async fn subscribe(&self) -> (String, mpsc::Receiver<InternalEvent>) {
self.server.subscribe().await
}
pub async fn unsubscribe(&self, rx_id: &str) {
self.server.unsubscribe(rx_id).await
}
pub async fn cleanup(&self) {
self.kill_tx.send_replace(true);
// Give it a bit of time to kill
tokio::time::sleep(Duration::from_millis(500)).await;
}
pub async fn run_import(&mut self, content: &str) -> Result<(ImportResponse, String)> {
pub async fn reply(
&self,
source_event: &InternalEvent,
payload: &InternalEventPayload,
) -> Result<()> {
let reply_id = Some(source_event.clone().id);
self.server
.send(&payload, source_event.plugin_ref_id.as_str(), reply_id)
.await
}
pub async fn run_import(&self, content: &str) -> Result<(ImportResponse, String)> {
let reply_events = self
.server
.send_and_wait(&InternalEventPayload::ImportRequest(ImportRequest {
@@ -67,7 +87,7 @@ impl PluginManager {
}
pub async fn run_export_curl(
&mut self,
&self,
request: &HttpRequest,
) -> Result<ExportHttpRequestResponse> {
let event = self
@@ -90,7 +110,7 @@ impl PluginManager {
}
pub async fn run_filter(
&mut self,
&self,
filter: &str,
content: &str,
content_type: &str,

View File

@@ -13,7 +13,6 @@ use tauri::plugin::{Builder, TauriPlugin};
use tauri::{Manager, RunEvent, Runtime, State};
use tokio::fs::read_dir;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tonic::codegen::tokio_stream;
use tonic::transport::Server;
@@ -30,8 +29,7 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
.await
.expect("Failed to read plugins dir");
let manager = PluginManager::new(&app, plugin_dirs).await;
let manager_state = Mutex::new(manager);
app.manage(manager_state);
app.manage(manager);
Ok(())
})
})
@@ -41,8 +39,8 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
api.prevent_exit();
tauri::async_runtime::block_on(async move {
info!("Exiting plugin runtime due to app exit");
let manager: State<Mutex<PluginManager>> = app.state();
manager.lock().await.cleanup().await;
let manager: State<PluginManager> = app.state();
manager.cleanup().await;
exit(0);
});
}
@@ -79,7 +77,7 @@ pub async fn start_server(
_ => {}
};
}
server.unsubscribe(rx_id).await;
server.unsubscribe(rx_id.as_str()).await;
});
};

View File

@@ -95,8 +95,8 @@ impl PluginRuntimeGrpcServer {
(id, rx)
}
pub async fn unsubscribe(&self, rx_id: String) {
self.subscribers.lock().await.remove(rx_id.as_str());
pub async fn unsubscribe(&self, rx_id: &str) {
self.subscribers.lock().await.remove(rx_id);
}
pub async fn remove_plugins(&self, plugin_ids: Vec<String>) {
@@ -214,6 +214,13 @@ impl PluginRuntimeGrpcServer {
let msg = format!("Failed to find plugin for {plugin_name}");
Err(PluginNotFoundErr(msg))
}
pub async fn send(&self, payload: &InternalEventPayload, plugin_ref_id: &str, reply_id: Option<String>)-> Result<()> {
let plugin = self.plugin_by_ref_id(plugin_ref_id).await?;
let event = plugin.build_event_to_send(payload, reply_id);
plugin.send(&event).await
}
pub async fn send_to_plugin(
&self,
@@ -301,7 +308,7 @@ impl PluginRuntimeGrpcServer {
break;
}
}
server.unsubscribe(rx_id).await;
server.unsubscribe(rx_id.as_str()).await;
found_events
})
@@ -321,30 +328,6 @@ impl PluginRuntimeGrpcServer {
Ok(events)
}
pub async fn send(&self, payload: InternalEventPayload) -> Result<Vec<InternalEvent>> {
let mut events: Vec<InternalEvent> = Vec::new();
let plugins = self.plugin_ref_to_plugin.lock().await;
if plugins.is_empty() {
return Err(NoPluginsErr("Send failed because no plugins exist".into()));
}
for ph in plugins.values() {
let event = ph.build_event_to_send(&payload, None);
self.send_to_plugin_handle(ph, &event).await?;
events.push(event);
}
Ok(events)
}
async fn send_to_plugin_handle(
&self,
plugin: &PluginHandle,
event: &InternalEvent,
) -> Result<()> {
plugin.send(event).await
}
async fn load_plugins(
&self,
to_plugin_tx: mpsc::Sender<tonic::Result<EventStreamEvent>>,