Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions allways/validator/event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,12 @@ def cold_bootstrap(
# Wipe first so a crashed prior cold boot (anchors written, cursor not)
# or a stale-cursor fallback can't leave duplicate/orphaned rows.
self.state_store.reset_event_watcher_state()
# A cold boot means events before ``current_block - SCORING_WINDOW_BLOCKS``
# are gone (long outage / pruned RPC). Any persisted scoring frontier
# from before is now unreachable, so clear it: the first post-bootstrap
# window falls back to the trailing window instead of trying to score
# a span whose events no longer exist (issue #372).
self.state_store.set_last_scored_block(0)
if metagraph_hotkeys and contract_client is not None:
for hotkey in metagraph_hotkeys:
try:
Expand Down Expand Up @@ -732,8 +738,16 @@ def prune_old_events(self, current_block: int) -> None:
active event per hotkey is preserved as a state-reconstruction anchor;
busy events are kept while the open-swap count is still > 0 so the
matching -1 isn't orphaned. Mirrors the prune onto the SQL tables so
warm restarts see the same anchor invariants."""
cutoff = current_block - SCORING_WINDOW_BLOCKS
warm restarts see the same anchor invariants.

The cutoff trails the *unscored* frontier — ``min(current_block,
last_scored_block)`` — not ``current_block``. While catching up,
``last_scored_block`` lags the tip; anchoring on it keeps the events
of the not-yet-scored window ``(last_scored, current]`` until the next
scoring pass consumes them (issue #372)."""
last_scored = self.state_store.get_last_scored_block()
horizon = min(current_block, last_scored) if last_scored > 0 else current_block
cutoff = horizon - SCORING_WINDOW_BLOCKS
if cutoff <= 0:
return
if self.busy_events:
Expand Down
10 changes: 6 additions & 4 deletions allways/validator/forward.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
from allways.constants import (
CHALLENGE_WINDOW_BLOCKS,
EXTEND_THRESHOLD_BLOCKS,
SCORING_WINDOW_BLOCKS,
)
from allways.contract_client import ContractError, is_contract_rejection
from allways.utils.logging import log_on_change
Expand All @@ -25,7 +24,7 @@
scale_encode_initiate_hash_input,
)
from allways.validator.chain_verification import SwapVerifier
from allways.validator.scoring import score_and_reward_miners
from allways.validator.scoring import score_and_reward_miners, scoring_due
from allways.validator.state_store import PendingConfirm
from allways.validator.swap_tracker import SwapTracker

Expand Down Expand Up @@ -100,9 +99,12 @@ async def forward(self: Validator) -> None:
extend_fulfilled_near_timeout(self)
enforce_swap_timeouts(self, tracker, uncertain_swaps)

if not self.initial_scoring_done or self.step % SCORING_WINDOW_BLOCKS == 0:
# Scoring cadence is anchored to block height, not self.step: scoring_due
# gates on persisted last_scored_block + event-watcher catch-up so windows
# tile the block axis contiguously and stay aligned across validators
# regardless of step counts or restart history (issue #372).
if scoring_due(self):
score_and_reward_miners(self)
self.initial_scoring_done = True
bt.logging.info('forward: scoring done')


Expand Down
59 changes: 56 additions & 3 deletions allways/validator/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,60 @@ class DirectionTrace:
best_rate: float = 0.0


def scoring_due(self: Validator) -> bool:
    """Decide whether a scoring pass should run now, keyed on block height.

    This gate does not consult ``self.step``. It returns ``False`` while the
    event-watcher cursor is still behind ``self.block``. Once the cursor has
    reached ``self.block`` it returns ``True`` if nothing has been scored yet
    (persisted ``last_scored_block`` is 0); otherwise it returns ``True`` only
    when at least ``SCORING_WINDOW_BLOCKS`` blocks separate ``self.block``
    from the last scored block.

    Requiring the watcher to be caught up means a window is never scored
    ahead of the events the watcher has loaded."""
    watcher_lagging = self.event_watcher.cursor < self.block
    if watcher_lagging:
        return False
    frontier = self.state_store.get_last_scored_block()
    if frontier != 0:
        # A prior pass exists: wait until a full window has elapsed since it.
        return self.block - frontier >= SCORING_WINDOW_BLOCKS
    # Nothing scored yet: fire as soon as the watcher is caught up.
    return True


