From 4321ed810cd7d2be4c5364e473fe91c77940183a Mon Sep 17 00:00:00 2001 From: greatjourney589 Date: Mon, 25 May 2026 07:02:41 -0400 Subject: [PATCH] fix: anchor scoring cadence and prune horizon to block height (#372) --- allways/validator/event_watcher.py | 18 +++- allways/validator/forward.py | 10 ++- allways/validator/scoring.py | 59 +++++++++++- allways/validator/state_store.py | 26 ++++++ neurons/validator.py | 9 +- tests/test_event_watcher.py | 84 +++++++++++++++++ tests/test_scoring_v1.py | 140 ++++++++++++++++++++++++++++- 7 files changed, 332 insertions(+), 14 deletions(-) diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index 63b4d19..bb975ab 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -336,6 +336,12 @@ def cold_bootstrap( # Wipe first so a crashed prior cold boot (anchors written, cursor not) # or a stale-cursor fallback can't leave duplicate/orphaned rows. self.state_store.reset_event_watcher_state() + # A cold boot means events before ``current_block - SCORING_WINDOW_BLOCKS`` + # are gone (long outage / pruned RPC). Any persisted scoring frontier + # from before is now unreachable, so clear it: the first post-bootstrap + # window falls back to the trailing window instead of trying to score + # a span whose events no longer exist (issue #372). + self.state_store.set_last_scored_block(0) if metagraph_hotkeys and contract_client is not None: for hotkey in metagraph_hotkeys: try: @@ -732,8 +738,16 @@ def prune_old_events(self, current_block: int) -> None: active event per hotkey is preserved as a state-reconstruction anchor; busy events are kept while the open-swap count is still > 0 so the matching -1 isn't orphaned. Mirrors the prune onto the SQL tables so - warm restarts see the same anchor invariants.""" - cutoff = current_block - SCORING_WINDOW_BLOCKS + warm restarts see the same anchor invariants. + + The cutoff trails the *unscored* frontier — ``min(current_block, + last_scored_block)`` — not ``current_block``. While catching up, + ``last_scored_block`` lags the tip; anchoring on it keeps the events + of the not-yet-scored window ``(last_scored, current]`` until the next + scoring pass consumes them (issue #372).""" + last_scored = self.state_store.get_last_scored_block() + horizon = min(current_block, last_scored) if last_scored > 0 else current_block + cutoff = horizon - SCORING_WINDOW_BLOCKS if cutoff <= 0: return if self.busy_events: diff --git a/allways/validator/forward.py b/allways/validator/forward.py index 8794574..b1e731b 100644 --- a/allways/validator/forward.py +++ b/allways/validator/forward.py @@ -13,7 +13,6 @@ from allways.constants import ( CHALLENGE_WINDOW_BLOCKS, EXTEND_THRESHOLD_BLOCKS, - SCORING_WINDOW_BLOCKS, ) from allways.contract_client import ContractError, is_contract_rejection from allways.utils.logging import log_on_change @@ -25,7 +24,7 @@ scale_encode_initiate_hash_input, ) from allways.validator.chain_verification import SwapVerifier -from allways.validator.scoring import score_and_reward_miners +from allways.validator.scoring import score_and_reward_miners, scoring_due from allways.validator.state_store import PendingConfirm from allways.validator.swap_tracker import SwapTracker @@ -100,9 +99,12 @@ async def forward(self: Validator) -> None: extend_fulfilled_near_timeout(self) enforce_swap_timeouts(self, tracker, uncertain_swaps) - if not self.initial_scoring_done or self.step % SCORING_WINDOW_BLOCKS == 0: + # Scoring cadence is anchored to block height, not self.step: score_due + # gates on persisted last_scored_block + event-watcher catch-up so windows + # tile the block axis contiguously and stay aligned across validators + # regardless of step counts or restart history (issue #372). + if scoring_due(self): score_and_reward_miners(self) - self.initial_scoring_done = True bt.logging.info('forward: scoring done') diff --git a/allways/validator/scoring.py b/allways/validator/scoring.py index fcab18e..483cd3c 100644 --- a/allways/validator/scoring.py +++ b/allways/validator/scoring.py @@ -41,13 +41,60 @@ class DirectionTrace: best_rate: float = 0.0 +def scoring_due(self: Validator) -> bool: + """Block-anchored scoring cadence — replaces the old ``self.step % N`` gate. + + Fires once the event-watcher cursor has caught up to the chain tip *and* a + full ``SCORING_WINDOW_BLOCKS`` (or more) has elapsed since the last scored + block. The first pass on a fresh validator (``last_scored == 0``) fires as + soon as the watcher is caught up. + + Caught-up is required so window_end lands on the chain tip — a globally + agreed value — rather than on each validator's local sync frontier, and so + a window is never scored ahead of events the watcher has loaded. Decoupled + from ``self.step`` entirely: perturbing the step counter cannot change + which blocks are scored, and two validators replaying identical chain + state from the same ``last_scored_block`` score identical windows.""" + if self.event_watcher.cursor < self.block: + return False + last_scored = self.state_store.get_last_scored_block() + if last_scored == 0: + return True + return self.block - last_scored >= SCORING_WINDOW_BLOCKS + + +def scoring_window(self: Validator) -> Tuple[int, int]: + """Derive the ``(window_start, window_end]`` block range for this pass. + + ``window_start`` is the previously-scored block, so consecutive windows + tile the block axis with no gap and no overlap. ``window_end`` is clamped + to the event-watcher cursor so the window never runs ahead of the events + that have actually been loaded (the cursor never exceeds ``self.block``). + On the first pass (``last_scored == 0``) the window falls back to the + trailing ``SCORING_WINDOW_BLOCKS``, anchored by the bootstrap seed.""" + last_scored = self.state_store.get_last_scored_block() + window_end = min(self.block, self.event_watcher.cursor) + if last_scored > 0: + # min() guards a backwards step (chain reorg / stale RPC tip): an + # inverted window would credit nothing, never negative credit. + window_start = min(last_scored, window_end) + else: + window_start = max(0, window_end - SCORING_WINDOW_BLOCKS) + return window_start, window_end + + def score_and_reward_miners(self: Validator) -> None: try: + _, window_end = scoring_window(self) if _contract_is_halted(self): rewards, miner_uids = build_halted_rewards(self) else: rewards, miner_uids = calculate_miner_rewards(self) self.update_scores(rewards, miner_uids) + # Advance the persisted frontier only after scores update, so a crash + # mid-pass re-scores this window rather than skipping it. Pruning runs + # after this so the prune horizon reflects the just-scored block. + self.state_store.set_last_scored_block(window_end) prune_rate_events(self) prune_swap_outcomes(self) except Exception as e: @@ -77,7 +124,14 @@ def build_halted_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: def prune_rate_events(self: Validator) -> None: - cutoff = self.block - SCORING_WINDOW_BLOCKS + """Drop rate events older than one scoring window behind the *unscored* + frontier. Anchored on ``min(self.block, last_scored_block)`` rather than + ``self.block`` so a window that hasn't been scored yet — last_scored lags + self.block while catching up — never loses its inputs before scoring + consumes them (issue #372).""" + last_scored = self.state_store.get_last_scored_block() + horizon = min(self.block, last_scored) if last_scored > 0 else self.block + cutoff = horizon - SCORING_WINDOW_BLOCKS if cutoff > 0: self.state_store.prune_events_older_than(cutoff) @@ -100,8 +154,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]: if n_uids == 0: return np.array([], dtype=np.float32), set() - window_end = self.block - window_start = max(0, window_end - SCORING_WINDOW_BLOCKS) + window_start, window_end = scoring_window(self) # A miner's *current* active flag is irrelevant to whether they earned # crown during the replay window. The only at-scoring-time check is diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index 6a4c0a2..757775b 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -540,6 +540,32 @@ def set_event_cursor(self, block_num: int) -> None: ) conn.commit() + def get_last_scored_block(self) -> int: + """Block height the last scoring pass advanced to (0 if never scored). + + Persisted in ``event_watcher_meta`` so the scoring frontier survives a + restart — unlike ``self.step``, which is a per-process counter. Drives + the block-anchored scoring cadence and tiles consecutive scoring + windows: window_start of the next pass is window_end of the last.""" + with self.lock: + conn = self.require_connection() + row = conn.execute( + 'SELECT value FROM event_watcher_meta WHERE key = ?', ('last_scored_block',) + ).fetchone() + return int(row['value']) if row is not None else 0 + + def set_last_scored_block(self, block_num: int) -> None: + with self.lock: + conn = self.require_connection() + conn.execute( + """ + INSERT INTO event_watcher_meta (key, value) VALUES ('last_scored_block', ?) + ON CONFLICT(key) DO UPDATE SET value = excluded.value + """, + (block_num,), + ) + conn.commit() + def add_bootstrapped_swap(self, swap_id: int) -> None: with self.lock: conn = self.require_connection() diff --git a/neurons/validator.py b/neurons/validator.py index c191b60..870b8cf 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -87,10 +87,11 @@ def __init__(self, config=None): current_block_fn=lambda: self.block, ) self.last_known_rates: dict[tuple[str, str, str], float] = {} - # Forces one scoring pass per fresh process so a mid-window restart - # doesn't leave self.scores stale until the next 1200-step boundary - # (which would route emissions to RECYCLE via the empty-norm fallback). - self.initial_scoring_done = False + # Scoring cadence is anchored to block height via the persisted + # last_scored_block (see allways.validator.scoring.scoring_due), not a + # per-process step counter. A fresh process with a warm state.db + # resumes from its persisted frontier; a fresh DB (last_scored == 0) + # scores its first window as soon as the event watcher is caught up. # Optimistic propose/challenge/finalize for reservation + timeout # extensions. Stateless decision class — the forward loop drives it diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py index baa6fbf..5601102 100644 --- a/tests/test_event_watcher.py +++ b/tests/test_event_watcher.py @@ -591,6 +591,20 @@ def test_event_cursor_default_is_none_then_round_trips(self, tmp_path: Path): assert store.get_event_cursor() == 5678 store.close() + def test_last_scored_block_default_zero_then_round_trips(self, tmp_path: Path): + """The scoring frontier persists across restart (issue #372) — unlike + self.step, it's stored in state.db so a fresh process resumes from it.""" + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + assert store.get_last_scored_block() == 0 # never scored + store.set_last_scored_block(9_400) + assert store.get_last_scored_block() == 9_400 + store.set_last_scored_block(10_000) + assert store.get_last_scored_block() == 10_000 + # Round-trips independently of the event cursor (shares the meta table). + store.set_event_cursor(12_345) + assert store.get_last_scored_block() == 10_000 + store.close() + def test_prune_active_events_preserves_latest_per_hotkey(self, tmp_path: Path): store = ValidatorStateStore(db_path=tmp_path / 'state.db') store.insert_active_event(100, 'hk_a', True) @@ -640,6 +654,49 @@ def test_reset_event_watcher_state_wipes_all_four_tables(self, tmp_path: Path): store.close() +class TestPruneHorizonTracksScoringFrontier: + """Issue #372 — the event prune cutoff trails the *unscored* frontier + (min(current_block, last_scored_block)), not the chain tip. A window that + hasn't been scored yet must keep its busy/active inputs until scoring + consumes them.""" + + def test_lagging_frontier_retains_unscored_events(self, tmp_path: Path): + from allways.constants import SCORING_WINDOW_BLOCKS as W + + w = make_watcher(tmp_path) + # Frontier far behind the tip — simulates catch-up after an outage. + w.state_store.set_last_scored_block(1_000) + # hk_a's activate→deactivate both sit inside the unscored window + # (1_000, tip]; the earlier one is NOT the per-hotkey anchor, so under + # the old `tip - W` cutoff it would be pruned before scoring. + w.apply_event(1_200, 'MinerActivated', {'miner': 'hk_a', 'active': True}) + w.apply_event(1_700, 'MinerActivated', {'miner': 'hk_a', 'active': False}) + + tip = 1_000 + 2 * W # old cutoff (tip - W) would drop the 1_200 row + w.prune_old_events(tip) + + blocks = sorted(ev.block for ev in w.active_events_by_hotkey['hk_a']) + assert blocks == [1_200, 1_700] # both transitions retained for scoring + w.state_store.close() + + def test_caught_up_frontier_prunes_to_window(self, tmp_path: Path): + """Once the frontier reaches the tip, pruning trims back to one window + behind it — stale anchors aside — same as before the fix.""" + from allways.constants import SCORING_WINDOW_BLOCKS as W + + w = make_watcher(tmp_path) + tip = 10_000 + w.state_store.set_last_scored_block(tip) # frontier caught up + w.apply_event(100, 'MinerActivated', {'miner': 'hk_a', 'active': True}) + w.apply_event(tip - W + 100, 'MinerActivated', {'miner': 'hk_a', 'active': False}) + + w.prune_old_events(tip) # cutoff = tip - W + + blocks = [ev.block for ev in w.active_events_by_hotkey['hk_a']] + assert blocks == [tip - W + 100] # ancient row dropped, in-window row kept + w.state_store.close() + + class TestEventWatcherWarmRestart: """Persisted cursor branches initialize() into hydrate-from-DB vs. cold.""" @@ -794,6 +851,33 @@ def test_long_outage_falls_back_to_cold_bootstrap(self, tmp_path: Path): assert w.cursor == current_block - SCORING_WINDOW_BLOCKS w.state_store.close() + def test_cold_bootstrap_resets_scoring_frontier(self, tmp_path: Path): + """Issue #372 — a long-outage cold boot abandons the unreachable + pre-bootstrap blocks: the persisted scoring frontier resets to 0 so the + first window falls back to the trailing window instead of trying to + score a span whose events no longer exist.""" + from allways.constants import SCORING_WINDOW_BLOCKS + + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.set_last_scored_block(100) # stale frontier from before the outage + store.set_event_cursor(100) # cursor far behind head → cold path + store.close() + + w = ContractEventWatcher( + substrate=MagicMock(), + contract_address=TEST_CONTRACT_ADDRESS, + metadata_path=METADATA_PATH, + state_store=ValidatorStateStore(db_path=tmp_path / 'state.db'), + ) + client = MagicMock() + client.get_miner_active_flag.return_value = False + client.get_active_swaps.return_value = [] + current_block = 100 + SCORING_WINDOW_BLOCKS + 50 # gap > window + w.initialize(current_block=current_block, metagraph_hotkeys=['hk_a'], contract_client=client) + + assert w.state_store.get_last_scored_block() == 0 + w.state_store.close() + def test_terminal_event_removes_bootstrapped_swap_from_db(self, tmp_path: Path): w = make_watcher(tmp_path) w.state_store.add_bootstrapped_swap(42) diff --git a/tests/test_scoring_v1.py b/tests/test_scoring_v1.py index 4e41c1c..128b820 100644 --- a/tests/test_scoring_v1.py +++ b/tests/test_scoring_v1.py @@ -6,7 +6,7 @@ import numpy as np -from allways.constants import RECYCLE_UID, SUCCESS_EXPONENT +from allways.constants import RECYCLE_UID, SCORING_WINDOW_BLOCKS, SUCCESS_EXPONENT from allways.validator.event_watcher import ActiveEvent, ContractEventWatcher from allways.validator.scoring import ( calculate_miner_rewards, @@ -14,6 +14,8 @@ crown_holders_at_instant, replay_crown_time_window, score_and_reward_miners, + scoring_due, + scoring_window, success_rate, ) from allways.validator.state_store import ValidatorStateStore @@ -87,6 +89,9 @@ def make_validator( store = ValidatorStateStore(db_path=tmp_path / 'state.db') watcher = make_watcher(store, active=set(hotkeys)) + # Caught up to the tip — scoring_window clamps window_end to the cursor, so + # the replay covers the trailing SCORING_WINDOW_BLOCKS exactly as before. + watcher.cursor = block collaterals = collaterals or {} bounds_cache = MagicMock() bounds_cache.max_swap_amount.return_value = max_swap_amount @@ -507,6 +512,139 @@ def test_never_active_miner_gets_no_credit(self, tmp_path: Path): v.state_store.close() +class TestBlockAnchoredScoringCadence: + """Issue #372 — scoring cadence is anchored to block height (the persisted + last_scored_block), not the per-process self.step counter. Consecutive + windows tile the block axis contiguously so no block's crown-time is ever + skipped, and validators stay aligned regardless of step/restart history.""" + + def _crown_validator(self, tmp_path: Path, block: int) -> SimpleNamespace: + """A single miner holding the tao→btc crown from block 0 onward.""" + hotkeys = pad_hotkeys_to_cover_recycle(['hk_a']) + v = make_validator(tmp_path, hotkeys=hotkeys, block=block) + conn = v.state_store.require_connection() + conn.execute( + 'INSERT INTO rate_events (hotkey, from_chain, to_chain, rate, block) VALUES (?, ?, ?, ?, ?)', + ('hk_a', 'tao', 'btc', 0.00020, 0), + ) + conn.commit() + return v + + def test_scoring_due_waits_until_watcher_caught_up(self, tmp_path: Path): + """Criterion: never score a window whose events the cursor hasn't loaded.""" + v = self._crown_validator(tmp_path, block=10_000) + v.event_watcher.cursor = 9_000 # watcher lags the tip + assert scoring_due(v) is False + v.event_watcher.cursor = 10_000 # caught up to the tip + assert scoring_due(v) is True + v.state_store.close() + + def test_scoring_due_first_pass_then_gates_on_block_delta(self, tmp_path: Path): + W = SCORING_WINDOW_BLOCKS + v = self._crown_validator(tmp_path, block=10_000) + v.event_watcher.cursor = 10_000 + # last_scored == 0 → first pass fires as soon as the watcher is caught up. + assert scoring_due(v) is True + v.state_store.set_last_scored_block(10_000) + assert scoring_due(v) is False # nothing new this block + # Just shy of a full window: still not due. + v.block = v.event_watcher.cursor = 10_000 + W - 1 + assert scoring_due(v) is False + # A full window elapsed: due again. + v.block = v.event_watcher.cursor = 10_000 + W + assert scoring_due(v) is True + v.state_store.close() + + def test_scoring_due_ignores_step_counter(self, tmp_path: Path): + """Cadence is block-driven: a self.step value that would have tripped the + old `step % SCORING_WINDOW_BLOCKS == 0` gate has no effect now.""" + v = self._crown_validator(tmp_path, block=10_000) + v.event_watcher.cursor = 10_000 + v.state_store.set_last_scored_block(10_000) + v.step = SCORING_WINDOW_BLOCKS # old gate would have fired + assert scoring_due(v) is False # only block progress matters + v.state_store.close() + + def test_window_resumes_from_persisted_frontier(self, tmp_path: Path): + """Criterion: after a restart mid-stream the next window resumes from the + persisted last_scored_block, dropping no block between it and the tip.""" + W = SCORING_WINDOW_BLOCKS + v = self._crown_validator(tmp_path, block=10_000) + v.event_watcher.cursor = 10_000 + # No frontier yet → fall back to the trailing window. + assert scoring_window(v) == (10_000 - W, 10_000) + # Frontier persisted (survives restart) → resume from it, no gap. + v.state_store.set_last_scored_block(9_500) + assert scoring_window(v) == (9_500, 10_000) + v.state_store.close() + + def test_window_end_clamped_to_loaded_cursor(self, tmp_path: Path): + v = self._crown_validator(tmp_path, block=10_000) + v.event_watcher.cursor = 9_800 # tip raced ahead of loaded events + _, window_end = scoring_window(v) + assert window_end == 9_800 # never score past the loaded frontier + v.state_store.close() + + def test_windows_tile_and_credit_full_span(self, tmp_path: Path): + """THE issue #372 case. Advance the chain by 3×SCORING_WINDOW_BLOCKS over + a handful of forward steps (>1 block/step). Consecutive windows must tile + with no gap and no overlap, and a miner holding crown across the whole + span must be credited for the whole span — not just the final window.""" + W = SCORING_WINDOW_BLOCKS + start = 5_000 + v = self._crown_validator(tmp_path, block=start) + v.event_watcher.cursor = start + v.state_store.set_last_scored_block(start) # frontier established here + + span = 3 * W + blocks_per_step = 137 # >1 block/step, deliberately not a divisor of W + windows: list[tuple[int, int]] = [] + crown_total = 0.0 + + def score_one() -> None: + ws, we = scoring_window(v) + if we <= ws: + return + crown = replay_crown_time_window( + store=v.state_store, + event_watcher=v.event_watcher, + from_chain='tao', + to_chain='btc', + window_start=ws, + window_end=we, + rewardable_hotkeys={'hk_a'}, + ) + crown_total_local = crown.get('hk_a', 0.0) + nonlocal crown_total + crown_total += crown_total_local + windows.append((ws, we)) + v.state_store.set_last_scored_block(we) + + while v.block < start + span: + v.block += blocks_per_step + v.event_watcher.cursor = v.block # watcher keeps up each step + if scoring_due(v): + score_one() + # Tail: the next pass (once another full window elapses) would cover + # (last_scored, tip]; pull it in so the span runs frontier→tip and the + # "no block dropped" assertion holds end-to-end. + score_one() + + # Multiple tiles, not one giant window or a single short one. + assert len(windows) >= 3 + # Consecutive windows tile: start of N+1 == end of N (no gap, no overlap). + for (_, prev_end), (next_start, _) in zip(windows, windows[1:]): + assert next_start == prev_end + # Full coverage: frontier → tip, every block in exactly one window. + assert windows[0][0] == start + assert windows[-1][1] == v.block + # The crown holder is credited for the ENTIRE span, not just the last + # window — this is the bug. crown_total equals the contiguous coverage. + assert crown_total == windows[-1][1] - windows[0][0] + assert crown_total >= span # >= 3×SCORING_WINDOW_BLOCKS, far past one window + v.state_store.close() + + class TestHistoricalActiveState: """Replay must judge active state *as of each block* in the window, not as of the scoring moment. A miner inactive at scoring time is still