Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(p2p): minor pre-IPC refactors [part 5/11] #1165

Open
wants to merge 2 commits into
base: refactor/remove-vertex-storage-protocol
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion hathor/builder/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ def build(self) -> BuildArtifacts:
vertex_handler = self._get_or_create_vertex_handler()
vertex_parser = self._get_or_create_vertex_parser()
poa_block_producer = self._get_or_create_poa_block_producer()
capabilities = self._get_or_create_capabilities()

if self._enable_address_index:
indexes.enable_address_index(pubsub)
Expand Down Expand Up @@ -266,7 +267,7 @@ def build(self) -> BuildArtifacts:
wallet=wallet,
rng=self._rng,
checkpoints=self._checkpoints,
capabilities=self._capabilities,
capabilities=capabilities,
environment_info=get_environment_info(self._cmdline, str(peer.id)),
bit_signaling_service=bit_signaling_service,
verification_service=verification_service,
Expand Down Expand Up @@ -647,6 +648,13 @@ def _get_or_create_poa_block_producer(self) -> PoaBlockProducer | None:

return self._poa_block_producer

def _get_or_create_capabilities(self) -> list[str]:
if self._capabilities is None:
settings = self._get_or_create_settings()
self._capabilities = settings.get_default_capabilities()

return self._capabilities

def use_memory(self) -> 'Builder':
self.check_if_can_modify()
self._storage_type = StorageType.MEMORY
Expand Down
2 changes: 2 additions & 0 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
)

cpu_mining_service = CpuMiningService()
capabilities = settings.get_default_capabilities()

p2p_manager = ConnectionsManager(
settings=settings,
Expand Down Expand Up @@ -386,6 +387,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
vertex_handler=vertex_handler,
vertex_parser=vertex_parser,
poa_block_producer=poa_block_producer,
capabilities=capabilities,
)

if self._args.x_ipython_kernel:
Expand Down
9 changes: 9 additions & 0 deletions hathor/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,15 @@ def from_yaml(cls, *, filepath: str) -> 'HathorSettings':
validators=_VALIDATORS
)

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities."""
return [
self.CAPABILITY_WHITELIST,
self.CAPABILITY_SYNC_VERSION,
self.CAPABILITY_GET_BEST_BLOCKCHAIN,
self.CAPABILITY_IPV6,
]


def _parse_checkpoints(checkpoints: Union[dict[int, str], list[Checkpoint]]) -> list[Checkpoint]:
"""Parse a dictionary of raw checkpoint data into a list of checkpoints."""
Expand Down
16 changes: 2 additions & 14 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,9 @@ def __init__(
execution_manager: ExecutionManager,
vertex_handler: VertexHandler,
vertex_parser: VertexParser,
capabilities: list[str],
hostname: Optional[str] = None,
wallet: Optional[BaseWallet] = None,
capabilities: Optional[list[str]] = None,
checkpoints: Optional[list[Checkpoint]] = None,
rng: Optional[Random] = None,
environment_info: Optional[EnvironmentInfo] = None,
Expand Down Expand Up @@ -231,10 +231,7 @@ def __init__(
self.peers_whitelist: list[PeerId] = []

# List of capabilities of the peer
if capabilities is not None:
self.capabilities = capabilities
else:
self.capabilities = self.get_default_capabilities()
self.capabilities = capabilities

# This is included in some logs to provide more context
self.environment_info = environment_info
Expand All @@ -246,15 +243,6 @@ def __init__(
self.lc_check_sync_state.clock = self.reactor
self.lc_check_sync_state_interval = self.CHECK_SYNC_STATE_INTERVAL

def get_default_capabilities(self) -> list[str]:
"""Return the default capabilities for this manager."""
return [
self._settings.CAPABILITY_WHITELIST,
self._settings.CAPABILITY_SYNC_VERSION,
self._settings.CAPABILITY_GET_BEST_BLOCKCHAIN,
self._settings.CAPABILITY_IPV6,
]

def start(self) -> None:
""" A factory must be started only once. And it is usually automatically started.
"""
Expand Down
20 changes: 11 additions & 9 deletions hathor/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,23 +248,25 @@ def collect_peer_connection_metrics(self) -> None:
self.peer_connection_metrics.clear()

for connection in self.connections.get_connected_peers():
if not connection._peer:
peer = connection.get_peer_if_set()
if not peer:
# A connection without peer will not be able to communicate
# So we can just discard it for the sake of the metrics
continue

metrics = connection.get_metrics()
metric = PeerConnectionMetrics(
connection_string=str(connection.addr),
peer_id=str(connection.peer.id),
network=settings.NETWORK_NAME,
received_messages=connection.metrics.received_messages,
sent_messages=connection.metrics.sent_messages,
received_bytes=connection.metrics.received_bytes,
sent_bytes=connection.metrics.sent_bytes,
received_txs=connection.metrics.received_txs,
discarded_txs=connection.metrics.discarded_txs,
received_blocks=connection.metrics.received_blocks,
discarded_blocks=connection.metrics.discarded_blocks,
received_messages=metrics.received_messages,
sent_messages=metrics.sent_messages,
received_bytes=metrics.received_bytes,
sent_bytes=metrics.sent_bytes,
received_txs=metrics.received_txs,
discarded_txs=metrics.discarded_txs,
received_blocks=metrics.received_blocks,
discarded_blocks=metrics.discarded_blocks,
)

self.peer_connection_metrics.append(metric)
Expand Down
33 changes: 21 additions & 12 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,9 +330,7 @@ def has_synced_peer(self) -> bool:
"""
connections = list(self.iter_ready_connections())
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
if conn.state.is_synced():
if conn.is_synced():
return True
return False

