Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add the subscribe mechanism to the capnp session #9

Merged
merged 1 commit into from
Sep 1, 2023

Conversation

tobiasah
Copy link
Member

The subscribe mechanism is based on the capnp stream model. This means that the client creates for each subscription a server that is passed to the kernel. The kernel than calls a specific function on that value for each value change.

In this api the received value updates are stored in a async queue. To disconnect a connection the created server would need to be closed. In python this is not so easy due to the hidden reference counting. Therefore a subscription needs to be canceled manually through a disconnect call in the async queue.

@tobiasah tobiasah requested a review from markheik August 31, 2023 14:29
@tobiasah tobiasah self-assigned this Aug 31, 2023
@tobiasah tobiasah force-pushed the tobiasa/subscribe branch 5 times, most recently from cde0a4a to b3cd9f1 Compare August 31, 2023 15:13
Copy link
Collaborator

@markheik markheik left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code looking clean!

Added some comments on highlighting Python types with `.


Subscriptions are implemented through the capnp stream mechanism. This handles
all the communication stuff, e.g. back pressure and flow control. The only thing
the client needs to provide is a session_protocol_capnp.StreamingHandle.Server
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpickly: session_protocol_capnp.StreamingHandle.Server

src/labone/core/subscription.py Outdated Show resolved Hide resolved
src/labone/core/subscription.py Outdated Show resolved Hide resolved
src/labone/core/subscription.py Outdated Show resolved Hide resolved
src/labone/core/subscription.py Outdated Show resolved Hide resolved
src/labone/core/subscription.py Show resolved Hide resolved
src/labone/core/subscription.py Outdated Show resolved Hide resolved
)
try:
subscription.path = path
except (AttributeError, capnp.KjException) as error:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think capnp also raises TypeError, when type of list is given

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which would be fine to show to the user right? I just wanted to avoid KjExceptions being leaked. And the Attribiute Error a weird thing from capnp because it tires to convert the dict into a capnp object

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The error message is not really helpful from capnp:

r = ses._session.listNodes_request()
r.pathExpression = [123]  # Should be string
>>> TypeError: Cannot convert str to capnp.lib.capnp._DynamicListBuilder

This is probably super error rare edge case.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

*unless capnp struct assignment throws a different error.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right just tested it:

await ses.subscribe(["foobar"])
>>>

TypeError                                 Traceback (most recent call last)
Cell In[5], line 1
----> 1 await ses.subscribe(["foobar"])

File ~/Documents/zhinst/zpy/labone/src/labone/core/session.py:466, in Session.subscribe(self, path)
    461 subscription = session_protocol_capnp.Subscription(
    462     streamingHandle=streaming_handle,
    463     subscriberId=self._client_id.bytes,
    464 )
    465 try:
--> 466     subscription.path = path
    467 except (AttributeError, capnp.KjException) as error:
    468     field_type = _request_field_type_description(subscription, "path")

File ~/.pyenv/versions/3.9.13/envs/pylabone/lib/python3.9/site-packages/capnp/lib/capnp.pyx:1368, in capnp.lib.capnp._DynamicStructBuilder.__setattr__()

File ~/.pyenv/versions/3.9.13/envs/pylabone/lib/python3.9/site-packages/capnp/lib/capnp.pyx:1361, in capnp.lib.capnp._DynamicStructBuilder._set()

File ~/.pyenv/versions/3.9.13/envs/pylabone/lib/python3.9/site-packages/capnp/lib/capnp.pyx:773, in capnp.lib.capnp._setDynamicField()

TypeError: Cannot convert str to capnp.lib.capnp._DynamicListBuilder

@tobiasah tobiasah force-pushed the tobiasa/subscribe branch 2 times, most recently from a7cd733 to 829f0ed Compare September 1, 2023 09:28
The subscribe mechanism is based on the capnp stream model.
This means that the client creates for each subscription a
server that is passed to the kernel. The kernel than calls
a specific function on that value for each value change.

In this api the received value updates are stored in a async
queue. To disconnect a connection the created server would need
to be closed. In python this is not so easy due to the hidden
reference counting. Therefore a subscription needs to be canceled
manually through a disconnect call in the async queue.
@tobiasah tobiasah merged commit 2da3fb1 into main Sep 1, 2023
8 checks passed
@tobiasah tobiasah deleted the tobiasa/subscribe branch January 24, 2024 14:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants