Skip to content

Commit

Permalink
feat(reactor): add IReactorProcess and IReactorSocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Nov 8, 2024
1 parent eee9bb0 commit 72330e8
Show file tree
Hide file tree
Showing 16 changed files with 162 additions and 53 deletions.
53 changes: 53 additions & 0 deletions hathor/reactor/memory_reactor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Mapping, Sequence
from typing import AnyStr

from twisted.internet.interfaces import IProcessProtocol, IProcessTransport
from twisted.internet.task import Clock
from twisted.internet.testing import MemoryReactor as TwistedMemoryReactor


class MemoryReactor(TwistedMemoryReactor):
"""A drop-in replacement for Twisted's own MemoryReactor that adds support for IReactorProcess."""

def run(self) -> None:
"""
We have to override TwistedMemoryReactor.run() because the original Twisted implementation weirdly calls stop()
inside run(), and we need the reactor running during our tests.
"""
self.running = True

def spawnProcess(
self,
processProtocol: IProcessProtocol,
executable: bytes | str,
args: Sequence[bytes | str],
env: Mapping[AnyStr, AnyStr] | None = None,
path: bytes | str | None = None,
uid: int | None = None,
gid: int | None = None,
usePTY: bool = False,
childFDs: Mapping[int, int | str] | None = None,
) -> IProcessTransport:
raise NotImplementedError


class MemoryReactorClock(MemoryReactor, Clock):
"""A drop-in replacement for Twisted's own MemoryReactorClock that adds support for IReactorProcess."""

def __init__(self) -> None:
MemoryReactor.__init__(self)
Clock.__init__(self)
4 changes: 3 additions & 1 deletion hathor/reactor/reactor.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from typing import cast

from structlog import get_logger
from twisted.internet.interfaces import IReactorCore, IReactorTCP, IReactorTime
from twisted.internet.interfaces import IReactorCore, IReactorProcess, IReactorSocket, IReactorTCP, IReactorTime
from zope.interface.verify import verifyObject

from hathor.reactor.reactor_protocol import ReactorProtocol
Expand Down Expand Up @@ -76,6 +76,8 @@ def initialize_global_reactor(*, use_asyncio_reactor: bool = False) -> ReactorPr
assert verifyObject(IReactorTime, twisted_reactor) is True
assert verifyObject(IReactorCore, twisted_reactor) is True
assert verifyObject(IReactorTCP, twisted_reactor) is True
assert verifyObject(IReactorProcess, twisted_reactor) is True
assert verifyObject(IReactorSocket, twisted_reactor) is True

# We cast to ReactorProtocol, our own type that stubs the necessary Twisted zope interfaces, to aid typing.
_reactor = cast(ReactorProtocol, twisted_reactor)
Expand Down
38 changes: 38 additions & 0 deletions hathor/reactor/reactor_process_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from collections.abc import Mapping, Sequence
from typing import AnyStr, Protocol

from twisted.internet.interfaces import IProcessProtocol, IProcessTransport, IReactorProcess
from zope.interface import implementer


@implementer(IReactorProcess)
class ReactorProcessProtocol(Protocol):
"""A Python protocol that stubs Twisted's IReactorProcess interface."""

def spawnProcess(
self,
processProtocol: IProcessProtocol,
executable: bytes | str,
args: Sequence[bytes | str],
env: Mapping[AnyStr, AnyStr] | None = None,
path: bytes | str | None = None,
uid: int | None = None,
gid: int | None = None,
usePTY: bool = False,
childFDs: Mapping[int, int | str] | None = None,
) -> IProcessTransport:
...
7 changes: 5 additions & 2 deletions hathor/reactor/reactor_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
from typing import Protocol

from hathor.reactor.reactor_core_protocol import ReactorCoreProtocol
from hathor.reactor.reactor_process_protocol import ReactorProcessProtocol
from hathor.reactor.reactor_socket_protocol import ReactorSocketProtocol
from hathor.reactor.reactor_tcp_protocol import ReactorTCPProtocol
from hathor.reactor.reactor_time_protocol import ReactorTimeProtocol

