Skip to content

Commit

Permalink
Add support for configurable subscriptions
Browse files Browse the repository at this point in the history
Add "kwargs" to the "subscribe" method to allow the forwarding of
configurations to the data-server. This feature is needed to allow the
configuration of the severity level when subscribing to the
"/zi/debug/log" node.
  • Loading branch information
Fabio Rossetto committed Nov 8, 2024
1 parent 49fe5fb commit 9fadee8
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
17 changes: 17 additions & 0 deletions src/labone/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ async def subscribe(
queue_type: None = None,
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
get_initial_value: bool = False,
**kwargs,
) -> DataQueue: ...

@t.overload
Expand All @@ -765,6 +766,7 @@ async def subscribe(
queue_type: type[QueueProtocol],
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
get_initial_value: bool = False,
**kwargs,
) -> QueueProtocol: ...

@async_translate_comms_error
Expand All @@ -775,6 +777,7 @@ async def subscribe(
queue_type: type[QueueProtocol] | None = None,
parser_callback: t.Callable[[AnnotatedValue], AnnotatedValue] | None = None,
get_initial_value: bool = False,
**kwargs,
) -> QueueProtocol | DataQueue:
"""Register a new subscription to a node.
Expand Down Expand Up @@ -810,6 +813,8 @@ async def subscribe(
the default DataQueue class is used. (default=None)
get_initial_value: If True, the initial value of the node is
is placed in the queue. (default=False)
kwargs: extra keyword arguments which are passed to the data-server
to further configure the subscription.
Returns:
An instance of the DataQueue class. This async queue will receive
Expand All @@ -834,6 +839,18 @@ async def subscribe(
streaming_handle.capnp_callback,
),
"subscriberId": self._client_id.bytes,
"kwargs": {
"entries": [
{
"key": k,
"value": value_from_python_types(
v,
capability_version=self._capability_version,
),
}
for k, v in kwargs.items()
],
},
}
if get_initial_value:
_, initial_value = await asyncio.gather(
Expand Down
27 changes: 25 additions & 2 deletions src/labone/nodetree/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -1013,14 +1013,20 @@ async def wait_for_state_change(
...

@t.overload
async def subscribe(self, *, get_initial_value: bool = False) -> DataQueue: ...
async def subscribe(
self,
*,
get_initial_value: bool = False,
**kwargs,
) -> DataQueue: ...

@t.overload
async def subscribe(
self,
*,
queue_type: type[QueueProtocol],
get_initial_value: bool = False,
**kwargs,
) -> QueueProtocol: ...

@abstractmethod
Expand All @@ -1029,6 +1035,7 @@ async def subscribe(
*,
queue_type: type[QueueProtocol] | None = None,
get_initial_value: bool = False,
**kwargs,
) -> QueueProtocol | DataQueue:
"""Subscribe to a node.
Expand Down Expand Up @@ -1060,6 +1067,8 @@ async def subscribe(
the default DataQueue class is used. (default=None)
get_initial_value: If True, the initial value of the node is
is placed in the queue. (default=False)
kwargs: extra keyword arguments which are passed to the data-server
to further configure the subscription.
Returns:
A DataQueue, which can be used to receive any changes to the node in a
Expand Down Expand Up @@ -1155,21 +1164,28 @@ def try_generate_subnode(
raise LabOneInvalidPathError(msg)

@t.overload
async def subscribe(self, *, get_initial_value: bool = False) -> DataQueue: ...
async def subscribe(
self,
*,
get_initial_value: bool = False,
**kwargs,
) -> DataQueue: ...

@t.overload
async def subscribe(
self,
*,
queue_type: type[QueueProtocol],
get_initial_value: bool = False,
**kwargs,
) -> QueueProtocol: ...

async def subscribe(
self,
*,
queue_type: type[QueueProtocol] | None = None,
get_initial_value: bool = False,
**kwargs,
) -> QueueProtocol | DataQueue:
"""Subscribe to a node.
Expand Down Expand Up @@ -1200,6 +1216,9 @@ async def subscribe(
get_initial_value: If True, the initial value of the node is
is placed in the queue. (default=False)
kwargs: extra keyword arguments which are passed to the data-server
to further configure the subscription.
Returns:
A DataQueue, which can be used to receive any changes to the node in a
flexible manner.
Expand All @@ -1209,6 +1228,7 @@ async def subscribe(
parser_callback=self._tree_manager.parser,
queue_type=queue_type or DataQueue,
get_initial_value=get_initial_value,
**kwargs,
)

async def wait_for_state_change(
Expand Down Expand Up @@ -1340,6 +1360,7 @@ async def subscribe(
self,
*,
get_initial_value: bool = False,
**kwargs,
) -> DataQueue: ...

@t.overload
Expand All @@ -1348,13 +1369,15 @@ async def subscribe(
*,
queue_type: type[QueueProtocol],
get_initial_value: bool = False,
**kwargs,
) -> QueueProtocol: ...

async def subscribe(
self,
*,
queue_type: type[QueueProtocol] | None = None, # noqa: ARG002
get_initial_value: bool = False, # noqa: ARG002
**kwargs, # noqa: ARG002
) -> QueueProtocol | DataQueue:
"""Subscribe to a node.
Expand Down

0 comments on commit 9fadee8

Please sign in to comment.