Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The Tag integration."""
2 
from __future__ import annotations

from collections.abc import Callable
import logging
from typing import TYPE_CHECKING, Any, final
import uuid

import voluptuous as vol

from homeassistant.components import websocket_api
from homeassistant.const import CONF_ID, CONF_NAME
from homeassistant.core import Context, HomeAssistant, callback
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import collection, entity_registry as er
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.entity_component import EntityComponent
from homeassistant.helpers.storage import Store
from homeassistant.helpers.typing import ConfigType, VolDictType
from homeassistant.util import slugify
import homeassistant.util.dt as dt_util
from homeassistant.util.hass_dict import HassKey

from .const import DEFAULT_NAME, DEVICE_ID, DOMAIN, EVENT_TAG_SCANNED, LOGGER, TAG_ID
27 
_LOGGER = logging.getLogger(__name__)

# Keys used in stored tag items and in the entity state attributes.
LAST_SCANNED = "last_scanned"
LAST_SCANNED_BY_DEVICE_ID = "last_scanned_by_device_id"

# Storage location and schema version of the tag store.
STORAGE_KEY = DOMAIN
STORAGE_VERSION = 1
STORAGE_VERSION_MINOR = 3

# Typed hass.data key under which the tag storage collection is kept.
TAG_DATA: HassKey[TagStorageCollection] = HassKey(DOMAIN)

# Fields that are accepted both when creating and when updating a tag.
_SHARED_FIELDS: VolDictType = {
    vol.Optional(CONF_NAME): vol.All(str, vol.Length(min=1)),
    vol.Optional("description"): cv.string,
    vol.Optional(LAST_SCANNED): cv.datetime,
    vol.Optional(DEVICE_ID): cv.string,
}

# Creating a tag may additionally supply the tag ID itself.
CREATE_FIELDS: VolDictType = {vol.Optional(TAG_ID): cv.string, **_SHARED_FIELDS}

UPDATE_FIELDS: VolDictType = {**_SHARED_FIELDS}

CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
54 
55 
class TagIDExistsError(HomeAssistantError):
    """Raised when a tag is created with an ID that is already in use."""

    # NOTE(review): the class statement is absent from this listing; the name
    # comes from the raise in TagIDManager.generate_id and the base class is
    # assumed to be HomeAssistantError - confirm against the real file.

    def __init__(self, item_id: str) -> None:
        """Initialize tag ID exists error.

        item_id is the tag ID that was requested but is already taken.
        """
        super().__init__(f"Tag with ID {item_id} already exists.")
        # Keep the conflicting ID on the exception for callers that catch it.
        self.item_id = item_id
63 
64 
class TagIDManager(collection.IDManager):
    """ID manager for tags that uses the suggested ID verbatim."""

    def generate_id(self, suggestion: str) -> str:
        """Return the suggestion unchanged, or raise if it is already taken.

        Raises TagIDExistsError when the manager already knows the ID.
        """
        if not self.has_id(suggestion):
            return suggestion

        raise TagIDExistsError(suggestion)
74 
75 
def _create_entry(
    entity_registry: er.EntityRegistry, tag_id: str, name: str | None
) -> er.RegistryEntry:
    """Create an entity registry entry for a tag.

    The entry is keyed by the tag ID and gets a default original name built
    from DEFAULT_NAME and the tag ID. When a name is given it is slugified
    for the suggested object ID and written to the entry as its name;
    otherwise the tag ID is used as the suggested object ID.

    Returns the registry entry (the updated one when a name was applied).
    """
    entry = entity_registry.async_get_or_create(
        DOMAIN,
        DOMAIN,
        tag_id,
        original_name=f"{DEFAULT_NAME} {tag_id}",
        suggested_object_id=slugify(name) if name else tag_id,
    )
    if not name:
        return entry
    # async_update_entity returns the updated entry, which is what callers need.
    return entity_registry.async_update_entity(entry.entity_id, name=name)
90 
91 
class TagStore(Store[collection.SerializedStorageCollection]):
    """Store tag data."""

    # NOTE(review): the def line is absent from this listing. The method name
    # and parameters are taken from the listing's cross-reference index; it is
    # declared as a coroutine on the assumption that Store awaits it - confirm.
    async def _async_migrate_func(
        self,
        old_major_version: int,
        old_minor_version: int,
        old_data: dict[str, list[dict[str, Any]]],
    ) -> dict:
        """Migrate to the new version.

        Data is migrated in place and returned. Only major version 1 is
        handled; anything newer raises NotImplementedError.
        """
        if old_major_version > 1:
            raise NotImplementedError

        data = old_data
        if old_major_version == 1:
            if old_minor_version < 2:
                # Version 1.2 moves name to entity registry
                entity_registry = er.async_get(self.hass)
                for tag in data["items"]:
                    # Copy name in tag store to the entity registry
                    _create_entry(entity_registry, tag[CONF_ID], tag.get(CONF_NAME))
                    # Marker read later by the collection when serializing.
                    tag["migrated"] = True
            if old_minor_version < 3:
                # Version 1.3 removes tag_id from the store
                for tag in data["items"]:
                    tag.pop(TAG_ID, None)

        return data
121 
122 
class TagStorageCollection(collection.DictStorageCollection):
    """Tag collection stored in storage."""

    CREATE_SCHEMA = vol.Schema(CREATE_FIELDS)
    UPDATE_SCHEMA = vol.Schema(UPDATE_FIELDS)

    def __init__(
        self,
        store: TagStore,
        id_manager: collection.IDManager | None = None,
    ) -> None:
        """Initialize the storage collection."""
        super().__init__(store, id_manager)
        self.entity_registry = er.async_get(self.hass)

    async def _process_create_data(self, data: dict) -> dict:
        """Validate the create payload and prepare it for storage.

        Generates a tag ID when none was supplied, moves it to the ``id``
        key, converts last_scanned to an ISO string and creates the entity
        registry entry for the tag.
        """
        data = self.CREATE_SCHEMA(data)
        # tag_id is optional in the schema, so it may be missing entirely;
        # indexing it directly would raise KeyError instead of generating one.
        if not data.get(TAG_ID):
            data[TAG_ID] = str(uuid.uuid4())
        # Move tag id to id
        data[CONF_ID] = data.pop(TAG_ID)
        # make last_scanned JSON serializable
        if LAST_SCANNED in data:
            data[LAST_SCANNED] = data[LAST_SCANNED].isoformat()

        # Create entity in entity_registry when creating the tag
        # This is done early to store name only once in entity registry
        _create_entry(self.entity_registry, data[CONF_ID], data.get(CONF_NAME))
        return data

    @callback
    def _get_suggested_id(self, info: dict[str, str]) -> str:
        """Suggest an ID based on the config."""
        return info[CONF_ID]

    async def _update_data(self, item: dict, update_data: dict) -> dict:
        """Return a new updated data object.

        Raises collection.ItemNotFound when a name is present but the tag
        has no entity registry entry to store it on.
        """
        data = {**item, **self.UPDATE_SCHEMA(update_data)}
        tag_id = item[CONF_ID]
        # make last_scanned JSON serializable
        if LAST_SCANNED in update_data:
            data[LAST_SCANNED] = data[LAST_SCANNED].isoformat()
        if name := data.get(CONF_NAME):
            entity_id = self.entity_registry.async_get_entity_id(
                DOMAIN, DOMAIN, tag_id
            )
            if not entity_id:
                raise collection.ItemNotFound(tag_id)
            self.entity_registry.async_update_entity(entity_id, name=name)

        return data

    def _serialize_item(self, item_id: str, item: dict) -> dict:
        """Return the serialized representation of an item for storing.

        We don't store the name, it's stored in the entity registry.
        """
        # Preserve the name of migrated entries to allow downgrading to 2024.5
        # without losing tag names. This can be removed in HA Core 2025.1.
        migrated = item_id in self.data and "migrated" in self.data[item_id]
        return {k: v for k, v in item.items() if k != CONF_NAME or migrated}
185 
186 
# NOTE(review): the class statement and the ws_list_item def line are absent
# from this listing. The method name and parameters come from the listing's
# cross-reference index; the class name does not appear in the listing and
# is restored from upstream Home Assistant - confirm against the real file.
class TagDictStorageCollectionWebsocket(
    collection.StorageCollectionWebsocket[TagStorageCollection]
):
    """Class to expose tag storage collection management over websocket."""

    def __init__(
        self,
        storage_collection: TagStorageCollection,
        api_prefix: str,
        model_name: str,
        create_schema: VolDictType,
        update_schema: VolDictType,
    ) -> None:
        """Initialize a websocket for tag."""
        super().__init__(
            storage_collection, api_prefix, model_name, create_schema, update_schema
        )
        self.entity_registry = er.async_get(storage_collection.hass)

    @callback
    def ws_list_item(
        self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
    ) -> None:
        """List items specifically for tag.

        Provides name from entity_registry instead of storage collection.
        """
        tag_items = []
        for stored in self.storage_collection.async_items():
            # Make a copy to avoid adding name to the stored entry
            item = {k: v for k, v in stored.items() if k != "migrated"}
            entity_id = self.entity_registry.async_get_entity_id(
                DOMAIN, DOMAIN, item[CONF_ID]
            )
            entity = self.entity_registry.async_get(entity_id) if entity_id else None
            if entity:
                # A user-set name wins over the generated original name.
                item[CONF_NAME] = entity.name or entity.original_name
            tag_items.append(item)
        if _LOGGER.isEnabledFor(logging.DEBUG):
            _LOGGER.debug("Listing tags %s", tag_items)
        connection.send_result(msg["id"], tag_items)
228 
229 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up the Tag component.

    Loads the tag storage collection, exposes it over websocket, keeps the
    tag entities and entity registry in sync with storage changes, and adds
    one entity per stored tag.
    """
    component = EntityComponent[TagEntity](LOGGER, DOMAIN, hass)
    id_manager = TagIDManager()
    hass.data[TAG_DATA] = storage_collection = TagStorageCollection(
        TagStore(
            hass, STORAGE_VERSION, STORAGE_KEY, minor_version=STORAGE_VERSION_MINOR
        ),
        id_manager,
    )
    await storage_collection.async_load()
    # NOTE(review): the opening line of this call is absent from the listing.
    # The callee name and the absence of `await` are restored from upstream
    # Home Assistant - confirm against the real file.
    TagDictStorageCollectionWebsocket(
        storage_collection, DOMAIN, DOMAIN, CREATE_FIELDS, UPDATE_FIELDS
    ).async_setup(hass)

    entity_registry = er.async_get(hass)
    # Filled by each TagEntity when it is added to hass; maps tag ID to the
    # entity's scan handler.
    entity_update_handlers: dict[str, Callable[[str | None, str | None], None]] = {}

    async def tag_change_listener(
        change_type: str, item_id: str, updated_config: dict
    ) -> None:
        """Tag storage change listener."""

        if _LOGGER.isEnabledFor(logging.DEBUG):
            _LOGGER.debug(
                "%s, item: %s, update: %s", change_type, item_id, updated_config
            )
        if change_type == collection.CHANGE_ADDED:
            # When tags are added to storage
            entity = _create_entry(entity_registry, updated_config[CONF_ID], None)
            if TYPE_CHECKING:
                assert entity.original_name
            await component.async_add_entities(
                [
                    TagEntity(
                        entity_update_handlers,
                        entity.name or entity.original_name,
                        updated_config[CONF_ID],
                        updated_config.get(LAST_SCANNED),
                        updated_config.get(DEVICE_ID),
                    )
                ]
            )

        elif change_type == collection.CHANGE_UPDATED:
            # When tags are changed or updated in storage
            if handler := entity_update_handlers.get(updated_config[CONF_ID]):
                handler(
                    updated_config.get(DEVICE_ID),
                    updated_config.get(LAST_SCANNED),
                )

        elif change_type == collection.CHANGE_REMOVED:
            # When tags are removed from storage
            entity_id = entity_registry.async_get_entity_id(
                DOMAIN, DOMAIN, updated_config[CONF_ID]
            )
            if entity_id:
                entity_registry.async_remove(entity_id)

    storage_collection.async_add_listener(tag_change_listener)

    entities: list[TagEntity] = []
    for tag in storage_collection.async_items():
        if _LOGGER.isEnabledFor(logging.DEBUG):
            _LOGGER.debug("Adding tag: %s", tag)
        # Look the entity up once; create the registry entry if it is missing.
        entity_id = entity_registry.async_get_entity_id(DOMAIN, DOMAIN, tag[CONF_ID])
        if entity_id:
            entity = entity_registry.async_get(entity_id)
        else:
            entity = _create_entry(entity_registry, tag[CONF_ID], None)
        if TYPE_CHECKING:
            assert entity
            assert entity.original_name
        name = entity.name or entity.original_name
        entities.append(
            TagEntity(
                entity_update_handlers,
                name,
                tag[CONF_ID],
                tag.get(LAST_SCANNED),
                tag.get(DEVICE_ID),
            )
        )
    await component.async_add_entities(entities)

    return True
