Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(reactor): add IReactorProcess and IReactorSocket support [part 3/11] #1163

Open
wants to merge 1 commit into
base: refactor/p2p/peer-connections
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions hathor/reactor/memory_reactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Mapping, Sequence
from typing import AnyStr

from twisted.internet.interfaces import IProcessProtocol, IProcessTransport
from twisted.internet.task import Clock
from twisted.internet.testing import MemoryReactor as TwistedMemoryReactor


class MemoryReactor(TwistedMemoryReactor):
"""A drop-in replacement for Twisted's own MemoryReactor that adds support for IReactorProcess."""

def run(self) -> None:
"""
We have to override TwistedMemoryReactor.run() because the original Twisted implementation weirdly calls stop()
inside run(), and we need the reactor running during our tests.
"""
self.running = True

def spawnProcess(
self,
processProtocol: IProcessProtocol,
executable: bytes | str,
args: Sequence[bytes | str],
env: Mapping[AnyStr, AnyStr] | None = None,
path: bytes | str | None = None,
uid: int | None = None,
gid: int | None = None,
usePTY: bool = False,
childFDs: Mapping[int, int | str] | None = None,
) -> IProcessTransport:
raise NotImplementedError


class MemoryReactorClock(MemoryReactor, Clock):
"""A drop-in replacement for Twisted's own MemoryReactorClock that adds support for IReactorProcess."""

def __init__(self) -> None:
MemoryReactor.__init__(self)
Clock.__init__(self)
4 changes: 3 additions & 1 deletion hathor/reactor/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import cast

from structlog import get_logger
from twisted.internet.interfaces import IReactorCore, IReactorTCP, IReactorTime
from twisted.internet.interfaces import IReactorCore, IReactorProcess, IReactorSocket, IReactorTCP, IReactorTime
from zope.interface.verify import verifyObject

from hathor.reactor.reactor_protocol import ReactorProtocol
Expand Down Expand Up @@ -76,6 +76,8 @@ def initialize_global_reactor(*, use_asyncio_reactor: bool = False) -> ReactorPr
assert verifyObject(IReactorTime, twisted_reactor) is True
assert verifyObject(IReactorCore, twisted_reactor) is True
assert verifyObject(IReactorTCP, twisted_reactor) is True
assert verifyObject(IReactorProcess, twisted_reactor) is True
assert verifyObject(IReactorSocket, twisted_reactor) is True

# We cast to ReactorProtocol, our own type that stubs the necessary Twisted zope interfaces, to aid typing.
_reactor = cast(ReactorProtocol, twisted_reactor)
Expand Down
38 changes: 38 additions & 0 deletions hathor/reactor/reactor_process_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Mapping, Sequence
from typing import AnyStr, Protocol

from twisted.internet.interfaces import IProcessProtocol, IProcessTransport, IReactorProcess
from zope.interface import implementer


@implementer(IReactorProcess)
class ReactorProcessProtocol(Protocol):
"""A Python protocol that stubs Twisted's IReactorProcess interface."""

def spawnProcess(
self,
processProtocol: IProcessProtocol,
executable: bytes | str,
args: Sequence[bytes | str],
env: Mapping[AnyStr, AnyStr] | None = None,
path: bytes | str | None = None,
uid: int | None = None,
gid: int | None = None,
usePTY: bool = False,
childFDs: Mapping[int, int | str] | None = None,
) -> IProcessTransport:
...
7 changes: 5 additions & 2 deletions hathor/reactor/reactor_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from typing import Protocol

from hathor.reactor.reactor_core_protocol import ReactorCoreProtocol
from hathor.reactor.reactor_process_protocol import ReactorProcessProtocol
from hathor.reactor.reactor_socket_protocol import ReactorSocketProtocol
from hathor.reactor.reactor_tcp_protocol import ReactorTCPProtocol
from hathor.reactor.reactor_time_protocol import ReactorTimeProtocol