def scoring_window(self: Validator) -> Tuple[int, int]:
    """Compute the ``(window_start, window_end)`` block pair for this pass.

    ``window_end`` is the smaller of ``self.block`` and the event-watcher
    cursor, so the window never extends past the events already loaded.

    When a previous pass has been recorded (persisted ``last_scored_block``
    is positive), ``window_start`` is that block, capped at ``window_end``,
    so consecutive windows share an edge. When nothing has been scored yet,
    ``window_start`` is ``SCORING_WINDOW_BLOCKS`` behind ``window_end``,
    floored at 0."""
    frontier = self.state_store.get_last_scored_block()
    end_block = min(self.block, self.event_watcher.cursor)
    if frontier <= 0:
        # First pass: fall back to the trailing window.
        start_block = max(0, end_block - SCORING_WINDOW_BLOCKS)
    else:
        # Capping at end_block keeps the window from inverting if the tip
        # or cursor is behind the persisted frontier.
        start_block = min(frontier, end_block)
    return start_block, end_block


def score_and_reward_miners(self: Validator) -> None:
try:
_, window_end = scoring_window(self)
if _contract_is_halted(self):
rewards, miner_uids = build_halted_rewards(self)
else:
rewards, miner_uids = calculate_miner_rewards(self)
self.update_scores(rewards, miner_uids)
# Advance the persisted frontier only after scores update, so a crash
# mid-pass re-scores this window rather than skipping it. Pruning runs
# after this so the prune horizon reflects the just-scored block.
self.state_store.set_last_scored_block(window_end)
prune_rate_events(self)
prune_swap_outcomes(self)
except Exception as e:
Expand Down Expand Up @@ -77,7 +124,14 @@ def build_halted_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]:


def prune_rate_events(self: Validator) -> None:
    """Drop rate events older than one scoring window behind the *unscored*
    frontier.

    The prune horizon is ``min(self.block, last_scored_block)`` when a
    scoring pass has been recorded, and ``self.block`` otherwise. Anchoring
    on the persisted frontier rather than ``self.block`` means a window that
    has not been scored yet (the frontier lags ``self.block`` while catching
    up) keeps its inputs until scoring consumes them (issue #372).

    Nothing is pruned while the computed cutoff is not positive."""
    last_scored = self.state_store.get_last_scored_block()
    # last_scored == 0 means "never scored": fall back to the current block.
    horizon = min(self.block, last_scored) if last_scored > 0 else self.block
    cutoff = horizon - SCORING_WINDOW_BLOCKS
    if cutoff > 0:
        self.state_store.prune_events_older_than(cutoff)

Expand All @@ -100,8 +154,7 @@ def calculate_miner_rewards(self: Validator) -> Tuple[np.ndarray, Set[int]]:
if n_uids == 0:
return np.array([], dtype=np.float32), set()

window_end = self.block
window_start = max(0, window_end - SCORING_WINDOW_BLOCKS)
window_start, window_end = scoring_window(self)

# A miner's *current* active flag is irrelevant to whether they earned
# crown during the replay window. The only at-scoring-time check is
Expand Down
26 changes: 26 additions & 0 deletions allways/validator/state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,32 @@ def set_event_cursor(self, block_num: int) -> None:
)
conn.commit()

