feat(smg): outbound worker mesh sync — single-writer ownership, publish loop, tombstones, recovery#1661
Conversation
Record WorkerOrigin (Local vs Mesh) per worker id, written under the per-worker mutation lock before the Registered event so consumers that see the event can query it race-free. Replaces the dead sync_mesh parameter on register_inner; remove() clears the entry. The outbound mesh sync consults this so imported workers are never re-published. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
on_remote_worker_state now: refuses to mutate locally-owned workers (a
peer's echo of this node's own published state must not fight the local
health state machine); adopts the publisher's worker id so a later
tombstone for worker:{id} resolves to the import; and forces NotReady on
an explicitly-unhealthy import, which the builder would otherwise
default to Ready when the spec disables health checks.
remove_remote removes a worker on a remote tombstone only when its
origin is Mesh; a tombstone for a locally-owned worker is refused with a
warning.
Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
start() now also spawns a loop over the registry's WorkerEvent stream: locally-owned workers are published as WorkerState (spec included) on Registered/Replaced/StatusChanged and tombstoned on Removed. Mesh-imported workers are filtered by registration origin so a peer's state is never re-published. A published-key set lets removals be tombstoned after the registry drops the origin entry, and on broadcast lag the loop re-publishes all local workers and tombstones any that vanished during the lag window. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
The inbound loop now routes worker: tombstones through WorkerRegistry::remove_remote. Tombstones carry only the key, so they resolve via the publisher's worker id that the import adopted; only mesh-imported workers are removed, and a tombstone for a locally-owned or unknown id is ignored. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
A mesh import can win the URL race against the local AddWorker workflow (systematically so across gateway restarts: the node's own previous state gossips back before startup workflows run). register_or_replace then fell back to replace(), which never touches the origin, leaving a locally-configured worker permanently Mesh-origin: never published, peer-mutable, and deletable by a remote tombstone. register_or_replace now promotes the origin to Local on its replace fallback, and the outbound Replaced/StatusChanged arms gate on live origin instead of the published set so a claimed worker starts publishing from its next mutation. Found by adversarial review (4 independent findings, one root cause). Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
WorkerSpec uses skip_serializing_none and per-field skip_serializing_if attributes, which a positional format cannot round-trip: bincode deserialization failed for every published spec, so every mesh import silently fell back to the minimal builder (HTTP, Regular, default health config) — a peer's gRPC decode worker imported as an HTTP regular worker. JSON is self-describing and round-trips the attributes. Also surface the failure modes: warn on spec encode/decode fallback, and log spec_applied (whether the spec was actually used) instead of the misleading has_spec. Found by adversarial review; empirically reproduced. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
register_inner inserted into workers before worker_origins, leaving a window where lock-free readers (on_remote_worker_state's URL lookup) saw a live worker with no origin and treated it as mesh-imported — a peer's state could promote a just-registered Pending local worker to Ready, bypassing the probe state machine. Both inserts run under the per-worker mutation lock; only the order changes. Found by adversarial review. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
A queued subscription event can be stale by the time it drains: a put echo dequeued after its key was tombstoned would re-import the node's own just-removed worker as a mesh ghost (permanent if the trailing delete notification was dropped by the bounded try_send), and a stale state snapshot could regress health. The inbound loop now re-reads the namespace per event and routes live values through on_remote_worker_state and missing keys through remove_remote, making delivery order irrelevant. Backfill shares the same path. Found by adversarial review. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
Subscription notifications go through a bounded try_send and a dropped one is never re-fired (re-merged ops are idempotent and fire no subscribers), so a single drop could permanently desynchronize the registry from the CRDT. A 30s reconcile pass now: removes Mesh-origin workers whose backing key is tombstoned, re-publishes Local workers whose key is missing, and re-imports anything absent (including a cold-start merge burst larger than the channel). Removal runs before backfill so a same-URL key from another publisher re-imports in the same pass. A foreign tombstone for a locally-owned key is now also re-asserted inline: the delete already removed the key cluster-wide, so refusing the registry removal alone would leave the worker delisted on every peer until its next status change. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
The register_or_replace promotion ran after replace() released the lock: a concurrent remove() in the window orphaned the origin entry permanently, and the claiming Replaced event could observe the stale Mesh origin and skip the immediate publish. replace_inner now applies the promotion while holding the guard, before the event, so consumers observing Replaced see the final origin. Found by the memory-audit adversarial review. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
With --remove-unhealthy-workers, a Failed mesh import got Job::RemoveWorker — but the removal is local-only (no tombstone: the worker is peer-owned), so the live CRDT key re-imports it on the next reconcile, probes fail again, and the remove/re-import loop churns forever. Failed imports now stay registered and demoted (unroutable); the owner manages their lifecycle. Found by adversarial review (completeness critic). Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
types.rs still described the spec payload as bincode (a producer following it would write undecodable specs); registry doc comments still promised the retired inline mesh sync; the adoption-block comment promised probe-demotion where the reconcile pass now removes the import. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
The reconcile pass called CrdtNamespace::get per registered worker every 30s, cloning each live worker's full value bytes just to test key presence; a keys() set makes the pass allocation-light. Missing-key handling now reuses sync_key_from_store (one source of truth for remove-import / re-assert-local), and the tombstone arm resolves the origin once instead of twice. Module doc records the dead-owner key gap (cleanup belongs to dead-node key GC). Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
gc_tombstones existed with a 300s grace but had no runtime caller, so every deleted key's grave marker was retained forever on every node — unbounded growth under worker register/remove churn now that removals tombstone worker: keys. The controller's 60-round housekeeping tick drives it across all engines. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
A dead owner's key keeps health=true forever, and the periodic reconcile replays it: a probe-failed import flipped back to Ready every pass, routing traffic to a dead backend in a permanent 30s flap. Promotion now applies only from Pending/NotReady; Failed and Draining stay owned by the local state machine. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
Feed reconcile's key-presence set into the backfill instead of a second keys() scan, and record the crash-restart orphan-key case in the module's known-gap note. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughAdds MeshKV::gc_tombstones(), tracks per-worker origin in WorkerRegistry (Local vs Mesh) with origin-aware registration/removal, prevents automatic removal of mesh-imported failed workers, and rewrites WorkerSyncAdapter to run outbound publish, inbound subscribe, and periodic reconcile with expanded tests. ChangesMesh sync and worker origin behavior
Sequence Diagram(s)sequenceDiagram
participant LocalRegistry as Local Worker Registry
participant OutboundLoop as Outbound Publish Loop
participant MeshKV as Mesh KV Store
participant InboundLoop as Inbound Subscribe Loop
participant ReconcileTask as Periodic Reconcile Task
LocalRegistry->>OutboundLoop: mutation event (put/remove local worker)
OutboundLoop->>OutboundLoop: check worker origin is Local
OutboundLoop->>MeshKV: publish WorkerState (JSON-encoded spec)
OutboundLoop->>OutboundLoop: track published keys
MeshKV->>InboundLoop: subscription delivers key/value
InboundLoop->>InboundLoop: read current store bytes
InboundLoop->>LocalRegistry: sync_key_from_store routes import or tombstone handling
InboundLoop->>LocalRegistry: import mesh workers as WorkerOrigin::Mesh
LocalRegistry->>ReconcileTask: periodically check key presence
ReconcileTask->>MeshKV: list worker: keys
ReconcileTask->>LocalRegistry: recover missed puts/tombstones
ReconcileTask->>MeshKV: tombstone stale published keys on lag
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a robust synchronization mechanism for WorkerState across mesh nodes, including periodic CRDT tombstone garbage collection and tracking of worker registration origins (Local vs Mesh) to prevent infinite update loops and unauthorized remote removals. It also switches WorkerSpec serialization from bincode to JSON to ensure faithful round-tripping of skipped fields. Feedback from the review highlights a potential bug where peer ID changes for the same URL are not handled, which could lead to ignored tombstones and transient re-registrations, and suggests skipping the first immediate tick of the reconcile interval to avoid redundant startup scans.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| if let Some(existing_id) = self.get_id_by_url(&state.url) { | ||
| // A locally-owned worker's state is published BY this node; | ||
| // a peer's echo of it must never mutate local status — the | ||
| // local health state machine is the single writer. | ||
| if self.origin_of(&existing_id) == Some(WorkerOrigin::Local) { | ||
| tracing::debug!( | ||
| url = %state.url, | ||
| "Ignoring mesh state for locally-owned worker" | ||
| ); | ||
| return; | ||
| } | ||
| if let Some(existing) = self.get(&existing_id) { | ||
| let status = existing.status(); | ||
| if state.health { | ||
| if matches!(status, WorkerStatus::Pending | WorkerStatus::NotReady) { | ||
| existing.set_status(WorkerStatus::Ready); | ||
| } | ||
| } else if status == WorkerStatus::Ready { | ||
| existing.set_status(WorkerStatus::NotReady); | ||
| } | ||
| tracing::debug!( | ||
| url = %state.url, | ||
| healthy = state.health, | ||
| "Updated health for existing mesh-synced worker" | ||
| ); | ||
| return; | ||
| } | ||
| tracing::debug!( | ||
| url = %state.url, | ||
| healthy = state.health, | ||
| "Updated health for existing mesh-synced worker" | ||
| ); | ||
| return; | ||
| } |
There was a problem hiding this comment.
When a peer restarts or its ID changes, on_remote_worker_state will find the existing worker by URL (with the old ID ID_A), update its health, and return early without updating its ID to ID_B or registering ID_B.
This mismatch between the registry ID (ID_A) and the mesh store key (ID_B) causes two major issues:
- Ignored Tombstones: If
ID_Bis tombstoned, the tombstone event forID_Bwill be ignored becauseID_Bis not inworker_origins, leaving the staleID_Aworker in the registry until the next reconcile pass. - Transient Re-registration: The next periodic
reconcile_oncepass will see thatID_Ais not in the store, removeID_Afrom the registry, and then registerID_Bas a minimal worker. This completely defeats the "Don't re-register to preserve full config" design.
To resolve this, we should detect when the peer's ID has changed for the same URL, and remove the stale existing_id so that the new state.worker_id can be registered immediately.
if let Some(existing_id) = self.get_id_by_url(&state.url) {
// A locally-owned worker's state is published BY this node;
// a peer's echo of it must never mutate local status — the
// local health state machine is the single writer.
if self.origin_of(&existing_id) == Some(WorkerOrigin::Local) {
tracing::debug!(
url = %state.url,
"Ignoring mesh state for locally-owned worker"
);
return;
}
if existing_id.as_str() != state.worker_id {
// Peer ID changed (e.g. restart). Remove the stale ID so the new ID can be registered.
self.remove(&existing_id);
} else if let Some(existing) = self.get(&existing_id) {
let status = existing.status();
if state.health {
if matches!(status, WorkerStatus::Pending | WorkerStatus::NotReady) {
existing.set_status(WorkerStatus::Ready);
}
} else if status == WorkerStatus::Ready {
existing.set_status(WorkerStatus::NotReady);
}
tracing::debug!(
url = %state.url,
healthy = state.health,
"Updated health for existing mesh-synced worker"
);
return;
}
}There was a problem hiding this comment.
The underlying gap is real and already disclosed in the PR's known-limitations (restart re-key: importer holds the old id, tombstones for the new id don't resolve directly). Two corrections to the specifics: (1) the reconcile re-key path imports the new id with its full JSON spec — on_remote_worker_state only falls back to the minimal builder on an undecodable spec — and (2) the remove-then-reimport convergence is the designed healing path, not an accident. Inline id re-binding was evaluated and rejected as invasive registry surgery; the planned fix dissolves the id churn at the source with deterministic worker ids (uuid5(node_name + url)), with dead-node key GC as the backstop. Deferring to those follow-ups.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1a3109294f
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
There was a problem hiding this comment.
Thorough review of all 8 changed files (~1034 additions, ~88 deletions). No issues found.
What I verified:
-
Origin tracking lifecycle —
WorkerOriginis recorded under the per-worker mutation lock before the worker becomes visible in indexes, and cleaned up on removal. Theregister_or_replacepromotion path correctly takes ownership under the same lock, preventing the outbound sync from missing the transition. -
Outbound publish loop — correctly gates on live
origin_of(not thepublishedset) so a worker promoted toLocalmid-life starts publishing immediately. Thepublishedset is only used forRemovedevents (origin is already cleared by then). Lag recovery viaresync_localre-publishes all local workers and tombstones stale keys. -
Inbound store-truth design — the subscription loop ignores the event's value snapshot and re-reads the namespace, making delivery order irrelevant. A stale put echo dequeued after a tombstone correctly converges to absent.
-
Health hint hardening —
FailedandDrainingstates are correctly excluded from mesh health promotion (matches!(status, Pending | NotReady)), preventing stalehealth=truekeys from resurrecting probe-failed imports. -
Spec encoding (bincode → JSON) — backward-compatible: old nodes' bincode specs fail JSON decode and fall back to the minimal builder (same degraded behavior as before). The
serde(skip_serializing_*)round-trip issue is real and well-documented. -
Tombstone resolution — import adopts the publisher's
worker_idviaor_insert_with(preserves existing URL reservations), so a later tombstone forworker:{id}resolves to the imported worker. Foreign tombstones for locally-owned keys are refused and the key is re-asserted. -
Reconcile pass — correctly handles missed puts (backfill), missed tombstones (remove_remote), and lost publishes (re-assert local workers with missing keys). Missing-key handling runs before backfill so a same-URL key from another publisher can re-import in the same pass.
-
Failed mesh import retention —
should_remove_failedprevents the remove/re-import churn loop. Well-reasoned: the CRDT key persists, so local removal just desynchronizes. -
Race conditions —
remove_remotehas a theoretical TOCTOU betweenorigin_ofandself.remove, but the window is nanoseconds-narrow (two consecutive calls in the same function) and requires a concurrentregister_or_replacefor the same worker ID. The system recovers via the 30s reconcile pass regardless.
Test coverage is strong with 25 new tests covering all critical paths: tombstone removal, foreign tombstone refusal, outbound publish/filter/tombstone, reconcile recovery, store-truth convergence, spec round-trip, restart race, and resync correctness.
🔴 Important: 0 · 🟡 Nit: 0 · 🟣 Pre-existing: 0
Time-based tombstone collection is unsound: a peer absent longer than the grace period still holds the deleted key's older insert in its op-log and replays it on reconnect; with the tombstone metadata collected, the stale insert lands on an empty entry and resurrects the key. Collection is only safe at causal stability — every live peer acked past the tombstone (provable via the per-peer ack watermarks) and absent peers declared dead — which is the dead-node cleanup layer's job. MeshKV::gc_tombstones stays for that wiring; the module known-gap note now covers tombstone accrual. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: e8e3b1329c
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
remove_remote's lock-free origin check could observe Mesh, block on the per-worker mutation lock while a local claim (register_or_replace -> replace_inner) promoted the origin to Local, then delete the now locally-owned worker — defeating the foreign-tombstone protection in the claim race. remove_inner re-checks the origin after acquiring the lock and aborts on mismatch; the inbound tombstone arm then sees a Local owner and re-asserts the key. Mirror of the promotion-side fix: both halves of the origin lifecycle now decide under the same lock. Signed-off-by: Chang Su <8605658+CatherineSue@users.noreply.github.com>
Description
Problem
#1626 made inbound worker sync live, but nothing publishes: a node's workers, their health transitions, and their removals are invisible to the mesh. The registry also had no notion of which workers it owns versus which it imported — the dead
sync_meshflag carried that intent but did nothing — so any naive outbound wiring would re-publish imports (a CRDT version-bump loop) and let peers mutate or delete locally-configured workers.Solution
Single-writer ownership, enforced end to end: each
worker:{id}key has exactly one writer — the node that registered the worker. Everyone else is a reader.WorkerOrigin(Local/Mesh) recorded per worker under the per-worker mutation lock, before the worker becomes visible; promoted to Local whenregister_or_replaceclaims a URL a mesh import won (the rolling-restart race); cleared on removal.WorkerSyncAdapter::start: Registered/Replaced/StatusChanged of locally-owned workers publishWorkerState(JSONWorkerSpecriding along); Removed tombstones the key. Mesh imports are filtered by live origin. Broadcast lag triggers a full local resync that also tombstones removals missed in the lag window.disable_health_checkspecs to Ready); mesh health hints never resurrect probe-ownedFailed/Drainingstates.remove_remoteremoves only Mesh-origin workers; a foreign tombstone for an owned key is refused and the key re-asserted (the delete already propagated cluster-wide).try_sendnotification missed — imports, tombstones, and lost publishes — including cold-start merge bursts larger than the subscription channel.WorkerSpecis JSON inside the bincodeWorkerStateenvelope — bincode could not round-trip its serde-skip attributes, so every import silently downgraded to the minimal builder.Review hardening
Three adversarial review rounds (multi-agent) ran before this PR: a whole-branch sweep (19 confirmed findings → 6 fix commits), a memory/lifecycle audit of the new structures (leak-freedom proven; one lock-scoping bug found and fixed), and a per-commit design/complexity/perf review + final capped perf pass (tombstone-GC wiring, the Ready-flap fix, and the reconcile allocation fix came out of it).
Changes
model_gateway/src/worker/registry.rs—WorkerOrigin+origin_of/remove_remote;register_inner(origin);replace_inner(.., promote_origin); import hardening inon_remote_worker_statemodel_gateway/src/mesh/adapters/worker_sync.rs— outbound publish loop, store-truth inbound, tombstone handling, 30s reconcile, lag resyncmodel_gateway/src/worker/manager.rs— failed mesh imports are never auto-removed (owner-managed; local removal would churn re-import/remove forever)crates/mesh/src/kv.rs+gossip_controller.rs— tombstone GC driven from the 60-round housekeeping tickcrates/mesh/src/types.rs— spec field documented as JSONKnown limitations (tracked follow-ups)
worker:keys remain cluster-wide (~0.5–2 KB/worker/node; imports stay registered, demoted by probes). Fix: dead-node key GC.api_keynever gossips (deliberate — secrets stay on the owner): imports of auth-protected backends fail non-owner probes and sit demoted.Test Plan
cargo test -p smg -p smg-mesh— full suite green post-rebase (1,550+ tests; 25 new in this branch).cargo clippy --workspace --all-targets --all-features -- -D warnings— clean.Key regressions:
claimed_import_publishes_from_next_mutation(restart race),outbound_never_republishes_mesh_imported_worker(echo),remote_tombstone_never_removes_local_worker(+ key re-assert),stale_healthy_state_never_resurrects_probe_failed_import(Ready-flap),published_spec_round_trips_into_importing_registry(gRPC decode worker imports faithfully),reconcile_recovers_missed_put_and_missed_tombstone,rapid_put_then_delete_converges_to_absent,unhealthy_import_with_disabled_health_check_is_not_ready,failed_mesh_imported_workers_are_not_removed,gc_tombstones_respects_grace_through_mesh_kv.Checklist
cargo +nightly fmtpassescargo clippy --all-targets --all-features -- -D warningspasses🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Tests