1 """Recorder entity registry helper."""
4 from typing
import TYPE_CHECKING
10 from .core
import Recorder
11 from .util
import filter_unique_constraint_integrity_error, get_instance, session_scope
13 _LOGGER = logging.getLogger(__name__)
18 """Set up the entity hooks."""
21 def _async_entity_id_changed(
22 event: Event[er.EventEntityRegistryUpdatedData],
26 assert event.data[
"action"] ==
"update" and "old_entity_id" in event.data
27 old_entity_id = event.data[
"old_entity_id"]
28 new_entity_id = event.data[
"entity_id"]
29 instance.async_update_statistics_metadata(
30 old_entity_id, new_statistic_id=new_entity_id
32 instance.async_update_states_metadata(
33 old_entity_id, new_entity_id=new_entity_id
37 def entity_registry_changed_filter(
38 event_data: er.EventEntityRegistryUpdatedData,
40 """Handle entity_id changed filter."""
41 return event_data[
"action"] ==
"update" and "old_entity_id" in event_data
44 def _setup_entity_registry_event_handler(hass: HomeAssistant) ->
None:
45 """Subscribe to entity registry events."""
46 hass.bus.async_listen(
47 er.EVENT_ENTITY_REGISTRY_UPDATED,
48 _async_entity_id_changed,
49 event_filter=entity_registry_changed_filter,
60 """Update the states metadata table when an entity is renamed."""
61 states_meta_manager = instance.states_meta_manager
62 if not states_meta_manager.active:
64 "Cannot rename entity_id `%s` to `%s` "
65 "because the states meta manager is not yet active",
72 session=instance.get_session(),
75 if not states_meta_manager.update_metadata(session, entity_id, new_entity_id):
77 "Cannot migrate history for entity_id `%s` to `%s` "
78 "because the new entity_id is already in use",
None update_states_metadata(Recorder instance, str entity_id, str new_entity_id)
None async_setup(HomeAssistant hass)
Callable[[Exception], bool] filter_unique_constraint_integrity_error(Recorder instance, str row_type)
Recorder get_instance(HomeAssistant hass)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
CALLBACK_TYPE async_at_start(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)