Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion crates/smfs-core/src/cache/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,33 @@ impl Db {
);
}

/// Clear `dirty_since` only if it still matches `expected_ms` (compare-and-swap).
/// Returns `true` if the flag was cleared.
pub(crate) fn clear_dirty_since_if_unchanged(
&self,
ino: u64,
expected_ms: Option<i64>,
) -> bool {
let conn = self.conn.lock();
let changed = match expected_ms {
Some(ms) => conn
.execute(
"UPDATE fs_inode SET dirty_since = NULL \
WHERE ino = ?1 AND dirty_since = ?2",
rusqlite::params![ino as i64, ms],
)
.unwrap_or(0),
None => conn
.execute(
"UPDATE fs_inode SET dirty_since = NULL \
WHERE ino = ?1 AND dirty_since IS NULL",
rusqlite::params![ino as i64],
)
.unwrap_or(0),
};
changed > 0
}

/// Get the dirty watermark for an inode, if any.
pub(crate) fn get_dirty_since(&self, ino: u64) -> Option<i64> {
let conn = self.conn.lock();
Expand Down Expand Up @@ -380,7 +407,7 @@ impl Db {
}

/// Atomically claim the next queued job whose backoff has elapsed.
/// Returns None if empty or everything is inflight / backing off.
/// Snapshots `dirty_since` at claim time for CAS on the success path.
pub(crate) fn push_queue_claim_next(&self, now_ms: i64) -> Option<PushJob> {
let conn = self.conn.lock();
let row: (
Expand Down Expand Up @@ -426,13 +453,25 @@ impl Db {
return None;
}

// Snapshot dirty_since for CAS in the success path.
let dirty_since_at_claim = content_ino.and_then(|ino| {
conn.query_row(
"SELECT dirty_since FROM fs_inode WHERE ino = ?1",
[ino],
|row| row.get::<_, Option<i64>>(0),
)
.ok()
.flatten()
});

Some(PushJob {
filepath,
op,
content_ino: content_ino.map(|n| n as u64),
rename_to,
remote_id,
attempt,
dirty_since_at_claim,
})
}

Expand Down Expand Up @@ -703,6 +742,8 @@ pub(crate) struct PushJob {
pub rename_to: Option<String>,
pub remote_id: Option<String>,
pub attempt: i64,
/// `dirty_since` at claim time, for CAS clearing after `wait_until_done`.
pub dirty_since_at_claim: Option<i64>,
}

#[derive(Debug)]
Expand Down
91 changes: 91 additions & 0 deletions crates/smfs-core/src/cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -783,3 +783,94 @@ async fn test_readdir_on_empty_dir_does_not_block_on_api() {
"readdir burst took {elapsed:?}; empty-dir hot path may be blocking on API"
);
}

// Regression: concurrent write during wait_until_done must not lose data.
#[tokio::test]
async fn test_dirty_since_cas_prevents_data_loss() {
use crate::cache::fs::ReconcileOutcome;

let fs = fs();

// epoch-ms values; updated_at "2026-01-01T00:00:05.000Z" ≈ 1767225605000
let t1: i64 = 1_767_225_600_000;
let t2: i64 = 1_767_225_610_000;

let (attr, handle) = fs
.create_file(ROOT, "idea.md", 0o644, UID, GID)
.await
.unwrap();
handle.write(0, b"version A").await.unwrap();
handle.flush().await.unwrap();
let ino = attr.ino;

fs.db()
.push_queue_upsert("/idea.md", PushOp::Create, Some(ino), None, None, t1);
fs.db().set_dirty_since(ino, Some(t1));

// Claim snapshots dirty_since.
let job = fs.db().push_queue_claim_next(t1 + 1000).unwrap();
assert_eq!(job.filepath, "/idea.md");
assert_eq!(job.dirty_since_at_claim, Some(t1));

let remote_id = "remote-doc-123";
fs.db().set_remote_id(ino, remote_id);
fs.db().push_queue_set_remote_id("/idea.md", remote_id);

// Concurrent write during wait_until_done window.
handle.write(0, b"version B").await.unwrap();
handle.flush().await.unwrap();
fs.db().set_dirty_since(ino, Some(t2));
fs.db()
.push_queue_upsert("/idea.md", PushOp::Update, Some(ino), None, Some(remote_id), t2);

// CAS refuses to clear because dirty_since changed (T2 ≠ T1).
fs.db()
.set_mirrored_state(ino, Some(t1 + 2000), Some("done"), Some(t1 + 4000));
let cleared = fs
.db()
.clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim);
assert!(!cleared);
fs.db().push_queue_finalize_success("/idea.md", t1 + 4000);

assert_eq!(fs.dirty_since_of(ino), Some(t2));

// Pull reconciler must skip overwrite (dirty_since T2 > updatedAt T1+5s).
let stale_doc = Document {
id: remote_id.to_string(),
filepath: Some("/idea.md".to_string()),
custom_id: None,
title: None,
summary: None,
content: Some("version A".to_string()),
status: "done".to_string(),
container_tags: None,
created_at: "2026-01-01T00:00:00.000Z".to_string(),
updated_at: "2026-01-01T00:00:05.000Z".to_string(),
metadata: None,
type_: Some("text".to_string()),
url: None,
};

let outcome = fs.reconcile_one(&stale_doc).unwrap();
assert!(matches!(outcome, ReconcileOutcome::SkippedDirty));

let content = fs.db().read_all_content(ino);
assert_eq!(std::str::from_utf8(&content).unwrap(), "version B");
}

