Skip to content

Commit 63d9c12

Browse files
committed
⚡ Pull data in parallel
1 parent fc01aaf commit 63d9c12

File tree

1 file changed

+72
-50
lines changed

1 file changed

+72
-50
lines changed

circulating-supply.py

Lines changed: 72 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import os
2525
import sys
2626
import time
27+
from concurrent.futures import ThreadPoolExecutor
2728
from datetime import datetime, timezone
2829

2930
try:
@@ -156,13 +157,17 @@ def multicall(calls: list[tuple[str, bytes]]) -> list[tuple[bool, bytes]]:
156157

157158

158159
def multicall_chunked(calls, chunk_size=1000):
159-
"""Execute calls in chunks to stay within gas limits."""
160+
"""Execute calls in chunks to stay within gas limits, chunks run in parallel."""
160161
if len(calls) == 0:
161162
return []
163+
chunks = [calls[i : i + chunk_size] for i in range(0, len(calls), chunk_size)]
164+
if len(chunks) == 1:
165+
return multicall(chunks[0])
166+
with ThreadPoolExecutor(max_workers=len(chunks)) as pool:
167+
chunk_results = list(pool.map(multicall, chunks))
162168
results = []
163-
for i in range(0, len(calls), chunk_size):
164-
chunk = calls[i : i + chunk_size]
165-
results.extend(multicall(chunk))
169+
for cr in chunk_results:
170+
results.extend(cr)
166171
return results
167172

168173

@@ -195,9 +200,17 @@ def discover_contract_addresses():
195200
print(f" Current Rollup: {current_rollup}")
196201
print(f" Current RewardDist: {current_reward_dist}")
197202

198-
# Fetch historical rollups from CanonicalRollupUpdated events
199-
print("\n Fetching historical rollups from CanonicalRollupUpdated events...")
200-
rollup_logs = get_logs_safe(REGISTRY, [TOPIC_CANONICAL_ROLLUP_UPDATED], DEPLOYMENT_BLOCKS["REGISTRY"])
203+
# Fetch historical contracts from Registry events (in parallel)
204+
print("\n Fetching historical contracts from Registry events...")
205+
reg_start = DEPLOYMENT_BLOCKS["REGISTRY"]
206+
with ThreadPoolExecutor(max_workers=3) as pool:
207+
fut_rollup = pool.submit(get_logs_safe, REGISTRY, [TOPIC_CANONICAL_ROLLUP_UPDATED], reg_start)
208+
fut_ownership = pool.submit(get_logs_safe, REGISTRY, [TOPIC_OWNERSHIP_TRANSFERRED], reg_start)
209+
fut_reward = pool.submit(get_logs_safe, REGISTRY, [TOPIC_REWARD_DISTRIBUTOR_UPDATED], reg_start)
210+
rollup_logs = fut_rollup.result()
211+
ownership_logs = fut_ownership.result()
212+
reward_logs = fut_reward.result()
213+
201214
all_rollups = []
202215
for log in rollup_logs:
203216
if len(log["topics"]) > 1:
@@ -208,9 +221,6 @@ def discover_contract_addresses():
208221
all_rollups = [current_rollup]
209222
print(f" Found {len(all_rollups)} rollup(s): {all_rollups}")
210223

211-
# Fetch historical Governance from OwnershipTransferred events
212-
print("\n Fetching historical Governance from OwnershipTransferred events...")
213-
ownership_logs = get_logs_safe(REGISTRY, [TOPIC_OWNERSHIP_TRANSFERRED], DEPLOYMENT_BLOCKS["REGISTRY"])
214224
all_governance = []
215225
for log in ownership_logs:
216226
if len(log["topics"]) > 2:
@@ -221,9 +231,6 @@ def discover_contract_addresses():
221231
all_governance.append(current_governance)
222232
print(f" Found {len(all_governance)} Governance instance(s): {all_governance}")
223233

224-
# Fetch historical RewardDistributor from RewardDistributorUpdated events
225-
print("\n Fetching historical RewardDistributor from events...")
226-
reward_logs = get_logs_safe(REGISTRY, [TOPIC_REWARD_DISTRIBUTOR_UPDATED], DEPLOYMENT_BLOCKS["REGISTRY"])
227234
all_reward_dists = []
228235
for log in reward_logs:
229236
if len(log["topics"]) > 1:
@@ -304,23 +311,27 @@ def get_logs_safe(address, topics, from_block=None):
304311

