Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco committed Oct 23, 2024
1 parent cc3bba3 commit 8127eaa
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 1 deletion.
2 changes: 1 addition & 1 deletion hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
self.log.info(
'hathor-core v{hathor}',
hathor=hathor.__version__,
pid=os.getpid(),
main_pid=os.getpid(),
genesis=get_genesis_short_hash(),
my_peer_id=str(peer.id),
python=python,
Expand Down
Empty file added hathor/multiprocess/__init__.py
Empty file.
89 changes: 89 additions & 0 deletions hathor/multiprocess/connect_on_subprocess.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
from pathlib import Path

from structlog import get_logger
from twisted.internet import tcp
from twisted.internet.interfaces import IAddress, ITransport
from twisted.internet.protocol import Protocol, ServerFactory

from hathor.multiprocess.subprocess_protocol import SubprocessProtocol
from hathor.reactor import ReactorProtocol

logger = get_logger()


class ConnectOnSubprocessProtocol(Protocol):
    """Main-process protocol that delegates an accepted connection to a newly spawned subprocess.

    When the transport is attached, a python subprocess is spawned on `main_file` and receives the connection's
    socket through its file descriptor. The main process then closes its own handle on the socket and is not
    expected to receive any data for this connection.
    """
    __slots__ = ('log', 'reactor', '_main_file', '_addr')

    def __init__(self, *, reactor: ReactorProtocol, main_file: Path, addr: IAddress) -> None:
        """Keep what is needed to spawn the subprocess later.

        Args:
            reactor: reactor used to spawn the subprocess.
            main_file: python file the subprocess will execute.
            addr: peer address of the connection; it is also bound to this instance's logger.
        """
        self._addr = addr
        self._main_file = main_file
        self.reactor = reactor
        self.log = logger.new(addr=addr)

    def makeConnection(self, transport: ITransport) -> None:
        """Spawn a subprocess for this connection and release the socket on the main process.

        This override does not delegate to the base implementation.
        """
        assert isinstance(transport, tcp.Server)
        assert self._addr == transport.getPeer()

        conn_fd = transport.fileno()
        self.log.info('spawning new subprocess for connection', fileno=conn_fd, main_pid=os.getpid())

        # The subprocess runs the same python executable as the main process on self._main_file, receiving the
        # connection's addr and file descriptor number through argv.
        python_executable = sys.executable
        argv = [
            python_executable,
            str(self._main_file.absolute()),
            str(self._addr),
            str(conn_fd),
        ]
        # stdout (1), stderr (2) and the connection's descriptor are mapped directly into the subprocess.
        fd_mapping = {1: 1, 2: 2, conn_fd: conn_fd}

        # Env vars and working directory are the ones of the main process.
        process = self.reactor.spawnProcess(
            processProtocol=SubprocessProtocol(addr=self._addr),
            executable=python_executable,
            args=argv,
            env=os.environ,
            path=os.getcwd(),
            childFDs=fd_mapping,
        )

        # From now on the subprocess owns the connection through the descriptor it received, so the main process
        # drops its handle: it must never read from this socket.
        # NOTE(review): the socket is closed directly rather than through the transport; confirm the reactor does
        # not keep monitoring the closed descriptor.
        transport.socket.close()

        self.log.info('spawned subprocess for connection', fileno=conn_fd, subprocess_pid=process.pid)

    def dataReceived(self, data: bytes) -> None:
        """Log and raise: data for this connection is only supposed to be read by the subprocess.

        Raises:
            AssertionError: always.
        """
        self.log.error('subprocess data received on the main process', addr=self._addr, data=data)
        raise AssertionError('ConnectOnSubprocessProtocol.dataReceived should never be called!')


class ConnectOnSubprocessFactory(ServerFactory):
    """Server factory that builds one `ConnectOnSubprocessProtocol` per incoming connection."""
    __slots__ = ('reactor', '_main_file')

    def __init__(self, *, reactor: ReactorProtocol, main_file: Path) -> None:
        """Keep the reactor and the subprocess main file that are handed to every protocol built.

        Args:
            reactor: reactor passed on to each protocol.
            main_file: python file passed on to each protocol.
        """
        self._main_file = main_file
        self.reactor = reactor

    def buildProtocol(self, addr: IAddress) -> Protocol | None:
        """Return a new `ConnectOnSubprocessProtocol` for the connection coming from `addr`."""
        protocol = ConnectOnSubprocessProtocol(
            addr=addr,
            main_file=self._main_file,
            reactor=self.reactor,
        )
        return protocol
55 changes: 55 additions & 0 deletions hathor/multiprocess/main_subprocess_runner.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
from socket import AF_INET
from typing import Callable

from structlog import get_logger
from twisted.internet.protocol import Factory

from hathor.cli.util import LoggingOptions, LoggingOutput, setup_logging
from hathor.multiprocess.subprocess_wrapper import SubprocessWrappingFactory
from hathor.reactor import ReactorProtocol, initialize_global_reactor

logger = get_logger()


def main_subprocess_runner(factory: Callable[[ReactorProtocol, str], Factory]) -> None:
    """Entry point of a connection subprocess: adopt the inherited socket and run a reactor for it.

    Expects `sys.argv` to be `[script, addr, fileno]`, where `addr` is the string form of the peer address and
    `fileno` is the descriptor number of the connected socket inherited from the main process. This function
    returns only after the reactor stops.

    Args:
        factory: callable that receives the reactor and the address string and returns the Twisted factory
            whose protocol handles the connection.

    Raises:
        ValueError: if `sys.argv` does not hold exactly three items, or `fileno` is not an integer.
    """
    # TODO: Correctly initialize log config
    setup_logging(
        logging_output=LoggingOutput.PRETTY,
        logging_options=LoggingOptions(debug=False, sentry=False)
    )

    _, addr, fileno_str = sys.argv
    fileno = int(fileno_str)
    log = logger.new(addr=addr, fileno=fileno, subprocess_pid=os.getpid())
    log.debug('running subprocess for connection')

    reactor = initialize_global_reactor()
    wrapping_factory = SubprocessWrappingFactory(
        reactor=reactor,
        addr_str=addr,
        wrapped_factory=factory(reactor, addr),
    )

    def adopt_connection() -> None:
        # The reactor works on its own duplicate of the descriptor, and the application remains responsible for
        # closing the one it passed in. Close it as soon as the adoption returns (or fails) so the inherited
        # descriptor does not stay open for the whole life of the subprocess.
        # NOTE(review): the address family is fixed to AF_INET; presumably IPv6 peers would need AF_INET6 —
        # confirm against the listening endpoint.
        try:
            reactor.adoptStreamConnection(
                fileDescriptor=fileno,
                addressFamily=AF_INET,
                factory=wrapping_factory,
            )
        finally:
            os.close(fileno)

    # The adoption is deferred until the reactor is actually running.
    reactor.callWhenRunning(adopt_connection)
    reactor.run()
61 changes: 61 additions & 0 deletions hathor/multiprocess/subprocess_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import signal

from structlog import get_logger
from twisted.internet.error import ProcessExitedAlready
from twisted.internet.interfaces import IAddress
from twisted.internet.protocol import ProcessProtocol
from twisted.python.failure import Failure

from hathor.multiprocess.utils import log_connection_closed

logger = get_logger()


class SubprocessProtocol(ProcessProtocol):
    """Process protocol used on the main process to follow the lifecycle of a connection subprocess.

    No data is expected to flow through pipes between the main process and the subprocess, so receiving any is
    treated as a programming error, and an unexpected pipe closure terminates the subprocess.
    """
    __slots__ = ('log', '_addr')

    def __init__(self, *, addr: IAddress) -> None:
        """Keep the connection address and bind it to this instance's logger.

        Args:
            addr: peer address of the connection handled by the subprocess.
        """
        self._addr = addr
        self.log = logger.new(addr=addr)

    def connectionMade(self) -> None:
        """Bind the pid of the subprocess to the logger."""
        transport = self.transport
        assert transport and transport.pid is not None
        self.log = self.log.bind(subprocess_pid=transport.pid)
        self.log.debug('subprocess connection made')
        # TODO: Setup RPC here? And then ping/wait for ping?

    def childDataReceived(self, childFD: int, data: bytes) -> None:
        """Log and raise: the subprocess is not supposed to send data through pipes.

        Raises:
            AssertionError: always.
        """
        self.log.error('subprocess data received through pipes', childFD=childFD, data=data)
        raise AssertionError('SubprocessProtocol.childDataReceived should never be called!')

    def childConnectionLost(self, childFD: int) -> None:
        """Send SIGTERM to the subprocess when one of its pipes is closed."""
        process = self.transport
        assert process is not None
        self.log.error('subprocess pipe unexpectedly closed, terminating...', childFD=childFD)
        try:
            process.signalProcess(signal.SIGTERM)
        except ProcessExitedAlready:
            # The subprocess is already gone, there is nothing left to terminate.
            return

    def processExited(self, reason: Failure) -> None:
        """Log the exit of the subprocess together with its reason."""
        log_connection_closed(message='subprocess exited', reason=reason, log=self.log)
79 changes: 79 additions & 0 deletions hathor/multiprocess/subprocess_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from structlog import get_logger
from twisted.internet.interfaces import IAddress, IProtocol
from twisted.internet.protocol import Factory, Protocol
from twisted.protocols.policies import ProtocolWrapper, WrappingFactory
from twisted.python.failure import Failure

from hathor.multiprocess.utils import log_connection_closed
from hathor.reactor import ReactorProtocol

logger = get_logger()


class SubprocessProtocolWrapper(ProtocolWrapper):
    """Protocol wrapper that runs inside a connection subprocess around the real protocol.

    Every event is forwarded to the wrapped protocol. In addition, events are logged and, once the connection is
    lost, the subprocess reactor is stopped.
    """
    # The comma after 'reactor' matters: without it the adjacent string literals are concatenated into a single
    # bogus slot named 'reactor_addr_str'.
    __slots__ = ('log', 'reactor', '_addr_str')

    def __init__(
        self,
        *,
        reactor: ReactorProtocol,
        factory: WrappingFactory,
        addr_str: str,
        wrapped_protocol: IProtocol,
    ) -> None:
        """Wrap `wrapped_protocol` and bind the address and subprocess pid to the logger.

        Args:
            reactor: reactor of the subprocess, stopped when the connection is lost.
            factory: the wrapping factory that built this wrapper.
            addr_str: string form of the peer address of the connection.
            wrapped_protocol: protocol that receives the forwarded events.
        """
        super().__init__(factory, wrapped_protocol)
        self.log = logger.new(addr=addr_str, subprocess_pid=os.getpid())
        self.reactor = reactor
        self._addr_str = addr_str

    def connectionMade(self) -> None:
        """Log the event, then forward it."""
        self.log.debug('subprocess connection made')
        super().connectionMade()

    def dataReceived(self, data: bytes) -> None:
        """Log the received data, then forward it."""
        self.log.debug('data received', data=data)
        super().dataReceived(data)

    def connectionLost(self, reason: Failure) -> None:  # type: ignore[override]
        """Forward the event, then stop the subprocess reactor unless it is no longer running."""
        super().connectionLost(reason)
        if not self.reactor.running:
            return

        log_connection_closed(log=self.log, reason=reason, message='connection lost, stopping subprocess reactor')
        self.reactor.stop()


class SubprocessWrappingFactory(WrappingFactory):
    """Wrapping factory used inside a connection subprocess.

    Protocols built by the wrapped factory are enclosed in a `SubprocessProtocolWrapper`.
    """
    __slots__ = ('log', 'reactor', '_addr_str')

    def __init__(self, *, reactor: ReactorProtocol, addr_str: str, wrapped_factory: Factory) -> None:
        """Wrap `wrapped_factory` and bind the address and subprocess pid to the logger.

        Args:
            reactor: reactor of the subprocess, handed to every wrapper built.
            addr_str: string form of the only peer address this factory accepts.
            wrapped_factory: factory that builds the protocol to be wrapped.
        """
        super().__init__(wrapped_factory)
        self._addr_str = addr_str
        self.reactor = reactor
        self.log = logger.new(addr=addr_str, subprocess_pid=os.getpid())

    def buildProtocol(self, addr: IAddress) -> Protocol | None:
        """Build the wrapped protocol for `addr` and return it enclosed in a `SubprocessProtocolWrapper`."""
        # The subprocess serves a single connection, so the address has to be the one it was created for.
        assert self._addr_str == str(addr)
        self.log.debug('building protocol for subprocess wrapper')
        # NOTE(review): the wrapped factory's result is wrapped as-is; if it can return None, confirm how that
        # case should be handled.
        inner_protocol = self.wrappedFactory.buildProtocol(addr)
        wrapper = SubprocessProtocolWrapper(
            wrapped_protocol=inner_protocol,
            addr_str=self._addr_str,
            factory=self,
            reactor=self.reactor,
        )
        return wrapper
27 changes: 27 additions & 0 deletions hathor/multiprocess/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Copyright 2024 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from structlog import BoundLogger
from twisted.internet.error import ConnectionDone, ConnectionLost, ProcessDone, ProcessTerminated
from twisted.python.failure import Failure


def log_connection_closed(*, log: BoundLogger, reason: Failure, message: str) -> None:
    """Log the end of a connection or of a subprocess, at a level that matches how it ended.

    Clean endings (`ConnectionDone` for connections, `ProcessDone` for subprocesses) are logged as info. Anything
    else is logged as error; when the failure is not one of the known unclean endings (`ConnectionLost`,
    `ProcessTerminated`), its type name is added to the event instead of raising, so that a logging helper can
    never break the caller's shutdown path.

    Args:
        log: logger used to emit the event.
        reason: failure describing why the connection or the subprocess ended.
        message: event message.
    """
    failure_value = reason.value
    fields = {'reason': reason.getErrorMessage()}

    if isinstance(failure_value, (ConnectionDone, ProcessDone)):
        log.info(message, **fields)
        return

    if not isinstance(failure_value, (ConnectionLost, ProcessTerminated)):
        # Unexpected kind of failure: keep it visible in the log rather than asserting.
        fields['reason_type'] = type(failure_value).__name__

    log.error(message, **fields)

0 comments on commit 8127eaa

Please sign in to comment.