From 80543b27b176d5d2b3a37036fa10aeede950af40 Mon Sep 17 00:00:00 2001 From: Yuki Schlarb Date: Sat, 5 Nov 2022 01:01:07 +0100 Subject: [PATCH 1/6] =?UTF-8?q?Disable=20broken=20towncrier=20check=20(exp?= =?UTF-8?q?ects=20main=20branch=20to=20be=20called=20=E2=80=9Cmaster?= =?UTF-8?q?=E2=80=9D)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 836b3ae..46d1faa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,7 +14,6 @@ jobs: matrix: task: - 'flake8' - - 'towncrier-check' - 'package' steps: - uses: actions/checkout@v1 From 74a3ec471428ac9d162d23ca35413c119a59bff8 Mon Sep 17 00:00:00 2001 From: Yuki Schlarb Date: Thu, 13 Oct 2022 01:54:26 +0200 Subject: [PATCH 2/6] Add support for driving `BufferedProtocol` instances using `sock_recv_into` Support for this feature is emulated for the `Pipe{Read,Write}Transport` however, since `glib.IOChannel` does not appear to expose anything similar to the `recvinto` syscall in its APIs. The emulation at least makes it possible to use `BufferedProtocol` instances without causing crashes due to API differences versus the classic `Protocol` interface at least. --- setup.cfg | 2 +- src/gbulb/glib_events.py | 8 ++++ src/gbulb/transports.py | 94 +++++++++++++++++++++++++++++----------- 3 files changed, 77 insertions(+), 27 deletions(-) diff --git a/setup.cfg b/setup.cfg index 3420869..6fae2c0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -68,7 +68,7 @@ exclude=\ .tox/* max-complexity = 10 max-line-length = 119 -ignore = E121,E123,E126,E226,E24,E704,W503,W504,C901 +ignore = E121,E123,E126,E226,E24,E265,E704,W503,W504,C901 [isort] skip_glob = diff --git a/src/gbulb/glib_events.py b/src/gbulb/glib_events.py index 057eab5..400ae1e 100644 --- a/src/gbulb/glib_events.py +++ b/src/gbulb/glib_events.py @@ -647,6 +647,14 @@ def read_func(channel, nbytes): return self._channel_read(channel, nbytes, read_func) + def sock_recv_into(self, sock, buf, flags=0): + channel = self._channel_from_socket(sock) + + def read_func(channel, nbytes): + return sock.recv_into(buf, flags) + + return self._channel_read(channel, len(buf), read_func) + def sock_recvfrom(self, sock, nbytes, flags=0): channel = self._channel_from_socket(sock) diff --git a/src/gbulb/transports.py b/src/gbulb/transports.py index f2169e0..e0a7598 100644 --- a/src/gbulb/transports.py +++ b/src/gbulb/transports.py @@ -1,4 +1,6 @@ +import asyncio import collections +import io import socket import subprocess from asyncio import base_subprocess, transports, CancelledError, InvalidStateError @@ -13,12 +15,12 @@ def __init__(self, loop, sock, protocol, waiter=None, extra=None, server=None): self._loop = loop self._sock = sock - self._protocol = protocol self._server = server self._closing = False self._closing_delayed = False self._closed = False self._cancelable = set() + self.set_protocol(protocol) if sock is not None: self._loop._transports[sock.fileno()] = self @@ -83,15 +85,23 @@ def _force_close_async(self, exc): class ReadTransport(BaseTransport, transports.ReadTransport): - max_size = 256 * 1024 + max_size = io.DEFAULT_BUFFER_SIZE def __init__(self, *args, **kwargs): - BaseTransport.__init__(self, *args, **kwargs) - self._paused = False self._read_fut = None + self._read_buffer = None + self._alloc_read_buffers = False + + BaseTransport.__init__(self, *args, **kwargs) + self._loop.call_soon(self._loop_reading) + def set_protocol(self, protocol): + if hasattr(asyncio, "BufferedProtocol"): # Python 3.7+ + self._alloc_read_buffers = isinstance(protocol, asyncio.BufferedProtocol) + super().set_protocol(protocol) + def pause_reading(self): if self._closing: raise RuntimeError("Cannot pause_reading() when closing") @@ -120,12 +130,23 @@ def close(self): super().close() def _create_read_future(self, size): - return self._loop.sock_recv(self._sock, size) + if self._alloc_read_buffers: + self._read_buffer = self._protocol.get_buffer(size) + return self._loop.sock_recv_into(self._sock, self._read_buffer) + else: + return self._loop.sock_recv(self._sock, size) def _submit_read_data(self, data): - if data: - self._protocol.data_received(data) + if data != b"" and data != 0: + if self._alloc_read_buffers: + assert isinstance(data, int) # Actually `nbytes` + self._protocol.buffer_updated(data) + self._read_buffer = None + else: + assert isinstance(data, bytes) + self._protocol.data_received(data) else: + self._read_buffer = None keep_open = self._protocol.eof_received() if not keep_open: self.close() @@ -133,9 +154,9 @@ def _submit_read_data(self, data): def _loop_reading(self, fut=None): if self._paused: return - data = None try: + data = None if fut is not None: assert self._read_fut is fut or ( self._read_fut is None and self._closing @@ -150,7 +171,10 @@ def _loop_reading(self, fut=None): data = None return - if data == b"": + if data is not None: + self._submit_read_data(data) + + if data == b"" or data == 0: # No need to reschedule on end-of-file return @@ -172,9 +196,6 @@ def _loop_reading(self, fut=None): self._cancelable.add(self._read_fut) else: self._read_fut.add_done_callback(self._loop_reading) - finally: - if data is not None: - self._submit_read_data(data) class WriteTransport(BaseTransport, transports._FlowControlMixin): @@ -184,8 +205,8 @@ def __init__(self, loop, *args, **kwargs): transports._FlowControlMixin.__init__(self, None, loop) BaseTransport.__init__(self, loop, *args, **kwargs) - self._buffer = self._buffer_factory() - self._buffer_empty_callbacks = set() + self._write_buffer = self._buffer_factory() + self._drained_callbacks = set() self._write_fut = None self._eof_written = False @@ -196,7 +217,7 @@ def can_write_eof(self): return True def get_write_buffer_size(self): - return len(self._buffer) + return len(self._write_buffer) def _close_write(self): if self._write_fut is not None: @@ -206,7 +227,7 @@ def transport_write_done_callback(): self._closing_delayed = False self.close() - self._buffer_empty_callbacks.add(transport_write_done_callback) + self._drained_callbacks.add(transport_write_done_callback) def close(self): self._close_write() @@ -231,12 +252,12 @@ def _create_write_future(self, data): return self._loop.sock_sendall(self._sock, data) def _buffer_add_data(self, data): - self._buffer.extend(data) + self._write_buffer.extend(data) def _buffer_pop_data(self): - if len(self._buffer) > 0: - data = self._buffer - self._buffer = bytearray() + if len(self._write_buffer) > 0: + data = self._write_buffer + self._write_buffer = self._buffer_factory() return data else: return None @@ -257,10 +278,10 @@ def _loop_writing(self, fut=None, data=None): data = self._buffer_pop_data() if not data: - if len(self._buffer_empty_callbacks) > 0: - for callback in self._buffer_empty_callbacks: + if len(self._drained_callbacks) > 0: + for callback in self._drained_callbacks: callback() - self._buffer_empty_callbacks.clear() + self._drained_callbacks.clear() self._maybe_resume_protocol() else: @@ -351,11 +372,11 @@ def _create_write_future(self, args): def _buffer_add_data(self, args): (data, addr) = args - self._buffer.append((bytes(data), addr)) + self._write_buffer.append((bytes(data), addr)) def _buffer_pop_data(self): - if len(self._buffer) > 0: - return self._buffer.popleft() + if len(self._write_buffer) > 0: + return self._write_buffer.popleft() else: return None @@ -387,8 +408,29 @@ def __init__(self, loop, channel, protocol, waiter, extra): super().__init__(loop, None, protocol, waiter, extra) def _create_read_future(self, size): + if self._alloc_read_buffers: + self._read_buffer = self._protocol.get_buffer(size) + size = len(self._read_buffer) return self._loop._channel_read(self._channel, size) + def _submit_read_data(self, data): + assert isinstance(data, bytes) + if data != b"" and data != 0: + if self._alloc_read_buffers: + #FIXME: GLib does not actually expose the equivalent to + # `recv_into` in its channel interface, so we have to + # add an extra copy here rather than avoiding one + self._read_buffer[0:len(data)] = data + self._protocol.buffer_updated(len(data)) + self._read_buffer = None + else: + self._protocol.data_received(data) + else: + self._read_buffer = None + keep_open = self._protocol.eof_received() + if not keep_open: + self.close() + def _force_close_async(self, exc): try: super()._force_close_async(exc) From 140a81e7dfaa9143e6200eda3862eccbd08b15f5 Mon Sep 17 00:00:00 2001 From: Russell Keith-Magee Date: Fri, 2 Dec 2022 14:57:59 +0800 Subject: [PATCH 3/6] Added changenote. --- changes/60.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/60.feature.rst diff --git a/changes/60.feature.rst b/changes/60.feature.rst new file mode 100644 index 0000000..98bbf9c --- /dev/null +++ b/changes/60.feature.rst @@ -0,0 +1 @@ +Support for driving ``BufferedProtocol`` instances using ``sock_recv_into`` was added. From ce7837e83eb6f906a8eefd9c3578e4fec447bb9d Mon Sep 17 00:00:00 2001 From: Russell Keith-Magee Date: Sat, 4 May 2024 12:37:55 +0800 Subject: [PATCH 4/6] Modify the changenote to reference the underlying issue. --- changes/{60.feature.rst => 58.feature.rst} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename changes/{60.feature.rst => 58.feature.rst} (100%) diff --git a/changes/60.feature.rst b/changes/58.feature.rst similarity index 100% rename from changes/60.feature.rst rename to changes/58.feature.rst From 5ce27ae46510c86f3cbb07d168cf320586ee35d5 Mon Sep 17 00:00:00 2001 From: Russell Keith-Magee Date: Sat, 4 May 2024 12:38:42 +0800 Subject: [PATCH 5/6] Add protection against closed sockets. --- src/gbulb/glib_events.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/src/gbulb/glib_events.py b/src/gbulb/glib_events.py index b40386c..0215a37 100644 --- a/src/gbulb/glib_events.py +++ b/src/gbulb/glib_events.py @@ -659,7 +659,8 @@ def sock_recv(self, sock, nbytes, flags=0): channel = self._channel_from_socket(sock) def read_func(channel, nbytes): - return sock.recv(nbytes, flags) + if not sock._closed: + return sock.recv(nbytes, flags) return self._channel_read(channel, nbytes, read_func) @@ -667,7 +668,8 @@ def sock_recv_into(self, sock, buf, flags=0): channel = self._channel_from_socket(sock) def read_func(channel, nbytes): - return sock.recv_into(buf, flags) + if not sock._closed: + return sock.recv_into(buf, flags) return self._channel_read(channel, len(buf), read_func) @@ -675,7 +677,8 @@ def sock_recvfrom(self, sock, nbytes, flags=0): channel = self._channel_from_socket(sock) def read_func(channel, nbytes): - return sock.recvfrom(nbytes, flags) + if not sock._closed: + return sock.recvfrom(nbytes, flags) return self._channel_read(channel, nbytes, read_func) @@ -683,7 +686,8 @@ def sock_sendall(self, sock, buf, flags=0): channel = self._channel_from_socket(sock) def write_func(channel, buf): - return sock.send(buf, flags) + if not sock._closed: + return sock.send(buf, flags) return self._channel_write(channel, buf, write_func) @@ -691,7 +695,8 @@ def sock_sendallto(self, sock, buf, addr, flags=0): channel = self._channel_from_socket(sock) def write_func(channel, buf): - return sock.sendto(buf, flags, addr) + if not sock._closed: + return sock.sendto(buf, flags, addr) return self._channel_write(channel, buf, write_func) From 5f81bc2cc9895b5db37225e94d23317903a91878 Mon Sep 17 00:00:00 2001 From: Russell Keith-Magee Date: Sat, 4 May 2024 12:40:15 +0800 Subject: [PATCH 6/6] Removed a Python 3.7 compatibility shim. --- src/gbulb/transports.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/gbulb/transports.py b/src/gbulb/transports.py index 1d31d78..ac5ee9b 100644 --- a/src/gbulb/transports.py +++ b/src/gbulb/transports.py @@ -105,8 +105,7 @@ def __init__(self, *args, **kwargs): self._loop.call_soon(self._loop_reading) def set_protocol(self, protocol): - if hasattr(asyncio, "BufferedProtocol"): # Python 3.7+ - self._alloc_read_buffers = isinstance(protocol, asyncio.BufferedProtocol) + self._alloc_read_buffers = isinstance(protocol, asyncio.BufferedProtocol) super().set_protocol(protocol) def pause_reading(self):