Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 113 additions & 22 deletions gittensor/classes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import copy
import re
from dataclasses import dataclass, field
from datetime import datetime, timezone
from datetime import datetime, timedelta, timezone
from enum import Enum
from math import prod
from typing import TYPE_CHECKING, DefaultDict, Dict, List, Optional, Set, Tuple
Expand All @@ -15,6 +15,7 @@
from gittensor.validator.oss_contributions.mirror.scored_pr import ScoredPR

from gittensor.constants import (
EVALUATION_CACHE_MAX_AGE_SECONDS,
EXTENSIONLESS_FILE_EXTENSIONS,
MAX_CODE_DENSITY_MULTIPLIER,
)
Expand Down Expand Up @@ -529,6 +530,7 @@ class CachedEvaluation:
github_id: str
evaluation: 'MinerEvaluation'
cached_at: datetime
issue_discovery_cached_at: Optional[datetime] = None


# Fields owned by the issue-discovery phase. store() preserves these across
Expand Down Expand Up @@ -576,8 +578,9 @@ class MinerEvaluationCache:
later same-round mirror outage can fall back to a non-zero score.
"""

def __init__(self):
def __init__(self, max_age_seconds: int = EVALUATION_CACHE_MAX_AGE_SECONDS):
self._cache: Dict[int, CachedEvaluation] = {}
self._max_age = timedelta(seconds=max_age_seconds)

def store(self, evaluation: 'MinerEvaluation') -> None:
"""Store a successful evaluation in the cache.
Expand All @@ -594,25 +597,35 @@ def store(self, evaluation: 'MinerEvaluation') -> None:
if not evaluation.hotkey or not evaluation.github_id or evaluation.github_id == '0':
return

now = datetime.now(timezone.utc)
cached_eval = self._build_cache_entry(evaluation)
issue_discovery_cached_at: Optional[datetime] = None

existing = self._cache.get(evaluation.uid)
if existing is not None and existing.hotkey == evaluation.hotkey and existing.github_id == evaluation.github_id:
for name in _ISSUE_DISCOVERY_FIELDS:
value = getattr(existing.evaluation, name)
setattr(cached_eval, name, _copy_issue_discovery_value(name, value))
for repo_name, prior_repo in existing.evaluation.repo_evaluations.items():
target = cached_eval.repo_evaluations.get(repo_name)
if target is None:
target = RepoEvaluation(repository_full_name=prior_repo.repository_full_name)
cached_eval.repo_evaluations[repo_name] = target
target.copy_issue_discovery_from(prior_repo)
if self._is_fresh(existing.issue_discovery_cached_at, now):
issue_discovery_cached_at = existing.issue_discovery_cached_at
for name in _ISSUE_DISCOVERY_FIELDS:
value = getattr(existing.evaluation, name)
setattr(cached_eval, name, _copy_issue_discovery_value(name, value))
for repo_name, prior_repo in existing.evaluation.repo_evaluations.items():
target = cached_eval.repo_evaluations.get(repo_name)
if target is None:
target = RepoEvaluation(repository_full_name=prior_repo.repository_full_name)
cached_eval.repo_evaluations[repo_name] = target
target.copy_issue_discovery_from(prior_repo)
elif existing.issue_discovery_cached_at is not None:
bt.logging.debug(
f'Not preserving expired cached issue discovery for UID {evaluation.uid} '
f'(cached at {existing.issue_discovery_cached_at.isoformat()})'
)

self._cache[evaluation.uid] = CachedEvaluation(
hotkey=evaluation.hotkey,
github_id=evaluation.github_id,
evaluation=cached_eval,
cached_at=datetime.now(timezone.utc),
cached_at=now,
issue_discovery_cached_at=issue_discovery_cached_at,
)

bt.logging.debug(f'Cached successful evaluation for UID {evaluation.uid}')
Expand Down Expand Up @@ -650,6 +663,7 @@ def update_issue_discovery(self, evaluation: 'MinerEvaluation') -> None:
existing.evaluation.repo_evaluations[repo_name] = target
target.copy_issue_discovery_from(repo_eval)

existing.issue_discovery_cached_at = datetime.now(timezone.utc)
bt.logging.debug(f'Refreshed cached issue discovery for UID {evaluation.uid}')

