Skip to content

Commit

Permalink
Merge branch 'update'
Browse files Browse the repository at this point in the history
# Conflicts:
#	setup.py
  • Loading branch information
madnadyka committed Dec 21, 2022
2 parents b8a0f3d + ef2dd38 commit 1c174f0
Show file tree
Hide file tree
Showing 20 changed files with 894 additions and 1,005 deletions.
File renamed without changes.
324 changes: 324 additions & 0 deletions ethereumconnector/connector.py

Large diffs are not rendered by default.

36 changes: 36 additions & 0 deletions ethereumconnector/connector_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import time

def get_last_block_height(app):
return list(app.block_cache._store.values())[-1] if app.block_cache._store else -1

def get_last_block_hash(app):
return list(app.block_cache._store.keys())[-1] if app.block_cache._store else None

def get_block_hash_by_height(app, block_height):
try:
index = list(app.block_cache._store.values()).index(block_height)
except:
index = -1
return list(app.block_cache_store.keys())[index] if index !=-1 else None

def get_expired_tx(pending_tx_cache, expired_time):
pendings_expired_hash_list = []
expired_timestamp = int(time.time()) - expired_time
for key, value in pending_tx_cache._store.items():
if value[1] <= expired_timestamp:
pendings_expired_hash_list.append(key)
return pendings_expired_hash_list

def remove_orphan(app,block_height,bin_block_hash):
for key, value in app.confirmed_tx_cache._store.items():
if value[0]==block_height:
tx_cache=(-1,value[1])
app.confirmed_tx_cache.pop(key)
app.pending_tx_cache.set(key, tx_cache)
app.block_cache.pop(bin_block_hash)






85 changes: 85 additions & 0 deletions ethereumconnector/connector_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import asyncio
from binascii import hexlify, unhexlify
import asyncpg
from .utils import *
import traceback

async def init_db(app,conn):
level = await conn.fetchval("SHOW TRANSACTION ISOLATION LEVEL;")
if level != "repeatable read":
app.log.warning("RECOMMENDED Postgres isolation level is REPEATABLE READ! Current isolation level is %s" % level.upper())
await conn.execute("""
CREATE TABLE IF NOT EXISTS connector_block (
height BIGINT DEFAULT NULL,
hash bytea NOT NULL PRIMARY KEY,
previous_hash bytea,
timestamp INT4 DEFAULT 0
);
""")

await conn.execute("""
CREATE INDEX IF NOT EXISTS connector_block_height ON connector_block USING BTREE (height desc);
""")

await conn.execute("""
CREATE TABLE IF NOT EXISTS connector_transaction (
height INT4 DEFAULT NULL,
hash bytea NOT NULL PRIMARY KEY ,
timestamp INT4 DEFAULT 0,
last_timestamp INT4 DEFAULT 0,
affected BIT(1) DEFAULT 'B0'
);
""")

await conn.execute("""
CREATE INDEX IF NOT EXISTS connector_transaction_height ON connector_transaction USING BTREE (height desc);
""")

async def create_pool(app):
app.db_pool = await asyncpg.create_pool(dsn=app.postgresql_dsn, loop=app.loop,
min_size=1, max_size=app.postgresql_pool_max_size)

async def ping(app):
async with app.db_pool.acquire() as conn:
await conn.fetchval("SELECT 1")

async def cache_load_handler(app, conn):
stmt = await conn.prepare("SELECT hash, height, last_timestamp FROM connector_transaction WHERE height IS NOT NULL ORDER BY height DESC, last_timestamp DESC LIMIT $1;")
rows = await stmt.fetch(DEFAULT_CONFIRMED_TX_CACHE_SIZE)
rows = sorted(rows, key=lambda i: i["height"])
[app.confirmed_tx_cache.set(row["hash"], (row["height"], row["last_timestamp"])) for row in rows]
stmt = await conn.prepare("SELECT hash, height, last_timestamp FROM connector_transaction WHERE height IS NULL ORDER BY last_timestamp DESC;")
rows = await stmt.fetch()
rows = sorted(rows, key=lambda i: i["last_timestamp"])
[app.pending_tx_cache.set(row["hash"], (-1, row["last_timestamp"])) for row in rows]
stmt = await conn.prepare("SELECT hash, height FROM connector_block ORDER BY height DESC LIMIT $1;")
rows = await stmt.fetch(DEFAULT_BLOCK_CACHE_SIZE)
rows = sorted(rows, key=lambda i: i["height"])
[app.block_cache.set(row["hash"], row["height"]) for row in rows]

