Home Assistant Unofficial Reference 2024.12.1
requirements.py
Go to the documentation of this file.
1 """Module to handle installing requirements."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Iterable
7 import contextlib
8 import logging
9 import os
10 from typing import Any
11 
12 from packaging.requirements import Requirement
13 
14 from .core import HomeAssistant, callback
15 from .exceptions import HomeAssistantError
16 from .helpers import singleton
17 from .loader import Integration, IntegrationNotFound, async_get_integration
18 from .util import package as pkg_util
19 
# The default is too low when the internet connection is satellite or high latency
PIP_TIMEOUT = 60
# Number of install attempts made per requirement before it is reported as failed.
MAX_INSTALL_FAILURES = 3
# Key under which the singleton RequirementsManager is registered.
DATA_REQUIREMENTS_MANAGER = "requirements_manager"
# File name of the pip constraints file; resolved relative to this module's directory.
CONSTRAINT_FILE = "package_constraints.txt"
# Maps a discovery integration domain to the manifest keys whose presence in an
# integration's manifest means that discovery integration must be processed too.
DISCOVERY_INTEGRATIONS: dict[str, Iterable[str]] = {
    "dhcp": ("dhcp",),
    "mqtt": ("mqtt",),
    "ssdp": ("ssdp",),
    "zeroconf": ("zeroconf", "homekit"),
}
_LOGGER = logging.getLogger(__name__)
32 
33 
class RequirementsNotFound(HomeAssistantError):
    """Raised when a component is not found.

    Carries the domain whose requirements could not be satisfied and the
    list of requirement strings that failed.
    """

    def __init__(self, domain: str, requirements: list[str]) -> None:
        """Initialize a component not found error."""
        super().__init__(f"Requirements for {domain} not found: {requirements}.")
        self.domain = domain
        self.requirements = requirements
42 
43 
async def async_get_integration_with_requirements(
    hass: HomeAssistant, domain: str
) -> Integration:
    """Get an integration with all requirements installed, including the dependencies.

    Delegates to the shared RequirementsManager for this hass instance.

    This can raise IntegrationNotFound if manifest or integration
    is invalid, RequirementsNotFound if there was some type of
    failure to install requirements.
    """
    manager = _async_get_manager(hass)
    return await manager.async_get_integration_with_requirements(domain)
55 
56 
async def async_process_requirements(
    hass: HomeAssistant, name: str, requirements: list[str]
) -> None:
    """Install the requirements for a component or platform.

    This method is a coroutine. It will raise RequirementsNotFound
    if a requirement can't be satisfied.
    """
    await _async_get_manager(hass).async_process_requirements(name, requirements)
66 
67 
async def async_load_installed_versions(
    hass: HomeAssistant, requirements: set[str]
) -> None:
    """Load the installed version of requirements.

    Delegates to the shared RequirementsManager for this hass instance.
    """
    await _async_get_manager(hass).async_load_installed_versions(requirements)
73 
74 
@callback
@singleton.singleton(DATA_REQUIREMENTS_MANAGER)
def _async_get_manager(hass: HomeAssistant) -> RequirementsManager:
    """Get the requirements manager.

    Registered as a singleton under DATA_REQUIREMENTS_MANAGER.
    """
    return RequirementsManager(hass)
80 
81 
@callback
def async_clear_install_history(hass: HomeAssistant) -> None:
    """Forget the install history.

    Empties the manager's set of requirements that previously failed to
    install, so they are no longer rejected up front.
    """
    _async_get_manager(hass).install_failure_history.clear()
86 
87 
def pip_kwargs(config_dir: str | None) -> dict[str, Any]:
    """Return keyword arguments for PIP install.

    The result always contains ``constraints`` (CONSTRAINT_FILE located next
    to this module) and ``timeout`` (PIP_TIMEOUT). A ``target`` of
    ``<config_dir>/deps`` is added only when a config directory is given and
    the process is running in neither a virtual environment nor Docker.
    """
    is_docker = pkg_util.is_docker_env()
    kwargs: dict[str, Any] = {
        "constraints": os.path.join(os.path.dirname(__file__), CONSTRAINT_FILE),
        "timeout": PIP_TIMEOUT,
    }
    if config_dir is not None and not pkg_util.is_virtual_env() and not is_docker:
        kwargs["target"] = os.path.join(config_dir, "deps")
    return kwargs
98 
99 
def _install_with_retry(requirement: str, kwargs: dict[str, Any]) -> bool:
    """Try to install a package up to MAX_INSTALL_FAILURES times.

    Returns True as soon as one attempt succeeds, False if every attempt fails.
    """
    for _ in range(MAX_INSTALL_FAILURES):
        if pkg_util.install_package(requirement, **kwargs):
            return True
    return False
106 
107 
def _install_requirements_if_missing(
    requirements: list[str], kwargs: dict[str, Any]
) -> tuple[set[str], set[str]]:
    """Install requirements if missing.

    Returns a ``(installed, failures)`` pair: requirements that were already
    installed or installed successfully, and requirements that could not be
    installed after retrying.
    """
    installed: set[str] = set()
    failures: set[str] = set()
    for req in requirements:
        # Only attempt an install when the requirement is not already present.
        if pkg_util.is_installed(req) or _install_with_retry(req, kwargs):
            installed.add(req)
            continue
        failures.add(req)
    return installed, failures
120 
121 
class RequirementsManager:
    """Manage requirements."""

    def __init__(self, hass: HomeAssistant) -> None:
        """Init the requirements manager."""
        self.hass = hass
        # Held while requirements are being installed so installs do not overlap.
        self.pip_lock = asyncio.Lock()
        # Per-domain cache: a Future while the lookup is in flight, the
        # resolved Integration once it has completed successfully.
        self.integrations_with_reqs: dict[
            str, Integration | asyncio.Future[Integration]
        ] = {}
        # Requirements whose installation has failed; cleared via .clear().
        self.install_failure_history: set[str] = set()
        # Requirements already known to be installed.
        self.is_installed_cache: set[str] = set()

    async def async_get_integration_with_requirements(
        self, domain: str, done: set[str] | None = None
    ) -> Integration:
        """Get an integration with all requirements installed, including dependencies.

        ``done`` collects the domains already visited during a recursive
        dependency walk; callers starting a new walk leave it as None.

        This can raise IntegrationNotFound if manifest or integration
        is invalid, RequirementsNotFound if there was some type of
        failure to install requirements.
        """
        if done is None:
            done = {domain}
        else:
            done.add(domain)

        cache = self.integrations_with_reqs
        if int_or_fut := cache.get(domain):
            if isinstance(int_or_fut, Integration):
                return int_or_fut
            # Another caller is already resolving this domain; wait for it.
            return await int_or_fut

        future = cache[domain] = self.hass.loop.create_future()
        try:
            integration = await async_get_integration(self.hass, domain)
            if not self.hass.config.skip_pip:
                await self._async_process_integration(integration, done)
        except BaseException as ex:
            # We do not cache failures as we want to retry, or
            # else people can't fix it and then restart, because
            # their config will never be valid.
            del cache[domain]
            future.set_exception(ex)
            with contextlib.suppress(BaseException):
                # Clear the flag as its normal that nothing
                # will wait for this future to be resolved
                # if there are no concurrent requirements fetches.
                await future
            raise

        cache[domain] = integration
        future.set_result(integration)
        return integration

    async def _async_process_integration(
        self, integration: Integration, done: set[str]
    ) -> None:
        """Process an integration and requirements.

        Installs the integration's own requirements, then resolves every
        dependency, after-dependency and implied discovery integration that
        has not been handled yet. If any of those fail, the first recorded
        exception is raised after all of them have been attempted.
        """
        if integration.requirements:
            await self.async_process_requirements(
                integration.domain, integration.requirements
            )

        cache = self.integrations_with_reqs

        deps_to_check = {
            dep
            for dep in integration.dependencies + integration.after_dependencies
            if dep not in done
            # If the dep is in the cache and it's an Integration
            # it's already been checked for the requirements and we should
            # not check it again.
            and (
                not (cached_integration := cache.get(dep))
                or type(cached_integration) is not Integration
            )
        }

        for check_domain, to_check in DISCOVERY_INTEGRATIONS.items():
            if (
                check_domain not in done
                and check_domain not in deps_to_check
                # If the integration is in the cache and it's an Integration
                # it's already been checked for the requirements and we should
                # not check it again.
                and (
                    not (cached_integration := cache.get(check_domain))
                    or type(cached_integration) is not Integration
                )
                and any(check in integration.manifest for check in to_check)
            ):
                deps_to_check.add(check_domain)

        if not deps_to_check:
            return

        exceptions: list[Exception] = []
        # We don't create tasks here since everything waits for the pip lock
        # anyways and we want to make sure we don't start a bunch of tasks
        # that will just wait for the lock.
        for dep in deps_to_check:
            # We want all the async_get_integration_with_requirements calls to
            # happen even if one fails. So we catch the exception and store it
            # to raise the first one after all are done to behave like asyncio
            # gather.
            try:
                await self.async_get_integration_with_requirements(dep, done)
            except IntegrationNotFound as ex:
                # A missing after-dependency of a non-built-in integration
                # is ignored; every other missing integration is recorded.
                if (
                    integration.is_built_in
                    or ex.domain not in integration.after_dependencies
                ):
                    exceptions.append(ex)
            except Exception as ex:  # noqa: BLE001
                # Any other failure takes priority over IntegrationNotFound.
                exceptions.insert(0, ex)

        if exceptions:
            raise exceptions[0]

    async def async_process_requirements(
        self, name: str, requirements: list[str]
    ) -> None:
        """Install the requirements for a component or platform.

        This method is a coroutine. It will raise RequirementsNotFound
        if a requirement can't be satisfied.
        """
        if self.hass.config.skip_pip_packages:
            skipped_requirements = {
                req
                for req in requirements
                if Requirement(req).name in self.hass.config.skip_pip_packages
            }

            for req in skipped_requirements:
                _LOGGER.warning("Skipping requirement %s. This may cause issues", req)

            requirements = [r for r in requirements if r not in skipped_requirements]

        if not (missing := self._find_missing_requirements(requirements)):
            return
        self._raise_for_failed_requirements(name, missing)

        async with self.pip_lock:
            # Recalculate missing again now that we have the lock
            if missing := self._find_missing_requirements(requirements):
                await self._async_process_requirements(name, missing)

    def _find_missing_requirements(self, requirements: list[str]) -> list[str]:
        """Find requirements that are missing in the cache."""
        return [req for req in requirements if req not in self.is_installed_cache]

    def _raise_for_failed_requirements(
        self, integration: str, missing: list[str]
    ) -> None:
        """Raise for failed installing integration requirements.

        Raise RequirementsNotFound so we do not keep trying requirements
        that have already failed.
        """
        for req in missing:
            if req in self.install_failure_history:
                _LOGGER.info(
                    (
                        "Multiple attempts to install %s failed, install will be"
                        " retried after next configuration check or restart"
                    ),
                    req,
                )
                raise RequirementsNotFound(integration, [req])

    async def _async_process_requirements(
        self,
        name: str,
        requirements: list[str],
    ) -> None:
        """Install a requirement and save failures.

        The install runs in the executor. Successes are added to the
        installed cache and failures to the failure history; any failure
        raises RequirementsNotFound.
        """
        kwargs = pip_kwargs(self.hass.config.config_dir)
        installed, failures = await self.hass.async_add_executor_job(
            _install_requirements_if_missing, requirements, kwargs
        )
        self.is_installed_cache |= installed
        self.install_failure_history |= failures
        if failures:
            raise RequirementsNotFound(name, list(failures))

    async def async_load_installed_versions(
        self,
        requirements: set[str],
    ) -> None:
        """Load the installed version of requirements.

        Only requirements not already in the installed cache are looked up;
        the lookup runs in the executor and its result is merged into the cache.
        """
        if not (requirements_to_check := requirements - self.is_installed_cache):
            return

        self.is_installed_cache |= await self.hass.async_add_executor_job(
            pkg_util.get_installed_versions, requirements_to_check
        )
None _raise_for_failed_requirements(self, str integration, list[str] missing)
list[str] _find_missing_requirements(self, list[str] requirements)
None _async_process_integration(self, Integration integration, set[str] done)
Integration async_get_integration_with_requirements(self, str domain, set[str]|None done=None)
None __init__(self, HomeAssistant hass)
None async_load_installed_versions(self, set[str] requirements)
None _async_process_requirements(self, str name, list[str] requirements)
None async_process_requirements(self, str name, list[str] requirements)
None __init__(self, str domain, list[str] requirements)
Definition: requirements.py:37
Integration async_get_integration(HomeAssistant hass, str domain)
Definition: loader.py:1354
None async_clear_install_history(HomeAssistant hass)
Definition: requirements.py:83
None async_load_installed_versions(HomeAssistant hass, set[str] requirements)
Definition: requirements.py:70
RequirementsManager _async_get_manager(HomeAssistant hass)
Definition: requirements.py:77
None async_process_requirements(HomeAssistant hass, str name, list[str] requirements)
Definition: requirements.py:59
tuple[set[str], set[str]] _install_requirements_if_missing(list[str] requirements, dict[str, Any] kwargs)
dict[str, Any] pip_kwargs(str|None config_dir)
Definition: requirements.py:88
Integration async_get_integration_with_requirements(HomeAssistant hass, str domain)
Definition: requirements.py:46
bool _install_with_retry(str requirement, dict[str, Any] kwargs)