Skip to content

Commit

Permalink
refactor(p2p): minor pre-IPC refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 23, 2024
1 parent 5234061 commit 5d0d2fb
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 76 deletions.
10 changes: 9 additions & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ def build(self) -> BuildArtifacts:
vertex_handler = self._get_or_create_vertex_handler()
vertex_parser = self._get_or_create_vertex_parser()
poa_block_producer = self._get_or_create_poa_block_producer()
capabilities = self._get_or_create_capabilities()

if self._enable_address_index:
indexes.enable_address_index(pubsub)
Expand Down Expand Up @@ -263,7 +264,7 @@ def build(self) -> BuildArtifacts:
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
capabilities=self._capabilities,
capabilities=capabilities,
environment_info=get_environment_info(self._cmdline, str(peer.id)),
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
Expand Down Expand Up @@ -642,6 +643,13 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:

return self._poa_block_producer

def _get_or_create_capabilities(self) -> list[str]:
if self._capabilities is None:
settings = self._get_or_create_settings()
self._capabilities = settings.get_default_capabilities()

return self._capabilities

def use_memory(self) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.MEMORY
Expand Down
2 changes: 2 additions & 0 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
)

cpu_mining_service = CpuMiningService()
capabilities = settings.get_default_capabilities()

p2p_manager = ConnectionsManager(
settings=settings,
Expand Down Expand Up @@ -384,6 +385,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
vertex_handler=vertex_handler,
vertex_parser=vertex_parser,
poa_block_producer=poa_block_producer,
capabilities=capabilities,
)

if self._args.x_ipython_kernel:
Expand Down
8 changes: 8 additions & 0 deletions hathor/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,14 @@ def from_yaml(cls, *, filepath: str) -> 'HathorSettings':
validators=_VALIDATORS
)

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities."""
return [
self.CAPABILITY_WHITELIST,
self.CAPABILITY_SYNC_VERSION,
self.CAPABILITY_GET_BEST_BLOCKCHAIN
]


def _parse_checkpoints(checkpoints: Union[dict[int, str], list[Checkpoint]]) -> list[Checkpoint]:
"""Parse a dictionary of raw checkpoint data into a list of checkpoints."""
Expand Down
15 changes: 2 additions & 13 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ def __init__(
execution_manager: ExecutionManager,
vertex_handler: VertexHandler,
vertex_parser: VertexParser,
capabilities: list[str],
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
capabilities: Optional[list[str]] = None,
checkpoints: Optional[list[Checkpoint]] = None,
rng: Optional[Random] = None,
environment_info: Optional[EnvironmentInfo] = None,
Expand Down Expand Up @@ -230,10 +230,7 @@ def __init__(
self.peers_whitelist: list[PeerId] = []

# List of capabilities of the peer
if capabilities is not None:
self.capabilities = capabilities
else:
self.capabilities = self.get_default_capabilities()
self.capabilities = capabilities

# This is included in some logs to provide more context
self.environment_info = environment_info
Expand All @@ -245,14 +242,6 @@ def __init__(
self.lc_check_sync_state.clock = self.reactor
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities for this manager."""
return [
self._settings.CAPABILITY_WHITELIST,
self._settings.CAPABILITY_SYNC_VERSION,
self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN
]

def start(self) -> None:
""" A factory must be started only once. And it is usually automatically started.
"""
Expand Down
25 changes: 14 additions & 11 deletions hathor/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,23 +248,26 @@ def collect_peer_connection_metrics(self) -> None:
self.peer_connection_metrics.clear()

for connection in self.connections.connections:
if not connection._peer:
peer = connection.get_peer_if_set()
if not peer:
# A connection without peer will not be able to communicate
# So we can just discard it for the sake of the metrics
continue

entrypoint = connection.get_entrypoint()
metrics = connection.get_metrics()
metric = PeerConnectionMetrics(
connection_string=str(connection.entrypoint) if connection.entrypoint else "",
peer_id=str(connection.peer.id),
connection_string=str(entrypoint) if entrypoint else '',
peer_id=str(peer.id),
network=settings.NETWORK_NAME,
received_messages=connection.metrics.received_messages,
sent_messages=connection.metrics.sent_messages,
received_bytes=connection.metrics.received_bytes,
sent_bytes=connection.metrics.sent_bytes,
received_txs=connection.metrics.received_txs,
discarded_txs=connection.metrics.discarded_txs,
received_blocks=connection.metrics.received_blocks,
discarded_blocks=connection.metrics.discarded_blocks,
received_messages=metrics.received_messages,
sent_messages=metrics.sent_messages,
received_bytes=metrics.received_bytes,
sent_bytes=metrics.sent_bytes,
received_txs=metrics.received_txs,
discarded_txs=metrics.discarded_txs,
received_blocks=metrics.received_blocks,
discarded_blocks=metrics.discarded_blocks,
)