Expand All @@ -349,9 +347,7 @@ def send_tx_to_peers(self, tx: BaseTransaction) -> None:
connections = list(self.iter_ready_connections())
self.rng.shuffle(connections)
for conn in connections:
assert conn.state is not None
assert isinstance(conn.state, ReadyState)
conn.state.send_tx_to_peer(tx)
conn.send_tx_to_peer(tx)

def disconnect_all_peers(self, *, force: bool = False) -> None:
"""Disconnect all peers."""
Expand Down Expand Up @@ -416,10 +412,9 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
def relay_peer_to_ready_connections(self, peer: PublicPeer) -> None:
"""Relay peer to all ready connections."""
for conn in self.iter_ready_connections():
if conn.peer == peer:
if conn.get_peer() == peer:
continue
assert isinstance(conn.state, ReadyState)
conn.state.send_peers([peer])
conn.send_peers([peer])

def on_handshake_disconnect(self, *, addr: PeerAddress) -> None:
"""Called when a peer disconnects from a handshaking state (HELLO or PEER-ID)."""
Expand Down Expand Up @@ -694,9 +689,9 @@ def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Add
def drop_connection(self, protocol: HathorProtocol) -> None:
""" Drop a connection
"""
assert protocol.peer is not None
self.log.debug('dropping connection', peer_id=protocol.peer.id, protocol=type(protocol).__name__)
protocol.send_error_and_close_connection('Connection dropped')
protocol_peer = protocol.get_peer()
self.log.debug('dropping connection', peer_id=protocol_peer.id, protocol=type(protocol).__name__)
protocol.send_error_and_close_connection('Connection droped')

def drop_connection_by_peer_id(self, peer_id: PeerId) -> None:
""" Drop a connection by peer id
Expand Down Expand Up @@ -795,3 +790,17 @@ def reload_entrypoints_and_connections(self) -> None:
self.log.warn('Killing all connections and resetting entrypoints...')
self.disconnect_all_peers(force=True)
self.my_peer.reload_entrypoints_from_source_file()

def get_peers_whitelist(self) -> list[PeerId]:
assert self.manager is not None
return self.manager.peers_whitelist

def get_verified_peers(self) -> Iterable[PublicPeer]:
return self.verified_peer_storage.values()

def get_randbytes(self, n: int) -> bytes:
return self.rng.randbytes(n)

def is_peer_whitelisted(self, peer_id: PeerId) -> bool:
assert self.manager is not None
return peer_id in self.manager.peers_whitelist
24 changes: 23 additions & 1 deletion hathor/p2p/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import time
from enum import Enum
from typing import TYPE_CHECKING, Optional, cast
from typing import TYPE_CHECKING, Iterable, Optional, cast

from structlog import get_logger
from twisted.internet import defer
Expand All @@ -34,6 +34,7 @@
from hathor.p2p.sync_version import SyncVersion
from hathor.p2p.utils import format_address
from hathor.profiler import get_cpu_profiler
from hathor.transaction import BaseTransaction

if TYPE_CHECKING:
from hathor.manager import HathorManager # noqa: F401
Expand Down Expand Up @@ -410,6 +411,27 @@ def disable_sync(self) -> None:
self.log.info('disable sync')
self.state.sync_agent.disable_sync()

def is_synced(self) -> bool:
assert isinstance(self.state, ReadyState)
return self.state.is_synced()

def send_tx_to_peer(self, tx: BaseTransaction) -> None:
assert isinstance(self.state, ReadyState)
return self.state.send_tx_to_peer(tx)

def get_peer(self) -> PublicPeer:
return self.peer

def get_peer_if_set(self) -> PublicPeer | None:
return self._peer

def send_peers(self, peers: Iterable[PublicPeer]) -> None:
assert isinstance(self.state, ReadyState)
self.state.send_peers(peers)

def get_metrics(self) -> 'ConnectionMetrics':
return self.metrics


class HathorLineReceiver(LineReceiver, HathorProtocol):
""" Implements HathorProtocol in a LineReceiver protocol.
Expand Down
2 changes: 1 addition & 1 deletion hathor/p2p/states/peer_id.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def _should_block_peer(self, peer_id: PeerId) -> bool:

