Skip to content

Commit

Permalink
Add max_queue parameter to the new asyncio API.
Browse files Browse the repository at this point in the history
Also remove set_limits and get_limits because they aren't used
internally and won't be exposed as public APIs.
  • Loading branch information
aaugustin committed Aug 16, 2024
1 parent bb78a16 commit 92df747
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 57 deletions.
78 changes: 50 additions & 28 deletions docs/howto/upgrade.rst
Original file line number Diff line number Diff line change
Expand Up @@ -115,23 +115,6 @@ In other words, the following pattern isn't supported::
async for websocket in connect(...): # this doesn't work yet
...

Configuring buffers
...................

The new implementation doesn't provide a way to configure read buffers yet.

In practice, :func:`~asyncio.client.connect` and :func:`~asyncio.server.serve`
don't accept the ``max_queue`` and ``read_limit`` arguments.

Here's the most likely outcome:

* ``max_queue`` will be implemented but its semantics will change from "maximum
number of messages" to "maximum number of frames", which makes a difference
when messages are fragmented.
* ``read_limit`` won't be implemented because the buffer that it configured was
removed from the new implementation. The queue that ``max_queue`` configures
is the only read buffer now.

.. _Update import paths:

Import paths
Expand Down Expand Up @@ -337,21 +320,60 @@ client. The list of subprotocols supported by the server was removed because
``select_subprotocols`` already knows which subprotocols it may select and under
which conditions.

Miscellaneous changes
.....................
Arguments of :func:`~asyncio.client.connect` and :func:`~asyncio.server.serve`
..............................................................................

``ws_handler`` → ``handler``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The first argument of :func:`~asyncio.server.serve` is called ``handler`` instead
of ``ws_handler``. It's usually passed as a positional argument, making this
change transparent. If you're passing it as a keyword argument, you must update
its name.
The first argument of :func:`~asyncio.server.serve` is now called ``handler``
instead of ``ws_handler``. It's usually passed as a positional argument, making
this change transparent. If you're passing it as a keyword argument, you must
update its name.

``create_protocol`` → ``create_connection``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The keyword argument of :func:`~asyncio.server.serve` for customizing the
creation of the connection object is called ``create_connection`` instead of
creation of the connection object is now called ``create_connection`` instead of
``create_protocol``. It must return a :class:`~asyncio.server.ServerConnection`
instead of a :class:`~server.WebSocketServerProtocol`. If you were customizing
connection objects, you should check the new implementation and possibly redo
your customization. Keep in mind that the changes to ``process_request`` and
``select_subprotocol`` remove most use cases for ``create_connection``.
instead of a :class:`~server.WebSocketServerProtocol`.

If you were customizing connection objects, you should check the new
implementation and possibly redo your customization. Keep in mind that the
changes to ``process_request`` and ``select_subprotocol`` remove most use cases
for ``create_connection``.

``max_queue``
~~~~~~~~~~~~~

The ``max_queue`` argument of :func:`~asyncio.client.connect` and
:func:`~asyncio.server.serve` has a new meaning but achieves a similar effect.

It is now the high-water mark of a buffer of incoming frames. It defaults to 16
frames. It used to be the size of a buffer of incoming messages that refilled as
soon as a message was read. It used to default to 32 messages.

This can make a difference when messages are fragmented in several frames. In
that case, you may want to increase ``max_queue``. If you're writing a high
performance server and you know that you're receiving fragmented messages,
probably you should adopt :meth:`~asyncio.connection.Connection.recv_streaming`
and optimize the performance of reads again. In all other cases, given how
uncommon fragmentation is, you shouldn't worry about this change.

``read_limit``
~~~~~~~~~~~~~~

The ``read_limit`` argument doesn't exist in the new implementation because it
doesn't buffer data received from the network in a
:class:`~asyncio.StreamReader`. With a better design, this buffer could be
removed.

The buffer of incoming frames configured by ``max_queue`` is the only read
buffer now.

``write_limit``
~~~~~~~~~~~~~~~

