Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 23, 2024
1 parent b6b19b2 commit 2034c11
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 15 deletions.
29 changes: 24 additions & 5 deletions hathor/p2p/dependencies/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,24 +21,43 @@
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.peer import PublicPeer, UnverifiedPeer
from hathor.p2p.peer_id import PeerId
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.protocol import ConnectionMetrics
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion


class P2PConnectionProtocol(Protocol):
"""Abstract HathorProtocol as a Python protocol to be used in P2PManager."""

def is_synced(self) -> bool: ...
def send_tx_to_peer(self, tx: BaseTransaction) -> None: ...
def disconnect(self, reason: str = '', *, force: bool = False) -> None: ...
def get_peer(self) -> PublicPeer: ...
def get_peer_if_set(self) -> PublicPeer | None: ...
def get_entrypoint(self) -> Entrypoint | None: ...
def enable_sync(self) -> None: ...
def disable_sync(self) -> None: ...
def is_sync_enabled(self) -> bool: ...
def send_peers(self, peers: Iterable[PublicPeer]) -> None: ...
def is_inbound(self) -> bool: ...
def send_error_and_close_connection(self, msg: str) -> None: ...
def get_metrics(self) -> ConnectionMetrics: ...


class P2PManagerProtocol(Protocol):
"""Abstract the P2PManager as a Python protocol to be used in P2P classes."""

def is_peer_whitelisted(self, peer_id: PeerId) -> bool: ...
def get_enabled_sync_versions(self) -> set[SyncVersion]: ...
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: ...
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: ... # TODO: We have to remove this for IPC # noqa: E501
def get_verified_peers(self) -> Iterable[PublicPeer]: ...
def on_receive_peer(self, peer: UnverifiedPeer) -> None: ...
def on_peer_connect(self, protocol: HathorProtocol) -> None: ...
def on_peer_ready(self, protocol: HathorProtocol) -> None: ...
def on_peer_disconnect(self, protocol: HathorProtocol) -> None: ...
def on_peer_connect(self, protocol: P2PConnectionProtocol) -> None: ...
def on_peer_ready(self, protocol: P2PConnectionProtocol) -> None: ...
def on_peer_disconnect(self, protocol: P2PConnectionProtocol) -> None: ...
def get_randbytes(self, n: int) -> bytes: ...
def is_peer_connected(self, peer_id: PeerId) -> bool: ...
def send_tx_to_peers(self, tx: BaseTransaction) -> None: ...
Expand Down
21 changes: 11 additions & 10 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from twisted.web.client import Agent

from hathor.p2p import P2PDependencies
from hathor.p2p.dependencies.protocols import P2PConnectionProtocol
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.netfilter.factory import NetfilterFactory
from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer
Expand Down Expand Up @@ -77,10 +78,10 @@ class GlobalRateLimiter:
SEND_TIPS = 'NodeSyncTimestamp.send_tips'

manager: Optional['HathorManager']
connections: set[HathorProtocol]
connected_peers: dict[PeerId, HathorProtocol]
connections: set[P2PConnectionProtocol]
connected_peers: dict[PeerId, P2PConnectionProtocol]
connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer]
handshaking_peers: set[HathorProtocol]
handshaking_peers: set[P2PConnectionProtocol]
unverified_peer_storage: UnverifiedPeerStorage
verified_peer_storage: VerifiedPeerStorage
_sync_factories: dict[SyncVersion, SyncAgentFactory]
Expand Down Expand Up @@ -374,7 +375,7 @@ def on_connection_failure(self, failure: Failure, peer: Optional[UnverifiedPeer
peers_count=self._get_peers_count()
)

def on_peer_connect(self, protocol: HathorProtocol) -> None:
def on_peer_connect(self, protocol: P2PConnectionProtocol) -> None:
"""Called when a new connection is established."""
if len(self.connections) >= self.max_connections:
self.log.warn('reached maximum number of connections', max_connections=self.max_connections)
Expand All @@ -389,7 +390,7 @@ def on_peer_connect(self, protocol: HathorProtocol) -> None:
peers_count=self._get_peers_count()
)

def on_peer_ready(self, protocol: HathorProtocol) -> None:
def on_peer_ready(self, protocol: P2PConnectionProtocol) -> None:
"""Called when a peer is ready."""
protocol_peer = protocol.get_peer()
self.verified_peer_storage.add_or_replace(protocol_peer)
Expand Down Expand Up @@ -434,7 +435,7 @@ def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
continue
conn.send_peers([peer])

def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
def on_peer_disconnect(self, protocol: P2PConnectionProtocol) -> None:
"""Called when a peer disconnect."""
self.connections.discard(protocol)
if protocol in self.handshaking_peers:
Expand All @@ -458,12 +459,12 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
peers_count=self._get_peers_count()
)

def iter_all_connections(self) -> Iterable[HathorProtocol]:
def iter_all_connections(self) -> Iterable[P2PConnectionProtocol]:
"""Iterate over all connections."""
for conn in self.connections:
yield conn

def iter_ready_connections(self) -> Iterable[HathorProtocol]:
def iter_ready_connections(self) -> Iterable[P2PConnectionProtocol]:
"""Iterate over ready connections."""
for conn in self.connected_peers.values():
yield conn
Expand Down Expand Up @@ -711,7 +712,7 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add
hostname_entrypoint = Entrypoint.from_hostname_address(hostname, address)
self.my_peer.info.entrypoints.append(hostname_entrypoint)

