22
22
TYPE_CHECKING ,
23
23
)
24
24
25
- import asyncstdlib as a
26
25
from bt_decode import MetadataV15 , PortableRegistry , decode as decode_by_type_string
27
26
from scalecodec .base import ScaleBytes , ScaleType , RuntimeConfigurationObject
28
27
from scalecodec .types import (
42
41
BlockNotFound ,
43
42
MaxRetriesExceeded ,
44
43
MetadataAtVersionNotFound ,
44
+ StateDiscardedError ,
45
45
)
46
46
from async_substrate_interface .protocols import Keypair
47
47
from async_substrate_interface .types import (
58
58
get_next_id ,
59
59
rng as random ,
60
60
)
61
- from async_substrate_interface .utils .cache import async_sql_lru_cache
61
+ from async_substrate_interface .utils .cache import async_sql_lru_cache , CachedFetcher
62
62
from async_substrate_interface .utils .decoding import (
63
63
_determine_if_old_runtime_call ,
64
64
_bt_decode_to_dict_or_list ,
@@ -539,14 +539,17 @@ def __init__(
539
539
"You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. "
540
540
"Verify this is intended."
541
541
)
542
- now = asyncio .new_event_loop ().time ()
542
+ # default value for in case there's no running asyncio loop
543
+ # this really doesn't matter in most cases, as it's only used for comparison on the first call to
544
+ # see how long it's been since the last call
545
+ now = 0.0
543
546
self .last_received = now
544
547
self .last_sent = now
548
+ self ._in_use_ids = set ()
545
549
546
550
async def __aenter__ (self ):
547
- async with self ._lock :
548
- self ._in_use += 1
549
- await self .connect ()
551
+ self ._in_use += 1
552
+ await self .connect ()
550
553
return self
551
554
552
555
@staticmethod
@@ -559,18 +562,19 @@ async def connect(self, force=False):
559
562
self .last_sent = now
560
563
if self ._exit_task :
561
564
self ._exit_task .cancel ()
562
- if not self ._initialized or force :
563
- self ._initialized = True
564
- try :
565
- self ._receiving_task .cancel ()
566
- await self ._receiving_task
567
- await self .ws .close ()
568
- except (AttributeError , asyncio .CancelledError ):
569
- pass
570
- self .ws = await asyncio .wait_for (
571
- connect (self .ws_url , ** self ._options ), timeout = 10
572
- )
573
- self ._receiving_task = asyncio .create_task (self ._start_receiving ())
565
+ async with self ._lock :
566
+ if not self ._initialized or force :
567
+ try :
568
+ self ._receiving_task .cancel ()
569
+ await self ._receiving_task
570
+ await self .ws .close ()
571
+ except (AttributeError , asyncio .CancelledError ):
572
+ pass
573
+ self .ws = await asyncio .wait_for (
574
+ connect (self .ws_url , ** self ._options ), timeout = 10
575
+ )
576
+ self ._receiving_task = asyncio .create_task (self ._start_receiving ())
577
+ self ._initialized = True
574
578
575
579
async def __aexit__ (self , exc_type , exc_val , exc_tb ):
576
580
async with self ._lock : # TODO is this actually what I want to happen?
@@ -619,6 +623,7 @@ async def _recv(self) -> None:
619
623
self ._open_subscriptions -= 1
620
624
if "id" in response :
621
625
self ._received [response ["id" ]] = response
626
+ self ._in_use_ids .remove (response ["id" ])
622
627
elif "params" in response :
623
628
self ._received [response ["params" ]["subscription" ]] = response
624
629
else :
@@ -649,6 +654,9 @@ async def send(self, payload: dict) -> int:
649
654
id: the internal ID of the request (incremented int)
650
655
"""
651
656
original_id = get_next_id ()
657
+ while original_id in self ._in_use_ids :
658
+ original_id = get_next_id ()
659
+ self ._in_use_ids .add (original_id )
652
660
# self._open_subscriptions += 1
653
661
await self .max_subscriptions .acquire ()
654
662
try :
@@ -674,7 +682,7 @@ async def retrieve(self, item_id: int) -> Optional[dict]:
674
682
self .max_subscriptions .release ()
675
683
return item
676
684
except KeyError :
677
- await asyncio .sleep (0.001 )
685
+ await asyncio .sleep (0.1 )
678
686
return None
679
687
680
688
@@ -725,6 +733,7 @@ def __init__(
725
733
)
726
734
else :
727
735
self .ws = AsyncMock (spec = Websocket )
736
+
728
737
self ._lock = asyncio .Lock ()
729
738
self .config = {
730
739
"use_remote_preset" : use_remote_preset ,
@@ -748,6 +757,12 @@ def __init__(
748
757
self .registry_type_map = {}
749
758
self .type_id_to_name = {}
750
759
self ._mock = _mock
760
+ self ._block_hash_fetcher = CachedFetcher (512 , self ._get_block_hash )
761
+ self ._parent_hash_fetcher = CachedFetcher (512 , self ._get_parent_block_hash )
762
+ self ._runtime_info_fetcher = CachedFetcher (16 , self ._get_block_runtime_info )
763
+ self ._runtime_version_for_fetcher = CachedFetcher (
764
+ 512 , self ._get_block_runtime_version_for
765
+ )
751
766
752
767
async def __aenter__ (self ):
753
768
if not self ._mock :
@@ -1869,9 +1884,8 @@ async def get_metadata(self, block_hash=None) -> MetadataV15:
1869
1884
1870
1885
return runtime .metadata_v15
1871
1886
1872
- @a .lru_cache (maxsize = 512 )
1873
1887
async def get_parent_block_hash (self , block_hash ):
1874
- return await self ._get_parent_block_hash (block_hash )
1888
+ return await self ._parent_hash_fetcher . execute (block_hash )
1875
1889
1876
1890
async def _get_parent_block_hash (self , block_hash ):
1877
1891
block_header = await self .rpc_request ("chain_getHeader" , [block_hash ])
@@ -1916,9 +1930,8 @@ async def get_storage_by_key(self, block_hash: str, storage_key: str) -> Any:
1916
1930
"Unknown error occurred during retrieval of events"
1917
1931
)
1918
1932
1919
- @a .lru_cache (maxsize = 16 )
1920
1933
async def get_block_runtime_info (self , block_hash : str ) -> dict :
1921
- return await self ._get_block_runtime_info (block_hash )
1934
+ return await self ._runtime_info_fetcher . execute (block_hash )
1922
1935
1923
1936
get_block_runtime_version = get_block_runtime_info
1924
1937
@@ -1929,9 +1942,8 @@ async def _get_block_runtime_info(self, block_hash: str) -> dict:
1929
1942
response = await self .rpc_request ("state_getRuntimeVersion" , [block_hash ])
1930
1943
return response .get ("result" )
1931
1944
1932
- @a .lru_cache (maxsize = 512 )
1933
1945
async def get_block_runtime_version_for (self , block_hash : str ):
1934
- return await self ._get_block_runtime_version_for (block_hash )
1946
+ return await self ._runtime_version_for_fetcher . execute (block_hash )
1935
1947
1936
1948
async def _get_block_runtime_version_for (self , block_hash : str ):
1937
1949
"""
@@ -2137,6 +2149,7 @@ async def _make_rpc_request(
2137
2149
storage_item ,
2138
2150
result_handler ,
2139
2151
)
2152
+
2140
2153
request_manager .add_response (
2141
2154
item_id , decoded_response , complete
2142
2155
)
@@ -2149,14 +2162,14 @@ async def _make_rpc_request(
2149
2162
and current_time - self .ws .last_sent >= self .retry_timeout
2150
2163
):
2151
2164
if attempt >= self .max_retries :
2152
- logger .warning (
2165
+ logger .error (
2153
2166
f"Timed out waiting for RPC requests { attempt } times. Exiting."
2154
2167
)
2155
2168
raise MaxRetriesExceeded ("Max retries reached." )
2156
2169
else :
2157
2170
self .ws .last_received = time .time ()
2158
2171
await self .ws .connect (force = True )
2159
- logger .error (
2172
+ logger .warning (
2160
2173
f"Timed out waiting for RPC requests. "
2161
2174
f"Retrying attempt { attempt + 1 } of { self .max_retries } "
2162
2175
)
@@ -2223,9 +2236,8 @@ async def rpc_request(
2223
2236
]
2224
2237
result = await self ._make_rpc_request (payloads , result_handler = result_handler )
2225
2238
if "error" in result [payload_id ][0 ]:
2226
- if (
2227
- "Failed to get runtime version"
2228
- in result [payload_id ][0 ]["error" ]["message" ]
2239
+ if "Failed to get runtime version" in (
2240
+ err_msg := result [payload_id ][0 ]["error" ]["message" ]
2229
2241
):
2230
2242
logger .warning (
2231
2243
"Failed to get runtime. Re-fetching from chain, and retrying."
@@ -2234,15 +2246,21 @@ async def rpc_request(
2234
2246
return await self .rpc_request (
2235
2247
method , params , result_handler , block_hash , reuse_block_hash
2236
2248
)
2237
- raise SubstrateRequestException (result [payload_id ][0 ]["error" ]["message" ])
2249
+ elif (
2250
+ "Client error: Api called for an unknown Block: State already discarded"
2251
+ in err_msg
2252
+ ):
2253
+ bh = err_msg .split ("State already discarded for " )[1 ].strip ()
2254
+ raise StateDiscardedError (bh )
2255
+ else :
2256
+ raise SubstrateRequestException (err_msg )
2238
2257
if "result" in result [payload_id ][0 ]:
2239
2258
return result [payload_id ][0 ]
2240
2259
else :
2241
2260
raise SubstrateRequestException (result [payload_id ][0 ])
2242
2261
2243
- @a .lru_cache (maxsize = 512 )
2244
2262
async def get_block_hash (self , block_id : int ) -> str :
2245
- return await self ._get_block_hash (block_id )
2263
+ return await self ._block_hash_fetcher . execute (block_id )
2246
2264
2247
2265
async def _get_block_hash (self , block_id : int ) -> str :
2248
2266
return (await self .rpc_request ("chain_getBlockHash" , [block_id ]))["result" ]
0 commit comments