#[tokio::test]
async fn test_dirty_since_cas_clears_when_unchanged() {
let fs = fs();
let (attr, handle) = fs
.create_file(ROOT, "notes.md", 0o644, UID, GID)
.await
.unwrap();
handle.write(0, b"hello").await.unwrap();
handle.flush().await.unwrap();
let ino = attr.ino;
fs.db().set_dirty_since(ino, Some(1000));

let cleared = fs.db().clear_dirty_since_if_unchanged(ino, Some(1000));
assert!(cleared);
assert!(fs.dirty_since_of(ino).is_none());
}
30 changes: 21 additions & 9 deletions crates/smfs-core/src/sync/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,13 @@ async fn process_job(fs: &Arc<SupermemoryFs>, job: crate::cache::db::PushJob) {
tracing::debug!(filepath = %job.filepath, remote_id, "push: PATCH ok");
// Block until the PATCH's reprocessing settles, so
// the next coalesced write sees a `done` doc.
wait_until_done(api, remote_id, WAIT_DONE_MAX).await;
db.set_mirrored_state(ino, None, Some("done"), Some(now_ms()));
db.set_dirty_since(ino, None);
let done = wait_until_done(api, remote_id, WAIT_DONE_MAX).await;
if done {
db.set_mirrored_state(ino, None, Some("done"), Some(now_ms()));
} else {
tracing::warn!(filepath = %job.filepath, remote_id, "push: wait_until_done timed out after PATCH; deferring status to poller");
}
db.clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim);
db.push_queue_finalize_success(&job.filepath, now_ms());
db.push_notify().notify_one();
}
Expand Down Expand Up @@ -225,9 +229,13 @@ async fn process_job(fs: &Arc<SupermemoryFs>, job: crate::cache::db::PushJob) {
db.push_queue_set_remote_id(&job.filepath, &resp.id);
// Wait for this POST's pipeline to reach `done` so
// a coalesced follow-up PATCH lands cleanly.
wait_until_done(api, &resp.id, WAIT_DONE_MAX).await;
db.set_mirrored_state(ino, None, Some("done"), Some(now_ms()));
db.set_dirty_since(ino, None);
let done = wait_until_done(api, &resp.id, WAIT_DONE_MAX).await;
if done {
db.set_mirrored_state(ino, None, Some("done"), Some(now_ms()));
} else {
tracing::warn!(filepath = %job.filepath, remote_id = %resp.id, "push: wait_until_done timed out after POST; deferring status to poller");
}
db.clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim);
db.push_queue_finalize_success(&job.filepath, now_ms());
db.push_notify().notify_one();
}
Expand Down Expand Up @@ -367,9 +375,13 @@ async fn process_binary_upload(
);
db.set_remote_id(ino, &resp.id);
db.push_queue_set_remote_id(&job.filepath, &resp.id);
wait_until_done(api, &resp.id, WAIT_DONE_MAX).await;
db.set_mirrored_state(ino, None, Some("done"), Some(now_ms()));
db.set_dirty_since(ino, None);
let done = wait_until_done(api, &resp.id, WAIT_DONE_MAX).await;
if done {
db.set_mirrored_state(ino, None, Some("done"), Some(now_ms()));
} else {
tracing::warn!(filepath = %job.filepath, remote_id = %resp.id, "push: wait_until_done timed out after multipart upload; deferring status to poller");
}
db.clear_dirty_since_if_unchanged(ino, job.dirty_since_at_claim);
db.push_queue_finalize_success(&job.filepath, now_ms());
db.push_notify().notify_one();
}
Expand Down