Skip to content

Commit

Permalink
fakes.Device: implement commissioning functions
Browse files Browse the repository at this point in the history
This allows fakes to be discovered and assigned addresses.
  • Loading branch information
James Wah committed Jun 16, 2024
1 parent f30fa48 commit 0245607
Showing 1 changed file with 71 additions and 0 deletions.
71 changes: 71 additions & 0 deletions dali/tests/fakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,11 +488,17 @@ def __init__(
shortaddr: Optional[address.DeviceShort] = None,
groups: Optional[Iterable[address.DeviceGroup]] = None,
memory_banks: Optional[Iterable[Type[FakeMemoryBank]]] = (FakeDeviceBank0,),
random_preload: list[int] = [],
):
# Store parameters
self.shortaddr = shortaddr
self.groups = set(groups) if groups else set()
# Configure internal variables
self.randomaddr = frame.Frame(24)
self.searchaddr = frame.Frame(24)
self.random_preload = random_preload
self.initialising = False
self.withdrawn = False
self.dtr0: int = 0
self.dtr1: int = 0
self.dtr2: int = 0
Expand All @@ -504,6 +510,12 @@ def __init__(
raise ValueError(f"Duplicate memory bank {bank_number}")
self.memory_banks[bank_number] = fake_bank()

def _next_random_address(self):
if self.random_preload:
return self.random_preload.pop(0)
else:
return random.randrange(0, 0x1000000)

def valid_address(self, cmd: Command) -> bool:
"""Should we respond to this command?"""
if len(cmd.frame) != 24:
Expand All @@ -521,6 +533,13 @@ def valid_address(self, cmd: Command) -> bool:
if isinstance(cmd.destination, address.DeviceGroup):
return cmd.destination in self.groups

@property
def shortaddr_int(self):
if self.shortaddr is None:
return 0xff
else:
return self.shortaddr.address

def send(self, cmd: Command) -> Optional[int]:
# Reset enable_write_memory if command is not one of the memory
# writing commands, even if the command is not addressed to us
Expand Down Expand Up @@ -627,6 +646,58 @@ def send(self, cmd: Command) -> Optional[int]:
finally:
if not bank.nobble_dtr0_update:
self.dtr0 = min(self.dtr0 + 1, 255)
elif isinstance(cmd, device.general.SetShortAddress):
if self.dtr0 == 0xff:
self.shortaddr = None
elif (self.dtr0 & 1) == 1:
self.shortaddr = address.DeviceShort((self.dtr0 & 0x7e) >> 1)
elif isinstance(cmd, device.general.QueryMissingShortAddress):
if self.shortaddr is None:
return _yes
elif isinstance(cmd, device.general.QueryRandomAddressH):
return self.randomaddr[23:16]
elif isinstance(cmd, device.general.QueryRandomAddressM):
return self.randomaddr[15:8]
elif isinstance(cmd, device.general.QueryRandomAddressL):
return self.randomaddr[7:0]
elif isinstance(cmd, device.general.Terminate):
self.initialising = False
self.withdrawn = False
elif isinstance(cmd, device.general.Initialise):
if cmd.param == 0xff \
or (cmd.param == 0x7f and self.shortaddr is None) \
or (cmd.param == self.shortaddr):
self.initialising = True
self.withdrawn = False
# We don't implement the 15 minute timer
elif isinstance(cmd, device.general.Randomise):
self.randomaddr = frame.Frame(24, self._next_random_address())
elif isinstance(cmd, device.general.Compare):
if self.initialising \
and not self.withdrawn \
and self.randomaddr.as_integer <= self.searchaddr.as_integer:
return _yes
elif isinstance(cmd, device.general.Withdraw):
if self.initialising \
and self.randomaddr == self.searchaddr:
self.withdrawn = True
elif isinstance(cmd, device.general.SearchAddrH):
self.searchaddr[23:16] = cmd.param
elif isinstance(cmd, device.general.SearchAddrM):
self.searchaddr[15:8] = cmd.param
elif isinstance(cmd, device.general.SearchAddrL):
self.searchaddr[7:0] = cmd.param
elif isinstance(cmd, device.general.ProgramShortAddress):
if self.initialising \
and self.randomaddr == self.searchaddr:
if cmd.param == 255:
self.shortaddr = None
else:
self.shortaddr = address.DeviceShort(cmd.param)
elif isinstance(cmd, device.general.VerifyShortAddress):
if self.initialising \
and self.shortaddr_int == cmd.param:
return _yes

return None

Expand Down

0 comments on commit 0245607

Please sign in to comment.