Skip to content

Commit

Permalink
refactor(p2p): implement P2PConnectionProtocol
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 27, 2024
1 parent b6b19b2 commit 72d3563
Show file tree
Hide file tree
Showing 15 changed files with 123 additions and 161 deletions.
14 changes: 5 additions & 9 deletions hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,26 +63,23 @@ class SyncSupportLevel(IntEnum):
ENABLED = 2 # available and enabled by default, possible to disable at runtime

@classmethod
def add_factories(
def add_versions(
cls,
p2p_manager: ConnectionsManager,
dependencies: P2PDependencies,
sync_v1_support: 'SyncSupportLevel',
sync_v2_support: 'SyncSupportLevel',
) -> None:
"""Adds the sync factory to the manager according to the support level."""
from hathor.p2p.sync_v1.factory import SyncV11Factory
from hathor.p2p.sync_v2.factory import SyncV2Factory
"""Adds the sync version to the manager according to the support level."""
from hathor.p2p.sync_version import SyncVersion

# sync-v1 support:
if sync_v1_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V1_1, SyncV11Factory(dependencies))
p2p_manager.add_sync_version(SyncVersion.V1_1)
if sync_v1_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V1_1)
# sync-v2 support:
if sync_v2_support > cls.UNAVAILABLE:
p2p_manager.add_sync_factory(SyncVersion.V2, SyncV2Factory(dependencies))
p2p_manager.add_sync_version(SyncVersion.V2)
if sync_v2_support is cls.ENABLED:
p2p_manager.enable_sync_version(SyncVersion.V2)

Expand Down Expand Up @@ -427,9 +424,8 @@ def _get_or_create_p2p_manager(self) -> ConnectionsManager:
ssl=enable_ssl,
rng=self._rng,
)
SyncSupportLevel.add_factories(
SyncSupportLevel.add_versions(
self._p2p_manager,
dependencies,
self._sync_v1_support,
self._sync_v2_support,
)
Expand Down
10 changes: 8 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,13 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
log_vertex_bytes=self._args.log_vertex_bytes,
)

if self._args.x_multiprocess_p2p:
self.check_or_raise(
self._args.x_remove_sync_v1,
'multiprocess support for P2P is only available if sync-v1 is removed (use --x-remove-sync-v1)'
)
raise NotImplementedError('Multiprocess support for P2P is not yet implemented.')

p2p_dependencies = P2PDependencies(
reactor=reactor,
settings=settings,
Expand All @@ -351,9 +358,8 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
rng=Random(),
)

