Home Assistant Unofficial Reference 2024.12.1
entity_platform.py
Go to the documentation of this file.
1 """Class to manage the entities for a single platform."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Awaitable, Callable, Coroutine, Iterable
7 from contextvars import ContextVar
8 from datetime import timedelta
9 from logging import Logger, getLogger
10 from typing import TYPE_CHECKING, Any, Protocol
11 
12 from homeassistant import config_entries
13 from homeassistant.const import (
14  ATTR_RESTORED,
15  DEVICE_DEFAULT_NAME,
16  EVENT_HOMEASSISTANT_STARTED,
17 )
18 from homeassistant.core import (
19  CALLBACK_TYPE,
20  DOMAIN as HOMEASSISTANT_DOMAIN,
21  CoreState,
22  HomeAssistant,
23  ServiceCall,
24  SupportsResponse,
25  callback,
26  split_entity_id,
27  valid_entity_id,
28 )
29 from homeassistant.exceptions import (
30  ConfigEntryAuthFailed,
31  ConfigEntryError,
32  ConfigEntryNotReady,
33  HomeAssistantError,
34  PlatformNotReady,
35 )
36 from homeassistant.generated import languages
37 from homeassistant.setup import SetupPhases, async_start_setup
38 from homeassistant.util.async_ import create_eager_task
39 from homeassistant.util.hass_dict import HassKey
40 
41 from . import (
42  device_registry as dev_reg,
43  entity_registry as ent_reg,
44  service,
45  translation,
46 )
47 from .entity_registry import EntityRegistry, RegistryEntryDisabler, RegistryEntryHider
48 from .event import async_call_later
49 from .issue_registry import IssueSeverity, async_create_issue
50 from .typing import UNDEFINED, ConfigType, DiscoveryInfoType, VolDictType, VolSchemaType
51 
52 if TYPE_CHECKING:
53  from .entity import Entity
54 
55 
# Seconds of platform setup after which a "taking over N seconds" warning fires.
SLOW_SETUP_WARNING = 10
# Seconds to wait for a platform setup before logging an error and moving on.
SLOW_SETUP_MAX_WAIT = 60
SLOW_ADD_ENTITY_MAX_WAIT = 15  # Per Entity
# Lower bound (seconds) for the timeout applied to one add-entities batch.
SLOW_ADD_MIN_TIMEOUT = 500

PLATFORM_NOT_READY_RETRIES = 10
DATA_ENTITY_PLATFORM: HassKey[dict[str, list[EntityPlatform]]] = HassKey(
    "entity_platform"
)
# hass.data key: domain -> {entity_id: entity}
DATA_DOMAIN_ENTITIES: HassKey[dict[str, dict[str, Entity]]] = HassKey("domain_entities")
# hass.data key: (domain, platform_name) -> {entity_id: entity}
DATA_DOMAIN_PLATFORM_ENTITIES: HassKey[dict[tuple[str, str], dict[str, Entity]]] = (
    HassKey("domain_platform_entities")
)
# Retry delay is min(tries, 6) times this value.
PLATFORM_NOT_READY_BASE_WAIT_TIME = 30  # seconds

_LOGGER = getLogger(__name__)
72 
73 
class AddEntitiesCallback(Protocol):
    """Protocol type for EntityPlatform.add_entities callback.

    Describes a callable taking the entities to add and an optional flag
    requesting an update of each entity before it is added.
    """

    def __call__(
        self, new_entities: Iterable[Entity], update_before_add: bool = False
    ) -> None:
        """Define add_entities type."""
81 
82 
class EntityPlatformModule(Protocol):
    """Protocol type for entity platform modules.

    A platform module may provide any of these entry points; callers probe
    for them with hasattr/getattr before use.
    """

    async def async_setup_platform(
        self,
        hass: HomeAssistant,
        config: ConfigType,
        async_add_entities: AddEntitiesCallback,
        discovery_info: DiscoveryInfoType | None = None,
    ) -> None:
        """Set up an integration platform async."""

    def setup_platform(
        self,
        hass: HomeAssistant,
        config: ConfigType,
        add_entities: AddEntitiesCallback,
        discovery_info: DiscoveryInfoType | None = None,
    ) -> None:
        """Set up an integration platform."""

    async def async_setup_entry(
        self,
        hass: HomeAssistant,
        config_entry: config_entries.ConfigEntry,
        async_add_entities: AddEntitiesCallback,
    ) -> None:
        """Set up an integration platform from a config entry."""
111 
112 
114  """Manage the entities for a single platform.
115 
116  An example of an entity platform is 'hue.light', which is managed by
117  the entity component 'light'.
118  """
119 
def __init__(
    self,
    *,
    hass: HomeAssistant,
    logger: Logger,
    domain: str,
    platform_name: str,
    platform: EntityPlatformModule | None,
    scan_interval: timedelta,
    entity_namespace: str | None,
) -> None:
    """Initialize the entity platform.

    All arguments are keyword-only. `platform` may be None; in that case
    the parallel-updates semaphore is treated as already decided.
    """
    self.hass = hass
    self.logger = logger
    self.domain = domain
    self.platform_name = platform_name
    self.platform = platform
    self.scan_interval = scan_interval
    # Cached so the polling timer does not recompute it on every tick
    self.scan_interval_seconds = scan_interval.total_seconds()
    self.entity_namespace = entity_namespace
    self.config_entry: config_entries.ConfigEntry | None = None
    # Storage for entities for this specific platform only
    # which are indexed by entity_id
    self.entities: dict[str, Entity] = {}
    self.component_translations: dict[str, str] = {}
    self.platform_translations: dict[str, str] = {}
    self.object_id_component_translations: dict[str, str] = {}
    self.object_id_platform_translations: dict[str, str] = {}
    self.default_language_platform_translations: dict[str, str] = {}
    self._tasks: list[asyncio.Task[None]] = []
    # Stop tracking tasks after setup is completed
    self._setup_complete = False
    # Method to cancel the state change listener
    self._async_polling_timer: asyncio.TimerHandle | None = None
    # Method to cancel the retry of setup
    self._async_cancel_retry_setup: CALLBACK_TYPE | None = None
    self._process_updates: asyncio.Lock | None = None

    self.parallel_updates: asyncio.Semaphore | None = None
    self._update_in_sequence: bool = False

    # Platform is None for the EntityComponent "catch-all" EntityPlatform
    # which powers entity_component.add_entities
    self.parallel_updates_created = platform is None

    # Storage for entities indexed by domain
    # with the child dict indexed by entity_id
    #
    # This is usually media_player, light, switch, etc.
    self.domain_entities = hass.data.setdefault(
        DATA_DOMAIN_ENTITIES, {}
    ).setdefault(domain, {})

    # Storage for entities indexed by domain and platform
    # with the child dict indexed by entity_id
    #
    # This is usually media_player.yamaha, light.hue, switch.tplink, etc.
    key = (domain, platform_name)
    self.domain_platform_entities = hass.data.setdefault(
        DATA_DOMAIN_PLATFORM_ENTITIES, {}
    ).setdefault(key, {})
181 
182  def __repr__(self) -> str:
183  """Represent an EntityPlatform."""
184  return (
185  "<EntityPlatform "
186  f"domain={self.domain} "
187  f"platform_name={self.platform_name} "
188  f"config_entry={self.config_entry}>"
189  )
190 
@callback
def _get_parallel_updates_semaphore(
    self, entity_has_sync_update: bool
) -> asyncio.Semaphore | None:
    """Get or create a semaphore for parallel updates.

    Semaphore will be created on demand because we base it off if update
    method is async or not.

    - If parallel updates is set to 0, we skip the semaphore.
    - If parallel updates is set to a number, we initialize the semaphore
      to that number.

    The default value for parallel requests is decided based on the first
    entity of the platform which is added to Home Assistant. It's 1 if the
    entity implements the update method, else it's 0.
    """
    # The decision is made once; later calls reuse the stored result
    if self.parallel_updates_created:
        return self.parallel_updates

    self.parallel_updates_created = True

    parallel_updates = getattr(self.platform, "PARALLEL_UPDATES", None)

    if parallel_updates is None and entity_has_sync_update:
        parallel_updates = 1

    if parallel_updates == 0:
        parallel_updates = None

    if parallel_updates is not None:
        self.parallel_updates = asyncio.Semaphore(parallel_updates)
        self._update_in_sequence = parallel_updates == 1

    return self.parallel_updates
226 
async def async_setup(
    self,
    platform_config: ConfigType,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Set up the platform from a config file.

    If the platform module offers neither `async_setup_platform` nor
    `setup_platform`, an error is logged, an issue is created and nothing
    else happens.
    """
    platform = self.platform
    hass = self.hass

    if not hasattr(platform, "async_setup_platform") and not hasattr(
        platform, "setup_platform"
    ):
        self.logger.error(
            (
                "The %s platform for the %s integration does not support platform"
                " setup. Please remove it from your config."
            ),
            self.platform_name,
            self.domain,
        )
        learn_more_url = None
        # Only link to the integration page when the module path does not
        # contain "custom_components"
        if self.platform and "custom_components" not in self.platform.__file__:  # type: ignore[attr-defined]
            learn_more_url = (
                f"https://www.home-assistant.io/integrations/{self.platform_name}/"
            )
        platform_key = f"platform: {self.platform_name}"
        yaml_example = f"```yaml\n{self.domain}:\n  - {platform_key}\n```"
        async_create_issue(
            self.hass,
            HOMEASSISTANT_DOMAIN,
            f"platform_integration_no_support_{self.domain}_{self.platform_name}",
            is_fixable=False,
            issue_domain=self.platform_name,
            learn_more_url=learn_more_url,
            severity=IssueSeverity.ERROR,
            translation_key="no_platform_setup",
            translation_placeholders={
                "domain": self.domain,
                "platform": self.platform_name,
                "platform_key": platform_key,
                "yaml_example": yaml_example,
            },
        )

        return

    @callback
    def async_create_setup_awaitable() -> (
        Coroutine[Any, Any, None] | asyncio.Future[None]
    ):
        """Get task to set up platform."""
        if getattr(platform, "async_setup_platform", None):
            return platform.async_setup_platform(  # type: ignore[union-attr]
                hass,
                platform_config,
                self._async_schedule_add_entities,
                discovery_info,
            )

        # This should not be replaced with hass.async_add_job because
        # we don't want to track this task in case it blocks startup.
        return hass.loop.run_in_executor(
            None,
            platform.setup_platform,  # type: ignore[union-attr]
            hass,
            platform_config,
            self._schedule_add_entities,
            discovery_info,
        )

    with async_start_setup(
        hass,
        integration=self.platform_name,
        group=str(id(platform_config)),
        phase=SetupPhases.PLATFORM_SETUP,
    ):
        await self._async_setup_platform(async_create_setup_awaitable)
