-
Notifications
You must be signed in to change notification settings - Fork 74
/
test_device_sequences.py
365 lines (306 loc) · 11.2 KB
/
test_device_sequences.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
import pytest
from dali.address import DeviceShort, InstanceNumber
from dali.command import NumericResponse, YesNoResponse
from dali.device import pushbutton
from dali.device.general import (
DTR0,
EventScheme,
QueryDeviceStatus,
QueryDeviceStatusResponse,
QueryEventFilterZeroToSeven,
QueryEventScheme,
QueryEventSchemeResponse,
QueryInputValue,
QueryInputValueLatch,
QueryResolution,
SetEventFilter,
)
from dali.device.helpers import DeviceInstanceTypeMapper, check_bad_rsp
from dali.device.pushbutton import InstanceEventFilter as EventFilter_pb
from dali.device.sequences import (
QueryEventFilters,
SetEventFilters,
SetEventSchemes,
query_input_value,
)
from dali.frame import BackwardFrame, BackwardFrameError
from dali.tests import fakes
def test_check_bad_rsp_none():
# A 'None' is always a bad response
assert check_bad_rsp(None)
def test_check_bad_rsp_yes():
# A 'Yes' is always a good response
assert not check_bad_rsp(YesNoResponse(BackwardFrame(0xFF)))
def test_check_bad_rsp_numeric():
# A numeric is always a good response, even if 0
assert not check_bad_rsp(NumericResponse(BackwardFrame(0xFF)))
assert not check_bad_rsp(NumericResponse(BackwardFrame(0x00)))
def test_check_bad_rsp_missing():
# A numeric response expects a value
assert check_bad_rsp(NumericResponse(None))
def test_check_bad_rsp_framing_error():
# A numeric response expects a value
assert check_bad_rsp(NumericResponse(BackwardFrameError(0xFF)))
@pytest.fixture
def fakes_bus():
return fakes.Bus(
[
fakes.Device(DeviceShort(0), memory_banks=(fakes.FakeBank0,)),
fakes.Device(DeviceShort(1), memory_banks=(fakes.FakeBank0,)),
fakes.Device(DeviceShort(2), memory_banks=(fakes.FakeBank0,)),
]
)
def test_device_autodiscover_good(fakes_bus):
dev_inst_map = DeviceInstanceTypeMapper()
fakes_bus.run_sequence(dev_inst_map.autodiscover())
# There are 4 instances on each fake device, and 3 devices, so 12 in total
assert len(dev_inst_map.mapping) == 12
class DeviceFakeError(fakes.Device):
# Refer to Table 15 of Part 103, status bit 0 is "inputDeviceError"
_device_status = 0b00000001
def test_device_autodiscover_dont_skip_bad(fakes_bus):
dev_inst_map = DeviceInstanceTypeMapper()
# Add an extra fake device, one with an error bit set
fakes_bus.gear.append(
DeviceFakeError(DeviceShort(3), memory_banks=(fakes.FakeBank0,))
)
# The input device error bit should show up
rsp = fakes_bus.send(QueryDeviceStatus(DeviceShort(3)))
assert isinstance(rsp, QueryDeviceStatusResponse)
assert rsp.input_device_error
fakes_bus.run_sequence(dev_inst_map.autodiscover())
# The mere fact that a device has "inputDeviceError" set does not mean it is not usable
assert len(dev_inst_map.mapping) == 16
class DeviceNoInstances(fakes.Device):
_instances = []
def test_device_autodiscover_skip_no_instances(fakes_bus):
dev_inst_map = DeviceInstanceTypeMapper()
# Add an extra fake device, one with an error bit set
fakes_bus.gear.append(
DeviceNoInstances(DeviceShort(3), memory_banks=(fakes.FakeBank0,))
)
fakes_bus.run_sequence(dev_inst_map.autodiscover())
# One device has no instances to count
assert len(dev_inst_map.mapping) == 12
def test_device_query_event_scheme(fakes_bus):
rsp = fakes_bus.send(QueryEventScheme(DeviceShort(1), InstanceNumber(1)))
assert isinstance(rsp, QueryEventSchemeResponse)
assert rsp.value == EventScheme.device_instance
def test_device_set_event_schemes(fakes_bus):
rsp = fakes_bus.send(QueryEventScheme(DeviceShort(2), InstanceNumber(3)))
assert isinstance(rsp, QueryEventSchemeResponse)
assert rsp.value == EventScheme.device_instance
rsp = fakes_bus.run_sequence(
SetEventSchemes(DeviceShort(2), InstanceNumber(3), EventScheme.device)
)
assert isinstance(rsp, QueryEventSchemeResponse)
assert rsp.value == EventScheme.device
def test_device_set_event_filters(fakes_bus):
rsp = fakes_bus.run_sequence(
QueryEventFilters(DeviceShort(1), InstanceNumber(2), pushbutton)
)
assert isinstance(rsp, EventFilter_pb)
assert rsp.value == 0
rsp = fakes_bus.run_sequence(
SetEventFilters(
DeviceShort(1),
InstanceNumber(2),
EventFilter_pb.short_press | EventFilter_pb.long_press_start,
)
)
assert isinstance(rsp, EventFilter_pb)
assert EventFilter_pb.short_press in rsp
assert EventFilter_pb.long_press_start in rsp
filter_rsp = rsp
rsp = fakes_bus.run_sequence(
QueryEventFilters(DeviceShort(1), InstanceNumber(2), EventFilter_pb)
)
assert isinstance(rsp, EventFilter_pb)
assert rsp == filter_rsp
def test_event_filter_pushbutton_sequence_partial():
"""
Confirms that the SetEventFilters sequence yields the expected
DALI message objects and sets the correct bit flags for DTR0 for
pushbutton instances
"""
filter_to_set = (
EventFilter_pb.button_released
| EventFilter_pb.button_stuck_free
| EventFilter_pb.double_press
)
sequence = SetEventFilters(
device=DeviceShort(0),
instance=InstanceNumber(0),
filter_value=filter_to_set,
)
# The first message the sequence should send is DTR0
rsp = None
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, DTR0)
assert cmd.frame.as_byte_sequence[2] == 0b10001001
# The second message should be SetEventFilter
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, SetEventFilter)
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
# The third message should be QueryEventFilterZeroToSeven
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryEventFilterZeroToSeven)
# The sequence should then just return whatever the response is
rsp = NumericResponse(BackwardFrame(0b10001001))
ret = None
try:
sequence.send(rsp)
except StopIteration as r:
ret = r.value
assert isinstance(ret, EventFilter_pb)
assert ret == filter_to_set
assert EventFilter_pb.button_released in ret
assert EventFilter_pb.button_stuck_free in ret
assert EventFilter_pb.double_press in ret
assert EventFilter_pb.short_press not in ret
def test_event_filter_pushbutton_sequence_all():
filter_to_set = (
EventFilter_pb.double_press
| EventFilter_pb.long_press_start
| EventFilter_pb.button_pressed
| EventFilter_pb.short_press
| EventFilter_pb.long_press_repeat
| EventFilter_pb.button_released
| EventFilter_pb.long_press_stop
| EventFilter_pb.button_stuck_free
| EventFilter_pb.double_press # Intentional duplicate
)
sequence = SetEventFilters(
device=DeviceShort(0),
instance=InstanceNumber(0),
filter_value=filter_to_set,
)
# The first message the sequence should send is DTR0
try:
cmd = sequence.send(None)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, DTR0)
assert cmd.frame.as_byte_sequence[2] == 0b11111111
# The previous test for the sequence adequately checks the remaining logic
def test_event_filter_sequence_int():
# InstanceEventFilter ultimately inherits from int, so it should be
# possible to use a plain int in the sequence
sequence = SetEventFilters(
device=DeviceShort(0),
instance=InstanceNumber(0),
filter_value=3,
)
# The first message the sequence should send is DTR0
try:
cmd = sequence.send(None)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, DTR0)
assert cmd.frame.as_byte_sequence[2] == 3
# The previous test for the sequence adequately checks the remaining logic
def test_event_filter_sequence_bad_type():
sequence = SetEventFilters(
device=DeviceShort(0),
instance=InstanceNumber(0),
filter_value="double_press",
)
# Using a string is not valid, it must be an enum object
with pytest.raises(TypeError):
sequence.send(None)
sequence = QueryEventFilters(
device=DeviceShort(0),
instance=InstanceNumber(0),
filter_type=EventFilter_pb.short_press,
)
# filter_type needs to be a type, not an instance
with pytest.raises(TypeError):
sequence.send(None)
def test_query_input_values_unknown_1bit():
"""
Reading a device's input register which has a one-bit resolution, with
no prior information. Device sends 0xff, which is converted to 1.
"""
sequence = query_input_value(
device=DeviceShort(0),
instance=InstanceNumber(0),
resolution=None,
)
rsp = None
# No resolution was given, so the first message sends out a query for that
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryResolution)
assert cmd.frame.as_byte_sequence[2] == 0x81 # QUERY_RESOLUTION
rsp = NumericResponse(BackwardFrame(1))
# Ask for the first byte
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValue)
assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
rsp = NumericResponse(BackwardFrame(0xff))
ret = None
try:
cmd = sequence.send(rsp)
raise RuntimeError()
except StopIteration as r:
ret = r.value
assert ret == 1
def test_query_input_values_10bit():
"""
Reading a device's input register. The resolution is known upfront to be
10bit. The first byte is:
0x6c = 0110 1100
The second byte is:
0x9b = 1001 1011
And since only the first two bits are needed (10-8), the rest is just
repeated first byte, so the result is 0b0110110010 = 434.
"""
sequence = query_input_value(
device=DeviceShort(0),
instance=InstanceNumber(0),
resolution=10,
)
rsp = None
# resolution is known upfront, so there will be no querying
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValue)
assert cmd.frame.as_byte_sequence[2] == 0x8c # QUERY_INPUT_VALUE
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
rsp = NumericResponse(BackwardFrame(0x6c))
try:
cmd = sequence.send(rsp)
except StopIteration:
raise RuntimeError()
assert isinstance(cmd, QueryInputValueLatch)
assert cmd.frame.as_byte_sequence[2] == 0x8d # QUERY_INPUT_VALUE_LATCH
assert cmd.destination == DeviceShort(0)
assert cmd.instance == InstanceNumber(0)
rsp = NumericResponse(BackwardFrame(0x9b))
ret = None
try:
cmd = sequence.send(rsp)
raise RuntimeError()
except StopIteration as r:
ret = r.value
assert ret == 434