SyncSupportLevel.add_factories(
SyncSupportLevel.add_versions(
p2p_manager,
p2p_dependencies,
sync_v1_support,
sync_v2_support,
)
Expand Down
7 changes: 1 addition & 6 deletions hathor/cli/quick_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,11 @@ def create_parser(cls) -> ArgumentParser:
return parser

def prepare(self, *, register_resources: bool = True) -> None:
from hathor.p2p.sync_v2.factory import SyncV2Factory
from hathor.p2p.sync_version import SyncVersion

super().prepare(register_resources=False)
self._no_wait = self._args.no_wait

self.log.info('patching vertex_handler.on_new_vertex to quit on success')
p2p_factory = self.manager.connections.get_sync_factory(SyncVersion.V2)
assert isinstance(p2p_factory, SyncV2Factory)
p2p_factory.dependencies.vertex_handler = VertexHandlerWrapper(
self.manager.connections.dependencies.vertex_handler = VertexHandlerWrapper(
self.manager.vertex_handler,
self.manager,
self._args.quit_after_n_blocks,
Expand Down
2 changes: 2 additions & 0 deletions hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class RunNode:
('--x-enable-event-queue', lambda args: bool(args.x_enable_event_queue)),
('--x-asyncio-reactor', lambda args: bool(args.x_asyncio_reactor)),
('--x-ipython-kernel', lambda args: bool(args.x_ipython_kernel)),
('--x-multiprocess-p2p', lambda args: bool(args.x_multiprocess_p2p)),
]

env_vars_prefix: str | None = None
Expand Down Expand Up @@ -162,6 +163,7 @@ def create_parser(cls) -> ArgumentParser:
help='Log tx bytes for debugging')
parser.add_argument('--disable-ws-history-streaming', action='store_true',
help='Disable websocket history streaming API')
parser.add_argument('--x-multiprocess-p2p', action='store_true', help='Enable multiprocess support for P2P.')
return parser

def prepare(self, *, register_resources: bool = True) -> None:
Expand Down
1 change: 1 addition & 0 deletions hathor/cli/run_node_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ class RunNodeArgs(BaseModel, extra=Extra.allow):
nano_testnet: bool
log_vertex_bytes: bool
disable_ws_history_streaming: bool
x_multiprocess_p2p: bool
29 changes: 23 additions & 6 deletions hathor/p2p/dependencies/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,41 @@
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.peer import PublicPeer, UnverifiedPeer
from hathor.p2p.peer_id import PeerId
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.protocol import ConnectionMetrics
from hathor.p2p.sync_version import SyncVersion


class P2PConnectionProtocol(Protocol):
"""Abstract HathorProtocol as a Python protocol to be used in P2PManager."""

def is_synced(self) -> bool: ...
def send_tx_to_peer(self, tx: BaseTransaction) -> None: ...
def disconnect(self, reason: str = '', *, force: bool = False) -> None: ...
def get_peer(self) -> PublicPeer: ...
def get_peer_if_set(self) -> PublicPeer | None: ...
def get_entrypoint(self) -> Entrypoint | None: ...
def enable_sync(self) -> None: ...
def disable_sync(self) -> None: ...
def is_sync_enabled(self) -> bool: ...
def send_peers(self, peers: Iterable[PublicPeer]) -> None: ...
def is_inbound(self) -> bool: ...
def send_error_and_close_connection(self, msg: str) -> None: ...
def get_metrics(self) -> ConnectionMetrics: ...


class P2PManagerProtocol(Protocol):
"""Abstract the P2PManager as a Python protocol to be used in P2P classes."""

def is_peer_whitelisted(self, peer_id: PeerId) -> bool: ...
def get_enabled_sync_versions(self) -> set[SyncVersion]: ...
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: ...
def get_verified_peers(self) -> Iterable[PublicPeer]: ...
def on_receive_peer(self, peer: UnverifiedPeer) -> None: ...
def on_peer_connect(self, protocol: HathorProtocol) -> None: ...
def on_peer_ready(self, protocol: HathorProtocol) -> None: ...
def on_peer_disconnect(self, protocol: HathorProtocol) -> None: ...
def on_peer_connect(self, protocol: P2PConnectionProtocol) -> None: ...
def on_peer_ready(self, protocol: P2PConnectionProtocol) -> None: ...
def on_peer_disconnect(self, protocol: P2PConnectionProtocol) -> None: ...
def get_randbytes(self, n: int) -> bytes: ...
def is_peer_connected(self, peer_id: PeerId) -> bool: ...
def send_tx_to_peers(self, tx: BaseTransaction) -> None: ...
Expand Down
59 changes: 32 additions & 27 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Iterable, NamedTuple, Optional

from structlog import get_logger
Expand All @@ -25,15 +27,15 @@
from twisted.web.client import Agent

from hathor.p2p import P2PDependencies
from hathor.p2p.dependencies.protocols import P2PConnectionProtocol
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.netfilter.factory import NetfilterFactory
from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_id import PeerId
from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.rate_limiter import RateLimiter
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_v1.downloader import Downloader
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import parse_whitelist
from hathor.pubsub import HathorEvents, PubSubManager
Expand All @@ -42,6 +44,7 @@

if TYPE_CHECKING:
from hathor.manager import HathorManager
from hathor.p2p.protocol import HathorProtocol

logger = get_logger()

Expand Down Expand Up @@ -77,13 +80,12 @@ class GlobalRateLimiter:
SEND_TIPS = 'NodeSyncTimestamp.send_tips'

manager: Optional['HathorManager']
connections: set[HathorProtocol]
connected_peers: dict[PeerId, HathorProtocol]
connections: set[P2PConnectionProtocol]
connected_peers: dict[PeerId, P2PConnectionProtocol]
connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer]
handshaking_peers: set[HathorProtocol]
handshaking_peers: set[P2PConnectionProtocol]
unverified_peer_storage: UnverifiedPeerStorage
verified_peer_storage: VerifiedPeerStorage
_sync_factories: dict[SyncVersion, SyncAgentFactory]
_enabled_sync_versions: set[SyncVersion]

