1 """Class to manage the entities for a single platform."""
3 from __future__
import annotations
6 from collections.abc
import Awaitable, Callable, Coroutine, Iterable
7 from contextvars
import ContextVar
8 from datetime
import timedelta
9 from logging
import Logger, getLogger
10 from typing
import TYPE_CHECKING, Any, Protocol
12 from homeassistant
import config_entries
16 EVENT_HOMEASSISTANT_STARTED,
20 DOMAIN
as HOMEASSISTANT_DOMAIN,
30 ConfigEntryAuthFailed,
42 device_registry
as dev_reg,
43 entity_registry
as ent_reg,
47 from .entity_registry
import EntityRegistry, RegistryEntryDisabler, RegistryEntryHider
48 from .event
import async_call_later
49 from .issue_registry
import IssueSeverity, async_create_issue
50 from .typing
import UNDEFINED, ConfigType, DiscoveryInfoType, VolDictType, VolSchemaType
53 from .entity
import Entity
56 SLOW_SETUP_WARNING = 10
57 SLOW_SETUP_MAX_WAIT = 60
58 SLOW_ADD_ENTITY_MAX_WAIT = 15
59 SLOW_ADD_MIN_TIMEOUT = 500
61 PLATFORM_NOT_READY_RETRIES = 10
62 DATA_ENTITY_PLATFORM: HassKey[dict[str, list[EntityPlatform]]] =
HassKey(
65 DATA_DOMAIN_ENTITIES: HassKey[dict[str, dict[str, Entity]]] =
HassKey(
"domain_entities")
66 DATA_DOMAIN_PLATFORM_ENTITIES: HassKey[dict[tuple[str, str], dict[str, Entity]]] = (
67 HassKey(
"domain_platform_entities")
69 PLATFORM_NOT_READY_BASE_WAIT_TIME = 30
71 _LOGGER = getLogger(__name__)
75 """Protocol type for EntityPlatform.add_entities callback."""
78 self, new_entities: Iterable[Entity], update_before_add: bool =
False
80 """Define add_entities type."""
84 """Protocol type for entity platform modules."""
90 async_add_entities: AddEntitiesCallback,
91 discovery_info: DiscoveryInfoType |
None =
None,
93 """Set up an integration platform async."""
99 add_entities: AddEntitiesCallback,
100 discovery_info: DiscoveryInfoType |
None =
None,
102 """Set up an integration platform."""
108 async_add_entities: AddEntitiesCallback,
110 """Set up an integration platform from a config entry."""
114 """Manage the entities for a single platform.
116 An example of an entity platform is 'hue.light', which is managed by
117 the entity component 'light'.
127 platform: EntityPlatformModule |
None,
128 scan_interval: timedelta,
129 entity_namespace: str |
None,
131 """Initialize the entity platform."""
143 self.entities: dict[str, Entity] = {}
149 self._tasks: list[asyncio.Task[
None]] = []
170 DATA_DOMAIN_ENTITIES, {}
171 ).setdefault(domain, {})
177 key = (domain, platform_name)
179 DATA_DOMAIN_PLATFORM_ENTITIES, {}
180 ).setdefault(key, {})
183 """Represent an EntityPlatform."""
186 f
"domain={self.domain} "
187 f
"platform_name={self.platform_name} "
188 f
"config_entry={self.config_entry}>"
193 self, entity_has_sync_update: bool
194 ) -> asyncio.Semaphore |
None:
195 """Get or create a semaphore for parallel updates.
197 Semaphore will be created on demand because we base it off if update
198 method is async or not.
200 - If parallel updates is set to 0, we skip the semaphore.
201 - If parallel updates is set to a number, we initialize the semaphore
204 The default value for parallel requests is decided based on the first
205 entity of the platform which is added to Home Assistant. It's 1 if the
206 entity implements the update method, else it's 0.
213 parallel_updates = getattr(self.
platformplatform,
"PARALLEL_UPDATES",
None)
215 if parallel_updates
is None and entity_has_sync_update:
218 if parallel_updates == 0:
219 parallel_updates =
None
221 if parallel_updates
is not None:
229 platform_config: ConfigType,
230 discovery_info: DiscoveryInfoType |
None =
None,
232 """Set up the platform from a config file."""
236 if not hasattr(platform,
"async_setup_platform")
and not hasattr(
237 platform,
"setup_platform"
241 "The %s platform for the %s integration does not support platform"
242 " setup. Please remove it from your config."
247 learn_more_url =
None
248 if self.
platformplatform
and "custom_components" not in self.
platformplatform.__file__:
250 f
"https://www.home-assistant.io/integrations/{self.platform_name}/"
252 platform_key = f
"platform: {self.platform_name}"
253 yaml_example = f
"```yaml\n{self.domain}:\n - {platform_key}\n```"
256 HOMEASSISTANT_DOMAIN,
257 f
"platform_integration_no_support_{self.domain}_{self.platform_name}",
260 learn_more_url=learn_more_url,
261 severity=IssueSeverity.ERROR,
262 translation_key=
"no_platform_setup",
263 translation_placeholders={
264 "domain": self.
domaindomain,
266 "platform_key": platform_key,
267 "yaml_example": yaml_example,
274 def async_create_setup_awaitable() -> (
275 Coroutine[Any, Any,
None] | asyncio.Future[
None]
277 """Get task to set up platform."""
278 if getattr(platform,
"async_setup_platform",
None):
279 return platform.async_setup_platform(
288 return hass.loop.run_in_executor(
290 platform.setup_platform,
300 group=
str(id(platform_config)),
301 phase=SetupPhases.PLATFORM_SETUP,
307 """Call when Home Assistant is stopping."""
313 """Cancel retry setup."""
319 """Set up the platform from a config entry."""
325 def async_create_setup_awaitable() -> Coroutine[Any, Any, None]:
326 """Get task to set up platform."""
327 config_entries.current_entry.set(config_entry)
329 return platform.async_setup_entry(
337 async_create_setup_awaitable: Callable[[], Awaitable[
None]],
340 """Set up a platform via config file or config entry.
342 async_create_setup_awaitable creates an awaitable that sets up platform.
344 current_platform.set(self)
345 logger = self.
loggerlogger
347 full_name = f
"{self.platform_name}.{self.domain}"
351 logger.info(
"Setting up %s", full_name)
352 warn_task = hass.loop.call_at(
353 hass.loop.time() + SLOW_SETUP_WARNING,
355 "Setup of %s platform %s is taking over %s seconds.",
361 awaitable = async_create_setup_awaitable()
362 if asyncio.iscoroutine(awaitable):
363 awaitable = create_eager_task(awaitable, loop=hass.loop)
365 async
with hass.timeout.async_timeout(SLOW_SETUP_MAX_WAIT, self.
domaindomain):
366 await asyncio.shield(awaitable)
372 pending = self._tasks.copy()
374 await asyncio.gather(*pending)
375 except PlatformNotReady
as ex:
377 wait_time =
min(tries, 6) * PLATFORM_NOT_READY_BASE_WAIT_TIME
379 ready_message = f
"ready yet: {message}" if message
else "ready yet"
382 "Platform %s not %s; Retrying in background in %d seconds",
389 "Platform %s not %s; Retrying in %d seconds",
395 async
def setup_again(*_args: Any) ->
None:
396 """Run setup again."""
400 if hass.state
is CoreState.running:
402 hass, wait_time, setup_again
406 EVENT_HOMEASSISTANT_STARTED, setup_again
412 "Setup of platform %s is taking longer than %s seconds."
413 " Startup will proceed without waiting any longer."
419 except (ConfigEntryNotReady, ConfigEntryAuthFailed, ConfigEntryError)
as exc:
421 "%s raises exception %s in forwarded platform "
422 "%s; Instead raise %s before calling async_forward_entry_setups",
431 "Error while setting up %s platform for %s",
437 hass.config.components.add(full_name)
444 self, language: str, category: str, integration: str
446 """Get translations for a language, category, and integration."""
448 return await translation.async_get_translations(
449 self.
hasshass, language, category, {integration}
451 except Exception
as err:
453 "Could not load translations for %s",
460 """Load translations."""
462 object_id_language = (
464 if hass.config.language
in languages.NATIVE_ENTITY_IDS
465 else languages.DEFAULT_LANGUAGE
467 config_language = hass.config.language
469 config_language,
"entity_component", self.
domaindomain
474 if object_id_language == config_language:
479 object_id_language,
"entity_component", self.
domaindomain
482 object_id_language,
"entity", self.
platform_nameplatform_name
484 if config_language == languages.DEFAULT_LANGUAGE:
489 languages.DEFAULT_LANGUAGE,
"entity", self.
platform_nameplatform_name
494 self, new_entities: Iterable[Entity], update_before_add: bool =
False
496 """Schedule adding entities for a single platform, synchronously."""
497 self.
hasshass.loop.call_soon_threadsafe(
505 self, new_entities: Iterable[Entity], update_before_add: bool =
False
507 """Schedule adding entities for a single platform async."""
508 task = self.
hasshass.async_create_task_internal(
509 self.
async_add_entitiesasync_add_entities(new_entities, update_before_add=update_before_add),
510 f
"EntityPlatform async_add_entities {self.domain}.{self.platform_name}",
515 self._tasks.append(task)
519 self, new_entities: Iterable[Entity], update_before_add: bool =
False
521 """Schedule adding entities for a single platform async and track the task."""
525 self.
async_add_entitiesasync_add_entities(new_entities, update_before_add=update_before_add),
526 f
"EntityPlatform async_add_entities_for_entry {self.domain}.{self.platform_name}",
531 self._tasks.append(task)
534 self, new_entities: Iterable[Entity], update_before_add: bool =
False
536 """Add entities for a single platform."""
538 if update_before_add:
539 self.
loggerlogger.warning(
540 "Call 'add_entities' with update_before_add=True "
541 "only inside tests or you can run into a deadlock!"
544 asyncio.run_coroutine_threadsafe(
551 coros: list[Coroutine[Any, Any,
None]],
552 entities: list[Entity],
555 """Add entities for a single platform and update them.
557 Since we are updating the entities before adding them, we need to
558 schedule the coroutines as tasks so we can await them in the event
559 loop. This is because the update is likely to yield control to the
560 event loop and will finish faster if we run them concurrently.
562 results: list[BaseException |
None] |
None =
None
563 tasks = [create_eager_task(coro, loop=self.
hasshass.loop)
for coro
in coros]
565 async
with self.
hasshass.timeout.async_timeout(timeout, self.
domaindomain):
566 results = await asyncio.gather(*tasks, return_exceptions=
True)
568 self.
loggerlogger.warning(
569 "Timed out adding entities for domain %s with platform %s after %ds",
578 for idx, result
in enumerate(results):
579 if isinstance(result, Exception):
580 entity = entities[idx]
581 self.
loggerlogger.exception(
582 "Error adding entity %s for domain %s with platform %s",
588 elif isinstance(result, BaseException):
593 coros: list[Coroutine[Any, Any,
None]],
594 entities: list[Entity],
597 """Add entities for a single platform without updating.
599 In this case we are not updating the entities before adding them
600 which means it is likely that we will not have to yield control
601 to the event loop so we can await the coros directly without
602 scheduling them as tasks.
605 async
with self.
hasshass.timeout.async_timeout(timeout, self.
domaindomain):
606 for idx, coro
in enumerate(coros):
609 except Exception
as ex:
610 entity = entities[idx]
611 self.
loggerlogger.exception(
612 "Error adding entity %s for domain %s with platform %s",
619 self.
loggerlogger.warning(
620 "Timed out adding entities for domain %s with platform %s after %ds",
627 self, new_entities: Iterable[Entity], update_before_add: bool =
False
629 """Add entities for a single platform async.
631 This method must be run in the event loop.
638 entity_registry = ent_reg.async_get(hass)
639 coros: list[Coroutine[Any, Any,
None]] = []
640 entities: list[Entity] = []
641 for entity
in new_entities:
643 self.
_async_add_entity_async_add_entity(entity, update_before_add, entity_registry)
645 entities.append(entity)
651 timeout =
max(SLOW_ADD_ENTITY_MAX_WAIT * len(coros), SLOW_ADD_MIN_TIMEOUT)
652 if update_before_add:
657 await add_func(coros, entities, timeout)
668 and entity.entity_id
in self.entities
669 and entity.should_poll
670 for entity
in entities
682 """Update all the entity states in a single platform."""
688 self.
config_entryconfig_entry.async_create_background_task(
691 name=f
"EntityPlatform poll {self.domain}.{self.platform_name}",
695 self.
hasshass.async_create_background_task(
697 name=f
"EntityPlatform poll {self.domain}.{self.platform_name}",
702 """Check if an entity_id already exists.
704 Returns a tuple [already_exists, restored]
706 already_exists = entity_id
in self.entities
709 if not already_exists
and not self.
hasshass.states.async_available(entity_id):
710 existing = self.
hasshass.states.get(entity_id)
711 if existing
is not None and ATTR_RESTORED
in existing.attributes:
714 already_exists =
True
715 return (already_exists, restored)
720 update_before_add: bool,
721 entity_registry: EntityRegistry,
723 """Add an entity to the platform."""
725 raise ValueError(
"Entity cannot be None")
727 entity.add_to_platform_start(
735 if update_before_add:
737 await entity.async_device_update(warning=
False)
739 self.
loggerlogger.exception(
"%s: Error on device update!", self.
platform_nameplatform_name)
740 entity.add_to_platform_abort()
743 suggested_object_id: str |
None =
None
745 entity_name = entity.name
746 if entity_name
is UNDEFINED:
750 if entity.unique_id
is not None:
751 registered_entity_id = entity_registry.async_get_entity_id(
754 if registered_entity_id:
759 entity.registry_entry =
None
761 f
"Platform {self.platform_name} does not generate unique IDs. "
765 f
"ID {entity.unique_id} is already used by"
766 f
" {registered_entity_id} - ignoring {entity.entity_id}"
770 f
"ID {entity.unique_id} already exists - ignoring"
771 f
" {registered_entity_id}"
773 self.
loggerlogger.error(msg)
774 entity.add_to_platform_abort()
777 if self.
config_entryconfig_entry
and (device_info := entity.device_info):
779 device = dev_reg.async_get(self.
hasshass).async_get_or_create(
783 except dev_reg.DeviceInfoError
as exc:
785 "%s: Not adding entity with invalid device info: %s",
789 entity.add_to_platform_abort()
795 suggested_entity_id: str |
None = entity.entity_id
796 if suggested_entity_id
is not None:
799 if device
and entity.has_entity_name:
800 device_name = device.name_by_user
or device.name
801 if entity.use_device_name:
802 suggested_object_id = device_name
804 suggested_object_id = (
805 f
"{device_name} {entity.suggested_object_id}"
807 if not suggested_object_id:
808 suggested_object_id = entity.suggested_object_id
811 suggested_object_id = f
"{self.entity_namespace} {suggested_object_id}"
813 disabled_by: RegistryEntryDisabler |
None =
None
814 if not entity.entity_registry_enabled_default:
815 disabled_by = RegistryEntryDisabler.INTEGRATION
817 hidden_by: RegistryEntryHider |
None =
None
818 if not entity.entity_registry_visible_default:
819 hidden_by = RegistryEntryHider.INTEGRATION
821 entry = entity_registry.async_get_or_create(
825 capabilities=entity.capability_attributes,
827 device_id=device.id
if device
else None,
828 disabled_by=disabled_by,
829 entity_category=entity.entity_category,
830 get_initial_options=entity.get_initial_entity_options,
831 has_entity_name=entity.has_entity_name,
833 known_object_ids=self.entities,
834 original_device_class=entity.device_class,
835 original_icon=entity.icon,
836 original_name=entity_name,
837 suggested_object_id=suggested_object_id,
838 supported_features=entity.supported_features,
839 translation_key=entity.translation_key,
840 unit_of_measurement=entity.unit_of_measurement,
843 if device
and device.disabled
and not entry.disabled:
844 entry = entity_registry.async_update_entity(
845 entry.entity_id, disabled_by=RegistryEntryDisabler.DEVICE
848 entity.registry_entry = entry
850 entity.device_entry = device
851 entity.entity_id = entry.entity_id
854 generate_new_entity_id =
False
857 if entity.entity_id
is not None and entity_registry.async_is_registered(
862 generate_new_entity_id =
True
865 if entity.entity_id
is None or generate_new_entity_id:
866 suggested_object_id = (
868 or entity.suggested_object_id
869 or DEVICE_DEFAULT_NAME
873 suggested_object_id = (
874 f
"{self.entity_namespace} {suggested_object_id}"
876 entity.entity_id = entity_registry.async_generate_entity_id(
877 self.
domaindomain, suggested_object_id, self.entities
884 entity.add_to_platform_abort()
891 "Entity id already exists - ignoring: %s", entity.entity_id
893 entity.add_to_platform_abort()
896 if entity.registry_entry
and entity.registry_entry.disabled:
898 "Not adding entity %s because it's disabled",
901 or f
'"{self.platform_name} {entity.unique_id}"',
903 entity.add_to_platform_abort()
906 entity_id = entity.entity_id
907 self.entities[entity_id] = entity
917 self.
hasshass.states.async_reserve(entity.entity_id)
919 def remove_entity_cb() -> None:
920 """Remove entity from entities dict."""
921 del self.entities[entity_id]
925 entity.async_on_remove(remove_entity_cb)
927 await entity.add_to_platform_finish()
930 """Remove all entities and reset data.
932 This method must be run in the event loop.
936 if not self.entities:
943 for entity
in list(self.entities.values()):
945 await entity.async_remove()
947 self.
loggerlogger.exception(
948 "Error while removing entity %s", entity.entity_id
963 """Register the entity platform in DATA_ENTITY_PLATFORM."""
964 self.
hasshass.data.setdefault(DATA_ENTITY_PLATFORM, {}).setdefault(
969 """Destroy an entity platform.
971 Call before discarding the object.
977 """Remove entity id from platform."""
982 entity.should_poll
for entity
in self.entities.values()
987 self, service_call: ServiceCall, expand_group: bool =
True
989 """Extract all known and available entities from a service call.
991 Will return an empty list if entities specified but unknown.
993 This method must be run in the event loop.
995 return await service.async_extract_entities(
996 self.
hasshass, self.entities.values(), service_call, expand_group
1003 schema: VolDictType | VolSchemaType |
None,
1004 func: str | Callable[..., Any],
1005 required_features: Iterable[int] |
None =
None,
1006 supports_response: SupportsResponse = SupportsResponse.NONE,
1008 """Register an entity service.
1010 Services will automatically be shared by all platforms of the same domain.
1015 service.async_register_entity_service(
1022 required_features=required_features,
1024 supports_response=supports_response,
1028 """Update the states of all the polling entities.
1030 To protect from flooding the executor, we will update async entities
1031 in parallel and other entities sequential.
1033 This method must be run in the event loop.
1038 self.
loggerlogger.warning(
1039 "Updating %s %s took longer than the scheduled update interval %s",
1050 for entity
in list(self.entities.values()):
1054 if entity.should_poll
and entity.hass:
1055 await entity.async_update_ha_state(
True)
1060 entity.async_update_ha_state(
True), loop=self.
hasshass.loop
1062 for entity
in self.entities.values()
1063 if entity.should_poll
1065 await asyncio.gather(*tasks)
1068 current_platform: ContextVar[EntityPlatform |
None] = ContextVar(
1069 "current_platform", default=
None
1075 """Get the current platform from context."""
1076 if (platform := current_platform.get())
is None:
1077 raise RuntimeError(
"Cannot get non-set current platform")
1083 hass: HomeAssistant, integration_name: str
1084 ) -> list[EntityPlatform]:
1085 """Find existing platforms."""
1087 DATA_ENTITY_PLATFORM
not in hass.data
1088 or integration_name
not in hass.data[DATA_ENTITY_PLATFORM]
1092 return hass.data[DATA_ENTITY_PLATFORM][integration_name]
bool remove(self, _T matcher)
None async_create_issue(HomeAssistant hass, str entry_id)
bool valid_entity_id(str entity_id)
tuple[str, str] split_entity_id(str entity_id)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
None async_remove(HomeAssistant hass, str intent_type)
Generator[None] async_start_setup(core.HomeAssistant hass, str integration, SetupPhases phase, str|None group=None)