Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
1ddb880
feat(smg): track worker registration origin in WorkerRegistry
CatherineSue Jun 10, 2026
bc835b9
feat(smg): harden mesh worker imports; add remove_remote
CatherineSue Jun 10, 2026
1efa894
feat(smg): outbound worker publish loop in WorkerSyncAdapter
CatherineSue Jun 10, 2026
b92462d
feat(smg): remove mesh-imported workers on remote tombstone
CatherineSue Jun 10, 2026
fd8f0cd
fix(smg): local registration over a mesh import takes ownership
CatherineSue Jun 10, 2026
22d7517
fix(smg): encode mesh WorkerSpec as JSON; bincode never round-tripped
CatherineSue Jun 10, 2026
da98466
fix(smg): record worker origin before the worker becomes visible
CatherineSue Jun 10, 2026
0842434
fix(smg): inbound worker sync acts on store truth, not event snapshots
CatherineSue Jun 10, 2026
40706fa
feat(smg): reconcile and recovery paths for worker mesh sync
CatherineSue Jun 10, 2026
cc74e87
fix(smg): promote worker origin under the per-worker mutation lock
CatherineSue Jun 10, 2026
39f7b65
fix(smg): never auto-remove failed mesh-imported workers
CatherineSue Jun 10, 2026
6407abd
docs(smg): correct stale mesh-sync comments from the review round
CatherineSue Jun 11, 2026
67f068f
perf(smg): reconcile checks key presence without cloning values
CatherineSue Jun 11, 2026
39d82e0
fix(mesh): drive CRDT tombstone GC from gossip housekeeping
CatherineSue Jun 11, 2026
90f3e47
fix(smg): mesh health hints never resurrect probe-owned worker states
CatherineSue Jun 11, 2026
1a31092
perf(smg): scan the namespace once per reconcile pass
CatherineSue Jun 11, 2026
e8e3b13
revert(mesh): don't drive CRDT tombstone GC on a timer
CatherineSue Jun 11, 2026
3a90035
fix(smg): re-verify worker origin under the lock before remote removal
CatherineSue Jun 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions crates/mesh/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,20 @@ impl MeshKV {
self.chunk_assembler.clone()
}

/// Reclaim tombstone metadata older than the default grace period
/// across every CRDT engine. Returns the number reclaimed.
///
/// Deliberately not driven anywhere: time-based collection is unsound.
/// A peer absent longer than the grace still holds the deleted key's
/// older insert in its op-log and replays it on reconnect; with the
/// tombstone metadata gone, the stale insert lands on an empty entry
/// and resurrects the key. Collection is only safe at causal stability
/// — every live peer acked past the tombstone and absent peers
/// declared dead — so the caller must be the dead-node cleanup layer.
pub fn gc_tombstones(&self) -> usize {
self.store.gc_tombstones()
}

/// Fire subscribers whose prefix matches `key`. Used by the gossip
/// receive path when a chunked value completes (or a single-chunk
/// entry arrives), so handlers can deliver into adapter-owned
Expand Down
16 changes: 16 additions & 0 deletions crates/mesh/src/tests/crdt_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,22 @@ async fn remote_tombstone_after_insert_notifies_none() {
assert_eq!(r_ns.get("worker:a"), None);
}

#[test]
fn gc_tombstones_respects_grace_through_mesh_kv() {
// The controller drives this periodically; a fresh tombstone must
// survive the grace period (engine-level reclamation is covered by
// the crdt_kv tests).
let mesh = MeshKV::new("node-a".into());
let ns = mesh.configure_crdt_prefix("worker:", MergeStrategy::LastWriterWins);
ns.put("worker:a", b"v".to_vec());
ns.delete("worker:a");
assert_eq!(
mesh.gc_tombstones(),
0,
"a fresh tombstone survives the grace period"
);
}

#[test]
fn caught_up_peer_is_sent_nothing() {
let sender = MeshKV::new("sender".to_string());
Expand Down
9 changes: 5 additions & 4 deletions crates/mesh/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use serde::{Deserialize, Serialize};

/// Worker state entry synced across mesh nodes. `spec` is an
/// opaque bincode-serialized `WorkerSpec`; the mesh crate
/// opaque JSON-serialized `WorkerSpec`; the mesh crate
/// doesn't interpret it.
///
/// `Eq`/`Hash` are intentionally omitted: `load: f64` can be
Expand All @@ -18,9 +18,10 @@ pub struct WorkerState {
pub health: bool,
pub load: f64,
pub version: u64,
/// Opaque worker specification (bincode-serialized
/// `WorkerSpec` from the gateway). Empty on old nodes that
/// don't populate this field.
/// Opaque worker specification (JSON-serialized `WorkerSpec`
/// from the gateway; JSON because the type's serde-skip
/// attributes don't round-trip positional formats). Empty on
/// old nodes that don't populate this field.
#[serde(default)]
pub spec: Vec<u8>,
}
Loading
Loading