diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..eeb8a6e --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +**/__pycache__ diff --git a/README.md b/README.md new file mode 100644 index 0000000..7ca3650 --- /dev/null +++ b/README.md @@ -0,0 +1,168 @@ +# Artur's Labels + +This custom component for Home Assistant expands the use of Labels in the system. The goal is for it to become the one, customizable, powerful system for all the grouping and targeting of entities that a user might ever need. + +As the name implies I made it primarily for my own usage. Since I put in all that work though, I thought it might be useful to some other people as well, so I decided to share it here. + +## Disclaimer + +Right now this is a very early beta release. You might encounter bugs, performance issues and other suboptimal behaviour. Do not even think of installing this without proper backups in place. Due to the nature of the system, this component has to integrate very deeply with Home Assistant internals, making it uniquely susceptible to breakage. This means you have to be very careful with updates, always have a backup ready just in case. + +Because of this early beta state some changes in configuration options and system behaviour might be required in the future. Make sure to always read the release notes and make the necessary adjustments. + +## What does it do? + +Currently the primary functionality of this component is to make it possible for labels to form hierarchies. Any label can become a child of any other label. This means that an entity that is assigned a child label by the user, will be assigned every parent label automatically by the system. + +## Start guide + +To use this component, follow the below steps. + +Before doing anything, read the entire README first. + +### Create labels + +First create all the labels you might need, using the normal UI interface of Home Assistant. + +As an example, let's use the following labels: + +- Home +- Ground floor +- First floor +- Kitchen +- Pantry +- Living Room +- TV Area +- Bedroom 1 +- Bedroom 2 +- Office +- Desk Area +- Stairs +- Low light rooms + +These are just area-related labels. You might also consider making more functional labels, like: + +- Sensors +- Battery devices +- Important Battery devices +- Motion sensors +- Security motion sensors +- Water pumps +- Critical + +Off course there are many other possibilities. The whole point of the system is that it is extremely flexible and can therefore fit a large group of diverse use cases. + +### Get the label ids + +Once you have your labels created, you need to get their `label_id`s. The easiest way is to go to the templates section in the developer tools and use the code: + +```jinja +{% for lbl in labels() -%} +{{ lbl }}: {{ label_name(lbl) }} +{% endfor %} +``` + +Make sure to copy the result and store it safely. + +### Create label relations + +In order to create relations between labels, you need to designate parents of every label. For identifying each label, you use its `label_id`. As an example, you put the following in your `configuration.yaml` file: + +```yaml +arturs_labels: + early_loader_hook: true + labels: + ground_floor: + parents: + - home + pantry: + parents: + - ground_floor + - low_light_rooms + living_room: + parents: + - ground_floor + tv_area: + parents: + - living_room + first_floor: + parents: + - home + stairs: + parents: + - ground_floor + - first_floor + office: + parents: + - first_floor + - low_light_rooms + # all the other area-related labels... + battery_devices: + parents: + - sensors + important_battery_devices: + parents: + - battery_devices + - critical + motion_sensors: + parents: + - sensors + security_motion_sensors: + parents: + - motion_sensors + - critical + water_pumps: + parents: + - critical + +``` + +Each time you make changes to the configuration, you have to restart Home Assistant. + +### Install prerequisites + +Make sure to install [Early Loader](https://github.com/arturpragacz/hass-cc-early-loader) before installing this component. + +### Install the component + +This component can be installed using [HACS](https://hacs.xyz/). + +- Add a new custom repository to HACS (in the three dot menu). +- Insert the link to this repository. +- Select `integration`. +- Click the add button. +- The integration should now display in HACS. +- Install it like every other HACS integration. +- Restart Home Assistant. + +### Assign the labels + +After restarting Home Assistant make sure that everything works correctly. If it does, you can now assign labels to your entities, if you didn't do it previously. + +One unfortunate limitation of the Home Assistant frontend is that it does not distinguish between assigned and effective labels. For this reason special virtual `assign:` labels are created. In order to change the entity labels, you change only those special labels, everything else will be applied automatically each time you save your changes. + +## Usage + +### Service actions + +You can target labels in your services. By targeting a parent label, you will automatically target all the entities of the child labels as well. + +### Devices + +You can assign labels not only to entities, but also to devices. Every label, that you assign to a device, will be acquired by all its entities automatically. + +### Templates + +The template `label_entities(label_name_or_id)` will allow you to get all the entities, for which the specified label is the effective label. The same is true for `label_devices(label_name_or_id)` with respect to devices. On the other hand `labels(entity_id)` will return only directly assigned labels to a given entity. + +### Areas + +This extensive labeling system is meant to effective replace the need for areas. For this reason areas will be completely **disabled**. You will not be able to target an area in service actions or templates. + +### Voice assistants + +Because voice assistants rely on areas, they will not function correctly with respect to those. This applies to both Home Assistant built-in voice functionality, as well as external systems. This is a limitation that I plan to address in the future. + +## Support + +If you want to support this project, then the best way to do it currently is to install it and report any bugs that you encounter. diff --git a/custom_components/arturs_labels/README.md b/custom_components/arturs_labels/README.md new file mode 100644 index 0000000..7ca3650 --- /dev/null +++ b/custom_components/arturs_labels/README.md @@ -0,0 +1,168 @@ +# Artur's Labels + +This custom component for Home Assistant expands the use of Labels in the system. The goal is for it to become the one, customizable, powerful system for all the grouping and targeting of entities that a user might ever need. + +As the name implies I made it primarily for my own usage. Since I put in all that work though, I thought it might be useful to some other people as well, so I decided to share it here. + +## Disclaimer + +Right now this is a very early beta release. You might encounter bugs, performance issues and other suboptimal behaviour. Do not even think of installing this without proper backups in place. Due to the nature of the system, this component has to integrate very deeply with Home Assistant internals, making it uniquely susceptible to breakage. This means you have to be very careful with updates, always have a backup ready just in case. + +Because of this early beta state some changes in configuration options and system behaviour might be required in the future. Make sure to always read the release notes and make the necessary adjustments. + +## What does it do? + +Currently the primary functionality of this component is to make it possible for labels to form hierarchies. Any label can become a child of any other label. This means that an entity that is assigned a child label by the user, will be assigned every parent label automatically by the system. + +## Start guide + +To use this component, follow the below steps. + +Before doing anything, read the entire README first. + +### Create labels + +First create all the labels you might need, using the normal UI interface of Home Assistant. + +As an example, let's use the following labels: + +- Home +- Ground floor +- First floor +- Kitchen +- Pantry +- Living Room +- TV Area +- Bedroom 1 +- Bedroom 2 +- Office +- Desk Area +- Stairs +- Low light rooms + +These are just area-related labels. You might also consider making more functional labels, like: + +- Sensors +- Battery devices +- Important Battery devices +- Motion sensors +- Security motion sensors +- Water pumps +- Critical + +Off course there are many other possibilities. The whole point of the system is that it is extremely flexible and can therefore fit a large group of diverse use cases. + +### Get the label ids + +Once you have your labels created, you need to get their `label_id`s. The easiest way is to go to the templates section in the developer tools and use the code: + +```jinja +{% for lbl in labels() -%} +{{ lbl }}: {{ label_name(lbl) }} +{% endfor %} +``` + +Make sure to copy the result and store it safely. + +### Create label relations + +In order to create relations between labels, you need to designate parents of every label. For identifying each label, you use its `label_id`. As an example, you put the following in your `configuration.yaml` file: + +```yaml +arturs_labels: + early_loader_hook: true + labels: + ground_floor: + parents: + - home + pantry: + parents: + - ground_floor + - low_light_rooms + living_room: + parents: + - ground_floor + tv_area: + parents: + - living_room + first_floor: + parents: + - home + stairs: + parents: + - ground_floor + - first_floor + office: + parents: + - first_floor + - low_light_rooms + # all the other area-related labels... + battery_devices: + parents: + - sensors + important_battery_devices: + parents: + - battery_devices + - critical + motion_sensors: + parents: + - sensors + security_motion_sensors: + parents: + - motion_sensors + - critical + water_pumps: + parents: + - critical + +``` + +Each time you make changes to the configuration, you have to restart Home Assistant. + +### Install prerequisites + +Make sure to install [Early Loader](https://github.com/arturpragacz/hass-cc-early-loader) before installing this component. + +### Install the component + +This component can be installed using [HACS](https://hacs.xyz/). + +- Add a new custom repository to HACS (in the three dot menu). +- Insert the link to this repository. +- Select `integration`. +- Click the add button. +- The integration should now display in HACS. +- Install it like every other HACS integration. +- Restart Home Assistant. + +### Assign the labels + +After restarting Home Assistant make sure that everything works correctly. If it does, you can now assign labels to your entities, if you didn't do it previously. + +One unfortunate limitation of the Home Assistant frontend is that it does not distinguish between assigned and effective labels. For this reason special virtual `assign:` labels are created. In order to change the entity labels, you change only those special labels, everything else will be applied automatically each time you save your changes. + +## Usage + +### Service actions + +You can target labels in your services. By targeting a parent label, you will automatically target all the entities of the child labels as well. + +### Devices + +You can assign labels not only to entities, but also to devices. Every label, that you assign to a device, will be acquired by all its entities automatically. + +### Templates + +The template `label_entities(label_name_or_id)` will allow you to get all the entities, for which the specified label is the effective label. The same is true for `label_devices(label_name_or_id)` with respect to devices. On the other hand `labels(entity_id)` will return only directly assigned labels to a given entity. + +### Areas + +This extensive labeling system is meant to effective replace the need for areas. For this reason areas will be completely **disabled**. You will not be able to target an area in service actions or templates. + +### Voice assistants + +Because voice assistants rely on areas, they will not function correctly with respect to those. This applies to both Home Assistant built-in voice functionality, as well as external systems. This is a limitation that I plan to address in the future. + +## Support + +If you want to support this project, then the best way to do it currently is to install it and report any bugs that you encounter. diff --git a/custom_components/arturs_labels/__init__.py b/custom_components/arturs_labels/__init__.py new file mode 100644 index 0000000..c9bd401 --- /dev/null +++ b/custom_components/arturs_labels/__init__.py @@ -0,0 +1,65 @@ +"""Artur's labels component.""" + +import logging + +import voluptuous as vol + +from homeassistant.core import HomeAssistant +import homeassistant.helpers.config_validation as cv +from homeassistant.helpers.typing import ConfigType + +from .overrides import ( + area_registry as ar, + device_registry as dr, + entity_registry as er, + label_registry as lr, + service as service_helper, +) +from .overrides.config import ( + device_registry as con_dr, + entity_registry as con_er, + label_registry as con_lr, +) + +_LOGGER = logging.getLogger(__name__) + +DOMAIN = "arturs_labels" + +LABEL_SCHEMA = { + str: { + vol.Required("parents", default=[]): vol.All( + cv.ensure_list, [str], vol.util.Set() + ), + }, +} + +CONFIG_SCHEMA = vol.Schema( + { + vol.Required(DOMAIN): { + vol.Required("labels", default=[]): LABEL_SCHEMA, + }, + }, + extra=vol.ALLOW_EXTRA, +) + + +async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool: + """Set up the arturs_labels component.""" + labels_parents = {} + for label_id, label_data in config[DOMAIN]["labels"].items(): + labels_parents[label_id] = label_data["parents"] + + service_helper.async_setup(hass) + + # lr has to be loaded first, because others depend on it + lr.async_load(hass, labels_parents) + + dr.async_load(hass) + er.async_load(hass) + ar.async_load(hass) + + con_lr.async_setup(hass) + con_dr.async_setup(hass) + con_er.async_setup(hass) + + return True diff --git a/custom_components/arturs_labels/manifest.json b/custom_components/arturs_labels/manifest.json new file mode 100644 index 0000000..7211d0a --- /dev/null +++ b/custom_components/arturs_labels/manifest.json @@ -0,0 +1,8 @@ +{ + "domain": "arturs_labels", + "name": "Artur's Labels", + "version": "0.1.0", + "codeowners": ["@arturpragacz"], + "integration_type": "system", + "quality_scale": "internal" +} diff --git a/custom_components/arturs_labels/overrides/__init__.py b/custom_components/arturs_labels/overrides/__init__.py new file mode 100644 index 0000000..4f8bcc0 --- /dev/null +++ b/custom_components/arturs_labels/overrides/__init__.py @@ -0,0 +1 @@ +"""Artur's overrides.""" diff --git a/custom_components/arturs_labels/overrides/area_registry.py b/custom_components/arturs_labels/overrides/area_registry.py new file mode 100644 index 0000000..f45e76a --- /dev/null +++ b/custom_components/arturs_labels/overrides/area_registry.py @@ -0,0 +1,117 @@ +"""Provide a registry for areas.""" + +from __future__ import annotations + +from collections.abc import Mapping +import dataclasses +from dataclasses import dataclass +import logging + +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers import area_registry as old_ar # noqa: ICN001 +from homeassistant.helpers.singleton import singleton +from homeassistant.util.hass_dict import HassKey + +_LOGGER = logging.getLogger(__name__) + +DATA_REGISTRY: HassKey[AreaRegistry] = HassKey("arturs_area_registry") + +NULL_LABELS: set[str] = set() + + +@dataclass(frozen=True, kw_only=True) +class AreaEntry(old_ar.AreaEntry): + """Area Registry Entry.""" + + shadow_labels: set[str] + + +class AreaRegistryItems(old_ar.AreaRegistryItems): + """Container for active area registry items, maps area id -> entry.""" + + view: Mapping[str, AreaEntry] + + def __init__(self) -> None: + """Initialize the container.""" + super().__init__() + self.view = self.data # type: ignore [assignment] + + def _index_entry(self, key: str, entry: old_ar.AreaEntry) -> None: + """Index an entry.""" + if type(entry) is not AreaEntry: + entry_dict = dataclasses.asdict(entry) + entry_dict["labels"] = NULL_LABELS + + entry = AreaEntry(**entry_dict, shadow_labels=entry.labels) + self.data[key] = entry + + super()._index_entry(key, entry) + + +class AreaRegistry(old_ar.AreaRegistry): + """Class to hold a registry of devices.""" + + areas: AreaRegistryItems + + _old_registry: old_ar.AreaRegistry + + def __init__(self, hass: HomeAssistant, old_registry: old_ar.AreaRegistry) -> None: + """Initialize the device registry.""" + # pylint: disable=super-init-not-called + self.hass = hass + self._old_registry = old_registry + self._store = old_registry._store # noqa: SLF001 + + @callback + def async_create(self, *args, labels=None, **kwargs) -> old_ar.AreaEntry: + """Create a new area.""" + return super().async_create(*args, **kwargs) + + @callback + def _async_update(self, *args, labels=None, **kwargs) -> old_ar.AreaEntry: + """Update properties of an area.""" + return super()._async_update(*args, **kwargs) + + async def async_load(self) -> None: + """Erase method.""" + raise NotImplementedError + + @callback + def async_load_cb(self) -> None: + """Load the area registry.""" + areas = AreaRegistryItems() + areas.update(self._old_registry.areas) + + self.areas = areas + self._area_data = areas.data + + self._old_registry.areas = self.areas + self._old_registry._area_data = self._area_data # noqa: SLF001 + self._old_registry.__class__ = self.__class__ + + @callback + def _data_to_save(self) -> old_ar.AreasRegistryStoreData: + """Return data of area registry to store in a file.""" + result = super()._data_to_save() + + view = self.areas.view + for area in result["areas"]: + area["labels"] = list(view[area["id"]].shadow_labels) + + return result + + +@callback +@singleton(DATA_REGISTRY) +def async_get(hass: HomeAssistant) -> AreaRegistry: + """Get device registry.""" + old_registry = old_ar.async_get(hass) + return AreaRegistry(hass, old_registry) + + +@callback +def async_load(hass: HomeAssistant) -> None: + """Load device registry.""" + assert DATA_REGISTRY not in hass.data + async_get(hass).async_load_cb() + old_ar.async_get = async_get diff --git a/custom_components/arturs_labels/overrides/config/__init__.py b/custom_components/arturs_labels/overrides/config/__init__.py new file mode 100644 index 0000000..6f6625a --- /dev/null +++ b/custom_components/arturs_labels/overrides/config/__init__.py @@ -0,0 +1 @@ +"""Artur's config overrides.""" diff --git a/custom_components/arturs_labels/overrides/config/device_registry.py b/custom_components/arturs_labels/overrides/config/device_registry.py new file mode 100644 index 0000000..b12abe9 --- /dev/null +++ b/custom_components/arturs_labels/overrides/config/device_registry.py @@ -0,0 +1,45 @@ +"""Websocket API to interact with the device registry.""" + +import logging +from typing import Any + +import homeassistant.components.config.device_registry as old_m +import homeassistant.components.websocket_api as api +from homeassistant.components.websocket_api.connection import ActiveConnection +from homeassistant.core import HomeAssistant, callback + +from ..utils import remove_assign_label_id +from .utils import async_setup as async_setup_template + +_LOGGER = logging.getLogger(__name__) + +old_mod: dict[str, api.WebSocketCommandHandler] = {} + + +@callback +def async_setup(hass: HomeAssistant) -> bool: + """Set up the Device Registry WS commands.""" + async_setup_template( + old_m, + old_mod, + (("websocket_update_device", websocket_update_device),), + ) + return True + + +@callback +def websocket_update_device( + hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] +) -> None: + """Update device command.""" + raw_labels = msg.get("labels") + if raw_labels is not None: + labels = [] + for raw_label_id in raw_labels: + label_id = remove_assign_label_id(raw_label_id) + if label_id is not None: + labels.append(label_id) + + msg["labels"] = labels + + old_mod["websocket_update_device"](hass, connection, msg) diff --git a/custom_components/arturs_labels/overrides/config/entity_registry.py b/custom_components/arturs_labels/overrides/config/entity_registry.py new file mode 100644 index 0000000..7286a4d --- /dev/null +++ b/custom_components/arturs_labels/overrides/config/entity_registry.py @@ -0,0 +1,45 @@ +"""Websocket API to interact with the entity registry.""" + +import logging +from typing import Any + +import homeassistant.components.config.entity_registry as old_m +import homeassistant.components.websocket_api as api +from homeassistant.components.websocket_api.connection import ActiveConnection +from homeassistant.core import HomeAssistant, callback + +from ..utils import remove_assign_label_id +from .utils import async_setup as async_setup_template + +_LOGGER = logging.getLogger(__name__) + +old_mod: dict[str, api.WebSocketCommandHandler] = {} + + +@callback +def async_setup(hass: HomeAssistant) -> bool: + """Set up the Entity Registry WS commands.""" + async_setup_template( + old_m, + old_mod, + (("websocket_update_entity", websocket_update_entity),), + ) + return True + + +@callback +def websocket_update_entity( + hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] +) -> None: + """Update entity command.""" + raw_labels = msg.get("labels") + if raw_labels is not None: + labels = [] + for raw_label_id in raw_labels: + label_id = remove_assign_label_id(raw_label_id) + if label_id is not None: + labels.append(label_id) + + msg["labels"] = labels + + old_mod["websocket_update_entity"](hass, connection, msg) diff --git a/custom_components/arturs_labels/overrides/config/label_registry.py b/custom_components/arturs_labels/overrides/config/label_registry.py new file mode 100644 index 0000000..2eb991c --- /dev/null +++ b/custom_components/arturs_labels/overrides/config/label_registry.py @@ -0,0 +1,99 @@ +"""Websocket API to interact with the label registry.""" + +import logging +from typing import Any + +import homeassistant.components.config.label_registry as old_m +import homeassistant.components.websocket_api as api +from homeassistant.components.websocket_api.connection import ActiveConnection +from homeassistant.core import HomeAssistant, callback + +from .. import label_registry as lr +from ..utils import add_assign_label_id, add_assign_label_name +from .utils import async_setup as async_setup_template + +_LOGGER = logging.getLogger(__name__) + +old_mod: dict[str, api.WebSocketCommandHandler] = {} + + +@callback +def async_setup(hass: HomeAssistant) -> bool: + """Set up the Label Registry WS commands.""" + async_setup_template( + old_m, + old_mod, + ( + ("websocket_list_labels", websocket_list_labels), + ("websocket_create_label", websocket_create_label), + ("websocket_delete_label", websocket_delete_label), + ("websocket_update_label", websocket_update_label), + ), + ) + return True + + +@callback +def websocket_list_labels( + hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] +) -> None: + """List labels command.""" + registry = lr.async_get(hass) + + labels = [old_m._entry_dict(entry) for entry in registry.async_list_labels()] # noqa: SLF001 + assign_labels = [] + for label in labels: + assign_label = label.copy() + assign_label["label_id"] = add_assign_label_id(assign_label["label_id"]) + assign_label["name"] = add_assign_label_name(assign_label["name"]) + assign_labels.append(assign_label) + + label["name"] = " " + label["name"] + + labels += assign_labels + + connection.send_result(msg["id"], labels) + + +@callback +def websocket_create_label( + hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] +) -> None: + """Create label command.""" + name = msg["name"] + if ":" in name: + connection.send_error(msg["id"], "invalid_info", "Cannot create special label") + return + + old_mod["websocket_create_label"](hass, connection, msg) + + +@callback +def websocket_delete_label( + hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] +) -> None: + """Delete label command.""" + label_id = msg["label_id"] + if ":" in label_id: + connection.send_error(msg["id"], "invalid_info", "Cannot delete special label") + return + + old_mod["websocket_delete_label"](hass, connection, msg) + + +@callback +def websocket_update_label( + hass: HomeAssistant, connection: ActiveConnection, msg: dict[str, Any] +) -> None: + """Update label command.""" + label_id = msg["label_id"] + if ":" in label_id: + connection.send_error(msg["id"], "invalid_info", "Cannot update special label") + return + + name = msg.get("name") + if name is not None and ":" in name: + connection.send_error(msg["id"], "invalid_info", "Cannot create special label") + return + + old_mod["websocket_update_label"](hass, connection, msg) diff --git a/custom_components/arturs_labels/overrides/config/utils.py b/custom_components/arturs_labels/overrides/config/utils.py new file mode 100644 index 0000000..b4569df --- /dev/null +++ b/custom_components/arturs_labels/overrides/config/utils.py @@ -0,0 +1,20 @@ +"""Config Override utilities.""" + +from collections.abc import Iterable +from types import ModuleType + +import homeassistant.components.websocket_api as api + + +def async_setup( + old_m: ModuleType, + old_mod: dict[str, api.WebSocketCommandHandler], + replacements: Iterable[tuple[str, api.WebSocketCommandHandler]], +) -> bool: + """Set up the Entity Registry WS commands.""" + for name, new in replacements: + old = old_mod[name] = getattr(old_m, name) + new._ws_command = old._ws_command # type: ignore[attr-defined] # noqa: SLF001 + new._ws_schema = old._ws_schema # type: ignore[attr-defined] # noqa: SLF001 + setattr(old_m, name, new) + return True diff --git a/custom_components/arturs_labels/overrides/device_registry.py b/custom_components/arturs_labels/overrides/device_registry.py new file mode 100644 index 0000000..3576686 --- /dev/null +++ b/custom_components/arturs_labels/overrides/device_registry.py @@ -0,0 +1,279 @@ +"""Provide a registry for devices.""" + +from __future__ import annotations + +from collections import defaultdict +from collections.abc import Mapping +from functools import cached_property +import logging +from typing import Any, TypedDict, cast + +import attr + +from homeassistant.core import Event, HomeAssistant, callback +from homeassistant.helpers import device_registry as old_dr # noqa: ICN001 +from homeassistant.helpers.json import json_fragment +from homeassistant.helpers.registry import RegistryIndexType +from homeassistant.helpers.singleton import singleton +from homeassistant.helpers.typing import UNDEFINED +from homeassistant.util.event_type import EventType +from homeassistant.util.hass_dict import HassKey + +from . import label_registry as lr +from .registry import RegistryEntryBase, async_get_effective_labels + +_LOGGER = logging.getLogger(__name__) + +DATA_REGISTRY: HassKey[DeviceRegistry] = HassKey("arturs_device_registry") + +EVENT_DEVICE_REGISTRY_LABELS_UPDATE: EventType[EventDeviceRegistryLabelsUpdateData] = ( + EventType("arturs_device_registry_labels_update") +) + + +class EventDeviceRegistryLabelsUpdateData(TypedDict): + """Event data for when the device labels are updated.""" + + device_id: str + + +type EventDeviceRegistryLabelsUpdate = Event[EventDeviceRegistryLabelsUpdateData] + + +@attr.s(frozen=True) +class DeviceEntry(RegistryEntryBase, old_dr.DeviceEntry): + """Device Registry Entry.""" + + @property + def dict_repr(self) -> dict[str, Any]: + """Return a dict representation of the entry.""" + result = super().dict_repr + result["labels"] = self._frontend_labels + result["area_id"] = self.shadow_area_id + return result + + @cached_property + def as_storage_fragment(self) -> json_fragment: + """Return a json fragment for storage.""" + self.set_area_id_shadow(False) + result = super().as_storage_fragment + self.set_area_id_shadow(True) + return result + + +class ActiveDeviceRegistryItems(old_dr.ActiveDeviceRegistryItems): + """Container for active device registry items, maps device id -> entry. + + Maintains one additional index over base class: + - effective_label -> dict[key, True] + """ + + view: Mapping[str, DeviceEntry] + hass: HomeAssistant + no_devices_for_label: bool = False + + def __init__(self, hass: HomeAssistant) -> None: + """Initialize the container.""" + super().__init__() + self.view = self.data # type: ignore [assignment] + self.hass = hass + self._effective_labels_index: RegistryIndexType = defaultdict(dict) + + def _index_entry(self, key: str, entry: old_dr.DeviceEntry) -> None: + """Index an entry.""" + wrong_type = type(entry) is not DeviceEntry + if wrong_type or cast(DeviceEntry, entry).extra_labels_init: + lab_reg = lr.async_get(self.hass) + + effective_labels = async_get_effective_labels(lab_reg, entry.labels) + + if wrong_type: + entry_dict = attr.asdict( + entry, filter=lambda a, _v: a.init, retain_collection_types=True + ) + entry = DeviceEntry( + **entry_dict, + effective_labels=effective_labels, + ) + else: + entry = cast(DeviceEntry, entry) + entry = attr.evolve( + entry, + effective_labels=effective_labels, + ) + + self.data[key] = entry + else: + entry = cast(DeviceEntry, entry) + entry.set_extra_labels_init() + + super()._index_entry(key, entry) + + # if (area_id := entry.shadow_area_id) is not None: + # self._area_id_index[area_id][key] = True + for label in entry.effective_labels: + self._effective_labels_index[label][key] = True + + def _unindex_entry( + self, + key: str, + replacement_entry: old_dr.DeviceEntry | None = None, + ) -> None: + """Unindex an entry.""" + entry = self.view[key] + + entry.set_area_id_shadow(True) + + super()._unindex_entry(key, replacement_entry) + + # if area_id := entry.shadow_area_id: + # self._unindex_entry_value(key, area_id, self._area_id_index) + if effective_labels := entry.effective_labels: + for label in effective_labels: + self._unindex_entry_value(key, label, self._effective_labels_index) + + def get_devices_for_label( + self, label: str, effective: bool = True + ) -> list[old_dr.DeviceEntry]: + """Get devices for label.""" + if self.no_devices_for_label: + return [] + if effective: + index = self._effective_labels_index + else: + index = self._labels_index + view = self.view + return [view[key] for key in index.get(label, ())] + + +class DeviceRegistry(old_dr.DeviceRegistry): + """Class to hold a registry of devices.""" + + devices: ActiveDeviceRegistryItems + + _old_registry: old_dr.DeviceRegistry + + def __init__( + self, hass: HomeAssistant, old_registry: old_dr.DeviceRegistry + ) -> None: + """Initialize the device registry.""" + # pylint: disable=super-init-not-called + self.hass = hass + self._old_registry = old_registry + self._store = old_registry._store # noqa: SLF001 + + @callback + def async_update_device(self, device_id: str, **kwargs) -> DeviceEntry | None: + """Update properties of a device.""" + old_entry = self.devices.view[device_id] + + fire = False + + labels = kwargs.get("labels", UNDEFINED) + if labels is None or labels == old_entry.labels: + old_entry.set_extra_labels_init(False) + else: + fire = True + + old_entry.set_area_id_shadow(False) + + try: + new_entry = super().async_update_device(device_id, **kwargs) + finally: + # in case the entry didn't change + old_entry.set_extra_labels_init() + old_entry.set_area_id_shadow(True) + + if new_entry is None: + return None + + if fire: + self.hass.bus.async_fire( + EVENT_DEVICE_REGISTRY_LABELS_UPDATE, + EventDeviceRegistryLabelsUpdateData( + device_id=device_id, + ), + ) + + # can change during indexing, so always get the fresh one + return self.devices.view[device_id] + + async def async_load(self) -> None: + """Erase method.""" + raise NotImplementedError + + @callback + def async_load_cb(self) -> None: + """Load the device registry.""" + _async_setup_labels(self.hass, self) + + devices = ActiveDeviceRegistryItems(self.hass) + devices.update(self._old_registry.devices) + + self.devices = devices + self._device_data = devices.data + self.deleted_devices = self._old_registry.deleted_devices + + self._old_registry.devices = self.devices + self._old_registry._device_data = self._device_data # noqa: SLF001 + self._old_registry.__class__ = self.__class__ + + @callback + def async_clear_label_id(self, label_id: str) -> None: + """Clear label from registry entries.""" + for device in self.devices.get_devices_for_label(label_id, effective=False): + self.async_update_device(device.id, labels=device.labels - {label_id}) + + @callback + def async_update_extra_labels(self) -> None: + """Update extra labels in registry entries.""" + lab_reg = lr.async_get(self.hass) + + for device_id, entry in self.devices.view.items(): + effective_labels = async_get_effective_labels(lab_reg, entry.labels) + if effective_labels == entry.effective_labels: + continue + + self.devices[device_id] = attr.evolve( + entry, effective_labels=effective_labels, extra_labels_init=False + ) + + data: old_dr._EventDeviceRegistryUpdatedData_Update = { + "action": "update", + "device_id": device_id, + "changes": {}, + } + + self.hass.bus.async_fire(old_dr.EVENT_DEVICE_REGISTRY_UPDATED, data) + + +@callback +@singleton(DATA_REGISTRY) +def async_get(hass: HomeAssistant) -> DeviceRegistry: + """Get device registry.""" + old_registry = old_dr.async_get(hass) + return DeviceRegistry(hass, old_registry) + + +@callback +def async_load(hass: HomeAssistant) -> None: + """Load device registry.""" + assert DATA_REGISTRY not in hass.data + async_get(hass).async_load_cb() + old_dr.async_get = async_get + + +@callback +def _async_setup_labels(hass: HomeAssistant, registry: DeviceRegistry) -> None: + """Clean up entities caches when labels ancestry updated.""" + + @callback + def _handle_label_registry_ancestry_update( + event: lr.EventLabelRegistryAncestryUpdated, + ) -> None: + registry.async_update_extra_labels() + + hass.bus.async_listen( + event_type=lr.EVENT_LABEL_REGISTRY_ANCESTRY_UPDATED, + listener=_handle_label_registry_ancestry_update, + ) diff --git a/custom_components/arturs_labels/overrides/entity_registry.py b/custom_components/arturs_labels/overrides/entity_registry.py new file mode 100644 index 0000000..377240f --- /dev/null +++ b/custom_components/arturs_labels/overrides/entity_registry.py @@ -0,0 +1,324 @@ +"""Provide a registry for entities.""" + +from __future__ import annotations + +from collections import defaultdict +from collections.abc import Callable, Mapping +from functools import cached_property +import logging +from typing import Any, cast + +import attr + +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers import ( + device_registry as old_dr, # noqa: ICN001 + entity_registry as old_er, # noqa: ICN001 +) +from homeassistant.helpers.json import json_fragment +from homeassistant.helpers.registry import RegistryIndexType +from homeassistant.helpers.singleton import singleton +from homeassistant.helpers.typing import UNDEFINED +from homeassistant.util.hass_dict import HassKey + +from . import device_registry as dr, label_registry as lr +from .registry import RegistryEntryBase, async_get_effective_labels + +_LOGGER = logging.getLogger(__name__) + +DATA_REGISTRY: HassKey[EntityRegistry] = HassKey("arturs_entity_registry") + + +@attr.s(frozen=True) +class RegistryEntry(RegistryEntryBase, old_er.RegistryEntry): + """Entity Registry Entry.""" + + assigned_labels: set[str] = attr.ib(factory=set) + + @property + def _as_display_dict(self) -> dict[str, Any] | None: + """Return a partial dict representation of the entry. + + This version only includes what's needed for display. + Returns None if there's no data needed for display. + """ + display_dict = cast(dict[str, Any], super()._as_display_dict) + display_dict["lb"] = self._frontend_labels + display_dict["ai"] = self.shadow_area_id + return display_dict + + @cached_property + def as_partial_dict(self) -> dict[str, Any]: + """Return a partial dict representation of the entry.""" + partial_dict = super().as_partial_dict + partial_dict["labels"] = self._frontend_labels + partial_dict["area_id"] = self.shadow_area_id + return partial_dict + + @cached_property + def as_storage_fragment(self) -> json_fragment: + """Return a json fragment for storage.""" + self.set_area_id_shadow(False) + result = super().as_storage_fragment + self.set_area_id_shadow(True) + return result + + +class EntityRegistryItems(old_er.EntityRegistryItems): + """Container for entity registry items, maps entity_id -> entry. + + Maintains one additional index over base class: + - effective_label -> dict[key, True] + """ + + view: Mapping[str, RegistryEntry] + hass: HomeAssistant + + def __init__(self, hass: HomeAssistant) -> None: + """Initialize the containold_er.""" + super().__init__() + self.view = self.data # type: ignore [assignment] + self.hass = hass + self._effective_labels_index: RegistryIndexType = defaultdict(dict) + + def _index_entry(self, key: str, entry: old_er.RegistryEntry) -> None: + """Index an entry.""" + wrong_type = type(entry) is not RegistryEntry + if wrong_type or cast(RegistryEntry, entry).extra_labels_init: + lab_reg = lr.async_get(self.hass) + dev_reg = old_dr.async_get(self.hass) + + assigned_labels = _async_get_assigned_labels(dev_reg, entry) + effective_labels = async_get_effective_labels(lab_reg, assigned_labels) + + if wrong_type: + entry_dict = attr.asdict( + entry, filter=lambda a, _v: a.init, retain_collection_types=True + ) + entry = RegistryEntry( + **entry_dict, + assigned_labels=assigned_labels, + effective_labels=effective_labels, + ) + else: + entry = cast(RegistryEntry, entry) + entry = attr.evolve( + entry, + assigned_labels=assigned_labels, + effective_labels=effective_labels, + ) + + self.data[key] = entry + else: + entry = cast(RegistryEntry, entry) + entry.set_extra_labels_init() + + super()._index_entry(key, entry) + + # if (area_id := entry.shadow_area_id) is not None: + # self._area_id_index[area_id][key] = True + for label in entry.effective_labels: + self._effective_labels_index[label][key] = True + + def _unindex_entry( + self, + key: str, + replacement_entry: old_er.RegistryEntry | None = None, + ) -> None: + """Unindex an entry.""" + entry = self.view[key] + + entry.set_area_id_shadow(True) + + super()._unindex_entry(key, replacement_entry) + + # if area_id := entry.shadow_area_id: + # self._unindex_entry_value(key, area_id, self._area_id_index) + if effective_labels := entry.effective_labels: + for label in effective_labels: + self._unindex_entry_value(key, label, self._effective_labels_index) + + def get_entries_for_label( + self, label: str, effective: bool = True + ) -> list[old_er.RegistryEntry]: + """Get entries for label.""" + if effective: + index = self._effective_labels_index + else: + index = self._labels_index + view = self.view + return [view[key] for key in index.get(label, ())] + + +class EntityRegistry(old_er.EntityRegistry): + """Class to hold a registry of entities.""" + + entities: EntityRegistryItems + + _old_registry: old_er.EntityRegistry + + def __init__( + self, hass: HomeAssistant, old_registry: old_er.EntityRegistry + ) -> None: + """Initialize the entity registry.""" + # pylint: disable=super-init-not-called + self.hass = hass + self._old_registry = old_registry + self._store = old_registry._store # noqa: SLF001 + + @callback + def async_update_entity(self, entity_id: str, **kwargs) -> RegistryEntry: + """Update properties of an entity.""" + old_entry = self.entities.view[entity_id] + + labels = kwargs.get("labels", UNDEFINED) + device_id = kwargs.get("device_id", UNDEFINED) + if (labels is UNDEFINED or labels == old_entry.labels) and ( + device_id is UNDEFINED or device_id == old_entry.device_id + ): + old_entry.set_extra_labels_init(False) + + old_entry.set_area_id_shadow(False) + + try: + new_entry = super().async_update_entity(entity_id, **kwargs) + entity_id = new_entry.entity_id # could change during update + finally: + # in case the entry didn't change + old_entry.set_extra_labels_init() + old_entry.set_area_id_shadow(True) + + # can change during indexing, so always get the fresh one + # although we don't care in async_get_or_create, so maybe here we don't have to also + return self.entities.view[entity_id] + + async def async_load(self) -> None: + """Erase method.""" + raise NotImplementedError + + @callback + def async_load_cb(self) -> None: + """Load the entity registry.""" + _async_setup_labels(self.hass, self) + + entities = EntityRegistryItems(self.hass) + entities.update(self._old_registry.entities) + + self.entities = entities + self._entities_data = entities.data + self.deleted_entities = self._old_registry.deleted_entities + + self._old_registry.entities = self.entities + self._old_registry._entities_data = self._entities_data # noqa: SLF001 + self._old_registry.__class__ = self.__class__ + + @callback + def async_clear_label_id(self, label_id: str) -> None: + """Clear label from registry entries.""" + for entry in self.entities.get_entries_for_label(label_id, effective=False): + self.async_update_entity(entry.entity_id, labels=entry.labels - {label_id}) + + def _async_update_extra_labels(self, filter: Callable[[str], bool]) -> None: + """Update extra labels in registry entries.""" + lab_reg = lr.async_get(self.hass) + dev_reg = old_dr.async_get(self.hass) + + for entity_id, entry in self.entities.view.items(): + if not filter(entity_id): + continue + + assigned_labels = _async_get_assigned_labels(dev_reg, entry) + effective_labels = async_get_effective_labels(lab_reg, assigned_labels) + if ( + assigned_labels == entry.assigned_labels + and effective_labels == entry.effective_labels + ): + continue + + self.entities[entity_id] = attr.evolve( + entry, + assigned_labels=assigned_labels, + effective_labels=effective_labels, + extra_labels_init=False, + ) + + data: old_er._EventEntityRegistryUpdatedData_Update = { + "action": "update", + "entity_id": entity_id, + "changes": {}, + } + + self.hass.bus.async_fire(old_er.EVENT_ENTITY_REGISTRY_UPDATED, data) + + @callback + def async_update_from_device_extra_labels(self, device_id: str) -> None: + """Update from device extra labels in registry entries.""" + self._async_update_extra_labels( + lambda entity_id: self.entities[entity_id].device_id == device_id + ) + + @callback + def async_update_all_extra_labels(self) -> None: + """Update all extra labels in registry entries.""" + self._async_update_extra_labels(lambda entity: True) + + +@callback +@singleton(DATA_REGISTRY) +def async_get(hass: HomeAssistant) -> EntityRegistry: + """Get entity registry.""" + old_registry = old_er.async_get(hass) + return EntityRegistry(hass, old_registry) + + +@callback +def async_load(hass: HomeAssistant) -> None: + """Load entity registry.""" + assert DATA_REGISTRY not in hass.data + async_get(hass).async_load_cb() + old_er.async_get = async_get + + +@callback +def _async_get_assigned_labels( + dev_reg: old_dr.DeviceRegistry, entry: old_er.RegistryEntry +) -> set[str]: + """Get assigned labels for entity.""" + labels = entry.labels + + device_id = entry.device_id + if device_id is None: + return labels + + device = dev_reg.async_get(device_id) + if device is None: + return labels + + return labels | device.labels + + +@callback +def _async_setup_labels(hass: HomeAssistant, registry: EntityRegistry) -> None: + """Clean up entities caches when labels ancestry updated.""" + + @callback + def _handle_label_registry_ancestry_update( + event: lr.EventLabelRegistryAncestryUpdated, + ) -> None: + registry.async_update_all_extra_labels() + + hass.bus.async_listen( + event_type=lr.EVENT_LABEL_REGISTRY_ANCESTRY_UPDATED, + listener=_handle_label_registry_ancestry_update, + ) + + @callback + def _handle_device_registry_labels_update( + event: dr.EventDeviceRegistryLabelsUpdate, + ) -> None: + registry.async_update_from_device_extra_labels(event.data["device_id"]) + + hass.bus.async_listen( + event_type=dr.EVENT_DEVICE_REGISTRY_LABELS_UPDATE, + listener=_handle_device_registry_labels_update, + ) diff --git a/custom_components/arturs_labels/overrides/label_registry.py b/custom_components/arturs_labels/overrides/label_registry.py new file mode 100644 index 0000000..34d1e16 --- /dev/null +++ b/custom_components/arturs_labels/overrides/label_registry.py @@ -0,0 +1,280 @@ +"""Provide a registry for labels.""" + +from __future__ import annotations + +from collections.abc import Iterable, Mapping +import dataclasses +from dataclasses import dataclass, field +import logging +from typing import Any, TypedDict + +from homeassistant.core import Event, HomeAssistant, callback +from homeassistant.helpers import label_registry as old_lr # noqa: ICN001 +from homeassistant.helpers.normalized_name_base_registry import ( + NormalizedNameBaseRegistryItems, +) +from homeassistant.helpers.singleton import singleton +from homeassistant.util.event_type import EventType +from homeassistant.util.hass_dict import HassKey + +_LOGGER = logging.getLogger(__name__) + +DATA_REGISTRY: HassKey[LabelRegistry] = HassKey("arturs_label_registry") + +EVENT_LABEL_REGISTRY_ANCESTRY_UPDATED: EventType[ + EventLabelRegistryAncestryUpdatedData +] = EventType("arturs_label_registry_ancestry_updated") + + +class EventLabelRegistryAncestryUpdatedData(TypedDict): + """Event data for when the label ancestry is updated.""" + + +type EventLabelRegistryAncestryUpdated = Event[EventLabelRegistryAncestryUpdatedData] + + +@dataclass(slots=True, frozen=True, kw_only=True) +class LabelEntry(old_lr.LabelEntry): + """Label Registry Entry.""" + + mut: dict = field( + default_factory=lambda: { + "parents": None, # : set[str] | None; does not include self; can be None + "ancestors": None, # : set[str] | None; includes self, if not None; can be None + "equivalents": None, # : set[str] | None; includes self, if not None; can be None + } + ) + + @property + def parents(self) -> set[str] | None: + """Parents.""" + return self.mut["parents"] + + @property + def ancestors(self) -> set[str] | None: + """Ancestors.""" + return self.mut["ancestors"] + + @property + def equivalents(self) -> set[str] | None: + """Equivalents.""" + return self.mut["equivalents"] + + +def _is_label_special(label_id: str) -> bool: + return ":" in label_id + + +class LabelRegistryItems(NormalizedNameBaseRegistryItems[old_lr.LabelEntry]): + """Container for label registry items, maps label_id -> entry.""" + + view: Mapping[str, LabelEntry] + + def __init__(self) -> None: + """Initialize the container.""" + super().__init__() + self.view = self.data # type: ignore [assignment] + + def _index_entry(self, key: str, entry: old_lr.LabelEntry) -> None: + """Index an entry.""" + if type(entry) is not LabelEntry: + entry = self.data[key] = LabelEntry(**dataclasses.asdict(entry)) + + super()._index_entry(key, entry) + + +class LabelRegistry(old_lr.LabelRegistry): + """Class to hold a registry of labels.""" + + labels: LabelRegistryItems + + _old_registry: old_lr.LabelRegistry + _parents: dict[str, set[str]] + + def __init__(self, hass: HomeAssistant, old_registry: old_lr.LabelRegistry) -> None: + """Initialize the label registry.""" + # pylint: disable=super-init-not-called + self.hass = hass + self._old_registry = old_registry + self._store = old_registry._store # noqa: SLF001 + + @callback + def async_create(self, *args, **kwargs) -> old_lr.LabelEntry: + """Create a new label.""" + label = super().async_create(*args, **kwargs) + self._async_compute_ancestry() + return label + + @callback + def async_delete(self, label_id: str) -> None: + """Delete label.""" + super().async_delete(label_id) + self._async_compute_ancestry() + + async def async_load(self) -> None: + """Erase method.""" + raise NotImplementedError + + @callback + def async_load_cb(self) -> None: + """Load the label registry.""" + labels = LabelRegistryItems() + labels.update(self._old_registry.labels) + + self.labels = labels + self._label_data = labels.data + + self._old_registry.labels = self.labels + self._old_registry._label_data = self._label_data # noqa: SLF001 + self._old_registry.__class__ = self.__class__ + + @callback + def async_load_parents( + self, labels_parents: dict[str, set[str]], *, fire: bool = True + ): + """Load the labels ancestry.""" + labels_parents = { + item[0]: item[1] + for item in labels_parents.items() + if not _is_label_special(item[0]) + } + for label_id, parents in labels_parents.items(): + parents.discard(label_id) + discards = [parent for parent in parents if _is_label_special(parent)] + parents.difference_update(discards) + + self._parents = labels_parents + self._async_compute_ancestry(fire=fire) + + def _async_compute_ancestry(self, *, fire: bool = True) -> None: + all_label_ids = self.labels.keys() + for label in self.labels.view.values(): + label.mut["ancestors"] = None + parents = self._parents.get(label.label_id, None) + if parents is None: + label.mut["parents"] = None + else: + real_parents = parents & all_label_ids + label.mut["parents"] = real_parents + + self._async_do_compute_ancestry() + + if fire: + self.hass.bus.async_fire( + EVENT_LABEL_REGISTRY_ANCESTRY_UPDATED, + EventLabelRegistryAncestryUpdatedData(), + ) + + def _async_do_compute_ancestry(self) -> None: + indices: dict[str, int] = {} + for label_id in self.labels: + indices[label_id] = -1 + + count = 0 + equivalents_stack: list[str] = [] + + def compute_ancestry_impl(label: LabelEntry) -> int | None: + """Compute ancestry and return encountered cycles. + + Uses DFS and modified Tarjan's strongly connected components algorithm. + + index == -1 -> not visited + index > 0 -> visiting + index == 0 -> visited + + result = 0 -> prev visited + result = index -> prev visiting + result = lowlink -> normal recurse + """ + label_id = label.label_id + + index = indices[label_id] + if index >= 0: # either already visited or a cycle + return index + + nonlocal count, equivalents_stack + count += 1 + index = indices[label_id] = count + stack_index = len(equivalents_stack) + + parents = label.parents + ancestors: set[str] = set() + lowlink = index + if parents is not None: + for parent_id in parents: + parent = self.labels.view[parent_id] + result = compute_ancestry_impl(parent) + + if parent.ancestors is not None: + ancestors |= parent.ancestors + else: + ancestors.add(parent_id) + + if result: + lowlink = min(lowlink, result) + + if ancestors: + ancestors.add(label_id) + label.mut["ancestors"] = ancestors + + if index == lowlink: # a root node, marks the boundary of SCC + if len(equivalents_stack) > stack_index: + equivalents = set(equivalents_stack[stack_index:]) + for equivalent_id in equivalents: + equivalent = self.labels.view[equivalent_id] + equivalent.mut["ancestors"] = ancestors + equivalent.mut["equivalents"] = equivalents + + equivalents.add(label_id) + label.mut["equivalents"] = equivalents + + del equivalents_stack[stack_index:] + else: + equivalents_stack.append(label_id) + + indices[label_id] = 0 + + return lowlink + + for label_id, index in indices.items(): + if index: + label = self.labels.view[label_id] + compute_ancestry_impl(label) + + @callback + def async_get_ancestors(self, label_ids: Iterable[str]) -> set[str]: + """Get labels' ancestors.""" + ancestors = set() + + for label_id in label_ids: + label = self.labels.view.get(label_id) + if label is not None: + if label.ancestors is not None: + ancestors |= label.ancestors + else: + ancestors.add(label_id) + + # label may have been removed, but ancestry not yet recalculated + # so let's remove the bad labels here to have some form of consistency in output + all_label_ids = self.labels.keys() + ancestors &= all_label_ids + + return ancestors + + +@callback +@singleton(DATA_REGISTRY) +def async_get(hass: HomeAssistant) -> LabelRegistry: + """Get label registry.""" + old_registry = old_lr.async_get(hass) + return LabelRegistry(hass, old_registry) + + +@callback +def async_load(hass: HomeAssistant, labels_parents: dict[str, Any]) -> None: + """Load label registry.""" + assert DATA_REGISTRY not in hass.data + registry = async_get(hass) + registry.async_load_cb() + registry.async_load_parents(labels_parents, fire=False) + old_lr.async_get = async_get diff --git a/custom_components/arturs_labels/overrides/registry.py b/custom_components/arturs_labels/overrides/registry.py new file mode 100644 index 0000000..eeab394 --- /dev/null +++ b/custom_components/arturs_labels/overrides/registry.py @@ -0,0 +1,49 @@ +"""Provide a registry base.""" + +from functools import cached_property + +import attr + +from homeassistant.core import callback + +from . import label_registry as lr +from .utils import add_assign_label_id + +NULL_AREA: None = None + + +@attr.s(frozen=True, kw_only=True) +class RegistryEntryBase: + """Registry Entry Base.""" + + labels: set[str] = attr.ib(factory=set) + effective_labels: set[str] = attr.ib(factory=set) + extra_labels_init: bool = attr.ib(default=True) + + area_id: str | None = attr.ib(init=False, default=NULL_AREA) + shadow_area_id: str | None = attr.ib(alias="area_id", default=None) + + def set_extra_labels_init(self, value=True) -> None: + """Set effective labels init.""" + self.__dict__["extra_labels_init"] = value + + @cached_property + def _frontend_labels(self) -> list[str]: + labels = list(self.effective_labels) + labels += [add_assign_label_id(label_id) for label_id in self.labels] + return labels + + def set_area_id_shadow(self, shadow) -> None: + """Set area_id.""" + if shadow: + self.__dict__["area_id"] = NULL_AREA + else: + self.__dict__["area_id"] = self.shadow_area_id + + +@callback +def async_get_effective_labels( + lab_reg: lr.LabelRegistry, assigned_labels: set[str] +) -> set[str]: + """Get effective labels.""" + return lab_reg.async_get_ancestors(assigned_labels) diff --git a/custom_components/arturs_labels/overrides/service.py b/custom_components/arturs_labels/overrides/service.py new file mode 100644 index 0000000..bcbc37c --- /dev/null +++ b/custom_components/arturs_labels/overrides/service.py @@ -0,0 +1,46 @@ +"""The methods for loading Home Assistant integrations.""" + +from collections.abc import Callable + +from homeassistant.core import HomeAssistant, ServiceCall, callback +import homeassistant.helpers.service as old_service +from homeassistant.helpers.service import SelectedEntities, ServiceTargetSelector +from homeassistant.loader import bind_hass + +from . import device_registry as dr + +old_func: Callable[[HomeAssistant, ServiceCall, bool], SelectedEntities] | None = None + + +@callback +def async_setup(hass: HomeAssistant) -> bool: + """Set up the services helper.""" + global old_func # pylint: disable=global-statement # noqa: PLW0603 + old_func = old_service.async_extract_referenced_entity_ids + old_service.async_extract_referenced_entity_ids = ( + async_extract_referenced_entity_ids + ) + return True + + +@bind_hass +def async_extract_referenced_entity_ids( + hass: HomeAssistant, service_call: ServiceCall, *args, **kwargs +) -> SelectedEntities: + """Extract referenced entity IDs from a service call.""" + dev_reg = dr.async_get(hass) + + dev_reg.devices.no_devices_for_label = True + assert old_func + try: + result = old_func(hass, service_call, *args, **kwargs) + finally: + dev_reg.devices.no_devices_for_label = False + + selector = ServiceTargetSelector(service_call) + if selector.label_ids: + for label_id in selector.label_ids: + for device_entry in dev_reg.devices.get_devices_for_label(label_id): + result.referenced_devices.add(device_entry.id) + + return result diff --git a/custom_components/arturs_labels/overrides/utils.py b/custom_components/arturs_labels/overrides/utils.py new file mode 100644 index 0000000..00c4b93 --- /dev/null +++ b/custom_components/arturs_labels/overrides/utils.py @@ -0,0 +1,21 @@ +"""Override utilities.""" + + +def add_assign_label_id(label_id: str) -> str: + """Add assign label id.""" + return "assign:" + label_id + + +def remove_assign_label_id(label_id: str) -> str | None: + """Remove assign label id.""" + colon = label_id.rfind(":") + if colon == -1: + return None + if label_id[:colon] != "assign": + return None + return label_id[colon + 1 :] + + +def add_assign_label_name(name: str) -> str: + """Add assign label name.""" + return "assign: " + name diff --git a/hacs.json b/hacs.json new file mode 100644 index 0000000..fc6edfe --- /dev/null +++ b/hacs.json @@ -0,0 +1,5 @@ +{ + "name": "Artur's Labels", + "render_readme": true, + "homeassistant": "2024.7.0" +}