1 """KNX entity configuration store."""
3 from abc
import ABC, abstractmethod
5 from typing
import Any, Final, TypedDict
14 from ..const
import DOMAIN
15 from .const
import CONF_DATA
17 _LOGGER = logging.getLogger(__name__)
19 STORAGE_VERSION: Final = 1
20 STORAGE_KEY: Final = f
"{DOMAIN}/config_store.json"
22 type KNXPlatformStoreModel = dict[str, dict[str, Any]]
23 type KNXEntityStoreModel = dict[
24 str, KNXPlatformStoreModel
29 """Represent KNX configuration store data."""
31 entities: KNXEntityStoreModel
35 """Entity platform controller base class."""
38 async
def create_entity(self, unique_id: str, config: dict[str, Any]) ->
None:
39 """Create a new entity."""
42 async
def update_entity(
43 self, entity_entry: er.RegistryEntry, config: dict[str, Any]
45 """Update an existing entities configuration."""
49 """Manage KNX config store data."""
54 config_entry: ConfigEntry,
56 """Initialize config store."""
59 self.
_store_store = Store[KNXConfigStoreModel](hass, STORAGE_VERSION, STORAGE_KEY)
61 self._platform_controllers: dict[Platform, PlatformControllerBase] = {}
64 """Load config store data from storage."""
68 "Loaded KNX config data from storage. %s entity platforms",
69 len(self.
datadata[
"entities"]),
73 self, platform: Platform, controller: PlatformControllerBase
75 """Add platform controller."""
76 self._platform_controllers[platform] = controller
79 self, platform: Platform, data: dict[str, Any]
81 """Create a new entity."""
82 platform_controller = self._platform_controllers[platform]
83 unique_id = f
"knx_es_{ulid_now()}"
84 await platform_controller.create_entity(unique_id, data)
86 self.
datadata[
"entities"].setdefault(platform, {})[unique_id] = data
89 entity_registry = er.async_get(self.
hasshass)
90 return entity_registry.async_get_entity_id(platform, DOMAIN, unique_id)
94 """Return KNX entity configuration."""
95 entity_registry = er.async_get(self.
hasshass)
96 if (entry := entity_registry.async_get(entity_id))
is None:
100 CONF_PLATFORM: entry.domain,
101 CONF_DATA: self.
datadata[
"entities"][entry.domain][entry.unique_id],
103 except KeyError
as err:
107 self, platform: Platform, entity_id: str, data: dict[str, Any]
109 """Update an existing entity."""
110 platform_controller = self._platform_controllers[platform]
111 entity_registry = er.async_get(self.
hasshass)
112 if (entry := entity_registry.async_get(entity_id))
is None:
114 unique_id = entry.unique_id
116 platform
not in self.
datadata[
"entities"]
117 or unique_id
not in self.
datadata[
"entities"][platform]
120 f
"Entity not found in storage: {entity_id} - {unique_id}"
122 await platform_controller.update_entity(entry, data)
124 self.
datadata[
"entities"][platform][unique_id] = data
128 """Delete an existing entity."""
129 entity_registry = er.async_get(self.
hasshass)
130 if (entry := entity_registry.async_get(entity_id))
is None:
133 del self.
datadata[
"entities"][entry.domain][entry.unique_id]
134 except KeyError
as err:
136 f
"Entity not found in {entry.domain}: {entry.unique_id}"
138 entity_registry.async_remove(entity_id)
142 """Get entity_ids of all UI configured entities."""
143 entity_registry = er.async_get(self.
hasshass)
145 uid
for platform
in self.
datadata[
"entities"].values()
for uid
in platform
149 for registry_entry
in er.async_entries_for_config_entry(
152 if registry_entry.unique_id
in unique_ids
157 """KNX config store exception."""
list[er.RegistryEntry] get_entity_entries(self)
None add_platform(self, Platform platform, PlatformControllerBase controller)
None update_entity(self, Platform platform, str entity_id, dict[str, Any] data)
dict[str, Any] get_entity_config(self, str entity_id)
str|None create_entity(self, Platform platform, dict[str, Any] data)
None delete_entity(self, str entity_id)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry)
None async_load(HomeAssistant hass)
None async_save(self, _T data)