Start on plugin ctx API (#64)

This commit is contained in:
Gregory Schier
2024-08-14 06:42:54 -07:00
committed by GitHub
parent 5eef910b8c
commit c484dd4041
106 changed files with 1086 additions and 1219 deletions

View File

@@ -95,8 +95,8 @@ impl PluginRuntimeGrpcServer {
(id, rx)
}
pub async fn unsubscribe(&self, rx_id: String) {
self.subscribers.lock().await.remove(rx_id.as_str());
pub async fn unsubscribe(&self, rx_id: &str) {
self.subscribers.lock().await.remove(rx_id);
}
pub async fn remove_plugins(&self, plugin_ids: Vec<String>) {
@@ -214,6 +214,13 @@ impl PluginRuntimeGrpcServer {
let msg = format!("Failed to find plugin for {plugin_name}");
Err(PluginNotFoundErr(msg))
}
pub async fn send(&self, payload: &InternalEventPayload, plugin_ref_id: &str, reply_id: Option<String>)-> Result<()> {
let plugin = self.plugin_by_ref_id(plugin_ref_id).await?;
let event = plugin.build_event_to_send(payload, reply_id);
plugin.send(&event).await
}
pub async fn send_to_plugin(
&self,
@@ -301,7 +308,7 @@ impl PluginRuntimeGrpcServer {
break;
}
}
server.unsubscribe(rx_id).await;
server.unsubscribe(rx_id.as_str()).await;
found_events
})
@@ -321,30 +328,6 @@ impl PluginRuntimeGrpcServer {
Ok(events)
}
pub async fn send(&self, payload: InternalEventPayload) -> Result<Vec<InternalEvent>> {
let mut events: Vec<InternalEvent> = Vec::new();
let plugins = self.plugin_ref_to_plugin.lock().await;
if plugins.is_empty() {
return Err(NoPluginsErr("Send failed because no plugins exist".into()));
}
for ph in plugins.values() {
let event = ph.build_event_to_send(&payload, None);
self.send_to_plugin_handle(ph, &event).await?;
events.push(event);
}
Ok(events)
}
async fn send_to_plugin_handle(
&self,
plugin: &PluginHandle,
event: &InternalEvent,
) -> Result<()> {
plugin.send(event).await
}
async fn load_plugins(
&self,
to_plugin_tx: mpsc::Sender<tonic::Result<EventStreamEvent>>,