Skip to content

Commit

Permalink
refactor(p2p): remove HathorManager and P2PManager dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 23, 2024
1 parent e0fb633 commit b6b19b2
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 73 deletions.
29 changes: 27 additions & 2 deletions hathor/p2p/dependencies/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Protocol
from __future__ import annotations

from typing import TYPE_CHECKING, Iterable, Protocol

from hathor.indexes.height_index import HeightInfo
from hathor.transaction import Block, Vertex
from hathor.transaction import BaseTransaction, Block, Vertex
from hathor.types import VertexId

if TYPE_CHECKING:
from hathor.p2p.peer import PublicPeer, UnverifiedPeer
from hathor.p2p.peer_id import PeerId
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion


class P2PManagerProtocol(Protocol):
"""Abstract the P2PManager as a Python protocol to be used in P2P classes."""

def is_peer_whitelisted(self, peer_id: PeerId) -> bool: ...
def get_enabled_sync_versions(self) -> set[SyncVersion]: ...
def get_sync_factory(self, sync_version: SyncVersion) -> SyncAgentFactory: ...
def get_verified_peers(self) -> Iterable[PublicPeer]: ...
def on_receive_peer(self, peer: UnverifiedPeer) -> None: ...
def on_peer_connect(self, protocol: HathorProtocol) -> None: ...
def on_peer_ready(self, protocol: HathorProtocol) -> None: ...
def on_peer_disconnect(self, protocol: HathorProtocol) -> None: ...
def get_randbytes(self, n: int) -> bytes: ...
def is_peer_connected(self, peer_id: PeerId) -> bool: ...
def send_tx_to_peers(self, tx: BaseTransaction) -> None: ...


