Skip to content

Commit

Permalink
Use a generic interface to override manufacturing tokens
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed May 29, 2024
1 parent ef77644 commit a24c84e
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 67 deletions.
79 changes: 36 additions & 43 deletions bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,44 +352,23 @@ async def get_board_info(
) -> tuple[str, str, str | None] | tuple[None, None, str | None]:
"""Return board info."""

raw_tokens: dict[t.EzspMfgTokenId, list[bytes]] = {
t.EzspMfgTokenId.MFG_STRING: [],
t.EzspMfgTokenId.MFG_BOARD_NAME: [],
}
tokens = {}

# Prefer XNCP overrides if they exist
try:
override_board, override_manuf = await self.xncp_get_board_info_overrides()
except InvalidCommandError:
pass
else:
raw_tokens[t.EzspMfgTokenId.MFG_STRING].append(override_manuf)
raw_tokens[t.EzspMfgTokenId.MFG_BOARD_NAME].append(override_board)

# If not, read manufacturing tokens
for token in (t.EzspMfgTokenId.MFG_STRING, t.EzspMfgTokenId.MFG_BOARD_NAME):
(value,) = await self.getMfgToken(token)
LOGGER.debug("Read %s token: %s", token.name, value)
raw_tokens[token].append(value)

# Try to parse them
tokens: dict[t.EzspMfgTokenId, str] = {}

for token_id, values in raw_tokens.items():
for value in values:
# Tokens are fixed-length and initially filled with \xFF
result = value.rstrip(b"\xFF").split(b"\x00", 1)[0]

try:
result = result.decode("utf-8")
except UnicodeDecodeError:
result = "0x" + result.hex().upper()

if result:
tokens[token_id] = result
break
else:
tokens[token_id] = None
value = await self.get_mfg_token(token)

# Tokens are fixed-length and initially filled with \xFF
result = value.rstrip(b"\xFF").split(b"\x00", 1)[0]

try:
result = result.decode("utf-8")
except UnicodeDecodeError:
result = "0x" + result.hex().upper()

if not result:
result = None

tokens[token] = result

(status, ver_info_bytes) = await self.getValue(
self.types.EzspValueId.VALUE_VERSION_INFO
Expand Down Expand Up @@ -431,9 +410,25 @@ async def _get_nv3_restored_eui64_key(self) -> t.NV3KeyId | None:

return None

async def get_mfg_token(self, token: t.EzspMfgTokenId) -> bytes:
(value,) = await self.getMfgToken(token)
LOGGER.debug("Read manufacturing token %s: %s", token.name, value)

override_value = None

if FirmwareFeatures.MFG_TOKEN_OVERRIDES in self._xncp_features:
try:
override_value = await self.xncp_get_mfg_token_override(token)
except InvalidCommandError:
pass

LOGGER.debug("XNCP override token %s: %s", token.name, override_value)

return override_value or value

async def _get_mfg_custom_eui_64(self) -> t.EUI64 | None:
"""Get the custom EUI 64 manufacturing token, if it has a valid value."""
(data,) = await self.getMfgToken(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64)
(data,) = await self.get_mfg_token(t.EzspMfgTokenId.MFG_CUSTOM_EUI_64)

# Manufacturing tokens do not exist in RCP firmware: all reads are empty
if not data:
Expand Down Expand Up @@ -707,9 +702,7 @@ async def xncp_set_manual_source_route(
)
)

async def xncp_get_board_info_overrides(self) -> tuple[str | None, str | None]:
"""Get board information overrides."""
name_rsp = await self.send_xncp_frame(xncp.GetBoardNameReq())
manuf_rsp = await self.send_xncp_frame(xncp.GetManufNameReq())

return (name_rsp.board_name or None, manuf_rsp.manuf_name or None)
async def xncp_get_mfg_token_override(self, token: t.EzspMfgTokenId) -> bytes:
"""Get manufacturing token override."""
rsp = await self.send_xncp_frame(xncp.GetMfgTokenOverrideReq(token=token))
return rsp.value
42 changes: 18 additions & 24 deletions bellows/ezsp/xncp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
from __future__ import annotations

import dataclasses
import logging
from typing import Callable

import zigpy.types as t

from bellows.types import EmberStatus
from bellows.types import EmberStatus, EzspMfgTokenId

_LOGGER = logging.getLogger(__name__)

COMMANDS: dict[XncpCommandId, type[XncpCommandPayload]] = {}
REV_COMMANDS: dict[type[XncpCommandPayload], XncpCommandId] = {}
Expand Down Expand Up @@ -42,13 +45,11 @@ def __repr__(self) -> str:
class XncpCommandId(t.enum16):
GET_SUPPORTED_FEATURES_REQ = 0x0000
SET_SOURCE_ROUTE_REQ = 0x0001
GET_BOARD_NAME_REQ = 0x0002
GET_MANUF_NAME_REQ = 0x0003
GET_MFG_TOKEN_OVERRIDE_REQ = 0x0002

GET_SUPPORTED_FEATURES_RSP = GET_SUPPORTED_FEATURES_REQ | 0x8000
SET_SOURCE_ROUTE_RSP = SET_SOURCE_ROUTE_REQ | 0x8000
GET_BOARD_NAME_RSP = GET_BOARD_NAME_REQ | 0x8000
GET_MANUF_NAME_RSP = GET_MANUF_NAME_REQ | 0x8000
GET_MFG_TOKEN_OVERRIDE_RSP = GET_MFG_TOKEN_OVERRIDE_REQ | 0x8000

UNKNOWN = 0xFFFF

Expand All @@ -71,7 +72,10 @@ def from_payload(cls, payload: XncpCommandPayload) -> XncpCommand:
def from_bytes(cls, data: bytes) -> XncpCommand:
command_id, data = XncpCommandId.deserialize(data)
status, data = EmberStatus.deserialize(data)
payload = COMMANDS[command_id].deserialize(data)
payload, rest = COMMANDS[command_id].deserialize(data)

if rest:
_LOGGER.debug("Unparsed data remains after %s frame: %s", payload, rest)

return cls(command_id=command_id, status=status, payload=payload)

Expand All @@ -92,8 +96,8 @@ class FirmwareFeatures(t.bitmap32):
# Source routes can be overridden by the application
MANUAL_SOURCE_ROUTE = 1 << 1

# The firmware supports overriding the board name
BOARD_MANUF = 1 << 2
# The firmware supports overriding some manufacturing tokens
MFG_TOKEN_OVERRIDES = 1 << 2


class XncpCommandPayload(t.Struct):
Expand Down Expand Up @@ -121,21 +125,11 @@ class SetSourceRouteRsp(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_BOARD_NAME_REQ)
class GetBoardNameReq(XncpCommandPayload):
pass


@register_command(XncpCommandId.GET_BOARD_NAME_RSP)
class GetBoardNameRsp(XncpCommandPayload):
board_name: Bytes


@register_command(XncpCommandId.GET_MANUF_NAME_REQ)
class GetManufNameReq(XncpCommandPayload):
pass
@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_REQ)
class GetMfgTokenOverrideReq(XncpCommandPayload):
token: EzspMfgTokenId


@register_command(XncpCommandId.GET_MANUF_NAME_RSP)
class GetManufNameRsp(XncpCommandPayload):
manuf_name: Bytes
@register_command(XncpCommandId.GET_MFG_TOKEN_OVERRIDE_RSP)
class GetMfgTokenOverrideRsp(XncpCommandPayload):
value: Bytes

0 comments on commit a24c84e

Please sign in to comment.