Skip to content

Commit

Permalink
Add the subscribe mechanism to the capnp session
Browse files Browse the repository at this point in the history
The subscribe mechanism is based on the capnp stream model.
This means that the client creates for each subscription a
server that is passed to the kernel. The kernel than calls
a specific function on that value for each value change.

In this api the received value updates are stored in a async
queue. To disconnect a connection the created server would need
to be closed. In python this is not so easy due to the hidden
reference counting. Therefore a subscription needs to be canceled
manually through a disconnect call in the async queue.
  • Loading branch information
tobiasah committed Aug 31, 2023
1 parent 1db5082 commit b3cd9f1
Show file tree
Hide file tree
Showing 4 changed files with 743 additions and 4 deletions.
56 changes: 52 additions & 4 deletions src/labone/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
create_session_client_stream,
)
from labone.core.resources import session_protocol_capnp # type: ignore[attr-defined]
from labone.core.result import unwrap
from labone.core.subscription import DataQueue, StreamingHandle
from labone.core.value import AnnotatedValue

NodeType: TypeAlias = Literal[
Expand Down Expand Up @@ -72,7 +74,6 @@ class ListNodesInfoFlags(IntFlag):
ALL: Return all matching nodes.
SETTINGS_ONLY: Return only setting nodes.
STREAMING_ONLY: Return only streaming nodes.
SUBSCRIBED_ONLY: Return only subscribed nodes.
BASE_CHANNEL_ONLY: Return only one instance of a channel
in case of multiple channels.
GET_ONLY: Return only nodes which can be used with the get command.
Expand All @@ -83,7 +84,6 @@ class ListNodesInfoFlags(IntFlag):
ALL = 0
SETTINGS_ONLY = 1 << 3
STREAMING_ONLY = 1 << 4
SUBSCRIBED_ONLY = 1 << 5
BASE_CHANNEL_ONLY = 1 << 6
GET_ONLY = 1 << 7
EXCLUDE_STREAMING = 1 << 20
Expand All @@ -105,7 +105,6 @@ class ListNodesFlags(IntFlag):
are at the outermost level of the three.
SETTINGS_ONLY: Return only setting nodes.
STREAMING_ONLY: Return only streaming nodes.
SUBSCRIBED_ONLY: Return only subscribed nodes.
BASE_CHANNEL_ONLY: Return only one instance of a channel
in case of multiple channels.
GET_ONLY: Return only nodes which can be used with the get command.
Expand All @@ -119,7 +118,6 @@ class ListNodesFlags(IntFlag):
LEAVES_ONLY = 1 << 2
SETTINGS_ONLY = 1 << 3
STREAMING_ONLY = 1 << 4
SUBSCRIBED_ONLY = 1 << 5
BASE_CHANNEL_ONLY = 1 << 6
GET_ONLY = 1 << 7
EXCLUDE_STREAMING = 1 << 20
Expand Down Expand Up @@ -428,3 +426,53 @@ async def set_value(self, value: AnnotatedValue) -> AnnotatedValue:
request.value = capnp_value.value
response = await _send_and_wait_request(request)
return AnnotatedValue.from_capnp(result.unwrap(response.result))

async def subscribe(self, path: str) -> DataQueue:
"""Register a new subscription to a node.
Registers a new subscription to a node on the kernel/server. All
updates to the node will be pushed to the returned data queue.
Note:
An update is triggered by the device itself and does not
exclusively mean a change in the value of the node. For example
a set request from any client will also trigger an update event.
It is safe to have multiple subscriptions to the same path. However
in most cases it is more efficient to fork (DataQueue.fork) an
existing DataQueue rather then registering a new subscription at the
server. This is because the kernel/server will send the update events
to every registered subscription independently, causing additional
network overhead.
Args:
path: String representing the path of the node to be streamed.
Currently does not support wildcards in the path.
Returns:
An instance of the DataQueue class. This async queue will receive
all update events for the subscribed node.
Raises:
TypeError: If `path` is not a string
LabOneConnectionError: If there is a problem in the connection.
"""
streaming_handle = StreamingHandle()
subscription = session_protocol_capnp.Subscription(
streamingHandle=streaming_handle,
subscriberId=self._client_id.bytes,
)
try:
subscription.path = path
except (AttributeError, capnp.KjException) as error:
field_type = _request_field_type_description(subscription, "path")
msg = f"`path` attribute must be of type {field_type}."
raise TypeError(msg) from error
request = self._session.subscribe_request()
request.subscription = subscription
response = await _send_and_wait_request(request)
unwrap(response.result) # Result(Void, Error)
return DataQueue(
path=path,
register_function=streaming_handle.register_data_queue,
)
Loading

0 comments on commit b3cd9f1

Please sign in to comment.