Commit bcab73b: wip - fully working
glevco committed Oct 17, 2024
1 parent 09c7cd5 commit bcab73b

Showing 9 changed files with 21 additions and 17 deletions.
2 changes: 1 addition & 1 deletion hathor/conf/settings.py
@@ -281,7 +281,7 @@ def GENESIS_TX2_TIMESTAMP(self) -> int:
PEER_CONNECTION_RETRY_MAX_RETRY_INTERVAL: int = 300

# Number max of connections in the p2p network
-PEER_MAX_CONNECTIONS: int = 1
+PEER_MAX_CONNECTIONS: int = 125

# Maximum period without receiving any messages from ther peer (in seconds).
PEER_IDLE_TIMEOUT: int = 60
6 changes: 3 additions & 3 deletions hathor/multiprocess/multiprocess_p2p_manager.py
@@ -22,7 +22,7 @@
from twisted.internet.interfaces import IProcessTransport
from twisted.protocols import amp

-from hathor.multiprocess.node_ipc_server import NodeIpcServerFactor
+from hathor.multiprocess.node_ipc_server import NodeIpcServerFactory
from hathor.multiprocess.p2p_ipc_main import P2P_IPC_MAIN
from hathor.multiprocess.p2p_ipc_server import Start
from hathor.p2p.factory import HathorClientFactory, HathorServerFactory
@@ -59,7 +59,7 @@ async def start(self) -> None:
outbound_socket = os.path.join(self._tmp_dir.name, 'out.sock')
inbound_socket = os.path.join(self._tmp_dir.name, 'in.sock')

-server_factory = NodeIpcServerFactor(vertex_parser=self.vertex_parser, vertex_handler=self.vertex_handler, tx_storage=self.tx_storage)
+server_factory = NodeIpcServerFactory(vertex_parser=self.vertex_parser, vertex_handler=self.vertex_handler, tx_storage=self.tx_storage)
server_endpoint = UNIXServerEndpoint(reactor=self.reactor, address=outbound_socket)
server_endpoint.listen(server_factory)

@@ -72,7 +72,7 @@ async def start(self) -> None:
)

client_endpoint = UNIXClientEndpoint(reactor=self.reactor, path=inbound_socket)
-time.sleep(5) # TODO: Couldn't use timeout in endpoint
+time.sleep(1) # TODO: Couldn't use timeout in endpoint? Improve this
client: amp.AMP = await connectProtocol(client_endpoint, amp.AMP())
await client.callRemote(Start)

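The TODO above flags the fixed time.sleep() as a workaround for the IPC socket not existing yet when the client connects. A minimal sketch of one non-blocking alternative, using standard Twisted endpoint APIs; the retry count and delay are illustrative assumptions, not values from this commit:

```python
from typing import Optional

from twisted.internet.endpoints import UNIXClientEndpoint, connectProtocol
from twisted.internet.task import deferLater
from twisted.protocols import amp


async def connect_with_retry(reactor, path: str, attempts: int = 10, delay: float = 0.5) -> amp.AMP:
    """Connect to the IPC socket, retrying while the subprocess is still creating it."""
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        endpoint = UNIXClientEndpoint(reactor=reactor, path=path)
        try:
            # connectProtocol returns a Deferred that fires with the connected protocol.
            return await connectProtocol(endpoint, amp.AMP())
        except Exception as e:  # e.g. the socket file does not exist yet
            last_error = e
            # Yield to the reactor between attempts instead of blocking the whole process.
            await deferLater(reactor, delay, lambda: None)
    raise ConnectionError(f'could not connect to {path}') from last_error
```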
2 changes: 1 addition & 1 deletion hathor/multiprocess/node_ipc_server.py
@@ -135,7 +135,7 @@ def get_vertex(self, vertex_id: VertexId) -> dict[str, Any]:
return dict(vertex_bytes=[bytes(vertex), vertex.static_metadata.json_dumpb()])


-class NodeIpcServerFactor(ServerFactory):
+class NodeIpcServerFactory(ServerFactory):
__slots__ = ('vertex_parser', 'vertex_handler', 'tx_storage')

def __init__(self, *, vertex_parser: VertexParser, vertex_handler: VertexHandler, tx_storage: TransactionStorage) -> None:
4 changes: 2 additions & 2 deletions hathor/multiprocess/p2p_ipc_main.py
@@ -63,15 +63,15 @@ async def main(reactor: ReactorProtocol, settings: HathorSettings, inbound_socke
),
hostname=None,
)

# TODO: Hardcoded configs
p2p_manager.add_peer_discovery(DNSPeerDiscovery(settings.BOOTSTRAP_DNS))

server_factory = P2PIpcServerFactory(p2p_manager=p2p_manager)
server_endpoint = UNIXServerEndpoint(reactor=reactor, address=outbound_socket)
server_endpoint.listen(server_factory)

if __name__ == '__main__':
# import pydevd_pycharm
# pydevd_pycharm.settrace('localhost', port=8090, stdoutToServer=True, stderrToServer=True)
_, inbound_socket, outbound_socket = sys.argv
reactor = initialize_global_reactor()
settings = get_global_settings()
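For reference, the subprocess main above pairs a P2P manager with an IPC server listening on a UNIX socket. A minimal, self-contained sketch of that server wiring with standard Twisted APIs; the factory name and socket path are illustrative stand-ins, not the project's P2PIpcServerFactory:

```python
from twisted.internet import reactor
from twisted.internet.endpoints import UNIXServerEndpoint
from twisted.internet.protocol import ServerFactory
from twisted.protocols import amp


class ExampleIpcServerFactory(ServerFactory):
    # Each incoming connection gets a bare AMP protocol instance.
    protocol = amp.AMP


def listen_ipc(socket_path: str) -> None:
    endpoint = UNIXServerEndpoint(reactor=reactor, address=socket_path)
    endpoint.listen(ExampleIpcServerFactory())  # returns a Deferred[IListeningPort]


if __name__ == '__main__':
    listen_ipc('/tmp/example-out.sock')
    reactor.run()
```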
2 changes: 1 addition & 1 deletion hathor/p2p/p2p_manager.py
@@ -302,7 +302,7 @@ def enable_rate_limiter(self, max_hits: int = 16, window_seconds: float = 1) ->
async def start(self) -> None:
"""Listen on the given address descriptions and start accepting and processing connections."""
self.lc_reconnect.start(5, now=False)
-# self.lc_sync_update.start(self.lc_sync_update_interval, now=False)
+self.lc_sync_update.start(self.lc_sync_update_interval, now=False)

if self._settings.ENABLE_PEER_WHITELIST:
self._start_whitelist_reconnect()
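The uncommented line restarts the periodic sync-update loop. As context, a minimal sketch of the LoopingCall start/stop pattern, assuming lc_sync_update is a twisted.internet.task.LoopingCall; the interval and callback here are illustrative, not taken from the commit:

```python
from twisted.internet.task import LoopingCall


def sync_update() -> None:
    """Stand-in for the manager's periodic sync bookkeeping."""
    print('running sync update')


lc_sync_update = LoopingCall(sync_update)
# now=False waits one full interval before the first call, as in the diff above.
loop_deferred = lc_sync_update.start(30.0, now=False)

# Later, e.g. on shutdown:
# lc_sync_update.stop()
```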
8 changes: 4 additions & 4 deletions hathor/p2p/sync_v2/agent.py
@@ -841,7 +841,7 @@ def handle_best_block(self, payload: str) -> None:
if deferred:
deferred.callback(best_block)

-def start_transactions_streaming(self, partial_blocks: list[Block]) -> Deferred[StreamEnd]:
+async def start_transactions_streaming(self, partial_blocks: list[Block]) -> StreamEnd:
"""Request peer to start streaming transactions to us."""
self._tx_streaming_client = TransactionStreamingClient(
self, partial_blocks, limit=self.DEFAULT_STREAMING_LIMIT, dependencies=self.dependencies
@@ -850,12 +850,12 @@ def start_transactions_streaming(self, partial_blocks: list[Block]) -> Deferred[
start_from: list[bytes] = []
first_block_hash = partial_blocks[0].hash
last_block_hash = partial_blocks[-1].hash
-self.log.info('requesting transactions streaming',
+self.log.info('requesting transactions streaming (start)',
start_from=[x.hex() for x in start_from],
first_block=first_block_hash.hex(),
last_block=last_block_hash.hex())
self.send_get_transactions_bfs(start_from, first_block_hash, last_block_hash)
-return self._tx_streaming_client.wait()
+return await self._tx_streaming_client.wait()

def resume_transactions_streaming(self) -> Deferred[StreamEnd]:
"""Resume transaction streaming."""
@@ -866,7 +866,7 @@ def resume_transactions_streaming(self) -> Deferred[StreamEnd]:
start_from = list(self._tx_streaming_client._waiting_for)
first_block_hash = partial_blocks[0].hash
last_block_hash = partial_blocks[-1].hash
-self.log.info('requesting transactions streaming',
+self.log.info('requesting transactions streaming (resume)',
start_from=[x.hex() for x in start_from],
first_block=first_block_hash.hex(),
last_block=last_block_hash.hex())
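Both hunks above turn a Deferred-returning method into a coroutine. A minimal sketch of that conversion pattern in Twisted, with hypothetical names (fetch_stream_end, run_legacy) rather than the agent's real API:

```python
from twisted.internet import reactor
from twisted.internet.defer import Deferred, ensureDeferred
from twisted.internet.task import deferLater


def fetch_stream_end_deferred() -> Deferred[str]:
    """Old style: hand the caller a Deferred that fires later."""
    return deferLater(reactor, 1.0, lambda: 'stream-end')


async def fetch_stream_end() -> str:
    """New style: await the Deferred and return its result directly."""
    return await fetch_stream_end_deferred()


def run_legacy() -> Deferred[str]:
    # Callers that still expect a Deferred can wrap the coroutine.
    return ensureDeferred(fetch_stream_end())
```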
6 changes: 3 additions & 3 deletions hathor/p2p/sync_v2/transaction_streaming_client.py
@@ -86,10 +86,10 @@ def __init__(
self._db: dict[VertexId, BaseTransaction] = {}
self._existing_deps: set[VertexId] = set()

-def wait(self) -> Deferred[StreamEnd]:
+async def wait(self) -> StreamEnd:
"""Return the deferred."""
-self._prepare_block(self.partial_blocks[0])
-return self._deferred
+await self._prepare_block(self.partial_blocks[0])
+return await self._deferred

def resume(self) -> Deferred[StreamEnd]:
"""Resume receiving vertices."""
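The new async wait() above awaits an internal Deferred that the streaming callbacks fire when the stream ends. A minimal sketch of that handshake, with hypothetical names rather than the client's real attributes:

```python
from twisted.internet.defer import Deferred


class StreamWaiter:
    """Toy version of a client that parks a caller on an internal Deferred."""

    def __init__(self) -> None:
        self._deferred: Deferred[str] = Deferred()

    async def wait(self) -> str:
        # Suspends the awaiting coroutine until on_stream_end() fires the Deferred.
        return await self._deferred

    def on_stream_end(self, result: str) -> None:
        # Called from the protocol side when the peer signals the end of the stream.
        self._deferred.callback(result)
```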
2 changes: 1 addition & 1 deletion hathor/storage/rocksdb_storage.py
@@ -61,7 +61,7 @@ def __init__(self, path: str = './', cache_capacity: Optional[int] = None, *, op
column_families = {cf: rocksdb.ColumnFamilyOptions() for cf in cf_names}

# finally, open the database
-self._db = rocksdb.DB(db_path, options, column_families=column_families, secondary_path=secondary_path)
+self._db = rocksdb.DB(db_path, options, column_families=column_families)
self.log.debug('open db', cf_list=[cf.name.decode('ascii') for cf in self._db.column_families])

def get_db(self) -> 'rocksdb.DB':
6 changes: 5 additions & 1 deletion hathor/transaction/storage/transaction_storage.py
@@ -1138,7 +1138,11 @@ def can_validate_full(self, vertex: Vertex) -> bool:
"""
if vertex.is_genesis:
return True
-deps = vertex.get_all_dependencies()
+# TODO: temporarily removing the parent block from the validation to help async block streaming
+# deps = vertex.get_all_dependencies()
+deps = vertex.parents + [i.tx_id for i in vertex.inputs]
+if vertex.is_block:
+    deps = deps[1:]
all_exist = True
all_valid = True
# either they all exist and are fully valid
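A small sketch of what the new dependency selection computes, assuming the usual Hathor convention that a block's parents list starts with its parent block (so deps[1:] drops only that entry); the stub types are illustrative, not the project's real classes:

```python
from dataclasses import dataclass, field


@dataclass
class InputStub:
    tx_id: bytes


@dataclass
class VertexStub:
    parents: list[bytes]
    inputs: list[InputStub] = field(default_factory=list)
    is_block: bool = False


def direct_deps(vertex: VertexStub) -> list[bytes]:
    """Parents plus spent transactions, minus the parent block for blocks."""
    deps = vertex.parents + [i.tx_id for i in vertex.inputs]
    if vertex.is_block:
        # Skip the parent block so block streaming does not have to wait for it.
        deps = deps[1:]
    return deps
```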