def get(self, uid: int, hotkey: str, github_id: str) -> Optional['MinerEvaluation']:
Expand All @@ -659,8 +673,59 @@ def get(self, uid: int, hotkey: str, github_id: str) -> Optional['MinerEvaluatio
Returns:
Cached MinerEvaluation if found and identity matches, None otherwise
"""
cached = self._cache.get(uid)
cached = self._get_identity_matched(uid, hotkey, github_id)
if cached is None:
return None

now = datetime.now(timezone.utc)
if not self._is_fresh(cached.cached_at, now):
bt.logging.warning(
f'Cache miss for UID {uid}: cached evaluation expired '
f'(cached at {cached.cached_at.isoformat()}, max_age={self._max_age})'
)
self._cache.pop(uid, None)
return None

include_issue_discovery = self._is_fresh(cached.issue_discovery_cached_at, now)
bt.logging.debug(f'Cache hit for UID {uid} (cached at {cached.cached_at.isoformat()})')

return self._isolate_for_downstream(cached.evaluation, include_issue_discovery=include_issue_discovery)

def get_issue_discovery(self, uid: int, hotkey: str, github_id: str) -> Optional['MinerEvaluation']:
    """Return an isolated cached evaluation including its issue-discovery fields.

    A value is returned only when the identity-matched entry exists and both
    its ``cached_at`` and ``issue_discovery_cached_at`` timestamps pass
    ``_is_fresh``.

    Args:
        uid: Miner UID used as the cache key.
        hotkey: Hotkey forwarded to the identity check.
        github_id: GitHub id forwarded to the identity check.

    Returns:
        The result of ``_isolate_for_downstream(..., include_issue_discovery=True)``
        on a hit, otherwise None. When the entry's ``cached_at`` is stale the
        entry is also removed from the cache.
    """
    cached = self._get_identity_matched(uid, hotkey, github_id)
    if cached is None:
        return None

    checked_at = datetime.now(timezone.utc)

    entry_is_fresh = self._is_fresh(cached.cached_at, checked_at)
    if not entry_is_fresh:
        bt.logging.warning(
            f'Issue-discovery cache miss for UID {uid}: cached evaluation expired '
            f'(cached at {cached.cached_at.isoformat()}, max_age={self._max_age})'
        )
        # A stale entry is dropped entirely, not just skipped.
        self._cache.pop(uid, None)
        return None

    issue_discovery_cached_at = cached.issue_discovery_cached_at
    issue_fields_usable = issue_discovery_cached_at is not None and self._is_fresh(
        issue_discovery_cached_at, checked_at
    )
    if issue_fields_usable:
        bt.logging.debug(f'Issue-discovery cache hit for UID {uid} (cached at {issue_discovery_cached_at.isoformat()})')
        return self._isolate_for_downstream(cached.evaluation, include_issue_discovery=True)

    # The entry itself is still fresh, so it stays cached; only the
    # issue-discovery fields are unusable (missing timestamp or too old).
    if issue_discovery_cached_at:
        cached_at = issue_discovery_cached_at.isoformat()
    else:
        cached_at = 'never'
    bt.logging.warning(
        f'Issue-discovery cache miss for UID {uid}: issue-discovery fields expired '
        f'(cached at {cached_at}, max_age={self._max_age})'
    )
    return None

def evict_many(self, uids: Set[int]) -> None:
    """Drop the cache entry of every UID in ``uids``.

    UIDs without an entry are skipped silently; each entry actually removed
    is reported at debug level.
    """
    for uid in uids:
        evicted = self._cache.pop(uid, None)
        if evicted is None:
            continue
        bt.logging.debug(f'Evicted cached evaluation for UID {uid}')

def _get_identity_matched(self, uid: int, hotkey: str, github_id: str) -> Optional[CachedEvaluation]:
cached = self._cache.get(uid)
if cached is None:
return None

Expand All @@ -674,15 +739,14 @@ def get(self, uid: int, hotkey: str, github_id: str) -> Optional['MinerEvaluatio
del self._cache[uid]
return None

bt.logging.debug(f'Cache hit for UID {uid} (cached at {cached.cached_at.isoformat()})')

return self._isolate_for_downstream(cached.evaluation)
return cached

def evict_many(self, uids: Set[int]) -> None:
"""Remove cached evaluations for all provided UIDs."""
for uid in uids:
if self._cache.pop(uid, None) is not None:
bt.logging.debug(f'Evicted cached evaluation for UID {uid}')
def _is_fresh(self, cached_at: Optional[datetime], now: datetime) -> bool:
    """Report whether ``cached_at`` is no older than ``self._max_age`` at ``now``.

    Args:
        cached_at: Timestamp to test; None always counts as not fresh.
        now: Reference time (callers in this class pass an aware UTC value).

    Returns:
        True when ``now - cached_at`` does not exceed ``self._max_age``.
    """
    if cached_at is None:
        return False
    # A timestamp without tzinfo is interpreted as UTC before comparing.
    stamp = cached_at if cached_at.tzinfo is not None else cached_at.replace(tzinfo=timezone.utc)
    age = now - stamp
    return age <= self._max_age

@staticmethod
def _build_cache_entry(evaluation: 'MinerEvaluation') -> 'MinerEvaluation':
Expand All @@ -699,14 +763,20 @@ def _build_cache_entry(evaluation: 'MinerEvaluation') -> 'MinerEvaluation':
return cached

@staticmethod
def _isolate_for_downstream(cached_eval: 'MinerEvaluation') -> 'MinerEvaluation':
def _isolate_for_downstream(
    cached_eval: 'MinerEvaluation',
    *,
    include_issue_discovery: bool,
) -> 'MinerEvaluation':
    """Build a copy of ``cached_eval`` that callers may mutate.

    Args:
        cached_eval: The evaluation held inside the cache.
        include_issue_discovery: When False, the copy is passed through
            ``_clear_cached_issue_discovery`` before being returned.

    Returns:
        A shallow copy of ``cached_eval`` whose ``unique_repos_contributed_to``
        set, ``issue_discovery_issues`` value and ``repo_evaluations`` dict
        (with shallow-copied values) are new objects.
    """
    # Downstream scoring mutates top-level scalar fields on MinerEvaluation
    # and discovery_* fields on Issue. Mirror PRs are shared — the issue
    # adapters produce fresh Issue objects per call via get_all_issues().
    copy_eval = copy.copy(cached_eval)
    copy_eval.unique_repos_contributed_to = set(cached_eval.unique_repos_contributed_to)
    copy_eval.issue_discovery_issues = _copy_issue_discovery_issues(cached_eval.issue_discovery_issues)
    # The loop variable is deliberately not called ``re``: that name belongs
    # to the regex module imported at the top of this file.
    copy_eval.repo_evaluations = {
        repo_name: copy.copy(repo_eval) for repo_name, repo_eval in cached_eval.repo_evaluations.items()
    }
    if not include_issue_discovery:
        # Runs on the copies only, so the cached objects keep their values.
        _clear_cached_issue_discovery(copy_eval)
    return copy_eval


Expand All @@ -722,3 +792,24 @@ def _scored_mirror_pr_for_cache(scored: 'ScoredPR') -> 'ScoredPR':
scored_copy = copy.copy(scored)
scored_copy.files = None
return scored_copy


def _clear_cached_issue_discovery(evaluation: 'MinerEvaluation') -> None:
    """Reset the issue-discovery fields of ``evaluation`` in place.

    The same eight score/eligibility/counter attributes are reset on the
    evaluation itself and on every value of ``evaluation.repo_evaluations``;
    ``issue_discovery_issues`` is additionally rebound to a new empty list on
    the evaluation only.

    Args:
        evaluation: Object to reset; its per-repo entries are mutated too.
    """
    # NOTE(review): this table is maintained separately from
    # _ISSUE_DISCOVERY_FIELDS used by store(); confirm the two stay in sync.
    cleared_values = (
        ('issue_discovery_score', 0.0),
        ('issue_token_score', 0.0),
        ('issue_credibility', 0.0),
        ('is_issue_eligible', False),
        ('total_solved_issues', 0),
        ('total_valid_solved_issues', 0),
        ('total_closed_issues', 0),
        ('total_open_issues', 0),
    )
    for target in (evaluation, *evaluation.repo_evaluations.values()):
        for field_name, cleared in cleared_values:
            setattr(target, field_name, cleared)
    evaluation.issue_discovery_issues = []
4 changes: 4 additions & 0 deletions gittensor/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
SECONDS_PER_DAY = 86400
SECONDS_PER_HOUR = 3600

# Cache fallback is for short-lived upstream outages, not indefinite reward
# preservation. Six hours covers three normal validator scoring intervals.
EVALUATION_CACHE_MAX_AGE_SECONDS = 6 * SECONDS_PER_HOUR

# =============================================================================
# Network
# =============================================================================
Expand Down
2 changes: 1 addition & 1 deletion gittensor/validator/issue_discovery/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def _restore_issue_discovery_from_cache(
if evaluation_cache is None:
return False

cached = evaluation_cache.get(evaluation.uid, evaluation.hotkey, evaluation.github_id or '')
cached = evaluation_cache.get_issue_discovery(evaluation.uid, evaluation.hotkey, evaluation.github_id or '')
if cached is None:
bt.logging.warning(f'├─ UID {evaluation.uid}: no cached issue-discovery evaluation available')
return False
Expand Down
55 changes: 52 additions & 3 deletions tests/validator/issue_discovery/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from __future__ import annotations

import asyncio
from datetime import datetime, timedelta, timezone
from typing import Optional
from unittest.mock import Mock

Expand Down Expand Up @@ -171,6 +172,15 @@ def _run(coro):
return asyncio.run(coro)


def _seed_issue_discovery_cache(cache: MinerEvaluationCache, evaluation: MinerEvaluation) -> None:
    """Cache ``evaluation`` and then record its issue-discovery fields.

    Calls ``cache.store`` followed by ``cache.update_issue_discovery`` with
    the same evaluation, in that order.
    """
    for populate in (cache.store, cache.update_issue_discovery):
        populate(evaluation)


def _expire_issue_discovery_cache(cache: MinerEvaluationCache, uid: int) -> None:
    """Backdate the issue-discovery timestamp of ``uid`` to just past max age.

    The entry's ``issue_discovery_cached_at`` is set one second older than
    ``cache._max_age`` relative to the current UTC time.
    """
    just_past_max_age = cache._max_age + timedelta(seconds=1)
    entry = cache._cache[uid]
    entry.issue_discovery_cached_at = datetime.now(timezone.utc) - just_past_max_age


# ============================================================================
# _classify_issue (anti-gaming gates)
# ============================================================================
Expand Down Expand Up @@ -405,7 +415,7 @@ def test_mirror_request_error_restores_cached_issue_discovery_fields(self):
cached.is_issue_eligible = True
cached.total_solved_issues = 7
cached.total_valid_solved_issues = 7
cache.store(cached)
_seed_issue_discovery_cache(cache, cached)

client = Mock()
working_issues = [
Expand Down Expand Up @@ -528,6 +538,45 @@ def test_oss_store_preserves_cached_issue_fields_across_rounds(self):
assert round_n.total_solved_issues == 7
assert round_n.total_valid_solved_issues == 7

def test_expired_issue_discovery_cache_is_not_restored_after_oss_store_refresh(self):
    """An expired issue-discovery timestamp is not revived by a later store()."""
    cache = MinerEvaluationCache()

    # Earlier round: seed the cache with non-zero issue-discovery results,
    # then backdate the issue-discovery timestamp past the max age.
    prior_round = _eval(uid=1, github_id='999')
    seeded_fields = {
        'issue_discovery_score': 8.12,
        'issue_token_score': 700.0,
        'issue_credibility': 1.0,
        'is_issue_eligible': True,
        'total_solved_issues': 7,
        'total_valid_solved_issues': 7,
    }
    for field_name, field_value in seeded_fields.items():
        setattr(prior_round, field_name, field_value)
    _seed_issue_discovery_cache(cache, prior_round)
    _expire_issue_discovery_cache(cache, uid=1)

    # A fresh OSS store should refresh the PR-side cache without making
    # stale issue-discovery fields eligible for another failure fallback.
    current_round = _eval(uid=1, github_id='999')
    cache.store(current_round)

    # Every issue fetch fails, so the scan has to consider the cache fallback.
    failing_client = Mock()
    failing_client.get_miner_issues.side_effect = MirrorRequestError('boom')

    scan = run_issue_discovery(
        {1: current_round},
        _mirror_repos('entrius/gittensor-ui'),
        _EMPTY_LANGS,
        _EMPTY_TOKEN_CONFIG,
        client=failing_client,
        evaluation_cache=cache,
    )
    _run(scan)

    assert current_round.issue_discovery_score == 0.0
    assert current_round.issue_token_score == 0.0
    assert current_round.issue_credibility == 0.0
    assert current_round.is_issue_eligible is False
    assert current_round.total_solved_issues == 0
    assert current_round.total_valid_solved_issues == 0

def test_successful_no_issue_fetch_clears_stale_cached_issue_fields(self):
cache = MinerEvaluationCache()
stale = _eval(uid=1, github_id='999')
Expand All @@ -537,7 +586,7 @@ def test_successful_no_issue_fetch_clears_stale_cached_issue_fields(self):
stale.is_issue_eligible = True
stale.total_solved_issues = 7
stale.total_valid_solved_issues = 7
cache.store(stale)
_seed_issue_discovery_cache(cache, stale)

client = Mock()
client.get_miner_issues.return_value = _response([_issue_dict(repo='foo/not-enabled')])
Expand Down Expand Up @@ -575,7 +624,7 @@ def test_solving_pr_file_fetch_failure_does_not_overwrite_cached_issue_fields(se
stale.is_issue_eligible = True
stale.total_solved_issues = 7
stale.total_valid_solved_issues = 7
cache.store(stale)
_seed_issue_discovery_cache(cache, stale)

client = Mock()
client.get_miner_issues.return_value = _response([_issue_dict()])
Expand Down
Loading