-
Notifications
You must be signed in to change notification settings - Fork 51
/
bluetooth_devices.py
261 lines (221 loc) · 11.7 KB
/
bluetooth_devices.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
# Copyright (c) 2020 ruundii. All rights reserved.
import asyncio
import socket
import os
from subprocess import DEVNULL, PIPE
from typing import Awaitable, Callable, Optional, TYPE_CHECKING
from dasbus.connection import SystemMessageBus
if TYPE_CHECKING:
from hid_devices import HIDDeviceRegistry
# D-Bus interface names used when creating proxies on the "org.bluez" service.
# ObjectManager: queried on object path "/" to enumerate all managed objects.
OBJECT_MANAGER_INTERFACE = 'org.freedesktop.DBus.ObjectManager'
# Device1: per-device proxy that is read for Name, Alias and Connected.
DEVICE_INTERFACE = 'org.bluez.Device1'
# Properties: per-device proxy whose PropertiesChanged signal is subscribed to.
PROPERTIES_INTERFACE = 'org.freedesktop.DBus.Properties'
# Input1 / InputHost1: a managed object exposing one of these is registered as
# an input device / a host respectively; the proxy is read for SocketPathCtrl
# and SocketPathIntr.
INPUT_DEVICE_INTERFACE = 'org.bluez.Input1'
INPUT_HOST_INTERFACE = 'org.bluez.InputHost1'
# When True, non-host (input) devices are never registered, and non-control
# messages addressed to devices are handed to the HID device registry instead.
IGNORE_INPUT_DEVICES = True
class BluetoothDevice:
    """One BlueZ device object plus its control/interrupt socket pair.

    The instance watches the device's ``PropertiesChanged`` signal and keeps
    its two Unix-domain ``SOCK_SEQPACKET`` sockets in step with the device's
    ``Connected`` property. While the sockets are open, every message read
    from them is forwarded to the owning registry's ``send_message``.
    """

    def __init__(self, bus: "SystemMessageBus", loop: asyncio.AbstractEventLoop,
                 device_registry: "BluetoothDeviceRegistry", object_path: str,
                 is_host: bool, control_socket_path: str, interrupt_socket_path: str):
        """Create the D-Bus proxies and schedule a first state reconciliation.

        Args:
            bus: Message bus used to obtain the ``org.bluez`` proxies.
            loop: Event loop on which all coroutines of this device are run.
            device_registry: Registry that owns this device.
            object_path: D-Bus object path of the device.
            is_host: True when the device was registered as a host.
            control_socket_path: Path of the control-channel Unix socket.
            interrupt_socket_path: Path of the interrupt-channel Unix socket.
        """
        # Plain state first: the signal handler connected below reads
        # self.loop and self.device_registry, so they must already exist
        # when the first PropertiesChanged notification arrives.
        self.bus = bus
        self.loop = loop
        self.device_registry = device_registry
        self.object_path = object_path
        self.is_host = is_host
        self.control_socket_path: Optional[str] = control_socket_path
        self.control_socket: Optional[socket.socket] = None
        self.interrupt_socket_path: Optional[str] = interrupt_socket_path
        self.interrupt_socket: Optional[socket.socket] = None
        self.sockets_connected = False

        self.device = bus.get_proxy(service_name="org.bluez", object_path=object_path, interface_name=DEVICE_INTERFACE)
        self.props = bus.get_proxy(service_name="org.bluez", object_path=object_path, interface_name=PROPERTIES_INTERFACE)
        self.props.PropertiesChanged.connect(self.device_connected_state_changed)

        print("BT Device ",object_path," created")
        asyncio.run_coroutine_threadsafe(self.reconcile_connected_state(1), loop=self.loop)

    async def reconcile_connected_state(self, delay: int) -> None:
        """Wait ``delay`` seconds, then align the sockets with ``connected``.

        Opens the sockets when the device is connected but the sockets are
        not, and closes them in the opposite case. Any exception raised while
        doing so is printed and swallowed.
        """
        await asyncio.sleep(delay)
        try:
            if self.connected and not self.sockets_connected:
                await self.connect_sockets()
            elif not self.connected and self.sockets_connected:
                self.disconnect_sockets()
        except Exception as exc:
            print("Possibly dbus error during reconcile_connected_state ",exc)

    async def connect_sockets(self) -> None:
        """Open both sockets, register with the registry and start readers.

        Does nothing when the sockets are already connected, when either
        socket path has been cleared (see ``finalise``), or when the device
        is not connected. On failure the partially built state is discarded
        and a new attempt is scheduled one second later.
        """
        if self.sockets_connected or self.control_socket_path is None or self.interrupt_socket_path is None:
            return
        print("Connecting sockets for ",self.object_path)
        if not self.connected:
            print("BT Device is not connected. No point connecting sockets. Skipping.")
            # Actually skip, as the message says; a later PropertiesChanged
            # notification triggers reconcile_connected_state again.
            return
        try:
            self.control_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
            self.control_socket.connect(self.control_socket_path)
            self.control_socket.setblocking(False)
            self.interrupt_socket = socket.socket(socket.AF_UNIX, socket.SOCK_SEQPACKET)
            self.interrupt_socket.connect(self.interrupt_socket_path)
            self.interrupt_socket.setblocking(False)
            self.sockets_connected = True
            if self.is_host:
                self.device_registry.connected_hosts.append(self)
                # The last 17 characters of the object path, with "_" turned
                # into ":", are used as the device address.
                addr = self.object_path[-17:].replace("_",":")
                asyncio.create_task(self.device_registry.switch_to_master(addr))
            else:
                self.device_registry.connected_devices.append(self)
            print("Connected sockets for ",self.object_path)
            # One reader per channel: control (True) and interrupt (False).
            asyncio.run_coroutine_threadsafe(self.loop_of_fun(True), loop=self.loop)
            asyncio.run_coroutine_threadsafe(self.loop_of_fun(False), loop=self.loop)
        except Exception as err:
            print("Error while connecting sockets for ",self.object_path,". Will retry in a sec", err)
            # Drop everything that was set up so the retry starts from a
            # clean state and cannot register this device twice.
            self._close_sockets()
            self._unregister()
            self.sockets_connected = False
            await asyncio.sleep(1)
            asyncio.run_coroutine_threadsafe(self.connect_sockets(), loop=self.loop)

    def _close_sockets(self) -> None:
        """Close both sockets, ignoring close errors, and forget them."""
        for sock in (self.control_socket, self.interrupt_socket):
            if sock is None:
                continue
            try:
                sock.close()
            except OSError:
                # Nothing useful can be done about a failed close; the
                # reference is dropped below either way.
                pass
        self.control_socket = None
        self.interrupt_socket = None

    def _unregister(self) -> None:
        """Remove this device from the registry's connected lists."""
        if self.is_host and self in self.device_registry.connected_hosts:
            self.device_registry.connected_hosts.remove(self)
        elif self in self.device_registry.connected_devices:
            self.device_registry.connected_devices.remove(self)

    def disconnect_sockets(self) -> None:
        """Close both sockets and remove this device from the connected lists."""
        self._close_sockets()
        self._unregister()
        self.sockets_connected = False
        print("Disconnected sockets for ",self.object_path)

    async def loop_of_fun(self, is_ctrl: bool) -> None:
        """Read messages from one socket and forward them to the registry.

        Args:
            is_ctrl: True to read the control socket, False for the
                interrupt socket.

        The loop ends when the selected socket attribute becomes ``None`` or
        when reading fails; in the latter case the sockets are closed and a
        reconciliation is scheduled.
        """
        sock = self.control_socket if is_ctrl else self.interrupt_socket
        while sock is not None:
            try:
                msg = await self.loop.sock_recv(sock,255)
            except Exception:
                print("Cannot read data from socket. ", self.object_path ,"Closing sockets")
                try:
                    self.disconnect_sockets()
                except Exception:
                    print("Error while disconnecting sockets")
                print("Arranging reconnect")
                asyncio.run_coroutine_threadsafe(self.reconcile_connected_state(1), loop=self.loop)
                break
            if msg is None or len(msg)==0:
                # NOTE(review): an empty read is simply retried on the same
                # socket; confirm this cannot spin if the peer has closed.
                continue
            # Data read from a host goes to devices and vice versa.
            self.device_registry.send_message(msg, not self.is_host, is_ctrl)
            # Re-read the attribute so the loop stops after a disconnect.
            sock = self.control_socket if is_ctrl else self.interrupt_socket

    @property
    def name(self) -> str:
        """The ``Name`` property read from the device proxy."""
        return self.device.Name

    @property
    def alias(self) -> str:
        """The ``Alias`` property read from the device proxy."""
        return self.device.Alias

    @property
    def connected(self) -> bool:
        """The ``Connected`` property read from the device proxy."""
        return self.device.Connected

    def __eq__(self, other: object) -> bool:
        """Two devices are equal when they share the same object path."""
        if isinstance(other, BluetoothDevice):
            return self.object_path == other.object_path
        return False

    def __hash__(self) -> int:
        """Hash on the object path so hashing stays consistent with ``__eq__``."""
        return hash(self.object_path)

    def device_connected_state_changed(self, _arg1: object, _arg2: object, _arg3: object) -> None:
        """Handle ``PropertiesChanged``: reconcile and notify the registry handler."""
        print("device_connected_state_changed")
        asyncio.run_coroutine_threadsafe(self.reconcile_connected_state(1), loop=self.loop)
        if self.device_registry.on_devices_changed_handler is not None:
            asyncio.run_coroutine_threadsafe(self.device_registry.on_devices_changed_handler(), loop=self.loop)

    def finalise(self) -> None:
        """Stop listening for property changes and close the sockets for good."""
        self.props.PropertiesChanged.disconnect(self.device_connected_state_changed)
        # Clearing the paths makes any still-pending connect_sockets a no-op.
        self.control_socket_path = None
        self.interrupt_socket_path = None
        # Close sockets
        self.disconnect_sockets()
        print("BT Device ",self.object_path," finalised")

    def __del__(self) -> None:
        # object_path is missing if construction never reached __init__.
        print("BT Device ",getattr(self, "object_path", None)," removed")
