Dir sync filesystem watching

This commit is contained in:
Gregory Schier
2025-01-06 09:24:07 -08:00
parent c72180bb59
commit c2ea2a5fe5
35 changed files with 525 additions and 482 deletions

View File

@@ -1,13 +1,27 @@
use crate::error::Result;
use crate::sync::{apply_sync, calculate_sync, SyncOp};
use tauri::{command, Runtime, WebviewWindow};
use crate::sync::{
apply_sync_ops, apply_sync_state_ops, compute_sync_ops, get_db_candidates, get_fs_candidates,
get_workspace_sync_dir, SyncOp,
};
use crate::watch::{watch_directory, WatchEvent};
use chrono::Utc;
use log::warn;
use serde::{Deserialize, Serialize};
use tauri::ipc::Channel;
use tauri::{command, Listener, Runtime, WebviewWindow};
use tokio::sync::watch;
use ts_rs::TS;
use yaak_models::queries::get_workspace;
#[command]
pub async fn calculate<R: Runtime>(
window: WebviewWindow<R>,
workspace_id: &str,
) -> Result<Vec<SyncOp>> {
calculate_sync(&window, workspace_id).await
let workspace = get_workspace(&window, workspace_id).await?;
let db_candidates = get_db_candidates(&window, &workspace).await?;
let fs_candidates = get_fs_candidates(&workspace).await?;
Ok(compute_sync_ops(db_candidates, fs_candidates))
}
#[command]
@@ -16,5 +30,44 @@ pub async fn apply<R: Runtime>(
sync_ops: Vec<SyncOp>,
workspace_id: &str,
) -> Result<()> {
apply_sync(&window, workspace_id, sync_ops).await
let workspace = get_workspace(&window, workspace_id).await?;
let sync_state_ops = apply_sync_ops(&window, &workspace, sync_ops).await?;
apply_sync_state_ops(&window, &workspace, sync_state_ops).await
}
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export, export_to = "watch.ts")]
pub(crate) struct WatchResult {
unlisten_event: String,
}
#[command]
pub async fn watch<R: Runtime>(
window: WebviewWindow<R>,
workspace_id: &str,
channel: Channel<WatchEvent>,
) -> Result<WatchResult> {
let workspace = get_workspace(&window, workspace_id).await?;
let sync_dir = get_workspace_sync_dir(&workspace)?;
let (cancel_tx, cancel_rx) = watch::channel(());
watch_directory(&sync_dir, channel, cancel_rx).await?;
let window_inner = window.clone();
let unlisten_event =
format!("watch-unlisten-{}-{}", workspace_id, Utc::now().timestamp_millis());
// TODO: Figure out a way to unlisten when the client window refreshes or closes. Perhaps with
// a heartbeat mechanism, or ensuring only a single subscription per workspace (at least
// this won't create `n` subs). We could also maybe have a global fs watcher that we keep
// adding to here.
window.listen_any(unlisten_event.clone(), move |event| {
window_inner.unlisten(event.id());
if let Err(e) = cancel_tx.send(()) {
warn!("Failed to send cancel signal to watcher {e:?}");
}
});
Ok(WatchResult { unlisten_event })
}

View File

