
Commit 35728ab

Merge pull request #144 from opentensor/feat/thewhaleking/fully-exhaust-query-map

fully exhaust query map

2 parents: 9bf206c + 7c1ece4

File tree: 3 files changed, +114 −21 lines


async_substrate_interface/async_substrate.py (65 additions, 16 deletions)
@@ -456,7 +456,6 @@ async def retrieve_next_page(self, start_key) -> list:
         )
         if len(result.records) < self.page_size:
             self.loading_complete = True
-
         # Update last key from new result set to use as offset for next page
         self.last_key = result.last_key
         return result.records
@@ -3373,6 +3372,7 @@ async def query_map(
         page_size: int = 100,
         ignore_decoding_errors: bool = False,
         reuse_block_hash: bool = False,
+        fully_exhaust: bool = False,
     ) -> AsyncQueryMapResult:
         """
         Iterates over all key-pairs located at the given module and storage_function. The storage
@@ -3403,6 +3403,8 @@ async def query_map(
                 decoding
             reuse_block_hash: use True if you wish to make the query using the last-used block hash. Do not mark True
                 if supplying a block_hash
+            fully_exhaust: Pull the entire result at once, rather than paginating. Only use if you need the entire query
+                map result.

         Returns:
             AsyncQueryMapResult object
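
For orientation, here is a minimal usage sketch of the new flag as documented above. The endpoint URL is a placeholder, and the async-for unpacking assumes the usual (key, value) records that query_map results yield; neither is prescribed by this commit.

import asyncio
from async_substrate_interface.async_substrate import AsyncSubstrateInterface

async def main():
    # Placeholder endpoint; substitute a real node URL.
    async with AsyncSubstrateInterface("wss://example-node:443") as substrate:
        # fully_exhaust=True pulls the whole map in one pass instead of
        # paginating lazily -- per the docstring, only worth it when the
        # entire result is needed.
        qm = await substrate.query_map(
            "SubtensorModule",
            "CRV3WeightCommits",
            fully_exhaust=True,
        )
        async for key, value in qm:
            print(key, value)

asyncio.run(main())
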
@@ -3453,11 +3455,16 @@ async def query_map(
             page_size = max_results

         # Retrieve storage keys
-        response = await self.rpc_request(
-            method="state_getKeysPaged",
-            params=[prefix, page_size, start_key, block_hash],
-            runtime=runtime,
-        )
+        if not fully_exhaust:
+            response = await self.rpc_request(
+                method="state_getKeysPaged",
+                params=[prefix, page_size, start_key, block_hash],
+                runtime=runtime,
+            )
+        else:
+            response = await self.rpc_request(
+                method="state_getKeys", params=[prefix, block_hash], runtime=runtime
+            )

         if "error" in response:
             raise SubstrateRequestException(response["error"]["message"])
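
The branch above switches between two standard Substrate RPCs: state_getKeysPaged returns at most page_size keys starting after start_key, while state_getKeys returns every key under the prefix in a single response. A hedged sketch (not part of this commit, and assuming rpc_request can be called without the runtime argument) of the paging contract the non-exhaustive path builds on:

# Each state_getKeysPaged call returns at most page_size keys, and the last
# key of one page seeds start_key for the next call.
async def iter_all_keys(substrate, prefix, block_hash, page_size=100):
    start_key = None
    while True:
        response = await substrate.rpc_request(
            method="state_getKeysPaged",
            params=[prefix, page_size, start_key, block_hash],
        )
        keys = response["result"]
        for key in keys:
            yield key
        if len(keys) < page_size:
            break  # a short page marks the end of the key space
        start_key = keys[-1]
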
@@ -3470,18 +3477,60 @@ async def query_map(
         if len(result_keys) > 0:
             last_key = result_keys[-1]

-            # Retrieve corresponding value
-            response = await self.rpc_request(
-                method="state_queryStorageAt",
-                params=[result_keys, block_hash],
-                runtime=runtime,
-            )
+            # Retrieve corresponding value(s)
+            if not fully_exhaust:
+                response = await self.rpc_request(
+                    method="state_queryStorageAt",
+                    params=[result_keys, block_hash],
+                    runtime=runtime,
+                )
+                if "error" in response:
+                    raise SubstrateRequestException(response["error"]["message"])
+                for result_group in response["result"]:
+                    result = decode_query_map(
+                        result_group["changes"],
+                        prefix,
+                        runtime,
+                        param_types,
+                        params,
+                        value_type,
+                        key_hashers,
+                        ignore_decoding_errors,
+                        self.decode_ss58,
+                    )
+            else:
+                all_responses = []
+                page_batches = [
+                    result_keys[i : i + page_size]
+                    for i in range(0, len(result_keys), page_size)
+                ]
+                changes = []
+                for batch_group in [
+                    # run five concurrent batch pulls; could go higher, but it's good to be a good citizen
+                    # of the ecosystem
+                    page_batches[i : i + 5]
+                    for i in range(0, len(page_batches), 5)
+                ]:
+                    all_responses.extend(
+                        await asyncio.gather(
+                            *[
+                                self.rpc_request(
+                                    method="state_queryStorageAt",
+                                    params=[batch_keys, block_hash],
+                                    runtime=runtime,
+                                )
+                                for batch_keys in batch_group
+                            ]
+                        )
+                    )
+                for response in all_responses:
+                    if "error" in response:
+                        raise SubstrateRequestException(response["error"]["message"])
+                    for result_group in response["result"]:
+                        changes.extend(result_group["changes"])

-            if "error" in response:
-                raise SubstrateRequestException(response["error"]["message"])
-            for result_group in response["result"]:
             result = decode_query_map(
-                result_group["changes"],
+                changes,
                 prefix,
                 runtime,
                 param_types,
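
The fully_exhaust branch above is a throttled fan-out: the full key list is cut into page_size batches, and the batches are awaited five at a time with asyncio.gather, bounding how many state_queryStorageAt requests are in flight at once. The same pattern in isolation (gather_in_batches and fetch_batch are illustrative names, not part of this commit):

import asyncio

async def gather_in_batches(items, fetch_batch, page_size=100, concurrency=5):
    # Split the full list into pages, then await at most `concurrency` page
    # requests at a time -- the throttling used by the fully_exhaust path.
    pages = [items[i : i + page_size] for i in range(0, len(items), page_size)]
    responses = []
    for group in [pages[i : i + concurrency] for i in range(0, len(pages), concurrency)]:
        responses.extend(await asyncio.gather(*(fetch_batch(p) for p in group)))
    return responses
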

async_substrate_interface/utils/decoding.py (5 additions, 5 deletions)
@@ -73,7 +73,7 @@ def _decode_scale_list_with_runtime(


 def decode_query_map(
-    result_group_changes,
+    result_group_changes: list,
     prefix,
     runtime: "Runtime",
     param_types,

@@ -123,10 +123,10 @@ def concat_hash_len(key_hasher: str) -> int:
     decoded_keys = all_decoded[:middl_index]
     decoded_values = all_decoded[middl_index:]
     for kts, vts, dk, dv in zip(
-            pre_decoded_key_types,
-            pre_decoded_value_types,
-            decoded_keys,
-            decoded_values,
+        pre_decoded_key_types,
+        pre_decoded_value_types,
+        decoded_keys,
+        decoded_values,
     ):
         try:
             # strip key_hashers to use as item key

tests/integration_tests/test_async_substrate_interface.py (44 additions, 0 deletions)
@@ -1,3 +1,5 @@
+import time
+
 import pytest
 from scalecodec import ss58_encode

@@ -71,3 +73,45 @@ async def test_ss58_conversion():
         if len(value.value) > 0:
             for decoded_key in value.value:
                 assert isinstance(decoded_key, str)
+
+
+@pytest.mark.asyncio
+async def test_fully_exhaust_query_map():
+    async with AsyncSubstrateInterface(LATENT_LITE_ENTRYPOINT) as substrate:
+        block_hash = await substrate.get_chain_finalised_head()
+        non_fully_exhausted_start = time.time()
+        non_fully_exhausted_qm = await substrate.query_map(
+            "SubtensorModule",
+            "CRV3WeightCommits",
+            block_hash=block_hash,
+        )
+        initial_records_count = len(non_fully_exhausted_qm.records)
+        assert initial_records_count <= 100  # default page size
+        exhausted_records_count = 0
+        async for _ in non_fully_exhausted_qm:
+            exhausted_records_count += 1
+        non_fully_exhausted_time = time.time() - non_fully_exhausted_start
+
+        assert len(non_fully_exhausted_qm.records) >= initial_records_count
+        fully_exhausted_start = time.time()
+        fully_exhausted_qm = await substrate.query_map(
+            "SubtensorModule",
+            "CRV3WeightCommits",
+            block_hash=block_hash,
+            fully_exhaust=True,
+        )
+
+        fully_exhausted_time = time.time() - fully_exhausted_start
+        initial_records_count_fully_exhaust = len(fully_exhausted_qm.records)
+        assert fully_exhausted_time <= non_fully_exhausted_time, (
+            f"Fully exhausted took longer than non-fully exhausted with "
+            f"{len(non_fully_exhausted_qm.records)} records in non-fully exhausted "
+            f"in {non_fully_exhausted_time} seconds, and {initial_records_count_fully_exhaust} in fully exhausted"
+            f" in {fully_exhausted_time} seconds. This could be caused by the fact that on this specific block, "
+            f"there are fewer records than would fill a single page. This difference should still be small."
+        )
+        fully_exhausted_records_count = 0
+        async for _ in fully_exhausted_qm:
+            fully_exhausted_records_count += 1
+        assert fully_exhausted_records_count == initial_records_count_fully_exhaust
+        assert initial_records_count_fully_exhaust == exhausted_records_count
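
Assuming the repository's usual integration-test setup, the new test can be run in isolation with something like: pytest tests/integration_tests/test_async_substrate_interface.py -k test_fully_exhaust_query_map. Note that the timing assertion compares one paginated run against one exhaustive run on the same finalised block, so it can be sensitive to network variance.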