class BluetoothDeviceRegistry:
    """Keeps track of BlueZ devices and routes messages between them.

    ``all`` maps object paths to ``BluetoothDevice`` instances.
    ``connected_hosts`` and ``connected_devices`` hold the devices whose
    sockets are currently open; the devices add and remove themselves.
    """

    def __init__(self, bus: "SystemMessageBus", loop: asyncio.AbstractEventLoop):
        """Store the bus and event loop and start with an empty registry."""
        self.bus = bus
        self.loop = loop
        self.all: dict[str, BluetoothDevice] = {}
        self.connected_hosts: list[BluetoothDevice] = []
        self.connected_devices: list[BluetoothDevice] = []
        self.on_devices_changed_handler: Optional[Callable[[], Awaitable[None]]] = None
        self.hid_devices: Optional["HIDDeviceRegistry"] = None
        # Index into connected_hosts of the host that receives messages.
        self.current_host_index = 0

    def set_hid_devices(self, hid_devices: "HIDDeviceRegistry") -> None:
        """Set the HID device registry that ``send_message`` may forward to."""
        self.hid_devices = hid_devices

    def set_on_devices_changed_handler(self, handler: Callable[[], Awaitable[None]]) -> None:
        """Set the coroutine function scheduled when a device's properties change."""
        self.on_devices_changed_handler = handler

    def add_devices(self) -> None:
        """Register every managed BlueZ object exposing an input interface."""
        print("Adding all BT devices")
        om = self.bus.get_proxy(service_name= "org.bluez", object_path="/", interface_name=OBJECT_MANAGER_INTERFACE)
        objs = om.GetManagedObjects()
        for object_path, interfaces in objs.items():
            # The host interface takes precedence when both are present.
            if INPUT_HOST_INTERFACE in interfaces:
                self.add_device(object_path, True)
            elif INPUT_DEVICE_INTERFACE in interfaces:
                self.add_device(object_path, False)

    def add_device(self, device_object_path: str, is_host: bool) -> None:
        """Create and register a ``BluetoothDevice`` for ``device_object_path``.

        Non-host devices are skipped while ``IGNORE_INPUT_DEVICES`` is set,
        and so are paths that are already registered.
        """
        if IGNORE_INPUT_DEVICES and not is_host:
            return
        if device_object_path in self.all:
            print("Device ", device_object_path, " already exist. Cannot add. Skipping.")
            return
        #ensure master role for this connection, otherwise latency of sending packets to hosts may get pretty bad
        asyncio.ensure_future(self.switch_to_master(device_object_path[-17:].replace("_",":")))
        p = self.bus.get_proxy(service_name="org.bluez", object_path=device_object_path, interface_name=INPUT_HOST_INTERFACE if is_host else INPUT_DEVICE_INTERFACE)
        device = BluetoothDevice(self.bus, self.loop, self, device_object_path, is_host, p.SocketPathCtrl, p.SocketPathIntr)
        self.all[device_object_path] = device

    async def switch_to_master(self, device_address: str) -> None:
        """Run ``hcitool sr <addr> MASTER`` every 5 s while ``is_slave`` is true."""
        print("switch to master called for ", device_address)
        while await self.is_slave(device_address):
            try:
                proc = await asyncio.create_subprocess_exec("sudo", "hcitool", "sr", device_address, "MASTER", stdout=DEVNULL)
                await proc.wait()
                print("hcitool ", device_address, " success:", proc.returncode == 0)
            except Exception as exc:
                print("hcitool ",device_address," exception:",exc)
            await asyncio.sleep(5)

    async def is_slave(self, device_address: str) -> bool:
        """Return True if ``hcitool con`` prints a line with both the address and "SLAVE"."""
        proc = await asyncio.create_subprocess_exec("sudo", "hcitool", "con", stdout=PIPE, stderr=DEVNULL)
        stdout, _ = await proc.communicate()
        # errors="replace" keeps undecodable output from raising here.
        lines = stdout.decode(errors="replace").splitlines()
        return any("SLAVE" in line and device_address in line for line in lines)

    def remove_devices(self) -> None:
        """Remove and finalise every registered device."""
        print("Removing all BT devices")
        # Iterate over a snapshot: remove_device mutates self.all.
        for object_path in list(self.all):
            self.remove_device(object_path)

    def remove_device(self, device_object_path: str) -> None:
        """Unregister and finalise one device; unknown paths are ignored."""
        device = self.all.pop(device_object_path, None)
        if device is None:
            return  # No such device
        connected = self.connected_hosts if device.is_host else self.connected_devices
        if device in connected:
            connected.remove(device)
        device.finalise()

    def switch_host(self) -> None:
        """Advance ``current_host_index`` to the next connected host, wrapping around.

        With no connected host the index is left untouched instead of
        raising ``ZeroDivisionError``.
        """
        if not self.connected_hosts:
            return
        self.current_host_index = (self.current_host_index + 1) % len(self.connected_hosts)

    def __get_current_host_as_list(self) -> "list[BluetoothDevice]":
        """Return the current host in a one-element list, or ``[]`` if the index is out of range."""
        if len(self.connected_hosts) <= self.current_host_index:
            return []
        return [self.connected_hosts[self.current_host_index]]

    def send_message(self, msg: bytes, send_to_hosts: bool, is_control_channel: bool) -> None:
        """Deliver ``msg`` to the current host or to the connected devices.

        Args:
            msg: Raw message bytes to send.
            send_to_hosts: True to send to the current host, False to send
                to the devices.
            is_control_channel: True to use the targets' control sockets,
                False for their interrupt sockets.

        While ``IGNORE_INPUT_DEVICES`` is set and a HID registry is present,
        non-control messages addressed to devices are handed to
        ``hid_devices.send_message_to_devices`` instead. A target whose
        socket fails is disconnected and scheduled for reconciliation.
        """
        if IGNORE_INPUT_DEVICES and not send_to_hosts and not is_control_channel and self.hid_devices is not None:
            asyncio.run_coroutine_threadsafe(self.hid_devices.send_message_to_devices(msg), loop=self.loop)
            return
        targets: list[BluetoothDevice] = self.__get_current_host_as_list() if send_to_hosts else self.connected_devices
        # Iterate over a copy: disconnect_sockets removes the target from
        # the very list being walked.
        for target in list(targets):
            try:
                sock = target.control_socket if is_control_channel else target.interrupt_socket
                if sock is not None:
                    sock.sendall(msg)
            except Exception:
                print("Cannot send data to socket of ",target.object_path,". Closing")
                try:
                    target.disconnect_sockets()
                except Exception:
                    print("Error while trying to disconnect sockets")
                asyncio.run_coroutine_threadsafe(target.reconcile_connected_state(1), loop=self.loop)