From 708c1adc1699d38367a9501c11de4da4494b27ef Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Thu, 8 Feb 2024 22:24:40 -0500 Subject: [PATCH 01/20] Support firmware extensions --- bellows/ezsp/__init__.py | 15 +++++++++++++ bellows/ezsp/custom_commands.py | 12 +++++++++++ bellows/zigbee/application.py | 37 ++++++++++++++++++++++----------- 3 files changed, 52 insertions(+), 12 deletions(-) create mode 100644 bellows/ezsp/custom_commands.py diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 669fd0f2..a8a1c46a 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -23,6 +23,7 @@ import bellows.config as conf from bellows.exception import EzspError, InvalidCommandError +from bellows.ezsp import custom_commands from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig import bellows.types as t import bellows.uart @@ -616,3 +617,17 @@ async def write_config(self, config: dict) -> None: status, ) continue + + async def get_supported_firmware_features( + self, + ) -> custom_commands.FirmwareFeatures: + """Get supported firmware extensions.""" + try: + status, rsp_data = await self.customFrame( + bytes([custom_commands.CustomCommand.CMD_GET_SUPPORTED_FEATURES]), + ) + except InvalidCommandError: + return custom_commands.SupportedCustomFeatures(0) + + features, _ = custom_commands.SupportedCustomFeatures.deserialize(rsp_data) + return features diff --git a/bellows/ezsp/custom_commands.py b/bellows/ezsp/custom_commands.py new file mode 100644 index 00000000..e03b0767 --- /dev/null +++ b/bellows/ezsp/custom_commands.py @@ -0,0 +1,12 @@ +"""Custom EZSP commands.""" + +import zigpy.types as t + + +class CustomCommand(t.enum8): + CMD_GET_SUPPORTED_FEATURES = 0x00 + + +class FirmwareFeatures(t.bitmap32): + # The firmware passes through all group traffic, regardless of group membership + MEMBER_OF_ALL_GROUPS = 0b00000000_00000000_00000000_00000001 diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 229e1499..b75b7474 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -32,6 +32,7 @@ ) from bellows.exception import ControllerError, EzspError, StackAlreadyRunning import bellows.ezsp +from bellows.ezsp.custom_commands import FirmwareFeatures import bellows.multicast import bellows.types as t from bellows.zigbee import repairs @@ -200,15 +201,14 @@ async def start_network(self): ezsp.add_callback(self.ezsp_callback_handler) self.controller_event.set() - group_membership = {} + custom_features = await self._ezsp.get_supported_custom_features() + LOGGER.debug("Supported custom firmware features: %r", custom_features) - try: - db_device = self.get_device(ieee=self.state.node_info.ieee) - except KeyError: - pass + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in custom_features: + # If the firmware passes through all incoming group messages, do nothing + endpoint_cls = zigpy.endpoint.Endpoint else: - if 1 in db_device.endpoints: - group_membership = db_device.endpoints[1].member_of + endpoint_cls = EZSPEndpoint ezsp_device = zigpy.device.Device( application=self, @@ -220,18 +220,31 @@ async def start_network(self): # The coordinator device does not respond to attribute reads so we have to # divine the internal NCP state. for zdo_desc in self._created_device_endpoints: - ep = EZSPEndpoint(ezsp_device, zdo_desc.endpoint, zdo_desc) + ep = endpoint_cls(ezsp_device, zdo_desc.endpoint, zdo_desc) ezsp_device.endpoints[zdo_desc.endpoint] = ep ezsp_device.model = ep.model ezsp_device.manufacturer = ep.manufacturer await ezsp_device.schedule_initialize() - # Group membership is stored in the database for EZSP coordinators - ezsp_device.endpoints[1].member_of.update(group_membership) + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS not in custom_features: + # If the firmware does not support group traffic passthrough, register the + # replacement `Endpoint` objects that proxy endpoint registration to the NCP + group_membership = {} + + try: + db_device = self.get_device(ieee=self.state.node_info.ieee) + except KeyError: + pass + else: + if 1 in db_device.endpoints: + group_membership = db_device.endpoints[1].member_of + + # Group membership is stored in the database for EZSP coordinators + ezsp_device.endpoints[1].member_of.update(group_membership) - self._multicast = bellows.multicast.Multicast(ezsp) - await self._multicast.startup(ezsp_device) + self._multicast = bellows.multicast.Multicast(ezsp) + await self._multicast.startup(ezsp_device) async def load_network_info(self, *, load_devices=False) -> None: ezsp = self._ezsp From 52b5f1b5a219770f7f00f377b31748de47e64405 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Thu, 8 Feb 2024 23:06:56 -0500 Subject: [PATCH 02/20] Fix startup --- bellows/ezsp/__init__.py | 4 ++-- bellows/zigbee/application.py | 10 +++++----- bellows/zigbee/device.py | 28 ++++++++++++++++------------ 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index a8a1c46a..a1f6741c 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -627,7 +627,7 @@ async def get_supported_firmware_features( bytes([custom_commands.CustomCommand.CMD_GET_SUPPORTED_FEATURES]), ) except InvalidCommandError: - return custom_commands.SupportedCustomFeatures(0) + return custom_commands.FirmwareFeatures(0) - features, _ = custom_commands.SupportedCustomFeatures.deserialize(rsp_data) + features, _ = custom_commands.FirmwareFeatures.deserialize(rsp_data) return features diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index b75b7474..f3136113 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -36,7 +36,7 @@ import bellows.multicast import bellows.types as t from bellows.zigbee import repairs -from bellows.zigbee.device import EZSPEndpoint +from bellows.zigbee.device import EZSPEndpoint, EZSPGroupEndpoint import bellows.zigbee.util as util APS_ACK_TIMEOUT = 120 @@ -201,14 +201,14 @@ async def start_network(self): ezsp.add_callback(self.ezsp_callback_handler) self.controller_event.set() - custom_features = await self._ezsp.get_supported_custom_features() + custom_features = await self._ezsp.get_supported_firmware_features() LOGGER.debug("Supported custom firmware features: %r", custom_features) if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in custom_features: # If the firmware passes through all incoming group messages, do nothing - endpoint_cls = zigpy.endpoint.Endpoint - else: endpoint_cls = EZSPEndpoint + else: + endpoint_cls = EZSPGroupEndpoint ezsp_device = zigpy.device.Device( application=self, @@ -220,7 +220,7 @@ async def start_network(self): # The coordinator device does not respond to attribute reads so we have to # divine the internal NCP state. for zdo_desc in self._created_device_endpoints: - ep = endpoint_cls(ezsp_device, zdo_desc.endpoint, zdo_desc) + ep = endpoint_cls.from_descriptor(ezsp_device, zdo_desc.endpoint, zdo_desc) ezsp_device.endpoints[zdo_desc.endpoint] = ep ezsp_device.model = ep.model ezsp_device.manufacturer = ep.manufacturer diff --git a/bellows/zigbee/device.py b/bellows/zigbee/device.py index f34f3b80..9e6b65f7 100644 --- a/bellows/zigbee/device.py +++ b/bellows/zigbee/device.py @@ -21,30 +21,32 @@ class EZSPEndpoint(zigpy.endpoint.Endpoint): - def __init__( - self, + @classmethod + def from_descriptor( + cls, device: zigpy.device.Device, endpoint_id: int, descriptor: zdo_t.SimpleDescriptor, ) -> None: - super().__init__(device, endpoint_id) + ep = cls(device, endpoint_id) + ep.profile_id = descriptor.profile - self.profile_id = descriptor.profile - - if self.profile_id in PROFILE_TO_DEVICE_TYPE: - self.device_type = PROFILE_TO_DEVICE_TYPE[self.profile_id]( + if ep.profile_id in PROFILE_TO_DEVICE_TYPE: + ep.device_type = PROFILE_TO_DEVICE_TYPE[ep.profile_id]( descriptor.device_type ) else: - self.device_type = descriptor.device_type + ep.device_type = descriptor.device_type for cluster in descriptor.input_clusters: - self.add_input_cluster(cluster) + ep.add_input_cluster(cluster) for cluster in descriptor.output_clusters: - self.add_output_cluster(cluster) + ep.add_output_cluster(cluster) + + ep.status = zigpy.endpoint.Status.ZDO_INIT - self.status = zigpy.endpoint.Status.ZDO_INIT + return ep @property def manufacturer(self) -> str: @@ -56,7 +58,9 @@ def model(self) -> str: """Model.""" return "EZSP" - async def add_to_group(self, grp_id: int, name: str = None) -> None: + +class EZSPGroupEndpoint(EZSPEndpoint): + async def add_to_group(self, grp_id: int, name: str = None) -> t.EmberStatus: if grp_id in self.member_of: return From 9a9c7c8640208750601ad981273774bae5dd20e6 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 3 May 2024 16:52:44 -0400 Subject: [PATCH 03/20] Expand the custom command protocol --- bellows/ezsp/__init__.py | 19 +++++++++++----- bellows/ezsp/custom_commands.py | 40 +++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index a1f6741c..ba8b7dc9 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -622,12 +622,21 @@ async def get_supported_firmware_features( self, ) -> custom_commands.FirmwareFeatures: """Get supported firmware extensions.""" + req = custom_commands.CustomCommand( + command_id=custom_commands.CustomCommandId.CMD_GET_SUPPORTED_FEATURES_REQ, + payload=custom_commands.GetSupportedFeaturesReq().serialize(), + ) + try: - status, rsp_data = await self.customFrame( - bytes([custom_commands.CustomCommand.CMD_GET_SUPPORTED_FEATURES]), - ) + status, data = await self.customFrame(req.serialize()) except InvalidCommandError: return custom_commands.FirmwareFeatures(0) - features, _ = custom_commands.FirmwareFeatures.deserialize(rsp_data) - return features + rsp_cmd, _ = custom_commands.CustomCommand.deserialize(data) + assert ( + rsp_cmd.command_id + == custom_commands.CustomCommandId.CMD_GET_SUPPORTED_FEATURES_RSP + ) + + rsp, _ = custom_commands.GetSupportedFeaturesRsp.deserialize(rsp_cmd.payload) + return rsp.features diff --git a/bellows/ezsp/custom_commands.py b/bellows/ezsp/custom_commands.py index e03b0767..e5edb6e4 100644 --- a/bellows/ezsp/custom_commands.py +++ b/bellows/ezsp/custom_commands.py @@ -1,12 +1,48 @@ """Custom EZSP commands.""" +from __future__ import annotations import zigpy.types as t -class CustomCommand(t.enum8): - CMD_GET_SUPPORTED_FEATURES = 0x00 +class Bytes(bytes): + def serialize(self) -> Bytes: + return self + + @classmethod + def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]: + return cls(data), b"" + + def __repr__(self) -> str: + # Reading byte sequences like \x200\x21 is extremely annoying + # compared to \x20\x30\x21 + escaped = "".join(f"\\x{b:02X}" for b in self) + + return f"b'{escaped}'" + + __str__ = __repr__ + + +class CustomCommandId(t.enum16): + CMD_GET_PROTOCOL_VERSION_REQ = 0x0000 + CMD_GET_PROTOCOL_VERSION_RSP = 0x8000 + + CMD_GET_SUPPORTED_FEATURES_REQ = 0x0001 + CMD_GET_SUPPORTED_FEATURES_RSP = 0x8001 + + +class CustomCommand(t.Struct): + command_id: CustomCommandId + payload: Bytes class FirmwareFeatures(t.bitmap32): # The firmware passes through all group traffic, regardless of group membership MEMBER_OF_ALL_GROUPS = 0b00000000_00000000_00000000_00000001 + + +class GetSupportedFeaturesReq(t.Struct): + pass + + +class GetSupportedFeaturesRsp(t.Struct): + features: FirmwareFeatures From 8704efdcd3b4a88e6487cd21fb09e4e5714a2245 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Mon, 6 May 2024 16:01:50 -0400 Subject: [PATCH 04/20] Fix existing unit tests --- bellows/ezsp/__init__.py | 2 +- bellows/ezsp/custom_commands.py | 4 +++- bellows/zigbee/application.py | 24 ++++++++++-------------- tests/test_application.py | 12 +++++++++--- 4 files changed, 23 insertions(+), 19 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index ba8b7dc9..15c0feea 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -630,7 +630,7 @@ async def get_supported_firmware_features( try: status, data = await self.customFrame(req.serialize()) except InvalidCommandError: - return custom_commands.FirmwareFeatures(0) + return custom_commands.FirmwareFeatures.NONE rsp_cmd, _ = custom_commands.CustomCommand.deserialize(data) assert ( diff --git a/bellows/ezsp/custom_commands.py b/bellows/ezsp/custom_commands.py index e5edb6e4..895b8e6f 100644 --- a/bellows/ezsp/custom_commands.py +++ b/bellows/ezsp/custom_commands.py @@ -36,8 +36,10 @@ class CustomCommand(t.Struct): class FirmwareFeatures(t.bitmap32): + NONE = 0 + # The firmware passes through all group traffic, regardless of group membership - MEMBER_OF_ALL_GROUPS = 0b00000000_00000000_00000000_00000001 + MEMBER_OF_ALL_GROUPS = 1 << 0 class GetSupportedFeaturesReq(t.Struct): diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index f3136113..f86f15c0 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -204,12 +204,22 @@ async def start_network(self): custom_features = await self._ezsp.get_supported_firmware_features() LOGGER.debug("Supported custom firmware features: %r", custom_features) + group_membership = {} + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in custom_features: # If the firmware passes through all incoming group messages, do nothing endpoint_cls = EZSPEndpoint else: endpoint_cls = EZSPGroupEndpoint + try: + db_device = self.get_device(ieee=self.state.node_info.ieee) + except KeyError: + pass + else: + if 1 in db_device.endpoints: + group_membership = db_device.endpoints[1].member_of + ezsp_device = zigpy.device.Device( application=self, ieee=self.state.node_info.ieee, @@ -228,21 +238,7 @@ async def start_network(self): await ezsp_device.schedule_initialize() if FirmwareFeatures.MEMBER_OF_ALL_GROUPS not in custom_features: - # If the firmware does not support group traffic passthrough, register the - # replacement `Endpoint` objects that proxy endpoint registration to the NCP - group_membership = {} - - try: - db_device = self.get_device(ieee=self.state.node_info.ieee) - except KeyError: - pass - else: - if 1 in db_device.endpoints: - group_membership = db_device.endpoints[1].member_of - - # Group membership is stored in the database for EZSP coordinators ezsp_device.endpoints[1].member_of.update(group_membership) - self._multicast = bellows.multicast.Multicast(ezsp) await self._multicast.startup(ezsp_device) diff --git a/tests/test_application.py b/tests/test_application.py index 8581e52f..8f044235 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -16,6 +16,7 @@ import bellows.config as config from bellows.exception import ControllerError, EzspError import bellows.ezsp as ezsp +from bellows.ezsp.custom_commands import FirmwareFeatures from bellows.ezsp.v9.commands import GetTokenDataRsp import bellows.types import bellows.types as t @@ -122,6 +123,9 @@ def _create_app_for_startup( ezsp_mock.wait_for_stack_status.return_value.__enter__ = AsyncMock( return_value=t.EmberStatus.NETWORK_UP ) + ezsp_mock.get_supported_firmware_features = AsyncMock( + return_value=FirmwareFeatures.NONE + ) if board_info: ezsp_mock.get_board_info = AsyncMock( @@ -1290,7 +1294,9 @@ async def test_shutdown(app): @pytest.fixture def coordinator(app, ieee): dev = zigpy.device.Device(app, ieee, 0x0000) - dev.endpoints[1] = bellows.zigbee.device.EZSPEndpoint(dev, 1, MagicMock()) + dev.endpoints[1] = bellows.zigbee.device.EZSPGroupEndpoint.from_descriptor( + dev, 1, MagicMock() + ) dev.model = dev.endpoints[1].model dev.manufacturer = dev.endpoints[1].manufacturer @@ -1623,8 +1629,8 @@ async def test_startup_coordinator_existing_groups_joined(app, ieee): db_device = app.add_device(ieee, 0x0000) db_ep = db_device.add_endpoint(1) - app.groups.add_group(0x1234, "Group Name", suppress_event=True) - app.groups[0x1234].add_member(db_ep, suppress_event=True) + group = app.groups.add_group(0x1234, "Group Name", suppress_event=True) + group.add_member(db_ep, suppress_event=True) await app.start_network() From 7dbb45bb6d39668fc3c63474a6187edfd114efa6 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 7 May 2024 17:54:42 -0400 Subject: [PATCH 05/20] Handle empty `customFrame` response --- bellows/ezsp/__init__.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 15c0feea..cd03d54c 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -632,6 +632,9 @@ async def get_supported_firmware_features( except InvalidCommandError: return custom_commands.FirmwareFeatures.NONE + if not data: + return custom_commands.FirmwareFeatures.NONE + rsp_cmd, _ = custom_commands.CustomCommand.deserialize(data) assert ( rsp_cmd.command_id From dcdde48b0b98e810751bb99de07da670c2ae70d4 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 28 May 2024 15:00:03 -0400 Subject: [PATCH 06/20] Support setting source routes as well --- bellows/ezsp/__init__.py | 27 +++++++++++++++++++-------- bellows/ezsp/custom_commands.py | 19 ++++++++++++++----- bellows/zigbee/application.py | 23 +++++++++++++++-------- 3 files changed, 48 insertions(+), 21 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index cd03d54c..84d0a16b 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -618,12 +618,12 @@ async def write_config(self, config: dict) -> None: ) continue - async def get_supported_firmware_features( + async def xncp_get_supported_firmware_features( self, ) -> custom_commands.FirmwareFeatures: """Get supported firmware extensions.""" req = custom_commands.CustomCommand( - command_id=custom_commands.CustomCommandId.CMD_GET_SUPPORTED_FEATURES_REQ, + command_id=custom_commands.CustomCommandId.CMD_GET_SUPPORTED_FEATURES, payload=custom_commands.GetSupportedFeaturesReq().serialize(), ) @@ -635,11 +635,22 @@ async def get_supported_firmware_features( if not data: return custom_commands.FirmwareFeatures.NONE - rsp_cmd, _ = custom_commands.CustomCommand.deserialize(data) - assert ( - rsp_cmd.command_id - == custom_commands.CustomCommandId.CMD_GET_SUPPORTED_FEATURES_RSP + rsp, _ = custom_commands.GetSupportedFeaturesRsp.deserialize(data) + return rsp.features + + async def xncp_set_manual_source_route( + self, destination: t.NWK, route: list[t.NWK] + ) -> None: + """Set a manual source route.""" + req = custom_commands.CustomCommand( + command_id=custom_commands.CustomCommandId.CMD_SET_SOURCE_ROUTE, + payload=custom_commands.SetSourceRouteReq( + destination=destination, source_route=route + ).serialize(), ) - rsp, _ = custom_commands.GetSupportedFeaturesRsp.deserialize(rsp_cmd.payload) - return rsp.features + status, data = await self.customFrame(req.serialize()) + if status != self.types.EmberStatus.SUCCESS: + raise EzspError(f"Failed to set source route: {status}") + + return None diff --git a/bellows/ezsp/custom_commands.py b/bellows/ezsp/custom_commands.py index 895b8e6f..c4778404 100644 --- a/bellows/ezsp/custom_commands.py +++ b/bellows/ezsp/custom_commands.py @@ -23,11 +23,8 @@ def __repr__(self) -> str: class CustomCommandId(t.enum16): - CMD_GET_PROTOCOL_VERSION_REQ = 0x0000 - CMD_GET_PROTOCOL_VERSION_RSP = 0x8000 - - CMD_GET_SUPPORTED_FEATURES_REQ = 0x0001 - CMD_GET_SUPPORTED_FEATURES_RSP = 0x8001 + CMD_GET_SUPPORTED_FEATURES = 0x0000 + CMD_SET_SOURCE_ROUTE = 0x0001 class CustomCommand(t.Struct): @@ -41,6 +38,9 @@ class FirmwareFeatures(t.bitmap32): # The firmware passes through all group traffic, regardless of group membership MEMBER_OF_ALL_GROUPS = 1 << 0 + # Source routes can be overridden by the application + MANUAL_SOURCE_ROUTE = 1 << 1 + class GetSupportedFeaturesReq(t.Struct): pass @@ -48,3 +48,12 @@ class GetSupportedFeaturesReq(t.Struct): class GetSupportedFeaturesRsp(t.Struct): features: FirmwareFeatures + + +class SetSourceRouteReq(t.Struct): + destination: t.NWK + source_route: t.List[t.NWK] + + +class SetSourceRouteRsp(t.Struct): + pass diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index f86f15c0..d4d561c2 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -88,6 +88,7 @@ def __init__(self, config: dict): self._pending = zigpy.util.Requests() self._watchdog_failures = 0 self._watchdog_feed_counter = 0 + self._custom_features = FirmwareFeatures.NONE self._req_lock = asyncio.Lock() @@ -201,12 +202,12 @@ async def start_network(self): ezsp.add_callback(self.ezsp_callback_handler) self.controller_event.set() - custom_features = await self._ezsp.get_supported_firmware_features() - LOGGER.debug("Supported custom firmware features: %r", custom_features) + self._custom_features = await self._ezsp.xncp_get_supported_firmware_features() + LOGGER.debug("Supported custom firmware features: %r", self._custom_features) group_membership = {} - if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in custom_features: + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in self._custom_features: # If the firmware passes through all incoming group messages, do nothing endpoint_cls = EZSPEndpoint else: @@ -237,7 +238,7 @@ async def start_network(self): await ezsp_device.schedule_initialize() - if FirmwareFeatures.MEMBER_OF_ALL_GROUPS not in custom_features: + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS not in self._custom_features: ezsp_device.endpoints[1].member_of.update(group_membership) self._multicast = bellows.multicast.Multicast(ezsp) await self._multicast.startup(ezsp_device) @@ -762,10 +763,16 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: ) if packet.source_route is not None: - await self._ezsp.set_source_route( - nwk=packet.dst.address, - relays=packet.source_route, - ) + if FirmwareFeatures.MANUAL_SOURCE_ROUTE in self._custom_features: + await self._ezsp.xncp_set_manual_source_route( + nwk=packet.dst.address, + relays=packet.source_route, + ) + else: + await self._ezsp.set_source_route( + nwk=packet.dst.address, + relays=packet.source_route, + ) status, _ = await self._ezsp.send_unicast( nwk=packet.dst.address, From 5fbe1793722132d70bbae197ad76ae4e782a0c37 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 29 May 2024 17:19:41 -0400 Subject: [PATCH 07/20] Rename custom commands to XNCP and support board string overrides --- bellows/ezsp/__init__.py | 123 ++++++++++++++++++---------- bellows/ezsp/custom_commands.py | 59 ------------- bellows/ezsp/xncp.py | 141 ++++++++++++++++++++++++++++++++ bellows/zigbee/application.py | 16 ++-- tests/test_application.py | 1 + 5 files changed, 230 insertions(+), 110 deletions(-) delete mode 100644 bellows/ezsp/custom_commands.py create mode 100644 bellows/ezsp/xncp.py diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 84d0a16b..272a9127 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -23,8 +23,9 @@ import bellows.config as conf from bellows.exception import EzspError, InvalidCommandError -from bellows.ezsp import custom_commands +from bellows.ezsp import xncp from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig +from bellows.ezsp.xncp import FirmwareFeatures import bellows.types as t import bellows.uart @@ -63,6 +64,7 @@ def __init__(self, device_config: dict, application: Any | None = None): self._callbacks = {} self._ezsp_event = asyncio.Event() self._ezsp_version = v4.EZSPv4.VERSION + self._xncp_features = FirmwareFeatures.NONE self._gw = None self._protocol = None self._application = application @@ -168,11 +170,23 @@ async def version(self): if ver != self.ezsp_version: self._switch_protocol_version(ver) await self._command("version", desiredProtocolVersion=ver) + + try: + self._xncp_features = await self.xncp_get_supported_firmware_features() + except InvalidCommandError: + self._xncp_features = xncp.FirmwareFeatures.NONE + LOGGER.debug( - "EZSP Stack Type: %s, Stack Version: %04x, Protocol version: %s", + ( + "EZSP Stack Type: %s" + ", Stack Version: %04x" + ", Protocol version: %s" + ", XNCP features: %s" + ), stack_type, stack_version, ver, + self._xncp_features, ) async def disconnect(self): @@ -309,26 +323,46 @@ async def get_board_info( ) -> tuple[str, str, str | None] | tuple[None, None, str | None]: """Return board info.""" - tokens = {} + raw_tokens: dict[t.EzspMfgTokenId, list[bytes]] = { + t.EzspMfgTokenId.MFG_STRING: [], + t.EzspMfgTokenId.MFG_BOARD_NAME: [], + } + # Prefer XNCP overrides if they exist + try: + override_board, override_manuf = await self.xncp_get_board_info_overrides() + except InvalidCommandError: + pass + else: + raw_tokens[t.EzspMfgTokenId.MFG_STRING].append(override_manuf) + raw_tokens[t.EzspMfgTokenId.MFG_BOARD_NAME].append(override_board) + + # If not, read manufacturing tokens for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME): (value,) = await self.getMfgToken(tokenId=token) LOGGER.debug("Read %s token: %s", token.name, value) - - # Tokens are fixed-length and initially filled with \xFF but also can end - # with \x00 - while value.endswith((b"\xFF", b"\x00")): - value = value.rstrip(b"\xFF").rstrip(b"\x00") - - try: - result = value.decode("utf-8") - except UnicodeDecodeError: - result = "0x" + value.hex().upper() - - if not result: - result = None - - tokens[token] = result + raw_tokens[token].append(value) + + # Try to parse them + tokens: dict[t.EzspMfgTokenId, str] = {} + + for token_id, values in raw_tokens.items(): + for value in values: + # Tokens are fixed-length and initially filled with \xFF but also can end + # with \x00 + while value.endswith((b"\xFF", b"\x00")): + value = value.rstrip(b"\xFF").rstrip(b"\x00") + + try: + result = value.decode("utf-8") + except UnicodeDecodeError: + result = "0x" + value.hex().upper() + + if result: + tokens[token_id] = result + break + else: + tokens[token_id] = None (status, ver_info_bytes) = await self.getValue( valueId=t.EzspValueId.VALUE_VERSION_INFO @@ -618,39 +652,44 @@ async def write_config(self, config: dict) -> None: ) continue - async def xncp_get_supported_firmware_features( - self, - ) -> custom_commands.FirmwareFeatures: - """Get supported firmware extensions.""" - req = custom_commands.CustomCommand( - command_id=custom_commands.CustomCommandId.CMD_GET_SUPPORTED_FEATURES, - payload=custom_commands.GetSupportedFeaturesReq().serialize(), - ) + async def send_xncp_frame( + self, payload: xncp.XncpCommandPayload + ) -> xncp.XncpCommandPayload: + """Send an XNCP frame.""" + req_frame = xncp.XncpCommand.from_payload(payload) + LOGGER.debug("Sending XNCP frame: %s", req_frame) + status, data = await self.customFrame(req_frame.serialize()) - try: - status, data = await self.customFrame(req.serialize()) - except InvalidCommandError: - return custom_commands.FirmwareFeatures.NONE + if status != t.EmberStatus.SUCCESS: + raise InvalidCommandError("XNCP is not supported") - if not data: - return custom_commands.FirmwareFeatures.NONE + rsp_frame = xncp.XncpCommand.from_bytes(data) + LOGGER.debug("Received XNCP frame: %s", rsp_frame) + + if rsp_frame.status != t.EmberStatus.SUCCESS: + raise InvalidCommandError(f"XNCP response error: {rsp_frame.status}") + + return rsp_frame.payload - rsp, _ = custom_commands.GetSupportedFeaturesRsp.deserialize(data) + async def xncp_get_supported_firmware_features(self) -> xncp.FirmwareFeatures: + """Get supported firmware extensions.""" + rsp = await self.send_xncp_frame(xncp.GetSupportedFeaturesReq()) return rsp.features async def xncp_set_manual_source_route( self, destination: t.NWK, route: list[t.NWK] ) -> None: """Set a manual source route.""" - req = custom_commands.CustomCommand( - command_id=custom_commands.CustomCommandId.CMD_SET_SOURCE_ROUTE, - payload=custom_commands.SetSourceRouteReq( - destination=destination, source_route=route - ).serialize(), + await self.send_xncp_frame( + xncp.SetSourceRouteReq( + destination=destination, + source_route=route, + ) ) - status, data = await self.customFrame(req.serialize()) - if status != self.types.EmberStatus.SUCCESS: - raise EzspError(f"Failed to set source route: {status}") + async def xncp_get_board_info_overrides(self) -> tuple[str | None, str | None]: + """Get board information overrides.""" + name_rsp = await self.send_xncp_frame(xncp.GetBoardNameReq()) + manuf_rsp = await self.send_xncp_frame(xncp.GetManufNameReq()) - return None + return (name_rsp.board_name or None, manuf_rsp.manuf_name or None) diff --git a/bellows/ezsp/custom_commands.py b/bellows/ezsp/custom_commands.py deleted file mode 100644 index c4778404..00000000 --- a/bellows/ezsp/custom_commands.py +++ /dev/null @@ -1,59 +0,0 @@ -"""Custom EZSP commands.""" -from __future__ import annotations - -import zigpy.types as t - - -class Bytes(bytes): - def serialize(self) -> Bytes: - return self - - @classmethod - def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]: - return cls(data), b"" - - def __repr__(self) -> str: - # Reading byte sequences like \x200\x21 is extremely annoying - # compared to \x20\x30\x21 - escaped = "".join(f"\\x{b:02X}" for b in self) - - return f"b'{escaped}'" - - __str__ = __repr__ - - -class CustomCommandId(t.enum16): - CMD_GET_SUPPORTED_FEATURES = 0x0000 - CMD_SET_SOURCE_ROUTE = 0x0001 - - -class CustomCommand(t.Struct): - command_id: CustomCommandId - payload: Bytes - - -class FirmwareFeatures(t.bitmap32): - NONE = 0 - - # The firmware passes through all group traffic, regardless of group membership - MEMBER_OF_ALL_GROUPS = 1 << 0 - - # Source routes can be overridden by the application - MANUAL_SOURCE_ROUTE = 1 << 1 - - -class GetSupportedFeaturesReq(t.Struct): - pass - - -class GetSupportedFeaturesRsp(t.Struct): - features: FirmwareFeatures - - -class SetSourceRouteReq(t.Struct): - destination: t.NWK - source_route: t.List[t.NWK] - - -class SetSourceRouteRsp(t.Struct): - pass diff --git a/bellows/ezsp/xncp.py b/bellows/ezsp/xncp.py new file mode 100644 index 00000000..a2d5781e --- /dev/null +++ b/bellows/ezsp/xncp.py @@ -0,0 +1,141 @@ +"""Custom EZSP commands.""" +from __future__ import annotations + +import dataclasses +from typing import Callable + +import zigpy.types as t + +from bellows.types import EmberStatus + +COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {} +REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {} + + +def register_command(command_id: XncpCommandId) -> Callable[[type], type]: + def decorator(cls: type) -> type: + COMMANDS[command_id] = cls + REV_COMMANDS[cls] = command_id + return cls + + return decorator + + +class Bytes(bytes): + def serialize(self) -> Bytes: + return self + + @classmethod + def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]: + return cls(data), b"" + + def __repr__(self) -> str: + # Reading byte sequences like \x200\x21 is extremely annoying + # compared to \x20\x30\x21 + escaped = "".join(f"\\x{b:02X}" for b in self) + + return f"b'{escaped}'" + + __str__ = __repr__ + + +class XncpCommandId(t.enum16): + GET_SUPPORTED_FEATURES_REQ = 0x0000 + SET_SOURCE_ROUTE_REQ = 0x0001 + GET_BOARD_NAME_REQ = 0x0002 + GET_MANUF_NAME_REQ = 0x0003 + + GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000 + SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000 + GET_BOARD_NAME_RSP = GET_BOARD_NAME_REQ | 0x8000 + GET_MANUF_NAME_RSP = GET_MANUF_NAME_REQ | 0x8000 + + UNKNOWN = 0xFFFF + + +@dataclasses.dataclass +class XncpCommand: + command_id: XncpCommandId + status: EmberStatus + payload: XncpCommandPayload + + @classmethod + def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand: + return cls( + command_id=REV_COMMANDS[type(payload)], + status=EmberStatus.SUCCESS, + payload=payload, + ) + + @classmethod + def from_bytes(cls, data: bytes) -> XncpCommand: + command_id, data = XncpCommandId.deserialize(data) + status, data = EmberStatus.deserialize(data) + payload = COMMANDS[command_id].deserialize(data) + + return cls(command_id=command_id, status=status, payload=payload) + + def serialize(self) -> Bytes: + return ( + self.command_id.serialize() + + self.status.serialize() + + self.payload.serialize() + ) + + +class FirmwareFeatures(t.bitmap32): + NONE = 0 + + # The firmware passes through all group traffic, regardless of group membership + MEMBER_OF_ALL_GROUPS = 1 << 0 + + # Source routes can be overridden by the application + MANUAL_SOURCE_ROUTE = 1 << 1 + + # The firmware supports overriding the board name + BOARD_MANUF = 1 << 2 + + +class XncpCommandPayload(t.Struct): + pass + + +@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_REQ) +class GetSupportedFeaturesReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_SUPPORTED_FEATURES_RSP) +class GetSupportedFeaturesRsp(XncpCommandPayload): + features: FirmwareFeatures + + +@register_command(XncpCommandId.SET_SOURCE_ROUTE_REQ) +class SetSourceRouteReq(XncpCommandPayload): + destination: t.NWK + source_route: t.List[t.NWK] + + +@register_command(XncpCommandId.SET_SOURCE_ROUTE_RSP) +class SetSourceRouteRsp(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_BOARD_NAME_REQ) +class GetBoardNameReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_BOARD_NAME_RSP) +class GetBoardNameRsp(XncpCommandPayload): + board_name: Bytes + + +@register_command(XncpCommandId.GET_MANUF_NAME_REQ) +class GetManufNameReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_MANUF_NAME_RSP) +class GetManufNameRsp(XncpCommandPayload): + manuf_name: Bytes diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index d4d561c2..7f738f91 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -32,7 +32,7 @@ ) from bellows.exception import ControllerError, EzspError, StackAlreadyRunning import bellows.ezsp -from bellows.ezsp.custom_commands import FirmwareFeatures +from bellows.ezsp.xncp import FirmwareFeatures import bellows.multicast import bellows.types as t from bellows.zigbee import repairs @@ -88,7 +88,6 @@ def __init__(self, config: dict): self._pending = zigpy.util.Requests() self._watchdog_failures = 0 self._watchdog_feed_counter = 0 - self._custom_features = FirmwareFeatures.NONE self._req_lock = asyncio.Lock() @@ -202,12 +201,9 @@ async def start_network(self): ezsp.add_callback(self.ezsp_callback_handler) self.controller_event.set() - self._custom_features = await self._ezsp.xncp_get_supported_firmware_features() - LOGGER.debug("Supported custom firmware features: %r", self._custom_features) - group_membership = {} - if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in self._custom_features: + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS in self._ezsp._xncp_features: # If the firmware passes through all incoming group messages, do nothing endpoint_cls = EZSPEndpoint else: @@ -238,7 +234,7 @@ async def start_network(self): await ezsp_device.schedule_initialize() - if FirmwareFeatures.MEMBER_OF_ALL_GROUPS not in self._custom_features: + if FirmwareFeatures.MEMBER_OF_ALL_GROUPS not in self._ezsp._xncp_features: ezsp_device.endpoints[1].member_of.update(group_membership) self._multicast = bellows.multicast.Multicast(ezsp) await self._multicast.startup(ezsp_device) @@ -676,7 +672,7 @@ async def _reset_mfg_id(self, mfg_id: int) -> None: """Resets manufacturer id if was temporary overridden by a joining device.""" await self._ezsp.setManufacturerCode(code=mfg_id) await asyncio.sleep(MFG_ID_RESET_DELAY) - await self._ezsp.setManufacturerCode(code=DEFAULT_MFG_ID) + await self._ezsp.setManufacturerCode(DEFAULT_MFG_ID) async def energy_scan( self, channels: t.Channels, duration_exp: int, count: int @@ -763,7 +759,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: ) if packet.source_route is not None: - if FirmwareFeatures.MANUAL_SOURCE_ROUTE in self._custom_features: + if FirmwareFeatures.MANUAL_SOURCE_ROUTE in self._ezsp._xncp_features: await self._ezsp.xncp_set_manual_source_route( nwk=packet.dst.address, relays=packet.source_route, @@ -921,6 +917,8 @@ async def _watchdog_feed(self): cnt._raw_value = free_buffers cnt._last_reset_value = 0 + await self._ezsp.getMulticastTableEntry(0) + LOGGER.debug("%s", counters) except (asyncio.TimeoutError, EzspError) as exc: # TODO: converted Silvercrest gateways break without this diff --git a/tests/test_application.py b/tests/test_application.py index 8f044235..94020a42 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -18,6 +18,7 @@ import bellows.ezsp as ezsp from bellows.ezsp.custom_commands import FirmwareFeatures from bellows.ezsp.v9.commands import GetTokenDataRsp +from bellows.ezsp.xncp import FirmwareFeatures import bellows.types import bellows.types as t import bellows.types.struct From 884e453914abe70430e4f03bdcd965e5b4bb2874 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 29 May 2024 18:21:27 -0400 Subject: [PATCH 08/20] Use a generic interface to override manufacturing tokens --- bellows/ezsp/__init__.py | 46 ++++++++++++++++++++-------------------- bellows/ezsp/xncp.py | 42 ++++++++++++++++-------------------- 2 files changed, 41 insertions(+), 47 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 272a9127..d0d2107a 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -323,24 +323,10 @@ async def get_board_info( ) -> tuple[str, str, str | None] | tuple[None, None, str | None]: """Return board info.""" - raw_tokens: dict[t.EzspMfgTokenId, list[bytes]] = { - t.EzspMfgTokenId.MFG_STRING: [], - t.EzspMfgTokenId.MFG_BOARD_NAME: [], - } + tokens = {} - # Prefer XNCP overrides if they exist - try: - override_board, override_manuf = await self.xncp_get_board_info_overrides() - except InvalidCommandError: - pass - else: - raw_tokens[t.EzspMfgTokenId.MFG_STRING].append(override_manuf) - raw_tokens[t.EzspMfgTokenId.MFG_BOARD_NAME].append(override_board) - - # If not, read manufacturing tokens for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME): - (value,) = await self.getMfgToken(tokenId=token) - LOGGER.debug("Read %s token: %s", token.name, value) + value = await self.get_mfg_token(token) raw_tokens[token].append(value) # Try to parse them @@ -404,9 +390,25 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None: return None + async def get_mfg_token(self, token: t.EzspMfgTokenId) -> bytes: + (value,) = await self.getMfgToken(tokenId=token) + LOGGER.debug("Read manufacturing token %s: %s", token.name, value) + + override_value = None + + if FirmwareFeatures.MFG_TOKEN_OVERRIDES in self._xncp_features: + try: + override_value = await self.xncp_get_mfg_token_override(token) + except InvalidCommandError: + pass + + LOGGER.debug("XNCP override token %s: %s", token.name, override_value) + + return override_value or value + async def _get_mfg_custom_eui_64(self) -> t.EUI64 | None: """Get the custom EUI 64 manufacturing token, if it has a valid value.""" - (data,) = await self.getMfgToken(tokenId=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + data = await self.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) # Manufacturing tokens do not exist in RCP firmware: all reads are empty if not data: @@ -687,9 +689,7 @@ async def xncp_set_manual_source_route( ) ) - async def xncp_get_board_info_overrides(self) -> tuple[str | None, str | None]: - """Get board information overrides.""" - name_rsp = await self.send_xncp_frame(xncp.GetBoardNameReq()) - manuf_rsp = await self.send_xncp_frame(xncp.GetManufNameReq()) - - return (name_rsp.board_name or None, manuf_rsp.manuf_name or None) + async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes: + """Get manufacturing token override.""" + rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token)) + return rsp.value diff --git a/bellows/ezsp/xncp.py b/bellows/ezsp/xncp.py index a2d5781e..02805a06 100644 --- a/bellows/ezsp/xncp.py +++ b/bellows/ezsp/xncp.py @@ -2,11 +2,14 @@ from __future__ import annotations import dataclasses +import logging from typing import Callable import zigpy.types as t -from bellows.types import EmberStatus +from bellows.types import EmberStatus, EzspMfgTokenId + +_LOGGER = logging.getLogger(__name__) COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {} REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {} @@ -42,13 +45,11 @@ def __repr__(self) -> str: class XncpCommandId(t.enum16): GET_SUPPORTED_FEATURES_REQ = 0x0000 SET_SOURCE_ROUTE_REQ = 0x0001 - GET_BOARD_NAME_REQ = 0x0002 - GET_MANUF_NAME_REQ = 0x0003 + GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002 GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000 SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000 - GET_BOARD_NAME_RSP = GET_BOARD_NAME_REQ | 0x8000 - GET_MANUF_NAME_RSP = GET_MANUF_NAME_REQ | 0x8000 + GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000 UNKNOWN = 0xFFFF @@ -71,7 +72,10 @@ def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand: def from_bytes(cls, data: bytes) -> XncpCommand: command_id, data = XncpCommandId.deserialize(data) status, data = EmberStatus.deserialize(data) - payload = COMMANDS[command_id].deserialize(data) + payload, rest = COMMANDS[command_id].deserialize(data) + + if rest: + _LOGGER.debug("Unparsed data remains after %s frame: %s", payload, rest) return cls(command_id=command_id, status=status, payload=payload) @@ -92,8 +96,8 @@ class FirmwareFeatures(t.bitmap32): # Source routes can be overridden by the application MANUAL_SOURCE_ROUTE = 1 << 1 - # The firmware supports overriding the board name - BOARD_MANUF = 1 << 2 + # The firmware supports overriding some manufacturing tokens + MFG_TOKEN_OVERRIDES = 1 << 2 class XncpCommandPayload(t.Struct): @@ -121,21 +125,11 @@ class SetSourceRouteRsp(XncpCommandPayload): pass -@register_command(XncpCommandId.GET_BOARD_NAME_REQ) -class GetBoardNameReq(XncpCommandPayload): - pass - - -@register_command(XncpCommandId.GET_BOARD_NAME_RSP) -class GetBoardNameRsp(XncpCommandPayload): - board_name: Bytes - - -@register_command(XncpCommandId.GET_MANUF_NAME_REQ) -class GetManufNameReq(XncpCommandPayload): - pass +@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_REQ) +class GetMfgTokenOverrideReq(XncpCommandPayload): + token: EzspMfgTokenId -@register_command(XncpCommandId.GET_MANUF_NAME_RSP) -class GetManufNameRsp(XncpCommandPayload): - manuf_name: Bytes +@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_RSP) +class GetMfgTokenOverrideRsp(XncpCommandPayload): + value: Bytes From 3f684d248a9156d05abf5722021f56dead996665 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:29:40 -0400 Subject: [PATCH 09/20] Hide firmware-level manual source routing behind `manual_source_routing` --- bellows/config/__init__.py | 9 +++++++++ bellows/zigbee/application.py | 4 +++- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/bellows/config/__init__.py b/bellows/config/__init__.py index 29ffe647..74d91d9c 100644 --- a/bellows/config/__init__.py +++ b/bellows/config/__init__.py @@ -18,6 +18,9 @@ cv_boolean, ) +CONF_BELLOWS_CONFIG = "bellows_config" +CONF_MANUAL_SOURCE_ROUTING = "manual_source_routing" + CONF_USE_THREAD = "use_thread" CONF_EZSP_CONFIG = "ezsp_config" CONF_EZSP_POLICIES = "ezsp_policies" @@ -31,6 +34,12 @@ {vol.Optional(str): int} ), vol.Optional(CONF_USE_THREAD, default=True): cv_boolean, + # The above config really should belong in here + vol.Optional(CONF_BELLOWS_CONFIG, default={}): vol.Schema( + { + vol.Optional(CONF_MANUAL_SOURCE_ROUTING, default=False): bool, + } + ), } ) diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 7f738f91..92fc923b 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -25,8 +25,10 @@ import bellows from bellows.config import ( + CONF_BELLOWS_CONFIG, CONF_EZSP_CONFIG, CONF_EZSP_POLICIES, + CONF_MANUAL_SOURCE_ROUTING, CONF_USE_THREAD, CONFIG_SCHEMA, ) @@ -759,7 +761,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: ) if packet.source_route is not None: - if FirmwareFeatures.MANUAL_SOURCE_ROUTE in self._ezsp._xncp_features: + if FirmwareFeatures.MANUAL_SOURCE_ROUTE in self._ezsp._xncp_features and self.config[CONF_BELLOWS_CONFIG][CONF_MANUAL_SOURCE_ROUTING]: await self._ezsp.xncp_set_manual_source_route( nwk=packet.dst.address, relays=packet.source_route, From 4691bb25dbddeec709c21a81f4b3de177ede00b9 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:38:25 -0400 Subject: [PATCH 10/20] Fix ruff issue --- bellows/ezsp/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index d0d2107a..c27789d4 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -397,10 +397,8 @@ async def get_mfg_token(self, token: t.EzspMfgTokenId) -> bytes: override_value = None if FirmwareFeatures.MFG_TOKEN_OVERRIDES in self._xncp_features: - try: + with contextlib.suppress(InvalidCommandError): override_value = await self.xncp_get_mfg_token_override(token) - except InvalidCommandError: - pass LOGGER.debug("XNCP override token %s: %s", token.name, override_value) From d5c2c2ce580f8a34ebf617803c1328c7f08a143e Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:42:45 -0400 Subject: [PATCH 11/20] Remove erroneous `getMulticastTableEntry(0)` --- bellows/zigbee/application.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 92fc923b..c38ae8d8 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -919,8 +919,6 @@ async def _watchdog_feed(self): cnt._raw_value = free_buffers cnt._last_reset_value = 0 - await self._ezsp.getMulticastTableEntry(0) - LOGGER.debug("%s", counters) except (asyncio.TimeoutError, EzspError) as exc: # TODO: converted Silvercrest gateways break without this From 3311348dd1e4da62fb1dd97a76ea802568c543c7 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 5 Jun 2024 17:50:08 -0400 Subject: [PATCH 12/20] Fix unit tests --- bellows/ezsp/__init__.py | 22 ++++++++++------------ tests/test_application.py | 3 ++- tests/test_ezsp.py | 6 ++++-- 3 files changed, 16 insertions(+), 15 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index c27789d4..ba711770 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -127,6 +127,7 @@ async def startup_reset(self) -> None: await self.reset() await self.version() + await self.get_xncp_features() async def connect(self, *, use_thread: bool = True) -> None: assert self._gw is None @@ -171,24 +172,21 @@ async def version(self): self._switch_protocol_version(ver) await self._command("version", desiredProtocolVersion=ver) - try: - self._xncp_features = await self.xncp_get_supported_firmware_features() - except InvalidCommandError: - self._xncp_features = xncp.FirmwareFeatures.NONE - LOGGER.debug( - ( - "EZSP Stack Type: %s" - ", Stack Version: %04x" - ", Protocol version: %s" - ", XNCP features: %s" - ), + ("EZSP Stack Type: %s" ", Stack Version: %04x" ", Protocol version: %s"), stack_type, stack_version, ver, - self._xncp_features, ) + async def get_xncp_features(self) -> None: + try: + self._xncp_features = await self.xncp_get_supported_firmware_features() + except InvalidCommandError: + self._xncp_features = xncp.FirmwareFeatures.NONE + + LOGGER.debug("XNCP features: %s", self._xncp_features) + async def disconnect(self): self.stop_ezsp() if self._gw: diff --git a/tests/test_application.py b/tests/test_application.py index 94020a42..b6a00003 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -124,9 +124,10 @@ def _create_app_for_startup( ezsp_mock.wait_for_stack_status.return_value.__enter__ = AsyncMock( return_value=t.EmberStatus.NETWORK_UP ) - ezsp_mock.get_supported_firmware_features = AsyncMock( + ezsp_mock.xncp_get_supported_firmware_features = AsyncMock( return_value=FirmwareFeatures.NONE ) + ezsp_mock._xncp_features = FirmwareFeatures.NONE if board_info: ezsp_mock.get_board_info = AsyncMock( diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index 28fbf2c2..cf602bc3 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -588,7 +588,8 @@ async def test_write_custom_eui64_rcp(ezsp_f): @patch.object(EZSP, "version", new_callable=AsyncMock) @patch.object(EZSP, "reset", new_callable=AsyncMock) -async def test_ezsp_init_zigbeed(reset_mock, version_mock): +@patch.object(ezsp.EZSP, "get_xncp_features", new_callable=AsyncMock) +async def test_ezsp_init_zigbeed(reset_mock, xncp_mock, version_mock): """Test initialize method with a received startup reset frame.""" ezsp = make_ezsp( config={ @@ -609,8 +610,9 @@ async def test_ezsp_init_zigbeed(reset_mock, version_mock): @patch.object(EZSP, "version", new_callable=AsyncMock) @patch.object(EZSP, "reset", new_callable=AsyncMock) +@patch.object(ezsp.EZSP, "get_xncp_features", new_callable=AsyncMock) @patch("bellows.ezsp.NETWORK_COORDINATOR_STARTUP_RESET_WAIT", 0.01) -async def test_ezsp_init_zigbeed_timeout(reset_mock, version_mock): +async def test_ezsp_init_zigbeed_timeout(reset_mock, xncp_mock, version_mock): """Test initialize method with a received startup reset frame.""" ezsp = make_ezsp( config={ From 2258b3d894e7f89b6117d675ab627bc69a114f1d Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 5 Jun 2024 18:32:17 -0400 Subject: [PATCH 13/20] Support `GET_BUILD_STRING_REQ` --- bellows/ezsp/__init__.py | 14 ++++++++++++++ bellows/ezsp/xncp.py | 15 +++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index ba711770..7b893494 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -361,6 +361,15 @@ async def get_board_info( special, ver_info_bytes = t.uint8_t.deserialize(ver_info_bytes) version = f"{major}.{minor}.{patch}.{special} build {build}" + if xncp.FirmwareFeatures.BUILD_STRING in self._xncp_features: + try: + build_string = await self.xncp_get_build_string() + except InvalidCommandError: + build_string = None + + if build_string: + version = f"{version} ({build_string})" + return ( tokens[t.EzspMfgTokenId.MFG_STRING], tokens[t.EzspMfgTokenId.MFG_BOARD_NAME], @@ -689,3 +698,8 @@ async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes: """Get manufacturing token override.""" rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token)) return rsp.value + + async def xncp_get_build_string(self) -> bytes: + """Get build string.""" + rsp = await self.send_xncp_frame(xncp.GetBuildStringReq()) + return rsp.build_string.decode("utf-8") diff --git a/bellows/ezsp/xncp.py b/bellows/ezsp/xncp.py index 02805a06..af7c03ec 100644 --- a/bellows/ezsp/xncp.py +++ b/bellows/ezsp/xncp.py @@ -46,10 +46,12 @@ class XncpCommandId(t.enum16): GET_SUPPORTED_FEATURES_REQ = 0x0000 SET_SOURCE_ROUTE_REQ = 0x0001 GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002 + GET_BUILD_STRING_REQ = 0x0003 GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000 SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000 GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000 + GET_BUILD_STRING_RSP = GET_BUILD_STRING_REQ | 0x8000 UNKNOWN = 0xFFFF @@ -99,6 +101,9 @@ class FirmwareFeatures(t.bitmap32): # The firmware supports overriding some manufacturing tokens MFG_TOKEN_OVERRIDES = 1 << 2 + # The firmware contains a free-form build string + BUILD_STRING = 1 << 3 + class XncpCommandPayload(t.Struct): pass @@ -133,3 +138,13 @@ class GetMfgTokenOverrideReq(XncpCommandPayload): @register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_RSP) class GetMfgTokenOverrideRsp(XncpCommandPayload): value: Bytes + + +@register_command(XncpCommandId.GET_BUILD_STRING_REQ) +class GetBuildStringReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_BUILD_STRING_RSP) +class GetBuildStringRsp(XncpCommandPayload): + build_string: Bytes From 848d2fe00f591ba94645b9c4b4be33a1824c31d8 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 13 Aug 2024 16:56:38 -0400 Subject: [PATCH 14/20] Fix up after rebase --- bellows/ezsp/__init__.py | 34 ++++++++++++---------------------- bellows/zigbee/application.py | 10 ++++++++-- tests/test_application.py | 1 - 3 files changed, 20 insertions(+), 25 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 7b893494..e0577f53 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -321,32 +321,22 @@ async def get_board_info( ) -> tuple[str, str, str | None] | tuple[None, None, str | None]: """Return board info.""" - tokens = {} + tokens: dict[t.EzspMfgTokenId, str | None] = {} - for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME): - value = await self.get_mfg_token(token) - raw_tokens[token].append(value) + for token_id in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME): + value = await self.get_mfg_token(token_id) - # Try to parse them - tokens: dict[t.EzspMfgTokenId, str] = {} + # Tokens are fixed-length and initially filled with \xFF but also can end + # with \x00 + while value.endswith((b"\xFF", b"\x00")): + value = value.rstrip(b"\xFF").rstrip(b"\x00") - for token_id, values in raw_tokens.items(): - for value in values: - # Tokens are fixed-length and initially filled with \xFF but also can end - # with \x00 - while value.endswith((b"\xFF", b"\x00")): - value = value.rstrip(b"\xFF").rstrip(b"\x00") - - try: - result = value.decode("utf-8") - except UnicodeDecodeError: - result = "0x" + value.hex().upper() + try: + result = value.decode("utf-8") + except UnicodeDecodeError: + result = "0x" + value.hex().upper() - if result: - tokens[token_id] = result - break - else: - tokens[token_id] = None + tokens[token_id] = result or None (status, ver_info_bytes) = await self.getValue( valueId=t.EzspValueId.VALUE_VERSION_INFO diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index c38ae8d8..7d22bd25 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -674,7 +674,7 @@ async def _reset_mfg_id(self, mfg_id: int) -> None: """Resets manufacturer id if was temporary overridden by a joining device.""" await self._ezsp.setManufacturerCode(code=mfg_id) await asyncio.sleep(MFG_ID_RESET_DELAY) - await self._ezsp.setManufacturerCode(DEFAULT_MFG_ID) + await self._ezsp.setManufacturerCode(code=DEFAULT_MFG_ID) async def energy_scan( self, channels: t.Channels, duration_exp: int, count: int @@ -761,7 +761,13 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None: ) if packet.source_route is not None: - if FirmwareFeatures.MANUAL_SOURCE_ROUTE in self._ezsp._xncp_features and self.config[CONF_BELLOWS_CONFIG][CONF_MANUAL_SOURCE_ROUTING]: + if ( + FirmwareFeatures.MANUAL_SOURCE_ROUTE + in self._ezsp._xncp_features + and self.config[CONF_BELLOWS_CONFIG][ + CONF_MANUAL_SOURCE_ROUTING + ] + ): await self._ezsp.xncp_set_manual_source_route( nwk=packet.dst.address, relays=packet.source_route, diff --git a/tests/test_application.py b/tests/test_application.py index b6a00003..2dd078d3 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -16,7 +16,6 @@ import bellows.config as config from bellows.exception import ControllerError, EzspError import bellows.ezsp as ezsp -from bellows.ezsp.custom_commands import FirmwareFeatures from bellows.ezsp.v9.commands import GetTokenDataRsp from bellows.ezsp.xncp import FirmwareFeatures import bellows.types From 314d7cf63fa25061f789676ee894a86e318fa8ca Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Tue, 15 Oct 2024 15:07:55 -0400 Subject: [PATCH 15/20] Add support for the `GET_FLOW_CONTROL_TYPE` XNCP command --- bellows/ezsp/__init__.py | 7 ++++++- bellows/ezsp/xncp.py | 20 ++++++++++++++++++++ bellows/zigbee/application.py | 15 ++++++++++++++- 3 files changed, 40 insertions(+), 2 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index e0577f53..cdb808f8 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -25,7 +25,7 @@ from bellows.exception import EzspError, InvalidCommandError from bellows.ezsp import xncp from bellows.ezsp.config import DEFAULT_CONFIG, RuntimeConfig, ValueConfig -from bellows.ezsp.xncp import FirmwareFeatures +from bellows.ezsp.xncp import FirmwareFeatures, FlowControlType import bellows.types as t import bellows.uart @@ -693,3 +693,8 @@ async def xncp_get_build_string(self) -> bytes: """Get build string.""" rsp = await self.send_xncp_frame(xncp.GetBuildStringReq()) return rsp.build_string.decode("utf-8") + + async def xncp_get_flow_control_type(self) -> FlowControlType: + """Get flow control type.""" + rsp = await self.send_xncp_frame(xncp.GetFlowControlTypeReq()) + return rsp.flow_control_type diff --git a/bellows/ezsp/xncp.py b/bellows/ezsp/xncp.py index af7c03ec..f6dc52ce 100644 --- a/bellows/ezsp/xncp.py +++ b/bellows/ezsp/xncp.py @@ -47,11 +47,13 @@ class XncpCommandId(t.enum16): SET_SOURCE_ROUTE_REQ = 0x0001 GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002 GET_BUILD_STRING_REQ = 0x0003 + GET_FLOW_CONTROL_TYPE_REQ = 0x0004 GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000 SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000 GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000 GET_BUILD_STRING_RSP = GET_BUILD_STRING_REQ | 0x8000 + GET_FLOW_CONTROL_TYPE_RSP = GET_FLOW_CONTROL_TYPE_REQ | 0x8000 UNKNOWN = 0xFFFF @@ -104,11 +106,19 @@ class FirmwareFeatures(t.bitmap32): # The firmware contains a free-form build string BUILD_STRING = 1 << 3 + # The flow control type (software or hardware) can be queried + FLOW_CONTROL_TYPE = 1 << 4 + class XncpCommandPayload(t.Struct): pass +class FlowControlType(t.enum8): + Software = 0x00 + Hardware = 0x01 + + @register_command(XncpCommandId.GET_SUPPORTED_FEATURES_REQ) class GetSupportedFeaturesReq(XncpCommandPayload): pass @@ -148,3 +158,13 @@ class GetBuildStringReq(XncpCommandPayload): @register_command(XncpCommandId.GET_BUILD_STRING_RSP) class GetBuildStringRsp(XncpCommandPayload): build_string: Bytes + + +@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_REQ) +class GetFlowControlTypeReq(XncpCommandPayload): + pass + + +@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_RSP) +class GetFlowControlTypeRsp(XncpCommandPayload): + flow_control_type: FlowControlType diff --git a/bellows/zigbee/application.py b/bellows/zigbee/application.py index 7d22bd25..c8cfdd35 100644 --- a/bellows/zigbee/application.py +++ b/bellows/zigbee/application.py @@ -32,7 +32,12 @@ CONF_USE_THREAD, CONFIG_SCHEMA, ) -from bellows.exception import ControllerError, EzspError, StackAlreadyRunning +from bellows.exception import ( + ControllerError, + EzspError, + InvalidCommandError, + StackAlreadyRunning, +) import bellows.ezsp from bellows.ezsp.xncp import FirmwareFeatures import bellows.multicast @@ -294,6 +299,11 @@ async def load_network_info(self, *, load_devices=False) -> None: can_burn_userdata_custom_eui64 = await ezsp.can_burn_userdata_custom_eui64() can_rewrite_custom_eui64 = await ezsp.can_rewrite_custom_eui64() + try: + flow_control = await self._ezsp.xncp_get_flow_control_type() + except InvalidCommandError: + flow_control = None + self.state.network_info = zigpy.state.NetworkInfo( source=f"bellows@{LIB_VERSION}", extended_pan_id=zigpy.types.ExtendedPanId(nwk_params.extendedPanId), @@ -314,6 +324,9 @@ async def load_network_info(self, *, load_devices=False) -> None: "stack_version": ezsp.ezsp_version, "can_burn_userdata_custom_eui64": can_burn_userdata_custom_eui64, "can_rewrite_custom_eui64": can_rewrite_custom_eui64, + "flow_control": ( + flow_control.name.lower() if flow_control is not None else None + ), } }, ) From 1dcd392c78a8eaa4893c2751c7d85cc37ca2b36a Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 8 Nov 2024 16:19:44 -0500 Subject: [PATCH 16/20] Fix up after rebase --- tests/test_application.py | 4 ++++ tests/test_ezsp.py | 9 ++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/tests/test_application.py b/tests/test_application.py index 2dd078d3..45aaa24e 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -123,6 +123,9 @@ def _create_app_for_startup( ezsp_mock.wait_for_stack_status.return_value.__enter__ = AsyncMock( return_value=t.EmberStatus.NETWORK_UP ) + ezsp_mock.customFrame = AsyncMock( + return_value=[t.EmberStatus.LIBRARY_NOT_PRESENT, b""] + ) ezsp_mock.xncp_get_supported_firmware_features = AsyncMock( return_value=FirmwareFeatures.NONE ) @@ -1827,6 +1830,7 @@ def zigpy_backup() -> zigpy.backups.NetworkBackup: metadata={ "ezsp": { "stack_version": 8, + "flow_control": None, "can_burn_userdata_custom_eui64": True, "can_rewrite_custom_eui64": True, } diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index cf602bc3..148833c4 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -40,6 +40,9 @@ async def mock_command(command, *args, **kwargs): api._mock_commands = {} api._mock_commands["version"] = AsyncMock(return_value=[version, 0, 0]) + api._mock_commands["customFrame"] = AsyncMock( + return_value=[t.EmberStatus.LIBRARY_NOT_PRESENT, b""] + ) api._command = AsyncMock(side_effect=mock_command) return api @@ -588,8 +591,8 @@ async def test_write_custom_eui64_rcp(ezsp_f): @patch.object(EZSP, "version", new_callable=AsyncMock) @patch.object(EZSP, "reset", new_callable=AsyncMock) -@patch.object(ezsp.EZSP, "get_xncp_features", new_callable=AsyncMock) -async def test_ezsp_init_zigbeed(reset_mock, xncp_mock, version_mock): +@patch.object(EZSP, "get_xncp_features", new_callable=AsyncMock) +async def test_ezsp_init_zigbeed(xncp_mock, reset_mock, version_mock): """Test initialize method with a received startup reset frame.""" ezsp = make_ezsp( config={ @@ -610,7 +613,7 @@ async def test_ezsp_init_zigbeed(reset_mock, xncp_mock, version_mock): @patch.object(EZSP, "version", new_callable=AsyncMock) @patch.object(EZSP, "reset", new_callable=AsyncMock) -@patch.object(ezsp.EZSP, "get_xncp_features", new_callable=AsyncMock) +@patch.object(EZSP, "get_xncp_features", new_callable=AsyncMock) @patch("bellows.ezsp.NETWORK_COORDINATOR_STARTUP_RESET_WAIT", 0.01) async def test_ezsp_init_zigbeed_timeout(reset_mock, xncp_mock, version_mock): """Test initialize method with a received startup reset frame.""" From a91399d1b2a7ba06dec2e96d2ee0fc99a6334f8e Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 8 Nov 2024 17:21:34 -0500 Subject: [PATCH 17/20] Coverage --- bellows/ezsp/__init__.py | 15 +++++++-------- tests/test_ezsp.py | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 9 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index cdb808f8..e0c1dcfe 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -351,14 +351,13 @@ async def get_board_info( special, ver_info_bytes = t.uint8_t.deserialize(ver_info_bytes) version = f"{major}.{minor}.{patch}.{special} build {build}" - if xncp.FirmwareFeatures.BUILD_STRING in self._xncp_features: - try: - build_string = await self.xncp_get_build_string() - except InvalidCommandError: - build_string = None + try: + build_string = await self.xncp_get_build_string() + except InvalidCommandError: + build_string = None - if build_string: - version = f"{version} ({build_string})" + if build_string: + version = f"{version} ({build_string})" return ( tokens[t.EzspMfgTokenId.MFG_STRING], @@ -689,7 +688,7 @@ async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes: rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token)) return rsp.value - async def xncp_get_build_string(self) -> bytes: + async def xncp_get_build_string(self) -> str: """Get build string.""" rsp = await self.send_xncp_frame(xncp.GetBuildStringReq()) return rsp.build_string.decode("utf-8") diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index 148833c4..c8702281 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -12,6 +12,7 @@ from bellows.ash import NcpFailure from bellows.exception import EzspError, InvalidCommandError from bellows.ezsp import EZSP, EZSP_LATEST +from bellows.ezsp.xncp import FirmwareFeatures import bellows.types as t if sys.version_info[:2] < (3, 11): @@ -323,6 +324,7 @@ async def test_ezsp_newer_version(ezsp_f): ( "mfg_board_name", "mfg_string", + "xncp_build_string", "value_version_info", "expected", ), @@ -330,39 +332,52 @@ async def test_ezsp_newer_version(ezsp_f): ( (b"\xfe\xff\xff\xff",), (b"Manufacturer\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b"\x01\x02\x03\x04\x05\x06"), ("Manufacturer", "0xFE", "3.4.5.6 build 513"), ), ( (b"\xfe\xff\xff\xff",), (b"Manufacturer\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.ERR_FATAL, b"\x01\x02\x03\x04\x05\x06"), ("Manufacturer", "0xFE", None), ), ( (b"SkyBlue v0.1\x00\xff\xff\xff",), (b"Nabu Casa\x00\xff\xff\xff\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b"\xbf\x00\x07\x01\x00\x00\xaa"), ("Nabu Casa", "SkyBlue v0.1", "7.1.0.0 build 191"), ), ( (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",), (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b"\xbf\x00\x07\x01\x00\x00\xaa"), (None, None, "7.1.0.0 build 191"), ), ( (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff",), (b"\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00",), + (InvalidCommandError("XNCP is not supported"),), (t.EmberStatus.SUCCESS, b")\x01\x06\n\x03\x00\xaa"), (None, None, "6.10.3.0 build 297"), ), + ( + (b"SkyBlue v0.1\x00\xff\xff\xff",), + (b"Nabu Casa\x00\xff\xff\xff\xff\xff\xff",), + ("special build",), + (t.EmberStatus.SUCCESS, b"\xbf\x00\x07\x01\x00\x00\xaa"), + ("Nabu Casa", "SkyBlue v0.1", "7.1.0.0 build 191 (special build)"), + ), ], ) async def test_board_info( ezsp_f, mfg_board_name: bytes, mfg_string: bytes, + xncp_build_string: str | Exception, value_version_info: tuple[t.EmberStatus, bytes], expected: tuple[str | None, str | None, str], ): @@ -384,7 +399,7 @@ async def replacement(command_name, tokenId=None, valueId=None): ("getValue", t.EzspValueId.VALUE_VERSION_INFO): value_version_info, } ), - ): + ), patch.object(ezsp_f, "xncp_get_build_string", side_effect=xncp_build_string): mfg, brd, ver = await ezsp_f.get_board_info() assert (mfg, brd, ver) == expected @@ -432,6 +447,28 @@ async def _mock_cmd(*args, **kwargs): await ezsp_f.leaveNetwork(timeout=0.01) +async def test_xncp_token_override(ezsp_f): + ezsp_f.getMfgToken = AsyncMock(return_value=[b"firmware value"]) + ezsp_f.xncp_get_mfg_token_override = AsyncMock(return_value=b"xncp value") + + # Without firmware support, the XNCP command isn't sent + assert ( + await ezsp_f.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ) == b"firmware value" + + # With firmware support, it is + ezsp_f._xncp_features |= FirmwareFeatures.MFG_TOKEN_OVERRIDES + assert ( + await ezsp_f.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ) == b"xncp value" + + # Tokens without overrides are still read normally + ezsp_f.xncp_get_mfg_token_override.side_effect = InvalidCommandError + assert ( + await ezsp_f.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ) == b"firmware value" + + @pytest.mark.parametrize( "value, expected_result", [ From d532f65efaefa449b4d4597843a0a614b0e0bc1b Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 8 Nov 2024 22:30:08 -0500 Subject: [PATCH 18/20] Unit test XNCP commands --- tests/test_ezsp.py | 106 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 103 insertions(+), 3 deletions(-) diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index c8702281..6d03410e 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -11,8 +11,7 @@ from bellows import config, uart from bellows.ash import NcpFailure from bellows.exception import EzspError, InvalidCommandError -from bellows.ezsp import EZSP, EZSP_LATEST -from bellows.ezsp.xncp import FirmwareFeatures +from bellows.ezsp import EZSP, EZSP_LATEST, xncp import bellows.types as t if sys.version_info[:2] < (3, 11): @@ -457,7 +456,7 @@ async def test_xncp_token_override(ezsp_f): ) == b"firmware value" # With firmware support, it is - ezsp_f._xncp_features |= FirmwareFeatures.MFG_TOKEN_OVERRIDES + ezsp_f._xncp_features |= xncp.FirmwareFeatures.MFG_TOKEN_OVERRIDES assert ( await ezsp_f.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) ) == b"xncp value" @@ -871,3 +870,104 @@ def test_frame_parsing_error_doesnt_disconnect(ezsp_f, caplog): ezsp_f.frame_received(b"test") assert "Failed to parse frame" in caplog.text + + +async def test_xncp_get_supported_firmware_features(ezsp_f): + """Test XNCP get_supported_firmware_features.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetSupportedFeaturesRsp( + features=xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE + ) + ).serialize(), + ] + ) + + assert ( + await ezsp_f.xncp_get_supported_firmware_features() + ) == xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()) + ] + + +async def test_xncp_get_build_string(ezsp_f): + """Test XNCP get_build_string.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetBuildStringRsp(build_string="Some complex string 🦜".encode()) + ).serialize(), + ] + ) + + assert await ezsp_f.xncp_get_build_string() == "Some complex string 🦜" + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetBuildStringReq()).serialize()) + ] + + +async def test_xncp_set_manual_source_route(ezsp_f): + """Test XNCP set_manual_source_route.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload(xncp.SetSourceRouteRsp()).serialize(), + ] + ) + + await ezsp_f.xncp_set_manual_source_route( + destination=0x1234, route=[0x5678, 0xABCD] + ) + assert customFrame.mock_calls == [ + call( + xncp.XncpCommand.from_payload( + xncp.SetSourceRouteReq( + destination=0x1234, source_route=[0x5678, 0xABCD] + ) + ).serialize() + ) + ] + + +async def test_xncp_get_mfg_token_override(ezsp_f): + """Test XNCP get_mfg_token_override.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetMfgTokenOverrideRsp(value=b"value") + ).serialize(), + ] + ) + + await ezsp_f.xncp_get_mfg_token_override(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + assert customFrame.mock_calls == [ + call( + xncp.XncpCommand.from_payload( + xncp.GetMfgTokenOverrideReq(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ).serialize() + ) + ] + + +async def test_xncp_get_flow_control_type(ezsp_f): + """Test XNCP get_flow_control_type.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetFlowControlTypeRsp( + flow_control_type=xncp.FlowControlType.Hardware + ) + ).serialize(), + ] + ) + + assert await ezsp_f.xncp_get_flow_control_type() == xncp.FlowControlType.Hardware + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetFlowControlTypeReq()).serialize()) + ] From 0b100f1b6c049052cfd8ddec96ea58d2aff1fbfa Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 8 Nov 2024 22:35:31 -0500 Subject: [PATCH 19/20] Move XNCP tests into their own module --- bellows/ezsp/xncp.py | 9 --- tests/test_ezsp.py | 101 --------------------------------- tests/test_xncp.py | 132 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 132 insertions(+), 110 deletions(-) create mode 100644 tests/test_xncp.py diff --git a/bellows/ezsp/xncp.py b/bellows/ezsp/xncp.py index f6dc52ce..f09c1064 100644 --- a/bellows/ezsp/xncp.py +++ b/bellows/ezsp/xncp.py @@ -32,15 +32,6 @@ def serialize(self) -> Bytes: def deserialize(cls, data: bytes) -> tuple[Bytes, bytes]: return cls(data), b"" - def __repr__(self) -> str: - # Reading byte sequences like \x200\x21 is extremely annoying - # compared to \x20\x30\x21 - escaped = "".join(f"\\x{b:02X}" for b in self) - - return f"b'{escaped}'" - - __str__ = __repr__ - class XncpCommandId(t.enum16): GET_SUPPORTED_FEATURES_REQ = 0x0000 diff --git a/tests/test_ezsp.py b/tests/test_ezsp.py index 6d03410e..fa07f3df 100644 --- a/tests/test_ezsp.py +++ b/tests/test_ezsp.py @@ -870,104 +870,3 @@ def test_frame_parsing_error_doesnt_disconnect(ezsp_f, caplog): ezsp_f.frame_received(b"test") assert "Failed to parse frame" in caplog.text - - -async def test_xncp_get_supported_firmware_features(ezsp_f): - """Test XNCP get_supported_firmware_features.""" - ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( - return_value=[ - t.EmberStatus.SUCCESS, - xncp.XncpCommand.from_payload( - xncp.GetSupportedFeaturesRsp( - features=xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE - ) - ).serialize(), - ] - ) - - assert ( - await ezsp_f.xncp_get_supported_firmware_features() - ) == xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE - assert customFrame.mock_calls == [ - call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()) - ] - - -async def test_xncp_get_build_string(ezsp_f): - """Test XNCP get_build_string.""" - ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( - return_value=[ - t.EmberStatus.SUCCESS, - xncp.XncpCommand.from_payload( - xncp.GetBuildStringRsp(build_string="Some complex string 🦜".encode()) - ).serialize(), - ] - ) - - assert await ezsp_f.xncp_get_build_string() == "Some complex string 🦜" - assert customFrame.mock_calls == [ - call(xncp.XncpCommand.from_payload(xncp.GetBuildStringReq()).serialize()) - ] - - -async def test_xncp_set_manual_source_route(ezsp_f): - """Test XNCP set_manual_source_route.""" - ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( - return_value=[ - t.EmberStatus.SUCCESS, - xncp.XncpCommand.from_payload(xncp.SetSourceRouteRsp()).serialize(), - ] - ) - - await ezsp_f.xncp_set_manual_source_route( - destination=0x1234, route=[0x5678, 0xABCD] - ) - assert customFrame.mock_calls == [ - call( - xncp.XncpCommand.from_payload( - xncp.SetSourceRouteReq( - destination=0x1234, source_route=[0x5678, 0xABCD] - ) - ).serialize() - ) - ] - - -async def test_xncp_get_mfg_token_override(ezsp_f): - """Test XNCP get_mfg_token_override.""" - ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( - return_value=[ - t.EmberStatus.SUCCESS, - xncp.XncpCommand.from_payload( - xncp.GetMfgTokenOverrideRsp(value=b"value") - ).serialize(), - ] - ) - - await ezsp_f.xncp_get_mfg_token_override(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) - assert customFrame.mock_calls == [ - call( - xncp.XncpCommand.from_payload( - xncp.GetMfgTokenOverrideReq(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) - ).serialize() - ) - ] - - -async def test_xncp_get_flow_control_type(ezsp_f): - """Test XNCP get_flow_control_type.""" - ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( - return_value=[ - t.EmberStatus.SUCCESS, - xncp.XncpCommand.from_payload( - xncp.GetFlowControlTypeRsp( - flow_control_type=xncp.FlowControlType.Hardware - ) - ).serialize(), - ] - ) - - assert await ezsp_f.xncp_get_flow_control_type() == xncp.FlowControlType.Hardware - assert customFrame.mock_calls == [ - call(xncp.XncpCommand.from_payload(xncp.GetFlowControlTypeReq()).serialize()) - ] diff --git a/tests/test_xncp.py b/tests/test_xncp.py new file mode 100644 index 00000000..5563fd95 --- /dev/null +++ b/tests/test_xncp.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, call + +import pytest + +from bellows.exception import InvalidCommandError +from bellows.ezsp import EZSP, xncp +import bellows.types as t + +from tests.test_ezsp import ezsp_f + + +async def test_xncp_failure(ezsp_f: EZSP) -> None: + """Test XNCP failure.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.ERR_FATAL, + b"some other XNCP protocol that returns non-SUCCESS status codes", + ] + ) + + with pytest.raises(InvalidCommandError): + await ezsp_f.xncp_get_supported_firmware_features() + + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()) + ] + + +async def test_xncp_get_supported_firmware_features(ezsp_f: EZSP) -> None: + """Test XNCP get_supported_firmware_features.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetSupportedFeaturesRsp( + features=xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE + ) + ).serialize(), + ] + ) + + assert ( + await ezsp_f.xncp_get_supported_firmware_features() + ) == xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize()) + ] + + +async def test_xncp_get_build_string(ezsp_f: EZSP) -> None: + """Test XNCP get_build_string.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetBuildStringRsp(build_string="Some complex string 🦜".encode()) + ).serialize(), + ] + ) + + assert await ezsp_f.xncp_get_build_string() == "Some complex string 🦜" + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetBuildStringReq()).serialize()) + ] + + +async def test_xncp_set_manual_source_route(ezsp_f: EZSP) -> None: + """Test XNCP set_manual_source_route.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + ( + xncp.XncpCommand.from_payload(xncp.SetSourceRouteRsp()).serialize() + + b"some extra data" + ), + ] + ) + + await ezsp_f.xncp_set_manual_source_route( + destination=0x1234, route=[0x5678, 0xABCD] + ) + assert customFrame.mock_calls == [ + call( + xncp.XncpCommand.from_payload( + xncp.SetSourceRouteReq( + destination=0x1234, source_route=[0x5678, 0xABCD] + ) + ).serialize() + ) + ] + + +async def test_xncp_get_mfg_token_override(ezsp_f: EZSP) -> None: + """Test XNCP get_mfg_token_override.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetMfgTokenOverrideRsp(value=b"value") + ).serialize(), + ] + ) + + await ezsp_f.xncp_get_mfg_token_override(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + assert customFrame.mock_calls == [ + call( + xncp.XncpCommand.from_payload( + xncp.GetMfgTokenOverrideReq(token=t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + ).serialize() + ) + ] + + +async def test_xncp_get_flow_control_type(ezsp_f: EZSP) -> None: + """Test XNCP get_flow_control_type.""" + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( + return_value=[ + t.EmberStatus.SUCCESS, + xncp.XncpCommand.from_payload( + xncp.GetFlowControlTypeRsp( + flow_control_type=xncp.FlowControlType.Hardware + ) + ).serialize(), + ] + ) + + assert await ezsp_f.xncp_get_flow_control_type() == xncp.FlowControlType.Hardware + assert customFrame.mock_calls == [ + call(xncp.XncpCommand.from_payload(xncp.GetFlowControlTypeReq()).serialize()) + ] From cc7692eb9b972c241b9ec36039cd796421f9b49f Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Fri, 8 Nov 2024 22:42:38 -0500 Subject: [PATCH 20/20] Get coverage up to 100% --- tests/test_application.py | 44 +++++++++++++++++++++++++++++++++++++++ tests/test_xncp.py | 10 +++++++-- 2 files changed, 52 insertions(+), 2 deletions(-) diff --git a/tests/test_application.py b/tests/test_application.py index 45aaa24e..ffb2588a 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -25,6 +25,7 @@ import bellows.zigbee.application from bellows.zigbee.application import ControllerApplication import bellows.zigbee.device +from bellows.zigbee.device import EZSPEndpoint, EZSPGroupEndpoint from bellows.zigbee.util import map_rssi_to_energy from tests.common import mock_ezsp_commands @@ -852,6 +853,36 @@ async def test_send_packet_unicast_source_route(make_app, packet): ) +async def test_send_packet_unicast_manual_source_route(make_app, packet): + app = make_app( + { + zigpy.config.CONF_SOURCE_ROUTING: True, + config.CONF_BELLOWS_CONFIG: {config.CONF_MANUAL_SOURCE_ROUTING: True}, + } + ) + + app._ezsp._xncp_features |= FirmwareFeatures.MANUAL_SOURCE_ROUTE + + app._ezsp.xncp_set_manual_source_route = AsyncMock( + return_value=None, spec=app._ezsp._protocol.set_source_route + ) + + packet.source_route = [0x0001, 0x0002] + await _test_send_packet_unicast( + app, + packet, + options=( + t.EmberApsOption.APS_OPTION_RETRY + | t.EmberApsOption.APS_OPTION_ENABLE_ADDRESS_DISCOVERY + ), + ) + + app._ezsp.xncp_set_manual_source_route.assert_called_once_with( + nwk=packet.dst.address, + relays=[0x0001, 0x0002], + ) + + async def test_send_packet_unicast_extended_timeout(app, ieee, packet): app.add_device(nwk=packet.dst.address, ieee=ieee) @@ -1646,6 +1677,19 @@ async def test_startup_coordinator_existing_groups_joined(app, ieee): ] +async def test_startup_coordinator_xncp_wildcard_groups(app, ieee): + """Coordinator ignores multicast workarounds with XNCP firmware.""" + with mock_for_startup(app, ieee) as ezsp: + ezsp._xncp_features |= FirmwareFeatures.MEMBER_OF_ALL_GROUPS + + await app.connect() + await app.start_network() + + # No multicast workarounds are present + assert app._multicast is None + assert not isinstance(app._device.endpoints[1], EZSPGroupEndpoint) + + async def test_startup_new_coordinator_no_groups_joined(app, ieee): """Coordinator freshy added to the database has no groups to join.""" with mock_for_startup(app, ieee): diff --git a/tests/test_xncp.py b/tests/test_xncp.py index 5563fd95..60a7daee 100644 --- a/tests/test_xncp.py +++ b/tests/test_xncp.py @@ -13,10 +13,16 @@ async def test_xncp_failure(ezsp_f: EZSP) -> None: """Test XNCP failure.""" + + command = xncp.XncpCommand.from_payload( + xncp.GetSupportedFeaturesRsp(features=xncp.FirmwareFeatures.MANUAL_SOURCE_ROUTE) + ) + command.status = t.EmberStatus.ERR_FATAL + ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock( return_value=[ - t.EmberStatus.ERR_FATAL, - b"some other XNCP protocol that returns non-SUCCESS status codes", + t.EmberStatus.SUCCESS, # The frame itself encodes a status code + command.serialize(), ] )