Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experiment with using Gio.DBusProxy to replace python-dbus #240

Open
ukBaz opened this issue Jul 8, 2019 · 11 comments
Open

Experiment with using Gio.DBusProxy to replace python-dbus #240

ukBaz opened this issue Jul 8, 2019 · 11 comments

Comments

@ukBaz
Copy link
Owner

ukBaz commented Jul 8, 2019

Bluezero currently uses dbus-python which is a legacy API, built with a deprecated dbus-glib library so I am keen to move away from having it as a dependency.

There have been some experiments to move to pydbus (#170), and while pydbus is a great library, it does not work for a couple of the modes e.g. peripheral mode.

I have been look at the DBusProxy from gi.repository import Gio
I haven't been able to find any good documentation so looking at source:
https://gitlab.gnome.org/GNOME/pygobject/blob/master/gi/overrides/Gio.py#L373
and using the help on the module:

>>> from gi.repository import Gio
>>> help(Gio.DBusProxy)

Using Python inheritance, meta-classes, and DBus introspection it is possible to build something that works and is easier to maintain. Currently I have only done a central device. Next action is to look into doing a beacon or peripheral.
If that experiment is also successful I'll tidy up the code and post it somewhere before changing Bluezero.

The purpose of opening this issue is to allow people that follow this repository to comment.

This is what the API structure looks like today:

Level 1 Level 10 Level 100 shared
microbit.py broadcaster.py adapter.py tools.py
eddystone_beacon.py central.py advertisement.py constants.py
  observer.py device.py dbus_tools.py
  peripheral.py GATT.py async_tools.py
    localGATT.py  

The idea would be that level 1 and 10 API would not change. There is a high chance that level 100 would have to. Need to think about how disruptive that would be.
It is also highly likely that Python 2 support would be dropped.
Need to think about how to reflect that in the version numbering.

The API for accessing the level 100 API would look like this for a central device.

if __name__ == '__main__':
    om = Manager()
    dongle = om.get_adapter()
    print(dongle['Address'])
    # dongle['Powered'] = False
    # sleep(2)
    # dongle['Powered'] = True
    # sleep(1)
    # dongle.StartDiscovery()
    # sleep(5)
    # dongle.StopDiscovery()
    for dev in om.get_devices():
        print('Name' in dev)
        if 'Name' in dev:
            print('Name:', dev['Name'])
        else:
            print('Alias:', dev['Alias'])
    ubit = om.get_device('EB:F6:95:27:84:A0',
                          dongle.get_object_path())
    ubit.Connect()
    print('Connected')
    while ubit['ServicesResolved']:
        print('Waiting for Services to be resolved')
        sleep(1)
    print('Services Resolved')
    sleep(3)
    # How to get the object manager to re-read once services have been resolved?
    btn_a = om.get_gatt_characteristic('E95DDA90-251D-470A-A062-FA1922DFA9A8',
                                       ubit.get_object_path())
    print('Btn A', btn_a)
    while btn_a.ReadValue('(a{sv})', {})[0] < 1:
        print('Press btn_a')
        sleep(0.5)
    ubit.Disconnect()
@andysan
Copy link

andysan commented Jan 23, 2021

Have you considered any DBus interfaces that support Python's native asyncio functionality? I used dbus-next for a couple of Bluetooth Mesh experiments a while back and found it reliable easy to use.

@ukBaz
Copy link
Owner Author

ukBaz commented Jan 23, 2021

Thanks for the suggestion @andysan. I have just had a brief look at the docs and done the simple example below. I'll leave this here for future reference:

from dbus_next import Message, BusType, Variant
from dbus_next.glib import MessageBus

bus = MessageBus(bus_type=BusType.SYSTEM)

print(bus)
print(dir(bus))
bus.connect_sync()
print('connected')
adptr_xml = bus.introspect_sync(bus_name='org.bluez', path='/org/bluez/hci0')
print(dir(adptr_xml))
print(adptr_xml.tostring())
obj = bus.get_proxy_object(bus_name='org.bluez', path='/org/bluez/hci0',
                           introspection=adptr_xml.tostring())
print(dir(obj))

dongle = obj.get_interface('org.bluez.Adapter1')
print(dir(dongle))
print('This adapters address:', dongle.get_address_sync())

@andysan
Copy link

andysan commented Jan 23, 2021

Here is another library that I'm planning to evaluate for one of my own projects: https://github.com/ccxtechnologies/adbus

The only reason I'm not immediately going with dbus-next is that it (ab)uses type annotations to control dbus types in exported services. This is a bit annoying if you are planning to use a type checker like Mypy since the type annotations won't match Python's "canonical" form. I ended up sprinkling # type: ignore on all of the exported methods, which gets a bit annoying.

The adbus library is more traditional and resembles the old python-dbus pakages. Unfortunately, it seems to require a fairly recent Python 3 version (IIRC, Python 3.7).

@ukBaz
Copy link
Owner Author

ukBaz commented Jan 24, 2021

The Bluezero project grew out of helping school children do STEM projects. When looking at design choices I always have in my mind, how would I explain this to a classroom of students?

This typically means if it is provided as part of the Raspberry Pi OS then it scores highly. So for example Python 3.7 isn't an issue as that is the default in current releases of RPi OS. Having to build from C or install lots of dependencies will score lower.

The biggest challenge facing Bluezero is testing. This is unit, integration, and functional tests. Helping students to use/understand Bluetooth is a challenge without difficult to write or flaky code.

Event driven Python programming is generally something new to them. While Scratch and MakeCode on micro:bit are all using event driven, it is not what they have experience of in Python. The few experiments I've done, I've found ascyncio is something difficult for Bluezero to use in a way that allows the presentation of an API that is easy to explain.

Of the D-Bus libraries I've used, pydbus is the library I find easiest to use (for most things). It has a rubbish bus factor and publishing on to the D-Bus requires threads which is more involved than I would like Bluezero to be. This is why I haven't made the move.

This conversation has motivated me to capture a list of the D-Bus Python bindings which now exists at:
https://github.com/ukBaz/python-bluezero/wiki/D-Bus-Bindings-for-Python

If you have any others then please let me know. Also, if you have a strong favourite that would also be interesting to hear.

@ukBaz
Copy link
Owner Author

ukBaz commented Sep 16, 2021

Some progress has been made on using from gi.repository import Gio. For reference, the most helpful documentation was at:
https://lazka.github.io/pgi-docs/#Gio-2.0

A minimal example is below. This requires that property and a method proxy is separate for each D-Bus object path. i.e.

    # Methods
    dongle_methods = bluez.get_proxy(dongle_path, 'org.bluez.Adapter1')
    dongle_methods.StartDiscovery()
    # Properties
    dongle_props = bluez.get_proxy(dongle_path, 'org.freedesktop.DBus.Properties')
    dongle_address = dongle_props.Get('(ss)', 'org.bluez.Adapter1', 'Address')

The full working example is:

from gi.repository import Gio, GLib


class BluezDBusClient:

    def __init__(self):
        self.con = Gio.DBusObjectManagerClient.new_for_bus_sync(
            bus_type=Gio.BusType.SYSTEM,
            flags=Gio.DBusObjectManagerClientFlags.NONE,
            name='org.bluez',
            object_path='/',
            get_proxy_type_func=None,
            get_proxy_type_user_data=None,
            cancellable=None,
        )
        self.con.connect('object-added', self._on_object_added)
        self.con.connect('object-removed', self._on_object_removed)
        self.con.connect('interface-proxy-properties-changed',
                         self._on_properties_changed)
        self.on_device_added = None
        self.on_device_removed = None

    def get_proxy(self, dbus_object, iface):
        return self.con.get_interface(dbus_object, iface)

    def _on_object_added(self,
                         dbus_obj_mngr: Gio.DBusObjectManager,
                         dbus_object: Gio.DBusObject) -> None:

        device_proxy = dbus_object.get_interface('org.bluez.Device1')

        if device_proxy and self.on_device_added:
            object_path = device_proxy.get_object_path()
            self.on_device_added(object_path)

    def _on_object_removed(self,
                           dbus_obj_mngr: Gio.DBusObjectManager,
                           dbus_object: Gio.DBusObject) -> None:

        device_proxy = dbus_object.get_interface('org.bluez.Device1')

        if device_proxy and self.on_device_removed:
            object_path = device_proxy.get_object_path()
            self.on_device_removed(object_path)

    def _on_properties_changed(
            self,
            dbus_client: Gio.DBusObjectManagerClient,
            object_proxy: Gio.DBusObjectProxy,
            interface_proxy: Gio.DBusProxy,
            changed_properties: GLib.Variant,
            invalidated_properties: list):
        print('Properites changed')
        print('\t', dbus_client)
        print('\t', object_proxy.get_object_path())
        print('\t', interface_proxy.get_interface_name())
        print('\t', changed_properties.unpack())
        print('\t', invalidated_properties)


def device_added(dbus_object: Gio.DBusObject):
    print(f'Device added: {dbus_object}')


def device_removed(dbus_object: Gio.DBusObject):
    print(f'Device removed: {dbus_object}')


if __name__ == '__main__':
    mainloop = GLib.MainLoop()
    bluez = BluezDBusClient()
    dongle_path = '/org/bluez/hci0'

    print('Known Devices:')
    for obj_proxy in bluez.con.get_objects():
        proxy = obj_proxy.get_interface('org.bluez.Device1')
        if proxy:
            print(f'\t{proxy.get_object_path()}')

    dongle_methods = bluez.get_proxy(dongle_path, 'org.bluez.Adapter1')
    dongle_props = bluez.get_proxy(dongle_path, 'org.freedesktop.DBus.Properties')
    dongle_address = dongle_props.Get('(ss)', 'org.bluez.Adapter1', 'Address')
    print(f'Dongle address: {dongle_address}')
    bluez.on_device_added = device_added
    bluez.on_device_removed = device_removed
    dongle_methods.StartDiscovery()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        mainloop.quit()

@ukBaz
Copy link
Owner Author

ukBaz commented Sep 16, 2021

The following example is a client again but this tries to introspect the D-Bus object and combine interfaces so that they can all be access from the one python object e.g.

    dongle = bluez.get(dongle_path)
    print(dongle.Address)
    dongle.StartDiscovery()

The full code for this experiment was:

from pprint import pprint
from xml.etree import ElementTree as ET
from gi.repository import Gio, GLib


class ProxyComposite:
    def __init__(self, client, dbus_path):
        self._client = client
        self._dbus_path = dbus_path
        self._prop_proxy = self._client.get_proxy(
            dbus_path, 'org.freedesktop.DBus.Properties')


class ProxyProperty:
    def __init__(self, iface_name, prop_xml):
        self._iface_name = iface_name
        self.__name__ = prop_xml.attrib["name"]
        self._type = prop_xml.attrib["type"]
        access = prop_xml.attrib["access"]
        self._readable = access.startswith("read")
        self._writeable = access.endswith("write")
        self.__doc__ = "(" + self._type + ") " + access

    def __get__(self, instance, owner):
        return instance._prop_proxy.Get('(ss)',
                                        self._iface_name,
                                        self.__name__)

    def __set__(self, instance, value):
        return instance._prop_proxy.Set('(ssv)',
                                        self._iface_name,
                                        self.__name__,
                                        GLib.Variant(self._type, value))


class ProxyMethod:
    def __init__(self, iface_name, method):
        self._iface_name = iface_name
        self.__name__ = method.attrib["name"]
        self.__qualname__ = self._iface_name + "." + self.__name__

        self._inargs = [(arg.attrib.get("name", ""),
                         arg.attrib["type"]) for arg in method if
                        arg.tag == "arg" and arg.attrib.get("direction", "in") == "in"]
        self._outargs = [arg.attrib["type"] for arg in method if
                         arg.tag == "arg" and arg.attrib.get("direction", "in") == "out"]
        self._sinargs = "(" + "".join(x[1] for x in self._inargs) + ")"
        self._soutargs = "(" + "".join(self._outargs) + ")"


def get_method(client, iface, path, method):
    proxy = client.get_proxy(path, iface)
    return proxy.__getattr__(method)


def iface_factory(iface, client):
    class Interface(ProxyComposite):
        @staticmethod
        def _introspect():
            print(iface.attrib["name"] + ":")
            for member in iface:
                print("\t" + member.tag + " " + member.attrib["name"])
            print()
    Interface.__name__ = iface.attrib['name']
    for member in iface:
        member_name = member.attrib["name"]
        if member.tag == "property":
            setattr(Interface,
                    member_name,
                    ProxyProperty(Interface.__name__, member))
        elif member.tag == 'method':
            setattr(Interface, member_name, get_method(client,
                                                       Interface.__name__,
                                                       '/org/bluez/hci0',
                                                       member.attrib['name']))

    return Interface


def proxy_factory(object_data, client):
    class CompositeObject(ProxyComposite):
        def __getitem__(self, iface):
            matching_bases = [base for base in type(self).__bases__ if base.__name__ == iface]

            if len(matching_bases) == 0:
                raise KeyError(iface)
            assert (len(matching_bases) == 1)

            iface_class = matching_bases[0]
            return iface_class(self._client, self._dbus_path)

        # client, dbus_path
        @classmethod
        def _Introspect(cls):
            for iface in cls.__bases__:
                try:
                    iface._Introspect()
                except:
                    pass

    bluez_ifaces = [_ for _ in object_data if _.tag == "interface" and _.attrib['name'].startswith('org.bluez.')]

    print('bluez_ifaces', bluez_ifaces)
    CompositeObject.__bases__ = tuple(iface_factory(iface, client) for iface in bluez_ifaces)
    print(CompositeObject.__bases__)
    return CompositeObject


class BluezDBusClient:

    def __init__(self):
        self.con = Gio.DBusObjectManagerClient.new_for_bus_sync(
            bus_type=Gio.BusType.SYSTEM,
            flags=Gio.DBusObjectManagerClientFlags.NONE,
            name='org.bluez',
            object_path='/',
            get_proxy_type_func=None,
            get_proxy_type_user_data=None,
            cancellable=None,
        )
        self.con.connect('object-added', self._on_object_added)
        self.con.connect('object-removed', self._on_object_removed)
        self.con.connect('interface-proxy-properties-changed',
                         self._on_properties_changed)
        self.on_device_added = None
        self.on_device_removed = None

    def _introspect(self, object_path):
        introspectable = self.get_proxy(object_path, 'org.freedesktop.DBus.Introspectable')
        ret = introspectable.Introspect()
        print(ret)
        xml_object = ET.fromstring(ret)
        return xml_object

    def get(self, object_path):
        introspect_data = self._introspect(object_path)
        return proxy_factory(introspect_data, self)(self, object_path)

    def get_proxy(self, dbus_object, iface):
        return self.con.get_interface(dbus_object, iface)

    def _on_object_added(self,
                         dbus_obj_mngr: Gio.DBusObjectManager,
                         dbus_object: Gio.DBusObject) -> None:

        device_proxy = dbus_object.get_interface('org.bluez.Device1')

        if device_proxy and self.on_device_added:
            object_path = device_proxy.get_object_path()
            self.on_device_added(object_path)

    def _on_object_removed(self,
                           dbus_obj_mngr: Gio.DBusObjectManager,
                           dbus_object: Gio.DBusObject) -> None:

        device_proxy = dbus_object.get_interface('org.bluez.Device1')

        if device_proxy and self.on_device_removed:
            object_path = device_proxy.get_object_path()
            self.on_device_removed(object_path)

    def _on_properties_changed(
            self,
            dbus_client: Gio.DBusObjectManagerClient,
            object_proxy: Gio.DBusObjectProxy,
            interface_proxy: Gio.DBusProxy,
            changed_properties: GLib.Variant,
            invalidated_properties: list):
        print('Properites changed')
        print('\t', dbus_client)
        print('\t', object_proxy.get_object_path())
        print('\t', interface_proxy.get_interface_name())
        print('\t', changed_properties.unpack())
        print('\t', invalidated_properties)


def device_added(dbus_object: Gio.DBusObject):
    print(f'Device added: {dbus_object}')


def device_removed(dbus_object: Gio.DBusObject):
    print(f'Device removed: {dbus_object}')


if __name__ == '__main__':
    mainloop = GLib.MainLoop()
    bluez = BluezDBusClient()
    bluez.on_device_added = device_added
    bluez.on_device_removed = device_removed

    dongle_path = '/org/bluez/hci0'

    dongle = bluez.get(dongle_path)
    pprint(dir(dongle))
    print(dongle.Address)
    print(dongle.Name)
    dongle.StartDiscovery()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        mainloop.quit()

@ukBaz
Copy link
Owner Author

ukBaz commented Sep 16, 2021

Further investigation is still required before this is ready to be used on the bluezero library. However, it has given me encouragement that it is worth pursuing. It seems like there might be a case to use gi.repository (which is required anyway) directly. This is because the work required seems reasonable to remove a dependency.
The two items to investigate further are:

  • GetManagedObjects
  • Publish D-Bus service (e.g. BlueZ Profile, advertisement, and GATT server(

@ukBaz
Copy link
Owner Author

ukBaz commented Sep 16, 2021

Example of publishing methods to the System Bus

from gi.repository import Gio, GLib, GObject


class Server:

    def __init__(self, con, path):
        method_outargs = {}
        method_inargs = {}
        for interface in Gio.DBusNodeInfo.new_for_xml(self.__doc__).interfaces:

            for method in interface.methods:
                method_outargs[method.name] = "(" + "".join(
                              [arg.signature for arg in method.out_args]) + ")"
                method_inargs[method.name] = tuple(
                    arg.signature for arg in method.in_args)

            con.register_object(object_path=path,
                                interface_info=interface,
                                method_call_closure=self.on_method_call)

        self.method_inargs = method_inargs
        self.method_outargs = method_outargs

    def on_method_call(self,
                       connection,
                       sender,
                       object_path,
                       interface_name,
                       method_name,
                       parameters,
                       invocation):

        args = list(parameters.unpack())
        for i, sig in enumerate(self.method_inargs[method_name]):
            if sig == "h":
                msg = invocation.get_message()
                fd_list = msg.get_unix_fd_list()
                args[i] = fd_list.get(args[i])

        try:
            result = getattr(self, method_name)(*args)
            result = (result,)
            out_args = self.method_outargs[method_name]
            if out_args != "()":
                variant = GLib.Variant(out_args, result)
                invocation.return_value(variant)
            else:
                invocation.return_value(None)
        except:
            pass


class Greeter(Server):
    """
    <!DOCTYPE node PUBLIC
    "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
    "http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
    <node>
        <interface name="org.ukbaz.greeting">
            <method name="Hello">
                <arg direction="out" name="greeting" type="s"/>
            </method>
            <method name="Name">
                <arg direction="in" name="name" type="s"/>
                <arg direction="out" name="greeting" type="s"/>
            </method>
            <property name="Friendly" type="b" access="readwrite"/>
        </interface>
    </node>

    """
    def __init__(self, dbus_object_path):
        self.__bus = Gio.bus_get_sync(Gio.BusType.SYSTEM, None)
        Server.__init__(self,
                        self.__bus,
                        dbus_object_path)

    def Hello(self) -> str:
        return 'Hello'

    def Name(self, person: str) -> str:
        return f'Hello {person}'


if __name__ == '__main__':
    greeter = Greeter('/org/ukbaz/test')
    mainloop = GLib.MainLoop()
    try:
        mainloop.run()
    except KeyboardInterrupt:
        mainloop.quit()

To test this on the command line the following was done. (It is NOT published to a well-known name hence the use of grep to get the connection information of this instance).

$ busctl | grep python | grep -P "^:\d\.\d+"
:1.179                              12188 python          thinkabit1       :1.179        [email protected]           -       -          
$ busctl call :1.179 /org/ukbaz/test org.ukbaz.greeting Name s 'World!'
s "Hello World!"

@ukBaz
Copy link
Owner Author

ukBaz commented Sep 16, 2021

Some experiments to get the DBus object information managed by BlueZ:

from pprint import pprint
from gi.repository import Gio


class BluezObjectManager:
    def __init__(self):
        self.obj_mngr = Gio.DBusProxy.new_for_bus_sync(
            bus_type=Gio.BusType.SYSTEM,
            flags=Gio.DBusProxyFlags.NONE,
            info=None,
            name='org.bluez',
            object_path='/',
            interface_name='org.freedesktop.DBus.ObjectManager',
            cancellable=None)

    def get_managed_objects(self):
        return self.obj_mngr.GetManagedObjects()


def get_managed_objects():
    return Gio.DBusProxy.new_for_bus_sync(
            bus_type=Gio.BusType.SYSTEM,
            flags=Gio.DBusProxyFlags.NONE,
            info=None,
            name='org.bluez',
            object_path='/',
            interface_name='org.freedesktop.DBus.ObjectManager',
            cancellable=None).GetManagedObjects()


if __name__ == '__main__':
    bluez_om = BluezObjectManager()
    # pprint(bluez_om.obj_mngr.GetManagedObjects())
    # pprint(bluez_om.get_managed_objects())
    pprint(get_managed_objects())

@ukBaz
Copy link
Owner Author

ukBaz commented Jan 3, 2022

Have started to make changes to Bluezero on the following development branch:
https://github.com/ukBaz/python-bluezero/tree/gdbus

@ukBaz
Copy link
Owner Author

ukBaz commented Feb 7, 2022

I've written some notes about using PyGObject library for BlueZ D-Bus APIs:

https://ukbaz.github.io/howto/python_gio_1.html

https://ukbaz.github.io/howto/python_gio_2.html

https://ukbaz.github.io/howto/python_gio_3.html

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants