diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index c555130..63b4d19 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -244,8 +244,12 @@ def __init__( # Swap IDs whose +1 was seeded directly from the contract's active-swap # list during initialize(). Replay must skip their SwapInitiated event # to avoid double-counting — the busy tick is already in open_swap_count. - # Entries are discarded on the matching terminal event. + # Discarded on the terminal event; persisted so warm restart keeps it. self.bootstrapped_swap_ids: Set[int] = set() + # Per-sync_to counters; collapse pruned-block skips into one summary line. + self.pruned_block_count: int = 0 + self.pruned_block_first: Optional[int] = None + self.pruned_block_last: Optional[int] = None # ─── Public API consumed by scoring ───────────────────────────────── @@ -301,9 +305,37 @@ def initialize( metagraph_hotkeys: Optional[List[str]] = None, contract_client: Any = None, ) -> None: - """Cold start: snapshot contract state for every metagraph miner, then - rewind the cursor by one scoring window so ``sync_to`` backfills it - before the first scoring pass runs.""" + """Branch on persisted cursor: warm restart hydrates from state.db so + sync_to picks up where the last process left off (no contract reads, + no replay of pre-cursor history). A fresh DB or a cursor more than + one scoring window behind falls back to cold bootstrap.""" + persisted_cursor = self.state_store.get_event_cursor() + if persisted_cursor is None: + self.cold_bootstrap(current_block, metagraph_hotkeys, contract_client) + return + gap = current_block - persisted_cursor + if gap > SCORING_WINDOW_BLOCKS: + bt.logging.warning( + f'EventWatcher: persisted cursor {persisted_cursor} is {gap} blocks behind ' + f'current {current_block} (> SCORING_WINDOW_BLOCKS={SCORING_WINDOW_BLOCKS}). ' + 'Resetting persistence and falling back to cold bootstrap.' + ) + self.cold_bootstrap(current_block, metagraph_hotkeys, contract_client) + return + self.hydrate_from_db() + + def cold_bootstrap( + self, + current_block: int, + metagraph_hotkeys: Optional[List[str]] = None, + contract_client: Any = None, + ) -> None: + """Snapshot contract state for every metagraph miner, persist the + anchors, then rewind the cursor by one scoring window so ``sync_to`` + backfills it before the first scoring pass runs.""" + # Wipe first so a crashed prior cold boot (anchors written, cursor not) + # or a stale-cursor fallback can't leave duplicate/orphaned rows. + self.state_store.reset_event_watcher_state() if metagraph_hotkeys and contract_client is not None: for hotkey in metagraph_hotkeys: try: @@ -324,9 +356,13 @@ def initialize( swap_id = getattr(swap, 'id', None) if isinstance(swap_id, int): self.bootstrapped_swap_ids.add(swap_id) + self.state_store.add_bootstrapped_swap(swap_id) seen_hotkeys.add(hk) self.open_swap_count[hk] = self.open_swap_count.get(hk, 0) + 1 self.busy_events.append(BusyEvent(hotkey=hk, delta=+1, block=init_block)) + self.state_store.insert_busy_event( + init_block, hk, +1, swap_id if isinstance(swap_id, int) else None + ) if seen_hotkeys: self.busy_events.sort(key=lambda ev: ev.block) bt.logging.info(f'EventWatcher bootstrap: seeded {len(seen_hotkeys)} miners as busy from contract') @@ -341,19 +377,59 @@ def initialize( event = ActiveEvent(hotkey=hotkey, active=True, block=self.cursor) self.active_events.append(event) self.active_events_by_hotkey.setdefault(hotkey, []).append(event) + self.state_store.insert_active_event(self.cursor, hotkey, True) + self.state_store.set_event_cursor(self.cursor) + + def hydrate_from_db(self) -> None: + """Rebuild every in-memory mirror from state.db. Called on warm restart + when the persisted cursor is within one scoring window of head — the + contract is bypassed entirely; DB is treated as source of truth.""" + self.cursor = self.state_store.get_event_cursor() or 0 + self.bootstrapped_swap_ids = self.state_store.load_bootstrapped_swaps() + + active_rows = self.state_store.load_all_active_events() + self.active_events = [ + ActiveEvent(hotkey=r['hotkey'], active=bool(r['active']), block=r['block_num']) for r in active_rows + ] + self.active_events_by_hotkey = {} + latest_active: Dict[str, bool] = {} + for ev in self.active_events: + self.active_events_by_hotkey.setdefault(ev.hotkey, []).append(ev) + latest_active[ev.hotkey] = ev.active + self.active_miners = {hk for hk, is_active in latest_active.items() if is_active} + + busy_rows = self.state_store.load_all_busy_events() + self.busy_events = [BusyEvent(hotkey=r['hotkey'], delta=r['delta'], block=r['block_num']) for r in busy_rows] + counts: Dict[str, int] = {} + for ev in self.busy_events: + counts[ev.hotkey] = counts.get(ev.hotkey, 0) + ev.delta + self.open_swap_count = {hk: c for hk, c in counts.items() if c > 0} + + bt.logging.info( + f'EventWatcher hydrated from DB: cursor={self.cursor}, ' + f'{len(self.active_miners)} active, {sum(self.open_swap_count.values())} open swaps' + ) def sync_to(self, current_block: int) -> None: """Catch up from cursor to ``current_block`` in MAX_BLOCKS_PER_SYNC chunks so a long outage doesn't freeze the forward loop.""" if current_block <= self.cursor: return + self.pruned_block_count = 0 + self.pruned_block_first = None + self.pruned_block_last = None end = min(current_block, self.cursor + MAX_BLOCKS_PER_SYNC) for block_num in range(self.cursor + 1, end + 1): self.process_block(block_num) - self.cursor = end if current_block - self.last_prune_block >= EVENT_PRUNE_INTERVAL_BLOCKS: self.prune_old_events(current_block) self.last_prune_block = current_block + if self.pruned_block_count > 0: + bt.logging.info( + f'EventWatcher: {self.pruned_block_count} pruned blocks skipped ' + f'(blocks {self.pruned_block_first}..{self.pruned_block_last}) — ' + 'RPC node retains only recent state' + ) def process_block(self, block_num: int) -> None: try: @@ -362,7 +438,19 @@ def process_block(self, block_num: int) -> None: return events = self.substrate.get_events(block_hash=block_hash) except Exception as e: - bt.logging.debug(f'EventWatcher: block {block_num} events unavailable: {e}') + msg = str(e).lower() + if ('state' in msg and 'discarded' in msg) or 'pruned' in msg: + # Permanently pruned: advance past it, else cold start loops + # the first pruned block forever and never reaches live state. + self.pruned_block_count += 1 + if self.pruned_block_first is None: + self.pruned_block_first = block_num + self.pruned_block_last = block_num + self.cursor = block_num + self.state_store.set_event_cursor(block_num) + else: + # Transient: hold the cursor so the block is retried next sync. + bt.logging.debug(f'EventWatcher: block {block_num} events unavailable: {e}') return for event_record in events: @@ -377,6 +465,8 @@ def process_block(self, block_num: int) -> None: # which would re-replay every successful apply_event in the # same block on the next pass and double-apply busy deltas. bt.logging.warning(f'EventWatcher: apply_event {name}@{block_num} failed: {e}') + self.cursor = block_num + self.state_store.set_event_cursor(block_num) def decode_contract_event(self, event_record: Any) -> Optional[Tuple[str, Dict[str, Any]]]: record = event_record.value if hasattr(event_record, 'value') else event_record @@ -449,7 +539,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None # double-count the miner as busy. if isinstance(swap_id, int) and swap_id in self.bootstrapped_swap_ids: return - self.apply_busy_delta(block_num, miner, +1) + self.apply_busy_delta(block_num, miner, +1, swap_id if isinstance(swap_id, int) else None) bt.logging.info(f'EventWatcher: {self._label(miner)} SwapInitiated swap=#{swap_id} @ block {block_num}') elif name == 'SwapCompleted': swap_id = values.get('swap_id') @@ -466,8 +556,9 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None from_chain=from_chain, to_chain=to_chain, ) - self.apply_busy_delta(block_num, miner, -1) + self.apply_busy_delta(block_num, miner, -1, swap_id) self.bootstrapped_swap_ids.discard(swap_id) + self.state_store.remove_bootstrapped_swap(swap_id) if self.swap_tracker is not None: self.swap_tracker.resolve(swap_id, SwapStatus.COMPLETED, block_num) bt.logging.info( @@ -489,8 +580,9 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None # Defensive: a SwapInitiated this validator missed would leave # a stale pin behind — clear it on the terminal event too. self.state_store.remove_reservation_pin(miner) - self.apply_busy_delta(block_num, miner, -1) + self.apply_busy_delta(block_num, miner, -1, swap_id) self.bootstrapped_swap_ids.discard(swap_id) + self.state_store.remove_bootstrapped_swap(swap_id) if self.swap_tracker is not None: self.swap_tracker.resolve(swap_id, SwapStatus.TIMED_OUT, block_num) bt.logging.warning( @@ -615,10 +707,12 @@ def record_active_transition(self, block_num: int, hotkey: str, active: bool) -> event = ActiveEvent(hotkey=hotkey, active=active, block=block_num) self.active_events.append(event) self.active_events_by_hotkey.setdefault(hotkey, []).append(event) + self.state_store.insert_active_event(block_num, hotkey, active) - def apply_busy_delta(self, block_num: int, hotkey: str, delta: int) -> None: + def apply_busy_delta(self, block_num: int, hotkey: str, delta: int, swap_id: Optional[int] = None) -> None: """Apply a ±1 transition. Drops any -1 with no matching prior +1 - rather than letting the open-swap count go negative.""" + rather than letting the open-swap count go negative. ``swap_id`` is + persisted on the row for traceability and pairing of +1/-1's.""" if delta == 0: return current = self.open_swap_count.get(hotkey, 0) @@ -631,12 +725,14 @@ def apply_busy_delta(self, block_num: int, hotkey: str, delta: int) -> None: return self.open_swap_count[hotkey] = new_count self.busy_events.append(BusyEvent(hotkey=hotkey, delta=delta, block=block_num)) + self.state_store.insert_busy_event(block_num, hotkey, delta, swap_id) def prune_old_events(self, current_block: int) -> None: """Drop busy and active events older than one scoring window. Latest active event per hotkey is preserved as a state-reconstruction anchor; busy events are kept while the open-swap count is still > 0 so the - matching -1 isn't orphaned.""" + matching -1 isn't orphaned. Mirrors the prune onto the SQL tables so + warm restarts see the same anchor invariants.""" cutoff = current_block - SCORING_WINDOW_BLOCKS if cutoff <= 0: return @@ -657,3 +753,5 @@ def prune_old_events(self, current_block: int) -> None: self.active_events_by_hotkey[hotkey] = pruned else: del self.active_events_by_hotkey[hotkey] + self.state_store.prune_active_events(cutoff) + self.state_store.prune_busy_events(cutoff) diff --git a/allways/validator/state_store.py b/allways/validator/state_store.py index 9551b2e..6a4c0a2 100644 --- a/allways/validator/state_store.py +++ b/allways/validator/state_store.py @@ -1,11 +1,14 @@ """SQLite-backed store for all validator-local state. Tables: ``pending_confirms`` (axon→forward queue), ``rate_events`` (crown-time -input), ``swap_outcomes`` (credibility ledger). Single connection guarded by -one lock; opened with ``check_same_thread=False``. ``busy_timeout`` is set -before ``journal_mode=WAL`` because the WAL flip takes a brief exclusive lock -that concurrent openers would otherwise hit as "database is locked" — the -local dev env runs two validators against the same file. +input), ``swap_outcomes`` (credibility ledger), ``active_events`` + +``busy_events`` + ``event_watcher_meta`` + ``bootstrapped_swaps`` (event +watcher persistence — warm restarts hydrate from these instead of replaying +contract history). Single connection guarded by one lock; opened with +``check_same_thread=False``. ``busy_timeout`` is set before +``journal_mode=WAL`` because the WAL flip takes a brief exclusive lock that +concurrent openers would otherwise hit as "database is locked" — the local +dev env runs two validators against the same file. """ import sqlite3 @@ -13,7 +16,7 @@ import time from dataclasses import dataclass, field from pathlib import Path -from typing import Callable, Dict, List, Optional, Tuple +from typing import Callable, Dict, List, Optional, Set, Tuple @dataclass @@ -480,6 +483,129 @@ def prune_swap_outcomes_older_than(self, cutoff_block: int) -> None: conn.execute('DELETE FROM swap_outcomes WHERE resolved_block < ?', (cutoff_block,)) conn.commit() + # ─── event_watcher state ──────────────────────────────────────────── + + def insert_active_event(self, block_num: int, hotkey: str, active: bool) -> None: + with self.lock: + conn = self.require_connection() + conn.execute( + 'INSERT INTO active_events (block_num, hotkey, active) VALUES (?, ?, ?)', + (block_num, hotkey, 1 if active else 0), + ) + conn.commit() + + def insert_busy_event(self, block_num: int, hotkey: str, delta: int, swap_id: Optional[int] = None) -> None: + with self.lock: + conn = self.require_connection() + conn.execute( + 'INSERT INTO busy_events (block_num, hotkey, delta, swap_id) VALUES (?, ?, ?, ?)', + (block_num, hotkey, delta, swap_id), + ) + conn.commit() + + def load_all_active_events(self) -> List[dict]: + with self.lock: + conn = self.require_connection() + rows = conn.execute( + 'SELECT block_num, hotkey, active FROM active_events ORDER BY block_num ASC, id ASC' + ).fetchall() + return [{'block_num': r['block_num'], 'hotkey': r['hotkey'], 'active': bool(r['active'])} for r in rows] + + def load_all_busy_events(self) -> List[dict]: + with self.lock: + conn = self.require_connection() + rows = conn.execute( + 'SELECT block_num, hotkey, delta, swap_id FROM busy_events ORDER BY block_num ASC, id ASC' + ).fetchall() + return [ + {'block_num': r['block_num'], 'hotkey': r['hotkey'], 'delta': r['delta'], 'swap_id': r['swap_id']} + for r in rows + ] + + def get_event_cursor(self) -> Optional[int]: + with self.lock: + conn = self.require_connection() + row = conn.execute('SELECT value FROM event_watcher_meta WHERE key = ?', ('cursor',)).fetchone() + return int(row['value']) if row is not None else None + + def set_event_cursor(self, block_num: int) -> None: + with self.lock: + conn = self.require_connection() + conn.execute( + """ + INSERT INTO event_watcher_meta (key, value) VALUES ('cursor', ?) + ON CONFLICT(key) DO UPDATE SET value = excluded.value + """, + (block_num,), + ) + conn.commit() + + def add_bootstrapped_swap(self, swap_id: int) -> None: + with self.lock: + conn = self.require_connection() + conn.execute('INSERT OR IGNORE INTO bootstrapped_swaps (swap_id) VALUES (?)', (swap_id,)) + conn.commit() + + def remove_bootstrapped_swap(self, swap_id: int) -> None: + with self.lock: + conn = self.require_connection() + conn.execute('DELETE FROM bootstrapped_swaps WHERE swap_id = ?', (swap_id,)) + conn.commit() + + def load_bootstrapped_swaps(self) -> Set[int]: + with self.lock: + conn = self.require_connection() + rows = conn.execute('SELECT swap_id FROM bootstrapped_swaps').fetchall() + return {int(r['swap_id']) for r in rows} + + def prune_active_events(self, cutoff_block: int) -> None: + """Drop active events older than ``cutoff_block``, preserving the latest + row per hotkey as a state-reconstruction anchor (mirrors the in-memory + prune's anchor-preservation rule).""" + if cutoff_block <= 0: + return + with self.lock: + conn = self.require_connection() + conn.execute( + """ + DELETE FROM active_events + WHERE block_num < ? + AND id NOT IN (SELECT MAX(id) FROM active_events GROUP BY hotkey) + """, + (cutoff_block,), + ) + conn.commit() + + def prune_busy_events(self, cutoff_block: int) -> None: + """Drop busy events older than ``cutoff_block`` except for hotkeys whose + SUM(delta) > 0 — those still have an open swap, so we keep their full + +1/-1 history so a future SwapCompleted's -1 isn't orphaned.""" + if cutoff_block <= 0: + return + with self.lock: + conn = self.require_connection() + conn.execute( + """ + DELETE FROM busy_events + WHERE block_num < ? + AND hotkey NOT IN (SELECT hotkey FROM busy_events GROUP BY hotkey HAVING SUM(delta) > 0) + """, + (cutoff_block,), + ) + conn.commit() + + def reset_event_watcher_state(self) -> None: + """Wipe all event-watcher persistence. Used when the cursor is more than + a scoring window behind current — the chain has moved past replayable + history so we fall back to cold bootstrap from the contract.""" + with self.lock: + conn = self.require_connection() + conn.execute('DELETE FROM active_events') + conn.execute('DELETE FROM busy_events') + conn.execute("DELETE FROM event_watcher_meta WHERE key = 'cursor'") + conn.execute('DELETE FROM bootstrapped_swaps') + conn.commit() + # ─── cross-table maintenance ──────────────────────────────────────── def delete_hotkey(self, hotkey: str) -> None: @@ -586,6 +712,38 @@ def init_db(self) -> None: ); CREATE INDEX IF NOT EXISTS idx_reservation_pins_reserved_until ON reservation_pins(reserved_until); + + CREATE TABLE IF NOT EXISTS active_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + block_num INTEGER NOT NULL, + hotkey TEXT NOT NULL, + active INTEGER NOT NULL + ); + CREATE INDEX IF NOT EXISTS idx_active_events_block + ON active_events(block_num); + CREATE INDEX IF NOT EXISTS idx_active_events_hotkey + ON active_events(hotkey); + + CREATE TABLE IF NOT EXISTS busy_events ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + block_num INTEGER NOT NULL, + hotkey TEXT NOT NULL, + delta INTEGER NOT NULL, + swap_id INTEGER + ); + CREATE INDEX IF NOT EXISTS idx_busy_events_block + ON busy_events(block_num); + CREATE INDEX IF NOT EXISTS idx_busy_events_hotkey + ON busy_events(hotkey); + + CREATE TABLE IF NOT EXISTS event_watcher_meta ( + key TEXT PRIMARY KEY, + value INTEGER NOT NULL + ); + + CREATE TABLE IF NOT EXISTS bootstrapped_swaps ( + swap_id INTEGER PRIMARY KEY + ); """ ) # Ensure newer columns exist on DBs created by older validator diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py index 19f6f01..baa6fbf 100644 --- a/tests/test_event_watcher.py +++ b/tests/test_event_watcher.py @@ -535,3 +535,401 @@ def test_bootstrap_tolerates_contract_read_failures(self, tmp_path: Path): assert w.active_miners == set() assert w.cursor == 0 w.state_store.close() + + +class TestStateStoreEventTables: + """Direct exercise of the new event-watcher tables on ValidatorStateStore.""" + + def test_init_db_creates_event_tables(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + conn = store.require_connection() + names = {r[0] for r in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()} + for expected in ('active_events', 'busy_events', 'event_watcher_meta', 'bootstrapped_swaps'): + assert expected in names + store.close() + + def test_init_db_is_idempotent(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_active_event(100, 'hk_a', True) + store.close() + store2 = ValidatorStateStore(db_path=tmp_path / 'state.db') + assert store2.load_all_active_events() == [{'block_num': 100, 'hotkey': 'hk_a', 'active': True}] + store2.close() + + def test_insert_and_load_active_events_round_trip(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_active_event(100, 'hk_a', True) + store.insert_active_event(200, 'hk_b', False) + store.insert_active_event(150, 'hk_a', False) + loaded = store.load_all_active_events() + assert loaded == [ + {'block_num': 100, 'hotkey': 'hk_a', 'active': True}, + {'block_num': 150, 'hotkey': 'hk_a', 'active': False}, + {'block_num': 200, 'hotkey': 'hk_b', 'active': False}, + ] + store.close() + + def test_insert_and_load_busy_events_round_trip(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_busy_event(100, 'hk_a', +1, 7) + store.insert_busy_event(150, 'hk_a', -1, 7) + store.insert_busy_event(120, 'hk_b', +1, None) + loaded = store.load_all_busy_events() + assert loaded == [ + {'block_num': 100, 'hotkey': 'hk_a', 'delta': 1, 'swap_id': 7}, + {'block_num': 120, 'hotkey': 'hk_b', 'delta': 1, 'swap_id': None}, + {'block_num': 150, 'hotkey': 'hk_a', 'delta': -1, 'swap_id': 7}, + ] + store.close() + + def test_event_cursor_default_is_none_then_round_trips(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + assert store.get_event_cursor() is None + store.set_event_cursor(1234) + assert store.get_event_cursor() == 1234 + store.set_event_cursor(5678) + assert store.get_event_cursor() == 5678 + store.close() + + def test_prune_active_events_preserves_latest_per_hotkey(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_active_event(100, 'hk_a', True) + store.insert_active_event(200, 'hk_a', False) + store.insert_active_event(100, 'hk_b', True) + store.prune_active_events(cutoff_block=300) + remaining = store.load_all_active_events() + # hk_a's (100, True) is dropped; (200, False) is its latest anchor. + # hk_b's only row is preserved as its own anchor even though < cutoff. + assert remaining == [ + {'block_num': 100, 'hotkey': 'hk_b', 'active': True}, + {'block_num': 200, 'hotkey': 'hk_a', 'active': False}, + ] + store.close() + + def test_prune_busy_events_preserves_open_swaps(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_busy_event(100, 'hk_a', +1, 1) + store.insert_busy_event(150, 'hk_a', -1, 1) # hk_a SUM=0 → fully prunable + store.insert_busy_event(100, 'hk_b', +1, 2) # hk_b SUM=+1 → keep + store.prune_busy_events(cutoff_block=200) + remaining = store.load_all_busy_events() + assert remaining == [{'block_num': 100, 'hotkey': 'hk_b', 'delta': 1, 'swap_id': 2}] + store.close() + + def test_bootstrapped_swaps_add_remove_load(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.add_bootstrapped_swap(7) + store.add_bootstrapped_swap(9) + store.add_bootstrapped_swap(7) # idempotent + assert store.load_bootstrapped_swaps() == {7, 9} + store.remove_bootstrapped_swap(7) + assert store.load_bootstrapped_swaps() == {9} + store.close() + + def test_reset_event_watcher_state_wipes_all_four_tables(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.insert_active_event(100, 'hk_a', True) + store.insert_busy_event(100, 'hk_a', +1, 1) + store.set_event_cursor(123) + store.add_bootstrapped_swap(1) + store.reset_event_watcher_state() + assert store.load_all_active_events() == [] + assert store.load_all_busy_events() == [] + assert store.get_event_cursor() is None + assert store.load_bootstrapped_swaps() == set() + store.close() + + +class TestEventWatcherWarmRestart: + """Persisted cursor branches initialize() into hydrate-from-DB vs. cold.""" + + def make_swap(self, swap_id: int, hotkey: str, initiated_block: int): + s = MagicMock() + s.id = swap_id + s.miner_hotkey = hotkey + s.initiated_block = initiated_block + return s + + def test_cold_bootstrap_writes_anchors_and_cursor(self, tmp_path: Path): + from allways.constants import SCORING_WINDOW_BLOCKS + + w = make_watcher(tmp_path) + client = MagicMock() + client.get_miner_active_flag.side_effect = lambda hk: hk in {'hk_a', 'hk_b'} + client.get_active_swaps.return_value = [self.make_swap(42, 'hk_a', initiated_block=950)] + + current_block = SCORING_WINDOW_BLOCKS + 500 + w.initialize(current_block=current_block, metagraph_hotkeys=['hk_a', 'hk_b'], contract_client=client) + + cursor_expected = current_block - SCORING_WINDOW_BLOCKS + assert w.state_store.get_event_cursor() == cursor_expected + loaded_active = w.state_store.load_all_active_events() + assert {(r['hotkey'], r['active']) for r in loaded_active} == {('hk_a', True), ('hk_b', True)} + loaded_busy = w.state_store.load_all_busy_events() + assert loaded_busy == [{'block_num': 950, 'hotkey': 'hk_a', 'delta': 1, 'swap_id': 42}] + assert w.state_store.load_bootstrapped_swaps() == {42} + w.state_store.close() + + def test_warm_restart_hydrates_without_contract_reads(self, tmp_path: Path): + from allways.constants import SCORING_WINDOW_BLOCKS + + # First boot: cold. + w1 = make_watcher(tmp_path) + client = MagicMock() + client.get_miner_active_flag.side_effect = lambda hk: hk == 'hk_a' + client.get_active_swaps.return_value = [self.make_swap(99, 'hk_a', initiated_block=900)] + current_block = SCORING_WINDOW_BLOCKS + 500 + w1.initialize(current_block=current_block, metagraph_hotkeys=['hk_a'], contract_client=client) + w1.state_store.close() + + # Second boot: contract_client must NOT be called. + w2 = ContractEventWatcher( + substrate=MagicMock(), + contract_address=TEST_CONTRACT_ADDRESS, + metadata_path=METADATA_PATH, + state_store=ValidatorStateStore(db_path=tmp_path / 'state.db'), + ) + strict_client = MagicMock() + strict_client.get_miner_active_flag.side_effect = AssertionError('warm restart must not call contract') + strict_client.get_active_swaps.side_effect = AssertionError('warm restart must not call contract') + # Second boot at the same head — keeps gap within SCORING_WINDOW_BLOCKS. + w2.initialize(current_block=current_block, metagraph_hotkeys=['hk_a'], contract_client=strict_client) + + assert w2.active_miners == {'hk_a'} + assert w2.open_swap_count == {'hk_a': 1} + assert w2.bootstrapped_swap_ids == {99} + assert w2.cursor == current_block - SCORING_WINDOW_BLOCKS + w2.state_store.close() + + def test_cold_bootstrap_is_idempotent_against_crashed_prior_boot(self, tmp_path: Path): + """A cold boot that wrote anchors but died before the cursor write must + not leave duplicate anchors when the next cold boot runs.""" + from allways.constants import SCORING_WINDOW_BLOCKS + + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + # Simulate a crashed prior cold boot: anchor rows present, no cursor. + anchor_block = (SCORING_WINDOW_BLOCKS + 500) - SCORING_WINDOW_BLOCKS + store.insert_active_event(anchor_block, 'hk_a', True) + store.add_bootstrapped_swap(7) + assert store.get_event_cursor() is None # cursor never landed + store.close() + + w = ContractEventWatcher( + substrate=MagicMock(), + contract_address=TEST_CONTRACT_ADDRESS, + metadata_path=METADATA_PATH, + state_store=ValidatorStateStore(db_path=tmp_path / 'state.db'), + ) + client = MagicMock() + client.get_miner_active_flag.side_effect = lambda hk: hk == 'hk_a' + client.get_active_swaps.return_value = [] + w.initialize(current_block=SCORING_WINDOW_BLOCKS + 500, metagraph_hotkeys=['hk_a'], contract_client=client) + + # Exactly one anchor for hk_a — the stale row was wiped, not duplicated. + rows = w.state_store.load_all_active_events() + assert rows == [{'block_num': anchor_block, 'hotkey': 'hk_a', 'active': True}] + # Orphaned bootstrapped swap from the crashed boot is gone. + assert w.state_store.load_bootstrapped_swaps() == set() + w.state_store.close() + + def test_warm_restart_rebuilds_open_swap_count_with_multiple_swaps(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.set_event_cursor(1000) + store.insert_busy_event(500, 'hk_a', +1, 1) + store.insert_busy_event(600, 'hk_a', +1, 2) + store.close() + + w = ContractEventWatcher( + substrate=MagicMock(), + contract_address=TEST_CONTRACT_ADDRESS, + metadata_path=METADATA_PATH, + state_store=ValidatorStateStore(db_path=tmp_path / 'state.db'), + ) + w.initialize(current_block=1100) + assert w.open_swap_count == {'hk_a': 2} + w.state_store.close() + + def test_warm_restart_drops_closed_hotkeys_from_open_swap_count(self, tmp_path: Path): + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.set_event_cursor(1000) + store.insert_busy_event(500, 'hk_a', +1, 1) + store.insert_busy_event(600, 'hk_a', -1, 1) + store.close() + + w = ContractEventWatcher( + substrate=MagicMock(), + contract_address=TEST_CONTRACT_ADDRESS, + metadata_path=METADATA_PATH, + state_store=ValidatorStateStore(db_path=tmp_path / 'state.db'), + ) + w.initialize(current_block=1100) + assert 'hk_a' not in w.open_swap_count + w.state_store.close() + + def test_long_outage_falls_back_to_cold_bootstrap(self, tmp_path: Path): + from allways.constants import SCORING_WINDOW_BLOCKS + + # Persist a stale cursor far behind head. + store = ValidatorStateStore(db_path=tmp_path / 'state.db') + store.set_event_cursor(100) + store.insert_active_event(50, 'hk_old', True) + store.close() + + w = ContractEventWatcher( + substrate=MagicMock(), + contract_address=TEST_CONTRACT_ADDRESS, + metadata_path=METADATA_PATH, + state_store=ValidatorStateStore(db_path=tmp_path / 'state.db'), + ) + client = MagicMock() + client.get_miner_active_flag.side_effect = lambda hk: hk == 'hk_new' + client.get_active_swaps.return_value = [] + current_block = 100 + SCORING_WINDOW_BLOCKS + 50 + w.initialize(current_block=current_block, metagraph_hotkeys=['hk_new'], contract_client=client) + + # Stale rows were wiped; new cold-bootstrap anchor for hk_new exists. + loaded = w.state_store.load_all_active_events() + assert {r['hotkey'] for r in loaded} == {'hk_new'} + assert w.active_miners == {'hk_new'} + assert w.cursor == current_block - SCORING_WINDOW_BLOCKS + w.state_store.close() + + def test_terminal_event_removes_bootstrapped_swap_from_db(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.state_store.add_bootstrapped_swap(42) + w.bootstrapped_swap_ids.add(42) + w.apply_event(2000, 'SwapCompleted', {'swap_id': 42, 'miner': 'hk_a', 'tao_amount': 100}) + assert 42 not in w.state_store.load_bootstrapped_swaps() + assert 42 not in w.bootstrapped_swap_ids + w.state_store.close() + + +class TestEventWatcherWriteThrough: + """In-memory transitions also persist.""" + + def test_record_active_transition_writes_to_db(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.record_active_transition(500, 'hk_a', True) + loaded = w.state_store.load_all_active_events() + assert loaded == [{'block_num': 500, 'hotkey': 'hk_a', 'active': True}] + w.state_store.close() + + def test_apply_busy_delta_writes_to_db_with_swap_id(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.apply_busy_delta(500, 'hk_a', +1, swap_id=7) + loaded = w.state_store.load_all_busy_events() + assert loaded == [{'block_num': 500, 'hotkey': 'hk_a', 'delta': 1, 'swap_id': 7}] + w.state_store.close() + + def test_process_block_advances_cursor_per_block(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.substrate.get_block_hash.side_effect = lambda b: f'0x{b:064x}' + w.substrate.get_events.return_value = [] + w.cursor = 99 + w.sync_to(101) + assert w.cursor == 101 + assert w.state_store.get_event_cursor() == 101 + w.state_store.close() + + +class TestEventWatcherLogHygiene: + """Pruned-block errors collapse into a single summary line.""" + + def test_pruned_block_error_increments_counter_silently(self, tmp_path: Path, caplog): + import logging + + w = make_watcher(tmp_path) + w.substrate.get_block_hash.side_effect = RuntimeError( + 'Other error: -32603: Unable to fetch block at hash 0x...: State already discarded' + ) + w.cursor = 0 + caplog.set_level(logging.INFO) + w.sync_to(5) + assert w.pruned_block_count == 5 + assert w.pruned_block_first == 1 + assert w.pruned_block_last == 5 + # Cursor MUST advance through pruned blocks — they are permanently + # unavailable, so stalling here would never reach the live region. + assert w.cursor == 5 + assert w.state_store.get_event_cursor() == 5 + w.state_store.close() + + def test_cursor_does_not_stall_on_pruned_cold_start_region(self, tmp_path: Path): + """Regression: a cold start whose first blocks are all pruned must + still march the cursor forward until it reaches a live block, rather + than looping on the first pruned block forever.""" + from allways.validator import event_watcher as ew_module + + w = make_watcher(tmp_path) + original_chunk = ew_module.MAX_BLOCKS_PER_SYNC + ew_module.MAX_BLOCKS_PER_SYNC = 100 + try: + live_from = 60 + + def events_for(block_hash): + block = int(block_hash, 16) + if block < live_from: + raise RuntimeError('State already discarded') + return [] + + w.substrate.get_block_hash.side_effect = lambda block: f'0x{block:064x}' + w.substrate.get_events.side_effect = events_for + w.cursor = 0 + w.sync_to(80) + # Walked through the 1..59 pruned zone and on to the live tail. + assert w.cursor == 80 + assert w.state_store.get_event_cursor() == 80 + finally: + ew_module.MAX_BLOCKS_PER_SYNC = original_chunk + w.state_store.close() + + def test_unrelated_exception_holds_cursor_for_retry(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.substrate.get_block_hash.side_effect = RuntimeError('connection refused') + w.cursor = 0 + w.sync_to(2) + # Transient error is not pruned-state — counter stays at zero AND the + # cursor holds so the block is retried next sync. + assert w.pruned_block_count == 0 + assert w.cursor == 0 + w.state_store.close() + + def test_pruned_block_counter_resets_between_sync_calls(self, tmp_path: Path): + from allways.validator import event_watcher as ew_module + + w = make_watcher(tmp_path) + original_chunk = ew_module.MAX_BLOCKS_PER_SYNC + ew_module.MAX_BLOCKS_PER_SYNC = 10 + try: + calls = {'n': 0} + + def hash_for(block): + calls['n'] += 1 + if calls['n'] <= 3: + raise RuntimeError('State already discarded') + return f'0x{block:064x}' + + w.substrate.get_block_hash.side_effect = hash_for + w.substrate.get_events.return_value = [] + w.cursor = 0 + w.sync_to(10) + assert w.pruned_block_count == 3 + w.sync_to(20) + # All later blocks succeed → counter should reset to zero. + assert w.pruned_block_count == 0 + finally: + ew_module.MAX_BLOCKS_PER_SYNC = original_chunk + w.state_store.close() + + +class TestSwapOutcomesIdempotency: + """Re-applying terminal events doesn't duplicate swap_outcomes rows.""" + + def test_replaying_swap_completed_does_not_duplicate_outcome(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.apply_event(1000, 'SwapCompleted', {'swap_id': 42, 'miner': 'hk_a', 'tao_amount': 500}) + w.apply_event(1000, 'SwapCompleted', {'swap_id': 42, 'miner': 'hk_a', 'tao_amount': 500}) + rows = w.state_store.get_success_rates_since(0) + # Two SwapCompleted apply()s, but swap_outcomes is keyed by swap_id (INSERT OR REPLACE). + assert rows.get('hk_a') == (1, 0) + w.state_store.close()