304 
@callback
def async_shutdown(self) -> None:
    """Call when Home Assistant is stopping.

    Cancels any pending setup retry and stops the polling timer.
    """
    self.async_cancel_retry_setup()
    self.async_unsub_polling()
310 
@callback
def async_cancel_retry_setup(self) -> None:
    """Cancel retry setup.

    Does nothing when no retry is currently scheduled.
    """
    if self._async_cancel_retry_setup is not None:
        self._async_cancel_retry_setup()
        self._async_cancel_retry_setup = None
317 
async def async_setup_entry(self, config_entry: config_entries.ConfigEntry) -> bool:
    """Set up the platform from a config entry.

    Returns the result of `_async_setup_platform` (True on success).
    """
    # Store it so that we can save config entry ID in entity registry
    self.config_entry = config_entry
    platform = self.platform

    @callback
    def async_create_setup_awaitable() -> Coroutine[Any, Any, None]:
        """Get task to set up platform."""
        config_entries.current_entry.set(config_entry)

        return platform.async_setup_entry(  # type: ignore[union-attr]
            self.hass, config_entry, self._async_schedule_add_entities_for_entry
        )

    return await self._async_setup_platform(async_create_setup_awaitable)
334 
async def _async_setup_platform(
    self,
    async_create_setup_awaitable: Callable[[], Awaitable[None]],
    tries: int = 0,
) -> bool:
    """Set up a platform via config file or config entry.

    async_create_setup_awaitable creates an awaitable that sets up platform.

    Returns True when setup finished and the platform was recorded in
    hass.config.components; False on PlatformNotReady (a retry is
    scheduled), on timeout, or on any other exception (which is logged).
    """
    current_platform.set(self)
    logger = self.logger
    hass = self.hass
    full_name = f"{self.platform_name}.{self.domain}"

    await self.async_load_translations()

    logger.info("Setting up %s", full_name)
    # Timer that logs a warning if setup is still running after
    # SLOW_SETUP_WARNING seconds; always cancelled in `finally`.
    warn_task = hass.loop.call_at(
        hass.loop.time() + SLOW_SETUP_WARNING,
        logger.warning,
        "Setup of %s platform %s is taking over %s seconds.",
        self.domain,
        self.platform_name,
        SLOW_SETUP_WARNING,
    )
    try:
        awaitable = async_create_setup_awaitable()
        if asyncio.iscoroutine(awaitable):
            awaitable = create_eager_task(awaitable, loop=hass.loop)

        async with hass.timeout.async_timeout(SLOW_SETUP_MAX_WAIT, self.domain):
            # Shielded so that hitting the timeout stops the waiting here
            # without cancelling the platform's own setup awaitable
            await asyncio.shield(awaitable)

        # Block till all entities are done
        while self._tasks:
            # Await all tasks even if they are done
            # to ensure exceptions are propagated
            pending = self._tasks.copy()
            self._tasks.clear()
            await asyncio.gather(*pending)
    except PlatformNotReady as ex:
        tries += 1
        # Linear backoff, capped at 6 * PLATFORM_NOT_READY_BASE_WAIT_TIME
        wait_time = min(tries, 6) * PLATFORM_NOT_READY_BASE_WAIT_TIME
        message = str(ex)
        ready_message = f"ready yet: {message}" if message else "ready yet"
        if tries == 1:
            logger.warning(
                "Platform %s not %s; Retrying in background in %d seconds",
                self.platform_name,
                ready_message,
                wait_time,
            )
        else:
            logger.debug(
                "Platform %s not %s; Retrying in %d seconds",
                self.platform_name,
                ready_message,
                wait_time,
            )

        async def setup_again(*_args: Any) -> None:
            """Run setup again."""
            self._async_cancel_retry_setup = None
            await self._async_setup_platform(async_create_setup_awaitable, tries)

        if hass.state is CoreState.running:
            self._async_cancel_retry_setup = async_call_later(
                hass, wait_time, setup_again
            )
        else:
            # Not running yet: retry once the started event fires
            self._async_cancel_retry_setup = hass.bus.async_listen_once(
                EVENT_HOMEASSISTANT_STARTED, setup_again
            )
        return False
    except TimeoutError:
        logger.error(
            (
                "Setup of platform %s is taking longer than %s seconds."
                " Startup will proceed without waiting any longer."
            ),
            self.platform_name,
            SLOW_SETUP_MAX_WAIT,
        )
        return False
    except (ConfigEntryNotReady, ConfigEntryAuthFailed, ConfigEntryError) as exc:
        _LOGGER.error(
            "%s raises exception %s in forwarded platform "
            "%s; Instead raise %s before calling async_forward_entry_setups",
            self.platform_name,
            type(exc).__name__,
            self.domain,
            type(exc).__name__,
        )
        return False
    except Exception:
        logger.exception(
            "Error while setting up %s platform for %s",
            self.platform_name,
            self.domain,
        )
        return False
    else:
        hass.config.components.add(full_name)
        self._setup_complete = True
        return True
    finally:
        warn_task.cancel()
