From 180cfa29918d63eafd524d1e5640503f4509d1a7 Mon Sep 17 00:00:00 2001 From: Alex X Date: Fri, 29 Mar 2024 17:29:12 +0300 Subject: [PATCH] Add migration for devices --- custom_components/xiaomi_gateway3/__init__.py | 1 + .../xiaomi_gateway3/hass/hass_utils.py | 79 +++++++++++-------- tests/test_migrate.py | 75 +++++++++--------- 3 files changed, 84 insertions(+), 71 deletions(-) diff --git a/custom_components/xiaomi_gateway3/__init__.py b/custom_components/xiaomi_gateway3/__init__.py index 6586c61f..77f96468 100644 --- a/custom_components/xiaomi_gateway3/__init__.py +++ b/custom_components/xiaomi_gateway3/__init__.py @@ -131,6 +131,7 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry): if config_entry.version == 1: + hass_utils.migrate_legacy_devices_unique_id(hass) hass_utils.migrate_legacy_entitites_unique_id(hass) hass_utils.migrate_devices_store() return True diff --git a/custom_components/xiaomi_gateway3/hass/hass_utils.py b/custom_components/xiaomi_gateway3/hass/hass_utils.py index 9bbfd473..748e093b 100644 --- a/custom_components/xiaomi_gateway3/hass/hass_utils.py +++ b/custom_components/xiaomi_gateway3/hass/hass_utils.py @@ -207,18 +207,44 @@ def remove_stats_entities(hass: HomeAssistant, config_entry: ConfigEntry): registry.async_remove(entity_id) +def migrate_legacy_devices_unique_id(hass: HomeAssistant): + registry = device_registry.async_get(hass) + for device in registry.devices.values(): + try: + if not any( + i[1] != migrate_uid(i[1]) for i in device.identifiers if i[0] == DOMAIN + ): + continue + + new_identifiers = { + (DOMAIN, migrate_uid(i[1])) if i[0] == DOMAIN else i + for i in device.identifiers + } + registry.async_update_device(device.id, new_identifiers=new_identifiers) + except Exception as e: + _LOGGER.warning(f"Migration error for {device}: {repr(e)}") + + def migrate_legacy_entitites_unique_id(hass: HomeAssistant): registry = entity_registry.async_get(hass) - for entity in list(registry.entities.values()): - if entity.platform != DOMAIN: + for entry in list(registry.entities.values()): + if entry.platform != DOMAIN: continue try: - if new_uid := check_entity_unique_id(entity): - _LOGGER.info(f"Migrate {entity.entity_id} to unique_id: {new_uid}") - registry.async_update_entity(entity.entity_id, new_unique_id=new_uid) + # split mac and attr in unique id + uid, attr = entry.unique_id.split("_", 1) + + new_uid = migrate_uid(uid) + new_attr = migrate_attr(attr, entry.original_device_class) + if uid == new_uid and attr == new_attr: + continue + + new_unique_id = f"{uid}_{attr}" + _LOGGER.info(f"Migrate {entry.entity_id} to unique_id: {new_unique_id}") + registry.async_update_entity(entry.entity_id, new_unique_id=new_unique_id) except Exception as e: - _LOGGER.warning(f"Migration error for {entity}: {repr(e)}") + _LOGGER.warning(f"Migration error for {entry}: {repr(e)}") def migrate_devices_store(): @@ -237,46 +263,35 @@ def migrate_devices_store(): v["last_report_ts"] = v.pop("last_decode_ts") -def check_entity_unique_id(registry_entry: RegistryEntry) -> str | None: - has_update = False - - # split mac and attr in unique id - uid, attr = registry_entry.unique_id.split("_", 1) - +def migrate_uid(uid: str) -> str: if uid.startswith("0x"): # ZIGBEE format should be "0x" + 16 hex lowercase if len(uid) < 18: - uid = f"0x{uid[2:]:>016s}" - has_update = True + return f"0x{uid[2:]:>016s}" elif len(uid) == 12: # GATEWAY, BLE, MESH format should be 12 hex lowercase if uid.isupper(): - uid = uid.lower() - has_update = True - elif not uid.startswith("group"): + return uid.lower() + elif len(uid) == 16: # GROUP format should be "group" + big int - if registry_entry.original_icon == "mdi:lightbulb-group": - did = int.from_bytes(bytes.fromhex(uid), "big") - uid = f"group{did}" - has_update = True + did = int.from_bytes(bytes.fromhex(uid), "big") + return f"group{did}" + return uid + +def migrate_attr(attr: str, device_class: str) -> str: if attr == "switch": # attr for "plug" and "outlet" should be not "switch" - if registry_entry.original_device_class in ("plug", "outlet"): - attr = registry_entry.original_device_class - has_update = True + if device_class in ("plug", "outlet"): + return device_class elif attr.startswith("channel ") or attr.endswith(" density"): # spaces in attr was by mistake - attr = attr.replace(" ", "_") - has_update = True + return attr.replace(" ", "_") elif attr == "pressure_state": - attr = "pressure" - has_update = True + return "pressure" elif attr == "occupancy_distance": - attr = "distance" - has_update = True - - return f"{uid}_{attr}" if has_update else None + return "distance" + return attr def remove_device(hass: HomeAssistant, device: XDevice): diff --git a/tests/test_migrate.py b/tests/test_migrate.py index 3e70d812..38728cd0 100644 --- a/tests/test_migrate.py +++ b/tests/test_migrate.py @@ -1,47 +1,44 @@ -from homeassistant.helpers.entity_registry import RegistryEntry +from custom_components.xiaomi_gateway3.core.const import DOMAIN +from custom_components.xiaomi_gateway3.hass.hass_utils import migrate_uid, migrate_attr -from custom_components.xiaomi_gateway3.hass.hass_utils import check_entity_unique_id +def test_migrate_uid(): + def new_identifiers(identifiers: set) -> set | None: + assert any(i[1] != migrate_uid(i[1]) for i in identifiers if i[0] == DOMAIN) + return { + (DOMAIN, migrate_uid(i[1])) if i[0] == DOMAIN else i for i in identifiers + } -def test_migrate_entity_unique_id(): - eid = "sensor.dummy" + p = new_identifiers({(DOMAIN, "0x158d000fffffff")}) + assert p == {("xiaomi_gateway3", "0x00158d000fffffff")} - entry = RegistryEntry( - entity_id=eid, - unique_id="0x158d000fffffff_gas density", - platform="", - ) - assert check_entity_unique_id(entry) == "0x00158d000fffffff_gas_density" + p = new_identifiers({(DOMAIN, "14ae46fffffff000")}) + assert p == {("xiaomi_gateway3", "group1490206592031780864")} - entry = RegistryEntry( - entity_id=eid, - unique_id="0x158d000fffffff_smoke density", - platform="", + p = new_identifiers( + {(DOMAIN, "50EC50FFFFFF"), (DOMAIN, "50ec50ffffff"), ("dummy", "id")} ) - assert check_entity_unique_id(entry) == "0x00158d000fffffff_smoke_density" + assert p == {("dummy", "id"), ("xiaomi_gateway3", "50ec50ffffff")} - entry = RegistryEntry( - entity_id=eid, - unique_id="50EC50FFFFFF_light", - platform="", - ) - assert check_entity_unique_id(entry) == "50ec50ffffff_light" - - entry = RegistryEntry( - entity_id=eid, - unique_id="0x158d000fffffff_switch", - platform="", - original_device_class="plug", - original_icon="mdi:power-plug", - ) - assert check_entity_unique_id(entry) == "0x00158d000fffffff_plug" - - entry = RegistryEntry( - entity_id=eid, - unique_id="13e81ad1f34ab000_light", - platform="", - original_device_class=None, - original_icon="mdi:lightbulb-group", - ) - assert check_entity_unique_id(entry) == "group1434425970349748224_light" +def test_migrate_entity_unique_id(): + def new_unique_id(unique_id: str, original_device_class: str = None): + uid, attr = unique_id.split("_", 1) + new_uid = migrate_uid(uid) + new_attr = migrate_attr(attr, original_device_class) + return f"{new_uid}_{new_attr}" + + p = new_unique_id("0x158d000fffffff_gas density") + assert p == "0x00158d000fffffff_gas_density" + + p = new_unique_id("0x158d000fffffff_smoke density") + assert p == "0x00158d000fffffff_smoke_density" + + p = new_unique_id("50EC50FFFFFF_light") + assert p == "50ec50ffffff_light" + + p = new_unique_id("0x158d000fffffff_switch", "plug") + assert p == "0x00158d000fffffff_plug" + + p = new_unique_id("13e81ad1f34ab000_light") + assert p == "group1434425970349748224_light"