BlocksProcessor.py
import asyncio
import logging
import sys
from datetime import datetime

from sqlalchemy.exc import IntegrityError

from dbsession import session_maker
from models.Block import Block
from models.Transaction import Transaction, TransactionOutput, TransactionInput
from utils.Event import Event

_logger = logging.getLogger(__name__)

CLUSTER_SIZE = 5
CLUSTER_WAIT_SECONDS = 4
B_TREE_SIZE = 2500

task_runner = None

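# Notes on the module-level settings above (derived from how they are used below):
# - CLUSTER_SIZE: number of queued blocks that triggers a database commit; while
#   the processor has not yet seen the DAG tip, this threshold is multiplied by 50.
# - CLUSTER_WAIT_SECONDS: pause between successive getBlocksRequest polls.
# - task_runner: holds the single background task that lets the virtual chain
#   processor (vcp) write its queued data after each committed cluster.
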
class BlocksProcessor(object):
    """
    BlocksProcessor polls hoosat for blocks and adds the meta information and its
    transactions to the database.
    """

    def __init__(self, client, vcp_instance, balance, batch_processing=False, env_enable_balance=False):
        self.client = client
        self.blocks_to_add = []
        self.balance = balance
        self.addresses_to_update = []
        self.txs = {}
        self.txs_output = []
        self.txs_input = []
        self.vcp = vcp_instance
        self.env_enable_balance = env_enable_balance
        self.batch_processing = batch_processing

        # Did the loop already see the DAG tip?
        self.synced = False

    async def loop(self, start_point):
        # go through each block added to the DAG
        _logger.info('Start processing blocks from %s', start_point)
        async for block_hash, block in self.blockiter(start_point):
            # prepare the block and its transactions for the database
            await self.__add_block_to_queue(block_hash, block)
            await self.__add_tx_to_queue(block_hash, block)

            # if the cluster size is reached, insert into the database
            cluster_size = CLUSTER_SIZE
            if not self.synced:
                cluster_size *= 50
            if len(self.blocks_to_add) >= cluster_size:
                _logger.debug(f'Committing {cluster_size} blocks at {block_hash}')
                await self.commit_blocks()
                if not self.batch_processing:
                    await self.commit_txs()
                else:
                    await self.batch_commit_txs()
                asyncio.create_task(self.handle_blocks_committed())
                if self.env_enable_balance and self.synced:
                    asyncio.create_task(self.commit_balances(self.addresses_to_update))

    async def commit_balances(self, addresses):
        unique_addresses = list(set(addresses))
        for address in unique_addresses:
            await self.balance.update_balance_from_rpc(address)
            await asyncio.sleep(0.1)

    async def handle_blocks_committed(self):
        """
        Executed when a new cluster of blocks has been added to the database.
        """
        global task_runner
        if task_runner and not task_runner.done():
            return
        task_runner = asyncio.create_task(self.vcp.yield_to_database())

    async def blockiter(self, start_point):
        """
        Generator for iterating over the blocks added to the blockDAG.
        """
        low_hash = start_point
        while True:
            _logger.info('New low hash block %s.', low_hash)
            daginfo = await self.client.request("getBlockDagInfoRequest", {})
            resp = await self.client.request("getBlocksRequest",
                                             params={
                                                 "lowHash": low_hash,
                                                 "includeTransactions": True,
                                                 "includeBlocks": True
                                             },
                                             timeout=60)

            # go through each block and yield it
            block_hashes = resp["getBlocksResponse"].get("blockHashes", [])
            _logger.info(f'Received {len(block_hashes)} blocks from getBlocksResponse')
            blocks = resp["getBlocksResponse"].get("blocks", [])

            for i, blockHash in enumerate(block_hashes):
                if daginfo["getBlockDagInfoResponse"]["tipHashes"][0] == blockHash:
                    _logger.info('Found tip hash. Generator is synced now.')
                    self.synced = True
                    break  # Don't iterate over the tip hash, because the getBlocks request returns old blocks.
                # yield the block hash and its data
                yield blockHash, blocks[i]

            if self.synced:
                low_hash = daginfo["getBlockDagInfoResponse"]["tipHashes"][0]
            else:
                if len(block_hashes) > 1:
                    low_hash = block_hashes[-1]

            _logger.info('Waiting for the next blocks request.')
            await asyncio.sleep(CLUSTER_WAIT_SECONDS)

    async def __add_tx_to_queue(self, block_hash, block):
        """
        Adds the block's transactions to the queue. This is only preparation, without a commit!
        """
        self.addresses_to_update = []
        if block.get("transactions") is not None:
            for transaction in block["transactions"]:
                if transaction.get("verboseData") is not None:
                    tx_id = transaction["verboseData"]["transactionId"]
                    # Check that the transaction isn't queued yet. Otherwise ignore it.
                    if not self.is_tx_id_in_queue(tx_id):
                        # Add transaction
                        self.txs[tx_id] = Transaction(subnetwork_id=transaction["subnetworkId"],
                                                      transaction_id=tx_id,
                                                      hash=transaction["verboseData"]["hash"],
                                                      mass=transaction["verboseData"].get("mass"),
                                                      block_hash=[transaction["verboseData"]["blockHash"]],
                                                      block_time=int(transaction["verboseData"]["blockTime"]))

                        for index, out in enumerate(transaction.get("outputs", [])):
                            address = out["verboseData"]["scriptPublicKeyAddress"]
                            amount = out["amount"]
                            if self.env_enable_balance:
                                if address not in self.addresses_to_update:
                                    self.addresses_to_update.append(address)
                            self.txs_output.append(TransactionOutput(transaction_id=tx_id,
                                                                     index=index,
                                                                     amount=amount,
                                                                     script_public_key=out["scriptPublicKey"]["scriptPublicKey"],
                                                                     script_public_key_address=address,
                                                                     script_public_key_type=out["verboseData"]["scriptPublicKeyType"]))

                        for index, tx_in in enumerate(transaction.get("inputs", [])):
                            if self.env_enable_balance:
                                prev_out_tx_id = tx_in["previousOutpoint"]["transactionId"]
                                prev_out_index = int(tx_in["previousOutpoint"].get("index", 0))
                                with session_maker() as session:
                                    prev_output = session.query(TransactionOutput).filter_by(
                                        transaction_id=prev_out_tx_id,
                                        index=prev_out_index
                                    ).first()
                                    if prev_output:
                                        address = prev_output.script_public_key_address
                                        if address not in self.addresses_to_update:
                                            self.addresses_to_update.append(address)
                            self.txs_input.append(TransactionInput(transaction_id=tx_id,
                                                                   index=index,
                                                                   previous_outpoint_hash=tx_in["previousOutpoint"]["transactionId"],
                                                                   previous_outpoint_index=int(tx_in["previousOutpoint"].get("index", 0)),
                                                                   signature_script=tx_in["signatureScript"],
                                                                   sig_op_count=tx_in.get("sigOpCount", 0)))
                    else:
                        # If the transaction is already in the queue, merge the block hashes.
                        self.txs[tx_id].block_hash = list(set(self.txs[tx_id].block_hash + [block_hash]))

    async def batch_commit_txs(self):
        """
        Add all queued transactions and their in- and outputs to the database in batches
        to avoid exceeding PostgreSQL limits.
        """
        BATCH_SIZE = 5  # Define a suitable batch size

        # First, handle updates for existing transactions.
        tx_ids_to_add = list(self.txs.keys())
        # Calculate the number of batches needed for updating existing transactions
        num_batches = len(tx_ids_to_add) // BATCH_SIZE + (1 if len(tx_ids_to_add) % BATCH_SIZE > 0 else 0)

        # Handle updates for existing transactions in batches
        for i in range(num_batches):
            with session_maker() as session:
                # Determine the subset of transaction IDs for this batch
                batch_tx_ids = tx_ids_to_add[i * BATCH_SIZE:(i + 1) * BATCH_SIZE]
                # Query only the transactions in the current batch
                tx_items = session.query(Transaction).filter(Transaction.transaction_id.in_(batch_tx_ids)).all()
                for tx_item in tx_items:
                    # Update block_hash by combining existing ones with new ones from self.txs
                    new_block_hashes = list(set(tx_item.block_hash) | set(self.txs[tx_item.transaction_id].block_hash))
                    tx_item.block_hash = new_block_hashes
                    # Remove the transaction from self.txs since it has now been processed
                    self.txs.pop(tx_item.transaction_id)
                # Commit the updates for this batch
                try:
                    session.commit()
                except Exception as e:
                    session.rollback()
                    _logger.error(f'Error updating transactions in batch {i + 1}/{num_batches}: {e}')

        # Pre-map outputs and inputs to their transaction IDs for new transactions
        outputs_by_tx = {tx_id: [] for tx_id in self.txs.keys()}
        for output in self.txs_output:
            if output.transaction_id in outputs_by_tx:
                outputs_by_tx[output.transaction_id].append(output)
        inputs_by_tx = {tx_id: [] for tx_id in self.txs.keys()}
        for tx_input in self.txs_input:
            if tx_input.transaction_id in inputs_by_tx:
                inputs_by_tx[tx_input.transaction_id].append(tx_input)

        # Now, handle insertion of new transactions in batches.
        all_new_txs = list(self.txs.values())

        # Insert new transactions and their inputs/outputs in batches
        for i in range(0, len(all_new_txs), BATCH_SIZE):
            with session_maker() as session:
                batch_txs = all_new_txs[i:i + BATCH_SIZE]
                batch_tx_ids = [tx.transaction_id for tx in batch_txs]
                batch_outputs = [output for tx_id in batch_tx_ids for output in outputs_by_tx[tx_id]]
                batch_inputs = [tx_input for tx_id in batch_tx_ids for tx_input in inputs_by_tx[tx_id]]

                # Add all new transactions, outputs, and inputs to the session
                session.add_all(batch_txs)
                session.add_all(batch_outputs)
                session.add_all(batch_inputs)
                try:
                    session.commit()
                    # _logger.info(f'Added {len(batch_txs)} TXs to database in a batch.')
                except Exception as e:
                    session.rollback()
                    _logger.error(f'Error adding TXs to database in batch {i // BATCH_SIZE + 1}: {e}')

        # Reset queues after all batches have been processed.
        self.txs = {}
        self.txs_input = []
        self.txs_output = []

    # Original commit_txs
    async def commit_txs(self):
        """
        Add all queued transactions and their in- and outputs to the database.
        """
        tx_ids_to_add = list(self.txs.keys())

        # Use a single session and avoid duplicate transactions
        with session_maker() as session:
            # Check if any transactions already exist in the database
            tx_items = session.query(Transaction).filter(Transaction.transaction_id.in_(tx_ids_to_add)).all()

            # Update existing transactions (if any) and remove them from the queue
            for tx_item in tx_items:
                tx_item.block_hash = list(set(tx_item.block_hash) | set(self.txs[tx_item.transaction_id].block_hash))
                self.txs.pop(tx_item.transaction_id)

            # Now add new transactions and their inputs/outputs
            for txv in self.txs.values():
                session.add(txv)  # insert the remaining (new) transactions

            # Add related outputs and inputs
            for tx_output in self.txs_output:
                if tx_output.transaction_id in self.txs:
                    session.add(tx_output)
            for tx_input in self.txs_input:
                if tx_input.transaction_id in self.txs:
                    session.add(tx_input)

            try:
                session.commit()  # Commit only once after all changes
                # Reset queues
                self.txs = {}
                self.txs_input = []
                self.txs_output = []
            except IntegrityError:
                session.rollback()
                _logger.error('Error adding TXs to database')
                raise

    async def __add_block_to_queue(self, block_hash, block):
        """
        Adds a block to the queue, which is committed as part of the next cluster.
        """
        if 'parents' in block["header"] and block["header"]["parents"]:
            parent_hashes = block["header"]["parents"][0].get("parentHashes", [])
        else:
            parent_hashes = []

        block_entity = Block(hash=block_hash,
                             accepted_id_merkle_root=block["header"]["acceptedIdMerkleRoot"],
                             difficulty=block["verboseData"]["difficulty"],
                             is_chain_block=block["verboseData"].get("isChainBlock", False),
                             merge_set_blues_hashes=block["verboseData"].get("mergeSetBluesHashes", []),
                             merge_set_reds_hashes=block["verboseData"].get("mergeSetRedsHashes", []),
                             selected_parent_hash=block["verboseData"]["selectedParentHash"],
                             bits=block["header"]["bits"],
                             blue_score=int(block["header"].get("blueScore", 0)),
                             blue_work=block["header"]["blueWork"],
                             daa_score=int(block["header"].get("daaScore", 0)),
                             hash_merkle_root=block["header"]["hashMerkleRoot"],
                             nonce=block["header"]["nonce"],
                             parents=parent_hashes,
                             pruning_point=block["header"]["pruningPoint"],
                             timestamp=datetime.fromtimestamp(int(block["header"]["timestamp"]) / 1000).isoformat(),
                             utxo_commitment=block["header"]["utxoCommitment"],
                             version=block["header"].get("version", 0))

        # remove any previously queued block with the same hash
        self.blocks_to_add = [b for b in self.blocks_to_add if b.hash != block_hash]
        self.blocks_to_add.append(block_entity)

    async def commit_blocks(self):
        """
        Insert the queued blocks into the database.
        """
        # delete any previously stored versions of these blocks
        with session_maker() as session:
            session.query(Block).filter(
                Block.hash.in_([b.hash for b in self.blocks_to_add])).delete()
            session.commit()

        # insert blocks
        with session_maker() as session:
            for block in self.blocks_to_add:
                session.add(block)
            try:
                session.commit()
                _logger.debug(f'Added {len(self.blocks_to_add)} blocks to database. '
                              f'Timestamp: {self.blocks_to_add[-1].timestamp}')
                # reset queue
                self.blocks_to_add = []
            except IntegrityError:
                session.rollback()
                _logger.error('Error adding group of blocks')
                raise

    def is_tx_id_in_queue(self, tx_id):
        """
        Checks if the given TX ID is already in the queue.
        """
        return tx_id in self.txs
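

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of the original module).
# A minimal, hedged example of how BlocksProcessor might be wired up. The RPC
# client, the virtual chain processor ("vcp") and the balance updater are
# assumed to come from the surrounding db-filler project; the names
# make_rpc_client, VcpProcessor and BalanceUpdater below are placeholders,
# not real APIs of this repository.
#
#     async def main():
#         client = await make_rpc_client()   # must expose `await client.request(name, params, timeout=...)`
#         vcp = VcpProcessor(client)         # must expose `yield_to_database()`
#         balance = BalanceUpdater(client)   # must expose `update_balance_from_rpc(address)`
#         processor = BlocksProcessor(client, vcp, balance,
#                                     batch_processing=True,
#                                     env_enable_balance=False)
#         # start_point: hash of the block to start syncing from, e.g. the
#         # pruning point or the last block already stored in the database.
#         start_point = "<low hash>"
#         await processor.loop(start_point)
#
#     if __name__ == "__main__":
#         asyncio.run(main())
# ---------------------------------------------------------------------------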