305312
def fetch_atps():
306313
"""Fetch all ATP addresses from ATPCreated events across all factories."""
307-
atps = []
308-
for factory in FACTORIES:
314+
def _fetch_factory(factory):
309315
logs = get_logs_safe(factory, [TOPIC_ATP_CREATED], DEPLOYMENT_BLOCKS["FACTORIES"])
310-
for log in logs:
311-
atps.append(
312-
{
313-
"address": to_checksum_cached(
314-
"0x" + log["topics"][2].hex()[-40:]
315-
),
316-
"beneficiary": to_checksum_cached(
317-
"0x" + log["topics"][1].hex()[-40:]
318-
),
319-
"allocation": decode(["uint256"], bytes(log["data"]))[0],
320-
"factory": factory,
321-
}
322-
)
323-
print(f" {factory}: {len(logs)} ATPs")
316+
return factory, logs
317+
318+
atps = []
319+
with ThreadPoolExecutor(max_workers=len(FACTORIES)) as pool:
320+
for factory, logs in pool.map(_fetch_factory, FACTORIES):
321+
for log in logs:
322+
atps.append(
323+
{
324+
"address": to_checksum_cached(
325+
"0x" + log["topics"][2].hex()[-40:]
326+
),
327+
"beneficiary": to_checksum_cached(
328+
"0x" + log["topics"][1].hex()[-40:]
329+
),
330+
"allocation": decode(["uint256"], bytes(log["data"]))[0],
331+
"factory": factory,
332+
}
333+
)
334+
print(f" {factory}: {len(logs)} ATPs")
324335
return atps
325336

326337

@@ -543,14 +554,19 @@ def _bool(i):
543554
# and query WITHDRAWAL_TIMESTAMP from each withdrawal-capable implementation
544555
print(f" Found {len(impl_to_regs)} unique staker implementation(s), checking bytecode...")
545556
withdrawal_capable_impls = {} # impl_addr -> set of registries
546-
for impl_addr, regs in impl_to_regs.items():
547-
code = retry(lambda a=impl_addr: w3.eth.get_code(to_checksum_cached(a)))
548-
if SEL_WITHDRAW_ALL_TO_BENEFICIARY in bytes(code):
549-
print(f" {impl_addr} has withdrawAllTokensToBeneficiary")
550-
withdrawal_capable_impls[impl_addr] = regs
551-
for f, r in factory_registries.items():
552-
if r in regs:
553-
withdrawal_capable_factories.add(f.lower())
557+
558+
def _get_code(impl_addr):
559+
return impl_addr, retry(lambda a=impl_addr: w3.eth.get_code(to_checksum_cached(a)))
560+
561+
with ThreadPoolExecutor(max_workers=len(impl_to_regs) or 1) as pool:
562+
for impl_addr, code in pool.map(_get_code, impl_to_regs.keys()):
563+
regs = impl_to_regs[impl_addr]
564+
if SEL_WITHDRAW_ALL_TO_BENEFICIARY in bytes(code):
565+
print(f" {impl_addr} has withdrawAllTokensToBeneficiary")
566+
withdrawal_capable_impls[impl_addr] = regs
567+
for f, r in factory_registries.items():
568+
if r in regs:
569+
withdrawal_capable_factories.add(f.lower())
554570

555571
if withdrawal_capable_factories:
556572
print(f" {len(withdrawal_capable_factories)} factory(ies) have withdrawal-capable stakers")
@@ -604,15 +620,19 @@ def _bool(i):
604620
# Follow-up: query Slashed events from all historical rollup contracts
605621
# When slashing occurs, the slashed amount stays in the rollup contract permanently
606622
print(f"\n Querying Slashed events from {len(all_rollups)} rollup(s)...")
607-
total_slashed_funds = 0
608-
for rollup_addr in all_rollups:
623+
624+
def _fetch_slashed(rollup_addr):
609625
logs = get_logs_safe(rollup_addr, [TOPIC_SLASHED], DEPLOYMENT_BLOCKS["REGISTRY"])
610-
for log in logs:
611-
# Slashed(address attester, uint256 amount)
612-
# amount is in the data field
613-
if len(log["data"]) >= 32:
614-
amount = decode(["uint256"], bytes(log["data"]))[0]
615-
total_slashed_funds += amount
626+
return sum(
627+
decode(["uint256"], bytes(log["data"]))[0]
628+
for log in logs
629+
if len(log["data"]) >= 32
630+
)
631+
632+
total_slashed_funds = 0
633+
with ThreadPoolExecutor(max_workers=len(all_rollups) or 1) as pool:
634+
for amount in pool.map(_fetch_slashed, all_rollups):
635+
total_slashed_funds += amount
616636

617637
if total_slashed_funds > 0:
618638
print(f" Total slashed: {fmt(total_slashed_funds)} AZTEC from {len(all_rollups)} rollup(s)")
@@ -1254,13 +1274,15 @@ def display(atps, data):
12541274

12551275

12561276
def main():
1257-
# Discover all contract addresses from Registry
1258-
contract_addrs = discover_contract_addresses()
1259-
1277+
# discover_contract_addresses and fetch_atps are independent — run in parallel
12601278
print("\n" + "=" * 70)
1261-
print(" FETCHING ATP CREATION EVENTS")
1279+
print(" DISCOVERING CONTRACTS + FETCHING ATP EVENTS (parallel)")
12621280
print("=" * 70)
1263-
atps = fetch_atps()
1281+
with ThreadPoolExecutor(max_workers=2) as pool:
1282+
fut_addrs = pool.submit(discover_contract_addresses)
1283+
fut_atps = pool.submit(fetch_atps)
1284+
contract_addrs = fut_addrs.result()
1285+
atps = fut_atps.result()
12641286
print(f" Found {len(atps)} ATPs total")
12651287

12661288
print("\n" + "=" * 70)

0 commit comments

Comments
 (0)