Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 112 additions & 12 deletions allways/validator/event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,16 @@ def __init__(
# Swap IDs whose +1 was seeded directly from the contract's active-swap
# list during initialize(). Replay must skip their SwapInitiated event
# to avoid double-counting — the busy tick is already in open_swap_count.
# Entries are discarded on the matching terminal event.
# Entries are discarded on the matching terminal event. Persisted to
# bootstrapped_swaps so a warm restart preserves the skip-list.
self.bootstrapped_swap_ids: Set[int] = set()
# Pruned-block counters: each sync_to resets these and emits a single
# summary INFO line at the end if any blocks were skipped because their
# state had been pruned by the RPC node (public finney nodes keep only
# ~240 blocks of state, but the watcher replays a full scoring window).
self.pruned_block_count: int = 0
self.pruned_block_first: Optional[int] = None
self.pruned_block_last: Optional[int] = None

# ─── Public API consumed by scoring ─────────────────────────────────

Expand Down Expand Up @@ -286,9 +294,35 @@ def initialize(
metagraph_hotkeys: Optional[List[str]] = None,
contract_client: Any = None,
) -> None:
"""Cold start: snapshot contract state for every metagraph miner, then
rewind the cursor by one scoring window so ``sync_to`` backfills it
before the first scoring pass runs."""
"""Branch on persisted cursor: warm restart hydrates from state.db so
sync_to picks up where the last process left off (no contract reads,
no replay of pre-cursor history). A fresh DB or a cursor more than
one scoring window behind falls back to cold bootstrap."""
persisted_cursor = self.state_store.get_event_cursor()
if persisted_cursor is None:
self.cold_bootstrap(current_block, metagraph_hotkeys, contract_client)
return
gap = current_block - persisted_cursor
if gap > SCORING_WINDOW_BLOCKS:
bt.logging.warning(
f'EventWatcher: persisted cursor {persisted_cursor} is {gap} blocks behind '
f'current {current_block} (> SCORING_WINDOW_BLOCKS={SCORING_WINDOW_BLOCKS}). '
'Resetting persistence and falling back to cold bootstrap.'
)
self.state_store.reset_event_watcher_state()
self.cold_bootstrap(current_block, metagraph_hotkeys, contract_client)
return
self.hydrate_from_db()

def cold_bootstrap(
self,
current_block: int,
metagraph_hotkeys: Optional[List[str]] = None,
contract_client: Any = None,
) -> None:
"""Snapshot contract state for every metagraph miner, persist the
anchors, then rewind the cursor by one scoring window so ``sync_to``
backfills it before the first scoring pass runs."""
if metagraph_hotkeys and contract_client is not None:
for hotkey in metagraph_hotkeys:
try:
Expand All @@ -309,9 +343,13 @@ def initialize(
swap_id = getattr(swap, 'id', None)
if isinstance(swap_id, int):
self.bootstrapped_swap_ids.add(swap_id)
self.state_store.add_bootstrapped_swap(swap_id)
seen_hotkeys.add(hk)
self.open_swap_count[hk] = self.open_swap_count.get(hk, 0) + 1
self.busy_events.append(BusyEvent(hotkey=hk, delta=+1, block=init_block))
self.state_store.insert_busy_event(
init_block, hk, +1, swap_id if isinstance(swap_id, int) else None
)
if seen_hotkeys:
self.busy_events.sort(key=lambda ev: ev.block)
bt.logging.info(f'EventWatcher bootstrap: seeded {len(seen_hotkeys)} miners as busy from contract')
Expand All @@ -326,19 +364,59 @@ def initialize(
event = ActiveEvent(hotkey=hotkey, active=True, block=self.cursor)
self.active_events.append(event)
self.active_events_by_hotkey.setdefault(hotkey, []).append(event)
self.state_store.insert_active_event(self.cursor, hotkey, True)
self.state_store.set_event_cursor(self.cursor)

def hydrate_from_db(self) -> None:
"""Rebuild every in-memory mirror from state.db. Called on warm restart
when the persisted cursor is within one scoring window of head — the
contract is bypassed entirely; DB is treated as source of truth."""
self.cursor = self.state_store.get_event_cursor() or 0
self.bootstrapped_swap_ids = self.state_store.load_bootstrapped_swaps()

active_rows = self.state_store.load_all_active_events()
self.active_events = [
ActiveEvent(hotkey=r['hotkey'], active=bool(r['active']), block=r['block_num']) for r in active_rows
]
self.active_events_by_hotkey = {}
latest_active: Dict[str, bool] = {}
for ev in self.active_events:
self.active_events_by_hotkey.setdefault(ev.hotkey, []).append(ev)
latest_active[ev.hotkey] = ev.active
self.active_miners = {hk for hk, is_active in latest_active.items() if is_active}

busy_rows = self.state_store.load_all_busy_events()
self.busy_events = [BusyEvent(hotkey=r['hotkey'], delta=r['delta'], block=r['block_num']) for r in busy_rows]
counts: Dict[str, int] = {}
for ev in self.busy_events:
counts[ev.hotkey] = counts.get(ev.hotkey, 0) + ev.delta
self.open_swap_count = {hk: c for hk, c in counts.items() if c > 0}

bt.logging.info(
f'EventWatcher hydrated from DB: cursor={self.cursor}, '
f'{len(self.active_miners)} active, {sum(self.open_swap_count.values())} open swaps'
)

def sync_to(self, current_block: int) -> None:
"""Catch up from cursor to ``current_block`` in MAX_BLOCKS_PER_SYNC
chunks so a long outage doesn't freeze the forward loop."""
if current_block <= self.cursor:
return
self.pruned_block_count = 0
self.pruned_block_first = None
self.pruned_block_last = None
end = min(current_block, self.cursor + MAX_BLOCKS_PER_SYNC)
for block_num in range(self.cursor + 1, end + 1):
self.process_block(block_num)
self.cursor = end
if current_block - self.last_prune_block >= EVENT_PRUNE_INTERVAL_BLOCKS:
self.prune_old_events(current_block)
self.last_prune_block = current_block
if self.pruned_block_count > 0:
bt.logging.info(
f'EventWatcher: {self.pruned_block_count} pruned blocks skipped '
f'(blocks {self.pruned_block_first}..{self.pruned_block_last}) — '
'RPC node retains only recent state'
)

def process_block(self, block_num: int) -> None:
try:
Expand All @@ -347,7 +425,19 @@ def process_block(self, block_num: int) -> None:
return
events = self.substrate.get_events(block_hash=block_hash)
except Exception as e:
bt.logging.debug(f'EventWatcher: block {block_num} events unavailable: {e}')
msg = str(e).lower()
# Public finney nodes drop state past ~240 blocks; replaying a full
# scoring window after restart hits this on every old block.
# Collapse into one summary line per sync_to instead of logging
# per-block. Substrate-interface raises this as "State already
# discarded" or "state pruned" — match liberally on either word.
if ('state' in msg and 'discarded' in msg) or 'pruned' in msg:
self.pruned_block_count += 1
if self.pruned_block_first is None:
self.pruned_block_first = block_num
self.pruned_block_last = block_num
else:
bt.logging.debug(f'EventWatcher: block {block_num} events unavailable: {e}')
return

for event_record in events:
Expand All @@ -362,6 +452,8 @@ def process_block(self, block_num: int) -> None:
# which would re-replay every successful apply_event in the
# same block on the next pass and double-apply busy deltas.
bt.logging.warning(f'EventWatcher: apply_event {name}@{block_num} failed: {e}')
self.cursor = block_num
self.state_store.set_event_cursor(block_num)

def decode_contract_event(self, event_record: Any) -> Optional[Tuple[str, Dict[str, Any]]]:
record = event_record.value if hasattr(event_record, 'value') else event_record
Expand Down Expand Up @@ -420,7 +512,7 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None
# double-count the miner as busy.
if isinstance(swap_id, int) and swap_id in self.bootstrapped_swap_ids:
return
self.apply_busy_delta(block_num, miner, +1)
self.apply_busy_delta(block_num, miner, +1, swap_id if isinstance(swap_id, int) else None)
elif name == 'SwapCompleted':
swap_id = values.get('swap_id')
miner = values.get('miner', '')
Expand All @@ -432,8 +524,9 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None
resolved_block=block_num,
tao_amount=int(values.get('tao_amount') or 0),
)
self.apply_busy_delta(block_num, miner, -1)
self.apply_busy_delta(block_num, miner, -1, swap_id)
self.bootstrapped_swap_ids.discard(swap_id)
self.state_store.remove_bootstrapped_swap(swap_id)
if self.swap_tracker is not None:
self.swap_tracker.resolve(swap_id, SwapStatus.COMPLETED, block_num)
elif name == 'SwapTimedOut':
Expand All @@ -446,8 +539,9 @@ def apply_event(self, block_num: int, name: str, values: Dict[str, Any]) -> None
completed=False,
resolved_block=block_num,
)
self.apply_busy_delta(block_num, miner, -1)
self.apply_busy_delta(block_num, miner, -1, swap_id)
self.bootstrapped_swap_ids.discard(swap_id)
self.state_store.remove_bootstrapped_swap(swap_id)
if self.swap_tracker is not None:
self.swap_tracker.resolve(swap_id, SwapStatus.TIMED_OUT, block_num)
elif name == 'ReservationExtensionFinalized':
Expand Down Expand Up @@ -482,10 +576,12 @@ def record_active_transition(self, block_num: int, hotkey: str, active: bool) ->
event = ActiveEvent(hotkey=hotkey, active=active, block=block_num)
self.active_events.append(event)
self.active_events_by_hotkey.setdefault(hotkey, []).append(event)
self.state_store.insert_active_event(block_num, hotkey, active)

def apply_busy_delta(self, block_num: int, hotkey: str, delta: int) -> None:
def apply_busy_delta(self, block_num: int, hotkey: str, delta: int, swap_id: Optional[int] = None) -> None:
"""Apply a ±1 transition. Drops any -1 with no matching prior +1
rather than letting the open-swap count go negative."""
rather than letting the open-swap count go negative. ``swap_id`` is
persisted on the row for traceability and pairing of +1/-1's."""
if delta == 0:
return
current = self.open_swap_count.get(hotkey, 0)
Expand All @@ -498,12 +594,14 @@ def apply_busy_delta(self, block_num: int, hotkey: str, delta: int) -> None:
return
self.open_swap_count[hotkey] = new_count
self.busy_events.append(BusyEvent(hotkey=hotkey, delta=delta, block=block_num))
self.state_store.insert_busy_event(block_num, hotkey, delta, swap_id)

def prune_old_events(self, current_block: int) -> None:
"""Drop busy and active events older than one scoring window. Latest
active event per hotkey is preserved as a state-reconstruction anchor;
busy events are kept while the open-swap count is still > 0 so the
matching -1 isn't orphaned."""
matching -1 isn't orphaned. Mirrors the prune onto the SQL tables so
warm restarts see the same anchor invariants."""
cutoff = current_block - SCORING_WINDOW_BLOCKS
if cutoff <= 0:
return
Expand All @@ -524,3 +622,5 @@ def prune_old_events(self, current_block: int) -> None:
self.active_events_by_hotkey[hotkey] = pruned
else:
del self.active_events_by_hotkey[hotkey]
self.state_store.prune_active_events(cutoff)
self.state_store.prune_busy_events(cutoff)
Loading
Loading