self.peer_connection_metrics.append(metric)
Expand Down
73 changes: 40 additions & 33 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,9 +338,7 @@ def has_synced_peer(self) -> bool:
"""
connections = list(self.iter_ready_connections())
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
if conn.state.is_synced():
if conn.is_synced():
return True
return False

Expand All @@ -357,9 +355,7 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None:
connections = list(self.iter_ready_connections())
self.rng.shuffle(connections)
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
conn.state.send_tx_to_peer(tx)
conn.send_tx_to_peer(tx)

def disconnect_all_peers(self, *, force: bool = False) -> None:
"""Disconnect all peers."""
Expand Down Expand Up @@ -396,12 +392,10 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:

def on_peer_ready(self, protocol: HathorProtocol) -> None:
"""Called when a peer is ready."""
assert protocol.peer is not None
self.verified_peer_storage.add_or_replace(protocol.peer)
assert protocol.peer.id is not None

protocol_peer = protocol.get_peer()
self.verified_peer_storage.add_or_replace(protocol_peer)
self.handshaking_peers.remove(protocol)
self.unverified_peer_storage.pop(protocol.peer.id, None)
self.unverified_peer_storage.pop(protocol_peer.id, None)

# we emit the event even if it's a duplicate peer as a matching
# NETWORK_PEER_DISCONNECTED will be emitted regardless
Expand All @@ -411,7 +405,7 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
peers_count=self._get_peers_count()
)

if protocol.peer.id in self.connected_peers:
if protocol_peer.id in self.connected_peers:
# connected twice to same peer
self.log.warn('duplicate connection to peer', protocol=protocol)
conn = self.get_connection_to_drop(protocol)
Expand All @@ -420,35 +414,35 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
# the new connection is being dropped, so don't save it to connected_peers
return

self.connected_peers[protocol.peer.id] = protocol
self.connected_peers[protocol_peer.id] = protocol

# In case it was a retry, we must reset the data only here, after it gets ready
protocol.peer.info.reset_retry_timestamp()
protocol_peer.info.reset_retry_timestamp()

if len(self.connected_peers) <= self.MAX_ENABLED_SYNC:
protocol.enable_sync()

if protocol.peer.id in self.always_enable_sync:
if protocol_peer.id in self.always_enable_sync:
protocol.enable_sync()

# Notify other peers about this new peer connection.
self.relay_peer_to_ready_connections(protocol.peer)
self.relay_peer_to_ready_connections(protocol_peer)

def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
"""Relay peer to all ready connections."""
for conn in self.iter_ready_connections():
if conn.peer == peer:
if conn.get_peer() == peer:
continue
assert isinstance(conn.state, ReadyState)
conn.state.send_peers([peer])
conn.send_peers([peer])

def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
"""Called when a peer disconnect."""
self.connections.discard(protocol)
if protocol in self.handshaking_peers:
self.handshaking_peers.remove(protocol)
if protocol._peer is not None:
existing_protocol = self.connected_peers.pop(protocol.peer.id, None)
protocol_peer = protocol.get_peer_if_set()
if protocol_peer is not None:
existing_protocol = self.connected_peers.pop(protocol_peer.id, None)
if existing_protocol is None:
# in this case, the connection was closed before it got to READY state
return
Expand All @@ -458,7 +452,7 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
# A check for duplicate connections is done during PEER_ID state, but there's still a
# chance it can happen if both connections start at the same time and none of them has
# reached READY state while the other is on PEER_ID state
self.connected_peers[protocol.peer.id] = existing_protocol
self.connected_peers[protocol_peer.id] = existing_protocol
self.pubsub.publish(
HathorEvents.NETWORK_PEER_DISCONNECTED,
protocol=protocol,
Expand All @@ -480,8 +474,9 @@ def iter_not_ready_endpoints(self) -> Iterable[Entrypoint]:
for connecting_peer in self.connecting_peers.values():
yield connecting_peer.entrypoint
for protocol in self.handshaking_peers:
if protocol.entrypoint is not None:
yield protocol.entrypoint
protocol_entrypoint = protocol.get_entrypoint()
if protocol_entrypoint is not None:
yield protocol_entrypoint
else:
self.log.warn('handshaking protocol has empty connection string', protocol=protocol)

Expand Down Expand Up @@ -723,30 +718,28 @@ def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
We keep the connection initiated by the peer with larger id. A simple (peer_id1 > peer_id2)
on the peer id string is used for this comparison.
"""
assert protocol.peer is not None
assert protocol.peer.id is not None
assert protocol.my_peer.id is not None
other_connection = self.connected_peers[protocol.peer.id]
if bytes(protocol.my_peer.id) > bytes(protocol.peer.id):
protocol_peer = protocol.get_peer()
other_connection = self.connected_peers[protocol_peer.id]
if bytes(self.my_peer.id) > bytes(protocol_peer.id):
# connection started by me is kept
if not protocol.inbound:
if not protocol.is_inbound():
# other connection is dropped
return other_connection
else:
# this was started by peer, so drop it
return protocol
else:
# connection started by peer is kept
if not protocol.inbound:
if not protocol.is_inbound():
return protocol
else:
return other_connection