Expand All @@ -23,9 +25,10 @@ class ReactorProtocol(
ReactorCoreProtocol,
ReactorTimeProtocol,
ReactorTCPProtocol,
ReactorProcessProtocol,
ReactorSocketProtocol,
Protocol,
):
"""
A Python protocol that represents the intersection of Twisted's IReactorCore+IReactorTime+IReactorTCP interfaces.
A Python protocol that represents an intersection of the Twisted reactor interfaces that we use.
"""
pass
45 changes: 45 additions & 0 deletions hathor/reactor/reactor_socket_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from socket import AddressFamily
from typing import Protocol

from twisted.internet.interfaces import IListeningPort, IReactorSocket
from twisted.internet.protocol import DatagramProtocol, ServerFactory
from zope.interface import implementer


@implementer(IReactorSocket)
class ReactorSocketProtocol(Protocol):
"""A Python protocol that stubs Twisted's IReactorSocket interface."""

def adoptStreamPort(
self,
fileDescriptor: int,
addressFamily: AddressFamily,
factory: ServerFactory,
) -> IListeningPort:
...

def adoptStreamConnection(self, fileDescriptor: int, addressFamily: AddressFamily, factory: ServerFactory) -> None:
...

def adoptDatagramPort(
self,
fileDescriptor: int,
addressFamily: AddressFamily,
protocol: DatagramProtocol,
maxPacketSize: int,
) -> IListeningPort:
...
10 changes: 2 additions & 8 deletions hathor/simulator/clock.py → hathor/simulator/heap_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

from twisted.internet.base import DelayedCall
from twisted.internet.interfaces import IDelayedCall, IReactorTime
from twisted.internet.testing import MemoryReactor
from zope.interface import implementer

from hathor.reactor.memory_reactor import MemoryReactor


@implementer(IReactorTime)
class HeapClock:
Expand Down Expand Up @@ -94,10 +95,3 @@ class MemoryReactorHeapClock(MemoryReactor, HeapClock):
def __init__(self):
MemoryReactor.__init__(self)
HeapClock.__init__(self)

def run(self):
"""
We have to override MemoryReactor.run() because the original Twisted implementation weirdly calls stop() inside
run(), and we need the reactor running during our tests.
"""
self.running = True
2 changes: 1 addition & 1 deletion hathor/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from hathor.feature_activation.feature_service import FeatureService
from hathor.manager import HathorManager
from hathor.p2p.peer import PrivatePeer
from hathor.simulator.clock import HeapClock, MemoryReactorHeapClock
from hathor.simulator.heap_clock import HeapClock, MemoryReactorHeapClock
from hathor.simulator.miner.geometric_miner import GeometricMiner
from hathor.simulator.patches import SimulatorCpuMiningService, SimulatorVertexVerifier
from hathor.simulator.tx_generator import RandomTransactionGenerator
Expand Down
4 changes: 2 additions & 2 deletions tests/cli/test_events_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
from hathor.cli.events_simulator.event_forwarding_websocket_factory import EventForwardingWebsocketFactory
from hathor.cli.events_simulator.events_simulator import create_parser, execute
from hathor.conf.get_settings import get_global_settings
from tests.test_memory_reactor_clock import TestMemoryReactorClock
from hathor.reactor.memory_reactor import MemoryReactorClock


def test_events_simulator() -> None:
parser = create_parser()
args = parser.parse_args(['--scenario', 'ONLY_LOAD'])
reactor = TestMemoryReactorClock()
reactor = MemoryReactorClock()

execute(args, reactor)
reactor.advance(1)
Expand Down
2 changes: 1 addition & 1 deletion tests/event/websocket/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from hathor.event.websocket.factory import EventWebsocketFactory
from hathor.event.websocket.protocol import EventWebsocketProtocol
from hathor.event.websocket.response import EventResponse, InvalidRequestType
from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock
from tests.utils import EventMocker


Expand Down
4 changes: 2 additions & 2 deletions tests/p2p/test_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint, Protocol
from hathor.p2p.peer_id import PeerId
from hathor.pubsub import PubSubManager
from hathor.reactor.memory_reactor import MemoryReactorClock
from tests import unittest
from tests.test_memory_reactor_clock import TestMemoryReactorClock