def get_last_scored_block(self) -> int:
    """Return the persisted scoring frontier, or 0 when none is stored.

    Reads the ``last_scored_block`` key from ``event_watcher_meta`` while
    holding ``self.lock`` and converts the stored value to ``int``. Because
    the value lives in the database, it is still available after the
    process restarts."""
    query = 'SELECT value FROM event_watcher_meta WHERE key = ?'
    with self.lock:
        db = self.require_connection()
        found = db.execute(query, ('last_scored_block',)).fetchone()
        if found is None:
            # Key absent: no scoring pass has been recorded.
            return 0
        return int(found['value'])

def set_last_scored_block(self, block_num: int) -> None:
    """Persist ``block_num`` as the scoring frontier.

    Upserts the ``last_scored_block`` row in ``event_watcher_meta`` and
    commits, all while holding ``self.lock``. An existing row has its value
    overwritten rather than a second row being inserted."""
    params = (block_num,)
    with self.lock:
        db = self.require_connection()
        db.execute(
            """
            INSERT INTO event_watcher_meta (key, value) VALUES ('last_scored_block', ?)
            ON CONFLICT(key) DO UPDATE SET value = excluded.value
            """,
            params,
        )
        db.commit()

def add_bootstrapped_swap(self, swap_id: int) -> None:
with self.lock:
conn = self.require_connection()
Expand Down
9 changes: 5 additions & 4 deletions neurons/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,11 @@ def __init__(self, config=None):
current_block_fn=lambda: self.block,
)
self.last_known_rates: dict[tuple[str, str, str], float] = {}
# Forces one scoring pass per fresh process so a mid-window restart
# doesn't leave self.scores stale until the next 1200-step boundary
# (which would route emissions to RECYCLE via the empty-norm fallback).
self.initial_scoring_done = False
# Scoring cadence is anchored to block height via the persisted
# last_scored_block (see allways.validator.scoring.scoring_due), not a
# per-process step counter. A fresh process with a warm state.db
# resumes from its persisted frontier; a fresh DB (last_scored == 0)
# scores its first window as soon as the event watcher is caught up.

# Optimistic propose/challenge/finalize for reservation + timeout
# extensions. Stateless decision class — the forward loop drives it
Expand Down
84 changes: 84 additions & 0 deletions tests/test_event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,20 @@ def test_event_cursor_default_is_none_then_round_trips(self, tmp_path: Path):
assert store.get_event_cursor() == 5678
store.close()

def test_last_scored_block_default_zero_then_round_trips(self, tmp_path: Path):
    """The scoring frontier reads as 0 on a fresh DB and then returns
    whatever was last written (issue #372), regardless of event-cursor
    writes."""
    db = ValidatorStateStore(db_path=tmp_path / 'state.db')
    # Fresh database: no scoring pass has been recorded yet.
    assert db.get_last_scored_block() == 0
    # Each write replaces the previous frontier.
    for frontier in (9_400, 10_000):
        db.set_last_scored_block(frontier)
        assert db.get_last_scored_block() == frontier
    # Writing the event cursor (which shares the meta table) must leave the
    # scoring frontier untouched.
    db.set_event_cursor(12_345)
    assert db.get_last_scored_block() == 10_000
    db.close()

def test_prune_active_events_preserves_latest_per_hotkey(self, tmp_path: Path):
store = ValidatorStateStore(db_path=tmp_path / 'state.db')
store.insert_active_event(100, 'hk_a', True)
Expand Down Expand Up @@ -640,6 +654,49 @@ def test_reset_event_watcher_state_wipes_all_four_tables(self, tmp_path: Path):
store.close()


