1 """Helper to deal with YAML + storage."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
7 from collections.abc
import Awaitable, Callable, Coroutine, Iterable
8 from dataclasses
import dataclass
9 from functools
import partial
10 from hashlib
import md5
11 from itertools
import groupby
13 from operator
import attrgetter
14 from typing
import Any, Generic, TypedDict
16 from typing_extensions
import TypeVar
17 import voluptuous
as vol
18 from voluptuous.humanize
import humanize_error
26 from .
import entity_registry
27 from .entity
import Entity
28 from .entity_component
import EntityComponent
29 from .json
import json_bytes
30 from .storage
import Store
31 from .typing
import ConfigType, VolDictType
36 CHANGE_ADDED =
"added"
37 CHANGE_UPDATED =
"updated"
38 CHANGE_REMOVED =
"removed"
40 _EntityT = TypeVar(
"_EntityT", bound=Entity, default=Entity)
43 @dataclass(slots=True)
45 """Class to represent an item in a change set.
47 change_type: One of CHANGE_*
48 item_id: The id of the item
55 item_hash: str |
None =
None
58 type ChangeListener = Callable[
70 type ChangeSetListener = Callable[[Iterable[CollectionChange]], Awaitable[
None]]
74 """Base class for collection related errors."""
77 class ItemNotFound(CollectionError):
78 """Raised when an item is not found."""
81 """Initialize item not found error."""
82 super().
__init__(f
"Item {item_id} not found.")
87 """Keep track of IDs across different collections."""
90 """Initiate the ID manager."""
91 self.collections: list[dict[str, Any]] = []
94 """Add a collection to check for ID usage."""
95 self.collections.append(collection)
97 def has_id(self, item_id: str) -> bool:
98 """Test if the ID exists."""
99 return any(item_id
in collection
for collection
in self.collections)
102 """Generate an ID."""
107 while self.
has_idhas_id(proposal):
109 proposal = f
"{base}_{attempt}"
115 """Mixin class for entities managed by an ObservableCollection."""
120 """Create instance from storage."""
124 def from_yaml(cls, config: ConfigType) -> CollectionEntity:
125 """Create instance from yaml config."""
129 """Handle updated configuration."""
133 """Base collection type that can be observed."""
135 def __init__(self, id_manager: IDManager |
None) ->
None:
136 """Initialize the base collection."""
138 self.data: dict[str, _ItemT] = {}
139 self.listeners: list[ChangeListener] = []
140 self.change_set_listeners: list[ChangeSetListener] = []
142 self.
id_managerid_manager.add_collection(self.data)
146 """Return list of items in collection."""
147 return list(self.data.values())
153 Will be called with (change_type, item_id, updated_config).
155 self.listeners.append(listener)
156 return partial(self.listeners.remove, listener)
160 self, listener: ChangeSetListener
161 ) -> Callable[[],
None]:
162 """Add a listener for a full change set.
164 Will be called with [(change_type, item_id, updated_config), ...]
166 self.change_set_listeners.append(listener)
167 return partial(self.change_set_listeners.remove, listener)
170 """Notify listeners of a change."""
171 await asyncio.gather(
173 listener(change.change_type, change.item_id, change.item)
174 for listener
in self.listeners
175 for change
in change_set
178 change_set_listener(change_set)
179 for change_set_listener
in self.change_set_listeners
185 """Offer a collection based on static data."""
189 logger: logging.Logger,
190 id_manager: IDManager |
None =
None,
192 """Initialize the storage collection."""
198 entity_class: type[CollectionEntity], config: ConfigType
199 ) -> CollectionEntity:
200 """Create a CollectionEntity instance."""
201 return entity_class.from_yaml(config)
204 """Load the YAML collection. Overrides existing data."""
205 old_ids = set(self.data)
210 item_id = item[CONF_ID]
212 if item_id
in old_ids:
213 old_ids.remove(item_id)
214 event = CHANGE_UPDATED
215 elif self.
id_managerid_manager.has_id(item_id):
216 self.
loggerlogger.warning(
"Duplicate ID '%s' detected, skipping", item_id)
221 self.data[item_id] = item
226 for item_id
in old_ids
234 """Serialized storage collection."""
236 items: list[dict[str, Any]]
240 ObservableCollection[_ItemT]
242 """Offer a CRUD interface on top of JSON storage."""
246 store: Store[_StoreT],
247 id_manager: IDManager |
None =
None,
249 """Initialize the storage collection."""
255 entity_class: type[CollectionEntity], config: ConfigType
256 ) -> CollectionEntity:
257 """Create a CollectionEntity instance."""
258 return entity_class.from_storage(config)
261 def hass(self) -> HomeAssistant:
262 """Home Assistant object."""
263 return self.store.hass
270 """Load the storage Manager."""
271 if not (raw_storage := await self._async_load_data()):
274 for item
in raw_storage[
"items"]:
275 self.data[item[CONF_ID]] = self._deserialize_item(item)
277 await self.notify_changes(
280 CHANGE_ADDED, item[CONF_ID], item, self._hash_item(item)
282 for item
in raw_storage[
"items"]
288 """Validate the config is valid."""
293 """Suggest an ID based on the config."""
296 async
def _update_data(self, item: _ItemT, update_data: dict) -> _ItemT:
297 """Return a new updated item."""
301 """Create an item from validated config."""
305 """Create an item from its serialized representation."""
309 """Return the serialized representation of an item for storing.
311 The serialized representation must include the item_id in the "id" key.
315 """Create a new item."""
316 validated_data = await self._process_create_data(data)
317 item_id = self.id_manager.generate_id(self._get_suggested_id(validated_data))
318 item = self._create_item(item_id, validated_data)
319 self.data[item_id] = item
320 self._async_schedule_save()
321 await self.notify_changes(
327 self._hash_item(self._serialize_item(item_id, item)),
335 if item_id
not in self.data:
338 if CONF_ID
in updates:
339 raise ValueError(
"Cannot update ID")
341 current = self.data[item_id]
343 updated = await self._update_data(current, updates)
345 self.data[item_id] = updated
346 self._async_schedule_save()
348 await self.notify_changes(
354 self._hash_item(self._serialize_item(item_id, updated)),
359 return self.data[item_id]
363 if item_id
not in self.data:
366 item = self.data.pop(item_id)
367 self._async_schedule_save()
369 await self.notify_changes([
CollectionChange(CHANGE_REMOVED, item_id, item)])
373 """Schedule saving the collection."""
378 """Return JSON-compatible data for storing to file."""
381 self._serialize_item(item_id, item)
382 for item_id, item
in self.data.items()
389 """Return JSON-compatible date for storing to file."""
392 """Return a hash of the item."""
397 """A specialized StorageCollection where the items are untyped dicts."""
400 """Create an item from its validated, serialized representation."""
401 return {CONF_ID: item_id} | data
404 """Create an item from its validated, serialized representation."""
408 """Return the serialized representation of an item for storing."""
413 """Return JSON-compatible date for storing to file."""
414 return self._base_data_to_save()
418 """A collection without IDs."""
423 """Load the collection. Overrides existing data."""
427 for item_id, item
in list(self.data.items())
435 item_id = f
"fakeid-{self.counter}"
437 self.data[item_id] = item
442 for item_id, item
in self.data.items()
447 _GROUP_BY_KEY = attrgetter(
"change_type")
450 @dataclass(slots=True, frozen=True)
452 """Life cycle for a collection of entities."""
456 entity_component: EntityComponent[_EntityT]
457 collection: StorageCollection | YamlCollection
458 entity_class: type[CollectionEntity]
460 entities: dict[str, CollectionEntity]
464 """Set up the collection life cycle."""
468 """Remove entity from entities if it's removed or not added."""
469 self.entities.pop(item_id,
None)
472 def _add_entity(self, change_set: CollectionChange) -> CollectionEntity:
473 item_id = change_set.item_id
474 entity = self.collection.
create_entity(self.entity_class, change_set.item)
475 self.entities[item_id] = entity
476 entity.async_on_remove(partial(self.
_entity_removed_entity_removed, item_id))
480 item_id = change_set.item_id
481 ent_reg = self.ent_reg
482 entities = self.entities
483 ent_to_remove = ent_reg.async_get_entity_id(self.domain, self.platform, item_id)
484 if ent_to_remove
is not None:
485 ent_reg.async_remove(ent_to_remove)
486 elif entity := entities.get(item_id):
487 await entity.async_remove(force_remove=
True)
490 entities.pop(item_id,
None)
493 if entity := self.entities.
get(change_set.item_id):
494 if change_set.item_hash:
495 self.ent_reg.async_update_entity_options(
496 entity.entity_id,
"collection", {
"hash": change_set.item_hash}
498 await entity.async_update_config(change_set.item)
501 """Handle a collection change."""
505 new_entities: list[CollectionEntity] = []
506 coros: list[Coroutine[Any, Any, CollectionEntity |
None]] = []
507 grouped: Iterable[CollectionChange]
508 for _, grouped
in groupby(change_set, _GROUP_BY_KEY):
509 for change
in grouped:
510 change_type = change.change_type
511 if change_type == CHANGE_ADDED:
512 new_entities.append(self.
_add_entity_add_entity(change))
513 elif change_type == CHANGE_REMOVED:
515 elif change_type == CHANGE_UPDATED:
519 await asyncio.gather(*coros)
530 entity_component: EntityComponent[_EntityT],
531 collection: StorageCollection | YamlCollection,
532 entity_class: type[CollectionEntity],
534 """Map a collection to an entity component."""
535 ent_reg = entity_registry.async_get(hass)
537 domain, platform, entity_component, collection, entity_class, ent_reg, {}
542 """Class to expose storage collection management over websocket."""
546 storage_collection: _StorageCollectionT,
549 create_schema: VolDictType,
550 update_schema: VolDictType,
552 """Initialize a websocket CRUD."""
553 self.storage_collection = storage_collection
554 self.api_prefix = api_prefix
555 self.model_name = model_name
556 self.create_schema = create_schema
557 self.update_schema = update_schema
559 self._remove_subscription: CALLBACK_TYPE |
None =
None
560 self._subscribers: set[tuple[websocket_api.ActiveConnection, int]] = set()
562 assert self.api_prefix[-1] !=
"/",
"API prefix should not end in /"
566 """Return item ID key."""
567 return f
"{self.model_name}_id"
571 """Set up the websocket commands."""
572 websocket_api.async_register_command(
574 f
"{self.api_prefix}/list",
576 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
577 {vol.Required(
"type"): f
"{self.api_prefix}/list"}
581 websocket_api.async_register_command(
583 f
"{self.api_prefix}/create",
584 websocket_api.require_admin(
585 websocket_api.async_response(self.ws_create_item)
587 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
589 **self.create_schema,
590 vol.Required(
"type"): f
"{self.api_prefix}/create",
595 websocket_api.async_register_command(
597 f
"{self.api_prefix}/subscribe",
599 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
600 {vol.Required(
"type"): f
"{self.api_prefix}/subscribe"}
604 websocket_api.async_register_command(
606 f
"{self.api_prefix}/update",
607 websocket_api.require_admin(
608 websocket_api.async_response(self.ws_update_item)
610 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
612 **self.update_schema,
613 vol.Required(
"type"): f
"{self.api_prefix}/update",
614 vol.Required(self.item_id_key): str,
619 websocket_api.async_register_command(
621 f
"{self.api_prefix}/delete",
622 websocket_api.require_admin(
623 websocket_api.async_response(self.ws_delete_item)
625 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
627 vol.Required(
"type"): f
"{self.api_prefix}/delete",
628 vol.Required(self.item_id_key): str,
635 self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
638 connection.send_result(msg[
"id"], self.storage_collection.async_items())
641 self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
643 """Create an item."""
649 connection.send_result(msg[
"id"], item)
650 except vol.Invalid
as err:
651 connection.send_error(
653 websocket_api.ERR_INVALID_FORMAT,
656 except ValueError
as err:
657 connection.send_error(msg[
"id"], websocket_api.ERR_INVALID_FORMAT,
str(err))
661 self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
663 """Subscribe to collection updates."""
665 async
def async_change_listener(
666 change_set: Iterable[CollectionChange],
670 "change_type": change.change_type,
671 self.item_id_key: change.item_id,
674 for change
in change_set
676 for conn, msg_id
in self._subscribers:
677 conn.send_message(websocket_api.event_message(msg_id, json_msg))
679 if not self._subscribers:
680 self._remove_subscription = (
681 self.storage_collection.async_add_change_set_listener(
682 async_change_listener
686 self._subscribers.
add((connection, msg[
"id"]))
689 def cancel_subscription() -> None:
690 self._subscribers.
remove((connection, msg[
"id"]))
691 if not self._subscribers
and self._remove_subscription:
692 self._remove_subscription()
693 self._remove_subscription =
None
695 connection.subscriptions[msg[
"id"]] = cancel_subscription
697 connection.send_message(websocket_api.result_message(msg[
"id"]))
701 "change_type": CHANGE_ADDED,
702 self.item_id_key: item_id,
705 for item_id, item
in self.storage_collection.data.items()
707 connection.send_message(websocket_api.event_message(msg[
"id"], json_msg))
710 self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
712 """Update an item."""
714 msg_id = data.pop(
"id")
715 item_id = data.pop(self.item_id_key)
720 connection.send_result(msg_id, item)
722 connection.send_error(
724 websocket_api.ERR_NOT_FOUND,
725 f
"Unable to find {self.item_id_key} {item_id}",
727 except vol.Invalid
as err:
728 connection.send_error(
730 websocket_api.ERR_INVALID_FORMAT,
733 except ValueError
as err:
734 connection.send_error(msg_id, websocket_api.ERR_INVALID_FORMAT,
str(err))
737 self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
739 """Delete an item."""
743 connection.send_error(
745 websocket_api.ERR_NOT_FOUND,
746 f
"Unable to find {self.item_id_key} {msg[self.item_id_key]}",
749 connection.send_result(msg[
"id"])
753 """Class to expose storage collection management over websocket."""
None async_update_config(self, ConfigType config)
CollectionEntity from_yaml(cls, ConfigType config)
CollectionEntity from_storage(cls, ConfigType config)
dict _deserialize_item(self, dict data)
dict _create_item(self, str item_id, dict data)
SerializedStorageCollection _data_to_save(self)
dict _serialize_item(self, str item_id, dict item)
None async_load(self, list[dict] data)
str generate_id(self, str suggestion)
bool has_id(self, str item_id)
None add_collection(self, dict[str, Any] collection)
None __init__(self, str item_id)
list[_ItemT] async_items(self)
Callable[[], None] async_add_listener(self, ChangeListener listener)
None notify_changes(self, Iterable[CollectionChange] change_set)
None __init__(self, IDManager|None id_manager)
Callable[[], None] async_add_change_set_listener(self, ChangeSetListener listener)
CollectionEntity create_entity(type[CollectionEntity] entity_class, ConfigType config)
None __init__(self, logging.Logger logger, IDManager|None id_manager=None)
None async_load(self, list[dict] data)
None _collection_changed(self, Iterable[CollectionChange] change_set)
None _remove_entity(self, CollectionChange change_set)
None _update_entity(self, CollectionChange change_set)
CollectionEntity _add_entity(self, CollectionChange change_set)
None _entity_removed(self, str item_id)
bool add(self, _T matcher)
bool remove(self, _T matcher)
web.Response get(self, web.Request request, str config_key)
str humanize_error(HomeAssistant hass, vol.Invalid validation_error, str domain, dict config, str|None link, int max_sub_error_length=MAX_VALIDATION_ERROR_ITEM_LENGTH)
SerializedStorageCollection _base_data_to_save(self)
str _hash_item(self, dict item)
None _ws_subscribe(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
_ItemT _deserialize_item(self, dict data)
None ws_create_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
str _get_suggested_id(self, dict info)
None ws_update_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
_ItemT async_create_item(self, dict data)
None __init__(self, Store[_StoreT] store, IDManager|None id_manager=None)
_StoreT _data_to_save(self)
dict _process_create_data(self, dict data)
None ws_delete_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
_ItemT _create_item(self, str item_id, dict data)
_StoreT|None _async_load_data(self)
_ItemT _update_data(self, _ItemT item, dict update_data)
None async_setup(self, HomeAssistant hass)
None _async_schedule_save(self)
CollectionEntity create_entity(type[CollectionEntity] entity_class, ConfigType config)
None ws_list_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None sync_entity_lifecycle(HomeAssistant hass, str domain, str platform, EntityComponent[_EntityT] entity_component, StorageCollection|YamlCollection collection, type[CollectionEntity] entity_class)
_ItemT async_update_item(self, str item_id, dict updates)
dict _serialize_item(self, str item_id, _ItemT item)
None async_delete_item(self, str item_id)
None async_delay_save(self, Callable[[], _T] data_func, float delay=0)