Home Assistant Unofficial Reference 2024.12.1
area_registry.py
Go to the documentation of this file.
1 """Provide a way to connect devices to one physical location."""
2 
3 from __future__ import annotations
4 
5 from collections import defaultdict
6 from collections.abc import Iterable
7 import dataclasses
8 from dataclasses import dataclass, field
9 from datetime import datetime
10 from typing import TYPE_CHECKING, Any, Literal, TypedDict
11 
12 from homeassistant.core import HomeAssistant, callback
13 from homeassistant.util.dt import utc_from_timestamp, utcnow
14 from homeassistant.util.event_type import EventType
15 from homeassistant.util.hass_dict import HassKey
16 
17 from . import device_registry as dr, entity_registry as er
18 from .json import json_bytes, json_fragment
19 from .normalized_name_base_registry import (
20  NormalizedNameBaseRegistryEntry,
21  NormalizedNameBaseRegistryItems,
22 )
23 from .registry import BaseRegistry, RegistryIndexType
24 from .singleton import singleton
25 from .storage import Store
26 from .typing import UNDEFINED, UndefinedType
27 
28 if TYPE_CHECKING:
29  # mypy cannot workout _cache Protocol with dataclasses
30  from propcache import cached_property as under_cached_property
31 else:
32  from propcache import under_cached_property
33 
34 
35 DATA_REGISTRY: HassKey[AreaRegistry] = HassKey("area_registry")
36 EVENT_AREA_REGISTRY_UPDATED: EventType[EventAreaRegistryUpdatedData] = EventType(
37  "area_registry_updated"
38 )
39 STORAGE_KEY = "core.area_registry"
40 STORAGE_VERSION_MAJOR = 1
41 STORAGE_VERSION_MINOR = 7
42 
43 
44 class _AreaStoreData(TypedDict):
45  """Data type for individual area. Used in AreasRegistryStoreData."""
46 
47  aliases: list[str]
48  floor_id: str | None
49  icon: str | None
50  id: str
51  labels: list[str]
52  name: str
53  picture: str | None
54  created_at: str
55  modified_at: str
56 
57 
58 class AreasRegistryStoreData(TypedDict):
59  """Store data type for AreaRegistry."""
60 
61  areas: list[_AreaStoreData]
62 
63 
65  """EventAreaRegistryUpdated data."""
66 
67  action: Literal["create", "remove", "update"]
68  area_id: str
69 
70 
71 @dataclass(frozen=True, kw_only=True, slots=True)
73  """Area Registry Entry."""
74 
75  aliases: set[str]
76  floor_id: str | None
77  icon: str | None
78  id: str
79  labels: set[str] = field(default_factory=set)
80  picture: str | None
81  _cache: dict[str, Any] = field(default_factory=dict, compare=False, init=False)
82 
83  @under_cached_property
84  def json_fragment(self) -> json_fragment:
85  """Return a JSON representation of this AreaEntry."""
86  return json_fragment(
87  json_bytes(
88  {
89  "aliases": list(self.aliases),
90  "area_id": self.id,
91  "floor_id": self.floor_id,
92  "icon": self.icon,
93  "labels": list(self.labels),
94  "name": self.name,
95  "picture": self.picture,
96  "created_at": self.created_at.timestamp(),
97  "modified_at": self.modified_at.timestamp(),
98  }
99  )
100  )
101 
102 
103 class AreaRegistryStore(Store[AreasRegistryStoreData]):
104  """Store area registry data."""
105 
107  self,
108  old_major_version: int,
109  old_minor_version: int,
110  old_data: dict[str, list[dict[str, Any]]],
111  ) -> AreasRegistryStoreData:
112  """Migrate to the new version."""
113  if old_major_version < 2:
114  if old_minor_version < 2:
115  # Version 1.2 implements migration and freezes the available keys
116  for area in old_data["areas"]:
117  # Populate keys which were introduced before version 1.2
118  area.setdefault("picture", None)
119 
120  if old_minor_version < 3:
121  # Version 1.3 adds aliases
122  for area in old_data["areas"]:
123  area["aliases"] = []
124 
125  if old_minor_version < 4:
126  # Version 1.4 adds icon
127  for area in old_data["areas"]:
128  area["icon"] = None
129 
130  if old_minor_version < 5:
131  # Version 1.5 adds floor_id
132  for area in old_data["areas"]:
133  area["floor_id"] = None
134 
135  if old_minor_version < 6:
136  # Version 1.6 adds labels
137  for area in old_data["areas"]:
138  area["labels"] = []
139 
140  if old_minor_version < 7:
141  # Version 1.7 adds created_at and modiefied_at
142  created_at = utc_from_timestamp(0).isoformat()
143  for area in old_data["areas"]:
144  area["created_at"] = area["modified_at"] = created_at
145 
146  if old_major_version > 1:
147  raise NotImplementedError
148  return old_data # type: ignore[return-value]
149 
150 
152  """Class to hold area registry items."""
153 
154  def __init__(self) -> None:
155  """Initialize the area registry items."""
156  super().__init__()
157  self._labels_index: RegistryIndexType = defaultdict(dict)
158  self._floors_index: RegistryIndexType = defaultdict(dict)
159 
160  def _index_entry(self, key: str, entry: AreaEntry) -> None:
161  """Index an entry."""
162  super()._index_entry(key, entry)
163  if entry.floor_id is not None:
164  self._floors_index[entry.floor_id][key] = True
165  for label in entry.labels:
166  self._labels_index[label][key] = True
167 
169  self, key: str, replacement_entry: AreaEntry | None = None
170  ) -> None:
171  # always call base class before other indices
172  super()._unindex_entry(key, replacement_entry)
173  entry = self.data[key]
174  if labels := entry.labels:
175  for label in labels:
176  self._unindex_entry_value(key, label, self._labels_index)
177  if floor_id := entry.floor_id:
178  self._unindex_entry_value(key, floor_id, self._floors_index)
179 
180  def get_areas_for_label(self, label: str) -> list[AreaEntry]:
181  """Get areas for label."""
182  data = self.data
183  return [data[key] for key in self._labels_index.get(label, ())]
184 
185  def get_areas_for_floor(self, floor: str) -> list[AreaEntry]:
186  """Get areas for floor."""
187  data = self.data
188  return [data[key] for key in self._floors_index.get(floor, ())]
189 
190 
191 class AreaRegistry(BaseRegistry[AreasRegistryStoreData]):
192  """Class to hold a registry of areas."""
193 
194  areas: AreaRegistryItems
195  _area_data: dict[str, AreaEntry]
196 
197  def __init__(self, hass: HomeAssistant) -> None:
198  """Initialize the area registry."""
199  self.hasshass = hass
200  self._store_store = AreaRegistryStore(
201  hass,
202  STORAGE_VERSION_MAJOR,
203  STORAGE_KEY,
204  atomic_writes=True,
205  minor_version=STORAGE_VERSION_MINOR,
206  )
207 
208  @callback
209  def async_get_area(self, area_id: str) -> AreaEntry | None:
210  """Get area by id.
211 
212  We retrieve the DeviceEntry from the underlying dict to avoid
213  the overhead of the UserDict __getitem__.
214  """
215  return self._area_data_area_data.get(area_id)
216 
217  @callback
218  def async_get_area_by_name(self, name: str) -> AreaEntry | None:
219  """Get area by name."""
220  return self.areasareas.get_by_name(name)
221 
222  @callback
223  def async_list_areas(self) -> Iterable[AreaEntry]:
224  """Get all areas."""
225  return self.areasareas.values()
226 
227  @callback
228  def async_get_or_create(self, name: str) -> AreaEntry:
229  """Get or create an area."""
230  if area := self.async_get_area_by_nameasync_get_area_by_name(name):
231  return area
232  return self.async_createasync_create(name)
233 
234  def _generate_id(self, name: str) -> str:
235  """Generate area ID."""
236  return self.areasareas.generate_id_from_name(name)
237 
238  @callback
240  self,
241  name: str,
242  *,
243  aliases: set[str] | None = None,
244  floor_id: str | None = None,
245  icon: str | None = None,
246  labels: set[str] | None = None,
247  picture: str | None = None,
248  ) -> AreaEntry:
249  """Create a new area."""
250  self.hasshass.verify_event_loop_thread("area_registry.async_create")
251 
252  if area := self.async_get_area_by_nameasync_get_area_by_name(name):
253  raise ValueError(
254  f"The name {name} ({area.normalized_name}) is already in use"
255  )
256 
257  area = AreaEntry(
258  aliases=aliases or set(),
259  floor_id=floor_id,
260  icon=icon,
261  id=self._generate_id_generate_id(name),
262  labels=labels or set(),
263  name=name,
264  picture=picture,
265  )
266  area_id = area.id
267  self.areasareas[area_id] = area
268  self.async_schedule_save()
269 
270  self.hasshass.bus.async_fire_internal(
271  EVENT_AREA_REGISTRY_UPDATED,
272  EventAreaRegistryUpdatedData(action="create", area_id=area_id),
273  )
274  return area
275 
276  @callback
277  def async_delete(self, area_id: str) -> None:
278  """Delete area."""
279  self.hasshass.verify_event_loop_thread("area_registry.async_delete")
280  device_registry = dr.async_get(self.hasshass)
281  entity_registry = er.async_get(self.hasshass)
282  device_registry.async_clear_area_id(area_id)
283  entity_registry.async_clear_area_id(area_id)
284 
285  del self.areasareas[area_id]
286 
287  self.hasshass.bus.async_fire_internal(
288  EVENT_AREA_REGISTRY_UPDATED,
289  EventAreaRegistryUpdatedData(action="remove", area_id=area_id),
290  )
291 
292  self.async_schedule_save()
293 
294  @callback
296  self,
297  area_id: str,
298  *,
299  aliases: set[str] | UndefinedType = UNDEFINED,
300  floor_id: str | None | UndefinedType = UNDEFINED,
301  icon: str | None | UndefinedType = UNDEFINED,
302  labels: set[str] | UndefinedType = UNDEFINED,
303  name: str | UndefinedType = UNDEFINED,
304  picture: str | None | UndefinedType = UNDEFINED,
305  ) -> AreaEntry:
306  """Update name of area."""
307  updated = self._async_update_async_update(
308  area_id,
309  aliases=aliases,
310  floor_id=floor_id,
311  icon=icon,
312  labels=labels,
313  name=name,
314  picture=picture,
315  )
316  # Since updated may be the old or the new and we always fire
317  # an event even if nothing has changed we cannot use async_fire_internal
318  # here because we do not know if the thread safety check already
319  # happened or not in _async_update.
320  self.hasshass.bus.async_fire(
321  EVENT_AREA_REGISTRY_UPDATED,
322  EventAreaRegistryUpdatedData(action="update", area_id=area_id),
323  )
324  return updated
325 
326  @callback
328  self,
329  area_id: str,
330  *,
331  aliases: set[str] | UndefinedType = UNDEFINED,
332  floor_id: str | None | UndefinedType = UNDEFINED,
333  icon: str | None | UndefinedType = UNDEFINED,
334  labels: set[str] | UndefinedType = UNDEFINED,
335  name: str | UndefinedType = UNDEFINED,
336  picture: str | None | UndefinedType = UNDEFINED,
337  ) -> AreaEntry:
338  """Update name of area."""
339  old = self.areasareas[area_id]
340 
341  new_values: dict[str, Any] = {
342  attr_name: value
343  for attr_name, value in (
344  ("aliases", aliases),
345  ("icon", icon),
346  ("labels", labels),
347  ("picture", picture),
348  ("floor_id", floor_id),
349  )
350  if value is not UNDEFINED and value != getattr(old, attr_name)
351  }
352 
353  if name is not UNDEFINED and name != old.name:
354  new_values["name"] = name
355 
356  if not new_values:
357  return old
358 
359  new_values["modified_at"] = utcnow()
360 
361  self.hasshass.verify_event_loop_thread("area_registry.async_update")
362  new = self.areasareas[area_id] = dataclasses.replace(old, **new_values)
363 
364  self.async_schedule_save()
365  return new
366 
367  async def async_load(self) -> None:
368  """Load the area registry."""
369  self._async_setup_cleanup_async_setup_cleanup()
370 
371  data = await self._store_store.async_load()
372 
373  areas = AreaRegistryItems()
374 
375  if data is not None:
376  for area in data["areas"]:
377  assert area["name"] is not None and area["id"] is not None
378  areas[area["id"]] = AreaEntry(
379  aliases=set(area["aliases"]),
380  floor_id=area["floor_id"],
381  icon=area["icon"],
382  id=area["id"],
383  labels=set(area["labels"]),
384  name=area["name"],
385  picture=area["picture"],
386  created_at=datetime.fromisoformat(area["created_at"]),
387  modified_at=datetime.fromisoformat(area["modified_at"]),
388  )
389 
390  self.areasareas = areas
391  self._area_data_area_data = areas.data
392 
393  @callback
394  def _data_to_save(self) -> AreasRegistryStoreData:
395  """Return data of area registry to store in a file."""
396  return {
397  "areas": [
398  {
399  "aliases": list(entry.aliases),
400  "floor_id": entry.floor_id,
401  "icon": entry.icon,
402  "id": entry.id,
403  "labels": list(entry.labels),
404  "name": entry.name,
405  "picture": entry.picture,
406  "created_at": entry.created_at.isoformat(),
407  "modified_at": entry.modified_at.isoformat(),
408  }
409  for entry in self.areasareas.values()
410  ]
411  }
412 
413  @callback
414  def _async_setup_cleanup(self) -> None:
415  """Set up the area registry cleanup."""
416  # pylint: disable-next=import-outside-toplevel
417  from . import ( # Circular dependencies
418  floor_registry as fr,
419  label_registry as lr,
420  )
421 
422  @callback
423  def _removed_from_registry_filter(
424  event_data: fr.EventFloorRegistryUpdatedData
425  | lr.EventLabelRegistryUpdatedData,
426  ) -> bool:
427  """Filter all except for the item removed from registry events."""
428  return event_data["action"] == "remove"
429 
430  @callback
431  def _handle_floor_registry_update(event: fr.EventFloorRegistryUpdated) -> None:
432  """Update areas that are associated with a floor that has been removed."""
433  floor_id = event.data["floor_id"]
434  for area in self.areasareas.get_areas_for_floor(floor_id):
435  self.async_updateasync_update(area.id, floor_id=None)
436 
437  self.hasshass.bus.async_listen(
438  event_type=fr.EVENT_FLOOR_REGISTRY_UPDATED,
439  event_filter=_removed_from_registry_filter,
440  listener=_handle_floor_registry_update,
441  )
442 
443  @callback
444  def _handle_label_registry_update(event: lr.EventLabelRegistryUpdated) -> None:
445  """Update areas that have a label that has been removed."""
446  label_id = event.data["label_id"]
447  for area in self.areasareas.get_areas_for_label(label_id):
448  self.async_updateasync_update(area.id, labels=area.labels - {label_id})
449 
450  self.hasshass.bus.async_listen(
451  event_type=lr.EVENT_LABEL_REGISTRY_UPDATED,
452  event_filter=_removed_from_registry_filter,
453  listener=_handle_label_registry_update,
454  )
455 
456 
457 @callback
458 @singleton(DATA_REGISTRY)
459 def async_get(hass: HomeAssistant) -> AreaRegistry:
460  """Get area registry."""
461  return AreaRegistry(hass)
462 
463 
464 async def async_load(hass: HomeAssistant) -> None:
465  """Load area registry."""
466  assert DATA_REGISTRY not in hass.data
467  await async_get(hass).async_load()
468 
469 
470 @callback
471 def async_entries_for_floor(registry: AreaRegistry, floor_id: str) -> list[AreaEntry]:
472  """Return entries that match a floor."""
473  return registry.areas.get_areas_for_floor(floor_id)
474 
475 
476 @callback
477 def async_entries_for_label(registry: AreaRegistry, label_id: str) -> list[AreaEntry]:
478  """Return entries that match a label."""
479  return registry.areas.get_areas_for_label(label_id)
list[AreaEntry] get_areas_for_floor(self, str floor)
None _unindex_entry(self, str key, AreaEntry|None replacement_entry=None)
list[AreaEntry] get_areas_for_label(self, str label)
None _index_entry(self, str key, AreaEntry entry)
AreasRegistryStoreData _async_migrate_func(self, int old_major_version, int old_minor_version, dict[str, list[dict[str, Any]]] old_data)
AreaEntry _async_update(self, str area_id, *set[str]|UndefinedType aliases=UNDEFINED, str|None|UndefinedType floor_id=UNDEFINED, str|None|UndefinedType icon=UNDEFINED, set[str]|UndefinedType labels=UNDEFINED, str|UndefinedType name=UNDEFINED, str|None|UndefinedType picture=UNDEFINED)
AreaEntry async_update(self, str area_id, *set[str]|UndefinedType aliases=UNDEFINED, str|None|UndefinedType floor_id=UNDEFINED, str|None|UndefinedType icon=UNDEFINED, set[str]|UndefinedType labels=UNDEFINED, str|UndefinedType name=UNDEFINED, str|None|UndefinedType picture=UNDEFINED)
AreaEntry|None async_get_area(self, str area_id)
AreaEntry|None async_get_area_by_name(self, str name)
AreaEntry async_create(self, str name, *set[str]|None aliases=None, str|None floor_id=None, str|None icon=None, set[str]|None labels=None, str|None picture=None)
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_load(HomeAssistant hass)
list[AreaEntry] async_entries_for_floor(AreaRegistry registry, str floor_id)
list[AreaEntry] async_entries_for_label(AreaRegistry registry, str label_id)
AreaRegistry async_get(HomeAssistant hass)