Skip to content

Commit

Permalink
Allow garcefull subscripiton cancellation
Browse files Browse the repository at this point in the history
This commit prevents an error is propagated to the data server if
a subcription is disconnected intentionnally without an error.
  • Loading branch information
tobiasah committed Nov 19, 2024
1 parent d442b1a commit abde7fb
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 55 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* `subscribe` accepts keyword arguments, which are forwarded to the data-server.
This allows to configure the subscription to the data-server.
Note that as of LabOne 24.10, no node supports yet subscription configuration.
* Fix error message in data server log if a subscription is cancelled gracefully.
* Adapt mock data server to hand unsubscribe events correctly.

## Version 3.1.2
* Fix bug which caused streaming errors to cancel the subscriptions
Expand Down
15 changes: 5 additions & 10 deletions src/labone/core/subscription.py
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,7 @@ def _distribute_to_data_queues(
value: The value to add to the data queue.
Raises:
capnp.KjException: If no data queues are registered any more and
the subscription should be removed.
ValueError: If the value could not be parsed.
"""
try:
parsed_value = self._parser_callback(AnnotatedValue.from_capnp(value))
Expand All @@ -510,10 +509,6 @@ def _distribute_to_data_queues(
raise
self.distribute_to_data_queues(parsed_value)

if not self._data_queues:
msg = "No data queues are registered anymore. Disconnecting subscription."
raise errors.StreamingError(msg)

async def capnp_callback(
self,
interface: int, # noqa: ARG002
Expand All @@ -531,13 +526,13 @@ async def capnp_callback(
method_index: The method index of the capnp schema.
call_input: The input data of the capnp schema.
fulfiller: The fulfiller to fulfill the promise.
Raises:
capnp.KjException: If no data queues are registered any more and
the subscription should be removed.
"""
try:
list(map(self._distribute_to_data_queues, call_input.values))
if len(self._data_queues) == 0:
msg = "No queues registered anymore"
fulfiller.reject(zhinst.comms.Fulfiller.DISCONNECTED, msg)
return
fulfiller.fulfill()
except Exception as err: # noqa: BLE001
fulfiller.reject(zhinst.comms.Fulfiller.FAILED, err.args[0])
73 changes: 45 additions & 28 deletions src/labone/mock/automatic_server.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
"""Partially predifined behaviour for HPK mock.
"""Partially predefined behavior for HPK mock.
This class provides basic Hpk mock functionality by taking over some usually
desired tasks. With that in place, the user may inherit from this class
in order to further specify behavior, without having to start from scratch.
Even if some of the predefined behaviour is not desired, the implementation
Even if some of the predefined behavior is not desired, the implementation
can give some reference on how an individual mock server can be implemented.
Already predefined behaviour:
Already predefined behavior:
* Simulating state for get/set:
A dictionary is used to store the state of the mock server.
Expand All @@ -22,7 +22,7 @@
The subscriptions are stored and on every change, the new value is passed
into the queues.
* Adding chronological timestamps to responses:
The server answers need timestamps to the responsis in any case.
The server answers need timestamps to the responses in any case.
By using the monotonic clock, the timestamps are added automatically.
"""
Expand Down Expand Up @@ -95,7 +95,7 @@ def __init__(
self._common_prefix = None

def get_timestamp(self) -> int:
"""Create a realisitc timestamp.
"""Create a realistic timestamp.
Call this function to obtain a timestamp for some response.
As a internal clock is used, subsequent calls will return
Expand All @@ -107,15 +107,15 @@ def get_timestamp(self) -> int:
return time.monotonic_ns()

def _sanitize_path(self, path: LabOneNodePath) -> LabOneNodePath:
"""Sanatize the path.
"""Sanitize the path.
Removes trailing slashes and replaces empty path with root path.
Args:
path: Path to sanatize.
path: Path to sanitize.
Returns:
Sanatized path.
Sanitized path.
"""
if self._common_prefix and not path.startswith("/"):
return f"{self._common_prefix}/{path}"
Expand All @@ -127,19 +127,19 @@ async def list_nodes_info(
*,
flags: ListNodesInfoFlags | int = ListNodesInfoFlags.ALL, # noqa: ARG002
) -> dict[LabOneNodePath, NodeInfoType]:
"""Predefined behaviour for list_nodes_info.
"""Predefined behavior for list_nodes_info.
Uses knowledge of the tree structure to answer.
Warning:
Flags will be ignored in this implementation. (TODO)
For now, the behaviour is equivalent to
For now, the behavior is equivalent to
ListNodesFlags.RECURSIVE | ListNodesFlags.ABSOLUTE
Args:
path: Path to narrow down which nodes should be listed. Omitting
the path will list all nodes by default.
flags: Flags to control the behaviour of the list_nodes_info method.
flags: Flags to control the behavior of the list_nodes_info method.
Returns:
Dictionary of paths to node info.
Expand All @@ -154,19 +154,19 @@ async def list_nodes(
*,
flags: ListNodesFlags | int = ListNodesFlags.ABSOLUTE, # noqa: ARG002
) -> list[LabOneNodePath]:
"""Predefined behaviour for list_nodes.
"""Predefined behavior for list_nodes.
Uses knowledge of the tree structure to answer.
Warning:
Flags will be ignored in this implementation. (TODO)
For now, the behaviour is equivalent to
For now, the behavior is equivalent to
ListNodesFlags.RECURSIVE | ListNodesFlags.ABSOLUTE
Args:
path: Path to narrow down which nodes should be listed. Omitting
the path will list all nodes by default.
flags: Flags to control the behaviour of the list_nodes method.
flags: Flags to control the behavior of the list_nodes method.
Returns:
List of paths.
Expand All @@ -183,7 +183,7 @@ async def list_nodes(
]

async def get(self, path: LabOneNodePath) -> AnnotatedValue:
"""Predefined behaviour for get.
"""Predefined behavior for get.
Look up the path in the internal dictionary.
Expand Down Expand Up @@ -212,20 +212,44 @@ async def get_with_expression(
| ListNodesFlags.EXCLUDE_STREAMING
| ListNodesFlags.GET_ONLY,
) -> list[AnnotatedValue]:
"""Predefined behaviour for get_with_expression.
"""Predefined behavior for get_with_expression.
Find all nodes associated with the path expression
and call get for each of them.
Args:
path_expression: Path expression to get.
flags: Flags to control the behaviour of the get_with_expression method.
flags: Flags to control the behavior of the get_with_expression method.
Returns:
List of values, corresponding to nodes of the path expression.
"""
return [await self.get(p) for p in await self.list_nodes(path=path_expression)]

async def _update_subscriptions(self, value: AnnotatedValue) -> None:
"""Update all subscriptions with the new value.
Args:
value: New value.
"""
if self.memory[value.path].streaming_handles:
# sending updated value to subscriptions
result = await asyncio.gather(
*[
handle.send_value(value)
for handle in self.memory[value.path].streaming_handles
],
)
# Remove all disconnected subscriptions
self.memory[value.path].streaming_handles = [
handle
for handle, success in zip(
self.memory[value.path].streaming_handles,
result,
)
if success
]

@t.overload
async def set(self, value: AnnotatedValue) -> AnnotatedValue: ...

Expand All @@ -241,7 +265,7 @@ async def set(
value: AnnotatedValue | Value,
path: str = "",
) -> AnnotatedValue:
"""Predefined behaviour for set.
"""Predefined behavior for set.
Updates the internal dictionary. A set command is considered
as an update and will be distributed to all registered subscription handlers.
Expand Down Expand Up @@ -271,14 +295,7 @@ async def set(
path=path,
timestamp=self.get_timestamp(),
)
if self.memory[path].streaming_handles:
# sending updated value to subscriptions
await asyncio.gather(
*[
handle.send_value(response)
for handle in self.memory[path].streaming_handles
],
)
await self._update_subscriptions(value=response)
return response

@t.overload
Expand All @@ -299,7 +316,7 @@ async def set_with_expression(
value: AnnotatedValue | Value,
path: LabOneNodePath | None = None,
) -> list[AnnotatedValue]:
"""Predefined behaviour for set_with_expression.
"""Predefined behavior for set_with_expression.
Finds all nodes associated with the path expression
and call set for each of them.
Expand All @@ -323,7 +340,7 @@ async def set_with_expression(
return result

async def subscribe(self, subscription: Subscription) -> None:
"""Predefined behaviour for subscribe.
"""Predefined behavior for subscribe.
Stores the subscription. Whenever an update event happens
they are distributed to all registered handles,
Expand Down
39 changes: 22 additions & 17 deletions src/labone/mock/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING

import zhinst.comms
from zhinst.comms.server import CapnpResult, CapnpServer, capnp_method

from labone.core import ListNodesFlags, ListNodesInfoFlags, hpk_schema
Expand All @@ -25,9 +26,6 @@
value_from_python_types,
)

if TYPE_CHECKING:
import zhinst.comms

HPK_SCHEMA_ID = 0xA621130A90860008
SESSION_SCHEMA_ID = 0xB9D445582DA4A55C
SERVER_ERROR = "SERVER_ERROR"
Expand Down Expand Up @@ -60,26 +58,33 @@ def __init__(
self._streaming_handle = streaming_handle
self.subscriber_id = subscriber_id

async def send_value(self, value: AnnotatedValue) -> None:
async def send_value(self, value: AnnotatedValue) -> bool:
"""Send value to the subscriber.
Args:
value: Value to send.
Returns:
Flag indicating if the subscription is active
"""
await self._streaming_handle.sendValues(
values=[
{
"value": value_from_python_types(
value.value,
capability_version=Session.CAPABILITY_VERSION,
),
"metadata": {
"path": value.path,
"timestamp": value.timestamp,
try:
await self._streaming_handle.sendValues(
values=[
{
"value": value_from_python_types(
value.value,
capability_version=Session.CAPABILITY_VERSION,
),
"metadata": {
"path": value.path,
"timestamp": value.timestamp,
},
},
},
],
)
],
)
except zhinst.comms.errors.DisconnectError:
return False
return True

@property
def path(self) -> LabOneNodePath:
Expand Down
10 changes: 10 additions & 0 deletions tests/mock/module_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ async def test_subscription():
assert queue.empty()


@pytest.mark.asyncio
async def test_unsubscribe():
session = await AutomaticLabOneServer({"/a/b": {}}).start_pipe()

queue = await session.subscribe("/a/b")
queue.disconnect()
await session.set(path="/a/b", value=7)
assert queue.empty()


@pytest.mark.asyncio
async def test_subscription_multiple_changes():
session = await AutomaticLabOneServer({"/a/b": {}}).start_pipe()
Expand Down

0 comments on commit abde7fb

Please sign in to comment.