The ``write_limit`` argument of :func:`~asyncio.client.connect` and
:func:`~asyncio.server.serve` defaults to 32 KiB instead of 64 KiB.
8 changes: 8 additions & 0 deletions src/websockets/asyncio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ def __init__(
protocol: ClientProtocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int] = 16,
write_limit: int | tuple[int, int] = 2**15,
) -> None:
self.protocol: ClientProtocol
super().__init__(
protocol,
close_timeout=close_timeout,
max_queue=max_queue,
write_limit=write_limit,
)
self.response_rcvd: asyncio.Future[None] = self.loop.create_future()
Expand Down Expand Up @@ -148,6 +150,10 @@ class connect:
:obj:`None` disables the timeout.
max_size: Maximum size of incoming messages in bytes.
:obj:`None` disables the limit.
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
write_limit: High-water mark of write buffer in bytes. It is passed to
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
Expand Down Expand Up @@ -205,6 +211,7 @@ def __init__(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int] = 16,
write_limit: int | tuple[int, int] = 2**15,
# Logging
logger: LoggerLike | None = None,
Expand Down Expand Up @@ -250,6 +257,7 @@ def factory() -> ClientConnection:
connection = create_connection(
protocol,
close_timeout=close_timeout,
max_queue=max_queue,
write_limit=write_limit,
)
return connection
Expand Down
13 changes: 9 additions & 4 deletions src/websockets/asyncio/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,14 @@ def __init__(
protocol: Protocol,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int] = 16,
write_limit: int | tuple[int, int] = 2**15,
) -> None:
self.protocol = protocol
self.close_timeout = close_timeout
if isinstance(max_queue, int):
max_queue = (max_queue, None)
self.max_queue = max_queue
if isinstance(write_limit, int):
write_limit = (write_limit, None)
self.write_limit = write_limit
Expand Down Expand Up @@ -807,12 +811,13 @@ def close_transport(self) -> None:

def connection_made(self, transport: asyncio.BaseTransport) -> None:
transport = cast(asyncio.Transport, transport)
transport.set_write_buffer_limits(*self.write_limit)
self.transport = transport
self.recv_messages = Assembler(
pause=self.transport.pause_reading,
resume=self.transport.resume_reading,
*self.max_queue,
pause=transport.pause_reading,
resume=transport.resume_reading,
)
transport.set_write_buffer_limits(*self.write_limit)
self.transport = transport