class P2PVertexHandlerProtocol(Protocol):
"""Abstract the VertexHandler as a Python protocol to be used in P2P classes."""
Expand Down
5 changes: 2 additions & 3 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.rate_limiter import RateLimiter
from hathor.p2p.states.ready import ReadyState
from hathor.p2p.sync_factory import SyncAgentFactory
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import parse_whitelist
Expand Down Expand Up @@ -486,7 +485,7 @@ def is_peer_connected(self, peer_id: PeerId) -> bool:
"""
return peer_id in self.connected_peers

def on_receive_peer(self, peer: UnverifiedPeer, origin: Optional[ReadyState] = None) -> None:
def on_receive_peer(self, peer: UnverifiedPeer) -> None:
""" Update a peer information in our storage, and instantly attempt to connect
to it if it is not connected yet.
"""
Expand Down Expand Up @@ -677,7 +676,7 @@ def listen(self, description: str, use_ssl: Optional[bool] = None) -> None:
else:
factory = self.server_factory

factory = NetfilterFactory(self, factory)
factory = NetfilterFactory(factory)

self.log.info('trying to listen on', endpoint=description)
deferred: Deferred[IListeningPort] = endpoint.listen(factory)
Expand Down
5 changes: 1 addition & 4 deletions hathor/p2p/netfilter/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,12 @@
if TYPE_CHECKING:
from twisted.internet.interfaces import IAddress

from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.protocol import HathorProtocol


class NetfilterContext:
"""Context sent to the targets when a match occurs."""
def __init__(self, *, connections: Optional['ConnectionsManager'] = None, addr: Optional['IAddress'] = None,
protocol: Optional['HathorProtocol'] = None):
def __init__(self, *, addr: Optional['IAddress'] = None, protocol: Optional['HathorProtocol'] = None):
"""Initialize the context."""
self.addr = addr
self.protocol = protocol
self.connections = connections
9 changes: 2 additions & 7 deletions hathor/p2p/netfilter/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Optional
from typing import Optional

from twisted.internet.interfaces import IAddress, IProtocolFactory
from twisted.internet.protocol import Protocol
Expand All @@ -21,19 +21,14 @@
from hathor.p2p.netfilter import get_table
from hathor.p2p.netfilter.context import NetfilterContext

if TYPE_CHECKING:
from hathor.p2p.manager import ConnectionsManager


class NetfilterFactory(WrappingFactory):
"""Wrapper factory to easily check new connections."""
def __init__(self, connections: 'ConnectionsManager', wrappedFactory: 'IProtocolFactory'):
def __init__(self, wrappedFactory: 'IProtocolFactory'):
super().__init__(wrappedFactory)
self.connections = connections

def buildProtocol(self, addr: IAddress) -> Optional[Protocol]:
context = NetfilterContext(
connections=self.connections,
addr=addr,
)
verdict = get_table('filter').get_chain('pre_conn').process(context)
Expand Down
22 changes: 7 additions & 15 deletions hathor/p2p/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from twisted.python.failure import Failure

from hathor.p2p import P2PDependencies
from hathor.p2p.dependencies.protocols import P2PManagerProtocol
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.messages import ProtocolMessages
from hathor.p2p.peer import PrivatePeer, PublicPeer
Expand Down Expand Up @@ -75,8 +76,6 @@ class WarningFlags(str, Enum):
NO_ENTRYPOINTS = 'no_entrypoints'

my_peer: PrivatePeer
connections: 'ConnectionsManager'
node: 'HathorManager'
app_version: str
last_message: float
_peer: Optional[PublicPeer]
Expand All @@ -100,7 +99,7 @@ def peer(self) -> PublicPeer:
def __init__(
self,
my_peer: PrivatePeer,
p2p_manager: 'ConnectionsManager',
p2p_manager: P2PManagerProtocol,
*,
dependencies: P2PDependencies,
use_ssl: bool,
Expand All @@ -109,13 +108,9 @@ def __init__(
self.dependencies = dependencies
self._settings = dependencies.settings
self.my_peer = my_peer
self.connections = p2p_manager
self.p2p_manager: P2PManagerProtocol = p2p_manager

assert p2p_manager.manager is not None
self.node = p2p_manager.manager

assert self.connections.reactor is not None
self.reactor = self.connections.reactor
self.reactor = self.dependencies.reactor

# Indicate whether it is an inbound connection (true) or an outbound connection (false).
self.inbound = inbound
Expand Down Expand Up @@ -253,19 +248,17 @@ def on_connect(self) -> None:
# The initial state is HELLO.
self.change_state(self.PeerState.HELLO)

if self.connections:
self.connections.on_peer_connect(self)
self.p2p_manager.on_peer_connect(self)

def on_outbound_connect(self, entrypoint: Entrypoint) -> None:
"""Called when we successfully establish an outbound connection to a peer."""
# Save the used entrypoint in protocol so we can validate that it matches the entrypoints data
self.entrypoint = entrypoint

def on_peer_ready(self) -> None:
assert self.connections is not None
assert self.peer is not None
self.update_log_context()
self.connections.on_peer_ready(self)
self.p2p_manager.on_peer_ready(self)
self.log.info('peer connected', peer_id=self.peer.id)

def on_disconnect(self, reason: Failure) -> None:
Expand All @@ -283,8 +276,7 @@ def on_disconnect(self, reason: Failure) -> None:
if self.state:
self.state.on_exit()
self.state = None
if self.connections:
self.connections.on_peer_disconnect(self)
self.p2p_manager.on_peer_disconnect(self)

def send_message(self, cmd: ProtocolMessages, payload: Optional[str] = None) -> None:
""" A generic message which must be implemented to send a message
Expand Down
5 changes: 1 addition & 4 deletions hathor/p2p/states/hello.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ def _get_hello_data(self) -> dict[str, Any]:

def _get_sync_versions(self) -> set[SyncVersion]:
"""Shortcut to ConnectionManager.get_enabled_sync_versions"""
connections_manager = self.protocol.connections
assert connections_manager is not None
return connections_manager.get_enabled_sync_versions()
return self.protocol.p2p_manager.get_enabled_sync_versions()

def on_enter(self) -> None:
# After a connection is made, we just send a HELLO message.
Expand Down Expand Up @@ -162,7 +160,6 @@ def handle_hello(self, payload: str) -> None:

context = NetfilterContext(
protocol=protocol,
connections=protocol.connections,
addr=protocol.transport.getPeer(),
)
verdict = get_table('filter').get_chain('post_hello').process(context)
Expand Down
10 changes: 4 additions & 6 deletions hathor/p2p/states/peer_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,9 @@ async def handle_peer_id(self, payload: str) -> None:
protocol.send_error_and_close_connection('Are you my clone?!')
return

