Wait notification functionality feature #407

Open
vovagorodok opened this issue Feb 4, 2024 · 2 comments

@vovagorodok

It would be nice to have functionality like tx_char.wait_for_notify(), which would make it possible to write simple serial-communication scripts that work like this:

tx_char.wait_for_notify()
value = tx_char.value

I ran into this in my ArduinoBleOTA library, which uploads firmware over BLE:
https://github.com/vovagorodok/ArduinoBleOTA/blob/7fb9e079cf54e02b0701d117754e14ebe5db34cf/tools/bluezero_uploader.py#L146

@ukBaz
Owner

ukBaz commented Feb 7, 2024

Apologies for the slow response on this.

I see you have been offered a way forward with the Bleak library hbldh/bleak#1501 (comment) so is this still of interest?

I think you are looking for a solution that doesn't use an event loop and blocks until the notification happens.

I've put the following quick example together:

from dataclasses import dataclass
from typing import Optional
from gi.repository import GLib
from bluezero.central import Central

adapter_addr = "xx:xx:xx:xx:xx:xx"
peripheral_addr = "xx:xx:xx:xx:xx:xx"
batt_srvc_uuid = "0000180f-0000-1000-8000-00805f9b34fb"
batt_char_uuid = "00002a19-0000-1000-8000-00805f9b34fb"


@dataclass
class Battery:
    battery_value: Optional[int] = None
    notification_happened: bool = False

    def on_battery_value_changed(self, iface, changed_props, invalidated_props):
        print(iface, changed_props, invalidated_props)
        if "Value" in changed_props:
            self.battery_value = int.from_bytes(changed_props["Value"], "little")
            self.notification_happened = True

    def wait_on_notification(self):
        main_context = GLib.MainContext.default()
        self.notification_happened = False
        # Process pending GLib events without blocking until the callback sets the flag.
        while not self.notification_happened:
            main_context.iteration(False)
        return self.battery_value


def main():
    batt_level = Battery()

    batt_device = Central(adapter_addr=adapter_addr, device_addr=peripheral_addr)

    batt_characteristic = batt_device.add_characteristic(batt_srvc_uuid, batt_char_uuid)

    batt_device.connect()
    batt_characteristic.add_characteristic_cb(batt_level.on_battery_value_changed)
    batt_characteristic.start_notify()

    returned_value = batt_level.wait_on_notification()
    batt_characteristic.stop_notify()
    batt_device.disconnect()
    print(f"Battery value is: {returned_value}")


if __name__ == "__main__":
    main()

I probably need to think more about this (especially if it was to be added to the library), but it might give you a way forward to experiment and report back if it is what you are looking for.

I did this experiment with the BLE Peripheral Simulator
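
As written, a wait loop like the one in `wait_on_notification` never returns if the notification does not arrive. A minimal sketch of a timeout-guarded variant follows. `wait_until`, `predicate`, `pump`, and `idle_sleep` are hypothetical names used for illustration and are not part of bluezero; the sketch assumes `pump()` handles at most one pending event without blocking and returns whether it did, which is how `GLib.MainContext.iteration(False)` behaves.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool],
               pump: Callable[[], bool],
               timeout: float = 5.0,
               idle_sleep: float = 0.01) -> bool:
    """Drive an event loop via pump() until predicate() is true or timeout expires.

    Returns True if the predicate became true, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() >= deadline:
            return False
        # pump() handles at most one pending event and reports whether it did.
        if not pump():
            time.sleep(idle_sleep)  # nothing pending: avoid a busy loop
    return True


# Possible use with the Battery example (assumption, not tested against bluezero):
#   ctx = GLib.MainContext.default()
#   ok = wait_until(lambda: batt_level.notification_happened,
#                   lambda: ctx.iteration(False), timeout=10.0)
```

Returning `False` on timeout lets the calling script decide whether to retry, disconnect, or report an error, instead of hanging.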

@vovagorodok
Author

I see you have been offered a way forward with the Bleak library hbldh/bleak#1501 (comment) so is this still of interest?

Yes, I provide two uploader scripts so that the user can choose one.

Bleak is the default because it supports platforms other than Linux.

I think you are looking for a solution that doesn't use an event loop and blocks until the notification happens.

Yes

I've put the following quick example together

Added:

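from gi.repository import GLib


class NotifiedReader:
    def __init__(self):
        # Most recent notified value; None until a notification arrives.
        self._value = None

    def callback(self, iface, changed_props, invalidated_props):
        print(changed_props)
        if "Value" in changed_props:
            self._value = changed_props["Value"]

    def read_value(self):
        main_context = GLib.MainContext.default()
        # Block in the GLib main context until the callback stores a value.
        while self._value is None:
            main_context.iteration(True)
        # Hand the value over and clear the slot for the next notification.
        value = self._value
        self._value = None
        return value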

...


    reader = NotifiedReader()
    tx_char.add_characteristic_cb(reader.callback)

    rx_char.value = int_to_u8_bytes(BEGIN) + int_to_u32_bytes(firmware_len)
    begin_resp = handleBeginResponse(reader.read_value())

But the callback was not called the first time, and the script got stuck in reader.read_value(). When I replaced reader.read_value() with tx_char.value, the callback was called, but the script then got stuck on the next reader.read_value().
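
When debugging a hang like this, it may help to use a reader that raises after a timeout instead of blocking forever, and that queues values so a notification arriving between two reads is not overwritten. The following is only a sketch under those assumptions and does not explain why the callback was not called. `QueuedNotifiedReader` and `pump` are hypothetical names, not part of bluezero; `pump()` is assumed to process at most one pending event without blocking and return whether it did, as `GLib.MainContext.default().iteration(False)` does.

```python
import time
from collections import deque
from typing import Callable, Deque


class QueuedNotifiedReader:
    """Collect notified values in FIFO order and read them with a timeout."""

    def __init__(self, pump: Callable[[], bool]) -> None:
        self._pump = pump                      # hypothetical event-loop step
        self._values: Deque[bytes] = deque()   # values not yet read

    def callback(self, iface, changed_props, invalidated_props) -> None:
        # Same signature as the callback above; only "Value" changes are kept.
        if "Value" in changed_props:
            self._values.append(bytes(changed_props["Value"]))

    def read_value(self, timeout: float = 5.0) -> bytes:
        deadline = time.monotonic() + timeout
        while not self._values:
            if time.monotonic() >= deadline:
                raise TimeoutError(f"no notification within {timeout} s")
            if not self._pump():
                time.sleep(0.01)  # nothing pending: avoid a busy loop
        return self._values.popleft()
```

It is also worth checking that notifications were enabled with start_notify() on tx_char before the first write, as in the battery example; whether that is the cause here is not known from the snippet.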

I probably need to think more about this (especially if it was to be added to the library), but it might give you a way forward to experiment and report back if it is what you are looking for.

It would be nice to have this functionality in the library in the future, if possible.
