diff --git a/CHANGELOG.md b/CHANGELOG.md index 00b8608..74fd43d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Labone Python API Changelog +## Version 3.2.1 +* Fix error message in data server log if a subscription is cancelled gracefully. +* Adapt mock data server to hand unsubscribe events correctly. + ## Version 3.2.0 * `subscribe` accepts keyword arguments, which are forwarded to the data-server. This allows to configure the subscription to the data-server. diff --git a/src/labone/core/subscription.py b/src/labone/core/subscription.py index 990a966..484d4f3 100644 --- a/src/labone/core/subscription.py +++ b/src/labone/core/subscription.py @@ -485,8 +485,7 @@ def _distribute_to_data_queues( value: The value to add to the data queue. Raises: - capnp.KjException: If no data queues are registered any more and - the subscription should be removed. + ValueError: If the value could not be parsed. """ try: parsed_value = self._parser_callback(AnnotatedValue.from_capnp(value)) @@ -510,10 +509,6 @@ def _distribute_to_data_queues( raise self.distribute_to_data_queues(parsed_value) - if not self._data_queues: - msg = "No data queues are registered anymore. Disconnecting subscription." - raise errors.StreamingError(msg) - async def capnp_callback( self, interface: int, # noqa: ARG002 @@ -531,13 +526,13 @@ async def capnp_callback( method_index: The method index of the capnp schema. call_input: The input data of the capnp schema. fulfiller: The fulfiller to fulfill the promise. - - Raises: - capnp.KjException: If no data queues are registered any more and - the subscription should be removed. """ try: list(map(self._distribute_to_data_queues, call_input.values)) + if len(self._data_queues) == 0: + msg = "No queues registered anymore" + fulfiller.reject(zhinst.comms.Fulfiller.DISCONNECTED, msg) + return fulfiller.fulfill() except Exception as err: # noqa: BLE001 fulfiller.reject(zhinst.comms.Fulfiller.FAILED, err.args[0]) diff --git a/src/labone/mock/automatic_server.py b/src/labone/mock/automatic_server.py index bfdf9e1..a69f89f 100644 --- a/src/labone/mock/automatic_server.py +++ b/src/labone/mock/automatic_server.py @@ -1,13 +1,13 @@ -"""Partially predifined behaviour for HPK mock. +"""Partially predefined behavior for HPK mock. This class provides basic Hpk mock functionality by taking over some usually desired tasks. With that in place, the user may inherit from this class in order to further specify behavior, without having to start from scratch. -Even if some of the predefined behaviour is not desired, the implementation +Even if some of the predefined behavior is not desired, the implementation can give some reference on how an individual mock server can be implemented. -Already predefined behaviour: +Already predefined behavior: * Simulating state for get/set: A dictionary is used to store the state of the mock server. @@ -22,7 +22,7 @@ The subscriptions are stored and on every change, the new value is passed into the queues. * Adding chronological timestamps to responses: - The server answers need timestamps to the responsis in any case. + The server answers need timestamps to the responses in any case. By using the monotonic clock, the timestamps are added automatically. """ @@ -95,7 +95,7 @@ def __init__( self._common_prefix = None def get_timestamp(self) -> int: - """Create a realisitc timestamp. + """Create a realistic timestamp. Call this function to obtain a timestamp for some response. As a internal clock is used, subsequent calls will return @@ -107,15 +107,15 @@ def get_timestamp(self) -> int: return time.monotonic_ns() def _sanitize_path(self, path: LabOneNodePath) -> LabOneNodePath: - """Sanatize the path. + """Sanitize the path. Removes trailing slashes and replaces empty path with root path. Args: - path: Path to sanatize. + path: Path to sanitize. Returns: - Sanatized path. + Sanitized path. """ if self._common_prefix and not path.startswith("/"): return f"{self._common_prefix}/{path}" @@ -127,19 +127,19 @@ async def list_nodes_info( *, flags: ListNodesInfoFlags | int = ListNodesInfoFlags.ALL, # noqa: ARG002 ) -> dict[LabOneNodePath, NodeInfoType]: - """Predefined behaviour for list_nodes_info. + """Predefined behavior for list_nodes_info. Uses knowledge of the tree structure to answer. Warning: Flags will be ignored in this implementation. (TODO) - For now, the behaviour is equivalent to + For now, the behavior is equivalent to ListNodesFlags.RECURSIVE | ListNodesFlags.ABSOLUTE Args: path: Path to narrow down which nodes should be listed. Omitting the path will list all nodes by default. - flags: Flags to control the behaviour of the list_nodes_info method. + flags: Flags to control the behavior of the list_nodes_info method. Returns: Dictionary of paths to node info. @@ -154,19 +154,19 @@ async def list_nodes( *, flags: ListNodesFlags | int = ListNodesFlags.ABSOLUTE, # noqa: ARG002 ) -> list[LabOneNodePath]: - """Predefined behaviour for list_nodes. + """Predefined behavior for list_nodes. Uses knowledge of the tree structure to answer. Warning: Flags will be ignored in this implementation. (TODO) - For now, the behaviour is equivalent to + For now, the behavior is equivalent to ListNodesFlags.RECURSIVE | ListNodesFlags.ABSOLUTE Args: path: Path to narrow down which nodes should be listed. Omitting the path will list all nodes by default. - flags: Flags to control the behaviour of the list_nodes method. + flags: Flags to control the behavior of the list_nodes method. Returns: List of paths. @@ -183,7 +183,7 @@ async def list_nodes( ] async def get(self, path: LabOneNodePath) -> AnnotatedValue: - """Predefined behaviour for get. + """Predefined behavior for get. Look up the path in the internal dictionary. @@ -212,20 +212,44 @@ async def get_with_expression( | ListNodesFlags.EXCLUDE_STREAMING | ListNodesFlags.GET_ONLY, ) -> list[AnnotatedValue]: - """Predefined behaviour for get_with_expression. + """Predefined behavior for get_with_expression. Find all nodes associated with the path expression and call get for each of them. Args: path_expression: Path expression to get. - flags: Flags to control the behaviour of the get_with_expression method. + flags: Flags to control the behavior of the get_with_expression method. Returns: List of values, corresponding to nodes of the path expression. """ return [await self.get(p) for p in await self.list_nodes(path=path_expression)] + async def _update_subscriptions(self, value: AnnotatedValue) -> None: + """Update all subscriptions with the new value. + + Args: + value: New value. + """ + if self.memory[value.path].streaming_handles: + # sending updated value to subscriptions + result = await asyncio.gather( + *[ + handle.send_value(value) + for handle in self.memory[value.path].streaming_handles + ], + ) + # Remove all disconnected subscriptions + self.memory[value.path].streaming_handles = [ + handle + for handle, success in zip( + self.memory[value.path].streaming_handles, + result, + ) + if success + ] + @t.overload async def set(self, value: AnnotatedValue) -> AnnotatedValue: ... @@ -241,7 +265,7 @@ async def set( value: AnnotatedValue | Value, path: str = "", ) -> AnnotatedValue: - """Predefined behaviour for set. + """Predefined behavior for set. Updates the internal dictionary. A set command is considered as an update and will be distributed to all registered subscription handlers. @@ -271,14 +295,7 @@ async def set( path=path, timestamp=self.get_timestamp(), ) - if self.memory[path].streaming_handles: - # sending updated value to subscriptions - await asyncio.gather( - *[ - handle.send_value(response) - for handle in self.memory[path].streaming_handles - ], - ) + await self._update_subscriptions(value=response) return response @t.overload @@ -299,7 +316,7 @@ async def set_with_expression( value: AnnotatedValue | Value, path: LabOneNodePath | None = None, ) -> list[AnnotatedValue]: - """Predefined behaviour for set_with_expression. + """Predefined behavior for set_with_expression. Finds all nodes associated with the path expression and call set for each of them. @@ -323,7 +340,7 @@ async def set_with_expression( return result async def subscribe(self, subscription: Subscription) -> None: - """Predefined behaviour for subscribe. + """Predefined behavior for subscribe. Stores the subscription. Whenever an update event happens they are distributed to all registered handles, diff --git a/src/labone/mock/session.py b/src/labone/mock/session.py index 8ee55d2..3f2b7e2 100644 --- a/src/labone/mock/session.py +++ b/src/labone/mock/session.py @@ -14,6 +14,7 @@ from abc import ABC, abstractmethod from typing import TYPE_CHECKING +import zhinst.comms from zhinst.comms.server import CapnpResult, CapnpServer, capnp_method from labone.core import ListNodesFlags, ListNodesInfoFlags, hpk_schema @@ -25,9 +26,6 @@ value_from_python_types, ) -if TYPE_CHECKING: - import zhinst.comms - HPK_SCHEMA_ID = 0xA621130A90860008 SESSION_SCHEMA_ID = 0xB9D445582DA4A55C SERVER_ERROR = "SERVER_ERROR" @@ -60,26 +58,33 @@ def __init__( self._streaming_handle = streaming_handle self.subscriber_id = subscriber_id - async def send_value(self, value: AnnotatedValue) -> None: + async def send_value(self, value: AnnotatedValue) -> bool: """Send value to the subscriber. Args: value: Value to send. + + Returns: + Flag indicating if the subscription is active """ - await self._streaming_handle.sendValues( - values=[ - { - "value": value_from_python_types( - value.value, - capability_version=Session.CAPABILITY_VERSION, - ), - "metadata": { - "path": value.path, - "timestamp": value.timestamp, + try: + await self._streaming_handle.sendValues( + values=[ + { + "value": value_from_python_types( + value.value, + capability_version=Session.CAPABILITY_VERSION, + ), + "metadata": { + "path": value.path, + "timestamp": value.timestamp, + }, }, - }, - ], - ) + ], + ) + except zhinst.comms.errors.DisconnectError: + return False + return True @property def path(self) -> LabOneNodePath: diff --git a/tests/mock/module_test.py b/tests/mock/module_test.py index 84be9e4..462dde5 100644 --- a/tests/mock/module_test.py +++ b/tests/mock/module_test.py @@ -41,6 +41,16 @@ async def test_subscription(): assert queue.empty() +@pytest.mark.asyncio +async def test_unsubscribe(): + session = await AutomaticLabOneServer({"/a/b": {}}).start_pipe() + + queue = await session.subscribe("/a/b") + queue.disconnect() + await session.set(path="/a/b", value=7) + assert queue.empty() + + @pytest.mark.asyncio async def test_subscription_multiple_changes(): session = await AutomaticLabOneServer({"/a/b": {}}).start_pipe()