Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions allways/validator/event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,22 +337,25 @@ def sync_to(self, current_block: int) -> None:
if current_block <= self.cursor:
return
end = min(current_block, self.cursor + MAX_BLOCKS_PER_SYNC)
last_processed = self.cursor
for block_num in range(self.cursor + 1, end + 1):
self.process_block(block_num)
self.cursor = end
if not self.process_block(block_num):
break
last_processed = block_num
self.cursor = last_processed
Comment on lines 341 to +345
if current_block - self.last_prune_block >= EVENT_PRUNE_INTERVAL_BLOCKS:
self.prune_old_events(current_block)
self.last_prune_block = current_block

def process_block(self, block_num: int) -> None:
def process_block(self, block_num: int) -> bool:
try:
block_hash = self.substrate.get_block_hash(block_num)
if not block_hash:
return
return True
events = self.substrate.get_events(block_hash=block_hash)
except Exception as e:
bt.logging.debug(f'EventWatcher: block {block_num} events unavailable: {e}')
return
return False
Comment on lines 351 to +358

for event_record in events:
decoded = self.decode_contract_event(event_record)
Expand All @@ -366,6 +369,7 @@ def process_block(self, block_num: int) -> None:
# which would re-replay every successful apply_event in the
# same block on the next pass and double-apply busy deltas.
bt.logging.warning(f'EventWatcher: apply_event {name}@{block_num} failed: {e}')
return True

def decode_contract_event(self, event_record: Any) -> Optional[Tuple[str, Dict[str, Any]]]:
record = event_record.value if hasattr(event_record, 'value') else event_record
Expand Down
57 changes: 56 additions & 1 deletion tests/test_event_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

import struct
from pathlib import Path
from unittest.mock import MagicMock
from unittest.mock import MagicMock, call

from bittensor.utils import ss58_decode

Expand Down Expand Up @@ -404,3 +404,58 @@ def test_bootstrap_tolerates_contract_read_failures(self, tmp_path: Path):
assert w.active_miners == set()
assert w.cursor == 0
w.state_store.close()


class TestSyncToRetryFailures:
def test_clean_sync_advances_cursor_to_end(self, tmp_path: Path):
w = make_watcher(tmp_path)
w.cursor = 10
w.substrate.get_block_hash.side_effect = lambda block_num: f'hash-{block_num}'
w.substrate.get_events.return_value = []

w.sync_to(13)

assert w.cursor == 13
assert w.substrate.get_block_hash.call_args_list == [call(11), call(12), call(13)]
w.state_store.close()

def test_transient_block_fetch_failure_stops_cursor_before_failed_block(self, tmp_path: Path):
w = make_watcher(tmp_path)
w.cursor = 10

def get_block_hash(block_num: int):
if block_num == 12:
raise RuntimeError('rpc timeout')
return f'hash-{block_num}'

w.substrate.get_block_hash.side_effect = get_block_hash
w.substrate.get_events.return_value = []

Comment on lines +422 to +433
w.sync_to(14)

assert w.cursor == 11
assert w.substrate.get_block_hash.call_args_list == [call(11), call(12)]
w.state_store.close()

def test_failed_block_is_retried_on_next_sync(self, tmp_path: Path):
w = make_watcher(tmp_path)
w.cursor = 10

def flaky_get_block_hash(block_num: int):
if block_num == 12:
raise RuntimeError('rpc timeout')
return f'hash-{block_num}'

w.substrate.get_block_hash.side_effect = flaky_get_block_hash
w.substrate.get_events.return_value = []
w.sync_to(14)
assert w.cursor == 11

w.substrate.get_block_hash.reset_mock()
w.substrate.get_block_hash.side_effect = lambda block_num: f'hash-{block_num}'

w.sync_to(14)

assert w.cursor == 14
assert w.substrate.get_block_hash.call_args_list == [call(12), call(13), call(14)]
w.state_store.close()
Loading