442 
async def _async_get_translations(
    self, language: str, category: str, integration: str
) -> dict[str, str]:
    """Get translations for a language, category, and integration.

    Best effort: any exception while loading is logged at debug level and
    an empty dict is returned instead.
    """
    try:
        return await translation.async_get_translations(
            self.hass, language, category, {integration}
        )
    except Exception as err:  # noqa: BLE001
        _LOGGER.debug(
            "Could not load translations for %s",
            integration,
            exc_info=err,
        )
    return {}
458 
async def async_load_translations(self) -> None:
    """Load translations.

    Fills the component/platform translation dicts for the configured
    language, for the language used for object IDs, and for the default
    language. Already loaded dicts are reused when the languages coincide.
    """
    hass = self.hass
    # Object IDs use the configured language only if it is listed in
    # NATIVE_ENTITY_IDS; otherwise the default language is used
    object_id_language = (
        hass.config.language
        if hass.config.language in languages.NATIVE_ENTITY_IDS
        else languages.DEFAULT_LANGUAGE
    )
    config_language = hass.config.language
    self.component_translations = await self._async_get_translations(
        config_language, "entity_component", self.domain
    )
    self.platform_translations = await self._async_get_translations(
        config_language, "entity", self.platform_name
    )
    if object_id_language == config_language:
        self.object_id_component_translations = self.component_translations
        self.object_id_platform_translations = self.platform_translations
    else:
        self.object_id_component_translations = await self._async_get_translations(
            object_id_language, "entity_component", self.domain
        )
        self.object_id_platform_translations = await self._async_get_translations(
            object_id_language, "entity", self.platform_name
        )
    if config_language == languages.DEFAULT_LANGUAGE:
        self.default_language_platform_translations = self.platform_translations
    else:
        self.default_language_platform_translations = (
            await self._async_get_translations(
                languages.DEFAULT_LANGUAGE, "entity", self.platform_name
            )
        )