rate_limiter: RateLimiter
Expand Down Expand Up @@ -193,27 +195,29 @@ def __init__(
self._last_discovery: float = 0.

# sync-manager factories
self._sync_factories = {}
self._available_sync_versions: set[SyncVersion] = set()
self._enabled_sync_versions = set()

# agent to perform HTTP requests
self._http_agent = Agent(self.reactor)

def add_sync_factory(self, sync_version: SyncVersion, sync_factory: SyncAgentFactory) -> None:
"""Add factory for the given sync version, must use a sync version that does not already exist."""
# XXX: to allow code in `set_manager` to safely use the the available sync versions, we add this restriction:
self._sync_v1_downloader: Downloader | None = None

def add_sync_version(self, sync_version: SyncVersion) -> None:
"""Add a sync version, must use one that is not already set."""
# XXX: to allow code in `set_manager` to safely use the available sync versions, we add this restriction:
assert self.manager is None, 'Cannot modify sync factories after a manager is set'
if sync_version in self._sync_factories:
if sync_version in self._available_sync_versions:
raise ValueError('sync version already exists')
self._sync_factories[sync_version] = sync_factory
self._available_sync_versions.add(sync_version)

def get_available_sync_versions(self) -> set[SyncVersion]:
"""What sync versions the manager is capable of using, they are not necessarily enabled."""
return set(self._sync_factories.keys())
return self._available_sync_versions

def is_sync_version_available(self, sync_version: SyncVersion) -> bool:
"""Whether the given sync version is available for use, is not necessarily enabled."""
return sync_version in self._sync_factories
return sync_version in self._available_sync_versions

def get_enabled_sync_versions(self) -> set[SyncVersion]:
"""What sync versions are enabled for use, it is necessarily a subset of the available versions."""
Expand All @@ -225,7 +229,7 @@ def is_sync_version_enabled(self, sync_version: SyncVersion) -> bool:

def enable_sync_version(self, sync_version: SyncVersion) -> None:
"""Enable using the given sync version on new connections, it must be available before being enabled."""
assert sync_version in self._sync_factories
assert sync_version in self._available_sync_versions
if sync_version in self._enabled_sync_versions:
self.log.info('tried to enable a sync verison that was already enabled, nothing to do')
return
Expand All @@ -238,6 +242,12 @@ def disable_sync_version(self, sync_version: SyncVersion) -> None:
return
self._enabled_sync_versions.discard(sync_version)

def get_sync_v1_downloader(self) -> Downloader:
assert self.is_sync_version_enabled(SyncVersion.V1_1)
if self._sync_v1_downloader is None:
self._sync_v1_downloader = Downloader(self.dependencies)
return self._sync_v1_downloader

def set_manager(self, manager: 'HathorManager') -> None:
"""Set the manager. This method must be called before start()."""
if len(self._enabled_sync_versions) == 0:
Expand Down Expand Up @@ -327,11 +337,6 @@ def _get_peers_count(self) -> PeerConnectionsMetrics:
len(self.verified_peer_storage)
)

def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory:
"""Get the sync factory for a given version, MUST be available or it will raise an assert."""
assert sync_version in self._sync_factories, f'sync_version {sync_version} is not available'
return self._sync_factories[sync_version]

def has_synced_peer(self) -> bool:
""" Return whether we are synced to at least one peer.
"""
Expand Down Expand Up @@ -374,7 +379,7 @@ def on_connection_failure(self, failure: Failure, peer: Optional[UnverifiedPeer
peers_count=self._get_peers_count()
)

def on_peer_connect(self, protocol: HathorProtocol) -> None:
def on_peer_connect(self, protocol: P2PConnectionProtocol) -> None:
"""Called when a new connection is established."""
if len(self.connections) >= self.max_connections:
self.log.warn('reached maximum number of connections', max_connections=self.max_connections)
Expand All @@ -389,7 +394,7 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:
peers_count=self._get_peers_count()
)