async def block_handler(block, conn):
stmt = await conn.prepare("INSERT INTO connector_block (hash, height, previous_hash, timestamp) VALUES ($1, $2, $3, $4);")
await stmt.fetch(unhexlify(block["hash"][2:]), int(block["number"], 16), unhexlify(block["parentHash"][2:]) if "parentHash" in block else None, int(block['timestamp'], 16))
tx_hash_list = [unhexlify(tx["hash"][2:]) for tx in block["transactions"]]
stmt = await conn.prepare("UPDATE connector_transaction SET height = $1, timestamp=$2 WHERE hash = ANY ($3);")
await stmt.fetch(int(block["number"], 16), int(block['timestamp'], 16), tx_hash_list)

async def tx_handler(tx, conn):
stmt = await conn.prepare("INSERT INTO connector_transaction (hash, timestamp, last_timestamp, affected) VALUES ($1, $2, $3, $4);")
await stmt.fetch(unhexlify(tx["hash"][2:]), tx["timestamp"], tx["timestamp"], asyncpg.BitString('1') if tx["handler_result"] else asyncpg.BitString('0'))

async def pending_tx_update_handler(bin_tx_hash,last_seen_timestamp,conn):
stmt = await conn.prepare("UPDATE connector_transaction SET last_timestamp = $1 WHERE hash=$2;")
await stmt.fetch(last_seen_timestamp,bin_tx_hash)

async def pending_tx_expire_handler(expired_hash_list, conn):
stmt = await conn.prepare("DELETE FROM connector_transaction WHERE hash = ANY ($1)")
await stmt.fetch(expired_hash_list)

async def orphan_handler(orphan_block_height,orphan_bin_block_hash, conn):
stmt = await conn.prepare("DELETE FROM connector_block WHERE hash = $1;")
await stmt.fetch(orphan_bin_block_hash)
stmt = await conn.prepare("UPDATE connector_transaction SET height = NULL WHERE height=$1;")
await stmt.fetch(orphan_block_height)


91 changes: 91 additions & 0 deletions ethereumconnector/handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import asyncio
from . import node,connector_db
from .utils import *
import traceback

@transaction
async def pending_tx_expire(app, pendings_expired_hash_list, **kwargs):
conn = kwargs.get("conn", None)
if app.pending_tx_expire_handler:
await app.pending_tx_expire_handler(pendings_expired_hash_list, conn=conn)
if app.connector_db:
await connector_db.pending_tx_expire_handler(pendings_expired_hash_list, conn)

@transaction
async def pending_tx_update(app, bin_tx_hash, last_seen_timestamp, **kwargs):
conn = kwargs.get("conn", None)
if app.pending_tx_update_handler:
await app.pending_tx_update_handler(bin_tx_hash, last_seen_timestamp, conn=conn)
if app.connector_db:
await connector_db.pending_tx_update_handler(bin_tx_hash, last_seen_timestamp, conn)

@transaction
async def tx(app, tx, **kwargs):
conn = kwargs.get("conn", None)
tx["handler_result"] = 0
if app.tx_handler:
await app.tx_handler(tx, conn=conn)
if tx["handler_result"] != 0 and tx["handler_result"] != 1: raise Exception('tx handler error')
if app.connector_db:
await connector_db.tx_handler(tx, conn)

@transaction
async def orphan(app,orphan_block_height,orphan_bin_block_hash, **kwargs):
conn = kwargs.get("conn", None)
if app.orphan_handler:
await app.orphan_handler(orphan_block_height, orphan_bin_block_hash, conn=conn)
if app.connector_db:
await connector_db.orphan_handler(orphan_block_height, orphan_bin_block_hash, conn)

@transaction
async def block(app, block, **kwargs):
conn=kwargs.get("conn",None)
if app.block_handler:
await app.block_handler(block, conn=conn)
if app.connector_db:
await connector_db.block_handler(block, conn)

async def preload_blocks(app):
while True:
try:
start_preload_block_height= max(app.last_block_height + 1000, app.last_preload_block_height)
if app.node_last_block > start_preload_block_height:
stop_preload_block_height = min(start_preload_block_height + max(DEFAULT_BLOCK_PRELOAD_CACHE_SIZE - app.block_preload_cache.len(),0),app.node_last_block)
if stop_preload_block_height>start_preload_block_height+1000:
app.log.info("start preloading block task from %s to %s" % (start_preload_block_height,stop_preload_block_height))
preload_tasks=[]
for i in range(app.preload_workers):
preload_tasks.append(app.loop.create_task(preload_blocks_worker(app,i, start_preload_block_height+i,stop_preload_block_height)))
if preload_tasks: await asyncio.wait(preload_tasks)
app.log.info("DONE preloading block task from %s to %s" % (start_preload_block_height, stop_preload_block_height))
except asyncio.CancelledError:
app.log.info("connector preload_block terminated")
break
except:
app.log.error(str(traceback.format_exc()))
await asyncio.sleep(10)

