mirror of
https://github.com/mountain-loop/yaak.git
synced 2026-03-30 22:22:02 +02:00
Ability to sync environments to folder (#207)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
use crate::error::Result;
|
||||
use crate::sync::{
|
||||
apply_sync_ops, apply_sync_state_ops, compute_sync_ops, get_db_candidates, get_fs_candidates,
|
||||
FsCandidate, SyncOp,
|
||||
apply_sync_ops, apply_sync_state_ops, compute_sync_ops, get_db_candidates, get_fs_candidates, FsCandidate,
|
||||
SyncOp,
|
||||
};
|
||||
use crate::watch::{watch_directory, WatchEvent};
|
||||
use chrono::Utc;
|
||||
@@ -19,9 +19,8 @@ pub async fn calculate<R: Runtime>(
|
||||
workspace_id: &str,
|
||||
sync_dir: &Path,
|
||||
) -> Result<Vec<SyncOp>> {
|
||||
let db_candidates = get_db_candidates(&app_handle, workspace_id, sync_dir).await?;
|
||||
let fs_candidates = get_fs_candidates(sync_dir)
|
||||
.await?
|
||||
let db_candidates = get_db_candidates(&app_handle, workspace_id, sync_dir)?;
|
||||
let fs_candidates = get_fs_candidates(sync_dir)?
|
||||
.into_iter()
|
||||
// Only keep items in the same workspace
|
||||
.filter(|fs| fs.model.workspace_id() == workspace_id)
|
||||
@@ -34,7 +33,7 @@ pub async fn calculate<R: Runtime>(
|
||||
#[command]
|
||||
pub async fn calculate_fs(dir: &Path) -> Result<Vec<SyncOp>> {
|
||||
let db_candidates = Vec::new();
|
||||
let fs_candidates = get_fs_candidates(dir).await?;
|
||||
let fs_candidates = get_fs_candidates(dir)?;
|
||||
Ok(compute_sync_ops(db_candidates, fs_candidates))
|
||||
}
|
||||
|
||||
@@ -45,8 +44,8 @@ pub async fn apply<R: Runtime>(
|
||||
sync_dir: &Path,
|
||||
workspace_id: &str,
|
||||
) -> Result<()> {
|
||||
let sync_state_ops = apply_sync_ops(&app_handle, &workspace_id, sync_dir, sync_ops).await?;
|
||||
apply_sync_state_ops(&app_handle, workspace_id, sync_dir, sync_state_ops).await
|
||||
let sync_state_ops = apply_sync_ops(&app_handle, &workspace_id, sync_dir, sync_ops)?;
|
||||
apply_sync_state_ops(&app_handle, workspace_id, sync_dir, sync_state_ops)
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, TS)]
|
||||
|
||||
@@ -130,4 +130,4 @@ impl TryFrom<AnyModel> for SyncModel {
|
||||
};
|
||||
Ok(m)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
use crate::error::Result;
|
||||
use crate::models::SyncModel;
|
||||
use chrono::Utc;
|
||||
use log::{debug, info, warn};
|
||||
use log::{info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::{Display, Formatter};
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
use std::path::{Path, PathBuf};
|
||||
use tauri::{AppHandle, Runtime};
|
||||
use tokio::fs;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use ts_rs::TS;
|
||||
use yaak_models::models::{SyncState, WorkspaceMeta};
|
||||
use yaak_models::query_manager::QueryManagerExt;
|
||||
@@ -41,17 +41,21 @@ pub(crate) enum SyncOp {
|
||||
model: SyncModel,
|
||||
state: SyncState,
|
||||
},
|
||||
IgnorePrivate {
|
||||
model: SyncModel,
|
||||
},
|
||||
}
|
||||
|
||||
impl SyncOp {
|
||||
fn workspace_id(&self) -> String {
|
||||
match self {
|
||||
SyncOp::FsCreate { model } => model.workspace_id(),
|
||||
SyncOp::FsUpdate { state, .. } => state.workspace_id.clone(),
|
||||
SyncOp::FsDelete { state, .. } => state.workspace_id.clone(),
|
||||
SyncOp::DbCreate { fs } => fs.model.workspace_id(),
|
||||
SyncOp::DbUpdate { state, .. } => state.workspace_id.clone(),
|
||||
SyncOp::DbDelete { model, .. } => model.workspace_id(),
|
||||
SyncOp::DbUpdate { state, .. } => state.workspace_id.clone(),
|
||||
SyncOp::FsCreate { model } => model.workspace_id(),
|
||||
SyncOp::FsDelete { state, .. } => state.workspace_id.clone(),
|
||||
SyncOp::FsUpdate { state, .. } => state.workspace_id.clone(),
|
||||
SyncOp::IgnorePrivate { model } => model.workspace_id(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -66,6 +70,7 @@ impl Display for SyncOp {
|
||||
SyncOp::DbCreate { fs } => format!("db_create({})", fs.model.id()),
|
||||
SyncOp::DbUpdate { fs, .. } => format!("db_update({})", fs.model.id()),
|
||||
SyncOp::DbDelete { model, .. } => format!("db_delete({})", model.id()),
|
||||
SyncOp::IgnorePrivate { model } => format!("ignore_private({})", model.id()),
|
||||
}
|
||||
.as_str(),
|
||||
)
|
||||
@@ -76,8 +81,8 @@ impl Display for SyncOp {
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub(crate) enum DbCandidate {
|
||||
Added(SyncModel),
|
||||
Modified(SyncModel, SyncState),
|
||||
Deleted(SyncState),
|
||||
Modified(SyncModel, SyncState),
|
||||
Unmodified(SyncModel, SyncState),
|
||||
}
|
||||
|
||||
@@ -85,8 +90,8 @@ impl DbCandidate {
|
||||
fn model_id(&self) -> String {
|
||||
match &self {
|
||||
DbCandidate::Added(m) => m.id(),
|
||||
DbCandidate::Modified(m, _) => m.id(),
|
||||
DbCandidate::Deleted(s) => s.model_id.clone(),
|
||||
DbCandidate::Modified(m, _) => m.id(),
|
||||
DbCandidate::Unmodified(m, _) => m.id(),
|
||||
}
|
||||
}
|
||||
@@ -101,16 +106,13 @@ pub(crate) struct FsCandidate {
|
||||
pub(crate) checksum: String,
|
||||
}
|
||||
|
||||
pub(crate) async fn get_db_candidates<R: Runtime>(
|
||||
pub(crate) fn get_db_candidates<R: Runtime>(
|
||||
app_handle: &AppHandle<R>,
|
||||
workspace_id: &str,
|
||||
sync_dir: &Path,
|
||||
) -> Result<Vec<DbCandidate>> {
|
||||
let models: HashMap<_, _> = workspace_models(app_handle, workspace_id)
|
||||
.await?
|
||||
.into_iter()
|
||||
.map(|m| (m.id(), m))
|
||||
.collect();
|
||||
let models: HashMap<_, _> =
|
||||
workspace_models(app_handle, workspace_id)?.into_iter().map(|m| (m.id(), m)).collect();
|
||||
let sync_states: HashMap<_, _> = app_handle
|
||||
.db()
|
||||
.list_sync_states_for_workspace(workspace_id, sync_dir)?
|
||||
@@ -121,20 +123,42 @@ pub(crate) async fn get_db_candidates<R: Runtime>(
|
||||
// 1. Add candidates for models (created/modified/unmodified)
|
||||
let mut candidates: Vec<DbCandidate> = models
|
||||
.values()
|
||||
.map(|model| {
|
||||
let existing_sync_state = match sync_states.get(&model.id()) {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
// No sync state yet, so model was just added
|
||||
return DbCandidate::Added(model.to_owned());
|
||||
}
|
||||
};
|
||||
.filter_map(|model| {
|
||||
match sync_states.get(&model.id()) {
|
||||
Some(existing_sync_state) => {
|
||||
// If a sync state exists but the model is now private, treat it as a deletion
|
||||
match model {
|
||||
SyncModel::Environment(e) if !e.public => {
|
||||
return Some(DbCandidate::Deleted(existing_sync_state.to_owned()));
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
|
||||
let updated_since_flush = model.updated_at() > existing_sync_state.flushed_at;
|
||||
if updated_since_flush {
|
||||
DbCandidate::Modified(model.to_owned(), existing_sync_state.to_owned())
|
||||
} else {
|
||||
DbCandidate::Unmodified(model.to_owned(), existing_sync_state.to_owned())
|
||||
let updated_since_flush = model.updated_at() > existing_sync_state.flushed_at;
|
||||
if updated_since_flush {
|
||||
Some(DbCandidate::Modified(
|
||||
model.to_owned(),
|
||||
existing_sync_state.to_owned(),
|
||||
))
|
||||
} else {
|
||||
Some(DbCandidate::Unmodified(
|
||||
model.to_owned(),
|
||||
existing_sync_state.to_owned(),
|
||||
))
|
||||
}
|
||||
}
|
||||
None => {
|
||||
return match model {
|
||||
SyncModel::Environment(e) if !e.public => {
|
||||
// No sync state yet, so ignore the model
|
||||
None
|
||||
}
|
||||
_ => {
|
||||
// No sync state yet, so the model was just added
|
||||
Some(DbCandidate::Added(model.to_owned()))
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
@@ -151,30 +175,28 @@ pub(crate) async fn get_db_candidates<R: Runtime>(
|
||||
Ok(candidates)
|
||||
}
|
||||
|
||||
pub(crate) async fn get_fs_candidates(dir: &Path) -> Result<Vec<FsCandidate>> {
|
||||
pub(crate) fn get_fs_candidates(dir: &Path) -> Result<Vec<FsCandidate>> {
|
||||
// Ensure the root directory exists
|
||||
fs::create_dir_all(dir).await?;
|
||||
fs::create_dir_all(dir)?;
|
||||
|
||||
let mut candidates = Vec::new();
|
||||
let mut entries = fs::read_dir(dir).await?;
|
||||
while let Some(dir_entry) = entries.next_entry().await? {
|
||||
if !dir_entry.file_type().await?.is_file() {
|
||||
let entries = fs::read_dir(dir)?;
|
||||
for dir_entry in entries {
|
||||
let dir_entry = match dir_entry {
|
||||
Ok(v) => v,
|
||||
Err(_) => continue,
|
||||
};
|
||||
|
||||
if !dir_entry.file_type()?.is_file() {
|
||||
continue;
|
||||
};
|
||||
|
||||
let path = dir_entry.path();
|
||||
let (model, checksum) = match SyncModel::from_file(&path) {
|
||||
// TODO: Remove this once we have logic to handle environments. This it to clean
|
||||
// any existing ones from the sync dir that resulted from the 2025.1 betas.
|
||||
Ok(Some((SyncModel::Environment(e), _))) => {
|
||||
fs::remove_file(path).await?;
|
||||
info!("Cleaned up synced environment {}", e.id);
|
||||
continue;
|
||||
}
|
||||
Ok(Some(m)) => m,
|
||||
Ok(None) => continue,
|
||||
Err(e) => {
|
||||
warn!("Failed to read sync file {e}");
|
||||
warn!("Failed to parse sync file {e}");
|
||||
return Err(e);
|
||||
}
|
||||
};
|
||||
@@ -212,32 +234,32 @@ pub(crate) fn compute_sync_ops(
|
||||
let op = match (db_map.get(k), fs_map.get(k)) {
|
||||
(None, None) => return None, // Can never happen
|
||||
(None, Some(fs)) => SyncOp::DbCreate { fs: fs.to_owned() },
|
||||
(Some(DbCandidate::Unmodified(model, sync_state)), None) => {
|
||||
// TODO: Remove this once we have logic to handle environments. This it to
|
||||
// ignore the cleaning we did above of any environments that were written
|
||||
// to disk in the 2025.1 betas.
|
||||
if let SyncModel::Environment(_) = model {
|
||||
return None;
|
||||
}
|
||||
SyncOp::DbDelete {
|
||||
model: model.to_owned(),
|
||||
state: sync_state.to_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
// DB unchanged <-> FS missing
|
||||
(Some(DbCandidate::Unmodified(model, sync_state)), None) => SyncOp::DbDelete {
|
||||
model: model.to_owned(),
|
||||
state: sync_state.to_owned(),
|
||||
},
|
||||
|
||||
// DB modified <-> FS missing
|
||||
(Some(DbCandidate::Modified(model, sync_state)), None) => SyncOp::FsUpdate {
|
||||
model: model.to_owned(),
|
||||
state: sync_state.to_owned(),
|
||||
},
|
||||
|
||||
// DB added <-> FS missing
|
||||
(Some(DbCandidate::Added(model)), None) => SyncOp::FsCreate {
|
||||
model: model.to_owned(),
|
||||
},
|
||||
(Some(DbCandidate::Deleted(sync_state)), None) => {
|
||||
// Already deleted on FS, but sending it so the SyncState gets dealt with
|
||||
SyncOp::FsDelete {
|
||||
state: sync_state.to_owned(),
|
||||
fs: None,
|
||||
}
|
||||
}
|
||||
|
||||
// DB deleted <-> FS missing
|
||||
// Already deleted on FS, but sending it so the SyncState gets dealt with
|
||||
(Some(DbCandidate::Deleted(sync_state)), None) => SyncOp::FsDelete {
|
||||
state: sync_state.to_owned(),
|
||||
fs: None,
|
||||
},
|
||||
|
||||
// DB unchanged <-> FS exists
|
||||
(Some(DbCandidate::Unmodified(_, sync_state)), Some(fs_candidate)) => {
|
||||
if sync_state.checksum == fs_candidate.checksum {
|
||||
return None;
|
||||
@@ -248,6 +270,8 @@ pub(crate) fn compute_sync_ops(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DB modified <-> FS exists
|
||||
(Some(DbCandidate::Modified(model, sync_state)), Some(fs_candidate)) => {
|
||||
if sync_state.checksum == fs_candidate.checksum {
|
||||
SyncOp::FsUpdate {
|
||||
@@ -255,25 +279,29 @@ pub(crate) fn compute_sync_ops(
|
||||
state: sync_state.to_owned(),
|
||||
}
|
||||
} else if model.updated_at() < fs_candidate.model.updated_at() {
|
||||
// CONFLICT! Write to DB if fs model is newer
|
||||
// CONFLICT! Write to DB if the fs model is newer
|
||||
SyncOp::DbUpdate {
|
||||
state: sync_state.to_owned(),
|
||||
fs: fs_candidate.to_owned(),
|
||||
}
|
||||
} else {
|
||||
// CONFLICT! Write to FS if db model is newer
|
||||
// CONFLICT! Write to FS if the db model is newer
|
||||
SyncOp::FsUpdate {
|
||||
model: model.to_owned(),
|
||||
state: sync_state.to_owned(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// DB added <-> FS anything
|
||||
(Some(DbCandidate::Added(model)), Some(_)) => {
|
||||
// This would be super rare (impossible?), so let's follow the user's intention
|
||||
SyncOp::FsCreate {
|
||||
model: model.to_owned(),
|
||||
}
|
||||
}
|
||||
|
||||
// DB deleted <-> FS exists
|
||||
(Some(DbCandidate::Deleted(sync_state)), Some(fs_candidate)) => SyncOp::FsDelete {
|
||||
state: sync_state.to_owned(),
|
||||
fs: Some(fs_candidate.to_owned()),
|
||||
@@ -284,12 +312,19 @@ pub(crate) fn compute_sync_ops(
|
||||
.collect()
|
||||
}
|
||||
|
||||
async fn workspace_models<R: Runtime>(
|
||||
fn workspace_models<R: Runtime>(
|
||||
app_handle: &AppHandle<R>,
|
||||
workspace_id: &str,
|
||||
) -> Result<Vec<SyncModel>> {
|
||||
let resources =
|
||||
get_workspace_export_resources(app_handle, vec![workspace_id], true).await?.resources;
|
||||
// We want to include private environments here so that we can take them into account during
|
||||
// the sync process. Otherwise, they would be treated as deleted.
|
||||
let include_private_environments = true;
|
||||
let resources = get_workspace_export_resources(
|
||||
app_handle,
|
||||
vec![workspace_id],
|
||||
include_private_environments,
|
||||
)?
|
||||
.resources;
|
||||
let workspace = resources.workspaces.iter().find(|w| w.id == workspace_id);
|
||||
|
||||
let workspace = match workspace {
|
||||
@@ -318,7 +353,7 @@ async fn workspace_models<R: Runtime>(
|
||||
Ok(sync_models)
|
||||
}
|
||||
|
||||
pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
pub(crate) fn apply_sync_ops<R: Runtime>(
|
||||
app_handle: &AppHandle<R>,
|
||||
workspace_id: &str,
|
||||
sync_dir: &Path,
|
||||
@@ -328,13 +363,14 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
return Ok(Vec::new());
|
||||
}
|
||||
|
||||
debug!(
|
||||
info!(
|
||||
"Applying sync ops {}",
|
||||
sync_ops.iter().map(|op| op.to_string()).collect::<Vec<String>>().join(", ")
|
||||
);
|
||||
|
||||
let mut sync_state_ops = Vec::new();
|
||||
let mut workspaces_to_upsert = Vec::new();
|
||||
let environments_to_upsert = Vec::new();
|
||||
let mut environments_to_upsert = Vec::new();
|
||||
let mut folders_to_upsert = Vec::new();
|
||||
let mut http_requests_to_upsert = Vec::new();
|
||||
let mut grpc_requests_to_upsert = Vec::new();
|
||||
@@ -351,8 +387,8 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
let rel_path = derive_model_filename(&model);
|
||||
let abs_path = sync_dir.join(rel_path.clone());
|
||||
let (content, checksum) = model.to_file_contents(&rel_path)?;
|
||||
let mut f = File::create(&abs_path).await?;
|
||||
f.write_all(&content).await?;
|
||||
let mut f = File::create(&abs_path)?;
|
||||
f.write_all(&content)?;
|
||||
SyncStateOp::Create {
|
||||
model_id: model.id(),
|
||||
checksum,
|
||||
@@ -364,8 +400,8 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
let rel_path = Path::new(&state.rel_path);
|
||||
let abs_path = Path::new(&state.sync_dir).join(&rel_path);
|
||||
let (content, checksum) = model.to_file_contents(&rel_path)?;
|
||||
let mut f = File::create(&abs_path).await?;
|
||||
f.write_all(&content).await?;
|
||||
let mut f = File::create(&abs_path)?;
|
||||
f.write_all(&content)?;
|
||||
SyncStateOp::Update {
|
||||
state: state.to_owned(),
|
||||
checksum,
|
||||
@@ -383,7 +419,7 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
// Always delete the existing path
|
||||
let rel_path = Path::new(&state.rel_path);
|
||||
let abs_path = Path::new(&state.sync_dir).join(&rel_path);
|
||||
fs::remove_file(&abs_path).await?;
|
||||
fs::remove_file(&abs_path)?;
|
||||
SyncStateOp::Delete {
|
||||
state: state.to_owned(),
|
||||
}
|
||||
@@ -395,14 +431,12 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
// Push updates to arrays so we can do them all in a single
|
||||
// batch upsert to make foreign keys happy
|
||||
match fs.model {
|
||||
SyncModel::Workspace(m) => workspaces_to_upsert.push(m),
|
||||
SyncModel::Environment(m) => environments_to_upsert.push(m),
|
||||
SyncModel::Folder(m) => folders_to_upsert.push(m),
|
||||
SyncModel::HttpRequest(m) => http_requests_to_upsert.push(m),
|
||||
SyncModel::GrpcRequest(m) => grpc_requests_to_upsert.push(m),
|
||||
SyncModel::HttpRequest(m) => http_requests_to_upsert.push(m),
|
||||
SyncModel::WebsocketRequest(m) => websocket_requests_to_upsert.push(m),
|
||||
|
||||
// TODO: Handle environments in sync
|
||||
SyncModel::Environment(_) => {}
|
||||
SyncModel::Workspace(m) => workspaces_to_upsert.push(m),
|
||||
};
|
||||
SyncStateOp::Create {
|
||||
model_id,
|
||||
@@ -414,14 +448,12 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
// Push updates to arrays so we can do them all in a single
|
||||
// batch upsert to make foreign keys happy
|
||||
match fs.model {
|
||||
SyncModel::Workspace(m) => workspaces_to_upsert.push(m),
|
||||
SyncModel::Environment(m) => environments_to_upsert.push(m),
|
||||
SyncModel::Folder(m) => folders_to_upsert.push(m),
|
||||
SyncModel::HttpRequest(m) => http_requests_to_upsert.push(m),
|
||||
SyncModel::GrpcRequest(m) => grpc_requests_to_upsert.push(m),
|
||||
SyncModel::HttpRequest(m) => http_requests_to_upsert.push(m),
|
||||
SyncModel::WebsocketRequest(m) => websocket_requests_to_upsert.push(m),
|
||||
|
||||
// TODO: Handle environments in sync
|
||||
SyncModel::Environment(_) => {}
|
||||
SyncModel::Workspace(m) => workspaces_to_upsert.push(m),
|
||||
}
|
||||
SyncStateOp::Update {
|
||||
state: state.to_owned(),
|
||||
@@ -435,6 +467,7 @@ pub(crate) async fn apply_sync_ops<R: Runtime>(
|
||||
state: state.to_owned(),
|
||||
}
|
||||
}
|
||||
SyncOp::IgnorePrivate { .. } => SyncStateOp::NoOp,
|
||||
});
|
||||
}
|
||||
|
||||
@@ -497,9 +530,10 @@ pub(crate) enum SyncStateOp {
|
||||
Delete {
|
||||
state: SyncState,
|
||||
},
|
||||
NoOp,
|
||||
}
|
||||
|
||||
pub(crate) async fn apply_sync_state_ops<R: Runtime>(
|
||||
pub(crate) fn apply_sync_state_ops<R: Runtime>(
|
||||
app_handle: &AppHandle<R>,
|
||||
workspace_id: &str,
|
||||
sync_dir: &Path,
|
||||
@@ -540,6 +574,9 @@ pub(crate) async fn apply_sync_state_ops<R: Runtime>(
|
||||
SyncStateOp::Delete { state } => {
|
||||
app_handle.db().delete_sync_state(&state)?;
|
||||
}
|
||||
SyncStateOp::NoOp => {
|
||||
// Nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
|
||||
Reference in New Issue
Block a user