def on_peer_ready(self, protocol: HathorProtocol) -> None:
def on_peer_ready(self, protocol: P2PConnectionProtocol) -> None:
"""Called when a peer is ready."""
protocol_peer = protocol.get_peer()
self.verified_peer_storage.add_or_replace(protocol_peer)
Expand Down Expand Up @@ -434,7 +439,7 @@ def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
continue
conn.send_peers([peer])

def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
def on_peer_disconnect(self, protocol: P2PConnectionProtocol) -> None:
"""Called when a peer disconnect."""
self.connections.discard(protocol)
if protocol in self.handshaking_peers:
Expand All @@ -458,12 +463,12 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
peers_count=self._get_peers_count()
)

def iter_all_connections(self) -> Iterable[HathorProtocol]:
def iter_all_connections(self) -> Iterable[P2PConnectionProtocol]:
"""Iterate over all connections."""
for conn in self.connections:
yield conn

def iter_ready_connections(self) -> Iterable[HathorProtocol]:
def iter_ready_connections(self) -> Iterable[P2PConnectionProtocol]:
"""Iterate over ready connections."""
for conn in self.connected_peers.values():
yield conn
Expand Down Expand Up @@ -711,7 +716,7 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add
hostname_entrypoint = Entrypoint.from_hostname_address(hostname, address)
self.my_peer.info.entrypoints.append(hostname_entrypoint)

def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
def get_connection_to_drop(self, protocol: P2PConnectionProtocol) -> P2PConnectionProtocol:
""" When there are duplicate connections, determine which one should be dropped.
We keep the connection initiated by the peer with larger id. A simple (peer_id1 > peer_id2)
Expand All @@ -734,7 +739,7 @@ def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
else:
return other_connection

def drop_connection(self, protocol: HathorProtocol) -> None:
def drop_connection(self, protocol: P2PConnectionProtocol) -> None:
""" Drop a connection
"""
protocol_peer = protocol.get_peer()
Expand Down
3 changes: 1 addition & 2 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,9 @@ def __init__(self, protocol: 'HathorProtocol', *, dependencies: P2PDependencies)
sync_version = self.protocol.sync_version
assert sync_version is not None
self.log.debug(f'loading {sync_version}')
sync_factory = self.protocol.p2p_manager.get_sync_factory(sync_version)

# Initialize sync agent and add its commands to the list of available commands.
self.sync_agent: SyncAgent = sync_factory.create_sync_agent(self.protocol)
self.sync_agent = SyncAgent.create(sync_version=sync_version, protocol=protocol, dependencies=dependencies)
self.cmd_map.update(self.sync_agent.get_cmd_dict())

def on_enter(self) -> None:
Expand Down
32 changes: 31 additions & 1 deletion hathor/p2p/sync_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,44 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Callable
from typing import TYPE_CHECKING, Callable

from typing_extensions import assert_never

from hathor.p2p import P2PDependencies
from hathor.p2p.messages import ProtocolMessages
from hathor.p2p.sync_version import SyncVersion
from hathor.transaction import BaseTransaction

if TYPE_CHECKING:
from hathor.p2p.protocol import HathorProtocol


class SyncAgent(ABC):
@classmethod
def create(
cls,
*,
sync_version: SyncVersion,
protocol: HathorProtocol,
dependencies: P2PDependencies,
) -> SyncAgent:
match sync_version:
case SyncVersion.V1_1:
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.sync_v1.agent import NodeSyncTimestamp
assert isinstance(protocol.p2p_manager, ConnectionsManager)
downloader = protocol.p2p_manager.get_sync_v1_downloader()
return NodeSyncTimestamp(protocol=protocol, dependencies=dependencies, downloader=downloader)
case SyncVersion.V2:
from hathor.p2p.sync_v2.agent import NodeBlockSync
return NodeBlockSync(protocol=protocol, dependencies=dependencies)
case _:
assert_never(sync_version)

@abstractmethod
def is_started(self) -> bool:
"""Whether the manager started running"""
Expand Down
Loading

0 comments on commit 72d3563

Please sign in to comment.