diff --git a/crates/smfs-core/src/cache/db.rs b/crates/smfs-core/src/cache/db.rs index 45dc55d..5da9679 100644 --- a/crates/smfs-core/src/cache/db.rs +++ b/crates/smfs-core/src/cache/db.rs @@ -180,6 +180,33 @@ impl Db { ); } + /// Clear `dirty_since` only if it still matches `expected_ms` (compare-and-swap). + /// Returns `true` if the flag was cleared. + pub(crate) fn clear_dirty_since_if_unchanged( + &self, + ino: u64, + expected_ms: Option, + ) -> bool { + let conn = self.conn.lock(); + let changed = match expected_ms { + Some(ms) => conn + .execute( + "UPDATE fs_inode SET dirty_since = NULL \ + WHERE ino = ?1 AND dirty_since = ?2", + rusqlite::params![ino as i64, ms], + ) + .unwrap_or(0), + None => conn + .execute( + "UPDATE fs_inode SET dirty_since = NULL \ + WHERE ino = ?1 AND dirty_since IS NULL", + rusqlite::params![ino as i64], + ) + .unwrap_or(0), + }; + changed > 0 + } + /// Get the dirty watermark for an inode, if any. pub(crate) fn get_dirty_since(&self, ino: u64) -> Option { let conn = self.conn.lock(); @@ -380,7 +407,7 @@ impl Db { } /// Atomically claim the next queued job whose backoff has elapsed. - /// Returns None if empty or everything is inflight / backing off. + /// Snapshots `dirty_since` at claim time for CAS on the success path. pub(crate) fn push_queue_claim_next(&self, now_ms: i64) -> Option { let conn = self.conn.lock(); let row: ( @@ -426,6 +453,17 @@ impl Db { return None; } + // Snapshot dirty_since for CAS in the success path. + let dirty_since_at_claim = content_ino.and_then(|ino| { + conn.query_row( + "SELECT dirty_since FROM fs_inode WHERE ino = ?1", + [ino], + |row| row.get::<_, Option>(0), + ) + .ok() + .flatten() + }); + Some(PushJob { filepath, op, @@ -433,6 +471,7 @@ impl Db { rename_to, remote_id, attempt, + dirty_since_at_claim, }) } @@ -703,6 +742,8 @@ pub(crate) struct PushJob { pub rename_to: Option, pub remote_id: Option, pub attempt: i64, + /// `dirty_since` at claim time, for CAS clearing after `wait_until_done`. + pub dirty_since_at_claim: Option, } #[derive(Debug)] diff --git a/crates/smfs-core/src/cache/tests.rs b/crates/smfs-core/src/cache/tests.rs index 1c84084..c01bd1b 100644 --- a/crates/smfs-core/src/cache/tests.rs +++ b/crates/smfs-core/src/cache/tests.rs @@ -783,3 +783,94 @@ async fn test_readdir_on_empty_dir_does_not_block_on_api() { "readdir burst took {elapsed:?}; empty-dir hot path may be blocking on API" ); } + +// Regression: concurrent write during wait_until_done must not lose data. +#[tokio::test] +async fn test_dirty_since_cas_prevents_data_loss() { + use crate::cache::fs::ReconcileOutcome; + + let fs = fs(); + + // epoch-ms values; updated_at "2026-01-01T00:00:05.000Z" ≈ 1767225605000 + let t1: i64 = 1_767_225_600_000; + let t2: i64 = 1_767_225_610_000; + + let (attr, handle) = fs + .create_file(ROOT, "idea.md", 0o644, UID, GID) + .await + .unwrap(); + handle.write(0, b"version A").await.unwrap(); + handle.flush().await.unwrap(); + let ino = attr.ino; + + fs.db() + .push_queue_upsert("/idea.md", PushOp::Create, Some(ino), None, None, t1); + fs.db().set_dirty_since(ino, Some(t1)); + + // Claim snapshots dirty_since. + let job = fs.db().push_queue_claim_next(t1 + 1000).unwrap(); + assert_eq!(job.filepath, "/idea.md"); + assert_eq!(job.dirty_since_at_claim, Some(t1)); + + let remote_id = "remote-doc-123"; + fs.db().set_remote_id(ino, remote_id); + fs.db().push_queue_set_remote_id("/idea.md", remote_id); + + // Concurrent write during wait_until_done window. + handle.write(0, b"version B").await.unwrap(); + handle.flush().await.unwrap(); + fs.db().set_dirty_since(ino, Some(t2)); + fs.db() + .push_queue_upsert("/idea.md", PushOp::Update, Some(ino), None, Some(remote_id), t2); + + // CAS refuses to clear because dirty_since changed (T2 ≠ T1). + fs.db() + .set_mirrored_state(ino, Some(t1 + 2000), Some("done"), Some(t1 + 4000)); + let cleared = fs + .db() + .clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim); + assert!(!cleared); + fs.db().push_queue_finalize_success("/idea.md", t1 + 4000); + + assert_eq!(fs.dirty_since_of(ino), Some(t2)); + + // Pull reconciler must skip overwrite (dirty_since T2 > updatedAt T1+5s). + let stale_doc = Document { + id: remote_id.to_string(), + filepath: Some("/idea.md".to_string()), + custom_id: None, + title: None, + summary: None, + content: Some("version A".to_string()), + status: "done".to_string(), + container_tags: None, + created_at: "2026-01-01T00:00:00.000Z".to_string(), + updated_at: "2026-01-01T00:00:05.000Z".to_string(), + metadata: None, + type_: Some("text".to_string()), + url: None, + }; + + let outcome = fs.reconcile_one(&stale_doc).unwrap(); + assert!(matches!(outcome, ReconcileOutcome::SkippedDirty)); + + let content = fs.db().read_all_content(ino); + assert_eq!(std::str::from_utf8(&content).unwrap(), "version B"); +} + +#[tokio::test] +async fn test_dirty_since_cas_clears_when_unchanged() { + let fs = fs(); + let (attr, handle) = fs + .create_file(ROOT, "notes.md", 0o644, UID, GID) + .await + .unwrap(); + handle.write(0, b"hello").await.unwrap(); + handle.flush().await.unwrap(); + let ino = attr.ino; + fs.db().set_dirty_since(ino, Some(1000)); + + let cleared = fs.db().clear_dirty_since_if_unchanged(ino, Some(1000)); + assert!(cleared); + assert!(fs.dirty_since_of(ino).is_none()); +} diff --git a/crates/smfs-core/src/sync/push.rs b/crates/smfs-core/src/sync/push.rs index 1483eb6..c7e2e3c 100644 --- a/crates/smfs-core/src/sync/push.rs +++ b/crates/smfs-core/src/sync/push.rs @@ -191,9 +191,13 @@ async fn process_job(fs: &Arc, job: crate::cache::db::PushJob) { tracing::debug!(filepath = %job.filepath, remote_id, "push: PATCH ok"); // Block until the PATCH's reprocessing settles, so // the next coalesced write sees a `done` doc. - wait_until_done(api, remote_id, WAIT_DONE_MAX).await; - db.set_mirrored_state(ino, None, Some("done"), Some(now_ms())); - db.set_dirty_since(ino, None); + let done = wait_until_done(api, remote_id, WAIT_DONE_MAX).await; + if done { + db.set_mirrored_state(ino, None, Some("done"), Some(now_ms())); + } else { + tracing::warn!(filepath = %job.filepath, remote_id, "push: wait_until_done timed out after PATCH; deferring status to poller"); + } + db.clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim); db.push_queue_finalize_success(&job.filepath, now_ms()); db.push_notify().notify_one(); } @@ -225,9 +229,13 @@ async fn process_job(fs: &Arc, job: crate::cache::db::PushJob) { db.push_queue_set_remote_id(&job.filepath, &resp.id); // Wait for this POST's pipeline to reach `done` so // a coalesced follow-up PATCH lands cleanly. - wait_until_done(api, &resp.id, WAIT_DONE_MAX).await; - db.set_mirrored_state(ino, None, Some("done"), Some(now_ms())); - db.set_dirty_since(ino, None); + let done = wait_until_done(api, &resp.id, WAIT_DONE_MAX).await; + if done { + db.set_mirrored_state(ino, None, Some("done"), Some(now_ms())); + } else { + tracing::warn!(filepath = %job.filepath, remote_id = %resp.id, "push: wait_until_done timed out after POST; deferring status to poller"); + } + db.clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim); db.push_queue_finalize_success(&job.filepath, now_ms()); db.push_notify().notify_one(); } @@ -367,9 +375,13 @@ async fn process_binary_upload( ); db.set_remote_id(ino, &resp.id); db.push_queue_set_remote_id(&job.filepath, &resp.id); - wait_until_done(api, &resp.id, WAIT_DONE_MAX).await; - db.set_mirrored_state(ino, None, Some("done"), Some(now_ms())); - db.set_dirty_since(ino, None); + let done = wait_until_done(api, &resp.id, WAIT_DONE_MAX).await; + if done { + db.set_mirrored_state(ino, None, Some("done"), Some(now_ms())); + } else { + tracing::warn!(filepath = %job.filepath, remote_id = %resp.id, "push: wait_until_done timed out after multipart upload; deferring status to poller"); + } + db.clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim); db.push_queue_finalize_success(&job.filepath, now_ms()); db.push_notify().notify_one(); }