Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the subscribe mechanism to the capnp session #9

Merged
merged 1 commit into from
Sep 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 52 additions & 4 deletions src/labone/core/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
create_session_client_stream,
)
from labone.core.resources import session_protocol_capnp # type: ignore[attr-defined]
from labone.core.result import unwrap
from labone.core.subscription import DataQueue, StreamingHandle
from labone.core.value import AnnotatedValue

NodeType: TypeAlias = Literal[
Expand Down Expand Up @@ -72,7 +74,6 @@ class ListNodesInfoFlags(IntFlag):
ALL: Return all matching nodes.
SETTINGS_ONLY: Return only setting nodes.
STREAMING_ONLY: Return only streaming nodes.
SUBSCRIBED_ONLY: Return only subscribed nodes.
BASE_CHANNEL_ONLY: Return only one instance of a channel
in case of multiple channels.
GET_ONLY: Return only nodes which can be used with the get command.
Expand All @@ -83,7 +84,6 @@ class ListNodesInfoFlags(IntFlag):
ALL = 0
SETTINGS_ONLY = 1 << 3
STREAMING_ONLY = 1 << 4
SUBSCRIBED_ONLY = 1 << 5
BASE_CHANNEL_ONLY = 1 << 6
GET_ONLY = 1 << 7
EXCLUDE_STREAMING = 1 << 20
Expand All @@ -105,7 +105,6 @@ class ListNodesFlags(IntFlag):
are at the outermost level of the three.
SETTINGS_ONLY: Return only setting nodes.
STREAMING_ONLY: Return only streaming nodes.
SUBSCRIBED_ONLY: Return only subscribed nodes.
BASE_CHANNEL_ONLY: Return only one instance of a channel
in case of multiple channels.
GET_ONLY: Return only nodes which can be used with the get command.
Expand All @@ -119,7 +118,6 @@ class ListNodesFlags(IntFlag):
LEAVES_ONLY = 1 << 2
SETTINGS_ONLY = 1 << 3
STREAMING_ONLY = 1 << 4
SUBSCRIBED_ONLY = 1 << 5
BASE_CHANNEL_ONLY = 1 << 6
GET_ONLY = 1 << 7
EXCLUDE_STREAMING = 1 << 20
Expand Down Expand Up @@ -428,3 +426,53 @@ async def set_value(self, value: AnnotatedValue) -> AnnotatedValue:
request.value = capnp_value.value
response = await _send_and_wait_request(request)
return AnnotatedValue.from_capnp(result.unwrap(response.result))

async def subscribe(self, path: str) -> DataQueue:
"""Register a new subscription to a node.

Registers a new subscription to a node on the kernel/server. All
updates to the node will be pushed to the returned data queue.

Note:
An update is triggered by the device itself and does not
exclusively mean a change in the value of the node. For example
a set request from any client will also trigger an update event.

It is safe to have multiple subscriptions to the same path. However
in most cases it is more efficient to fork (DataQueue.fork) an
existing DataQueue rather then registering a new subscription at the
server. This is because the kernel/server will send the update events
to every registered subscription independently, causing additional
network overhead.

Args:
path: String representing the path of the node to be streamed.
Currently does not support wildcards in the path.

Returns:
An instance of the DataQueue class. This async queue will receive
all update events for the subscribed node.

Raises:
TypeError: If `path` is not a string
LabOneConnectionError: If there is a problem in the connection.
"""
streaming_handle = StreamingHandle()
subscription = session_protocol_capnp.Subscription(
streamingHandle=streaming_handle,
subscriberId=self._client_id.bytes,
)
try:
subscription.path = path
except (AttributeError, TypeError, capnp.KjException) as error:
field_type = _request_field_type_description(subscription, "path")
msg = f"`path` attribute must be of type {field_type}."
raise TypeError(msg) from error
request = self._session.subscribe_request()
request.subscription = subscription
response = await _send_and_wait_request(request)
unwrap(response.result) # Result(Void, Error)
return DataQueue(
path=path,
register_function=streaming_handle.register_data_queue,
)
Loading
Loading