492 
494  self, new_entities: Iterable[Entity], update_before_add: bool = False
495  ) -> None:
496  """Schedule adding entities for a single platform, synchronously."""
497  self.hasshass.loop.call_soon_threadsafe(
498  self._async_schedule_add_entities_async_schedule_add_entities,
499  list(new_entities),
500  update_before_add,
501  )
502 
@callback
def _async_schedule_add_entities(
    self, new_entities: Iterable[Entity], update_before_add: bool = False
) -> None:
    """Schedule adding entities for a single platform async."""
    task = self.hass.async_create_task_internal(
        self.async_add_entities(new_entities, update_before_add=update_before_add),
        f"EntityPlatform async_add_entities {self.domain}.{self.platform_name}",
        eager_start=True,
    )

    # Only tasks created during setup are tracked, so that
    # _async_setup_platform can await them
    if not self._setup_complete:
        self._tasks.append(task)
516 
@callback
def _async_schedule_add_entities_for_entry(
    self, new_entities: Iterable[Entity], update_before_add: bool = False
) -> None:
    """Schedule adding entities for a single platform async and track the task.

    The task is created through the config entry rather than through hass
    directly; a config entry must already be set on this platform.
    """
    assert self.config_entry
    task = self.config_entry.async_create_task(
        self.hass,
        self.async_add_entities(new_entities, update_before_add=update_before_add),
        f"EntityPlatform async_add_entities_for_entry {self.domain}.{self.platform_name}",
        eager_start=True,
    )

    # Only tasks created during setup are tracked, so that
    # _async_setup_platform can await them
    if not self._setup_complete:
        self._tasks.append(task)
532 
def add_entities(
    self, new_entities: Iterable[Entity], update_before_add: bool = False
) -> None:
    """Add entities for a single platform.

    Submits async_add_entities to the event loop from the calling thread
    and blocks until it has finished.
    """
    # That avoid deadlocks
    if update_before_add:
        self.logger.warning(
            "Call 'add_entities' with update_before_add=True "
            "only inside tests or you can run into a deadlock!"
        )

    asyncio.run_coroutine_threadsafe(
        self.async_add_entities(list(new_entities), update_before_add),
        self.hass.loop,
    ).result()
548 
async def _async_add_and_update_entities(
    self,
    coros: list[Coroutine[Any, Any, None]],
    entities: list[Entity],
    timeout: float,
) -> None:
    """Add entities for a single platform and update them.

    Since we are updating the entities before adding them, we need to
    schedule the coroutines as tasks so we can await them in the event
    loop. This is because the update is likely to yield control to the
    event loop and will finish faster if we run them concurrently.

    `coros` and `entities` are parallel lists: entities[i] is used to
    report a failure of coros[i].
    """
    results: list[BaseException | None] | None = None
    tasks = [create_eager_task(coro, loop=self.hass.loop) for coro in coros]
    try:
        async with self.hass.timeout.async_timeout(timeout, self.domain):
            results = await asyncio.gather(*tasks, return_exceptions=True)
    except TimeoutError:
        self.logger.warning(
            "Timed out adding entities for domain %s with platform %s after %ds",
            self.domain,
            self.platform_name,
            timeout,
        )

    # Nothing to inspect when the gather timed out
    if not results:
        return

    for idx, result in enumerate(results):
        if isinstance(result, Exception):
            entity = entities[idx]
            self.logger.exception(
                "Error adding entity %s for domain %s with platform %s",
                entity.entity_id,
                self.domain,
                self.platform_name,
                exc_info=result,
            )
        elif isinstance(result, BaseException):
            # Non-Exception BaseExceptions are re-raised, not just logged
            raise result