320 
321 
async def async_scan_tag(
    hass: HomeAssistant,
    tag_id: str,
    device_id: str | None,
    context: Context | None = None,
) -> None:
    """Handle when a tag is scanned.

    Fires EVENT_TAG_SCANNED, then records the scan time (and the scanning
    device, if any) on the stored tag, creating the tag when it is unknown.

    Raises HomeAssistantError when the tag component is not set up.
    """
    if DOMAIN not in hass.config.components:
        raise HomeAssistantError("tag component has not been set up.")

    storage_collection = hass.data[TAG_DATA]
    registry = er.async_get(hass)

    # Get name from entity registry, default value None if not present
    tag_name = None
    registered_id = registry.async_get_entity_id(DOMAIN, DOMAIN, tag_id)
    if registered_id:
        registered = registry.async_get(registered_id)
        if registered:
            tag_name = registered.name or registered.original_name

    hass.bus.async_fire(
        EVENT_TAG_SCANNED,
        {TAG_ID: tag_id, CONF_NAME: tag_name, DEVICE_ID: device_id},
        context=context,
    )

    # The device is only stored when one was actually reported.
    extra_kwargs = {DEVICE_ID: device_id} if device_id else {}
    debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)

    if tag_id not in storage_collection.data:
        if debug_enabled:
            _LOGGER.debug("Creating tag %s with extra %s", tag_id, extra_kwargs)
        await storage_collection.async_create_item(
            {TAG_ID: tag_id, LAST_SCANNED: dt_util.utcnow(), **extra_kwargs}
        )
    else:
        if debug_enabled:
            _LOGGER.debug("Updating tag %s with extra %s", tag_id, extra_kwargs)
        await storage_collection.async_update_item(
            tag_id, {LAST_SCANNED: dt_util.utcnow(), **extra_kwargs}
        )
    _LOGGER.debug("Tag: %s scanned by device: %s", tag_id, device_id)