Currently this is only because the peer is not in a whitelist and whitelist blocking is active.
"""
peer_is_whitelisted = peer_id in self.protocol.node.peers_whitelist
peer_is_whitelisted = self.protocol.connections.is_peer_whitelisted(peer_id)
# never block whitelisted peers
if peer_is_whitelisted:
return False
Expand Down
5 changes: 2 additions & 3 deletions hathor/p2p/states/ready.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def handle_get_peers(self, payload: str) -> None:
""" Executed when a GET-PEERS command is received. It just responds with
a list of all known peers.
"""
for peer in self.protocol.connections.verified_peer_storage.values():
for peer in self.protocol.connections.get_verified_peers():
self.send_peers([peer])

def send_peers(self, peer_list: Iterable[PublicPeer]) -> None:
Expand Down Expand Up @@ -206,8 +206,7 @@ def send_ping(self) -> None:
"""
# Add a salt number to prevent peers from faking rtt.
self.ping_start_time = self.reactor.seconds()
rng = self.protocol.connections.rng
self.ping_salt = rng.randbytes(self.ping_salt_size).hex()
self.ping_salt = self.protocol.connections.get_randbytes(self.ping_salt_size).hex()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems better to simply pass an rng to the protocol. Isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RNG should be the same in the ConnectionsManager and in HathorProtocol, right? At least for simulation purposes.

If that's the case, we can't simply pass the RNG between them because now the protocol will live in a different process. Otherwise, if we can have different RNGs, then yes, it would be simpler to just have its own RNG here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use a "derived" rng instance, where rng in the main process just generates a seed that is used to instantiate the rng in the subprocess.

There's really only one reason for using an rng instance instead of a global rng, and that is using seeds to reproduce test cases. It's an important for reproducing rare cases that happen randomly in tests.

Considering there's parallelism, some events can happen in different order regardless of the initial seed, and this will affect how we generate random bytes regardless if we query the main thread for random bytes or generate them locally, so in that sense I think generating them locally is better because it avoids an IPC call. And if we really need to we can focus on avoiding that parallelism in tests or simulating it in a reproducible way.

self.send_message(ProtocolMessages.PING, self.ping_salt)