if protocol.connections is not None:
if protocol.connections.is_peer_connected(peer.id):
protocol.send_error_and_close_connection('We are already connected.')
return
if self.protocol.p2p_manager.is_peer_connected(peer.id):
protocol.send_error_and_close_connection('We are already connected.')
return

entrypoint_valid = await peer.info.validate_entrypoint(protocol)
if not entrypoint_valid:
Expand All @@ -130,7 +129,6 @@ async def handle_peer_id(self, payload: str) -> None:

context = NetfilterContext(
protocol=protocol,
connections=protocol.connections,
addr=protocol.transport.getPeer(),
)
verdict = get_table('filter').get_chain('post_peerid').process(context)
Expand All @@ -145,7 +143,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:
Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
"""
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
peer_is_whitelisted = self.protocol.p2p_manager.is_peer_whitelisted(peer_id)
# never block whitelisted peers
if peer_is_whitelisted:
return False
Expand Down
19 changes: 7 additions & 12 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,18 @@ def __init__(self, protocol: 'HathorProtocol', *, dependencies: P2PDependencies)
ProtocolMessages.BEST_BLOCKCHAIN: self.handle_best_blockchain,
})

# Initialize sync manager and add its commands to the list of available commands.
connections = self.protocol.connections
assert connections is not None

# Get the sync factory and create a sync manager from it
# Get the sync factory and create a sync agent from it
sync_version = self.protocol.sync_version
assert sync_version is not None
self.log.debug(f'loading {sync_version}')
sync_factory = connections.get_sync_factory(sync_version)
sync_factory = self.protocol.p2p_manager.get_sync_factory(sync_version)

# Initialize sync agent and add its commands to the list of available commands.
self.sync_agent: SyncAgent = sync_factory.create_sync_agent(self.protocol)
self.cmd_map.update(self.sync_agent.get_cmd_dict())

def on_enter(self) -> None:
if self.protocol.connections:
self.protocol.on_peer_ready()
self.protocol.on_peer_ready()

self.lc_ping.start(1, now=False)

Expand Down Expand Up @@ -155,7 +151,7 @@ def handle_get_peers(self, payload: str) -> None:
""" Executed when a GET-PEERS command is received. It just responds with
a list of all known peers.
"""
for peer in self.protocol.connections.get_verified_peers():
for peer in self.protocol.p2p_manager.get_verified_peers():
self.send_peers([peer])

def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
Expand All @@ -175,8 +171,7 @@ def handle_peers(self, payload: str) -> None:
received_peers = json_loads(payload)
for data in received_peers:
peer = UnverifiedPeer.create_from_json(data)
if self.protocol.connections:
self.protocol.connections.on_receive_peer(peer, origin=self)
self.protocol.p2p_manager.on_receive_peer(peer)
self.log.debug('received peers', payload=payload)

def send_ping_if_necessary(self) -> None:
Expand All @@ -195,7 +190,7 @@ def send_ping(self) -> None:
"""
# Add a salt number to prevent peers from faking rtt.
self.ping_start_time = self.reactor.seconds()
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
self.ping_salt = self.protocol.p2p_manager.get_randbytes(self.ping_salt_size).hex()
self.send_message(ProtocolMessages.PING, self.ping_salt)

def send_pong(self, salt: str) -> None:
Expand Down
11 changes: 6 additions & 5 deletions hathor/p2p/sync_v1/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from twisted.internet.interfaces import IDelayedCall