363 
364 
# NOTE(review): the class statement and the async_handle_event def line are
# absent from this listing. The class name comes from its call sites and the
# method name from self.async_handle_event and the cross-reference index; the
# base class is assumed to be the imported Entity - confirm.
class TagEntity(Entity):
    """Representation of a Tag entity."""

    _unrecorded_attributes = frozenset({TAG_ID})
    _attr_should_poll = False

    def __init__(
        self,
        entity_update_handlers: dict[str, Callable[[str | None, str | None], None]],
        name: str,
        tag_id: str,
        last_scanned: str | None,
        device_id: str | None,
    ) -> None:
        """Initialize the Tag entity.

        entity_update_handlers is the shared tag ID -> handler mapping in
        which this entity registers itself once added to hass.
        """
        self._entity_update_handlers = entity_update_handlers
        self._attr_name = name
        self._tag_id = tag_id
        self._attr_unique_id = tag_id
        self._last_device_id: str | None = device_id
        self._last_scanned = last_scanned

    @callback
    def async_handle_event(
        self, device_id: str | None, last_scanned: str | None
    ) -> None:
        """Handle the Tag scan event."""
        if _LOGGER.isEnabledFor(logging.DEBUG):
            _LOGGER.debug(
                "Tag %s scanned by device %s at %s, last scanned at %s",
                self._tag_id,
                device_id,
                last_scanned,
                self._last_scanned,
            )
        self._last_device_id = device_id
        self._last_scanned = last_scanned
        self.async_write_ha_state()

    @property
    @final
    def state(self) -> str | None:
        """Return the entity state.

        The state is the last scan time as an ISO string with millisecond
        precision, or None when the tag was never scanned or the stored
        value cannot be parsed.
        """
        if not self._last_scanned:
            return None
        parsed = dt_util.parse_datetime(self._last_scanned)
        if parsed is None:
            return None
        return parsed.isoformat(timespec="milliseconds")

    @property
    def extra_state_attributes(self) -> dict[str, Any]:
        """Return the state attributes of the tag."""
        return {
            TAG_ID: self._tag_id,
            LAST_SCANNED_BY_DEVICE_ID: self._last_device_id,
        }

    async def async_added_to_hass(self) -> None:
        """Handle entity which will be added."""
        await super().async_added_to_hass()
        # Register so storage updates for this tag reach the entity.
        self._entity_update_handlers[self._tag_id] = self.async_handle_event

    async def async_will_remove_from_hass(self) -> None:
        """Handle entity being removed."""
        await super().async_will_remove_from_hass()
        del self._entity_update_handlers[self._tag_id]