class TestPruneHorizonTracksScoringFrontier:
    """Issue #372: ``prune_old_events`` derives its cutoff from the *unscored*
    frontier, ``min(current_block, last_scored_block)``, rather than from the
    chain tip, so events in a not-yet-scored window survive pruning."""

    def test_lagging_frontier_retains_unscored_events(self, tmp_path: Path):
        """With the frontier far behind the tip, both hk_a transitions stay."""
        from allways.constants import SCORING_WINDOW_BLOCKS as W

        watcher = make_watcher(tmp_path)
        # Frontier well behind the tip, as after an outage while catching up.
        watcher.state_store.set_last_scored_block(1_000)
        # Both transitions for hk_a fall after the frontier. The earlier one
        # is not the per-hotkey anchor, so a tip-based cutoff would drop it.
        for block, is_active in ((1_200, True), (1_700, False)):
            watcher.apply_event(block, 'MinerActivated', {'miner': 'hk_a', 'active': is_active})

        chain_tip = 1_000 + 2 * W  # a `tip - W` cutoff would drop the 1_200 row
        watcher.prune_old_events(chain_tip)

        kept = sorted(ev.block for ev in watcher.active_events_by_hotkey['hk_a'])
        assert kept == [1_200, 1_700]  # both transitions retained for scoring
        watcher.state_store.close()

    def test_caught_up_frontier_prunes_to_window(self, tmp_path: Path):
        """With the frontier at the tip, pruning trims to one window behind
        it: the ancient row goes, the in-window row stays."""
        from allways.constants import SCORING_WINDOW_BLOCKS as W

        watcher = make_watcher(tmp_path)
        chain_tip = 10_000
        in_window_block = chain_tip - W + 100
        watcher.state_store.set_last_scored_block(chain_tip)  # frontier caught up
        watcher.apply_event(100, 'MinerActivated', {'miner': 'hk_a', 'active': True})
        watcher.apply_event(in_window_block, 'MinerActivated', {'miner': 'hk_a', 'active': False})

        watcher.prune_old_events(chain_tip)  # cutoff = chain_tip - W

        kept = [ev.block for ev in watcher.active_events_by_hotkey['hk_a']]
        assert kept == [in_window_block]  # ancient row dropped, in-window row kept
        watcher.state_store.close()


class TestEventWatcherWarmRestart:
"""Persisted cursor branches initialize() into hydrate-from-DB vs. cold."""

Expand Down Expand Up @@ -794,6 +851,33 @@ def test_long_outage_falls_back_to_cold_bootstrap(self, tmp_path: Path):
assert w.cursor == current_block - SCORING_WINDOW_BLOCKS
w.state_store.close()

def test_cold_bootstrap_resets_scoring_frontier(self, tmp_path: Path):
    """Issue #372: when the persisted cursor is more than one scoring window
    behind the current block, ``initialize`` must leave the persisted scoring
    frontier at 0, so the first window falls back to the trailing window
    instead of targeting a span whose events no longer exist."""
    from allways.constants import SCORING_WINDOW_BLOCKS

    db_file = tmp_path / 'state.db'
    stale_block = 100

    # Seed a DB as left behind before a long outage: frontier and cursor
    # both sit at the same stale block.
    seed_store = ValidatorStateStore(db_path=db_file)
    seed_store.set_last_scored_block(stale_block)
    seed_store.set_event_cursor(stale_block)  # far behind head → cold path
    seed_store.close()

    watcher = ContractEventWatcher(
        substrate=MagicMock(),
        contract_address=TEST_CONTRACT_ADDRESS,
        metadata_path=METADATA_PATH,
        state_store=ValidatorStateStore(db_path=db_file),
    )
    contract = MagicMock()
    contract.get_miner_active_flag.return_value = False
    contract.get_active_swaps.return_value = []
    head = stale_block + SCORING_WINDOW_BLOCKS + 50  # gap > window
    watcher.initialize(current_block=head, metagraph_hotkeys=['hk_a'], contract_client=contract)

    assert watcher.state_store.get_last_scored_block() == 0
    watcher.state_store.close()

def test_terminal_event_removes_bootstrapped_swap_from_db(self, tmp_path: Path):
w = make_watcher(tmp_path)
w.state_store.add_bootstrapped_swap(42)
Expand Down
Loading