diff --git a/allways/validator/event_watcher.py b/allways/validator/event_watcher.py index 53d85a0..c1a1238 100644 --- a/allways/validator/event_watcher.py +++ b/allways/validator/event_watcher.py @@ -337,22 +337,30 @@ def sync_to(self, current_block: int) -> None: if current_block <= self.cursor: return end = min(current_block, self.cursor + MAX_BLOCKS_PER_SYNC) + last_processed = self.cursor for block_num in range(self.cursor + 1, end + 1): - self.process_block(block_num) - self.cursor = end + if not self.process_block(block_num): + break + last_processed = block_num + self.cursor = last_processed + if self.cursor < end: + bt.logging.warning( + f'EventWatcher: sync stopped at block {self.cursor} before requested block {end} ' + f'(current_block={current_block}); will retry from block {self.cursor + 1}' + ) if current_block - self.last_prune_block >= EVENT_PRUNE_INTERVAL_BLOCKS: self.prune_old_events(current_block) self.last_prune_block = current_block - def process_block(self, block_num: int) -> None: + def process_block(self, block_num: int) -> bool: try: block_hash = self.substrate.get_block_hash(block_num) if not block_hash: - return + return True events = self.substrate.get_events(block_hash=block_hash) except Exception as e: bt.logging.debug(f'EventWatcher: block {block_num} events unavailable: {e}') - return + return False for event_record in events: decoded = self.decode_contract_event(event_record) @@ -366,6 +374,7 @@ def process_block(self, block_num: int) -> None: # which would re-replay every successful apply_event in the # same block on the next pass and double-apply busy deltas. bt.logging.warning(f'EventWatcher: apply_event {name}@{block_num} failed: {e}') + return True def decode_contract_event(self, event_record: Any) -> Optional[Tuple[str, Dict[str, Any]]]: record = event_record.value if hasattr(event_record, 'value') else event_record diff --git a/tests/test_event_watcher.py b/tests/test_event_watcher.py index 8de4f4f..63e7c11 100644 --- a/tests/test_event_watcher.py +++ b/tests/test_event_watcher.py @@ -9,7 +9,7 @@ import struct from pathlib import Path -from unittest.mock import MagicMock +from unittest.mock import MagicMock, call from bittensor.utils import ss58_decode @@ -404,3 +404,94 @@ def test_bootstrap_tolerates_contract_read_failures(self, tmp_path: Path): assert w.active_miners == set() assert w.cursor == 0 w.state_store.close() + + +class TestSyncToRetryFailures: + def test_clean_sync_advances_cursor_to_end(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.cursor = 10 + w.substrate.get_block_hash.side_effect = lambda block_num: f'hash-{block_num}' + w.substrate.get_events.return_value = [] + + w.sync_to(13) + + assert w.cursor == 13 + assert w.substrate.get_block_hash.call_args_list == [call(11), call(12), call(13)] + w.state_store.close() + + def test_transient_block_fetch_failure_stops_cursor_before_failed_block(self, tmp_path: Path, monkeypatch): + w = make_watcher(tmp_path) + w.cursor = 10 + warning = MagicMock() + monkeypatch.setattr('allways.validator.event_watcher.bt.logging.warning', warning) + + def get_block_hash(block_num: int): + if block_num == 12: + raise RuntimeError('rpc timeout') + return f'hash-{block_num}' + + w.substrate.get_block_hash.side_effect = get_block_hash + w.substrate.get_events.return_value = [] + + w.sync_to(14) + + assert w.cursor == 11 + assert w.substrate.get_block_hash.call_args_list == [call(11), call(12)] + warning.assert_called_once_with( + 'EventWatcher: sync stopped at block 11 before requested block 14 ' + '(current_block=14); will retry from block 12' + ) + w.state_store.close() + + def test_failed_block_is_retried_on_next_sync(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.cursor = 10 + + def flaky_get_block_hash(block_num: int): + if block_num == 12: + raise RuntimeError('rpc timeout') + return f'hash-{block_num}' + + w.substrate.get_block_hash.side_effect = flaky_get_block_hash + w.substrate.get_events.return_value = [] + w.sync_to(14) + assert w.cursor == 11 + + w.substrate.get_block_hash.reset_mock() + w.substrate.get_block_hash.side_effect = lambda block_num: f'hash-{block_num}' + + w.sync_to(14) + + assert w.cursor == 14 + assert w.substrate.get_block_hash.call_args_list == [call(12), call(13), call(14)] + w.state_store.close() + + def test_failed_event_fetch_is_retried_on_next_sync(self, tmp_path: Path): + w = make_watcher(tmp_path) + w.cursor = 10 + w.substrate.get_block_hash.side_effect = lambda block_num: f'hash-{block_num}' + + def flaky_get_events(block_hash: str): + if block_hash == 'hash-12': + raise RuntimeError('rpc timeout') + return [] + + w.substrate.get_events.side_effect = flaky_get_events + w.sync_to(14) + assert w.cursor == 11 + + w.substrate.get_block_hash.reset_mock() + w.substrate.get_events.reset_mock() + w.substrate.get_events.side_effect = None + w.substrate.get_events.return_value = [] + + w.sync_to(14) + + assert w.cursor == 14 + assert w.substrate.get_block_hash.call_args_list == [call(12), call(13), call(14)] + assert w.substrate.get_events.call_args_list == [ + call(block_hash='hash-12'), + call(block_hash='hash-13'), + call(block_hash='hash-14'), + ] + w.state_store.close()