class MockPeerDiscovery(PeerDiscovery):
Expand All @@ -30,7 +30,7 @@ async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], Deferr
class MockDNSPeerDiscovery(DNSPeerDiscovery):
def __init__(
self,
reactor: TestMemoryReactorClock,
reactor: MemoryReactorClock,
bootstrap_txt: list[tuple[str, int, str | None]],
bootstrap_a: list[str],
):
Expand Down
8 changes: 4 additions & 4 deletions tests/poa/test_poa_block_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
from hathor.consensus.poa import PoaBlockProducer
from hathor.crypto.util import get_public_key_bytes_compressed
from hathor.manager import HathorManager
from hathor.reactor.memory_reactor import MemoryReactorClock
from hathor.transaction.poa import PoaBlock
from tests.poa.utils import get_settings, get_signer
from tests.test_memory_reactor_clock import TestMemoryReactorClock
from tests.unittest import TestBuilder


def _get_manager(settings: HathorSettings) -> HathorManager:
reactor = TestMemoryReactorClock()
reactor = MemoryReactorClock()
reactor.advance(settings.GENESIS_BLOCK_TIMESTAMP)

artifacts = TestBuilder() \
Expand All @@ -45,7 +45,7 @@ def test_poa_block_producer_one_signer() -> None:
settings = get_settings(signer, time_between_blocks=10)
manager = _get_manager(settings)
reactor = manager.reactor
assert isinstance(reactor, TestMemoryReactorClock)
assert isinstance(reactor, MemoryReactorClock)
manager = Mock(wraps=manager)
producer = PoaBlockProducer(settings=settings, reactor=reactor, poa_signer=signer)
producer.manager = manager
Expand Down Expand Up @@ -103,7 +103,7 @@ def test_poa_block_producer_two_signers() -> None:
settings = get_settings(signer1, signer2, time_between_blocks=10)
manager = _get_manager(settings)
reactor = manager.reactor
assert isinstance(reactor, TestMemoryReactorClock)
assert isinstance(reactor, MemoryReactorClock)
manager = Mock(wraps=manager)
producer = PoaBlockProducer(settings=settings, reactor=reactor, poa_signer=signer1)
producer.manager = manager
Expand Down
2 changes: 1 addition & 1 deletion tests/pubsub/test_pubsub2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from unittest.mock import Mock, patch

import pytest
from twisted.internet.testing import MemoryReactorClock

from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor.memory_reactor import MemoryReactorClock


@pytest.mark.parametrize('is_in_main_thread', [False, True])
Expand Down
26 changes: 0 additions & 26 deletions tests/test_memory_reactor_clock.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/tx/test_merged_mining.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def test_coordinator(self):
from cryptography.hazmat.primitives.asymmetric import ec

from hathor.crypto.util import get_address_b58_from_public_key
from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock

super().setUp()
self.manager = self.create_peer('testnet')
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_stratum.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
from twisted.internet.testing import StringTransportWithDisconnection

from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock
from hathor.stratum import (
INVALID_PARAMS,
INVALID_REQUEST,
Expand Down
6 changes: 3 additions & 3 deletions tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@
from hathor.p2p.sync_version import SyncVersion
from hathor.pubsub import PubSubManager
from hathor.reactor import ReactorProtocol as Reactor, get_global_reactor
from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.reactor.memory_reactor import MemoryReactorClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock
from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.storage.transaction_storage import TransactionStorage
from hathor.types import VertexId
from hathor.util import Random, not_none
from hathor.wallet import BaseWallet, HDWallet, Wallet
from tests.test_memory_reactor_clock import TestMemoryReactorClock
from tests.utils import GENESIS_SEED

logger = get_logger()
Expand Down Expand Up @@ -117,7 +117,7 @@ class TestCase(unittest.TestCase):

def setUp(self) -> None:
self.tmpdirs: list[str] = []
self.clock = TestMemoryReactorClock()
self.clock = MemoryReactorClock()
self.clock.advance(time.time())
self.reactor = self.clock
self.log = logger.new()
Expand Down