From a24c84e9636512d593d4f7c9bd142df0e307d006 Mon Sep 17 00:00:00 2001 From: puddly <32534428+puddly@users.noreply.github.com> Date: Wed, 29 May 2024 18:21:27 -0400 Subject: [PATCH] Use a generic interface to override manufacturing tokens --- bellows/ezsp/__init__.py | 79 ++++++++++++++++++---------------------- bellows/ezsp/xncp.py | 42 +++++++++------------ 2 files changed, 54 insertions(+), 67 deletions(-) diff --git a/bellows/ezsp/__init__.py b/bellows/ezsp/__init__.py index 12befdd9..1fc1f8ba 100644 --- a/bellows/ezsp/__init__.py +++ b/bellows/ezsp/__init__.py @@ -352,44 +352,23 @@ async def get_board_info( ) -> tuple[str, str, str | None] | tuple[None, None, str | None]: """Return board info.""" - raw_tokens: dict[t.EzspMfgTokenId, list[bytes]] = { - t.EzspMfgTokenId.MFG_STRING: [], - t.EzspMfgTokenId.MFG_BOARD_NAME: [], - } + tokens = {} - # Prefer XNCP overrides if they exist - try: - override_board, override_manuf = await self.xncp_get_board_info_overrides() - except InvalidCommandError: - pass - else: - raw_tokens[t.EzspMfgTokenId.MFG_STRING].append(override_manuf) - raw_tokens[t.EzspMfgTokenId.MFG_BOARD_NAME].append(override_board) - - # If not, read manufacturing tokens for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME): - (value,) = await self.getMfgToken(token) - LOGGER.debug("Read %s token: %s", token.name, value) - raw_tokens[token].append(value) - - # Try to parse them - tokens: dict[t.EzspMfgTokenId, str] = {} - - for token_id, values in raw_tokens.items(): - for value in values: - # Tokens are fixed-length and initially filled with \xFF - result = value.rstrip(b"\xFF").split(b"\x00", 1)[0] - - try: - result = result.decode("utf-8") - except UnicodeDecodeError: - result = "0x" + result.hex().upper() - - if result: - tokens[token_id] = result - break - else: - tokens[token_id] = None + value = await self.get_mfg_token(token) + + # Tokens are fixed-length and initially filled with \xFF + result = value.rstrip(b"\xFF").split(b"\x00", 1)[0] + + try: + result = result.decode("utf-8") + except UnicodeDecodeError: + result = "0x" + result.hex().upper() + + if not result: + result = None + + tokens[token] = result (status, ver_info_bytes) = await self.getValue( self.types.EzspValueId.VALUE_VERSION_INFO @@ -431,9 +410,25 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None: return None + async def get_mfg_token(self, token: t.EzspMfgTokenId) -> bytes: + (value,) = await self.getMfgToken(token) + LOGGER.debug("Read manufacturing token %s: %s", token.name, value) + + override_value = None + + if FirmwareFeatures.MFG_TOKEN_OVERRIDES in self._xncp_features: + try: + override_value = await self.xncp_get_mfg_token_override(token) + except InvalidCommandError: + pass + + LOGGER.debug("XNCP override token %s: %s", token.name, override_value) + + return override_value or value + async def _get_mfg_custom_eui_64(self) -> t.EUI64 | None: """Get the custom EUI 64 manufacturing token, if it has a valid value.""" - (data,) = await self.getMfgToken(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) + (data,) = await self.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64) # Manufacturing tokens do not exist in RCP firmware: all reads are empty if not data: @@ -707,9 +702,7 @@ async def xncp_set_manual_source_route( ) ) - async def xncp_get_board_info_overrides(self) -> tuple[str | None, str | None]: - """Get board information overrides.""" - name_rsp = await self.send_xncp_frame(xncp.GetBoardNameReq()) - manuf_rsp = await self.send_xncp_frame(xncp.GetManufNameReq()) - - return (name_rsp.board_name or None, manuf_rsp.manuf_name or None) + async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes: + """Get manufacturing token override.""" + rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token)) + return rsp.value diff --git a/bellows/ezsp/xncp.py b/bellows/ezsp/xncp.py index a2d5781e..02805a06 100644 --- a/bellows/ezsp/xncp.py +++ b/bellows/ezsp/xncp.py @@ -2,11 +2,14 @@ from __future__ import annotations import dataclasses +import logging from typing import Callable import zigpy.types as t -from bellows.types import EmberStatus +from bellows.types import EmberStatus, EzspMfgTokenId + +_LOGGER = logging.getLogger(__name__) COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {} REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {} @@ -42,13 +45,11 @@ def __repr__(self) -> str: class XncpCommandId(t.enum16): GET_SUPPORTED_FEATURES_REQ = 0x0000 SET_SOURCE_ROUTE_REQ = 0x0001 - GET_BOARD_NAME_REQ = 0x0002 - GET_MANUF_NAME_REQ = 0x0003 + GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002 GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000 SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000 - GET_BOARD_NAME_RSP = GET_BOARD_NAME_REQ | 0x8000 - GET_MANUF_NAME_RSP = GET_MANUF_NAME_REQ | 0x8000 + GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000 UNKNOWN = 0xFFFF @@ -71,7 +72,10 @@ def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand: def from_bytes(cls, data: bytes) -> XncpCommand: command_id, data = XncpCommandId.deserialize(data) status, data = EmberStatus.deserialize(data) - payload = COMMANDS[command_id].deserialize(data) + payload, rest = COMMANDS[command_id].deserialize(data) + + if rest: + _LOGGER.debug("Unparsed data remains after %s frame: %s", payload, rest) return cls(command_id=command_id, status=status, payload=payload) @@ -92,8 +96,8 @@ class FirmwareFeatures(t.bitmap32): # Source routes can be overridden by the application MANUAL_SOURCE_ROUTE = 1 << 1 - # The firmware supports overriding the board name - BOARD_MANUF = 1 << 2 + # The firmware supports overriding some manufacturing tokens + MFG_TOKEN_OVERRIDES = 1 << 2 class XncpCommandPayload(t.Struct): @@ -121,21 +125,11 @@ class SetSourceRouteRsp(XncpCommandPayload): pass -@register_command(XncpCommandId.GET_BOARD_NAME_REQ) -class GetBoardNameReq(XncpCommandPayload): - pass - - -@register_command(XncpCommandId.GET_BOARD_NAME_RSP) -class GetBoardNameRsp(XncpCommandPayload): - board_name: Bytes - - -@register_command(XncpCommandId.GET_MANUF_NAME_REQ) -class GetManufNameReq(XncpCommandPayload): - pass +@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_REQ) +class GetMfgTokenOverrideReq(XncpCommandPayload): + token: EzspMfgTokenId -@register_command(XncpCommandId.GET_MANUF_NAME_RSP) -class GetManufNameRsp(XncpCommandPayload): - manuf_name: Bytes +@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_RSP) +class GetMfgTokenOverrideRsp(XncpCommandPayload): + value: Bytes