Queries now use AppHandle instead of Window (#189)

This commit is contained in:
Gregory Schier
2025-03-20 09:43:14 -07:00
committed by GitHub
parent cf8f8743bb
commit 4c4eaba7d2
19 changed files with 1063 additions and 740 deletions

View File

@@ -1,8 +1,7 @@
use tauri::{Manager, Runtime, WebviewWindow};
use tauri::{AppHandle, Runtime};
use yaak_models::queries::{
get_key_value_int, get_key_value_string,
set_key_value_int, set_key_value_string, UpdateSource,
get_key_value_int, get_key_value_string, set_key_value_int, set_key_value_string, UpdateSource,
};
const NAMESPACE: &str = "analytics";
@@ -16,14 +15,15 @@ pub struct LaunchEventInfo {
pub num_launches: i32,
}
pub async fn store_launch_history<R: Runtime>(w: &WebviewWindow<R>) -> LaunchEventInfo {
pub async fn store_launch_history<R: Runtime>(app_handle: &AppHandle<R>) -> LaunchEventInfo {
let last_tracked_version_key = "last_tracked_version";
let mut info = LaunchEventInfo::default();
info.num_launches = get_num_launches(w).await + 1;
info.previous_version = get_key_value_string(w, NAMESPACE, last_tracked_version_key, "").await;
info.current_version = w.package_info().version.to_string();
info.num_launches = get_num_launches(app_handle).await + 1;
info.previous_version =
get_key_value_string(app_handle, NAMESPACE, last_tracked_version_key, "").await;
info.current_version = app_handle.package_info().version.to_string();
if info.previous_version.is_empty() {
} else {
@@ -33,16 +33,22 @@ pub async fn store_launch_history<R: Runtime>(w: &WebviewWindow<R>) -> LaunchEve
// Update key values
set_key_value_string(
w,
app_handle,
NAMESPACE,
last_tracked_version_key,
info.current_version.as_str(),
&UpdateSource::Background,
)
.await;
set_key_value_int(w, NAMESPACE, NUM_LAUNCHES_KEY, info.num_launches, &UpdateSource::Background)
.await;
set_key_value_int(
app_handle,
NAMESPACE,
NUM_LAUNCHES_KEY,
info.num_launches,
&UpdateSource::Background,
)
.await;
info
}
@@ -59,6 +65,6 @@ pub fn get_os() -> &'static str {
}
}
pub async fn get_num_launches<R: Runtime>(w: &WebviewWindow<R>) -> i32 {
get_key_value_int(w, NAMESPACE, NUM_LAUNCHES_KEY, 0).await
pub async fn get_num_launches<R: Runtime>(app_handle: &AppHandle<R>) -> i32 {
get_key_value_int(app_handle, NAMESPACE, NUM_LAUNCHES_KEY, 0).await
}

View File

@@ -46,18 +46,22 @@ pub async fn send_http_request<R: Runtime>(
cookie_jar: Option<CookieJar>,
cancelled_rx: &mut Receiver<bool>,
) -> Result<HttpResponse> {
let plugin_manager = window.state::<PluginManager>();
let workspace = get_workspace(window, &unrendered_request.workspace_id).await?;
let base_environment = get_base_environment(window, &unrendered_request.workspace_id).await?;
let settings = get_or_create_settings(window).await;
let app_handle = window.app_handle().clone();
let plugin_manager = app_handle.state::<PluginManager>();
let workspace = get_workspace(&app_handle, &unrendered_request.workspace_id).await?;
let base_environment =
get_base_environment(&app_handle, &unrendered_request.workspace_id).await?;
let settings = get_or_create_settings(&app_handle).await;
let response_id = og_response.id.clone();
let response = Arc::new(Mutex::new(og_response.clone()));
let cb = PluginTemplateCallback::new(
window.app_handle(),
&WindowContext::from_window(window),
RenderPurpose::Send,
);
let response_id = og_response.id.clone();
let response = Arc::new(Mutex::new(og_response.clone()));
let update_source = UpdateSource::from_window(window);
let request = match render_http_request(
&unrendered_request,
@@ -68,7 +72,15 @@ pub async fn send_http_request<R: Runtime>(
.await
{
Ok(r) => r,
Err(e) => return Ok(response_err(&*response.lock().await, e.to_string(), window).await),
Err(e) => {
return Ok(response_err(
&app_handle,
&*response.lock().await,
e.to_string(),
&update_source,
)
.await)
}
};
let mut url_string = request.url;
@@ -178,9 +190,10 @@ pub async fn send_http_request<R: Runtime>(
Ok(u) => u,
Err(e) => {
return Ok(response_err(
&app_handle,
&*response.lock().await,
format!("Failed to parse URL \"{}\": {}", url_string, e.to_string()),
window,
&update_source,
)
.await);
}
@@ -190,9 +203,10 @@ pub async fn send_http_request<R: Runtime>(
Ok(u) => u,
Err(e) => {
return Ok(response_err(
&app_handle,
&*response.lock().await,
format!("Failed to parse URL \"{}\": {}", url_string, e.to_string()),
window,
&update_source,
)
.await);
}
@@ -296,7 +310,13 @@ pub async fn send_http_request<R: Runtime>(
request_builder = request_builder.body(f);
}
Err(e) => {
return Ok(response_err(&*response.lock().await, e, window).await);
return Ok(response_err(
&app_handle,
&*response.lock().await,
e,
&update_source,
)
.await);
}
}
} else if body_type == "multipart/form-data" && request_body.contains_key("form") {
@@ -323,9 +343,10 @@ pub async fn send_http_request<R: Runtime>(
Ok(f) => multipart::Part::bytes(f),
Err(e) => {
return Ok(response_err(
&app_handle,
&*response.lock().await,
e.to_string(),
window,
&update_source,
)
.await);
}
@@ -340,9 +361,10 @@ pub async fn send_http_request<R: Runtime>(
Ok(p) => p,
Err(e) => {
return Ok(response_err(
&app_handle,
&*response.lock().await,
format!("Invalid mime for multi-part entry {e:?}"),
window,
&update_source,
)
.await);
}
@@ -356,9 +378,10 @@ pub async fn send_http_request<R: Runtime>(
Ok(p) => p,
Err(e) => {
return Ok(response_err(
&app_handle,
&*response.lock().await,
format!("Invalid mime for multi-part entry {e:?}"),
window,
&update_source,
)
.await);
}
@@ -397,7 +420,13 @@ pub async fn send_http_request<R: Runtime>(
Ok(r) => r,
Err(e) => {
warn!("Failed to build request builder {e:?}");
return Ok(response_err(&*response.lock().await, e.to_string(), window).await);
return Ok(response_err(
&app_handle,
&*response.lock().await,
e.to_string(),
&update_source,
)
.await);
}
};
@@ -423,7 +452,13 @@ pub async fn send_http_request<R: Runtime>(
let plugin_result = match auth_result {
Ok(r) => r,
Err(e) => {
return Ok(response_err(&*response.lock().await, e.to_string(), window).await);
return Ok(response_err(
&app_handle,
&*response.lock().await,
e.to_string(),
&update_source,
)
.await);
}
};
@@ -449,21 +484,23 @@ pub async fn send_http_request<R: Runtime>(
Ok(r) = resp_rx => r,
_ = cancelled_rx.changed() => {
debug!("Request cancelled");
return Ok(response_err(&*response.lock().await, "Request was cancelled".to_string(), window).await);
return Ok(response_err(&app_handle, &*response.lock().await, "Request was cancelled".to_string(), &update_source).await);
}
};
{
let app_handle = app_handle.clone();
let window = window.clone();
let cancelled_rx = cancelled_rx.clone();
let response_id = response_id.clone();
let response = response.clone();
let update_source = update_source.clone();
tokio::spawn(async move {
match raw_response {
Ok(mut v) => {
let content_length = v.content_length();
let response_headers = v.headers().clone();
let dir = window.app_handle().path().app_data_dir().unwrap();
let dir = app_handle.path().app_data_dir().unwrap();
let base_dir = dir.join("responses");
create_dir_all(base_dir.clone()).await.expect("Failed to create responses dir");
let body_path = if response_id.is_empty() {
@@ -497,7 +534,7 @@ pub async fn send_http_request<R: Runtime>(
};
r.state = HttpResponseState::Connected;
update_response_if_id(&window, &r, &UpdateSource::Window)
update_response_if_id(&app_handle, &r, &update_source)
.await
.expect("Failed to update response after connected");
}
@@ -526,7 +563,7 @@ pub async fn send_http_request<R: Runtime>(
f.flush().await.expect("Failed to flush file");
written_bytes += bytes.len();
r.content_length = Some(written_bytes as i32);
update_response_if_id(&window, &r, &UpdateSource::Window)
update_response_if_id(&app_handle, &r, &update_source)
.await
.expect("Failed to update response");
}
@@ -534,7 +571,13 @@ pub async fn send_http_request<R: Runtime>(
break;
}
Err(e) => {
response_err(&*response.lock().await, e.to_string(), &window).await;
response_err(
&app_handle,
&*response.lock().await,
e.to_string(),
&update_source,
)
.await;
break;
}
}
@@ -548,7 +591,7 @@ pub async fn send_http_request<R: Runtime>(
None => Some(written_bytes as i32),
};
r.state = HttpResponseState::Closed;
update_response_if_id(&window, &r, &UpdateSource::Window)
update_response_if_id(&app_handle, &r, &UpdateSource::from_window(&window))
.await
.expect("Failed to update response");
};
@@ -574,8 +617,12 @@ pub async fn send_http_request<R: Runtime>(
})
.collect::<Vec<_>>();
cookie_jar.cookies = json_cookies;
if let Err(e) =
upsert_cookie_jar(&window, &cookie_jar, &UpdateSource::Window).await
if let Err(e) = upsert_cookie_jar(
&app_handle,
&cookie_jar,
&UpdateSource::from_window(&window),
)
.await
{
error!("Failed to update cookie jar: {}", e);
};
@@ -583,7 +630,13 @@ pub async fn send_http_request<R: Runtime>(
}
Err(e) => {
warn!("Failed to execute request {e}");
response_err(&*response.lock().await, format!("{e}{e:?}"), &window).await;
response_err(
&app_handle,
&*response.lock().await,
format!("{e}{e:?}"),
&update_source,
)
.await;
}
};
@@ -592,16 +645,17 @@ pub async fn send_http_request<R: Runtime>(
});
};
let app_handle = app_handle.clone();
Ok(tokio::select! {
Ok(r) = done_rx => r,
_ = cancelled_rx.changed() => {
match get_http_response(window, response_id.as_str()).await {
match get_http_response(&app_handle, response_id.as_str()).await {
Ok(mut r) => {
r.state = HttpResponseState::Closed;
update_response_if_id(&window, &r, &UpdateSource::Window).await.expect("Failed to update response")
update_response_if_id(&app_handle, &r, &UpdateSource::from_window(window)).await.expect("Failed to update response")
},
_ => {
response_err(&*response.lock().await, "Ephemeral request was cancelled".to_string(), &window).await
response_err(&app_handle, &*response.lock().await, "Ephemeral request was cancelled".to_string(), &update_source).await
}.clone(),
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -6,7 +6,7 @@ use log::debug;
use reqwest::Method;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tauri::{Emitter, Manager, Runtime, WebviewWindow};
use tauri::{AppHandle, Emitter, Manager, Runtime, WebviewWindow};
use yaak_models::queries::{get_key_value_raw, set_key_value_raw, UpdateSource};
// Check for updates every hour
@@ -43,16 +43,18 @@ impl YaakNotifier {
}
}
pub async fn seen<R: Runtime>(&mut self, w: &WebviewWindow<R>, id: &str) -> Result<(), String> {
let mut seen = get_kv(w).await?;
pub async fn seen<R: Runtime>(&mut self, window: &WebviewWindow<R>, id: &str) -> Result<(), String> {
let app_handle = window.app_handle();
let mut seen = get_kv(app_handle).await?;
seen.push(id.to_string());
debug!("Marked notification as seen {}", id);
let seen_json = serde_json::to_string(&seen).map_err(|e| e.to_string())?;
set_key_value_raw(w, KV_NAMESPACE, KV_KEY, seen_json.as_str(), &UpdateSource::Window).await;
set_key_value_raw(app_handle, KV_NAMESPACE, KV_KEY, seen_json.as_str(), &UpdateSource::from_window(window)).await;
Ok(())
}
pub async fn check<R: Runtime>(&mut self, window: &WebviewWindow<R>) -> Result<(), String> {
let app_handle = window.app_handle();
let ignore_check = self.last_check.elapsed().unwrap().as_secs() < MAX_UPDATE_CHECK_SECONDS;
if ignore_check {
@@ -61,8 +63,8 @@ impl YaakNotifier {
self.last_check = SystemTime::now();
let num_launches = get_num_launches(window).await;
let info = window.app_handle().package_info().clone();
let num_launches = get_num_launches(app_handle).await;
let info = app_handle.package_info().clone();
let req = reqwest::Client::default()
.request(Method::GET, "https://notify.yaak.app/notifications")
.query(&[
@@ -90,14 +92,14 @@ impl YaakNotifier {
for notification in notifications {
let age = notification.timestamp.signed_duration_since(Utc::now());
let seen = get_kv(window).await?;
let seen = get_kv(app_handle).await?;
if seen.contains(&notification.id) || (age > Duration::days(2)) {
debug!("Already seen notification {}", notification.id);
continue;
}
debug!("Got notification {:?}", notification);
let _ = window.emit_to(window.label(), "notification", notification.clone());
let _ = app_handle.emit_to(window.label(), "notification", notification.clone());
break; // Only show one notification
}
@@ -105,8 +107,8 @@ impl YaakNotifier {
}
}
async fn get_kv<R: Runtime>(w: &WebviewWindow<R>) -> Result<Vec<String>, String> {
match get_key_value_raw(w, "notifications", "seen").await {
async fn get_kv<R: Runtime>(app_handle: &AppHandle<R>) -> Result<Vec<String>, String> {
match get_key_value_raw(app_handle, "notifications", "seen").await {
None => Ok(Vec::new()),
Some(v) => serde_json::from_str(&v.value).map_err(|e| e.to_string()),
}

View File

@@ -18,8 +18,8 @@ use yaak_models::queries::{
use yaak_plugins::events::{
Color, DeleteKeyValueResponse, EmptyPayload, FindHttpResponsesResponse,
GetHttpRequestByIdResponse, GetKeyValueResponse, Icon, InternalEvent, InternalEventPayload,
RenderHttpRequestResponse, SendHttpRequestResponse, SetKeyValueResponse, ShowToastRequest,
TemplateRenderResponse, WindowContext, WindowNavigateEvent,
RenderHttpRequestResponse, RenderPurpose, SendHttpRequestResponse, SetKeyValueResponse,
ShowToastRequest, TemplateRenderResponse, WindowContext, WindowNavigateEvent,
};
use yaak_plugins::manager::PluginManager;
use yaak_plugins::plugin_handle::PluginHandle;
@@ -80,7 +80,7 @@ pub(crate) async fn handle_plugin_event<R: Runtime>(
.await
.expect("Failed to get workspace_id from window URL");
let environment = environment_from_window(&window).await;
let base_environment = get_base_environment(&window, workspace.id.as_str())
let base_environment = get_base_environment(app_handle, workspace.id.as_str())
.await
.expect("Failed to get base environment");
let cb = PluginTemplateCallback::new(app_handle, &window_context, req.purpose);
@@ -104,7 +104,7 @@ pub(crate) async fn handle_plugin_event<R: Runtime>(
.await
.expect("Failed to get workspace_id from window URL");
let environment = environment_from_window(&window).await;
let base_environment = get_base_environment(&window, workspace.id.as_str())
let base_environment = get_base_environment(app_handle, workspace.id.as_str())
.await
.expect("Failed to get base environment");
let cb = PluginTemplateCallback::new(app_handle, &window_context, req.purpose);
@@ -145,7 +145,7 @@ pub(crate) async fn handle_plugin_event<R: Runtime>(
updated_at: Utc::now().naive_utc(), // TODO: Add reloaded_at field to use instead
..plugin
};
upsert_plugin(&window, new_plugin, &UpdateSource::Plugin).await.unwrap();
upsert_plugin(app_handle, new_plugin, &UpdateSource::Plugin).await.unwrap();
}
let toast_event = plugin_handle.build_event_to_send(
&WindowContext::from_window(&window),
@@ -177,7 +177,7 @@ pub(crate) async fn handle_plugin_event<R: Runtime>(
HttpResponse::new()
} else {
create_default_http_response(
&window,
app_handle,
http_request.id.as_str(),
&UpdateSource::Plugin,
)

View File

@@ -81,11 +81,11 @@ pub async fn activate_license<R: Runtime>(
let body: ActivateLicenseResponsePayload = response.json().await?;
yaak_models::queries::set_key_value_string(
window,
window.app_handle(),
KV_ACTIVATION_ID_KEY,
KV_NAMESPACE,
body.activation_id.as_str(),
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await;
@@ -100,7 +100,8 @@ pub async fn deactivate_license<R: Runtime>(
window: &WebviewWindow<R>,
p: DeactivateLicenseRequestPayload,
) -> Result<()> {
let activation_id = get_activation_id(window).await;
let app_handle = window.app_handle();
let activation_id = get_activation_id(app_handle).await;
let client = reqwest::Client::new();
let path = format!("/licenses/activations/{}/deactivate", activation_id);
@@ -119,14 +120,14 @@ pub async fn deactivate_license<R: Runtime>(
}
yaak_models::queries::delete_key_value(
window,
app_handle,
KV_ACTIVATION_ID_KEY,
KV_NAMESPACE,
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await;
if let Err(e) = window.emit("license-deactivated", true) {
if let Err(e) = app_handle.emit("license-deactivated", true) {
warn!("Failed to emit deactivate-license event: {}", e);
}
@@ -143,7 +144,10 @@ pub enum LicenseCheckStatus {
Trialing { end: NaiveDateTime },
}
pub async fn check_license<R: Runtime>(app_handle: &AppHandle<R>, payload: CheckActivationRequestPayload) -> Result<LicenseCheckStatus> {
pub async fn check_license<R: Runtime>(
app_handle: &AppHandle<R>,
payload: CheckActivationRequestPayload,
) -> Result<LicenseCheckStatus> {
let activation_id = get_activation_id(app_handle).await;
let settings = yaak_models::queries::get_or_create_settings(app_handle).await;
let trial_end = settings.created_at.add(Duration::from_secs(TRIAL_SECONDS));
@@ -195,7 +199,7 @@ fn build_url(path: &str) -> String {
}
}
pub async fn get_activation_id<R: Runtime>(mgr: &impl Manager<R>) -> String {
yaak_models::queries::get_key_value_string(mgr, KV_ACTIVATION_ID_KEY, KV_NAMESPACE, "")
pub async fn get_activation_id<R: Runtime>(app_handle: &AppHandle<R>) -> String {
yaak_models::queries::get_key_value_string(app_handle, KV_ACTIVATION_ID_KEY, KV_NAMESPACE, "")
.await
}

View File

@@ -44,7 +44,7 @@ export type HttpUrlParameter = { enabled?: boolean, name: string, value: string,
export type KeyValue = { model: "key_value", createdAt: string, updatedAt: string, key: string, namespace: string, value: string, };
export type ModelPayload = { model: AnyModel, windowLabel: string, updateSource: UpdateSource, };
export type ModelPayload = { model: AnyModel, updateSource: UpdateSource, };
export type Plugin = { model: "plugin", id: string, createdAt: string, updatedAt: string, checkedAt: string | null, directory: string, enabled: boolean, url: string | null, };
@@ -60,7 +60,7 @@ export type SyncHistory = { model: "sync_history", id: string, workspaceId: stri
export type SyncState = { model: "sync_state", id: string, workspaceId: string, createdAt: string, updatedAt: string, flushedAt: string, modelId: string, checksum: string, relPath: string, syncDir: string, };
export type UpdateSource = "sync" | "window" | "plugin" | "background" | "import";
export type UpdateSource = { "type": "sync" } | { "type": "window", label: string, } | { "type": "plugin" } | { "type": "background" } | { "type": "import" };
export type WebsocketConnection = { model: "websocket_connection", id: string, createdAt: string, updatedAt: string, workspaceId: string, requestId: string, elapsed: number, error: string | null, headers: Array<HttpResponseHeader>, state: WebsocketConnectionState, status: number, url: string, };

View File

@@ -4,10 +4,16 @@ use thiserror::Error;
pub enum Error {
#[error("SQL error: {0}")]
SqlError(#[from] rusqlite::Error),
#[error("SQL Pool error: {0}")]
SqlPoolError(#[from] r2d2::Error),
#[error("JSON error: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Model not found {0}")]
ModelNotFound(String),
#[error("unknown error")]
Unknown,
}

View File

@@ -4,3 +4,4 @@ pub mod queries;
pub mod plugin;
pub mod render;
pub mod manager;

View File

@@ -0,0 +1,40 @@
use crate::error::Result;
use crate::models::{Workspace, WorkspaceIden};
use crate::plugin::SqliteConnection;
use r2d2::{Pool, PooledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::Connection;
use sea_query::{Asterisk, Order, Query, SqliteQueryBuilder};
use sea_query_rusqlite::RusqliteBinder;
use std::future::Future;
use std::ops::Deref;
use tauri::{AppHandle, Manager, Runtime};
pub struct QueryManager {
pool: Pool<SqliteConnectionManager>,
}
pub trait DBConnection {
fn connect(
&self,
) -> impl Future<Output = Result<PooledConnection<SqliteConnectionManager>>> + Send;
}
impl<R: Runtime> DBConnection for AppHandle<R> {
async fn connect(&self) -> Result<PooledConnection<SqliteConnectionManager>> {
let dbm = &*self.state::<SqliteConnection>();
let db = dbm.0.lock().await.get()?;
Ok(db)
}
}
pub async fn list_workspaces<T: Deref<Target = Connection>>(c: &T) -> Result<Vec<Workspace>> {
let (sql, params) = Query::select()
.from(WorkspaceIden::Table)
.column(Asterisk)
.order_by(WorkspaceIden::Name, Order::Asc)
.build_rusqlite(SqliteQueryBuilder);
let mut stmt = c.prepare(sql.as_str())?;
let items = stmt.query_map(&*params.as_params(), |row| row.try_into())?;
Ok(items.map(|v| v.unwrap()).collect())
}

File diff suppressed because it is too large Load Diff

View File

@@ -9,17 +9,17 @@ use log::warn;
use serde::{Deserialize, Serialize};
use std::path::Path;
use tauri::ipc::Channel;
use tauri::{command, Listener, Runtime, WebviewWindow};
use tauri::{command, AppHandle, Listener, Runtime};
use tokio::sync::watch;
use ts_rs::TS;
#[command]
pub async fn calculate<R: Runtime>(
window: WebviewWindow<R>,
app_handle: AppHandle<R>,
workspace_id: &str,
sync_dir: &Path,
) -> Result<Vec<SyncOp>> {
let db_candidates = get_db_candidates(&window, workspace_id, sync_dir).await?;
let db_candidates = get_db_candidates(&app_handle, workspace_id, sync_dir).await?;
let fs_candidates = get_fs_candidates(sync_dir)
.await?
.into_iter()
@@ -40,13 +40,13 @@ pub async fn calculate_fs(dir: &Path) -> Result<Vec<SyncOp>> {
#[command]
pub async fn apply<R: Runtime>(
window: WebviewWindow<R>,
app_handle: AppHandle<R>,
sync_ops: Vec<SyncOp>,
sync_dir: &Path,
workspace_id: &str,
) -> Result<()> {
let sync_state_ops = apply_sync_ops(&window, &workspace_id, sync_dir, sync_ops).await?;
apply_sync_state_ops(&window, workspace_id, sync_dir, sync_state_ops).await
let sync_state_ops = apply_sync_ops(&app_handle, &workspace_id, sync_dir, sync_ops).await?;
apply_sync_state_ops(&app_handle, workspace_id, sync_dir, sync_state_ops).await
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
@@ -58,7 +58,7 @@ pub(crate) struct WatchResult {
#[command]
pub async fn watch<R: Runtime>(
window: WebviewWindow<R>,
app_handle: AppHandle<R>,
sync_dir: &Path,
workspace_id: &str,
channel: Channel<WatchEvent>,
@@ -67,16 +67,16 @@ pub async fn watch<R: Runtime>(
watch_directory(&sync_dir, channel, cancel_rx).await?;
let window_inner = window.clone();
let app_handle_inner = app_handle.clone();
let unlisten_event =
format!("watch-unlisten-{}-{}", workspace_id, Utc::now().timestamp_millis());
// TODO: Figure out a way to unlisten when the client window refreshes or closes. Perhaps with
// TODO: Figure out a way to unlisten when the client app_handle refreshes or closes. Perhaps with
// a heartbeat mechanism, or ensuring only a single subscription per workspace (at least
// this won't create `n` subs). We could also maybe have a global fs watcher that we keep
// adding to here.
window.listen_any(unlisten_event.clone(), move |event| {
window_inner.unlisten(event.id());
app_handle.listen_any(unlisten_event.clone(), move |event| {
app_handle_inner.unlisten(event.id());
if let Err(e) = cancel_tx.send(()) {
warn!("Failed to send cancel signal to watcher {e:?}");
}

View File

@@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::path::{Path, PathBuf};
use tauri::{Manager, Runtime, WebviewWindow};
use tauri::{AppHandle, Runtime};
use tokio::fs;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
@@ -106,17 +106,21 @@ pub(crate) struct FsCandidate {
}
pub(crate) async fn get_db_candidates<R: Runtime>(
mgr: &impl Manager<R>,
app_handle: &AppHandle<R>,
workspace_id: &str,
sync_dir: &Path,
) -> Result<Vec<DbCandidate>> {
let models: HashMap<_, _> =
workspace_models(mgr, workspace_id).await?.into_iter().map(|m| (m.id(), m)).collect();
let sync_states: HashMap<_, _> = list_sync_states_for_workspace(mgr, workspace_id, sync_dir)
let models: HashMap<_, _> = workspace_models(app_handle, workspace_id)
.await?
.into_iter()
.map(|s| (s.model_id.clone(), s))
.map(|m| (m.id(), m))
.collect();
let sync_states: HashMap<_, _> =
list_sync_states_for_workspace(app_handle, workspace_id, sync_dir)
.await?
.into_iter()
.map(|s| (s.model_id.clone(), s))
.collect();
// 1. Add candidates for models (created/modified/unmodified)
let mut candidates: Vec<DbCandidate> = models
@@ -223,7 +227,7 @@ pub(crate) fn compute_sync_ops(
model: model.to_owned(),
state: sync_state.to_owned(),
}
},
}
(Some(DbCandidate::Modified(model, sync_state)), None) => SyncOp::FsUpdate {
model: model.to_owned(),
state: sync_state.to_owned(),
@@ -285,10 +289,11 @@ pub(crate) fn compute_sync_ops(
}
async fn workspace_models<R: Runtime>(
mgr: &impl Manager<R>,
app_handle: &AppHandle<R>,
workspace_id: &str,
) -> Result<Vec<SyncModel>> {
let resources = get_workspace_export_resources(mgr, vec![workspace_id], true).await?.resources;
let resources =
get_workspace_export_resources(app_handle, vec![workspace_id], true).await?.resources;
let workspace = resources.workspaces.iter().find(|w| w.id == workspace_id);
let workspace = match workspace {
@@ -318,7 +323,7 @@ async fn workspace_models<R: Runtime>(
}
pub(crate) async fn apply_sync_ops<R: Runtime>(
window: &WebviewWindow<R>,
app_handle: &AppHandle<R>,
workspace_id: &str,
sync_dir: &Path,
sync_ops: Vec<SyncOp>,
@@ -429,7 +434,7 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
}
}
SyncOp::DbDelete { model, state } => {
delete_model(window, &model).await?;
delete_model(app_handle, &model).await?;
SyncStateOp::Delete {
state: state.to_owned(),
}
@@ -438,7 +443,7 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
}
let upserted_models = batch_upsert(
window,
app_handle,
workspaces_to_upsert,
environments_to_upsert,
folders_to_upsert,
@@ -452,7 +457,7 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
// Ensure we creat WorkspaceMeta models for each new workspace, with the appropriate sync dir
let sync_dir_string = sync_dir.to_string_lossy().to_string();
for workspace in upserted_models.workspaces {
let r = match get_workspace_meta(window, &workspace).await {
let r = match get_workspace_meta(app_handle, &workspace).await {
Ok(Some(m)) => {
if m.setting_sync_dir == Some(sync_dir_string.clone()) {
// We don't need to update if unchanged
@@ -462,7 +467,7 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
setting_sync_dir: Some(sync_dir.to_string_lossy().to_string()),
..m
};
upsert_workspace_meta(window, wm, &UpdateSource::Sync).await
upsert_workspace_meta(app_handle, wm, &UpdateSource::Sync).await
}
Ok(None) => {
let wm = WorkspaceMeta {
@@ -470,7 +475,7 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
setting_sync_dir: Some(sync_dir.to_string_lossy().to_string()),
..Default::default()
};
upsert_workspace_meta(window, wm, &UpdateSource::Sync).await
upsert_workspace_meta(app_handle, wm, &UpdateSource::Sync).await
}
Err(e) => Err(e),
};
@@ -501,7 +506,7 @@ pub(crate) enum SyncStateOp {
}
pub(crate) async fn apply_sync_state_ops<R: Runtime>(
window: &WebviewWindow<R>,
app_handle: &AppHandle<R>,
workspace_id: &str,
sync_dir: &Path,
ops: Vec<SyncStateOp>,
@@ -522,7 +527,7 @@ pub(crate) async fn apply_sync_state_ops<R: Runtime>(
flushed_at: Utc::now().naive_utc(),
..Default::default()
};
upsert_sync_state(window, sync_state).await?;
upsert_sync_state(app_handle, sync_state).await?;
}
SyncStateOp::Update {
state: sync_state,
@@ -536,10 +541,10 @@ pub(crate) async fn apply_sync_state_ops<R: Runtime>(
flushed_at: Utc::now().naive_utc(),
..sync_state
};
upsert_sync_state(window, sync_state).await?;
upsert_sync_state(app_handle, sync_state).await?;
}
SyncStateOp::Delete { state } => {
delete_sync_state(window, state.id.as_str()).await?;
delete_sync_state(app_handle, state.id.as_str()).await?;
}
}
}
@@ -551,25 +556,25 @@ fn derive_model_filename(m: &SyncModel) -> PathBuf {
Path::new(&rel).to_path_buf()
}
async fn delete_model<R: Runtime>(window: &WebviewWindow<R>, model: &SyncModel) -> Result<()> {
async fn delete_model<R: Runtime>(app_handle: &AppHandle<R>, model: &SyncModel) -> Result<()> {
match model {
SyncModel::Workspace(m) => {
delete_workspace(window, m.id.as_str(), &UpdateSource::Sync).await?;
delete_workspace(app_handle, m.id.as_str(), &UpdateSource::Sync).await?;
}
SyncModel::Environment(m) => {
delete_environment(window, m.id.as_str(), &UpdateSource::Sync).await?;
delete_environment(app_handle, m.id.as_str(), &UpdateSource::Sync).await?;
}
SyncModel::Folder(m) => {
delete_folder(window, m.id.as_str(), &UpdateSource::Sync).await?;
delete_folder(app_handle, m.id.as_str(), &UpdateSource::Sync).await?;
}
SyncModel::HttpRequest(m) => {
delete_http_request(window, m.id.as_str(), &UpdateSource::Sync).await?;
delete_http_request(app_handle, m.id.as_str(), &UpdateSource::Sync).await?;
}
SyncModel::GrpcRequest(m) => {
delete_grpc_request(window, m.id.as_str(), &UpdateSource::Sync).await?;
delete_grpc_request(app_handle, m.id.as_str(), &UpdateSource::Sync).await?;
}
SyncModel::WebsocketRequest(m) => {
delete_websocket_request(window, m.id.as_str(), &UpdateSource::Sync).await?;
delete_websocket_request(app_handle, m.id.as_str(), &UpdateSource::Sync).await?;
}
};
Ok(())

View File

@@ -1,7 +1,6 @@
use crate::error::Error::{RenderStackExceededError, VariableNotFound};
use crate::error::Result;
use crate::{Parser, Token, Tokens, Val};
use log::warn;
use serde_json::json;
use std::collections::HashMap;
use std::future::Future;
@@ -113,13 +112,7 @@ async fn render_value<T: TemplateCallback>(
let v = Box::pin(render_value(a.value, vars, cb, depth)).await?;
resolved_args.insert(a.name, v);
}
match cb.run(name.as_str(), resolved_args.clone()).await {
Ok(s) => s,
Err(e) => {
warn!("Failed to run template callback {}({:?}): {}", name, resolved_args, e);
"".to_string()
}
}
cb.run(name.as_str(), resolved_args.clone()).await?
}
Val::Null => "".into(),
};
@@ -324,7 +317,7 @@ mod parse_and_render_tests {
#[tokio::test]
async fn render_fn_err() -> Result<()> {
let vars = HashMap::new();
let template = r#"${[ error() ]}"#;
let template = r#"hello ${[ error() ]}"#;
struct CB {}
impl TemplateCallback for CB {

View File

@@ -5,7 +5,7 @@ use crate::render::render_request;
use log::{info, warn};
use std::str::FromStr;
use tauri::http::{HeaderMap, HeaderName};
use tauri::{AppHandle, Manager, Runtime, State, Url, WebviewWindow};
use tauri::{AppHandle, Runtime, State, Url, WebviewWindow};
use tokio::sync::{mpsc, Mutex};
use tokio_tungstenite::tungstenite::http::HeaderValue;
use tokio_tungstenite::tungstenite::Message;
@@ -28,41 +28,54 @@ use yaak_plugins::template_callback::PluginTemplateCallback;
#[tauri::command]
pub(crate) async fn upsert_request<R: Runtime>(
request: WebsocketRequest,
w: WebviewWindow<R>,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketRequest> {
Ok(queries::upsert_websocket_request(&w, request, &UpdateSource::Window).await?)
Ok(queries::upsert_websocket_request(&app_handle, request, &UpdateSource::from_window(&window))
.await?)
}
#[tauri::command]
pub(crate) async fn duplicate_request<R: Runtime>(
request_id: &str,
w: WebviewWindow<R>,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketRequest> {
Ok(queries::duplicate_websocket_request(&w, request_id, &UpdateSource::Window).await?)
Ok(queries::duplicate_websocket_request(
&app_handle,
request_id,
&UpdateSource::from_window(&window),
)
.await?)
}
#[tauri::command]
pub(crate) async fn delete_request<R: Runtime>(
request_id: &str,
w: WebviewWindow<R>,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketRequest> {
Ok(queries::delete_websocket_request(&w, request_id, &UpdateSource::Window).await?)
Ok(queries::delete_websocket_request(&app_handle, request_id, &UpdateSource::from_window(&window)).await?)
}
#[tauri::command]
pub(crate) async fn delete_connection<R: Runtime>(
connection_id: &str,
w: WebviewWindow<R>,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<WebsocketConnection> {
Ok(queries::delete_websocket_connection(&w, connection_id, &UpdateSource::Window).await?)
Ok(queries::delete_websocket_connection(&app_handle, connection_id, &UpdateSource::from_window(&window))
.await?)
}
#[tauri::command]
pub(crate) async fn delete_connections<R: Runtime>(
request_id: &str,
w: WebviewWindow<R>,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
) -> Result<()> {
Ok(queries::delete_all_websocket_connections(&w, request_id, &UpdateSource::Window).await?)
Ok(queries::delete_all_websocket_connections(&app_handle, request_id, &UpdateSource::from_window(&window))
.await?)
}
#[tauri::command]
@@ -93,24 +106,26 @@ pub(crate) async fn list_connections<R: Runtime>(
pub(crate) async fn send<R: Runtime>(
connection_id: &str,
environment_id: Option<&str>,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
ws_manager: State<'_, Mutex<WebsocketManager>>,
) -> Result<WebsocketConnection> {
let connection = get_websocket_connection(&window, connection_id).await?;
let unrendered_request = get_websocket_request(&window, &connection.request_id)
let connection = get_websocket_connection(&app_handle, connection_id).await?;
let unrendered_request = get_websocket_request(&app_handle, &connection.request_id)
.await?
.ok_or(GenericError("WebSocket Request not found".to_string()))?;
let environment = match environment_id {
Some(id) => Some(get_environment(&window, id).await?),
Some(id) => Some(get_environment(&app_handle, id).await?),
None => None,
};
let base_environment = get_base_environment(&window, &unrendered_request.workspace_id).await?;
let base_environment =
get_base_environment(&app_handle, &unrendered_request.workspace_id).await?;
let request = render_request(
&unrendered_request,
&base_environment,
environment.as_ref(),
&PluginTemplateCallback::new(
window.app_handle(),
&app_handle,
&WindowContext::from_window(&window),
RenderPurpose::Send,
),
@@ -121,7 +136,7 @@ pub(crate) async fn send<R: Runtime>(
ws_manager.send(&connection.id, Message::Text(request.message.clone().into())).await?;
upsert_websocket_event(
&window,
&app_handle,
WebsocketEvent {
connection_id: connection.id.clone(),
request_id: request.id.clone(),
@@ -131,7 +146,7 @@ pub(crate) async fn send<R: Runtime>(
message: request.message.into(),
..Default::default()
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await
.unwrap();
@@ -142,17 +157,18 @@ pub(crate) async fn send<R: Runtime>(
#[tauri::command]
pub(crate) async fn close<R: Runtime>(
connection_id: &str,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
ws_manager: State<'_, Mutex<WebsocketManager>>,
) -> Result<WebsocketConnection> {
let connection = get_websocket_connection(&window, connection_id).await?;
let connection = get_websocket_connection(&app_handle, connection_id).await?;
let connection = upsert_websocket_connection(
&window,
&app_handle,
&WebsocketConnection {
state: WebsocketConnectionState::Closing,
..connection
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await
.unwrap();
@@ -170,24 +186,26 @@ pub(crate) async fn connect<R: Runtime>(
request_id: &str,
environment_id: Option<&str>,
cookie_jar_id: Option<&str>,
app_handle: AppHandle<R>,
window: WebviewWindow<R>,
plugin_manager: State<'_, PluginManager>,
ws_manager: State<'_, Mutex<WebsocketManager>>,
) -> Result<WebsocketConnection> {
let unrendered_request = get_websocket_request(&window, request_id)
let unrendered_request = get_websocket_request(&app_handle, request_id)
.await?
.ok_or(GenericError("Failed to find GRPC request".to_string()))?;
let environment = match environment_id {
Some(id) => Some(get_environment(&window, id).await?),
Some(id) => Some(get_environment(&app_handle, id).await?),
None => None,
};
let base_environment = get_base_environment(&window, &unrendered_request.workspace_id).await?;
let base_environment =
get_base_environment(&app_handle, &unrendered_request.workspace_id).await?;
let request = render_request(
&unrendered_request,
&base_environment,
environment.as_ref(),
&PluginTemplateCallback::new(
window.app_handle(),
&app_handle,
&WindowContext::from_window(&window),
RenderPurpose::Send,
),
@@ -224,18 +242,18 @@ pub(crate) async fn connect<R: Runtime>(
// TODO: Handle cookies
let _cookie_jar = match cookie_jar_id {
Some(id) => Some(get_cookie_jar(&window, id).await?),
Some(id) => Some(get_cookie_jar(&app_handle, id).await?),
None => None,
};
let connection = upsert_websocket_connection(
&window,
&app_handle,
&WebsocketConnection {
workspace_id: request.workspace_id.clone(),
request_id: request_id.to_string(),
..Default::default()
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await?;
@@ -261,20 +279,20 @@ pub(crate) async fn connect<R: Runtime>(
Ok(r) => r,
Err(e) => {
return Ok(upsert_websocket_connection(
&window,
&app_handle,
&WebsocketConnection {
error: Some(format!("{e:?}")),
state: WebsocketConnectionState::Closed,
..connection
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await?);
}
};
upsert_websocket_event(
&window,
&app_handle,
WebsocketEvent {
connection_id: connection.id.clone(),
request_id: request.id.clone(),
@@ -283,7 +301,7 @@ pub(crate) async fn connect<R: Runtime>(
message_type: WebsocketEventType::Open,
..Default::default()
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await
.unwrap();
@@ -298,7 +316,7 @@ pub(crate) async fn connect<R: Runtime>(
.collect::<Vec<HttpResponseHeader>>();
let connection = upsert_websocket_connection(
&window,
&app_handle,
&WebsocketConnection {
state: WebsocketConnectionState::Connected,
headers: response_headers,
@@ -306,7 +324,7 @@ pub(crate) async fn connect<R: Runtime>(
url: request.url.clone(),
..connection
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await?;
@@ -314,7 +332,6 @@ pub(crate) async fn connect<R: Runtime>(
let connection_id = connection.id.clone();
let request_id = request.id.to_string();
let workspace_id = request.workspace_id.clone();
let window = window.clone();
let connection = connection.clone();
let mut has_written_close = false;
tokio::spawn(async move {
@@ -324,7 +341,7 @@ pub(crate) async fn connect<R: Runtime>(
}
upsert_websocket_event(
&window,
&app_handle,
WebsocketEvent {
connection_id: connection_id.clone(),
request_id: request_id.clone(),
@@ -342,7 +359,7 @@ pub(crate) async fn connect<R: Runtime>(
message: message.into_data().into(),
..Default::default()
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await
.unwrap();
@@ -350,7 +367,7 @@ pub(crate) async fn connect<R: Runtime>(
info!("Websocket connection closed");
if !has_written_close {
upsert_websocket_event(
&window,
&app_handle,
WebsocketEvent {
connection_id: connection_id.clone(),
request_id: request_id.clone(),
@@ -359,20 +376,20 @@ pub(crate) async fn connect<R: Runtime>(
message_type: WebsocketEventType::Close,
..Default::default()
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await
.unwrap();
}
upsert_websocket_connection(
&window,
&app_handle,
&WebsocketConnection {
workspace_id: request.workspace_id.clone(),
request_id: request_id.to_string(),
state: WebsocketConnectionState::Closed,
..connection
},
&UpdateSource::Window,
&UpdateSource::from_window(&window),
)
.await
.unwrap();