from hathor.p2p import P2PDependencies
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.messages import GetNextPayload, GetTipsPayload, NextPayload, ProtocolMessages, TipsPayload
from hathor.p2p.sync_agent import SyncAgent
from hathor.p2p.sync_v1.downloader import Downloader
Expand Down Expand Up @@ -86,9 +87,9 @@ def __init__(
self.tx_storage = self.dependencies.tx_storage

# Rate limit for this connection.
assert protocol.connections is not None
self.global_rate_limiter: 'RateLimiter' = protocol.connections.rate_limiter
self.GlobalRateLimiter = protocol.connections.GlobalRateLimiter
assert isinstance(self.protocol.p2p_manager, ConnectionsManager)
self.global_rate_limiter: 'RateLimiter' = self.protocol.p2p_manager.rate_limiter
self.GlobalRateLimiter = self.protocol.p2p_manager.GlobalRateLimiter

self.call_later_id: Optional[IDelayedCall] = None
self.call_later_interval: int = 1 # seconds
Expand Down Expand Up @@ -639,7 +640,7 @@ def handle_data(self, payload: str) -> None:
# in the network, thus, we propagate it as well.
success = self.dependencies.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
self.protocol.p2p_manager.send_tx_to_peers(tx)
self.update_received_stats(tx, success)

def update_received_stats(self, tx: 'BaseTransaction', result: bool) -> None:
Expand Down Expand Up @@ -690,7 +691,7 @@ def on_tx_success(self, tx: 'BaseTransaction') -> 'BaseTransaction':
# Add tx to the DAG.
success = self.dependencies.vertex_handler.on_new_vertex(tx)
if success:
self.protocol.connections.send_tx_to_peers(tx)
self.protocol.p2p_manager.send_tx_to_peers(tx)
# Updating stats data
self.update_received_stats(tx, success)
return tx
Expand Down
11 changes: 1 addition & 10 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ def handle_not_found(self, payload: str) -> None:
def handle_error(self, payload: str) -> None:
""" Override protocols original handle_error so we can recover a sync in progress.
"""
assert self.protocol.connections is not None
# forward message to overloaded handle_error:
self.protocol.handle_error(payload)

Expand Down Expand Up @@ -320,7 +319,6 @@ def _run_sync(self) -> Generator[Any, Any, None]:
"""
assert not self.receiving_stream
assert not self.mempool_manager.is_running()
assert self.protocol.connections is not None

is_block_synced = yield self.run_sync_blocks()
if is_block_synced:
Expand Down Expand Up @@ -742,7 +740,6 @@ def handle_blocks_end(self, payload: str) -> None:

response_code = StreamEnd(int(payload))
self.receiving_stream = False
assert self.protocol.connections is not None

if self.state is not PeerState.SYNCING_BLOCKS:
self.log.error('unexpected BLOCKS-END', state=self.state, response_code=response_code.name)
Expand All @@ -761,8 +758,6 @@ def handle_blocks(self, payload: str) -> None:
self.protocol.send_error_and_close_connection('Not expecting to receive BLOCK message')
return

assert self.protocol.connections is not None

blk_bytes = base64.b64decode(payload)
blk = self.vertex_parser.deserialize(blk_bytes)
if not isinstance(blk, Block):
Expand Down Expand Up @@ -998,7 +993,6 @@ def handle_transactions_end(self, payload: str) -> None:

response_code = StreamEnd(int(payload))
self.receiving_stream = False
assert self.protocol.connections is not None

if self.state is not PeerState.SYNCING_TRANSACTIONS:
self.log.error('unexpected TRANSACTIONS-END', state=self.state, response_code=response_code.name)
Expand All @@ -1012,9 +1006,6 @@ def handle_transactions_end(self, payload: str) -> None:
def handle_transaction(self, payload: str) -> None:
""" Handle a TRANSACTION message.
"""
assert self.protocol.connections is not None

# tx_bytes = bytes.fromhex(payload)
tx_bytes = base64.b64decode(payload)
tx = self.vertex_parser.deserialize(tx_bytes)
if not isinstance(tx, Transaction):
Expand Down Expand Up @@ -1157,7 +1148,7 @@ def handle_data(self, payload: str) -> None:
try:
success = self.dependencies.vertex_handler.on_new_vertex(tx, fails_silently=False)
if success:
self.protocol.connections.send_tx_to_peers(tx)
self.protocol.p2p_manager.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.protocol.send_error_and_close_connection('invalid vertex received')
else:
Expand Down
2 changes: 1 addition & 1 deletion hathor/p2p/sync_v2/mempool.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def _add_tx(self, tx: BaseTransaction) -> None:
try:
success = self.dependencies.vertex_handler.on_new_vertex(tx, fails_silently=False)
if success:
self.sync_agent.protocol.connections.send_tx_to_peers(tx)
self.sync_agent.protocol.p2p_manager.send_tx_to_peers(tx)
except InvalidNewTransaction:
self.sync_agent.protocol.send_error_and_close_connection('invalid vertex received')
raise
Loading

0 comments on commit b6b19b2

Please sign in to comment.