def drop_connection(self, protocol: HathorProtocol) -> None:
""" Drop a connection
"""
assert protocol.peer is not None
self.log.debug('dropping connection', peer_id=protocol.peer.id, protocol=type(protocol).__name__)
protocol_peer = protocol.get_peer()
self.log.debug('dropping connection', peer_id=protocol_peer.id, protocol=type(protocol).__name__)
protocol.send_error_and_close_connection('Connection droped')

def drop_connection_by_peer_id(self, peer_id: PeerId) -> None:
Expand Down Expand Up @@ -843,3 +836,17 @@ def reload_entrypoints_and_connections(self) -> None:
self.log.warn('Killing all connections and resetting entrypoints...')
self.disconnect_all_peers(force=True)
self.my_peer.reload_entrypoints_from_source_file()

def get_peers_whitelist(self) -> list[PeerId]:
assert self.manager is not None
return self.manager.peers_whitelist

def get_verified_peers(self) -> Iterable[PublicPeer]:
return self.verified_peer_storage.values()

def get_randbytes(self, n: int) -> bytes:
return self.rng.randbytes(n)

def is_peer_whitelisted(self, peer_id: PeerId) -> bool:
assert self.manager is not None
return peer_id in self.manager.peers_whitelist
32 changes: 31 additions & 1 deletion hathor/p2p/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import time
from enum import Enum
from typing import TYPE_CHECKING, Any, Coroutine, Generator, Optional, cast
from typing import TYPE_CHECKING, Any, Coroutine, Generator, Iterable, Optional, cast

from structlog import get_logger
from twisted.internet.defer import Deferred
Expand All @@ -33,6 +33,7 @@
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import format_address
from hathor.profiler import get_cpu_profiler
from hathor.transaction import BaseTransaction

if TYPE_CHECKING:
from hathor.manager import HathorManager # noqa: F401
Expand Down Expand Up @@ -390,6 +391,35 @@ def disable_sync(self) -> None:
self.log.info('disable sync')
self.state.sync_agent.disable_sync()

def is_synced(self) -> bool:
assert self.state is not None
assert isinstance(self.state, ReadyState)
return self.state.is_synced()

def send_tx_to_peer(self, tx: BaseTransaction) -> None:
assert self.state is not None
assert isinstance(self.state, ReadyState)
return self.state.send_tx_to_peer(tx)

def get_peer(self) -> PublicPeer:
return self.peer

def get_peer_if_set(self) -> PublicPeer | None:
return self._peer

def send_peers(self, peers: Iterable[PublicPeer]) -> None:
assert isinstance(self.state, ReadyState)
self.state.send_peers(peers)

def get_entrypoint(self) -> Entrypoint | None:
return self.entrypoint

def is_inbound(self) -> bool:
return self.inbound

def get_metrics(self) -> 'ConnectionMetrics':
return self.metrics


class HathorLineReceiver(LineReceiver, HathorProtocol):
""" Implements HathorProtocol in a LineReceiver protocol.
Expand Down
2 changes: 1 addition & 1 deletion hathor/p2p/states/peer_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:
Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
"""
peer_is_whitelisted = peer_id in self.protocol.node.peers_whitelist
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
# never block whitelisted peers
if peer_is_whitelisted:
return False
Expand Down
5 changes: 2 additions & 3 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ def handle_get_peers(self, payload: str) -> None:
""" Executed when a GET-PEERS command is received. It just responds with
a list of all known peers.
"""
for peer in self.protocol.connections.verified_peer_storage.values():
for peer in self.protocol.connections.get_verified_peers():
self.send_peers([peer])

def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
Expand Down Expand Up @@ -195,8 +195,7 @@ def send_ping(self) -> None:
"""
# Add a salt number to prevent peers from faking rtt.
self.ping_start_time = self.reactor.seconds()
rng = self.protocol.connections.rng
self.ping_salt = rng.randbytes(self.ping_salt_size).hex()
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
self.send_message(ProtocolMessages.PING, self.ping_salt)

def send_pong(self, salt: str) -> None:
Expand Down
Loading

0 comments on commit 5d0d2fb

Please sign in to comment.