1 """The Tag integration."""
3 from __future__
import annotations
5 from collections.abc
import Callable
7 from typing
import TYPE_CHECKING, Any, final
10 import voluptuous
as vol
26 from .const
import DEFAULT_NAME, DEVICE_ID, DOMAIN, EVENT_TAG_SCANNED, LOGGER, TAG_ID
28 _LOGGER = logging.getLogger(__name__)
30 LAST_SCANNED =
"last_scanned"
31 LAST_SCANNED_BY_DEVICE_ID =
"last_scanned_by_device_id"
34 STORAGE_VERSION_MINOR = 3
36 TAG_DATA: HassKey[TagStorageCollection] =
HassKey(DOMAIN)
38 CREATE_FIELDS: VolDictType = {
39 vol.Optional(TAG_ID): cv.string,
40 vol.Optional(CONF_NAME): vol.All(str, vol.Length(min=1)),
41 vol.Optional(
"description"): cv.string,
42 vol.Optional(LAST_SCANNED): cv.datetime,
43 vol.Optional(DEVICE_ID): cv.string,
46 UPDATE_FIELDS: VolDictType = {
47 vol.Optional(CONF_NAME): vol.All(str, vol.Length(min=1)),
48 vol.Optional(
"description"): cv.string,
49 vol.Optional(LAST_SCANNED): cv.datetime,
50 vol.Optional(DEVICE_ID): cv.string,
53 CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN)
57 """Raised when an item is not found."""
60 """Initialize tag ID exists error."""
61 super().
__init__(f
"Tag with ID {item_id} already exists.")
66 """ID manager for tags."""
70 if self.has_id(suggestion):
77 entity_registry: er.EntityRegistry, tag_id: str, name: str |
None
78 ) -> er.RegistryEntry:
79 """Create an entity registry entry for a tag."""
80 entry = entity_registry.async_get_or_create(
84 original_name=f
"{DEFAULT_NAME} {tag_id}",
85 suggested_object_id=
slugify(name)
if name
else tag_id,
88 return entity_registry.async_update_entity(entry.entity_id, name=name)
97 old_major_version: int,
98 old_minor_version: int,
99 old_data: dict[str, list[dict[str, Any]]],
101 """Migrate to the new version."""
103 if old_major_version == 1
and old_minor_version < 2:
104 entity_registry = er.async_get(self.hass)
106 for tag
in data[
"items"]:
108 _create_entry(entity_registry, tag[CONF_ID], tag.get(CONF_NAME))
109 tag[
"migrated"] =
True
110 if old_major_version == 1
and old_minor_version < 3:
112 for tag
in data[
"items"]:
113 if TAG_ID
not in tag:
117 if old_major_version > 1:
118 raise NotImplementedError
124 """Tag collection stored in storage."""
126 CREATE_SCHEMA = vol.Schema(CREATE_FIELDS)
127 UPDATE_SCHEMA = vol.Schema(UPDATE_FIELDS)
132 id_manager: collection.IDManager |
None =
None,
134 """Initialize the storage collection."""
139 """Validate the config is valid."""
142 data[TAG_ID] =
str(uuid.uuid4())
144 data[CONF_ID] = data.pop(TAG_ID)
146 if LAST_SCANNED
in data:
147 data[LAST_SCANNED] = data[LAST_SCANNED].isoformat()
156 """Suggest an ID based on the config."""
160 """Return a new updated data object."""
161 data = {**item, **self.
UPDATE_SCHEMAUPDATE_SCHEMA(update_data)}
162 tag_id = item[CONF_ID]
164 if LAST_SCANNED
in update_data:
165 data[LAST_SCANNED] = data[LAST_SCANNED].isoformat()
166 if name := data.get(CONF_NAME):
167 if entity_id := self.
entity_registryentity_registry.async_get_entity_id(
168 DOMAIN, DOMAIN, tag_id
172 raise collection.ItemNotFound(tag_id)
177 """Return the serialized representation of an item for storing.
179 We don't store the name, it's stored in the entity registry.
183 migrated = item_id
in self.data
and "migrated" in self.data[item_id]
184 return {k: v
for k, v
in item.items()
if k != CONF_NAME
or migrated}
188 collection.StorageCollectionWebsocket[TagStorageCollection]
190 """Class to expose tag storage collection management over websocket."""
194 storage_collection: TagStorageCollection,
197 create_schema: VolDictType,
198 update_schema: VolDictType,
200 """Initialize a websocket for tag."""
202 storage_collection, api_prefix, model_name, create_schema, update_schema
210 """List items specifically for tag.
212 Provides name from entity_registry instead of storage collection.
215 for item
in self.storage_collection.async_items():
217 item = {k: v
for k, v
in item.items()
if k !=
"migrated"}
220 DOMAIN, DOMAIN, item[CONF_ID]
223 item[CONF_NAME] = entity.name
or entity.original_name
224 tag_items.append(item)
225 if _LOGGER.isEnabledFor(logging.DEBUG):
226 _LOGGER.debug(
"Listing tags %s", tag_items)
227 connection.send_result(msg[
"id"], tag_items)
230 async
def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
231 """Set up the Tag component."""
232 component = EntityComponent[TagEntity](LOGGER, DOMAIN, hass)
236 hass, STORAGE_VERSION, STORAGE_KEY, minor_version=STORAGE_VERSION_MINOR
240 await storage_collection.async_load()
242 storage_collection, DOMAIN, DOMAIN, CREATE_FIELDS, UPDATE_FIELDS
245 entity_registry = er.async_get(hass)
246 entity_update_handlers: dict[str, Callable[[str |
None, str |
None],
None]] = {}
248 async
def tag_change_listener(
249 change_type: str, item_id: str, updated_config: dict
251 """Tag storage change listener."""
253 if _LOGGER.isEnabledFor(logging.DEBUG):
255 "%s, item: %s, update: %s", change_type, item_id, updated_config
257 if change_type == collection.CHANGE_ADDED:
259 entity =
_create_entry(entity_registry, updated_config[CONF_ID],
None)
261 assert entity.original_name
262 await component.async_add_entities(
265 entity_update_handlers,
266 entity.name
or entity.original_name,
267 updated_config[CONF_ID],
268 updated_config.get(LAST_SCANNED),
269 updated_config.get(DEVICE_ID),
274 elif change_type == collection.CHANGE_UPDATED:
276 if handler := entity_update_handlers.get(updated_config[CONF_ID]):
278 updated_config.get(DEVICE_ID),
279 updated_config.get(LAST_SCANNED),
283 elif change_type == collection.CHANGE_REMOVED:
285 entity_id = entity_registry.async_get_entity_id(
286 DOMAIN, DOMAIN, updated_config[CONF_ID]
289 entity_registry.async_remove(entity_id)
291 storage_collection.async_add_listener(tag_change_listener)
293 entities: list[TagEntity] = []
294 for tag
in storage_collection.async_items():
295 if _LOGGER.isEnabledFor(logging.DEBUG):
296 _LOGGER.debug(
"Adding tag: %s", tag)
297 entity_id = entity_registry.async_get_entity_id(DOMAIN, DOMAIN, tag[CONF_ID])
298 if entity_id := entity_registry.async_get_entity_id(
299 DOMAIN, DOMAIN, tag[CONF_ID]
301 entity = entity_registry.async_get(entity_id)
306 assert entity.original_name
307 name = entity.name
or entity.original_name
310 entity_update_handlers,
313 tag.get(LAST_SCANNED),
317 await component.async_add_entities(entities)
325 device_id: str |
None,
326 context: Context |
None =
None,
328 """Handle when a tag is scanned."""
329 if DOMAIN
not in hass.config.components:
332 storage_collection = hass.data[TAG_DATA]
333 entity_registry = er.async_get(hass)
334 entity_id = entity_registry.async_get_entity_id(DOMAIN, DOMAIN, tag_id)
338 if entity_id
and (entity := entity_registry.async_get(entity_id)):
339 tag_name = entity.name
or entity.original_name
343 {TAG_ID: tag_id, CONF_NAME: tag_name, DEVICE_ID: device_id},
349 extra_kwargs[DEVICE_ID] = device_id
350 if tag_id
in storage_collection.data:
351 if _LOGGER.isEnabledFor(logging.DEBUG):
352 _LOGGER.debug(
"Updating tag %s with extra %s", tag_id, extra_kwargs)
353 await storage_collection.async_update_item(
354 tag_id, {LAST_SCANNED: dt_util.utcnow(), **extra_kwargs}
357 if _LOGGER.isEnabledFor(logging.DEBUG):
358 _LOGGER.debug(
"Creating tag %s with extra %s", tag_id, extra_kwargs)
359 await storage_collection.async_create_item(
360 {TAG_ID: tag_id, LAST_SCANNED: dt_util.utcnow(), **extra_kwargs}
362 _LOGGER.debug(
"Tag: %s scanned by device: %s", tag_id, device_id)
366 """Representation of a Tag entity."""
368 _unrecorded_attributes = frozenset({TAG_ID})
369 _attr_should_poll =
False
373 entity_update_handlers: dict[str, Callable[[str |
None, str |
None],
None]],
376 last_scanned: str |
None,
377 device_id: str |
None,
379 """Initialize the Tag event."""
389 self, device_id: str |
None, last_scanned: str |
None
391 """Handle the Tag scan event."""
392 if _LOGGER.isEnabledFor(logging.DEBUG):
394 "Tag %s scanned by device %s at %s, last scanned at %s",
407 """Return the entity state."""
410 or (last_scanned := dt_util.parse_datetime(self.
_last_scanned_last_scanned))
is None
413 return last_scanned.isoformat(timespec=
"milliseconds")
417 """Return the state attributes of the sun."""
421 """Handle entity which will be added."""
426 """Handle entity being removed."""
None ws_list_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None __init__(self, TagStorageCollection storage_collection, str api_prefix, str model_name, VolDictType create_schema, VolDictType update_schema)
None async_added_to_hass(self)
dict[str, Any] extra_state_attributes(self)
None async_handle_event(self, str|None device_id, str|None last_scanned)
None async_will_remove_from_hass(self)
None __init__(self, dict[str, Callable[[str|None, str|None], None]] entity_update_handlers, str name, str tag_id, str|None last_scanned, str|None device_id)
None __init__(self, str item_id)
str generate_id(self, str suggestion)
None async_write_ha_state(self)
bool async_setup(HomeAssistant hass, ConfigType config)
er.RegistryEntry _create_entry(er.EntityRegistry entity_registry, str tag_id, str|None name)
None async_scan_tag(HomeAssistant hass, str tag_id, str|None device_id, Context|None context=None)
AreaRegistry async_get(HomeAssistant hass)
None async_update_entity(HomeAssistant hass, str entity_id)
str slugify(str|None text, *str separator="_")