def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
def get_connection_to_drop(self, protocol: P2PConnectionProtocol) -> P2PConnectionProtocol:
""" When there are duplicate connections, determine which one should be dropped.
We keep the connection initiated by the peer with larger id. A simple (peer_id1 > peer_id2)
Expand All @@ -734,7 +735,7 @@ def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
else:
return other_connection

def drop_connection(self, protocol: HathorProtocol) -> None:
def drop_connection(self, protocol: P2PConnectionProtocol) -> None:
""" Drop a connection
"""
protocol_peer = protocol.get_peer()
Expand Down
12 changes: 12 additions & 0 deletions tests/p2p/test_get_best_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from hathor.indexes.height_index import HeightInfo
from hathor.p2p.messages import ProtocolMessages
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.resources import StatusResource
from hathor.p2p.states import ReadyState
from hathor.p2p.utils import to_height_info
Expand Down Expand Up @@ -41,6 +42,8 @@ def test_get_best_blockchain(self) -> None:
# HelloState is responsible to transmite to protocol the capabilities
protocol1 = connected_peers2[0]
protocol2 = connected_peers1[0]
assert isinstance(protocol1, HathorProtocol)
assert isinstance(protocol2, HathorProtocol)
self.assertIsNotNone(protocol1.capabilities)
self.assertIsNotNone(protocol2.capabilities)

Expand Down Expand Up @@ -97,12 +100,14 @@ def test_handle_get_best_blockchain(self) -> None:
connected_peers1 = list(manager1.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers1))
protocol2 = connected_peers1[0]
assert isinstance(protocol2, HathorProtocol)
state2 = protocol2.state
assert isinstance(state2, ReadyState)

connected_peers2 = list(manager2.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers2))
protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
state1 = protocol1.state
assert isinstance(state1, ReadyState)

Expand Down Expand Up @@ -137,6 +142,7 @@ def test_handle_get_best_blockchain(self) -> None:
connected_peers2 = list(manager2.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers2))
protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
state1 = protocol1.state
assert isinstance(state1, ReadyState)

Expand All @@ -156,12 +162,14 @@ def test_handle_best_blockchain(self) -> None:
connected_peers1 = list(manager1.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers1))
protocol2 = connected_peers1[0]
assert isinstance(protocol2, HathorProtocol)
state2 = protocol2.state
assert isinstance(state2, ReadyState)

connected_peers2 = list(manager2.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers2))
protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
state1 = protocol1.state
assert isinstance(state1, ReadyState)

Expand Down Expand Up @@ -222,8 +230,10 @@ def test_node_without_get_best_blockchain_capability(self) -> None:

# assert the peers have the proper capabilities
protocol2 = connected_peers1[0]
assert isinstance(protocol2, HathorProtocol)
self.assertTrue(protocol2.capabilities.issuperset(set(capabilities_without_get_best_blockchain)))
protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
default_capabilities = self._settings.get_default_capabilities()
self.assertTrue(protocol1.capabilities.issuperset(set(default_capabilities)))

Expand Down Expand Up @@ -316,12 +326,14 @@ def test_stop_looping_on_exit(self) -> None:
connected_peers1 = list(manager1.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers1))
protocol2 = connected_peers1[0]
assert isinstance(protocol2, HathorProtocol)
state2 = protocol2.state
assert isinstance(state2, ReadyState)

connected_peers2 = list(manager2.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers2))
protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
state1 = protocol1.state
assert isinstance(state1, ReadyState)

Expand Down
5 changes: 5 additions & 0 deletions tests/p2p/test_sync_rate_limiter.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from twisted.python.failure import Failure

from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.states import ReadyState
from hathor.p2p.sync_v1.agent import NodeSyncTimestamp
from hathor.simulator import FakeConnection
Expand Down Expand Up @@ -34,6 +35,7 @@ def test_sync_rate_limiter(self) -> None:
connected_peers2 = list(manager2.connections.connected_peers.values())
self.assertEqual(1, len(connected_peers2))
protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
assert isinstance(protocol1.state, ReadyState)
sync2 = protocol1.state.sync_agent
assert isinstance(sync2, NodeSyncTimestamp)
Expand Down Expand Up @@ -68,6 +70,7 @@ def test_sync_rate_limiter_disconnect(self) -> None:
self.assertEqual(1, len(connected_peers2))

protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
assert isinstance(protocol1.state, ReadyState)
sync1 = protocol1.state.sync_agent
assert isinstance(sync1, NodeSyncTimestamp)
Expand Down Expand Up @@ -118,6 +121,7 @@ def test_sync_rate_limiter_delayed_calls_draining(self) -> None:
self.assertEqual(1, len(connected_peers2))

protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
assert isinstance(protocol1.state, ReadyState)
sync1 = protocol1.state.sync_agent
assert isinstance(sync1, NodeSyncTimestamp)
Expand Down Expand Up @@ -158,6 +162,7 @@ def test_sync_rate_limiter_delayed_calls_stop(self) -> None:
self.assertEqual(1, len(connected_peers2))

protocol1 = connected_peers2[0]
assert isinstance(protocol1, HathorProtocol)
assert isinstance(protocol1.state, ReadyState)
sync1 = protocol1.state.sync_agent
assert isinstance(sync1, NodeSyncTimestamp)
Expand Down

0 comments on commit 2034c11

Please sign in to comment.