async def preload_blocks_worker(app, i, start_preload_block_height,stop_preload_block_height):
preload_block_height = start_preload_block_height
while True:
if not app.active: raise asyncio.CancelledError
if preload_block_height < app.last_block_height:
preload_block_height += app.preload_workers
continue
ex = app.block_preload_cache.get(preload_block_height)
if ex:
preload_block_height += app.preload_workers
else:
app.log.debug('preload block height %s worker %s' % (preload_block_height, i))
try:
block = await node.get_block_by_height(app, preload_block_height)
except:
block=None
app.log.error("preload worker can't receive block height %s" %preload_block_height)
if block:
app.block_preload_cache.set(preload_block_height, block)
app.last_preload_block_height = max(app.last_preload_block_height, preload_block_height)
preload_block_height += app.preload_workers
if preload_block_height > stop_preload_block_height:
break

110 changes: 110 additions & 0 deletions ethereumconnector/node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import asyncio
from .utils import *
import traceback

async def health_check(app):
try:
return await app.rpc.eth_syncing()
except Exception:
app.log.error("Health check failed")
raise

async def get_last_block(app):
try:
return await app.rpc.eth_blockNumber()
except Exception:
app.log.error(str(traceback.format_exc()))
app.log.error("Get last_block failed")
raise

async def get_transaction(app, tx_hash):
try:
return await app.rpc.eth_getTransactionByHash(tx_hash)
except Exception:
app.log.error(str(traceback.format_exc()))
app.log.error("Get transaction %s failed" % tx_hash)
raise

async def get_block_uncles(app, block_hash, index):
try:
return await app.rpc.eth_getUncleByBlockHashAndIndex(block_hash, hex(index))
except Exception:
app.log.error(str(traceback.format_exc()))
app.log.error("Get uncle %s failed" % block_hash)
raise

async def get_block_by_height(app, block_height):
try:
block = await app.rpc.eth_getBlockByNumber(hex(block_height), True)
if block is None:
await asyncio.sleep(1)
else:
if not(block["number"] == hex(block_height)): raise Exception
await get_block_trace_and_receipt(app,block_height, block["hash"], block["transactions"])
uncles_data = []
if block["uncles"]:
for index in range(len(block["uncles"])):
u_data = await get_block_uncles(app,block["hash"], index)
uncles_data.append(u_data)
block['uncles_data'] = uncles_data
block['details'] = True
return block
except Exception:
app.log.error(str(traceback.format_exc()))
app.log.error("Get block by height %s failed" % block_height)
raise

async def get_block_trace_and_receipt(app, block_height, block_hash, transactions):
if transactions:
trace_tx = {}
if app.trace:
block_trace = await app.rpc.trace_block(hex(block_height))
if not (block_trace[0]['blockHash'] == block_hash):
raise Exception ('block trace hash %s block hash %s' %(block_trace[0]['blockHash'],block_hash))
for tx in block_trace:
if 'author' in tx['action']:continue
if not tx['transactionHash'] in trace_tx:
trace_tx[tx['transactionHash']] = list()
trace_tx[tx['transactionHash']].append(tx)
receipt = {}
if CLIENTS[app.client]["getBlockReceipts_method"]:
func_name = CLIENTS[app.client]["getBlockReceipts_method"]
func = getattr(app.rpc, func_name)
block_receipt = await func(hex(block_height))
if not (block_receipt[0]['blockHash'] == block_hash):
raise Exception('block receipt hash %s block hash %s' % (block_receipt[0]['blockHash'], block_hash))
else:
block_receipt=[]
for tx in transactions:
tx_receipt = await app.rpc.eth_getTransactionReceipt(tx["hash"])
block_receipt.append(tx_receipt)
for tx in block_receipt:
if not tx['transactionHash'] in receipt:
receipt[tx['transactionHash']] = {}
if 'status' in tx:
receipt[tx['transactionHash']]['status'] = tx['status']
else:
receipt[tx['transactionHash']]['status'] = '0x1'
receipt[tx['transactionHash']]['logs'] = tx['logs']
receipt[tx['transactionHash']]['gasUsed']=tx['gasUsed']
receipt[tx['transactionHash']]['effectiveGasPrice']=tx['effectiveGasPrice']
for tx in transactions:
if receipt[tx['hash']]['status'] == '0x0':
tx['status'] = 0
else:
tx['status'] = 1
tx['logs'] = receipt[tx['hash']]['logs']
tx['gasUsed'] = receipt[tx['hash']]['gasUsed']
tx['effectiveGasPrice'] = receipt[tx['hash']]['effectiveGasPrice']
if tx['hash'] in trace_tx:
if 'result' in tx:
tx['result'] = trace_tx[tx['hash']][0]['result']
else:
tx['result']=None
tx['trace'] = trace_tx[tx['hash']]
if 'error' in trace_tx[tx['hash']][0]:
tx['status'] = 0




Loading

0 comments on commit 1c174f0

Please sign in to comment.