590 
592  self,
593  coros: list[Coroutine[Any, Any, None]],
594  entities: list[Entity],
595  timeout: float,
596  ) -> None:
597  """Add entities for a single platform without updating.
598 
599  In this case we are not updating the entities before adding them
600  which means it is likely that we will not have to yield control
601  to the event loop so we can await the coros directly without
602  scheduling them as tasks.
603  """
604  try:
605  async with self.hasshass.timeout.async_timeout(timeout, self.domaindomain):
606  for idx, coro in enumerate(coros):
607  try:
608  await coro
609  except Exception as ex:
610  entity = entities[idx]
611  self.loggerlogger.exception(
612  "Error adding entity %s for domain %s with platform %s",
613  entity.entity_id,
614  self.domaindomain,
615  self.platform_nameplatform_name,
616  exc_info=ex,
617  )
618  except TimeoutError:
619  self.loggerlogger.warning(
620  "Timed out adding entities for domain %s with platform %s after %ds",
621  self.domaindomain,
622  self.platform_nameplatform_name,
623  timeout,
624  )
625 
async def async_add_entities(
    self, new_entities: Iterable[Entity], update_before_add: bool = False
) -> None:
    """Add entities for a single platform async.

    This method must be run in the event loop.

    After the entities are added, the polling timer is started unless the
    config entry disables polling, a timer is already running, or none of
    the successfully added entities wants to be polled.
    """
    # handle empty list from component/platform
    if not new_entities:  # type: ignore[truthy-iterable]
        return

    hass = self.hass
    entity_registry = ent_reg.async_get(hass)
    coros: list[Coroutine[Any, Any, None]] = []
    entities: list[Entity] = []
    for entity in new_entities:
        coros.append(
            self._async_add_entity(entity, update_before_add, entity_registry)
        )
        entities.append(entity)

    # No entities for processing
    if not coros:
        return

    timeout = max(SLOW_ADD_ENTITY_MAX_WAIT * len(coros), SLOW_ADD_MIN_TIMEOUT)
    if update_before_add:
        add_func = self._async_add_and_update_entities
    else:
        add_func = self._async_add_entities

    await add_func(coros, entities, timeout)

    if (
        (self.config_entry and self.config_entry.pref_disable_polling)
        or self._async_polling_timer is not None
        or not any(
            # Entity may have failed to add or called `add_to_platform_abort`
            # so we check if the entity is in self.entities before
            # checking `entity.should_poll` since `should_poll` may need to
            # check `self.hass` which will be `None` if the entity did not add
            entity.entity_id
            and entity.entity_id in self.entities
            and entity.should_poll
            for entity in entities
        )
    ):
        return

    self._async_polling_timer = self.hass.loop.call_later(
        self.scan_interval_seconds,
        self._async_handle_interval_callback,
    )
679 
@callback
def _async_handle_interval_callback(self) -> None:
    """Update all the entity states in a single platform."""
    # Re-arm the timer first so the next poll is scheduled before the
    # update task for this tick is created
    self._async_polling_timer = self.hass.loop.call_later(
        self.scan_interval_seconds,
        self._async_handle_interval_callback,
    )
    if self.config_entry:
        self.config_entry.async_create_background_task(
            self.hass,
            self._async_update_entity_states(),
            name=f"EntityPlatform poll {self.domain}.{self.platform_name}",
            eager_start=True,
        )
    else:
        self.hass.async_create_background_task(
            self._async_update_entity_states(),
            name=f"EntityPlatform poll {self.domain}.{self.platform_name}",
            eager_start=True,
        )
