diff --git a/pynvim/api/nvim.py b/pynvim/api/nvim.py index b1b75eb8..b5ea84b7 100644 --- a/pynvim/api/nvim.py +++ b/pynvim/api/nvim.py @@ -1,4 +1,6 @@ """Main Nvim interface.""" + +import asyncio import os import sys import threading @@ -140,7 +142,16 @@ def __init__( self._err_cb: Callable[[str], Any] = lambda _: None else: self._err_cb = err_cb - self.loop = self._session.loop._loop + + @property + def loop(self) -> asyncio.AbstractEventLoop: + """Get the event loop (exposed to rplugins).""" # noqa + + # see #294: for python 3.4+, the only available and guaranteed + # implementation of msgpack_rpc BaseEventLoop is the AsyncioEventLoop. + # The underlying asyncio event loop is exposed to rplugins. + # pylint: disable=protected-access + return self._session.loop._loop # type: ignore def _from_nvim(self, obj: Any, decode: Optional[TDecodeMode] = None) -> Any: if decode is None: diff --git a/pynvim/msgpack_rpc/async_session.py b/pynvim/msgpack_rpc/async_session.py index 333c6cf2..e7766454 100644 --- a/pynvim/msgpack_rpc/async_session.py +++ b/pynvim/msgpack_rpc/async_session.py @@ -1,13 +1,19 @@ """Asynchronous msgpack-rpc handling in the event loop pipeline.""" import logging from traceback import format_exc +from typing import Any, AnyStr, Callable, Dict +from pynvim.msgpack_rpc.msgpack_stream import MsgpackStream logger = logging.getLogger(__name__) debug, info, warn = (logger.debug, logger.info, logger.warning,) -class AsyncSession(object): +# response call back takes two arguments: (err, return_value) +ResponseCallback = Callable[..., None] + + +class AsyncSession: """Asynchronous msgpack-rpc layer that wraps a msgpack stream. @@ -16,11 +22,11 @@ class AsyncSession(object): requests and notifications. """ - def __init__(self, msgpack_stream): + def __init__(self, msgpack_stream: MsgpackStream): """Wrap `msgpack_stream` on a msgpack-rpc interface.""" self._msgpack_stream = msgpack_stream self._next_request_id = 1 - self._pending_requests = {} + self._pending_requests: Dict[int, ResponseCallback] = {} self._request_cb = self._notification_cb = None self._handlers = { 0: self._on_request, @@ -33,7 +39,8 @@ def threadsafe_call(self, fn): """Wrapper around `MsgpackStream.threadsafe_call`.""" self._msgpack_stream.threadsafe_call(fn) - def request(self, method, args, response_cb): + def request(self, method: AnyStr, args: Any, + response_cb: ResponseCallback) -> None: """Send a msgpack-rpc request to Nvim. A msgpack-rpc with method `method` and argument `args` is sent to @@ -89,8 +96,9 @@ def _on_request(self, msg): # - msg[2]: method name # - msg[3]: arguments debug('received request: %s, %s', msg[2], msg[3]) - self._request_cb(msg[2], msg[3], Response(self._msgpack_stream, - msg[1])) + assert self._request_cb is not None + self._request_cb(msg[2], msg[3], + Response(self._msgpack_stream, msg[1])) def _on_response(self, msg): # response to a previous request: @@ -105,6 +113,7 @@ def _on_notification(self, msg): # - msg[1]: event name # - msg[2]: arguments debug('received notification: %s, %s', msg[1], msg[2]) + assert self._notification_cb is not None self._notification_cb(msg[1], msg[2]) def _on_invalid_message(self, msg): @@ -113,15 +122,14 @@ def _on_invalid_message(self, msg): self._msgpack_stream.send([1, 0, error, None]) -class Response(object): - +class Response: """Response to a msgpack-rpc request that came from Nvim. When Nvim sends a msgpack-rpc request, an instance of this class is created for remembering state required to send a response. """ - def __init__(self, msgpack_stream, request_id): + def __init__(self, msgpack_stream: MsgpackStream, request_id: int): """Initialize the Response instance.""" self._msgpack_stream = msgpack_stream self._request_id = request_id diff --git a/pynvim/msgpack_rpc/event_loop/__init__.py b/pynvim/msgpack_rpc/event_loop/__init__.py index e94cdbfe..1cf40a77 100644 --- a/pynvim/msgpack_rpc/event_loop/__init__.py +++ b/pynvim/msgpack_rpc/event_loop/__init__.py @@ -1,6 +1,6 @@ """Event loop abstraction subpackage. -Tries to use pyuv as a backend, falling back to the asyncio implementation. +We use python's built-in asyncio as the backend. """ from pynvim.msgpack_rpc.event_loop.asyncio import AsyncioEventLoop as EventLoop diff --git a/pynvim/msgpack_rpc/event_loop/asyncio.py b/pynvim/msgpack_rpc/event_loop/asyncio.py index 164173b8..532fbef6 100644 --- a/pynvim/msgpack_rpc/event_loop/asyncio.py +++ b/pynvim/msgpack_rpc/event_loop/asyncio.py @@ -1,12 +1,4 @@ -"""Event loop implementation that uses the `asyncio` standard module. - -The `asyncio` module was added to python standard library on 3.4, and it -provides a pure python implementation of an event loop library. It is used -as a fallback in case pyuv is not available(on python implementations other -than CPython). - -""" -from __future__ import absolute_import +"""Event loop implementation that uses the `asyncio` standard module.""" import asyncio import logging @@ -14,17 +6,23 @@ import sys from collections import deque from signal import Signals -from typing import Any, Callable, Deque, List, Optional +from typing import Any, Callable, Deque, List, Optional, cast + +if sys.version_info >= (3, 12): + from typing import Final, override +else: + from typing_extensions import Final, override -from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop +from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop, TTransportType logger = logging.getLogger(__name__) debug, info, warn = (logger.debug, logger.info, logger.warning,) loop_cls = asyncio.SelectorEventLoop + if os.name == 'nt': + import msvcrt # pylint: disable=import-error from asyncio.windows_utils import PipeHandle # type: ignore[attr-defined] - import msvcrt # On windows use ProactorEventLoop which support pipes and is backed by the # more powerful IOCP facility @@ -32,134 +30,249 @@ loop_cls = asyncio.ProactorEventLoop # type: ignore[attr-defined,misc] -class AsyncioEventLoop(BaseEventLoop, asyncio.Protocol, - asyncio.SubprocessProtocol): - """`BaseEventLoop` subclass that uses `asyncio` as a backend.""" +# pylint: disable=logging-fstring-interpolation - _queued_data: Deque[bytes] - if os.name != 'nt': - _child_watcher: Optional['asyncio.AbstractChildWatcher'] +class Protocol(asyncio.Protocol, asyncio.SubprocessProtocol): + """The protocol class used for asyncio-based RPC communication.""" + + def __init__(self, on_data, on_error): + """Initialize the Protocol object.""" + assert on_data is not None + assert on_error is not None + self._on_data = on_data + self._on_error = on_error + @override def connection_made(self, transport): """Used to signal `asyncio.Protocol` of a successful connection.""" - self._transport = transport - self._raw_transport = transport - if isinstance(transport, asyncio.SubprocessTransport): - self._transport = transport.get_pipe_transport(0) + del transport # no-op - def connection_lost(self, exc): + @override + def connection_lost(self, exc: Optional[Exception]) -> None: """Used to signal `asyncio.Protocol` of a lost connection.""" - self._on_error(exc.args[0] if exc else 'EOF') + debug(f"connection_lost: exc = {exc}") + self._on_error(exc if exc else EOFError()) + @override def data_received(self, data: bytes) -> None: """Used to signal `asyncio.Protocol` of incoming data.""" - if self._on_data: - self._on_data(data) - return - self._queued_data.append(data) + self._on_data(data) - def pipe_connection_lost(self, fd, exc): + @override + def pipe_connection_lost(self, fd: int, exc: Optional[Exception]) -> None: """Used to signal `asyncio.SubprocessProtocol` of a lost connection.""" debug("pipe_connection_lost: fd = %s, exc = %s", fd, exc) if os.name == 'nt' and fd == 2: # stderr # On windows, ignore piped stderr being closed immediately (#505) return - self._on_error(exc.args[0] if exc else 'EOF') + self._on_error(exc if exc else EOFError()) + @override def pipe_data_received(self, fd, data): """Used to signal `asyncio.SubprocessProtocol` of incoming data.""" if fd == 2: # stderr fd number # Ignore stderr message, log only for debugging debug("stderr: %s", str(data)) - elif self._on_data: - self._on_data(data) - else: - self._queued_data.append(data) + elif fd == 1: # stdout + self.data_received(data) + @override def process_exited(self) -> None: """Used to signal `asyncio.SubprocessProtocol` when the child exits.""" - self._on_error('EOF') + debug("process_exited") + self._on_error(EOFError()) + + +class AsyncioEventLoop(BaseEventLoop): + """`BaseEventLoop` subclass that uses core `asyncio` as a backend.""" + + _protocol: Optional[Protocol] + _transport: Optional[asyncio.WriteTransport] + _signals: List[Signals] + _data_buffer: Deque[bytes] + if os.name != 'nt': + _child_watcher: Optional['asyncio.AbstractChildWatcher'] + + def __init__(self, + transport_type: TTransportType, + *args: Any, **kwargs: Any): + """asyncio-specific initialization. see BaseEventLoop.__init__.""" + + # The underlying asyncio event loop. + self._loop: Final[asyncio.AbstractEventLoop] = loop_cls() + + # Handle messages from nvim that may arrive before run() starts. + self._data_buffer = deque() - def _init(self) -> None: - self._loop = loop_cls() - self._queued_data = deque() - self._fact = lambda: self - self._raw_transport = None + def _on_data(data: bytes) -> None: + if self._on_data is None: + self._data_buffer.append(data) + return + self._on_data(data) + + # pylint: disable-next=unnecessary-lambda + self._protocol_factory = lambda: Protocol( + on_data=_on_data, + on_error=self._on_error, + ) + self._protocol = None + + # The communication channel (endpoint) created by _connect_*() methods, + # where we write request messages to be sent to neovim + self._transport = None + self._to_close: List[asyncio.BaseTransport] = [] self._child_watcher = None + super().__init__(transport_type, *args, **kwargs) + + @override def _connect_tcp(self, address: str, port: int) -> None: - coroutine = self._loop.create_connection(self._fact, address, port) - self._loop.run_until_complete(coroutine) + async def connect_tcp(): + transport, protocol = await self._loop.create_connection( + self._protocol_factory, address, port) + debug(f"tcp connection successful: {address}:{port}") + self._transport = transport + self._protocol = protocol + self._loop.run_until_complete(connect_tcp()) + + @override def _connect_socket(self, path: str) -> None: - if os.name == 'nt': - coroutine = self._loop.create_pipe_connection( # type: ignore[attr-defined] - self._fact, path - ) - else: - coroutine = self._loop.create_unix_connection(self._fact, path) - self._loop.run_until_complete(coroutine) + async def connect_socket(): + if os.name == 'nt': + _create_connection = self._loop.create_pipe_connection + else: + _create_connection = self._loop.create_unix_connection + transport, protocol = await _create_connection( + self._protocol_factory, path) + debug("socket connection successful: %s", self._transport) + self._transport = transport + self._protocol = protocol + + self._loop.run_until_complete(connect_socket()) + + @override def _connect_stdio(self) -> None: - if os.name == 'nt': - pipe: Any = PipeHandle( - msvcrt.get_osfhandle(sys.stdin.fileno()) # type: ignore[attr-defined] - ) - else: - pipe = sys.stdin - coroutine = self._loop.connect_read_pipe(self._fact, pipe) - self._loop.run_until_complete(coroutine) - debug("native stdin connection successful") + async def connect_stdin(): + if os.name == 'nt': + pipe = PipeHandle(msvcrt.get_osfhandle(sys.stdin.fileno())) + else: + pipe = sys.stdin + transport, protocol = await self._loop.connect_read_pipe( + self._protocol_factory, pipe) + debug("native stdin connection successful") + self._to_close.append(transport) + del protocol + self._loop.run_until_complete(connect_stdin()) # Make sure subprocesses don't clobber stdout, # send the output to stderr instead. rename_stdout = os.dup(sys.stdout.fileno()) os.dup2(sys.stderr.fileno(), sys.stdout.fileno()) - if os.name == 'nt': - pipe = PipeHandle( - msvcrt.get_osfhandle(rename_stdout) # type: ignore[attr-defined] - ) - else: - pipe = os.fdopen(rename_stdout, 'wb') - coroutine = self._loop.connect_write_pipe(self._fact, pipe) # type: ignore[assignment] - self._loop.run_until_complete(coroutine) - debug("native stdout connection successful") + async def connect_stdout(): + if os.name == 'nt': + pipe = PipeHandle(msvcrt.get_osfhandle(rename_stdout)) + else: + pipe = os.fdopen(rename_stdout, 'wb') + transport, protocol = await self._loop.connect_write_pipe( + self._protocol_factory, pipe) + debug("native stdout connection successful") + self._transport = transport + self._protocol = protocol + self._loop.run_until_complete(connect_stdout()) + + @override def _connect_child(self, argv: List[str]) -> None: if os.name != 'nt': + # see #238, #241 self._child_watcher = asyncio.get_child_watcher() self._child_watcher.attach_loop(self._loop) - coroutine = self._loop.subprocess_exec(self._fact, *argv) - self._loop.run_until_complete(coroutine) + async def create_subprocess(): + transport: asyncio.SubprocessTransport # type: ignore + transport, protocol = await self._loop.subprocess_exec( + self._protocol_factory, *argv) + pid = transport.get_pid() + debug("child subprocess_exec successful, PID = %s", pid) + + self._transport = cast(asyncio.WriteTransport, + transport.get_pipe_transport(0)) # stdin + self._protocol = protocol + + # proactor transport implementations do not close the pipes + # automatically, so make sure they are closed upon shutdown + def _close_later(transport): + if transport is not None: + self._to_close.append(transport) + + _close_later(transport.get_pipe_transport(1)) + _close_later(transport.get_pipe_transport(2)) + _close_later(transport) + + # await until child process have been launched and the transport has + # been established + self._loop.run_until_complete(create_subprocess()) + + @override def _start_reading(self) -> None: pass + @override def _send(self, data: bytes) -> None: + assert self._transport, "connection has not been established." self._transport.write(data) + @override def _run(self) -> None: - while self._queued_data: - data = self._queued_data.popleft() + # process the early messages that arrived as soon as the transport + # channels are open and on_data is fully ready to receive messages. + while self._data_buffer: + data: bytes = self._data_buffer.popleft() if self._on_data is not None: self._on_data(data) + self._loop.run_forever() + @override def _stop(self) -> None: self._loop.stop() + @override def _close(self) -> None: - if self._raw_transport is not None: - self._raw_transport.close() + def _close_transport(transport): + transport.close() + + # Windows: for ProactorBasePipeTransport, close() doesn't take in + # effect immediately (closing happens asynchronously inside the + # event loop), need to wait a bit for completing graceful shutdown. + if os.name == 'nt' and hasattr(transport, '_sock'): + async def wait_until_closed(): + # pylint: disable-next=protected-access + while transport._sock is not None: + await asyncio.sleep(0.01) + self._loop.run_until_complete(wait_until_closed()) + + if self._transport: + _close_transport(self._transport) + self._transport = None + for transport in self._to_close: + _close_transport(transport) + self._to_close[:] = [] + self._loop.close() + if self._child_watcher is not None: self._child_watcher.close() self._child_watcher = None + @override def _threadsafe_call(self, fn: Callable[[], Any]) -> None: self._loop.call_soon_threadsafe(fn) + @override def _setup_signals(self, signals: List[Signals]) -> None: if os.name == 'nt': # add_signal_handler is not supported in win32 @@ -170,6 +283,7 @@ def _setup_signals(self, signals: List[Signals]) -> None: for signum in self._signals: self._loop.add_signal_handler(signum, self._on_signal, signum) + @override def _teardown_signals(self) -> None: for signum in self._signals: self._loop.remove_signal_handler(signum) diff --git a/pynvim/msgpack_rpc/event_loop/base.py b/pynvim/msgpack_rpc/event_loop/base.py index 86fde9c2..c7def3e1 100644 --- a/pynvim/msgpack_rpc/event_loop/base.py +++ b/pynvim/msgpack_rpc/event_loop/base.py @@ -4,7 +4,7 @@ import sys import threading from abc import ABC, abstractmethod -from typing import Any, Callable, List, Optional, Type, Union +from typing import Any, Callable, List, Optional, Union if sys.version_info < (3, 8): from typing_extensions import Literal @@ -28,15 +28,28 @@ Literal['child'] ] +# TODO: Since pynvim now supports python 3, the only available backend of the +# msgpack_rpc BaseEventLoop is the built-in asyncio (see #294). We will have +# to remove some unnecessary abstractions as well as greenlet. See also #489 -class BaseEventLoop(ABC): +class BaseEventLoop(ABC): """Abstract base class for all event loops. Event loops act as the bottom layer for Nvim sessions created by this library. They hide system/transport details behind a simple interface for reading/writing bytes to the connected Nvim instance. + A lifecycle of event loop is as follows: (1. -> [2. -> 3.]* -> 4.) + 1. initialization (__init__): connection to Nvim is established. + 2. run(data_cb): run the event loop (blocks until the loop stops). + Requests are sent to the remote neovim by calling send(), and + responses (messages) from the remote neovim will be passed to the + given `data_cb` callback function while the event loop is running. + Note that run() may be called multiple times. + 3. stop(): stop the event loop. + 4. close(): close the event loop, destroying all the internal resources. + This class exposes public methods for interacting with the underlying event loop and delegates implementation-specific work to the following methods, which subclasses are expected to implement: @@ -50,15 +63,17 @@ class BaseEventLoop(ABC): embedded Nvim that has its stdin/stdout connected to the event loop. - `_start_reading()`: Called after any of _connect_* methods. Can be used to perform any post-connection setup or validation. - - `_send(data)`: Send `data`(byte array) to Nvim. The data is only + - `_send(data)`: Send `data` (byte array) to Nvim (usually RPC request). - `_run()`: Runs the event loop until stopped or the connection is closed. - calling the following methods when some event happens: - actually sent when the event loop is running. - - `_on_data(data)`: When Nvim sends some data. + The following methods can be called upon some events by the event loop: + - `_on_data(data)`: When Nvim sends some data (usually RPC response). - `_on_signal(signum)`: When a signal is received. - - `_on_error(message)`: When a non-recoverable error occurs(eg: - connection lost) - - `_stop()`: Stop the event loop + - `_on_error(exc)`: When a non-recoverable error occurs (e.g: + connection lost, or any other OSError) + Note that these _on_{data,signal,error} methods are not 'final', may be + changed around an execution of run(). The subclasses are expected to + handle any early messages arriving while _on_data is not yet set. + - `_stop()`: Stop the event loop. - `_interrupt(data)`: Like `stop()`, but may be called from other threads this. - `_setup_signals(signals)`: Add implementation-specific listeners for @@ -73,33 +88,20 @@ def __init__(self, transport_type: TTransportType, *args: Any, **kwargs: Any): configuration, like this: >>> BaseEventLoop('tcp', '127.0.0.1', 7450) - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' >>> BaseEventLoop('socket', '/tmp/nvim-socket') - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' >>> BaseEventLoop('stdio') - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' - >>> BaseEventLoop('child', - ['nvim', '--embed', '--headless', '-u', 'NONE']) - Traceback (most recent call last): - ... - AttributeError: 'BaseEventLoop' object has no attribute '_init' - - This calls the implementation-specific initialization - `_init`, one of the `_connect_*` methods(based on `transport_type`) - and `_start_reading()` + >>> BaseEventLoop('child', ['nvim', '--embed', '--headless', '-u', 'NONE']) + + Implementation-specific initialization should be made in the __init__ + constructor of the subclass, which must call the constructor of the + super class (BaseEventLoop), in which one of the `_connect_*` methods + (based on `transport_type`) and then `_start_reading()`. """ self._transport_type = transport_type self._signames = dict((k, v) for v, k in signal.__dict__.items() if v.startswith('SIG')) self._on_data: Optional[Callable[[bytes], None]] = None self._error: Optional[BaseException] = None - self._init() try: getattr(self, '_connect_{}'.format(transport_type))(*args, **kwargs) except Exception as e: @@ -107,10 +109,6 @@ def __init__(self, transport_type: TTransportType, *args: Any, **kwargs: Any): raise e self._start_reading() - @abstractmethod - def _init(self) -> None: - raise NotImplementedError() - @abstractmethod def _start_reading(self) -> None: raise NotImplementedError() @@ -168,17 +166,23 @@ def threadsafe_call(self, fn): """ self._threadsafe_call(fn) - def run(self, data_cb): - """Run the event loop.""" + @abstractmethod + def _threadsafe_call(self, fn: Callable[[], Any]) -> None: + raise NotImplementedError() + + def run(self, data_cb: Callable[[bytes], None]) -> None: + """Run the event loop, and receives response messages to a callback.""" if self._error: err = self._error if isinstance(self._error, KeyboardInterrupt): - # KeyboardInterrupt is not destructive(it may be used in + # KeyboardInterrupt is not destructive (it may be used in # the REPL). # After throwing KeyboardInterrupt, cleanup the _error field # so the loop may be started again self._error = None raise err + + # data_cb: e.g., MsgpackStream._on_data self._on_data = data_cb if threading.current_thread() == main_thread: self._setup_signals([signal.SIGINT, signal.SIGTERM]) @@ -190,6 +194,10 @@ def run(self, data_cb): signal.signal(signal.SIGINT, default_int_handler) self._on_data = None + @abstractmethod + def _run(self) -> None: + raise NotImplementedError() + def stop(self) -> None: """Stop the event loop.""" self._stop() @@ -209,23 +217,32 @@ def _close(self) -> None: raise NotImplementedError() def _on_signal(self, signum: signal.Signals) -> None: - msg = 'Received {}'.format(self._signames[signum]) + # pylint: disable-next=consider-using-f-string + msg = 'Received signal {}'.format(self._signames[signum]) debug(msg) + if signum == signal.SIGINT and self._transport_type == 'stdio': # When the transport is stdio, we are probably running as a Nvim # child process. In that case, we don't want to be killed by # ctrl+C return - cls: Type[BaseException] = Exception + if signum == signal.SIGINT: - cls = KeyboardInterrupt - self._error = cls(msg) + self._error = KeyboardInterrupt() + else: + self._error = Exception(msg) self.stop() - def _on_error(self, error: str) -> None: - debug(error) - self._error = OSError(error) + def _on_error(self, exc: Exception) -> None: + debug(str(exc)) + self._error = exc self.stop() def _on_interrupt(self) -> None: self.stop() + + def _setup_signals(self, signals: List[signal.Signals]) -> None: + pass # no-op by default + + def _teardown_signals(self) -> None: + pass # no-op by default diff --git a/pynvim/msgpack_rpc/event_loop/uv.py b/pynvim/msgpack_rpc/event_loop/uv.py deleted file mode 100644 index 969187ee..00000000 --- a/pynvim/msgpack_rpc/event_loop/uv.py +++ /dev/null @@ -1,124 +0,0 @@ -"""Event loop implementation that uses pyuv(libuv-python bindings).""" -import sys -from collections import deque - -import pyuv - -from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop - - -class UvEventLoop(BaseEventLoop): - - """`BaseEventLoop` subclass that uses `pvuv` as a backend.""" - - def _init(self): - self._loop = pyuv.Loop() - self._async = pyuv.Async(self._loop, self._on_async) - self._connection_error = None - self._error_stream = None - self._callbacks = deque() - - def _on_connect(self, stream, error): - self.stop() - if error: - msg = 'Cannot connect to {}: {}'.format( - self._connect_address, pyuv.errno.strerror(error)) - self._connection_error = OSError(msg) - return - self._read_stream = self._write_stream = stream - - def _on_read(self, handle, data, error): - if error or not data: - msg = pyuv.errno.strerror(error) if error else 'EOF' - self._on_error(msg) - return - if handle == self._error_stream: - return - self._on_data(data) - - def _on_write(self, handle, error): - if error: - msg = pyuv.errno.strerror(error) - self._on_error(msg) - - def _on_exit(self, handle, exit_status, term_signal): - self._on_error('EOF') - - def _disconnected(self, *args): - raise OSError('Not connected to Nvim') - - def _connect_tcp(self, address, port): - stream = pyuv.TCP(self._loop) - self._connect_address = '{}:{}'.format(address, port) - stream.connect((address, port), self._on_connect) - - def _connect_socket(self, path): - stream = pyuv.Pipe(self._loop) - self._connect_address = path - stream.connect(path, self._on_connect) - - def _connect_stdio(self): - self._read_stream = pyuv.Pipe(self._loop) - self._read_stream.open(sys.stdin.fileno()) - self._write_stream = pyuv.Pipe(self._loop) - self._write_stream.open(sys.stdout.fileno()) - - def _connect_child(self, argv): - self._write_stream = pyuv.Pipe(self._loop) - self._read_stream = pyuv.Pipe(self._loop) - self._error_stream = pyuv.Pipe(self._loop) - stdin = pyuv.StdIO(self._write_stream, - flags=pyuv.UV_CREATE_PIPE + pyuv.UV_READABLE_PIPE) - stdout = pyuv.StdIO(self._read_stream, - flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE) - stderr = pyuv.StdIO(self._error_stream, - flags=pyuv.UV_CREATE_PIPE + pyuv.UV_WRITABLE_PIPE) - pyuv.Process.spawn(self._loop, - args=argv, - exit_callback=self._on_exit, - flags=pyuv.UV_PROCESS_WINDOWS_HIDE, - stdio=(stdin, stdout, stderr,)) - self._error_stream.start_read(self._on_read) - - def _start_reading(self): - if self._transport_type in ['tcp', 'socket']: - self._loop.run() - if self._connection_error: - self.run = self.send = self._disconnected - raise self._connection_error - self._read_stream.start_read(self._on_read) - - def _send(self, data): - self._write_stream.write(data, self._on_write) - - def _run(self): - self._loop.run(pyuv.UV_RUN_DEFAULT) - - def _stop(self): - self._loop.stop() - - def _close(self): - pass - - def _threadsafe_call(self, fn): - self._callbacks.append(fn) - self._async.send() - - def _on_async(self, handle): - while self._callbacks: - self._callbacks.popleft()() - - def _setup_signals(self, signals): - self._signal_handles = [] - - def handler(h, signum): - self._on_signal(signum) - - for signum in signals: - handle = pyuv.Signal(self._loop) - handle.start(handler, signum) - self._signal_handles.append(handle) - - def _teardown_signals(self): - for handle in self._signal_handles: - handle.stop() diff --git a/pynvim/msgpack_rpc/msgpack_stream.py b/pynvim/msgpack_rpc/msgpack_stream.py index 49340c50..f209d849 100644 --- a/pynvim/msgpack_rpc/msgpack_stream.py +++ b/pynvim/msgpack_rpc/msgpack_stream.py @@ -4,20 +4,20 @@ from msgpack import Packer, Unpacker from pynvim.compat import unicode_errors_default +from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop logger = logging.getLogger(__name__) debug, info, warn = (logger.debug, logger.info, logger.warning,) -class MsgpackStream(object): - +class MsgpackStream: """Two-way msgpack stream that wraps a event loop byte stream. This wraps the event loop interface for reading/writing bytes and exposes an interface for reading/writing msgpack documents. """ - def __init__(self, event_loop): + def __init__(self, event_loop: BaseEventLoop) -> None: """Wrap `event_loop` on a msgpack-aware interface.""" self.loop = event_loop self._packer = Packer(unicode_errors=unicode_errors_default) @@ -30,7 +30,7 @@ def threadsafe_call(self, fn): def send(self, msg): """Queue `msg` for sending to Nvim.""" - debug('sent %s', msg) + debug('sending %s', msg) self.loop.send(self._packer.pack(msg)) def run(self, message_cb): @@ -51,14 +51,15 @@ def close(self): """Close the event loop.""" self.loop.close() - def _on_data(self, data): + def _on_data(self, data: bytes) -> None: self._unpacker.feed(data) while True: try: debug('waiting for message...') msg = next(self._unpacker) debug('received message: %s', msg) - self._message_cb(msg) + assert self._message_cb is not None + self._message_cb(msg) # type: ignore[unreachable] except StopIteration: debug('unpacker needs more data...') break diff --git a/pynvim/msgpack_rpc/session.py b/pynvim/msgpack_rpc/session.py index 453f218b..4e8f6259 100644 --- a/pynvim/msgpack_rpc/session.py +++ b/pynvim/msgpack_rpc/session.py @@ -11,6 +11,7 @@ from pynvim.compat import check_async from pynvim.msgpack_rpc.async_session import AsyncSession +from pynvim.msgpack_rpc.event_loop.base import BaseEventLoop if sys.version_info < (3, 8): from typing_extensions import Literal @@ -42,7 +43,7 @@ class Notification(NamedTuple): Message = Union[Request, Notification] -class Session(object): +class Session: """Msgpack-rpc session layer that uses coroutines for a synchronous API. @@ -59,11 +60,15 @@ def __init__(self, async_session: AsyncSession): self._pending_messages: Deque[Message] = deque() self._is_running = False self._setup_exception: Optional[Exception] = None - self.loop = async_session.loop self._loop_thread: Optional[threading.Thread] = None self.error_wrapper: Callable[[Tuple[int, str]], Exception] = \ lambda e: Exception(e[1]) + @property + def loop(self) -> BaseEventLoop: + """Get the underlying msgpack EventLoop.""" + return self._async_session.loop + def threadsafe_call( self, fn: Callable[..., Any], *args: Any, **kwargs: Any ) -> None: diff --git a/setup.cfg b/setup.cfg index 3b593d6b..de3fce30 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,7 +2,7 @@ test = pytest [flake8] -extend-ignore = D211,E731,D401,W503 +extend-ignore = D211,E731,D401,W503,D202 max-line-length = 100 per-file-ignores = test/*:D1 diff --git a/setup.py b/setup.py index dea5ae0f..f4ebee0c 100644 --- a/setup.py +++ b/setup.py @@ -23,7 +23,6 @@ ] extras_require = { - 'pyuv': ['pyuv>=1.0.0'], 'test': tests_require, } @@ -31,8 +30,8 @@ # pypy already includes an implementation of the greenlet module install_requires.append('greenlet>=3.0') -if sys.version_info < (3, 8): - install_requires.append('typing-extensions') +if sys.version_info < (3, 12): + install_requires.append('typing-extensions>=4.5') # __version__: see pynvim/_version.py diff --git a/tox.ini b/tox.ini index a57a7909..2252504c 100644 --- a/tox.ini +++ b/tox.ini @@ -25,7 +25,6 @@ extras = test deps = pytest-timeout # cov: pytest-cov -# pyuv: pyuv # setenv = # cov: PYTEST_ADDOPTS=--cov=. {env:PYTEST_ADDOPTS:} # passenv = PYTEST_ADDOPTS