Expand All @@ -23,9 +25,10 @@ class ReactorProtocol(
ReactorCoreProtocol,
ReactorTimeProtocol,
ReactorTCPProtocol,
ReactorProcessProtocol,
ReactorSocketProtocol,
Protocol,
):
"""
A Python protocol that represents the intersection of Twisted's IReactorCore+IReactorTime+IReactorTCP interfaces.
A Python protocol that represents an intersection of the Twisted reactor interfaces that we use.
"""
pass
45 changes: 45 additions & 0 deletions hathor/reactor/reactor_socket_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from socket import AddressFamily
from typing import Protocol

from twisted.internet.interfaces import IListeningPort, IReactorSocket
from twisted.internet.protocol import DatagramProtocol, ServerFactory
from zope.interface import implementer


@implementer(IReactorSocket)
class ReactorSocketProtocol(Protocol):
"""A Python protocol that stubs Twisted's IReactorSocket interface."""

def adoptStreamPort(
self,
fileDescriptor: int,
addressFamily: AddressFamily,
factory: ServerFactory,
) -> IListeningPort:
...

def adoptStreamConnection(self, fileDescriptor: int, addressFamily: AddressFamily, factory: ServerFactory) -> None:
...

def adoptDatagramPort(
self,
fileDescriptor: int,
addressFamily: AddressFamily,
protocol: DatagramProtocol,
maxPacketSize: int,
) -> IListeningPort:
...
10 changes: 2 additions & 8 deletions hathor/simulator/clock.py → hathor/simulator/heap_clock.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@

from twisted.internet.base import DelayedCall
from twisted.internet.interfaces import IDelayedCall, IReactorTime
from twisted.internet.testing import MemoryReactor
from zope.interface import implementer

from hathor.reactor.memory_reactor import MemoryReactor


@implementer(IReactorTime)
class HeapClock:
Expand Down Expand Up @@ -94,10 +95,3 @@ class MemoryReactorHeapClock(MemoryReactor, HeapClock):
def __init__(self):
MemoryReactor.__init__(self)
HeapClock.__init__(self)

def run(self):
"""
We have to override MemoryReactor.run() because the original Twisted implementation weirdly calls stop() inside
run(), and we need the reactor running during our tests.
"""
self.running = True
2 changes: 1 addition & 1 deletion hathor/simulator/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from hathor.feature_activation.feature_service import FeatureService
from hathor.manager import HathorManager
from hathor.p2p.peer import PrivatePeer
from hathor.simulator.clock import HeapClock, MemoryReactorHeapClock
from hathor.simulator.heap_clock import HeapClock, MemoryReactorHeapClock
from hathor.simulator.miner.geometric_miner import GeometricMiner
from hathor.simulator.patches import SimulatorCpuMiningService, SimulatorVertexVerifier
from hathor.simulator.tx_generator import RandomTransactionGenerator
Expand Down
4 changes: 2 additions & 2 deletions tests/cli/test_events_simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
from hathor.cli.events_simulator.event_forwarding_websocket_factory import EventForwardingWebsocketFactory
from hathor.cli.events_simulator.events_simulator import create_parser, execute
from hathor.conf.get_settings import get_global_settings
from tests.test_memory_reactor_clock import TestMemoryReactorClock
from hathor.reactor.memory_reactor import MemoryReactorClock


def test_events_simulator() -> None:
parser = create_parser()
args = parser.parse_args(['--scenario', 'ONLY_LOAD'])
reactor = TestMemoryReactorClock()
reactor = MemoryReactorClock()

execute(args, reactor)
reactor.advance(1)
Expand Down
2 changes: 1 addition & 1 deletion tests/event/websocket/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from hathor.event.websocket.factory import EventWebsocketFactory
from hathor.event.websocket.protocol import EventWebsocketProtocol
from hathor.event.websocket.response import EventResponse, InvalidRequestType
from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock
from tests.utils import EventMocker


Expand Down
4 changes: 2 additions & 2 deletions tests/p2p/test_bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint, Protocol
from hathor.p2p.peer_id import PeerId
from hathor.pubsub import PubSubManager
from hathor.reactor.memory_reactor import MemoryReactorClock
from tests import unittest
from tests.test_memory_reactor_clock import TestMemoryReactorClock