def connection_lost(self, exc: Exception | None) -> None:
self.protocol.receive_eof() # receive_eof is idempotent
Expand Down
21 changes: 10 additions & 11 deletions src/websockets/asyncio/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class Assembler:
# coverage reports incorrectly: "line NN didn't jump to the function exit"
def __init__( # pragma: no cover
self,
high: int = 16,
low: int | None = None,
pause: Callable[[], Any] = lambda: None,
resume: Callable[[], Any] = lambda: None,
) -> None:
Expand All @@ -99,11 +101,16 @@ def __init__( # pragma: no cover
# call to Protocol.data_received() could produce thousands of frames,
# which must be buffered. Instead, we pause reading when the buffer goes
# above the high limit and we resume when it goes under the low limit.
self.high = 16
self.low = 4
self.paused = False
if low is None:
low = high // 4
if low < 0:
raise ValueError("low must be positive or equal to zero")
if high < low:
raise ValueError("high must be greater than or equal to low")
self.high, self.low = high, low
self.pause = pause
self.resume = resume
self.paused = False

# This flag prevents concurrent calls to get() by user code.
self.get_in_progress = False
Expand Down Expand Up @@ -254,14 +261,6 @@ def put(self, frame: Frame) -> None:
self.frames.put(frame)
self.maybe_pause()

def get_limits(self) -> tuple[int, int]:
"""Return low and high water marks for flow control."""
return self.low, self.high

def set_limits(self, low: int = 4, high: int = 16) -> None:
"""Configure low and high water marks for flow control."""
self.low, self.high = low, high

def maybe_pause(self) -> None:
"""Pause the writer if queue is above the high water mark."""
# Check for "> high" to support high = 0
Expand Down
8 changes: 8 additions & 0 deletions src/websockets/asyncio/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,14 @@ def __init__(
server: WebSocketServer,
*,
close_timeout: float | None = 10,
max_queue: int | tuple[int, int] = 16,
write_limit: int | tuple[int, int] = 2**15,
) -> None:
self.protocol: ServerProtocol
super().__init__(
protocol,
close_timeout=close_timeout,
max_queue=max_queue,
write_limit=write_limit,
)
self.server = server
Expand Down Expand Up @@ -576,6 +578,10 @@ def handler(websocket):
:obj:`None` disables the timeout.
max_size: Maximum size of incoming messages in bytes.
:obj:`None` disables the limit.
max_queue: High-water mark of the buffer where frames are received.
It defaults to 16 frames. The low-water mark defaults to ``max_queue
// 4``. You may pass a ``(high, low)`` tuple to set the high-water
and low-water marks.
write_limit: High-water mark of write buffer in bytes. It is passed to
:meth:`~asyncio.WriteTransport.set_write_buffer_limits`. It defaults
to 32 KiB. You may pass a ``(high, low)`` tuple to set the
Expand Down Expand Up @@ -643,6 +649,7 @@ def __init__(
close_timeout: float | None = 10,
# Limits
max_size: int | None = 2**20,
max_queue: int | tuple[int, int] = 16,
write_limit: int | tuple[int, int] = 2**15,
# Logging
logger: LoggerLike | None = None,
Expand Down Expand Up @@ -716,6 +723,7 @@ def protocol_select_subprotocol(
protocol,
self.server,
close_timeout=close_timeout,
max_queue=max_queue,
write_limit=write_limit,
)
return connection
Expand Down
15 changes: 15 additions & 0 deletions tests/asyncio/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,6 +874,21 @@ async def test_close_timeout(self):
connection = Connection(Protocol(self.LOCAL), close_timeout=42 * MS)
self.assertEqual(connection.close_timeout, 42 * MS)

async def test_max_queue(self):
"""max_queue parameter configures high-water mark of frames buffer."""
connection = Connection(Protocol(self.LOCAL), max_queue=4)
transport = Mock()
connection.connection_made(transport)
self.assertEqual(connection.recv_messages.high, 4)

async def test_max_queue_tuple(self):
"""max_queue parameter configures high-water mark of frames buffer."""
connection = Connection(Protocol(self.LOCAL), max_queue=(4, 2))
transport = Mock()
connection.connection_made(transport)
self.assertEqual(connection.recv_messages.high, 4)
self.assertEqual(connection.recv_messages.low, 2)

async def test_write_limit(self):
"""write_limit parameter configures high-water mark of write buffer."""
connection = Connection(Protocol(self.LOCAL), write_limit=4096)
Expand Down
35 changes: 21 additions & 14 deletions tests/asyncio/test_messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ class AssemblerTests(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.pause = unittest.mock.Mock()
self.resume = unittest.mock.Mock()
self.assembler = Assembler(pause=self.pause, resume=self.resume)
self.assembler.set_limits(low=1, high=2)
self.assembler = Assembler(high=2, low=1, pause=self.pause, resume=self.resume)

# Test get

Expand Down Expand Up @@ -455,17 +454,25 @@ async def test_get_iter_fails_when_get_iter_is_running(self):
await alist(self.assembler.get_iter())
self.assembler.close() # let task terminate

# Test getting and setting limits
# Test setting limits

async def test_get_limits(self):
"""get_limits returns low and high water marks."""
low, high = self.assembler.get_limits()
self.assertEqual(low, 1)
self.assertEqual(high, 2)
async def test_set_high_water_mark(self):
"""high sets the high-water mark."""
assembler = Assembler(high=10)
self.assertEqual(assembler.high, 10)

async def test_set_limits(self):
"""set_limits changes low and high water marks."""
self.assembler.set_limits(low=2, high=4)
low, high = self.assembler.get_limits()
self.assertEqual(low, 2)
self.assertEqual(high, 4)
async def test_set_high_and_low_water_mark(self):
"""high sets the high-water mark."""
assembler = Assembler(high=10, low=5)
self.assertEqual(assembler.high, 10)
self.assertEqual(assembler.low, 5)

async def test_set_invalid_high_water_mark(self):
"""high must be a non-negative integer."""
with self.assertRaises(ValueError):
Assembler(high=-1)

async def test_set_invalid_low_water_mark(self):
"""low must be higher than high."""
with self.assertRaises(ValueError):
Assembler(low=10, high=5)

0 comments on commit 92df747

Please sign in to comment.