diff --git a/src/agent.rs b/src/agent.rs index 417b47c..191aef2 100644 --- a/src/agent.rs +++ b/src/agent.rs @@ -11,12 +11,13 @@ use std::sync::{Arc, Mutex as StdMutex}; use tokio::io::{stdin, stdout, BufReader, BufWriter, Stdout}; use tokio::sync::Mutex; +use crate::baseline::LiveBaseline; use crate::cache::HashCache; use crate::ignores::IgnoreStack; use crate::peer::{ apply_delete, apply_delta_to_file, apply_file_data, apply_mkdir, apply_rename, apply_symlink, cleanup_orphan_tmps, compute_delta, compute_signature, forward_local_events, live_loop, - send_file, Pending, Suppression, + send_file, GitGate, Pending, Suppression, }; use crate::protocol::{read_message, write_message, EntryKind, Message, PROTOCOL_VERSION}; use crate::walker::{build_entry, ensure_root, walk_manifest}; @@ -352,13 +353,27 @@ pub async fn run(path: PathBuf) -> Result<()> { // user edits made on the remote during the startup window flow to the // client. let ignores = Arc::new(IgnoreStack::load(&root)); + // Agent has no plan/baseline; a disabled one keeps the shared signatures + // uniform while skipping all persistence. The git gate, however, is real. + let gate = GitGate::default(); + let live_baseline = LiveBaseline::disabled(); let mut buffered: Vec = Vec::new(); while let Ok(batch) = watcher_handle.events.try_recv() { buffered.extend(batch); } if !buffered.is_empty() { tracing::debug!("agent: draining {} buffered watcher events", buffered.len()); - forward_local_events(&root, buffered, &writer, compress, &suppress, false).await?; + forward_local_events( + &root, + buffered, + &writer, + compress, + &suppress, + false, + &gate, + &live_baseline, + ) + .await?; } // ── Live mode ── @@ -368,6 +383,8 @@ pub async fn run(path: PathBuf) -> Result<()> { compress, is_client: false, ignores, + gate, + baseline: live_baseline, }; live_loop(ctx, reader, writer, suppress, pending, watcher_handle).await } diff --git a/src/baseline.rs b/src/baseline.rs new file mode 100644 index 0000000..9b61bf2 --- /dev/null +++ b/src/baseline.rs @@ -0,0 +1,182 @@ +//! Persistent per-root baseline: the converged manifest from the last +//! successful sync. +//! +//! Used as the common ancestor in the three-way diff. From the two live +//! manifests alone, "the user deleted this file here" and "the peer created +//! this file there" are byte-for-byte indistinguishable — a stateless diff +//! can only ever pull the file back, which silently resurrects deletions. +//! The baseline records what both sides agreed on last time, so a path that +//! is now absent on one side can be classified: gone-and-unchanged-elsewhere +//! is a genuine deletion (propagate), anything else is kept (never lose data). +//! +//! `Baseline` is the read side, loaded at session start for the plan. +//! `LiveBaseline` is the write side: seeded with the converged manifest after +//! init sync, then kept current as the live loop applies/forwards changes, so +//! even a file created and deleted within one session is recorded correctly +//! and never resurrects. Both share one on-disk file (a bare path → Entry +//! map), keyed by root, living next to the hash cache in the user-cache dir. + +use std::collections::HashMap; +use std::fs; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, Instant}; + +use crate::protocol::Entry; + +/// At most one disk write per this interval while the live loop is churning. +/// A slightly stale baseline is safe — it only makes the next diff fall back +/// to the conservative pull-back for the un-persisted paths, never a wrong +/// delete — so debouncing trades a tiny resurrection window for far less IO. +const PERSIST_DEBOUNCE: Duration = Duration::from_secs(3); + +/// Read side: the baseline as it was at the last sync. Empty on first run. +#[derive(Default)] +pub struct Baseline { + entries: HashMap, +} + +impl Baseline { + /// Any failure (missing, corrupt, older format) yields an empty baseline, + /// which makes the three-way diff fall back to the conservative pull-back + /// behavior — never a mass delete. + pub fn load(root: &Path) -> Self { + let Some(path) = baseline_path_for(root) else { + return Self::default(); + }; + match fs::read(&path) { + Ok(bytes) => Self { + entries: postcard::from_bytes(&bytes).unwrap_or_default(), + }, + Err(_) => Self::default(), + } + } + + pub fn get(&self, path: &Path) -> Option<&Entry> { + self.entries.get(path) + } + + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } +} + +/// Write side: a shared, mutable baseline kept current during a live session +/// and persisted (debounced) to the same file `Baseline::load` reads. +#[derive(Clone, Default)] +pub struct LiveBaseline { + inner: Arc>, + root: PathBuf, + /// Only the client owns a persistent baseline (it builds the plan). The + /// agent gets a disabled one whose mutations and writes are no-ops. + enabled: bool, +} + +#[derive(Default)] +struct Inner { + entries: HashMap, + dirty: bool, + last_save: Option, +} + +impl LiveBaseline { + /// Seed with the converged manifest and persist immediately, so even a + /// `--once` run or an instant disconnect leaves a correct baseline. + pub fn seed(root: PathBuf, entries: HashMap) -> Self { + let lb = Self { + inner: Arc::new(Mutex::new(Inner { + entries, + dirty: true, + last_save: None, + })), + root, + enabled: true, + }; + lb.persist_now(); + lb + } + + /// A no-op baseline for the agent side (no planning, nothing to persist). + pub fn disabled() -> Self { + Self::default() + } + + /// Record that `path` now holds `entry`'s content on both sides. + pub fn set(&self, entry: Entry) { + if !self.enabled { + return; + } + if let Ok(mut g) = self.inner.lock() { + g.entries.insert(entry.path.clone(), entry); + g.dirty = true; + } + self.persist_due(); + } + + /// Record that `path` is now gone on both sides. + pub fn remove(&self, path: &Path) { + if !self.enabled { + return; + } + if let Ok(mut g) = self.inner.lock() { + if g.entries.remove(path).is_some() { + g.dirty = true; + } + } + self.persist_due(); + } + + /// Persist if dirty and the debounce interval has elapsed. + fn persist_due(&self) { + self.write(false); + } + + /// Persist unconditionally if dirty (called on clean live-loop exit). + pub fn persist_now(&self) { + self.write(true); + } + + fn write(&self, force: bool) { + if !self.enabled { + return; + } + // Serialize under the lock, write outside it — keep the critical + // section to a single in-memory pass, never an IO syscall. + let bytes = { + let Ok(mut g) = self.inner.lock() else { + return; + }; + if !g.dirty { + return; + } + let due = force + || g.last_save + .map(|t| t.elapsed() >= PERSIST_DEBOUNCE) + .unwrap_or(true); + if !due { + return; + } + let Ok(bytes) = postcard::to_allocvec(&g.entries) else { + return; + }; + g.dirty = false; + g.last_save = Some(Instant::now()); + bytes + }; + let Some(path) = baseline_path_for(&self.root) else { + return; + }; + if let Some(parent) = path.parent() { + let _ = fs::create_dir_all(parent); + } + let _ = fs::write(&path, &bytes); + } +} + +fn baseline_path_for(root: &Path) -> Option { + let base = dirs::cache_dir()?.join("synx"); + let mut h = blake3::Hasher::new(); + h.update(root.as_os_str().as_encoded_bytes()); + let id = h.finalize().to_hex(); + Some(base.join(format!("{}.baseline", &id.as_str()[..16]))) +} diff --git a/src/main.rs b/src/main.rs index 4a9eff6..c5f964f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,5 @@ mod agent; +mod baseline; mod cache; mod cli; mod ignores; diff --git a/src/peer.rs b/src/peer.rs index 196ef28..7f4044d 100644 --- a/src/peer.rs +++ b/src/peer.rs @@ -5,7 +5,7 @@ use anyhow::{Context, Result}; use humansize::{format_size, BINARY}; use owo_colors::OwoColorize; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs; use std::io::{Read, Write}; use std::os::unix::fs::{MetadataExt, PermissionsExt}; @@ -15,6 +15,7 @@ use std::time::{Duration, Instant}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use tokio::sync::Mutex; +use crate::baseline::LiveBaseline; use crate::ignores::IgnoreStack; use crate::protocol::{ read_message, write_message, Entry, EntryKind, Message, SyncMode, CHUNK_SIZE, CHUNK_THRESHOLD, @@ -277,6 +278,80 @@ fn newest_age(p: &Path, meta: &fs::Metadata, now: std::time::SystemTime) -> Dura youngest } +/// Keep `.git/` paused this long after git's markers disappear. `git_busy` +/// flickers false between a multi-step operation's sub-steps (lock files +/// appear and vanish per step), so without hysteresis a burst of transient +/// `.git/` churn — lock files, half-written objects — leaks to the peer in +/// the gaps. This bridges them. +const GIT_SETTLE: Duration = Duration::from_secs(5); + +/// Sticky gate around `git_busy` plus a defer queue for `.git/` events. +/// +/// While git is busy (with hysteresis) we don't drop `.git/` events — we +/// stash them. Once git settles, the caller replays each deferred *path* +/// against its current on-disk state, so intermediate churn collapses to the +/// final result and stray lock files resolve to deletes. Deferred incoming +/// ops are replayed in arrival order. +#[derive(Clone, Default)] +pub struct GitGate { + inner: Arc>, +} + +#[derive(Default)] +struct GitGateInner { + last_busy: Option, + /// Local `.git/` paths touched while paused — replayed by current state. + deferred_out: HashSet, + /// Incoming `.git/` ops received while paused — replayed in order. + deferred_in: Vec, +} + +impl GitGate { + /// Sticky busy check: true while git is mid-operation and for `GIT_SETTLE` + /// after its markers clear. Refreshes the hysteresis timer when actually + /// busy. + pub fn busy(&self, root: &Path) -> bool { + let raw = git_busy(root); + let Ok(mut g) = self.inner.lock() else { + return raw; + }; + if raw { + g.last_busy = Some(Instant::now()); + return true; + } + matches!(g.last_busy, Some(t) if t.elapsed() < GIT_SETTLE) + } + + pub fn defer_out(&self, path: PathBuf) { + if let Ok(mut g) = self.inner.lock() { + g.deferred_out.insert(path); + } + } + + pub fn defer_in(&self, msg: Message) { + if let Ok(mut g) = self.inner.lock() { + g.deferred_in.push(msg); + } + } + + pub fn has_deferred(&self) -> bool { + self.inner + .lock() + .map(|g| !g.deferred_out.is_empty() || !g.deferred_in.is_empty()) + .unwrap_or(false) + } + + /// Drain everything deferred while git was busy; the caller replays it. + pub fn take_deferred(&self) -> (Vec, Vec) { + let Ok(mut g) = self.inner.lock() else { + return (Vec::new(), Vec::new()); + }; + let out: Vec = g.deferred_out.drain().collect(); + let inc = std::mem::take(&mut g.deferred_in); + (out, inc) + } +} + /// Remove tmp files left over from a previous crashed run. /// Age-based (> 1 hour) so we don't step on a concurrently-running synx. /// Cheap; safe to call at startup of both client and agent. @@ -723,6 +798,11 @@ pub struct SessionCtx { pub compress: bool, pub is_client: bool, pub ignores: Arc, + /// Sticky `.git/` pause + defer queue (see `GitGate`). + pub gate: GitGate, + /// Live three-way-diff baseline, kept current as the loop converges. + /// Disabled (no-op) on the agent side. + pub baseline: LiveBaseline, } fn directions(mode: SyncMode, is_client: bool) -> (bool, bool) { @@ -783,6 +863,12 @@ where let sigint = tokio::signal::ctrl_c(); tokio::pin!(sigint); + // Drives the deferred-`.git/` replay: once git settles, the queued events + // are flushed against their current state. 1s latency to resume `.git/` + // sync after an operation finishes is imperceptible. + let mut git_tick = tokio::time::interval(Duration::from_secs(1)); + git_tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + loop { tokio::select! { biased; @@ -818,12 +904,34 @@ where ev = event_rx.recv() => { let Some(events) = ev else { break }; if send_local { - forward_local_events(&ctx.root, events, &writer, ctx.compress, &suppress, ctx.is_client).await?; + forward_local_events(&ctx.root, events, &writer, ctx.compress, &suppress, ctx.is_client, &ctx.gate, &ctx.baseline).await?; + } + } + + _ = git_tick.tick() => { + // Replay `.git/` events deferred during a git operation, but + // only once git has actually settled. + if ctx.gate.has_deferred() && !ctx.gate.busy(&ctx.root) { + let (paths, msgs) = ctx.gate.take_deferred(); + if send_local && !paths.is_empty() { + // Synthesize a Modified for each touched path; + // forward_local_events re-reads current state, so a + // path that's now gone becomes a Delete. + let events: Vec = paths.into_iter().map(FsEvent::Modified).collect(); + forward_local_events(&ctx.root, events, &writer, ctx.compress, &suppress, ctx.is_client, &ctx.gate, &ctx.baseline).await?; + } + for m in msgs { + if let Err(e) = handle_incoming(&ctx, m, &suppress, &pending, &writer, apply_remote).await { + tracing::warn!("deferred apply failed: {}", e); + } + } } } } } + // Flush the latest converged state before we tear down / reconnect. + ctx.baseline.persist_now(); reader_task.abort(); Ok(()) } @@ -848,10 +956,11 @@ where // forwarded over SSH to the same terminal, so any logs there would just // duplicate the client's transcript. let log_event = ctx.is_client; - // If git is mid-operation locally, refuse to apply any change under - // `.git/`. Otherwise the peer (who may NOT be busy) would clobber our - // in-progress rebase/merge state and break ref locking. - let busy = git_busy(root); + // If git is mid-operation locally, don't apply any change under `.git/` — + // the peer (who may NOT be busy) would clobber our in-progress rebase/merge + // state and break ref locking. Sticky (hysteresis) so brief gaps between + // git's sub-steps don't open the gate. + let busy = ctx.gate.busy(root); let path_of = |m: &Message| -> Option { match m { Message::FileData { entry, .. } => Some(entry.path.clone()), @@ -870,7 +979,9 @@ where if busy { if let Some(p) = path_of(&msg) { if is_under_git(&p) { - tracing::debug!("git busy: skip incoming for {}", p.display()); + // Defer, don't drop: replayed once git settles (see live_loop). + tracing::debug!("git busy: defer incoming for {}", p.display()); + ctx.gate.defer_in(msg); return Ok(()); } } @@ -890,6 +1001,7 @@ where let mt = lstat_mtime_ns(&root.join(&entry.path)); suppress.mark_set(entry.path.clone(), mt, entry.hash); tracing::trace!("dedup (recv FileData): {}", entry.path.display()); + ctx.baseline.set(entry); return Ok(()); } // Stale-create guard: peer is sending us a file we just deleted. @@ -919,6 +1031,7 @@ where format_size(size, BINARY).dimmed() ); } + ctx.baseline.set(entry); } Message::FileStart { entry, .. } => { if !apply_remote { @@ -935,6 +1048,7 @@ where let mt = lstat_mtime_ns(&root.join(&entry.path)); suppress.mark_set(entry.path.clone(), mt, entry.hash); tracing::trace!("dedup (recv FileStart): {}", entry.path.display()); + ctx.baseline.set(entry); return Ok(()); } // Stale-create guard (chunked transfer variant). @@ -975,6 +1089,7 @@ where format_size(entry.size, BINARY).dimmed() ); } + ctx.baseline.set(entry); } } Message::Touch { path, mtime, mode } => { @@ -1024,6 +1139,7 @@ where if is_already_equal(root, &entry) { let mt = lstat_mtime_ns(&root.join(&entry.path)); suppress.mark_mtime(entry.path.clone(), mt); + ctx.baseline.set(entry); return Ok(()); } let full = root.join(&entry.path); @@ -1035,6 +1151,7 @@ where return Ok(()); } apply_mkdir(root, &entry)?; + ctx.baseline.set(entry.clone()); // Use the actual on-disk mtime (dir mtime changes whenever // children are added) so future echoes match precisely. let mt = lstat_mtime_ns(&full); @@ -1050,6 +1167,7 @@ where if is_already_equal(root, &entry) { let mt = lstat_mtime_ns(&root.join(&entry.path)); suppress.mark_mtime(entry.path.clone(), mt); + ctx.baseline.set(entry); return Ok(()); } let full = root.join(&entry.path); @@ -1061,6 +1179,7 @@ where return Ok(()); } apply_symlink(root, &entry)?; + ctx.baseline.set(entry.clone()); let mt = lstat_mtime_ns(&full); suppress.mark_mtime(entry.path, mt); } @@ -1073,6 +1192,7 @@ where } let existed_before = fs::symlink_metadata(root.join(&path)).is_ok(); apply_delete(root, &path)?; + ctx.baseline.remove(&path); suppress.mark_deleted(path.clone()); if existed_before && log_event { eprintln!(" {} × {}", "←".bright_cyan(), path.display()); @@ -1102,6 +1222,10 @@ where return Ok(()); } apply_rename(root, &from, &to)?; + ctx.baseline.remove(&from); + if let Some(e) = build_entry(root, &to, None)? { + ctx.baseline.set(e); + } suppress.mark_deleted(from); let mt = lstat_mtime_ns(&root.join(&to)); suppress.mark_mtime(to, mt); @@ -1195,6 +1319,7 @@ fn coalesce(events: Vec) -> Vec { .collect() } +#[allow(clippy::too_many_arguments)] // distinct session pieces; also called pre-SessionCtx pub async fn forward_local_events( root: &Path, events: Vec, @@ -1202,6 +1327,8 @@ pub async fn forward_local_events( compress: bool, suppress: &Suppression, is_client: bool, + gate: &GitGate, + baseline: &LiveBaseline, ) -> Result<()> where W: AsyncWriteExt + Unpin, @@ -1210,10 +1337,11 @@ where // forwarded over SSH stderr and duplicate every transfer line. let log_event = is_client; let events = coalesce(events); - // Once per batch: if git is mid-operation, suppress every event that - // touches .git/. Prevents partial rebase/merge state from leaking to - // the peer where it would race with the peer's own ref updates. - let pause_git = git_busy(root); + // Once per batch: if git is mid-operation, defer every event that touches + // .git/. Prevents partial rebase/merge state from leaking to the peer + // where it would race with the peer's own ref updates. Sticky (hysteresis) + // so the brief gaps between git's sub-steps don't open the gate. + let pause_git = gate.busy(root); for ev in events { if suppress.is_echo(root, &ev) { tracing::trace!("echo suppressed: {:?}", ev); @@ -1225,7 +1353,10 @@ where FsEvent::Renamed { to, .. } => to, }; if is_under_git(key) { - tracing::debug!("git busy: skip event {:?}", ev); + // Defer, don't drop: replayed against current state once git + // settles (see live_loop's git_tick). + tracing::debug!("git busy: defer event {:?}", ev); + gate.defer_out(key.clone()); continue; } } @@ -1248,6 +1379,7 @@ where write_message(&mut *w, &Message::Delete { path: p.clone() }, compress) .await?; } + baseline.remove(&p); suppress.mark_deleted(p); continue; } @@ -1256,6 +1388,9 @@ where let entry_mtime = entry.mtime; let entry_hash = entry.hash; let entry_kind = entry.kind; + // Snapshot for the baseline before `entry` is moved into the + // outgoing message below. + let baseline_entry = entry.clone(); match entry.kind { EntryKind::Dir => { let mut w = writer.lock().await; @@ -1347,6 +1482,9 @@ where _ => NO_HASH, }; suppress.mark_set(path_clone, entry_mtime, hash_to_mark); + // This content is now on both sides — record it as the + // converged baseline (so a later delete of it is detectable). + baseline.set(baseline_entry); } FsEvent::Removed(p) => { if log_event { @@ -1359,6 +1497,7 @@ where // Record that *we* deleted this — receiver dedup uses this // to drop stale FileData / MkDir for the same path arriving // out-of-order from the peer. + baseline.remove(&p); suppress.mark_deleted(p); } FsEvent::Renamed { from, to } => { @@ -1376,6 +1515,7 @@ where } if let Some(entry) = build_entry(root, &to, None)? { let to_mtime = entry.mtime; + let baseline_entry = entry.clone(); match entry.kind { EntryKind::File => { send_file(writer, root, &entry, compress).await?; @@ -1390,7 +1530,9 @@ where } } suppress.mark_mtime(to.clone(), to_mtime); + baseline.set(baseline_entry); } + baseline.remove(&from); suppress.mark_deleted(from.clone()); if log_event { eprintln!( diff --git a/src/sync.rs b/src/sync.rs index 328022c..02d3436 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -12,13 +12,14 @@ use tokio::io::{BufReader, BufWriter}; use tokio::process::{ChildStdin, ChildStdout}; use tokio::sync::Mutex; +use crate::baseline::{Baseline, LiveBaseline}; use crate::cache::HashCache; use crate::cli::ClientArgs; use crate::ignores::IgnoreStack; use crate::peer::{ apply_delete, apply_delta_to_file, apply_file_data, apply_mkdir, apply_rename, apply_symlink, compute_delta, compute_signature, forward_local_events, git_busy, is_under_git, live_loop, - send_file, Pending, Suppression, + send_file, GitGate, Pending, Suppression, }; use crate::protocol::{ read_message, write_message, Entry, EntryKind, Message, SyncMode, PROTOCOL_VERSION, @@ -264,7 +265,15 @@ async fn run_inner( .collect(); // ── Diff ── - let plan = build_plan(&local_manifest, &remote_manifest, args.mode); + // Baseline = the converged manifest from our last successful sync. It is + // the common ancestor that lets the three-way diff distinguish a genuine + // deletion from a creation on the peer (see build_plan). Empty on first + // run → conservative pull-back, no deletes propagate that session. + let baseline = Baseline::load(&local_root); + if baseline.is_empty() { + tracing::debug!("no baseline yet — deletions won't propagate until next sync"); + } + let plan = build_plan(&local_manifest, &remote_manifest, &baseline, args.mode); plan.print(); if args.dry_run { @@ -275,6 +284,20 @@ async fn run_inner( return Ok(()); } + // Apply remote-originated deletions locally. These are paths the peer + // removed whose local copy is still byte-identical to the last baseline, + // so it's a propagated deletion (not an unsynced local edit). Destructive, + // hence gated on the baseline match computed in build_plan. + for path in &plan.del_local { + match apply_delete(&local_root, path) { + Ok(()) => { + suppress.mark_deleted(path.clone()); + eprintln!(" {} × {}", "←".bright_cyan(), path.display()); + } + Err(e) => tracing::warn!("local delete {}: {}", path.display(), e), + } + } + // ── Execute initial sync ── // Phase 1 (sequential): dirs + symlinks. Parents must exist. // Phase 2a (sequential): delta sync for files where remote has an @@ -300,6 +323,7 @@ async fn run_inner( let push_plan = plan.push.clone(); let get_plan = plan.get.clone(); + let del_remote_plan = plan.del_remote.clone(); let writer_for_send = writer.clone(); let local_root_for_send = local_root.clone(); let remote_hash_by_send = remote_hash_by.clone(); @@ -312,6 +336,13 @@ async fn run_inner( .into_iter() .partition(|e| !matches!(e.kind, EntryKind::File)); + // Phase 0: propagate local deletions to the remote. The agent's + // init-sync loop applies these (apply_delete + mark_deleted). + for path in del_remote_plan { + let mut w = writer_for_send.lock().await; + write_message(&mut *w, &Message::Delete { path }, compress).await?; + } + // Phase 1. for e in non_files { let mut w = writer_for_send.lock().await; @@ -653,6 +684,31 @@ async fn run_inner( c.save(&local_root); } + // Seed the next session's baseline from the converged manifest. After init + // sync the local tree equals the merged result: start from the local + // manifest, drop what we just deleted locally, and overwrite pulled paths + // with the remote's version (we now hold its content). This live baseline + // is then kept current by the live loop and persisted on exit — it's what + // lets the next run tell a genuine deletion from a peer creation, even for + // a file created and removed within a single session. + let live_baseline = { + let remote_by_path: HashMap<&PathBuf, &Entry> = + remote_manifest.iter().map(|e| (&e.path, e)).collect(); + let deleted_local: std::collections::HashSet = + plan.del_local.iter().cloned().collect(); + let mut converged: HashMap = local_manifest + .iter() + .filter(|e| !deleted_local.contains(&e.path)) + .map(|e| (e.path.clone(), e.clone())) + .collect(); + for p in &plan.get { + if let Some(r) = remote_by_path.get(p) { + converged.insert(p.clone(), (*r).clone()); + } + } + LiveBaseline::seed(local_root.clone(), converged) + }; + let _ = received_files; if args.once { @@ -663,6 +719,9 @@ async fn run_inner( return Ok(()); } + // Shared per-session git gate (sticky .git/ pause + defer queue). + let gate = GitGate::default(); + // Drain any watcher events that accumulated during the walk + manifest // exchange + init-sync apply. With suppress now populated for every // file we just wrote, echoes of our own writes filter out and real @@ -673,7 +732,17 @@ async fn run_inner( } if !buffered.is_empty() { tracing::debug!("draining {} buffered watcher events", buffered.len()); - forward_local_events(&local_root, buffered, &writer, compress, &suppress, true).await?; + forward_local_events( + &local_root, + buffered, + &writer, + compress, + &suppress, + true, + &gate, + &live_baseline, + ) + .await?; } crate::ui::info("watching for changes — ctrl+c to stop"); @@ -683,6 +752,8 @@ async fn run_inner( compress, is_client: true, ignores, + gate, + baseline: live_baseline, }; let result = live_loop(ctx, reader, writer, suppress, pending, watcher_handle).await; let _ = child.wait().await; @@ -722,6 +793,12 @@ where struct Plan { push: Vec, get: Vec, + /// Deletions to propagate to the remote: paths the user removed locally + /// whose remote copy is still byte-identical to the last synced baseline. + del_remote: Vec, + /// Deletions to apply locally: paths the remote removed whose local copy + /// is still byte-identical to the last synced baseline. + del_local: Vec, /// Paths where local and remote disagree on `kind` (e.g. one side has /// a file, the other a directory). Skipped from sync because blindly /// overwriting would either fail with EISDIR or destroy a directory @@ -752,13 +829,23 @@ impl Plan { .filter(|e| matches!(e.kind, EntryKind::File)) .map(|e| e.size) .sum(); + let del_note = if self.del_remote.is_empty() && self.del_local.is_empty() { + String::new() + } else { + format!( + " • delete {} remote {} local", + self.del_remote.len().to_string().bright_red(), + self.del_local.len().to_string().bright_red(), + ) + }; crate::ui::info(&format!( - "plan: push {} files ({}) {} dirs {} links • pull {} entries", + "plan: push {} files ({}) {} dirs {} links • pull {} entries{}", push_files.to_string().bright_green(), format_size(push_bytes, BINARY).bright_green(), push_dirs, push_links, self.get.len().to_string().bright_cyan(), + del_note, )); if !self.conflicts.is_empty() { crate::ui::warn(&format!( @@ -780,7 +867,14 @@ impl Plan { } } -fn build_plan(local: &[Entry], remote: &[Entry], mode: SyncMode) -> Plan { +/// Three-way diff. `baseline` is the converged manifest from the last +/// successful sync (empty on first run). It is what lets us tell a genuine +/// deletion ("present in baseline, gone here, unchanged there") apart from a +/// creation on the other side ("not in baseline") — the two are identical +/// from the live manifests alone. When the evidence is ambiguous (no +/// baseline) or the surviving side has its own changes (modify-vs-delete), +/// we never propagate the destructive op: we keep the data. +fn build_plan(local: &[Entry], remote: &[Entry], baseline: &Baseline, mode: SyncMode) -> Plan { let local_map: HashMap<&PathBuf, &Entry> = local.iter().map(|e| (&e.path, e)).collect(); let remote_map: HashMap<&PathBuf, &Entry> = remote.iter().map(|e| (&e.path, e)).collect(); @@ -792,21 +886,47 @@ fn build_plan(local: &[Entry], remote: &[Entry], mode: SyncMode) -> Plan { all_paths.sort(); all_paths.dedup(); + // What this sync direction is allowed to mutate. + let (allow_push, allow_pull) = match mode { + SyncMode::Push => (true, false), + SyncMode::Pull => (false, true), + SyncMode::Both => (true, true), + }; + let mut push: Vec = Vec::new(); let mut get: Vec = Vec::new(); + let mut del_remote: Vec = Vec::new(); + let mut del_local: Vec = Vec::new(); let mut conflicts: Vec<(PathBuf, EntryKind, EntryKind)> = Vec::new(); for p in all_paths { let l = local_map.get(p).copied(); let r = remote_map.get(p).copied(); + let b = baseline.get(p); match (l, r) { (Some(l), None) => { - if matches!(mode, SyncMode::Push | SyncMode::Both) { + // Present locally, absent on remote. Either the remote + // deleted a file we both had, or we created it. It's a real + // remote deletion only if our copy is unchanged since the + // baseline; otherwise (we changed it, or no baseline) keep + // local and re-push. + let remote_deleted = b.map(|b| l.same_content(b)).unwrap_or(false); + if remote_deleted && allow_pull { + del_local.push(l.path.clone()); + } else if allow_push { push.push(l.clone()); } } (None, Some(r)) => { - if matches!(mode, SyncMode::Pull | SyncMode::Both) { + // Absent locally, present on remote. Either we deleted a file + // we both had, or the remote created it. It's a real local + // deletion only if the remote copy is unchanged since the + // baseline; otherwise (remote changed it, or no baseline) + // keep remote and pull. + let local_deleted = b.map(|b| r.same_content(b)).unwrap_or(false); + if local_deleted && allow_push { + del_remote.push(r.path.clone()); + } else if allow_pull { get.push(r.path.clone()); } } @@ -850,6 +970,8 @@ fn build_plan(local: &[Entry], remote: &[Entry], mode: SyncMode) -> Plan { Plan { push, get, + del_remote, + del_local, conflicts, } }