From 92df7475490588038e0b0584c50f971949577725 Mon Sep 17 00:00:00 2001 From: Aymeric Augustin Date: Wed, 14 Aug 2024 21:23:53 +0200 Subject: [PATCH] Add max_queue parameter to the new asyncio API. Also remove set_limits and get_limits because they aren't used internally and won't be exposed as public APIs. --- docs/howto/upgrade.rst | 78 ++++++++++++++++++---------- src/websockets/asyncio/client.py | 8 +++ src/websockets/asyncio/connection.py | 13 +++-- src/websockets/asyncio/messages.py | 21 ++++---- src/websockets/asyncio/server.py | 8 +++ tests/asyncio/test_connection.py | 15 ++++++ tests/asyncio/test_messages.py | 35 ++++++++----- 7 files changed, 121 insertions(+), 57 deletions(-) diff --git a/docs/howto/upgrade.rst b/docs/howto/upgrade.rst index c23068fb..10e8967d 100644 --- a/docs/howto/upgrade.rst +++ b/docs/howto/upgrade.rst @@ -115,23 +115,6 @@ In other words, the following pattern isn't supported:: async for websocket in connect(...): # this doesn't work yet ... -Configuring buffers -................... - -The new implementation doesn't provide a way to configure read buffers yet. - -In practice, :func:`~asyncio.client.connect` and :func:`~asyncio.server.serve` -don't accept the ``max_queue`` and ``read_limit`` arguments. - -Here's the most likely outcome: - -* ``max_queue`` will be implemented but its semantics will change from "maximum - number of messages" to "maximum number of frames", which makes a difference - when messages are fragmented. -* ``read_limit`` won't be implemented because the buffer that it configured was - removed from the new implementation. The queue that ``max_queue`` configures - is the only read buffer now. - .. _Update import paths: Import paths @@ -337,21 +320,60 @@ client. The list of subprotocols supported by the server was removed because ``select_subprotocols`` already knows which subprotocols it may select and under which conditions. -Miscellaneous changes -..................... +Arguments of :func:`~asyncio.client.connect` and :func:`~asyncio.server.serve` +.............................................................................. + +``ws_handler`` → ``handler`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The first argument of :func:`~asyncio.server.serve` is called ``handler`` instead -of ``ws_handler``. It's usually passed as a positional argument, making this -change transparent. If you're passing it as a keyword argument, you must update -its name. +The first argument of :func:`~asyncio.server.serve` is now called ``handler`` +instead of ``ws_handler``. It's usually passed as a positional argument, making +this change transparent. If you're passing it as a keyword argument, you must +update its name. + +``create_protocol`` → ``create_connection`` +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The keyword argument of :func:`~asyncio.server.serve` for customizing the -creation of the connection object is called ``create_connection`` instead of +creation of the connection object is now called ``create_connection`` instead of ``create_protocol``. It must return a :class:`~asyncio.server.ServerConnection` -instead of a :class:`~server.WebSocketServerProtocol`. If you were customizing -connection objects, you should check the new implementation and possibly redo -your customization. Keep in mind that the changes to ``process_request`` and -``select_subprotocol`` remove most use cases for ``create_connection``. +instead of a :class:`~server.WebSocketServerProtocol`. + +If you were customizing connection objects, you should check the new +implementation and possibly redo your customization. Keep in mind that the +changes to ``process_request`` and ``select_subprotocol`` remove most use cases +for ``create_connection``. + +``max_queue`` +~~~~~~~~~~~~~ + +The ``max_queue`` argument of :func:`~asyncio.client.connect` and +:func:`~asyncio.server.serve` has a new meaning but achieves a similar effect. + +It is now the high-water mark of a buffer of incoming frames. It defaults to 16 +frames. It used to be the size of a buffer of incoming messages that refilled as +soon as a message was read. It used to default to 32 messages. + +This can make a difference when messages are fragmented in several frames. In +that case, you may want to increase ``max_queue``. If you're writing a high +performance server and you know that you're receiving fragmented messages, +probably you should adopt :meth:`~asyncio.connection.Connection.recv_streaming` +and optimize the performance of reads again. In all other cases, given how +uncommon fragmentation is, you shouldn't worry about this change. + +``read_limit`` +~~~~~~~~~~~~~~ + +The ``read_limit`` argument doesn't exist in the new implementation because it +doesn't buffer data received from the network in a +:class:`~asyncio.StreamReader`. With a better design, this buffer could be +removed. + +The buffer of incoming frames configured by ``max_queue`` is the only read +buffer now. + +``write_limit`` +~~~~~~~~~~~~~~~ The ``write_limit`` argument of :func:`~asyncio.client.connect` and :func:`~asyncio.server.serve` defaults to 32 KiB instead of 64 KiB. diff --git a/src/websockets/asyncio/client.py b/src/websockets/asyncio/client.py index 5a764061..fb9c8567 100644 --- a/src/websockets/asyncio/client.py +++ b/src/websockets/asyncio/client.py @@ -49,12 +49,14 @@ def __init__( protocol: ClientProtocol, *, close_timeout: float | None = 10, + max_queue: int | tuple[int, int] = 16, write_limit: int | tuple[int, int] = 2**15, ) -> None: self.protocol: ClientProtocol super().__init__( protocol, close_timeout=close_timeout, + max_queue=max_queue, write_limit=write_limit, ) self.response_rcvd: asyncio.Future[None] = self.loop.create_future() @@ -148,6 +150,10 @@ class connect: :obj:`None` disables the timeout. max_size: Maximum size of incoming messages in bytes. :obj:`None` disables the limit. + max_queue: High-water mark of the buffer where frames are received. + It defaults to 16 frames. The low-water mark defaults to ``max_queue + // 4``. You may pass a ``(high, low)`` tuple to set the high-water + and low-water marks. write_limit: High-water mark of write buffer in bytes. It is passed to :meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults to 32 KiB. You may pass a ``(high, low)`` tuple to set the @@ -205,6 +211,7 @@ def __init__( close_timeout: float | None = 10, # Limits max_size: int | None = 2**20, + max_queue: int | tuple[int, int] = 16, write_limit: int | tuple[int, int] = 2**15, # Logging logger: LoggerLike | None = None, @@ -250,6 +257,7 @@ def factory() -> ClientConnection: connection = create_connection( protocol, close_timeout=close_timeout, + max_queue=max_queue, write_limit=write_limit, ) return connection diff --git a/src/websockets/asyncio/connection.py b/src/websockets/asyncio/connection.py index d2359915..2de704df 100644 --- a/src/websockets/asyncio/connection.py +++ b/src/websockets/asyncio/connection.py @@ -48,10 +48,14 @@ def __init__( protocol: Protocol, *, close_timeout: float | None = 10, + max_queue: int | tuple[int, int] = 16, write_limit: int | tuple[int, int] = 2**15, ) -> None: self.protocol = protocol self.close_timeout = close_timeout + if isinstance(max_queue, int): + max_queue = (max_queue, None) + self.max_queue = max_queue if isinstance(write_limit, int): write_limit = (write_limit, None) self.write_limit = write_limit @@ -807,12 +811,13 @@ def close_transport(self) -> None: def connection_made(self, transport: asyncio.BaseTransport) -> None: transport = cast(asyncio.Transport, transport) - transport.set_write_buffer_limits(*self.write_limit) - self.transport = transport self.recv_messages = Assembler( - pause=self.transport.pause_reading, - resume=self.transport.resume_reading, + *self.max_queue, + pause=transport.pause_reading, + resume=transport.resume_reading, ) + transport.set_write_buffer_limits(*self.write_limit) + self.transport = transport def connection_lost(self, exc: Exception | None) -> None: self.protocol.receive_eof() # receive_eof is idempotent diff --git a/src/websockets/asyncio/messages.py b/src/websockets/asyncio/messages.py index bc33df8d..33ab6a5e 100644 --- a/src/websockets/asyncio/messages.py +++ b/src/websockets/asyncio/messages.py @@ -89,6 +89,8 @@ class Assembler: # coverage reports incorrectly: "line NN didn't jump to the function exit" def __init__( # pragma: no cover self, + high: int = 16, + low: int | None = None, pause: Callable[[], Any] = lambda: None, resume: Callable[[], Any] = lambda: None, ) -> None: @@ -99,11 +101,16 @@ def __init__( # pragma: no cover # call to Protocol.data_received() could produce thousands of frames, # which must be buffered. Instead, we pause reading when the buffer goes # above the high limit and we resume when it goes under the low limit. - self.high = 16 - self.low = 4 - self.paused = False + if low is None: + low = high // 4 + if low < 0: + raise ValueError("low must be positive or equal to zero") + if high < low: + raise ValueError("high must be greater than or equal to low") + self.high, self.low = high, low self.pause = pause self.resume = resume + self.paused = False # This flag prevents concurrent calls to get() by user code. self.get_in_progress = False @@ -254,14 +261,6 @@ def put(self, frame: Frame) -> None: self.frames.put(frame) self.maybe_pause() - def get_limits(self) -> tuple[int, int]: - """Return low and high water marks for flow control.""" - return self.low, self.high - - def set_limits(self, low: int = 4, high: int = 16) -> None: - """Configure low and high water marks for flow control.""" - self.low, self.high = low, high - def maybe_pause(self) -> None: """Pause the writer if queue is above the high water mark.""" # Check for "> high" to support high = 0 diff --git a/src/websockets/asyncio/server.py b/src/websockets/asyncio/server.py index b9ac50e3..67c46332 100644 --- a/src/websockets/asyncio/server.py +++ b/src/websockets/asyncio/server.py @@ -62,12 +62,14 @@ def __init__( server: WebSocketServer, *, close_timeout: float | None = 10, + max_queue: int | tuple[int, int] = 16, write_limit: int | tuple[int, int] = 2**15, ) -> None: self.protocol: ServerProtocol super().__init__( protocol, close_timeout=close_timeout, + max_queue=max_queue, write_limit=write_limit, ) self.server = server @@ -576,6 +578,10 @@ def handler(websocket): :obj:`None` disables the timeout. max_size: Maximum size of incoming messages in bytes. :obj:`None` disables the limit. + max_queue: High-water mark of the buffer where frames are received. + It defaults to 16 frames. The low-water mark defaults to ``max_queue + // 4``. You may pass a ``(high, low)`` tuple to set the high-water + and low-water marks. write_limit: High-water mark of write buffer in bytes. It is passed to :meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults to 32 KiB. You may pass a ``(high, low)`` tuple to set the @@ -643,6 +649,7 @@ def __init__( close_timeout: float | None = 10, # Limits max_size: int | None = 2**20, + max_queue: int | tuple[int, int] = 16, write_limit: int | tuple[int, int] = 2**15, # Logging logger: LoggerLike | None = None, @@ -716,6 +723,7 @@ def protocol_select_subprotocol( protocol, self.server, close_timeout=close_timeout, + max_queue=max_queue, write_limit=write_limit, ) return connection diff --git a/tests/asyncio/test_connection.py b/tests/asyncio/test_connection.py index 2932077c..02029b75 100644 --- a/tests/asyncio/test_connection.py +++ b/tests/asyncio/test_connection.py @@ -874,6 +874,21 @@ async def test_close_timeout(self): connection = Connection(Protocol(self.LOCAL), close_timeout=42 * MS) self.assertEqual(connection.close_timeout, 42 * MS) + async def test_max_queue(self): + """max_queue parameter configures high-water mark of frames buffer.""" + connection = Connection(Protocol(self.LOCAL), max_queue=4) + transport = Mock() + connection.connection_made(transport) + self.assertEqual(connection.recv_messages.high, 4) + + async def test_max_queue_tuple(self): + """max_queue parameter configures high-water mark of frames buffer.""" + connection = Connection(Protocol(self.LOCAL), max_queue=(4, 2)) + transport = Mock() + connection.connection_made(transport) + self.assertEqual(connection.recv_messages.high, 4) + self.assertEqual(connection.recv_messages.low, 2) + async def test_write_limit(self): """write_limit parameter configures high-water mark of write buffer.""" connection = Connection(Protocol(self.LOCAL), write_limit=4096) diff --git a/tests/asyncio/test_messages.py b/tests/asyncio/test_messages.py index c8a2d7cd..615b1f3a 100644 --- a/tests/asyncio/test_messages.py +++ b/tests/asyncio/test_messages.py @@ -70,8 +70,7 @@ class AssemblerTests(unittest.IsolatedAsyncioTestCase): async def asyncSetUp(self): self.pause = unittest.mock.Mock() self.resume = unittest.mock.Mock() - self.assembler = Assembler(pause=self.pause, resume=self.resume) - self.assembler.set_limits(low=1, high=2) + self.assembler = Assembler(high=2, low=1, pause=self.pause, resume=self.resume) # Test get @@ -455,17 +454,25 @@ async def test_get_iter_fails_when_get_iter_is_running(self): await alist(self.assembler.get_iter()) self.assembler.close() # let task terminate - # Test getting and setting limits + # Test setting limits - async def test_get_limits(self): - """get_limits returns low and high water marks.""" - low, high = self.assembler.get_limits() - self.assertEqual(low, 1) - self.assertEqual(high, 2) + async def test_set_high_water_mark(self): + """high sets the high-water mark.""" + assembler = Assembler(high=10) + self.assertEqual(assembler.high, 10) - async def test_set_limits(self): - """set_limits changes low and high water marks.""" - self.assembler.set_limits(low=2, high=4) - low, high = self.assembler.get_limits() - self.assertEqual(low, 2) - self.assertEqual(high, 4) + async def test_set_high_and_low_water_mark(self): + """high sets the high-water mark.""" + assembler = Assembler(high=10, low=5) + self.assertEqual(assembler.high, 10) + self.assertEqual(assembler.low, 5) + + async def test_set_invalid_high_water_mark(self): + """high must be a non-negative integer.""" + with self.assertRaises(ValueError): + Assembler(high=-1) + + async def test_set_invalid_low_water_mark(self): + """low must be higher than high.""" + with self.assertRaises(ValueError): + Assembler(low=10, high=5)