700 
701  def _entity_id_already_exists(self, entity_id: str) -> tuple[bool, bool]:
702  """Check if an entity_id already exists.
703 
704  Returns a tuple [already_exists, restored]
705  """
706  already_exists = entity_id in self.entities
707  restored = False
708 
709  if not already_exists and not self.hasshass.states.async_available(entity_id):
710  existing = self.hasshass.states.get(entity_id)
711  if existing is not None and ATTR_RESTORED in existing.attributes:
712  restored = True
713  else:
714  already_exists = True
715  return (already_exists, restored)
716 
717  async def _async_add_entity( # noqa: C901
718  self,
719  entity: Entity,
720  update_before_add: bool,
721  entity_registry: EntityRegistry,
722  ) -> None:
723  """Add an entity to the platform."""
724  if entity is None:
725  raise ValueError("Entity cannot be None")
726 
727  entity.add_to_platform_start(
728  self.hasshass,
729  self,
730  self._get_parallel_updates_semaphore_get_parallel_updates_semaphore(hasattr(entity, "update")),
731  )
732 
733  # Update properties before we generate the entity_id. This will happen
734  # also for disabled entities.
735  if update_before_add:
736  try:
737  await entity.async_device_update(warning=False)
738  except Exception:
739  self.loggerlogger.exception("%s: Error on device update!", self.platform_nameplatform_name)
740  entity.add_to_platform_abort()
741  return
742 
743  suggested_object_id: str | None = None
744 
745  entity_name = entity.name
746  if entity_name is UNDEFINED:
747  entity_name = None
748 
749  # Get entity_id from unique ID registration
750  if entity.unique_id is not None:
751  registered_entity_id = entity_registry.async_get_entity_id(
752  self.domaindomain, self.platform_nameplatform_name, entity.unique_id
753  )
754  if registered_entity_id:
755  already_exists, _ = self._entity_id_already_exists_entity_id_already_exists(registered_entity_id)
756 
757  if already_exists:
758  # If there's a collision, the entry belongs to another entity
759  entity.registry_entry = None
760  msg = (
761  f"Platform {self.platform_name} does not generate unique IDs. "
762  )
763  if entity.entity_id:
764  msg += (
765  f"ID {entity.unique_id} is already used by"
766  f" {registered_entity_id} - ignoring {entity.entity_id}"
767  )
768  else:
769  msg += (
770  f"ID {entity.unique_id} already exists - ignoring"
771  f" {registered_entity_id}"
772  )
773  self.loggerlogger.error(msg)
774  entity.add_to_platform_abort()
775  return
776 
777  if self.config_entryconfig_entry and (device_info := entity.device_info):
778  try:
779  device = dev_reg.async_get(self.hasshass).async_get_or_create(
780  config_entry_id=self.config_entryconfig_entry.entry_id,
781  **device_info,
782  )
783  except dev_reg.DeviceInfoError as exc:
784  self.loggerlogger.error(
785  "%s: Not adding entity with invalid device info: %s",
786  self.platform_nameplatform_name,
787  str(exc),
788  )
789  entity.add_to_platform_abort()
790  return
791  else:
792  device = None
793 
794  # An entity may suggest the entity_id by setting entity_id itself
795  suggested_entity_id: str | None = entity.entity_id
796  if suggested_entity_id is not None:
797  suggested_object_id = split_entity_id(entity.entity_id)[1]
798  else:
799  if device and entity.has_entity_name:
800  device_name = device.name_by_user or device.name
801  if entity.use_device_name:
802  suggested_object_id = device_name
803  else:
804  suggested_object_id = (
805  f"{device_name} {entity.suggested_object_id}"
806  )
807  if not suggested_object_id:
808  suggested_object_id = entity.suggested_object_id
809 
810  if self.entity_namespaceentity_namespace is not None:
811  suggested_object_id = f"{self.entity_namespace} {suggested_object_id}"
812 
813  disabled_by: RegistryEntryDisabler | None = None
814  if not entity.entity_registry_enabled_default:
815  disabled_by = RegistryEntryDisabler.INTEGRATION
816 
817  hidden_by: RegistryEntryHider | None = None
818  if not entity.entity_registry_visible_default:
819  hidden_by = RegistryEntryHider.INTEGRATION
820 
821  entry = entity_registry.async_get_or_create(
822  self.domaindomain,
823  self.platform_nameplatform_name,
824  entity.unique_id,
825  capabilities=entity.capability_attributes,
826  config_entry=self.config_entryconfig_entry,
827  device_id=device.id if device else None,
828  disabled_by=disabled_by,
829  entity_category=entity.entity_category,
830  get_initial_options=entity.get_initial_entity_options,
831  has_entity_name=entity.has_entity_name,
832  hidden_by=hidden_by,
833  known_object_ids=self.entities,
834  original_device_class=entity.device_class,
835  original_icon=entity.icon,
836  original_name=entity_name,
837  suggested_object_id=suggested_object_id,
838  supported_features=entity.supported_features,
839  translation_key=entity.translation_key,
840  unit_of_measurement=entity.unit_of_measurement,
841  )
842 
843  if device and device.disabled and not entry.disabled:
844  entry = entity_registry.async_update_entity(
845  entry.entity_id, disabled_by=RegistryEntryDisabler.DEVICE
846  )
847 
848  entity.registry_entry = entry
849  if device:
850  entity.device_entry = device
851  entity.entity_id = entry.entity_id
852 
853  else: # entity.unique_id is None
854  generate_new_entity_id = False
855  # We won't generate an entity ID if the platform has already set one
856  # We will however make sure that platform cannot pick a registered ID
857  if entity.entity_id is not None and entity_registry.async_is_registered(
858  entity.entity_id
859  ):
860  # If entity already registered, convert entity id to suggestion
861  suggested_object_id = split_entity_id(entity.entity_id)[1]
862  generate_new_entity_id = True
863 
864  # Generate entity ID
865  if entity.entity_id is None or generate_new_entity_id:
866  suggested_object_id = (
867  suggested_object_id
868  or entity.suggested_object_id
869  or DEVICE_DEFAULT_NAME
870  )
871 
872  if self.entity_namespaceentity_namespace is not None:
873  suggested_object_id = (
874  f"{self.entity_namespace} {suggested_object_id}"
875  )
876  entity.entity_id = entity_registry.async_generate_entity_id(
877  self.domaindomain, suggested_object_id, self.entities
878  )
879 
880  # Make sure it is valid in case an entity set the value themselves
881  # Avoid calling valid_entity_id if we already know it is valid
882  # since it already made it in the registry
883  if not valid_entity_id(entity.entity_id):
884  entity.add_to_platform_abort()
885  raise HomeAssistantError(f"Invalid entity ID: {entity.entity_id}")
886 
887  already_exists, restored = self._entity_id_already_exists_entity_id_already_exists(entity.entity_id)
888 
889  if already_exists:
890  self.loggerlogger.error(
891  "Entity id already exists - ignoring: %s", entity.entity_id
892  )
893  entity.add_to_platform_abort()
894  return
895 
896  if entity.registry_entry and entity.registry_entry.disabled:
897  self.loggerlogger.debug(
898  "Not adding entity %s because it's disabled",
899  entry.name
900  or entity_name
901  or f'"{self.platform_name} {entity.unique_id}"',
902  )
903  entity.add_to_platform_abort()
904  return
905 
906  entity_id = entity.entity_id
907  self.entities[entity_id] = entity
908  self.domain_entitiesdomain_entities[entity_id] = entity
909  self.domain_platform_entitiesdomain_platform_entities[entity_id] = entity
910 
911  if not restored:
912  # Reserve the state in the state machine
913  # because as soon as we return control to the event
914  # loop below, another entity could be added
915  # with the same id before `entity.add_to_platform_finish()`
916  # has a chance to finish.
917  self.hasshass.states.async_reserve(entity.entity_id)
918 
919  def remove_entity_cb() -> None:
920  """Remove entity from entities dict."""
921  del self.entities[entity_id]
922  del self.domain_entitiesdomain_entities[entity_id]
923  del self.domain_platform_entitiesdomain_platform_entities[entity_id]
924 
925  entity.async_on_remove(remove_entity_cb)
926 
927  await entity.add_to_platform_finish()
928 
async def async_reset(self) -> None:
    """Remove all entities and reset data.

    Calls ``async_cancel_retry_setup`` first. If the platform currently
    holds no entities nothing else happens; otherwise every entity is
    removed, polling is stopped and ``_setup_complete`` is set to ``False``.

    This method must be run in the event loop.
    """
    self.async_cancel_retry_setup()

    if not self.entities:
        return

    # Removals are awaited in series since in most
    # cases calling async_remove will not yield control
    # to the event loop and we want to avoid scheduling
    # one task per entity.
    #
    # Iterate over a snapshot of the values so the loop is not disturbed
    # if removing an entity mutates self.entities.
    for entity in list(self.entities.values()):
        try:
            await entity.async_remove()
        except Exception:
            # Best effort: one failing entity must not prevent the
            # remaining entities from being removed.
            self.logger.exception(
                "Error while removing entity %s", entity.entity_id
            )

    self.async_unsub_polling()
    self._setup_complete = False