@@ -24,6 +24,9 @@ pub enum Error {
#[error("Invalid sync file: {0}")]
InvalidSyncFile(String),
#[error("Watch error: {0}")]
NotifyError(#[from] notify::Error),
}
impl Serialize for Error {

View File

@@ -1,4 +1,4 @@
use crate::commands::{apply, calculate};
use crate::commands::{apply, calculate, watch};
use tauri::{
generate_handler,
plugin::{Builder, TauriPlugin},
@@ -9,7 +9,8 @@ mod commands;
mod error;
mod models;
mod sync;
mod watch;
pub fn init<R: Runtime>() -> TauriPlugin<R> {
Builder::new("yaak-sync").invoke_handler(generate_handler![calculate, apply]).build()
Builder::new("yaak-sync").invoke_handler(generate_handler![calculate, apply, watch]).build()
}

View File

@@ -15,7 +15,7 @@ use ts_rs::TS;
use yaak_models::models::{SyncState, Workspace};
use yaak_models::queries::{
delete_environment, delete_folder, delete_grpc_request, delete_http_request, delete_sync_state,
delete_workspace, get_workspace, get_workspace_export_resources,
delete_workspace, get_workspace_export_resources,
list_sync_states_for_workspace, upsert_environment, upsert_folder, upsert_grpc_request,
upsert_http_request, upsert_sync_state, upsert_workspace, UpdateSource,
};
@@ -65,7 +65,7 @@ impl Display for SyncOp {
}
#[derive(Debug, Clone)]
enum DbCandidate {
pub(crate) enum DbCandidate {
Added(SyncModel),
Modified(SyncModel, SyncState),
Deleted(SyncState),
@@ -92,31 +92,7 @@ pub(crate) struct FsCandidate {
checksum: String,
}
pub(crate) async fn calculate_sync<R: Runtime>(
window: &WebviewWindow<R>,
workspace_id: &str,
) -> Result<Vec<SyncOp>> {
let workspace = get_workspace(window, workspace_id).await?;
let db_candidates = get_db_candidates(window, &workspace).await?;
let fs_candidates = get_fs_candidates(&workspace).await?;
let sync_ops = compute_sync_ops(db_candidates, fs_candidates);
Ok(sync_ops)
}
pub(crate) async fn apply_sync<R: Runtime>(
window: &WebviewWindow<R>,
workspace_id: &str,
sync_ops: Vec<SyncOp>,
) -> Result<()> {
let workspace = get_workspace(window, workspace_id).await?;
let sync_state_ops = apply_sync_ops(window, &workspace, sync_ops).await?;
let result = apply_sync_state_ops(window, &workspace, sync_state_ops).await;
result
}
async fn get_db_candidates<R: Runtime>(
pub(crate) async fn get_db_candidates<R: Runtime>(
mgr: &impl Manager<R>,
workspace: &Workspace,
) -> Result<Vec<DbCandidate>> {
@@ -163,7 +139,7 @@ async fn get_db_candidates<R: Runtime>(
Ok(candidates)
}
async fn get_fs_candidates(workspace: &Workspace) -> Result<Vec<FsCandidate>> {
pub(crate) async fn get_fs_candidates(workspace: &Workspace) -> Result<Vec<FsCandidate>> {
let dir = match workspace.setting_sync_dir.clone() {
None => return Ok(Vec::new()),
Some(d) => d,
@@ -207,7 +183,7 @@ async fn get_fs_candidates(workspace: &Workspace) -> Result<Vec<FsCandidate>> {
Ok(candidates)
}
fn compute_sync_ops(
pub(crate) fn compute_sync_ops(
db_candidates: Vec<DbCandidate>,
fs_candidates: Vec<FsCandidate>,
) -> Vec<SyncOp> {
@@ -317,7 +293,7 @@ async fn workspace_models<R: Runtime>(
sync_models
}
async fn apply_sync_ops<R: Runtime>(
pub(crate) async fn apply_sync_ops<R: Runtime>(
window: &WebviewWindow<R>,
workspace: &Workspace,
sync_ops: Vec<SyncOp>,
@@ -339,7 +315,7 @@ async fn apply_sync_ops<R: Runtime>(
}
#[derive(Debug)]
enum SyncStateOp {
pub(crate) enum SyncStateOp {
Create {
model_id: String,
checksum: String,
@@ -427,7 +403,8 @@ async fn apply_sync_op<R: Runtime>(
Ok(sync_state_op)
}
async fn apply_sync_state_ops<R: Runtime>(
pub(crate) async fn apply_sync_state_ops<R: Runtime>(
window: &WebviewWindow<R>,
workspace: &Workspace,
ops: Vec<SyncStateOp>,
@@ -483,7 +460,7 @@ async fn apply_sync_state_op<R: Runtime>(
Ok(())
}
fn get_workspace_sync_dir(workspace: &Workspace) -> Result<PathBuf> {
pub(crate) fn get_workspace_sync_dir(workspace: &Workspace) -> Result<PathBuf> {
let workspace_id = workspace.to_owned().id;
match workspace.setting_sync_dir.to_owned() {
Some(d) => Ok(Path::new(&d).to_path_buf()),

View File

@@ -0,0 +1,70 @@
use crate::error::Result;
use log::{error, info};
use notify::Watcher;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use tauri::ipc::Channel;
use tokio::select;
use ts_rs::TS;
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
#[serde(rename_all = "camelCase")]
#[ts(export, export_to = "watch.ts")]
pub(crate) struct WatchEvent {
paths: Vec<PathBuf>,
kind: String,
}
pub(crate) async fn watch_directory(
dir: &Path,
channel: Channel<WatchEvent>,
mut cancel_rx: tokio::sync::watch::Receiver<()>,
) -> Result<()> {
let dir = dir.to_owned();
let (tx, rx) = mpsc::channel::<notify::Result<notify::Event>>();
let mut watcher = notify::recommended_watcher(tx)?;
// Spawn a blocking thread to handle the blocking `std::sync::mpsc::Receiver`
let (async_tx, mut async_rx) = tokio::sync::mpsc::channel::<notify::Result<notify::Event>>(100);
std::thread::spawn(move || {
for res in rx {
if async_tx.blocking_send(res).is_err() {
break; // Exit the thread if the async receiver is closed
}
}
});
tauri::async_runtime::spawn(async move {
watcher.watch(&dir, notify::RecursiveMode::Recursive).expect("Failed to watch directory");
info!("Watching directory {:?}", dir);
loop {
select! {
// Listen for new watch events
Some(event_res) = async_rx.recv() => {
match event_res {
Ok(event) => {
channel
.send(WatchEvent {
paths: event.paths,
kind: format!("{:?}", event.kind),
})
.expect("Failed to send watch event");
}
Err(e) => error!("Directory watch error: {:?}", e),
}
}
// Listen for cancellation
_ = cancel_rx.changed() => {
// To cancel, we break from the loop, which will exit the task and make the
// watcher go out of scope (cancelling it)
info!("Cancelling watch for {:?}", dir);
break;
}
}
}
});
Ok(())
}