class MockPeerDiscovery(PeerDiscovery):
Expand All @@ -30,7 +30,7 @@ async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], Deferr
class MockDNSPeerDiscovery(DNSPeerDiscovery):
def __init__(
self,
reactor: TestMemoryReactorClock,
reactor: MemoryReactorClock,
bootstrap_txt: list[tuple[str, int, str | None]],
bootstrap_a: list[str],
):
Expand Down
8 changes: 4 additions & 4 deletions tests/poa/test_poa_block_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
from hathor.consensus.poa import PoaBlockProducer
from hathor.crypto.util import get_public_key_bytes_compressed
from hathor.manager import HathorManager
from hathor.reactor.memory_reactor import MemoryReactorClock
from hathor.transaction.poa import PoaBlock
from tests.poa.utils import get_settings, get_signer
from tests.test_memory_reactor_clock import TestMemoryReactorClock
from tests.unittest import TestBuilder


def _get_manager(settings: HathorSettings) -> HathorManager:
reactor = TestMemoryReactorClock()
reactor = MemoryReactorClock()
reactor.advance(settings.GENESIS_BLOCK_TIMESTAMP)

artifacts = TestBuilder() \
Expand All @@ -45,7 +45,7 @@ def test_poa_block_producer_one_signer() -> None:
settings = get_settings(signer, time_between_blocks=10)
manager = _get_manager(settings)
reactor = manager.reactor
assert isinstance(reactor, TestMemoryReactorClock)
assert isinstance(reactor, MemoryReactorClock)
manager = Mock(wraps=manager)
producer = PoaBlockProducer(settings=settings, reactor=reactor, poa_signer=signer)
producer.manager = manager
Expand Down Expand Up @@ -103,7 +103,7 @@ def test_poa_block_producer_two_signers() -> None:
settings = get_settings(signer1, signer2, time_between_blocks=10)
manager = _get_manager(settings)
reactor = manager.reactor
assert isinstance(reactor, TestMemoryReactorClock)
assert isinstance(reactor, MemoryReactorClock)
manager = Mock(wraps=manager)
producer = PoaBlockProducer(settings=settings, reactor=reactor, poa_signer=signer1)
producer.manager = manager
Expand Down
2 changes: 1 addition & 1 deletion tests/pubsub/test_pubsub2.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
from unittest.mock import Mock, patch

import pytest
from twisted.internet.testing import MemoryReactorClock

from hathor.pubsub import HathorEvents, PubSubManager
from hathor.reactor.memory_reactor import MemoryReactorClock


@pytest.mark.parametrize('is_in_main_thread', [False, True])
Expand Down
26 changes: 0 additions & 26 deletions tests/test_memory_reactor_clock.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/tx/test_merged_mining.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async def test_coordinator(self):
from cryptography.hazmat.primitives.asymmetric import ec

from hathor.crypto.util import get_address_b58_from_public_key
from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock

super().setUp()
self.manager = self.create_peer('testnet')
Expand Down
2 changes: 1 addition & 1 deletion tests/tx/test_stratum.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import pytest
from twisted.internet.testing import StringTransportWithDisconnection

from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock
from hathor.stratum import (
INVALID_PARAMS,
INVALID_REQUEST,
Expand Down
6 changes: 3 additions & 3 deletions tests/unittest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
from hathor.p2p.sync_version import SyncVersion
from hathor.pubsub import PubSubManager
from hathor.reactor import ReactorProtocol as Reactor, get_global_reactor
from hathor.simulator.clock import MemoryReactorHeapClock
from hathor.reactor.memory_reactor import MemoryReactorClock
from hathor.simulator.heap_clock import MemoryReactorHeapClock
from hathor.transaction import BaseTransaction, Block, Transaction
from hathor.transaction.storage.transaction_storage import TransactionStorage
from hathor.types import VertexId
from hathor.util import Random, not_none
from hathor.wallet import BaseWallet, HDWallet, Wallet
from tests.test_memory_reactor_clock import TestMemoryReactorClock

logger = get_logger()
main = ut_main
Expand Down Expand Up @@ -115,7 +115,7 @@ class TestCase(unittest.TestCase):

def setUp(self) -> None:
self.tmpdirs: list[str] = []
self.clock = TestMemoryReactorClock()
self.clock = MemoryReactorClock()
self.clock.advance(time.time())
self.reactor = self.clock
self.log = logger.new()
Expand Down

0 comments on commit 72330e8

Please sign in to comment.