Home Assistant Unofficial Reference 2024.12.1
integration_platform.py
Go to the documentation of this file.
1 """Helpers to help with integration platforms."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Awaitable, Callable
7 from dataclasses import dataclass
8 from functools import partial
9 import logging
10 from types import ModuleType
11 from typing import Any
12 
13 from homeassistant.const import EVENT_COMPONENT_LOADED
14 from homeassistant.core import Event, HassJob, HomeAssistant, callback
15 from homeassistant.loader import (
16  Integration,
17  async_get_integrations,
18  async_get_loaded_integration,
19  async_register_preload_platform,
20  bind_hass,
21 )
22 from homeassistant.setup import ATTR_COMPONENT, EventComponentLoaded
23 from homeassistant.util.hass_dict import HassKey
24 from homeassistant.util.logging import catch_log_exception
25 
26 _LOGGER = logging.getLogger(__name__)
27 DATA_INTEGRATION_PLATFORMS: HassKey[list[IntegrationPlatform]] = HassKey(
28  "integration_platforms"
29 )
30 
31 
32 @dataclass(slots=True, frozen=True)
34  """An integration platform."""
35 
36  platform_name: str
37  process_job: HassJob[[HomeAssistant, str, Any], Awaitable[None] | None]
38  seen_components: set[str]
39 
40 
41 @callback
43  hass: HomeAssistant,
44  integration_platforms: list[IntegrationPlatform],
45  event: Event[EventComponentLoaded],
46 ) -> None:
47  """Process integration platforms for a component."""
48  if "." in (component_name := event.data[ATTR_COMPONENT]):
49  return
50 
51  integration = async_get_loaded_integration(hass, component_name)
52  # First filter out platforms that the integration already processed.
53  integration_platforms_by_name: dict[str, IntegrationPlatform] = {}
54  for integration_platform in integration_platforms:
55  if component_name in integration_platform.seen_components:
56  continue
57  integration_platform.seen_components.add(component_name)
58  integration_platforms_by_name[integration_platform.platform_name] = (
59  integration_platform
60  )
61 
62  if not integration_platforms_by_name:
63  return
64 
65  # Next, check which platforms exist for this integration.
66  platforms_that_exist = integration.platforms_exists(integration_platforms_by_name)
67  if not platforms_that_exist:
68  return
69 
70  # If everything is already loaded, we can avoid creating a task.
71  can_use_cache = True
72  platforms: dict[str, ModuleType] = {}
73  for platform_name in platforms_that_exist:
74  if platform := integration.get_platform_cached(platform_name):
75  platforms[platform_name] = platform
76  else:
77  can_use_cache = False
78  break
79 
80  if can_use_cache:
82  hass,
83  integration,
84  platforms,
85  integration_platforms_by_name,
86  )
87  return
88 
89  # At least one of the platforms is not loaded, we need to load them
90  # so we have to fall back to creating a task.
91  hass.async_create_task_internal(
93  hass, integration, platforms_that_exist, integration_platforms_by_name
94  ),
95  eager_start=True,
96  )
97 
98 
100  hass: HomeAssistant,
101  integration: Integration,
102  platforms_that_exist: list[str],
103  integration_platforms_by_name: dict[str, IntegrationPlatform],
104 ) -> None:
105  """Process integration platforms for a component."""
106  # Now we know which platforms to load, let's load them.
107  try:
108  platforms = await integration.async_get_platforms(platforms_that_exist)
109  except ImportError:
110  _LOGGER.debug(
111  "Unexpected error importing integration platforms for %s",
112  integration.domain,
113  )
114  return
115 
116  if futures := _process_integration_platforms(
117  hass,
118  integration,
119  platforms,
120  integration_platforms_by_name,
121  ):
122  await asyncio.gather(*futures)
123 
124 
125 @callback
127  hass: HomeAssistant,
128  integration: Integration,
129  platforms: dict[str, ModuleType],
130  integration_platforms_by_name: dict[str, IntegrationPlatform],
131 ) -> list[asyncio.Future[Awaitable[None] | None]]:
132  """Process integration platforms for a component.
133 
134  Only the platforms that are passed in will be processed.
135  """
136  return [
137  future
138  for platform_name, platform in platforms.items()
139  if (integration_platform := integration_platforms_by_name[platform_name])
140  and (
141  future := hass.async_run_hass_job(
142  integration_platform.process_job,
143  hass,
144  integration.domain,
145  platform,
146  )
147  )
148  ]
149 
150 
151 def _format_err(name: str, platform_name: str, *args: Any) -> str:
152  """Format error message."""
153  return f"Exception in {name} when processing platform '{platform_name}': {args}"
154 
155 
156 @bind_hass
158  hass: HomeAssistant,
159  platform_name: str,
160  # Any = platform.
161  process_platform: Callable[[HomeAssistant, str, Any], Awaitable[None] | None],
162  wait_for_platforms: bool = False,
163 ) -> None:
164  """Process a specific platform for all current and future loaded integrations."""
165  if DATA_INTEGRATION_PLATFORMS not in hass.data:
166  integration_platforms = hass.data[DATA_INTEGRATION_PLATFORMS] = []
167  hass.bus.async_listen(
168  EVENT_COMPONENT_LOADED,
169  partial(
170  _async_integration_platform_component_loaded,
171  hass,
172  integration_platforms,
173  ),
174  )
175  else:
176  integration_platforms = hass.data[DATA_INTEGRATION_PLATFORMS]
177 
178  async_register_preload_platform(hass, platform_name)
179  top_level_components = hass.config.top_level_components.copy()
180  process_job = HassJob(
181  catch_log_exception(
182  process_platform,
183  partial(_format_err, str(process_platform), platform_name),
184  ),
185  f"process_platform {platform_name}",
186  )
187  integration_platform = IntegrationPlatform(
188  platform_name, process_job, top_level_components
189  )
190  # Tell the loader that it should try to pre-load the integration
191  # for any future components that are loaded so we can reduce the
192  # amount of import executor usage.
193  async_register_preload_platform(hass, platform_name)
194  integration_platforms.append(integration_platform)
195  if not top_level_components:
196  return
197 
198  # We create a task here for two reasons:
199  #
200  # 1. We want the integration that provides the integration platform to
201  # not be delayed by waiting on each individual platform to be processed
202  # since the import or the integration platforms themselves may have to
203  # schedule I/O or executor jobs.
204  #
205  # 2. We want the behavior to be the same as if the integration that has
206  # the integration platform is loaded after the platform is processed.
207  #
208  # We use hass.async_create_task instead of asyncio.create_task because
209  # we want to make sure that startup waits for the task to complete.
210  #
211  future = hass.async_create_task_internal(
213  hass, platform_name, top_level_components.copy(), process_job
214  ),
215  eager_start=True,
216  )
217  if wait_for_platforms:
218  await future
219 
220 
222  hass: HomeAssistant,
223  platform_name: str,
224  top_level_components: set[str],
225  process_job: HassJob,
226 ) -> None:
227  """Process integration platforms for a component."""
228  integrations = await async_get_integrations(hass, top_level_components)
229  loaded_integrations: list[Integration] = [
230  integration
231  for integration in integrations.values()
232  if not isinstance(integration, Exception)
233  ]
234  # Finally, fetch the platforms for each integration and process them.
235  # This uses the import executor in a loop. If there are a lot
236  # of integration with the integration platform to process,
237  # this could be a bottleneck.
238  futures: list[asyncio.Future[None]] = []
239  for integration in loaded_integrations:
240  if not integration.platforms_exists((platform_name,)):
241  continue
242  try:
243  platform = await integration.async_get_platform(platform_name)
244  except ImportError:
245  _LOGGER.debug(
246  "Unexpected error importing %s for %s",
247  platform_name,
248  integration.domain,
249  )
250  continue
251 
252  if future := hass.async_run_hass_job(
253  process_job, hass, integration.domain, platform
254  ):
255  futures.append(future)
256 
257  if futures:
258  await asyncio.gather(*futures)
None _async_integration_platform_component_loaded(HomeAssistant hass, list[IntegrationPlatform] integration_platforms, Event[EventComponentLoaded] event)
None _async_process_integration_platforms(HomeAssistant hass, str platform_name, set[str] top_level_components, HassJob process_job)
None async_process_integration_platforms(HomeAssistant hass, str platform_name, Callable[[HomeAssistant, str, Any], Awaitable[None]|None] process_platform, bool wait_for_platforms=False)
list[asyncio.Future[Awaitable[None]|None]] _process_integration_platforms(HomeAssistant hass, Integration integration, dict[str, ModuleType] platforms, dict[str, IntegrationPlatform] integration_platforms_by_name)
str _format_err(str name, str platform_name, *Any args)
None _async_process_integration_platforms_for_component(HomeAssistant hass, Integration integration, list[str] platforms_that_exist, dict[str, IntegrationPlatform] integration_platforms_by_name)
Integration async_get_loaded_integration(HomeAssistant hass, str domain)
Definition: loader.py:1341
dict[str, Integration|Exception] async_get_integrations(HomeAssistant hass, Iterable[str] domains)
Definition: loader.py:1368
None async_register_preload_platform(HomeAssistant hass, str platform_name)
Definition: loader.py:640