953 
@callback
def async_unsub_polling(self) -> None:
    """Stop polling.

    Cancels the polling timer if one is set and clears the reference, so
    calling this again is a no-op.
    """
    if self._async_polling_timer is not None:
        self._async_polling_timer.cancel()
        self._async_polling_timer = None
960 
@callback
def async_prepare(self) -> None:
    """Register the entity platform in DATA_ENTITY_PLATFORM.

    Appends this platform to the list stored under its ``platform_name``,
    creating the outer mapping and the per-name list on first use.
    """
    self.hass.data.setdefault(DATA_ENTITY_PLATFORM, {}).setdefault(
        self.platform_name, []
    ).append(self)
967 
async def async_destroy(self) -> None:
    """Destroy an entity platform.

    Resets the platform via ``async_reset`` and then removes it from the
    list stored under its ``platform_name`` in DATA_ENTITY_PLATFORM.

    Call before discarding the object.
    """
    await self.async_reset()
    self.hass.data[DATA_ENTITY_PLATFORM][self.platform_name].remove(self)
975 
async def async_remove_entity(self, entity_id: str) -> None:
    """Remove entity id from platform.

    Awaits ``async_remove`` on the entity stored under ``entity_id`` and
    raises ``KeyError`` if this platform holds no such entity.
    """
    await self.entities[entity_id].async_remove()

    # Clean up polling job if no longer needed
    if self._async_polling_timer is not None and not any(
        entity.should_poll for entity in self.entities.values()
    ):
        self.async_unsub_polling()
985 
async def async_extract_from_service(
    self, service_call: ServiceCall, expand_group: bool = True
) -> list[Entity]:
    """Extract all known and available entities from a service call.

    Only the entities held by this platform are considered as candidates;
    the lookup itself is delegated to ``service.async_extract_entities``.

    Will return an empty list if entities specified but unknown.

    This method must be run in the event loop.
    """
    return await service.async_extract_entities(
        self.hass, self.entities.values(), service_call, expand_group
    )
998 
@callback
def async_register_entity_service(
    self,
    name: str,
    schema: VolDictType | VolSchemaType | None,
    func: str | Callable[..., Any],
    required_features: Iterable[int] | None = None,
    supports_response: SupportsResponse = SupportsResponse.NONE,
) -> None:
    """Register an entity service.

    The service is registered under this platform's ``platform_name`` and
    is backed by ``domain_platform_entities``. If a service called ``name``
    is already registered there, this call does nothing.

    Services will automatically be shared by all platforms of the same domain.
    """
    if self.hass.services.has_service(self.platform_name, name):
        return

    service.async_register_entity_service(
        self.hass,
        self.platform_name,
        name,
        entities=self.domain_platform_entities,
        func=func,
        job_type=None,
        required_features=required_features,
        schema=schema,
        supports_response=supports_response,
    )