None ws_list_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: __init__.py:209
None __init__(self, TagStorageCollection storage_collection, str api_prefix, str model_name, VolDictType create_schema, VolDictType update_schema)
Definition: __init__.py:199
dict[str, Any] extra_state_attributes(self)
Definition: __init__.py:416
None async_handle_event(self, str|None device_id, str|None last_scanned)
Definition: __init__.py:390
None __init__(self, dict[str, Callable[[str|None, str|None], None]] entity_update_handlers, str name, str tag_id, str|None last_scanned, str|None device_id)
Definition: __init__.py:378
str generate_id(self, str suggestion)
Definition: __init__.py:68
None __init__(self, TagStore store, collection.IDManager|None id_manager=None)
Definition: __init__.py:133
dict _update_data(self, dict item, dict update_data)
Definition: __init__.py:159
str _get_suggested_id(self, dict[str, str] info)
Definition: __init__.py:155
dict _serialize_item(self, str item_id, dict item)
Definition: __init__.py:176
dict _async_migrate_func(self, int old_major_version, int old_minor_version, dict[str, list[dict[str, Any]]] old_data)
Definition: __init__.py:100
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:230
er.RegistryEntry _create_entry(er.EntityRegistry entity_registry, str tag_id, str|None name)
Definition: __init__.py:78
None async_scan_tag(HomeAssistant hass, str tag_id, str|None device_id, Context|None context=None)
Definition: __init__.py:327
AreaRegistry async_get(HomeAssistant hass)
None async_update_entity(HomeAssistant hass, str entity_id)
str slugify(str|None text, *str separator="_")
Definition: __init__.py:41