From 49d3f639de150378708cd399b288ca8c1c320e74 Mon Sep 17 00:00:00 2001
From: Tobias Ahrens
Date: Wed, 30 Aug 2023 14:26:23 +0200
Subject: [PATCH] Add the subscribe mechanism to the capnp session

The subscribe mechanism is based on the capnp stream model. This means
that the client creates a server for each subscription, which is passed
to the kernel. The kernel then calls a specific function on that server
for each value change. In this API the received value updates are stored
in an async queue.

To disconnect a subscription the created server would need to be closed.
In Python this is not so easy due to the hidden reference counting.
Therefore a subscription needs to be canceled manually through a
disconnect call on the async queue.
---
 src/labone/core/session.py      |  56 +++++-
 src/labone/core/subscription.py | 321 ++++++++++++++++++++++++++++++++
 tests/core/test_session.py      |  83 +++++++++
 tests/core/test_subscription.py | 288 ++++++++++++++++++++++++++++
 4 files changed, 744 insertions(+), 4 deletions(-)
 create mode 100644 src/labone/core/subscription.py
 create mode 100644 tests/core/test_subscription.py

diff --git a/src/labone/core/session.py b/src/labone/core/session.py
index f2abb69..9eaaa18 100644
--- a/src/labone/core/session.py
+++ b/src/labone/core/session.py
@@ -15,6 +15,8 @@
     create_session_client_stream,
 )
 from labone.core.resources import session_protocol_capnp  # type: ignore[attr-defined]
+from labone.core.result import unwrap
+from labone.core.subscription import DataQueue, StreamingHandle
 from labone.core.value import AnnotatedValue
 
 NodeType: TypeAlias = Literal[
@@ -72,7 +74,6 @@ class ListNodesInfoFlags(IntFlag):
         ALL: Return all matching nodes.
         SETTINGS_ONLY: Return only setting nodes.
         STREAMING_ONLY: Return only streaming nodes.
-        SUBSCRIBED_ONLY: Return only subscribed nodes.
         BASE_CHANNEL_ONLY: Return only one instance of a channel
             in case of multiple channels.
         GET_ONLY: Return only nodes which can be used with the get command.
@@ -83,7 +84,6 @@ class ListNodesInfoFlags(IntFlag):
     ALL = 0
     SETTINGS_ONLY = 1 << 3
     STREAMING_ONLY = 1 << 4
-    SUBSCRIBED_ONLY = 1 << 5
     BASE_CHANNEL_ONLY = 1 << 6
     GET_ONLY = 1 << 7
     EXCLUDE_STREAMING = 1 << 20
@@ -105,7 +105,6 @@ class ListNodesFlags(IntFlag):
         are at the outermost level of the three.
         SETTINGS_ONLY: Return only setting nodes.
         STREAMING_ONLY: Return only streaming nodes.
-        SUBSCRIBED_ONLY: Return only subscribed nodes.
         BASE_CHANNEL_ONLY: Return only one instance of a channel
             in case of multiple channels.
         GET_ONLY: Return only nodes which can be used with the get command.
@@ -119,7 +118,6 @@ class ListNodesFlags(IntFlag):
     LEAVES_ONLY = 1 << 2
     SETTINGS_ONLY = 1 << 3
     STREAMING_ONLY = 1 << 4
-    SUBSCRIBED_ONLY = 1 << 5
     BASE_CHANNEL_ONLY = 1 << 6
     GET_ONLY = 1 << 7
     EXCLUDE_STREAMING = 1 << 20
@@ -428,3 +426,53 @@ async def set_value(self, value: AnnotatedValue) -> AnnotatedValue:
         request.value = capnp_value.value
         response = await _send_and_wait_request(request)
         return AnnotatedValue.from_capnp(result.unwrap(response.result))
+
+    async def subscribe(self, path: str) -> DataQueue:
+        """Register a new subscription to a node.
+
+        Registers a new subscription to a node on the kernel/server. All
+        updates to the node will be pushed to the returned data queue.
+
+        Note:
+            An update is triggered by the device itself and does not
+            exclusively mean a change in the value of the node. For example,
+            a set request from any client will also trigger an update event.
+
+        It is safe to have multiple subscriptions to the same path. However,
+        in most cases it is more efficient to fork (DataQueue.fork) an
+        existing DataQueue rather than registering a new subscription at the
+        server. This is because the kernel/server will send the update events
+        to every registered subscription independently, causing additional
+        network overhead.
+
+        Args:
+            path: String representing the path of the node to be streamed.
+                Currently does not support wildcards in the path.
+
+        Returns:
+            An instance of the DataQueue class. This async queue will receive
+            all update events for the subscribed node.
+
+        Raises:
+            TypeError: If `path` is not a string.
+            LabOneConnectionError: If there is a problem in the connection.
+        """
+        streaming_handle = StreamingHandle()
+        subscription = session_protocol_capnp.Subscription(
+            streamingHandle=streaming_handle,
+            subscriberId=self._client_id.bytes,
+        )
+        try:
+            subscription.path = path
+        except (AttributeError, TypeError, capnp.KjException) as error:
+            field_type = _request_field_type_description(subscription, "path")
+            msg = f"`path` attribute must be of type {field_type}."
+            raise TypeError(msg) from error
+        request = self._session.subscribe_request()
+        request.subscription = subscription
+        response = await _send_and_wait_request(request)
+        unwrap(response.result)  # Result(Void, Error)
+        return DataQueue(
+            path=path,
+            register_function=streaming_handle.register_data_queue,
+        )
diff --git a/src/labone/core/subscription.py b/src/labone/core/subscription.py
new file mode 100644
index 0000000..614a438
--- /dev/null
+++ b/src/labone/core/subscription.py
@@ -0,0 +1,321 @@
+"""This module contains the logic for the subscription mechanism.
+
+Subscriptions are implemented through the capnp stream mechanism. This handles
+all the communication details, e.g. back pressure and flow control. The only thing
+the client needs to provide is a `session_protocol_capnp.StreamingHandle.Server`
+implementation. The `sendValues()` method of this class will be called by the
+kernel through RPC whenever an update event for the subscribed node is
+received. To make this as simple as possible, the user only interacts with
+a DataQueue object: an async queue with the addition of a connection guard.
+
+It is possible to create a fork of a data queue. A fork will receive the
+values from the same underlying subscription as the original data queue.
+However, the connection state of the fork is independent of the original.
+
+It is always recommended to disconnect the data queue when it is not needed
+anymore. This will free up resources on the server side and prevent the server
+from sending unnecessary data.
+"""
+from __future__ import annotations
+
+import asyncio
+import logging
+import typing as t
+import weakref
+
+import capnp
+
+from labone.core import errors
+from labone.core.resources import session_protocol_capnp  # type: ignore[attr-defined]
+from labone.core.value import AnnotatedValue
+
+logger = logging.getLogger(__name__)
+
+
+class _ConnectionState:
+    """Connection state guard.
+
+    Helper class that represents the connection state. The sole purpose of this
+    class is to have an expressive way of showing that a disconnect on a data
+    queue is final and cannot be reverted.
+    """
+
+    def __init__(self) -> None:
+        self.__connected = True
+
+    @property
+    def connected(self) -> bool:
+        """Connection state."""
+        return self.__connected
+
+    def disconnect(self) -> None:
+        """Disconnect the data queue.
+
+        This operation is final and cannot be reverted.
+        """
+        self.__connected = False
+
+    def __bool__(self) -> bool:
+        """Connection state."""
+        return self.connected
+
+
+class DataQueue(asyncio.Queue):
+    """Queue for a single node subscription.
+
+    The queue holds all value updates received for the subscribed node. Its
+    interface is identical to the asyncio.Queue interface, with an additional
+    connection guard. If the data queue is disconnected, the subscription will
+    eventually be canceled on the kernel side. In any case, a disconnected data
+    queue will not receive any new values.
+
+    Warning:
+        The disconnect will only be recognized by the kernel/server during the
+        next update event. Until then, the server will not be aware of the
+        disconnect (e.g. asking the kernel which nodes are subscribed might not
+        reflect reality).
+
+    Args:
+        path: Path of the subscribed node.
+    """
+
+    def __init__(
+        self,
+        *,
+        path: str,
+        register_function: t.Callable[[weakref.ReferenceType[DataQueue]], None],
+    ) -> None:
+        super().__init__()
+        self._path = path
+        self._connection_state = _ConnectionState()
+        self._register_function = register_function
+        register_function(weakref.ref(self))
+
+    def __repr__(self) -> str:
+        return str(
+            f"{self.__class__.__name__}(path={self._path!r}, "
+            f"maxsize={self.maxsize}, qsize={self.qsize()}, "
+            f"connected={self.connected})",
+        )
+
+    def fork(self) -> DataQueue:
+        """Create a fork of the subscription.
+
+        The forked subscription will receive all updates that the original
+        subscription receives. Its connection state is independent of the original
+        subscription, meaning even if the original subscription is disconnected,
+        the forked subscription will still receive updates.
+
+        Warning:
+            The forked subscription will not contain any values before the fork.
+
+        Returns:
+            A new data queue to the same underlying subscription.
+        """
+        if not self._connection_state:
+            msg = str(
+                "The data queue has been disconnected. A fork does not make "
+                "sense as it would never receive data.",
+            )
+            raise errors.StreamingError(msg)
+        return DataQueue(path=self._path, register_function=self._register_function)
+
+    def disconnect(self) -> None:
+        """Disconnect the data queue.
+
+        This operation is final and cannot be reverted. A disconnected queue
+        will not receive any new values.
+
+        Important:
+            It is always recommended to disconnect the data queue when it is not
+            needed anymore. This will free up resources on the server side and
+            prevent the server from sending unnecessary data.
+        """
+        self._connection_state.disconnect()
+
+    def put_nowait(self, item: AnnotatedValue) -> None:
+        """Put an item into the queue without blocking.
+
+        Args:
+            item: The item to put in the queue.
+
+        Raises:
+            StreamingError: If the data queue has been disconnected.
+        """
+        if not self._connection_state:
+            msg = "The data queue has been disconnected."
+            raise errors.StreamingError(msg)
+        return super().put_nowait(item)
+
+    async def get(self) -> AnnotatedValue:
+        """Remove and return an item from the queue.
+
+        Returns:
+            The first item in the queue. If the queue is empty, wait until an
+            item is available.
+
+        Raises:
+            EmptyDisconnectedDataQueueError: If the data queue is empty AND
+                disconnected.
+        """
+        if self.empty() and not self._connection_state:
+            msg = str(
+                "The data queue is empty and it has been disconnected, "
+                "therefore it will not receive data anymore.",
+            )
+            raise errors.EmptyDisconnectedDataQueueError(
+                msg,
+            )
+        return await super().get()
+
+    @property
+    def connected(self) -> bool:
+        """Connection state."""
+        return bool(self._connection_state)
+
+    @property
+    def path(self) -> str:
+        """Path of the subscribed node."""
+        return self._path
+
+    @property
+    def maxsize(self) -> int:
+        """Number of items allowed in the queue."""
+        return self._maxsize
+
+    @maxsize.setter
+    def maxsize(self, maxsize: int) -> None:
+        """Number of items allowed in the queue."""
+        if not self._connection_state:
+            msg = str(
+                "The data queue has been disconnected, therefore it will not "
+                "receive data anymore. Changing the maxsize has no effect.",
+            )
+            raise errors.StreamingError(msg)
+        if self.qsize() >= maxsize:
+            msg = str(
+                "The new maxsize is smaller than or equal to the current qsize. "
+                "This can result in potential data loss and is forbidden.",
+            )
+            raise errors.StreamingError(msg)
+        self._maxsize = maxsize
+
+
+class StreamingHandle(session_protocol_capnp.StreamingHandle.Server):
+    """Streaming Handle server implementation.
+
+    This class is passed to the kernel when a subscription is requested.
+    Every update event to the subscribed node will result in the kernel
+    calling the sendValues method.
+
+    Warning:
+        This class is owned by capnp and should not be called or referenced
+        by the user.
+
+    The StreamingHandle holds only weak references to the data queues to which
+    the values will be added. This is done to avoid keeping the subscription
+    alive even if no one holds a reference to the data queue.
+
+    Args:
+        data_queue: Weak reference to the data queue to which the values
+            will be added.
+    """
+
+    def __init__(self) -> None:
+        self._data_queues: list[weakref.ReferenceType[DataQueue]] = []
+
+    def register_data_queue(self, data_queue: weakref.ReferenceType[DataQueue]) -> None:
+        """Register a new data queue.
+
+        Args:
+            data_queue: Weak reference to the data queue to which the values
+                will be added.
+        """
+        self._data_queues.append(data_queue)
+
+    def _add_to_data_queue(
+        self,
+        data_queue: DataQueue | None,
+        value: AnnotatedValue,
+    ) -> bool:
+        """Add a value to the data queue.
+
+        The value is added to the queue non-blocking, meaning that if the queue
+        is full, an error is raised.
+
+        Args:
+            data_queue: The data queue to which the value will be added.
+            value: The value to add to the data queue.
+
+        Raises:
+            StreamingError: If the data queue is full or disconnected.
+            AttributeError: If the data queue has been garbage collected.
+        """
+        if data_queue is None or data_queue.full():
+            logger.warning(
+                "Data queue %s is full. No more data will be pushed to the data queue.",
+                hex(id(data_queue)),
+            )
+            data_queue.disconnect()  # type: ignore[union-attr] # supposed to throw
+            return False
+        try:
+            data_queue.put_nowait(value)
+        except errors.StreamingError:
+            logger.debug(
+                "Data queue %s has disconnected. Removing from list of data queues.",
+                hex(id(data_queue)),
+            )
+            return False
+        return True
+
+    def _distribute_to_data_queues(
+        self,
+        value: session_protocol_capnp.AnnotatedValue,
+    ) -> None:
+        """Add a value to all data queues.
+
+        The value is added to the queue non-blocking, meaning that if the queue
+        is full, an error is raised.
+
+        Args:
+            value: The value to add to the data queue.
+ + Raises: + capnp.KjException: If no data queues are registered any more and + the subscription should be removed. + """ + parsed_value = AnnotatedValue.from_capnp(value) + self._data_queues = [ + data_queue + for data_queue in self._data_queues + if self._add_to_data_queue(data_queue(), parsed_value) + ] + if not self._data_queues: + # TODO(tobiasa): The kernel expects a KjException of type # noqa: FIX002 + # DISCONNECTED for a clean removal of the subscription. However, + # pycapnp does currently not support this. + # https://github.com/capnproto/pycapnp/issues/324 + msg = "DISCONNECTED" + raise capnp.KjException( + type=capnp.KjException.Type.DISCONNECTED, + message=msg, + ) + + async def sendValues( # noqa: N802 (function name is enforced through the schema) + self, + values: list[session_protocol_capnp.AnnotatedValue], + **_, + ) -> None: + """Capnp Interface callback. + + This function is called by the kernel (through RPC) when an update + event for the subscribed node is received. + + Args: + values: List of update events for the subscribed node. + + Raises: + capnp.KjException: If no data queues are registered any more and + the subscription should be removed. + """ + list(map(self._distribute_to_data_queues, values)) diff --git a/tests/core/test_session.py b/tests/core/test_session.py index 10eb6df..d2612cc 100644 --- a/tests/core/test_session.py +++ b/tests/core/test_session.py @@ -42,6 +42,9 @@ async def listNodesJson(self, _context, **_): # noqa: N802 async def setValue(self, _context, **_): # noqa: N802 return self._mock.setValue(_context.params, _context.results) + async def subscribe(self, _context, **_): + return self._mock.subscribe(_context.params, _context.results) + @pytest.fixture() async def session_server() -> tuple[Session, MagicMock]: @@ -334,3 +337,83 @@ def mock_method(_, results): server.setValue.side_effect = mock_method response = await session.set_value(value) assert response == value + + +class TestSessionSubscribe: + class SubscriptionServer: + def __init__(self, error=None): + self.server_handle = None + self.path = None + self.client_id = None + self.error = error + + def subscribe(self, params, results): + self.path = params.subscription.path + self.client_id = params.subscription.subscriberId + self.server_handle = params.subscription.streamingHandle + if self.error: + results.result.from_dict({"err": {"code": 1, "message": self.error}}) + else: + results.result.from_dict({"ok": {}}) + + @utils.ensure_event_loop + async def test_subscribe_meta_data(self, session_server): + session, server = await session_server + path = "/dev1234/demods/0/sample" + subscription_server = self.SubscriptionServer() + server.subscribe.side_effect = subscription_server.subscribe + queue = await session.subscribe(path) + assert subscription_server.path == path + assert subscription_server.client_id == session._client_id.bytes + assert queue.qsize() == 0 + assert queue.path == path + + @utils.ensure_event_loop + async def test_subscribe_error(self, session_server): + session, server = await session_server + path = "/dev1234/demods/0/sample" + subscription_server = self.SubscriptionServer(error="test error") + server.subscribe.side_effect = subscription_server.subscribe + + with pytest.raises(errors.LabOneCoreError, match="test error"): + await session.subscribe(path) + + @utils.ensure_event_loop + async def test_subscribe_invalid_argument_dict(self, session_server): + session, _ = await session_server + with pytest.raises(TypeError): + await session.subscribe({"I": "am", 
"not": "a", "path": 1}) + + @utils.ensure_event_loop + async def test_subscribe_invalid_argument_int(self, session_server): + session, _ = await session_server + with pytest.raises(TypeError): + await session.subscribe(2) + + @pytest.mark.parametrize("num_values", range(0, 20, 4)) + @utils.ensure_event_loop + async def test_subscribe_send_value_ok(self, session_server, num_values): + session, server = await session_server + path = "/dev1234/demods/0/sample" + subscription_server = self.SubscriptionServer() + server.subscribe.side_effect = subscription_server.subscribe + queue = await session.subscribe(path) + + values = [] + for i in range(num_values): + value = session_protocol_capnp.AnnotatedValue.new_message() + value.metadata.path = path + value.value.int64 = i + values.append(value) + value = session_protocol_capnp.AnnotatedValue.new_message() + value.metadata.path = "dummy" + value.value.int64 = 1 + await subscription_server.server_handle.sendValues(values) + assert queue.qsize() == num_values + for i in range(num_values): + assert queue.get_nowait() == AnnotatedValue( + value=i, + path=path, + timestamp=0, + extra_header=None, + ) diff --git a/tests/core/test_subscription.py b/tests/core/test_subscription.py new file mode 100644 index 0000000..8aa29bb --- /dev/null +++ b/tests/core/test_subscription.py @@ -0,0 +1,288 @@ +"""Tests for the `labone.core.subscription` module.""" +import asyncio + +import capnp +import pytest +from labone.core import errors +from labone.core.resources import session_protocol_capnp +from labone.core.subscription import DataQueue, StreamingHandle +from labone.core.value import AnnotatedValue + + +class FakeSubscription: + def __init__(self): + self.data_queues = [] + + def register_data_queue(self, data_queue) -> None: + self.data_queues.append(data_queue) + + +def test_data_queue_path(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + assert queue.path == "dummy" + + +def test_data_queue_maxsize(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + assert queue.maxsize == 0 + + +def test_data_queue_maxsize_to_low(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.put_nowait("test") + queue.maxsize = 2 + with pytest.raises(errors.StreamingError): + queue.maxsize = 1 + + +def test_data_queue_maxsize_disconnected(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.disconnect() + with pytest.raises(errors.StreamingError): + queue.maxsize = 42 + + +def test_data_queue_repr_idle(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + assert repr(queue) == "DataQueue(path='dummy', maxsize=0, qsize=0, connected=True)" + + +def test_data_queue_repr(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.maxsize = 42 + queue.put_nowait("test") + queue.put_nowait("test") + queue.disconnect() + assert ( + repr(queue) == "DataQueue(path='dummy', maxsize=42, qsize=2, connected=False)" + ) + + +def test_data_queue_disconnect(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + assert queue.connected + queue.disconnect() + assert not 
queue.connected + + +def test_data_queue_fork(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + assert len(subscription.data_queues) == 1 + forked_queue = queue.fork() + assert len(subscription.data_queues) == 2 + assert forked_queue.path == queue.path + assert forked_queue.connected + + +def test_data_queue_fork_disconnected(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.disconnect() + with pytest.raises(errors.StreamingError): + queue.fork() + + +def test_data_queue_put_nowait(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + assert queue.qsize() == 0 + queue.put_nowait("test") + assert queue.qsize() == 1 + assert queue.get_nowait() == "test" + assert queue.qsize() == 0 + + +def test_data_queue_put_nowait_disconnected(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.disconnect() + with pytest.raises(errors.StreamingError): + queue.put_nowait("test") + + +@pytest.mark.asyncio() +async def test_data_queue_get(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.put_nowait("test") + assert await queue.get() == "test" + + +@pytest.mark.asyncio() +async def test_data_queue_get_timeout(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(queue.get(), 0.01) + + +@pytest.mark.asyncio() +async def test_data_queue_get_disconnected_ok(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.put_nowait("test") + queue.disconnect() + assert await queue.get() == "test" + + +@pytest.mark.asyncio() +async def test_data_queue_get_disconnected_empty(): + subscription = FakeSubscription() + queue = DataQueue(path="dummy", register_function=subscription.register_data_queue) + queue.disconnect() + with pytest.raises(errors.EmptyDisconnectedDataQueueError): + await queue.get() + + +def test_streaming_handle_register(): + streaming_handle = StreamingHandle() + DataQueue(path="dummy", register_function=streaming_handle.register_data_queue) + assert len(streaming_handle._data_queues) == 1 + + +@pytest.mark.parametrize("num_values", range(0, 20, 4)) +@pytest.mark.parametrize("num_queues", [1, 2, 6]) +@pytest.mark.asyncio() +async def test_streaming_handle_update_event(num_values, num_queues): + streaming_handle = StreamingHandle() + queues = [] + for _ in range(num_queues): + queue = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queues.append(queue) + values = [] + for i in range(num_values): + value = session_protocol_capnp.AnnotatedValue.new_message() + value.metadata.path = "dummy" + value.value.int64 = i + values.append(value) + await streaming_handle.sendValues(values) + for queue in queues: + assert queue.qsize() == num_values + for i in range(num_values): + assert queue.get_nowait() == AnnotatedValue( + value=i, + path="dummy", + timestamp=0, + extra_header=None, + ) + + +@pytest.mark.asyncio() +async def test_streaming_handle_update_empty(): + streaming_handle = StreamingHandle() + values = [] + value = 
session_protocol_capnp.AnnotatedValue.new_message() + values.append(value) + with pytest.raises(capnp.KjException): + await streaming_handle.sendValues(values) + + +@pytest.mark.asyncio() +async def test_streaming_handle_update_disconnect(): + streaming_handle = StreamingHandle() + queue = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queue.disconnect() + values = [] + value = session_protocol_capnp.AnnotatedValue.new_message() + values.append(value) + with pytest.raises(capnp.KjException): + await streaming_handle.sendValues(values) + + +@pytest.mark.asyncio() +async def test_streaming_handle_update_partially_disconnected(): + streaming_handle = StreamingHandle() + queue_0 = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queue_1 = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queue_0.disconnect() + values = [] + value = session_protocol_capnp.AnnotatedValue.new_message() + value.metadata.path = "dummy" + value.value.int64 = 1 + values.append(value) + await streaming_handle.sendValues(values) + assert queue_0.qsize() == 0 + assert queue_1.qsize() == 1 + assert queue_1.get_nowait() == AnnotatedValue( + value=1, + path="dummy", + timestamp=0, + extra_header=None, + ) + queue_1.disconnect() + with pytest.raises(capnp.KjException): + await streaming_handle.sendValues(values) + + +@pytest.mark.asyncio() +async def test_streaming_handle_update_queue_full_single(): + streaming_handle = StreamingHandle() + queue_0 = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queue_1 = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queue_0.maxsize = 1 + queue_0.put_nowait("dummy") + assert queue_0.qsize() == 1 + values = [] + value = session_protocol_capnp.AnnotatedValue.new_message() + value.metadata.path = "dummy" + value.value.int64 = 1 + values.append(value) + await streaming_handle.sendValues(values) + assert queue_0.qsize() == 1 + assert queue_1.qsize() == 1 + assert queue_1.get_nowait() == AnnotatedValue( + value=1, + path="dummy", + timestamp=0, + extra_header=None, + ) + + +@pytest.mark.asyncio() +async def test_streaming_handle_update_queue_full_multiple(): + streaming_handle = StreamingHandle() + queue_0 = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queue_1 = DataQueue( + path="dummy", + register_function=streaming_handle.register_data_queue, + ) + queue_0.maxsize = 1 + queue_0.put_nowait("dummy") + queue_1.maxsize = 1 + queue_1.put_nowait("dummy") + values = [] + value = session_protocol_capnp.AnnotatedValue.new_message() + value.metadata.path = "dummy" + value.value.int64 = 1 + values.append(value) + with pytest.raises(capnp.KjException): + await streaming_handle.sendValues(values)
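
Usage note (not part of the patch, for review context only): the sketch below
shows how the new subscription API is intended to be used. It assumes an
already connected `session` object (`labone.core.session.Session`); the device
id, node path, and the `monitor_demod_sample` helper are placeholder names.

    async def monitor_demod_sample(session):
        # Register the subscription; every update event for the node is
        # pushed into the returned DataQueue.
        queue = await session.subscribe("/dev1234/demods/0/sample")

        # A fork shares the underlying subscription but has its own
        # connection state, avoiding a second subscription on the server.
        forked_queue = queue.fork()

        try:
            for _ in range(5):
                value = await queue.get()  # AnnotatedValue
                print(value.path, value.value)
        finally:
            # Disconnecting is final; the kernel drops the subscription on a
            # later update event once no connected queue remains.
            queue.disconnect()
            forked_queue.disconnect()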