1026 
1027  async def _async_update_entity_states(self) -> None:
1028  """Update the states of all the polling entities.
1029 
1030  To protect from flooding the executor, we will update async entities
1031  in parallel and other entities sequential.
1032 
1033  This method must be run in the event loop.
1034  """
1035  if self._process_updates_process_updates is None:
1036  self._process_updates_process_updates = asyncio.Lock()
1037  if self._process_updates_process_updates.locked():
1038  self.loggerlogger.warning(
1039  "Updating %s %s took longer than the scheduled update interval %s",
1040  self.platform_nameplatform_name,
1041  self.domaindomain,
1042  self.scan_intervalscan_interval,
1043  )
1044  return
1045 
1046  async with self._process_updates_process_updates:
1047  if self._update_in_sequence_update_in_sequence or len(self.entities) <= 1:
1048  # If we know we will update sequentially, we want to avoid scheduling
1049  # the coroutines as tasks that will wait on the semaphore lock.
1050  for entity in list(self.entities.values()):
1051  # If the entity is removed from hass during the previous
1052  # entity being updated, we need to skip updating the
1053  # entity.
1054  if entity.should_poll and entity.hass:
1055  await entity.async_update_ha_state(True)
1056  return
1057 
1058  if tasks := [
1059  create_eager_task(
1060  entity.async_update_ha_state(True), loop=self.hasshass.loop
1061  )
1062  for entity in self.entities.values()
1063  if entity.should_poll
1064  ]:
1065  await asyncio.gather(*tasks)
1066 
1067 
# Context variable holding an EntityPlatform, or None (the default) when
# no platform has been set in the current context.
current_platform: ContextVar[EntityPlatform | None] = ContextVar(
    "current_platform", default=None
)
1071 
1072 
@callback
def async_get_current_platform() -> EntityPlatform:
    """Get the current platform from context.

    Raises RuntimeError if no platform has been set in the current context.
    """
    if (platform := current_platform.get()) is None:
        raise RuntimeError("Cannot get non-set current platform")
    return platform
1079 
1080 
@callback
def async_get_platforms(
    hass: HomeAssistant, integration_name: str
) -> list[EntityPlatform]:
    """Find existing platforms.

    Returns the list stored for ``integration_name`` in
    ``hass.data[DATA_ENTITY_PLATFORM]`` (the stored list itself, not a
    copy), or a new empty list when nothing is stored for that name.
    """
    if (
        DATA_ENTITY_PLATFORM not in hass.data
        or integration_name not in hass.data[DATA_ENTITY_PLATFORM]
    ):
        return []

    return hass.data[DATA_ENTITY_PLATFORM][integration_name]
None __call__(self, Iterable[Entity] new_entities, bool update_before_add=False)
None setup_platform(self, HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
None async_setup_entry(self, HomeAssistant hass, config_entries.ConfigEntry entry, AddEntitiesCallback async_add_entities)
None async_setup_platform(self, HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
None async_register_entity_service(self, str name, VolDictType|VolSchemaType|None schema, str|Callable[..., Any] func, Iterable[int]|None required_features=None, SupportsResponse supports_response=SupportsResponse.NONE)
list[Entity] async_extract_from_service(self, ServiceCall service_call, bool expand_group=True)
None _async_add_and_update_entities(self, list[Coroutine[Any, Any, None]] coros, list[Entity] entities, float timeout)
None _async_schedule_add_entities(self, Iterable[Entity] new_entities, bool update_before_add=False)
None _async_add_entities(self, list[Coroutine[Any, Any, None]] coros, list[Entity] entities, float timeout)
bool _async_setup_platform(self, Callable[[], Awaitable[None]] async_create_setup_awaitable, int tries=0)
tuple[bool, bool] _entity_id_already_exists(self, str entity_id)
None async_add_entities(self, Iterable[Entity] new_entities, bool update_before_add=False)
asyncio.Semaphore|None _get_parallel_updates_semaphore(self, bool entity_has_sync_update)
None _async_add_entity(self, Entity entity, bool update_before_add, EntityRegistry entity_registry)
dict[str, str] _async_get_translations(self, str language, str category, str integration)
None _async_schedule_add_entities_for_entry(self, Iterable[Entity] new_entities, bool update_before_add=False)
None _schedule_add_entities(self, Iterable[Entity] new_entities, bool update_before_add=False)
None add_entities(self, Iterable[Entity] new_entities, bool update_before_add=False)
None async_setup(self, ConfigType platform_config, DiscoveryInfoType|None discovery_info=None)
None __init__(self, *HomeAssistant hass, Logger logger, str domain, str platform_name, EntityPlatformModule|None platform, timedelta scan_interval, str|None entity_namespace)
bool async_setup_entry(self, config_entries.ConfigEntry config_entry)
bool remove(self, _T matcher)
Definition: match.py:214
None async_create_issue(HomeAssistant hass, str entry_id)
Definition: repairs.py:69
bool valid_entity_id(str entity_id)
Definition: core.py:235
tuple[str, str] split_entity_id(str entity_id)
Definition: core.py:214
list[EntityPlatform] async_get_platforms(HomeAssistant hass, str integration_name)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597
None async_remove(HomeAssistant hass, str intent_type)
Definition: intent.py:90
Generator[None] async_start_setup(core.HomeAssistant hass, str integration, SetupPhases phase, str|None group=None)
Definition: setup.py:739