diff --git a/hub/herald/search.py b/hub/herald/search.py
index d3b6b7f..7a0e10b 100644
--- a/hub/herald/search.py
+++ b/hub/herald/search.py
@@ -10,6 +10,7 @@
 from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
 from hub.db.common import ResolveResult
 if TYPE_CHECKING:
+    from prometheus_client import Counter as PrometheusCounter
     from hub.db import SecondaryDB
 
 
@@ -29,9 +30,10 @@ class SearchIndex:
     VERSION = 1
 
     def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
-                 elastic_port=9200):
+                 elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
         self.hub_db = hub_db
         self.search_timeout = search_timeout
+        self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
         self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
         self.search_client: Optional[AsyncElasticsearch] = None
         self.sync_client: Optional[AsyncElasticsearch] = None
@@ -59,7 +61,7 @@ async def start(self) -> bool:
             return False
         hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
         self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
-        self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
+        self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
         while True:
             try:
                 await self.sync_client.cluster.health(wait_for_status='yellow')
@@ -205,10 +207,14 @@ async def search_ahead(self, **kwargs):
             reordered_hits = cache_item.result
         else:
             query = expand_query(**kwargs)
-            search_hits = deque((await self.search_client.search(
+            es_resp = await self.search_client.search(
                 query, index=self.index, track_total_hits=False,
+                timeout=f'{int(1000*self.search_timeout)}ms',
                 _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
-            ))['hits']['hits'])
+            )
+            search_hits = deque(es_resp['hits']['hits'])
+            if self.timeout_counter and es_resp['timed_out']:
+                self.timeout_counter.inc()
             if remove_duplicates:
                 search_hits = self.__remove_duplicates(search_hits)
             if per_channel_per_page > 0:
@@ -235,7 +241,7 @@ def __remove_duplicates(self, search_hits: deque) -> deque:
                 dropped.add(hit['_id'])
         return deque(hit for hit in search_hits if hit['_id'] not in dropped)
 
-    def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int):
+    def __search_ahead(self, search_hits: deque, page_size: int, per_channel_per_page: int) -> list:
         reordered_hits = []
         channel_counters = Counter()
         next_page_hits_maybe_check_later = deque()
diff --git a/hub/herald/session.py b/hub/herald/session.py
index 99e29e5..018e418 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -210,7 +210,8 @@ def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
         # Search index
         self.search_index = SearchIndex(
             self.db, self.env.es_index_prefix, self.env.database_query_timeout,
-            elastic_host=env.elastic_host, elastic_port=env.elastic_port
+            elastic_host=env.elastic_host, elastic_port=env.elastic_port,
+            timeout_counter=self.interrupt_count_metric
        )
         self.running = False
         # hashX: List[int]
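
Note: the `timeout` kwarg added to the search call is Elasticsearch's server-side search timeout. When it expires, ES does not raise an error; it returns whatever hits it has gathered so far, with `timed_out: true` in the response body, and that flag is what the new counter records. Raising the client socket timeout to `search_timeout + 1` keeps the client connected slightly longer than the server-side limit, so the partial response can actually arrive instead of the client disconnecting first. Below is a minimal sketch of the counter side, assuming `prometheus_client` and hypothetical metric/argument names; the diff itself wires in the session manager's pre-existing `interrupt_count_metric`, whose real definition is elsewhere in hub/herald/session.py.

    from prometheus_client import Counter

    # Hypothetical stand-in for SessionManager.interrupt_count_metric;
    # name, help text, and namespace here are illustrative only.
    interrupt_count_metric = Counter(
        'interrupt', 'Number of interrupted queries', namespace='hub'
    )

    # hub_db and the index prefix are assumed to come from the caller's setup.
    index = SearchIndex(hub_db, 'claims', search_timeout=3.0,
                        timeout_counter=interrupt_count_metric)
    # search_ahead() then calls timeout_counter.inc() whenever the ES
    # response reports timed_out=True, so timed-out searches show up in
    # the exported Prometheus metrics.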