Optimize directory sync performance

This commit is contained in:
Gregory Schier
2025-01-05 10:56:40 -08:00
parent 40adce921b
commit 17fdd608d1
7 changed files with 113 additions and 96 deletions

View File

@@ -6,10 +6,11 @@ use log::{debug, warn};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::fmt::{Display, Formatter};
use std::fs;
use std::fs::create_dir_all;
use std::path::{Path, PathBuf};
use tauri::{Manager, Runtime, WebviewWindow};
use tokio::fs;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use ts_rs::TS;
use yaak_models::models::{SyncState, Workspace};
use yaak_models::queries::{
@@ -97,7 +98,7 @@ pub(crate) async fn calculate_sync<R: Runtime>(
) -> Result<Vec<SyncOp>> {
let workspace = get_workspace(window, workspace_id).await?;
let db_candidates = get_db_candidates(window, &workspace).await?;
let fs_candidates = get_fs_candidates(&workspace)?;
let fs_candidates = get_fs_candidates(&workspace).await?;
let sync_ops = compute_sync_ops(db_candidates, fs_candidates);
Ok(sync_ops)
@@ -106,10 +107,9 @@ pub(crate) async fn calculate_sync<R: Runtime>(
pub(crate) async fn apply_sync<R: Runtime>(
window: &WebviewWindow<R>,
workspace_id: &str,
sync_ops: Vec<SyncOp>,
) -> Result<()> {
let workspace = get_workspace(window, workspace_id).await?;
let sync_ops = calculate_sync(window, workspace_id).await?;
let sync_state_ops = apply_sync_ops(window, &workspace, sync_ops).await?;
let result = apply_sync_state_ops(window, &workspace, sync_state_ops).await;
@@ -120,17 +120,21 @@ async fn get_db_candidates<R: Runtime>(
mgr: &impl Manager<R>,
workspace: &Workspace,
) -> Result<Vec<DbCandidate>> {
let workspace_id = workspace.id.as_str();
let models = workspace_models(mgr, workspace).await;
let sync_dir = get_workspace_sync_dir(workspace)?;
let sync_states = list_sync_states_for_workspace(mgr, workspace_id, sync_dir).await?;
let models: HashMap<_, _> =
workspace_models(mgr, workspace).await.into_iter().map(|m| (m.id(), m)).collect();
let sync_states: HashMap<_, _> =
list_sync_states_for_workspace(mgr, workspace.id.as_str(), sync_dir)
.await?
.into_iter()
.map(|s| (s.model_id.clone(), s))
.collect();
// 1. Add candidates for models (created/modified/unmodified)
let mut candidates: Vec<DbCandidate> = models
.iter()
.values()
.map(|model| {
let existing_sync_state = sync_states.iter().find(|ss| ss.model_id == model.id());
let existing_sync_state = match existing_sync_state {
let existing_sync_state = match sync_states.get(&model.id()) {
Some(s) => s,
None => {
// No sync state yet, so model was just added
@@ -148,8 +152,8 @@ async fn get_db_candidates<R: Runtime>(
.collect();
// 2. Add SyncState-only candidates (deleted)
candidates.extend(sync_states.iter().filter_map(|sync_state| {
let already_added = models.iter().find(|m| m.id() == sync_state.model_id).is_some();
candidates.extend(sync_states.values().filter_map(|sync_state| {
let already_added = models.contains_key(&sync_state.model_id);
if already_added {
return None;
}
@@ -159,47 +163,46 @@ async fn get_db_candidates<R: Runtime>(
Ok(candidates)
}
/// Collects sync candidates from the files in the workspace's sync directory.
///
/// Returns an empty list when `workspace.setting_sync_dir` is `None`. Otherwise the
/// directory is created if missing and each regular file in it is parsed with
/// `SyncModel::from_file`; files that yield no model, are invalid sync files, fail
/// to read, or belong to a different workspace are skipped.
///
/// NOTE(review): this span looks like a diff rendering with the +/- markers stripped,
/// so the blocking variant (`fn`, `filter_map` chain) and the async variant
/// (`async fn`, `while let` loop) are interleaved line by line — confirm against
/// the real file before relying on either reading.
fn get_fs_candidates(workspace: &Workspace) -> Result<Vec<FsCandidate>> {
async fn get_fs_candidates(workspace: &Workspace) -> Result<Vec<FsCandidate>> {
// Without a configured sync directory there is nothing to scan
let dir = match workspace.setting_sync_dir.clone() {
None => return Ok(Vec::new()),
Some(d) => d,
};
// Ensure the root directory exists
create_dir_all(dir.clone())?;
fs::create_dir_all(dir.clone()).await?;
// Iterator-chain variant: a failing entry or file-type lookup is dropped via `.ok()?`
let candidates = fs::read_dir(dir)?
.filter_map(|dir_entry| {
let dir_entry = dir_entry.ok()?;
// Only regular files are considered
if !dir_entry.file_type().ok()?.is_file() {
return None;
};
// Loop variant: a failing entry or file-type lookup is propagated to the caller via `?`
let mut candidates = Vec::new();
let mut entries = fs::read_dir(dir).await?;
while let Some(dir_entry) = entries.next_entry().await? {
// Only regular files are considered
if !dir_entry.file_type().await?.is_file() {
continue;
};
let path = dir_entry.path();
let (model, _, checksum) = match SyncModel::from_file(&path) {
Ok(Some(m)) => m,
Ok(None) => return None,
Err(InvalidSyncFile(_)) => return None,
// Any other error is logged and the file is skipped rather than failing the scan
Err(e) => {
warn!("Failed to read sync file {e}");
return None;
}
};
// Skip models belonging to different workspace
if model.workspace_id() != workspace.id.as_str() {
debug!("Skipping non-workspace file");
return None;
let path = dir_entry.path();
let (model, _, checksum) = match SyncModel::from_file(&path).await {
Ok(Some(m)) => m,
Ok(None) => continue,
Err(InvalidSyncFile(_)) => continue,
// Any other error is logged and the file is skipped rather than failing the scan
Err(e) => {
warn!("Failed to read sync file {e}");
continue;
}
};
// The candidate's relative path is just the entry's file name
let rel_path = Path::new(&dir_entry.file_name()).to_path_buf();
Some(FsCandidate {
rel_path,
model,
checksum,
})
// Skip models belonging to different workspace
if model.workspace_id() != workspace.id.as_str() {
debug!("Skipping non-workspace file");
continue;
}
// The candidate's relative path is just the entry's file name
let rel_path = Path::new(&dir_entry.file_name()).to_path_buf();
candidates.push(FsCandidate {
rel_path,
model,
checksum,
})
.collect();
}
Ok(candidates)
}
@@ -363,7 +366,8 @@ async fn apply_sync_op<R: Runtime>(
let rel_path = derive_model_filename(&model);
let abs_path = derive_full_model_path(workspace, &model)?;
let (content, checksum) = model.to_file_contents(&rel_path)?;
fs::write(&abs_path, content)?;
let mut f = File::create(&abs_path).await?;
f.write_all(&content).await?;
SyncStateOp::Create {
model_id: model.id(),
checksum,
@@ -374,7 +378,8 @@ async fn apply_sync_op<R: Runtime>(
let rel_path = derive_model_filename(&model);
let abs_path = derive_full_model_path(workspace, &model)?;
let (content, checksum) = model.to_file_contents(&rel_path)?;
fs::write(&abs_path, content)?;
let mut f = File::create(&abs_path).await?;
f.write_all(&content).await?;
SyncStateOp::Update {
state: state.to_owned(),
checksum,
@@ -390,7 +395,7 @@ async fn apply_sync_op<R: Runtime>(
},
Some(fs_candidate) => {
let abs_path = derive_full_model_path(workspace, &fs_candidate.model)?;
fs::remove_file(&abs_path)?;
fs::remove_file(&abs_path).await?;
SyncStateOp::Delete {
state: state.to_owned(),
}