def send_pong(self, salt: str) -> None:
Expand Down
12 changes: 5 additions & 7 deletions hathor/p2p/sync_v2/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from hathor.p2p.sync_v2.transaction_streaming_client import TransactionStreamingClient
from hathor.reactor import ReactorProtocol as Reactor
from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.genesis import is_genesis
from hathor.transaction.storage.exceptions import TransactionDoesNotExist
from hathor.transaction.vertex_parser import VertexParser
from hathor.types import VertexId
Expand Down Expand Up @@ -383,7 +384,7 @@ def run_sync_blocks(self) -> Generator[Any, Any, bool]:
# Not synced but same blockchain?
if self.peer_best_block.height <= my_best_block.height:
# Is peer behind me at the same blockchain?
common_block_hash = self.tx_storage.indexes.height.get(self.peer_best_block.height)
common_block_hash = self.tx_storage.get_block_id_by_height(self.peer_best_block.height)
if common_block_hash == self.peer_best_block.id:
# If yes, nothing to sync from this peer.
if not self.is_synced():
Expand Down Expand Up @@ -461,15 +462,13 @@ def send_get_tips(self) -> None:
def handle_get_tips(self, _payload: str) -> None:
""" Handle a GET-TIPS message.
"""
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.mempool_tips is not None
if self._is_streaming:
self.log.warn('can\'t send while streaming') # XXX: or can we?
self.send_message(ProtocolMessages.MEMPOOL_END)
return
self.log.debug('handle_get_tips')
# TODO Use a streaming of tips
for tx_id in self.tx_storage.indexes.mempool_tips.get():
for tx_id in self.tx_storage.get_mempool_tips():
self.send_tips(tx_id)
self.log.debug('tips end')
self.send_message(ProtocolMessages.TIPS_END)
Expand Down Expand Up @@ -645,15 +644,14 @@ def send_get_peer_block_hashes(self, heights: list[int]) -> None:
def handle_get_peer_block_hashes(self, payload: str) -> None:
""" Handle a GET-PEER-BLOCK-HASHES message.
"""
assert self.tx_storage.indexes is not None
heights = json.loads(payload)
if len(heights) > 20:
self.log.info('too many heights', heights_qty=len(heights))
self.protocol.send_error_and_close_connection('GET-PEER-BLOCK-HASHES: too many heights')
return
data = []
for h in heights:
blk_hash = self.tx_storage.indexes.height.get(h)
blk_hash = self.tx_storage.get_block_id_by_height(h)
if blk_hash is None:
break
blk = self.tx_storage.get_transaction(blk_hash)
Expand Down Expand Up @@ -1154,7 +1152,7 @@ def handle_data(self, payload: str) -> None:
return

assert tx is not None
if self.protocol.node.tx_storage.get_genesis(tx.hash):
if is_genesis(tx.hash, settings=self._settings):
# We just got the data of a genesis tx/block. What should we do?
# Will it reduce peer reputation score?
return
Expand Down
1 change: 0 additions & 1 deletion hathor/p2p/sync_v2/blockchain_streaming_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ def handle_blocks(self, blk: Block) -> None:
if self.tx_storage.partial_vertex_exists(blk.hash):
# We reached a block we already have. Skip it.
self._blk_repeated += 1
is_duplicated = True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you remove this? It seems unrelated to the PR and it would change the behavior of sync-v2.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable is unused in this context. It's just a local variable that we set here, and the method returns just below.

In fact, in the latest commit (b9b84ea) I completely removed as it's just used to toggle a debug message that would never change.

if self._blk_repeated > self.max_repeated_blocks:
self.log.info('too many repeated block received', total_repeated=self._blk_repeated)
self.fails(TooManyRepeatedVerticesError())
Expand Down
15 changes: 11 additions & 4 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -541,12 +541,14 @@ def get_transaction(self, hash_bytes: bytes) -> BaseTransaction:
self.post_get_validation(tx)
return tx

def get_block_by_height(self, height: int) -> Optional[Block]:
"""Return a block in the best blockchain from the height index. This is fast."""
def get_block_id_by_height(self, height: int) -> VertexId | None:
assert self.indexes is not None
ancestor_hash = self.indexes.height.get(height)
return self.indexes.height.get(height)

return None if ancestor_hash is None else self.get_block(ancestor_hash)
def get_block_by_height(self, height: int) -> Optional[Block]:
"""Return a block in the best blockchain from the height index. This is fast."""
block_id = self.get_block_id_by_height(height)
return None if block_id is None else self.get_block(block_id)

def get_metadata(self, hash_bytes: bytes) -> Optional[TransactionMetadata]:
"""Returns the transaction metadata with hash `hash_bytes`.
Expand Down Expand Up @@ -1146,6 +1148,11 @@ def partial_vertex_exists(self, vertex_id: VertexId) -> bool:
with self.allow_partially_validated_context():
return self.transaction_exists(vertex_id)

def get_mempool_tips(self) -> set[VertexId]:
assert self.indexes is not None
assert self.indexes.mempool_tips is not None
return self.indexes.mempool_tips.get()


class BaseTransactionStorage(TransactionStorage):
indexes: Optional[IndexesManager]
Expand Down
2 changes: 1 addition & 1 deletion tests/p2p/test_get_best_blockchain.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def test_node_without_get_best_blockchain_capability(self) -> None:
protocol2 = connected_peers1[0]
self.assertTrue(protocol2.capabilities.issuperset(set(cababilities_without_get_best_blockchain)))
protocol1 = connected_peers2[0]
default_capabilities = manager2.get_default_capabilities()
default_capabilities = self._settings.get_default_capabilities()
self.assertTrue(protocol1.capabilities.issuperset(set(default_capabilities)))

# assert the peers don't engage in get_best_blockchain messages
Expand Down