From a3b77951f0a6017b09060291f134c128ae5b6a60 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Wed, 17 Aug 2022 16:57:08 -0400
Subject: [PATCH 1/4] Specify API-level search timeout to reduce hard timeout errors.

---
 hub/herald/search.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/hub/herald/search.py b/hub/herald/search.py
index d3b6b7f..0435bed 100644
--- a/hub/herald/search.py
+++ b/hub/herald/search.py
@@ -59,7 +59,7 @@ async def start(self) -> bool:
             return False
         hosts = [{'host': self._elastic_host, 'port': self._elastic_port}]
         self.sync_client = AsyncElasticsearch(hosts, timeout=self.sync_timeout)
-        self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout)
+        self.search_client = AsyncElasticsearch(hosts, timeout=self.search_timeout+1)
         while True:
             try:
                 await self.sync_client.cluster.health(wait_for_status='yellow')
@@ -207,6 +207,7 @@ async def search_ahead(self, **kwargs):
                     query = expand_query(**kwargs)
                     search_hits = deque((await self.search_client.search(
                         query, index=self.index, track_total_hits=False,
+                        timeout=f'{int(1000*self.search_timeout)}ms',
                         _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
                     ))['hits']['hits'])
                     if remove_duplicates:
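
Note on PATCH 1/4: this separates the client's transport timeout (how long elasticsearch-py waits before raising ConnectionTimeout) from the per-request `timeout` parameter, which asks Elasticsearch itself to stop collecting hits after the budget and return partial results with `timed_out: true`. The transport timeout is bumped one second higher so the API-level timeout normally fires first. A minimal sketch of the same pattern, assuming the elasticsearch-py 7.x AsyncElasticsearch client that hub already uses; the index name and query body below are placeholders, not taken from the patch:

    from elasticsearch import AsyncElasticsearch

    async def bounded_search(hosts, search_timeout=3.0):
        # Transport-level timeout: only raise ConnectionTimeout if Elasticsearch has not
        # answered at all within search_timeout + 1 seconds.
        client = AsyncElasticsearch(hosts, timeout=search_timeout + 1)
        try:
            resp = await client.search(
                {'query': {'match_all': {}}},   # placeholder query body
                index='claims',                 # placeholder index name
                track_total_hits=False,
                # API-level timeout: Elasticsearch stops after this budget and reports it
                # via resp['timed_out'] instead of the client raising an exception.
                timeout=f'{int(1000 * search_timeout)}ms',
            )
            return resp['hits']['hits'], resp['timed_out']
        finally:
            await client.close()
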
From 28c711efad7cec733475358d9ee4eb2ab1bfef33 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Thu, 18 Aug 2022 11:10:10 -0400
Subject: [PATCH 2/4] Report API-level search timeouts in interrupt_count_metric.

---
 hub/herald/search.py  | 23 ++++++++++++++---------
 hub/herald/session.py |  6 +++++-
 2 files changed, 19 insertions(+), 10 deletions(-)

diff --git a/hub/herald/search.py b/hub/herald/search.py
index 0435bed..b728fc9 100644
--- a/hub/herald/search.py
+++ b/hub/herald/search.py
@@ -146,11 +146,11 @@ async def cached_search(self, kwargs):
         total_referenced = []
         cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
         if cache_item.result is not None:
-            return cache_item.result
+            return cache_item.result, False
         async with cache_item.lock:
             if cache_item.result:
-                return cache_item.result
-            response, offset, total = await self.search(**kwargs)
+                return cache_item.result, False
+            response, offset, total, timed_out = await self.search(**kwargs)
             censored = {}
             for row in response:
                 if (row.get('censor_type') or 0) >= Censor.SEARCH:
@@ -159,7 +159,7 @@ async def cached_search(self, kwargs):
                     censored[censoring_channel_hash].add(row['tx_hash'])
             total_referenced.extend(response)
             if censored:
-                response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
+                response, _, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
                 total_referenced.extend(response)
             response = [self._make_resolve_result(r) for r in response]
             extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)]
@@ -167,7 +167,7 @@ async def cached_search(self, kwargs):
                 response, extra, offset, total, censored
             )
             cache_item.result = result
-            return result
+            return result, timed_out

     async def get_many(self, *claim_ids):
         await self.populate_claim_cache(*claim_ids)
@@ -186,7 +186,7 @@ async def search(self, **kwargs):
         try:
             return await self.search_ahead(**kwargs)
         except NotFoundError:
-            return [], 0, 0
+            return [], 0, 0, False
         # return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)

     async def search_ahead(self, **kwargs):
@@ -196,20 +196,25 @@ async def search_ahead(self, **kwargs):
         page_size = kwargs.pop('limit', 10)
         offset = kwargs.pop('offset', 0)
         kwargs['limit'] = 1000
+        timed_out = None
         cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
         if cache_item.result is not None:
             reordered_hits = cache_item.result
+            timed_out = False
         else:
             async with cache_item.lock:
                 if cache_item.result:
                     reordered_hits = cache_item.result
+                    timed_out = False
                 else:
                     query = expand_query(**kwargs)
-                    search_hits = deque((await self.search_client.search(
+                    es_resp = await self.search_client.search(
                         query, index=self.index, track_total_hits=False,
                         timeout=f'{int(1000*self.search_timeout)}ms',
                         _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
-                    ))['hits']['hits'])
+                    )
+                    search_hits = deque(es_resp['hits']['hits'])
+                    timed_out = es_resp['timed_out']
                     if remove_duplicates:
                         search_hits = self.__remove_duplicates(search_hits)
                     if per_channel_per_page > 0:
@@ -218,7 +223,7 @@ async def search_ahead(self, **kwargs):
                         reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
                     cache_item.result = reordered_hits
         result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
-        return result, 0, len(reordered_hits)
+        return result, 0, len(reordered_hits), timed_out

     def __remove_duplicates(self, search_hits: deque) -> deque:
         known_ids = {}  # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
diff --git a/hub/herald/session.py b/hub/herald/session.py
index 99e29e5..b447ee1 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -1261,7 +1261,11 @@ async def claimtrie_search(self, **kwargs):
                 if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
                     return Outputs.to_base64([], [])
                 kwargs['channel_id'] = channel_claim.claim_hash.hex()
-            return await self.session_manager.search_index.cached_search(kwargs)
+
+            result, timed_out = await self.session_manager.search_index.cached_search(kwargs)
+            if timed_out:
+                self.session_manager.interrupt_count_metric.inc()
+            return result
         except ConnectionTimeout:
             self.session_manager.interrupt_count_metric.inc()
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')
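
Note on PATCH 2/4: the timed_out flag reported by Elasticsearch is threaded back through search_ahead(), search() and cached_search() as an extra tuple element, so claimtrie_search() can bump interrupt_count_metric when a query hit the API-level timeout. A rough sketch of that call-site pattern under assumed names (the Counter below only stands in for session_manager.interrupt_count_metric):

    from prometheus_client import Counter

    interrupted_queries = Counter('interrupted_queries', 'Queries that hit the API-level search timeout')

    async def handle_claim_search(search_index, kwargs):
        # cached_search() now returns (result, timed_out) instead of just result.
        result, timed_out = await search_index.cached_search(kwargs)
        if timed_out:
            interrupted_queries.inc()
        return result

Widening the return signature of three methods is what the follow-up commits rework.
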
From 4c5ca2dc8c2038ebb317625de1a329563bfcbf5b Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Wed, 24 Aug 2022 12:23:12 -0400
Subject: [PATCH 3/4] Revert "Report API-level search timeouts in interrupt_count_metric."

This reverts commit 28c711efad7cec733475358d9ee4eb2ab1bfef33.

---
 hub/herald/search.py  | 23 +++++++++--------------
 hub/herald/session.py |  6 +-----
 2 files changed, 10 insertions(+), 19 deletions(-)

diff --git a/hub/herald/search.py b/hub/herald/search.py
index b728fc9..0435bed 100644
--- a/hub/herald/search.py
+++ b/hub/herald/search.py
@@ -146,11 +146,11 @@ async def cached_search(self, kwargs):
         total_referenced = []
         cache_item = ResultCacheItem.from_cache(str(kwargs), self.search_cache)
         if cache_item.result is not None:
-            return cache_item.result, False
+            return cache_item.result
         async with cache_item.lock:
             if cache_item.result:
-                return cache_item.result, False
-            response, offset, total, timed_out = await self.search(**kwargs)
+                return cache_item.result
+            response, offset, total = await self.search(**kwargs)
             censored = {}
             for row in response:
                 if (row.get('censor_type') or 0) >= Censor.SEARCH:
@@ -159,7 +159,7 @@ async def cached_search(self, kwargs):
                     censored[censoring_channel_hash].add(row['tx_hash'])
             total_referenced.extend(response)
             if censored:
-                response, _, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
+                response, _, _ = await self.search(**kwargs, censor_type=Censor.NOT_CENSORED)
                 total_referenced.extend(response)
             response = [self._make_resolve_result(r) for r in response]
             extra = [self._make_resolve_result(r) for r in await self._get_referenced_rows(total_referenced)]
@@ -167,7 +167,7 @@ async def cached_search(self, kwargs):
                 response, extra, offset, total, censored
             )
             cache_item.result = result
-            return result, timed_out
+            return result

     async def get_many(self, *claim_ids):
         await self.populate_claim_cache(*claim_ids)
@@ -186,7 +186,7 @@ async def search(self, **kwargs):
         try:
             return await self.search_ahead(**kwargs)
         except NotFoundError:
-            return [], 0, 0, False
+            return [], 0, 0
         # return expand_result(result['hits']), 0, result.get('total', {}).get('value', 0)

     async def search_ahead(self, **kwargs):
@@ -196,25 +196,20 @@ async def search_ahead(self, **kwargs):
         page_size = kwargs.pop('limit', 10)
         offset = kwargs.pop('offset', 0)
         kwargs['limit'] = 1000
-        timed_out = None
         cache_item = ResultCacheItem.from_cache(f"ahead{per_channel_per_page}{kwargs}", self.search_cache)
         if cache_item.result is not None:
             reordered_hits = cache_item.result
-            timed_out = False
         else:
             async with cache_item.lock:
                 if cache_item.result:
                     reordered_hits = cache_item.result
-                    timed_out = False
                 else:
                     query = expand_query(**kwargs)
-                    es_resp = await self.search_client.search(
+                    search_hits = deque((await self.search_client.search(
                         query, index=self.index, track_total_hits=False,
                         timeout=f'{int(1000*self.search_timeout)}ms',
                         _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
-                    )
-                    search_hits = deque(es_resp['hits']['hits'])
-                    timed_out = es_resp['timed_out']
+                    ))['hits']['hits'])
                     if remove_duplicates:
                         search_hits = self.__remove_duplicates(search_hits)
                     if per_channel_per_page > 0:
@@ -223,7 +218,7 @@ async def search_ahead(self, **kwargs):
                         reordered_hits = [(hit['_id'], hit['_source']['channel_id']) for hit in search_hits]
                     cache_item.result = reordered_hits
         result = list(await self.get_many(*(claim_id for claim_id, _ in reordered_hits[offset:(offset + page_size)])))
-        return result, 0, len(reordered_hits), timed_out
+        return result, 0, len(reordered_hits)

     def __remove_duplicates(self, search_hits: deque) -> deque:
         known_ids = {}  # claim_id -> (creation_height, hit_id), where hit_id is either reposted claim id or original
diff --git a/hub/herald/session.py b/hub/herald/session.py
index b447ee1..99e29e5 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -1261,11 +1261,7 @@ async def claimtrie_search(self, **kwargs):
                 if not channel_claim or isinstance(channel_claim, (ResolveCensoredError, LookupError, ValueError)):
                     return Outputs.to_base64([], [])
                 kwargs['channel_id'] = channel_claim.claim_hash.hex()
-
-            result, timed_out = await self.session_manager.search_index.cached_search(kwargs)
-            if timed_out:
-                self.session_manager.interrupt_count_metric.inc()
-            return result
+            return await self.session_manager.search_index.cached_search(kwargs)
         except ConnectionTimeout:
             self.session_manager.interrupt_count_metric.inc()
             raise RPCError(JSONRPC.QUERY_TIMEOUT, 'query timed out')

From 08931a1aa875d59fd3b72835fe6ce58ffc463517 Mon Sep 17 00:00:00 2001
From: Jonathan Moody <103143855+moodyjon@users.noreply.github.com>
Date: Wed, 24 Aug 2022 12:43:39 -0400
Subject: [PATCH 4/4] Updates for review comments. Implement timeout counter bump in different way.

---
 hub/herald/search.py  | 13 +++++++++----
 hub/herald/session.py |  3 ++-
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/hub/herald/search.py b/hub/herald/search.py
index 0435bed..7a0e10b 100644
--- a/hub/herald/search.py
+++ b/hub/herald/search.py
@@ -10,6 +10,7 @@
 from hub.common import LRUCache, IndexVersionMismatch, INDEX_DEFAULT_SETTINGS, expand_query, expand_result
 from hub.db.common import ResolveResult
 if TYPE_CHECKING:
+    from prometheus_client import Counter as PrometheusCounter
     from hub.db import SecondaryDB


@@ -29,9 +30,10 @@ class SearchIndex:
     VERSION = 1

     def __init__(self, hub_db: 'SecondaryDB', index_prefix: str, search_timeout=3.0, elastic_host='localhost',
-                 elastic_port=9200):
+                 elastic_port=9200, timeout_counter: Optional['PrometheusCounter'] = None):
         self.hub_db = hub_db
         self.search_timeout = search_timeout
+        self.timeout_counter: Optional['PrometheusCounter'] = timeout_counter
         self.sync_timeout = 600  # wont hit that 99% of the time, but can hit on a fresh import
         self.search_client: Optional[AsyncElasticsearch] = None
         self.sync_client: Optional[AsyncElasticsearch] = None
@@ -205,11 +207,14 @@ async def search_ahead(self, **kwargs):
                     reordered_hits = cache_item.result
                 else:
                     query = expand_query(**kwargs)
-                    search_hits = deque((await self.search_client.search(
+                    es_resp = await self.search_client.search(
                         query, index=self.index, track_total_hits=False,
                         timeout=f'{int(1000*self.search_timeout)}ms',
                         _source_includes=['_id', 'channel_id', 'reposted_claim_id', 'creation_height']
-                    ))['hits']['hits'])
+                    )
+                    search_hits = deque(es_resp['hits']['hits'])
+                    if self.timeout_counter and es_resp['timed_out']:
+                        self.timeout_counter.inc()
                     if remove_duplicates:
                         search_hits = self.__remove_duplicates(search_hits)
                     if per_channel_per_page > 0:
@@ -236,7 +241,7 @@ def __remove_duplicates(self, search_hits: deque) -> deque:
                     dropped.add(hit['_id'])
         return deque(hit for hit in search_hits if hit['_id'] not in dropped)

-    def __search_ahead(self, search_hits: list, page_size: int, per_channel_per_page: int):
+    def __search_ahead(self, search_hits: deque, page_size: int, per_channel_per_page: int) -> list:
         reordered_hits = []
         channel_counters = Counter()
         next_page_hits_maybe_check_later = deque()
diff --git a/hub/herald/session.py b/hub/herald/session.py
index 99e29e5..018e418 100644
--- a/hub/herald/session.py
+++ b/hub/herald/session.py
@@ -210,7 +210,8 @@ def __init__(self, env: 'ServerEnv', db: 'SecondaryDB', mempool: 'HubMemPool',
         # Search index
         self.search_index = SearchIndex(
             self.db, self.env.es_index_prefix, self.env.database_query_timeout,
-            elastic_host=env.elastic_host, elastic_port=env.elastic_port
+            elastic_host=env.elastic_host, elastic_port=env.elastic_port,
+            timeout_counter=self.interrupt_count_metric
         )
         self.running = False
         # hashX: List[int]
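
Note on PATCH 4/4: instead of widening the return signatures, the Prometheus counter is injected into SearchIndex at construction time (timeout_counter=self.interrupt_count_metric), so the API-level timeout is counted right where the Elasticsearch response is inspected and the callers of cached_search()/search() are left untouched. A rough, self-contained sketch of that injected-counter pattern; apart from the names visible in the diff (timeout_counter, timed_out, inc), every name below is assumed for illustration:

    from typing import Optional
    from prometheus_client import Counter

    class TimeoutAwareIndex:
        def __init__(self, timeout_counter: Optional[Counter] = None):
            # The counter is optional so the class stays usable without metrics wired up.
            self.timeout_counter = timeout_counter

        def note_response(self, es_resp: dict) -> list:
            # Same shape as the logic added to search_ahead(): bump the counter only when
            # a counter was supplied and Elasticsearch reported an API-level timeout.
            if self.timeout_counter and es_resp.get('timed_out'):
                self.timeout_counter.inc()
            return es_resp['hits']['hits']

    interrupt_count_metric = Counter('interrupted_searches', 'Searches that hit the API-level timeout')
    index = TimeoutAwareIndex(timeout_counter=interrupt_count_metric)
    hits = index.note_response({'timed_out': True, 'hits': {'hits': []}})
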