Home Assistant Unofficial Reference 2024.12.1
frame.py
Go to the documentation of this file.
1 """Provide frame helper for finding the current frame context."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable
7 from dataclasses import dataclass
8 import enum
9 import functools
10 import linecache
11 import logging
12 import sys
13 from types import FrameType
14 from typing import Any, cast
15 
16 from propcache import cached_property
17 
18 from homeassistant.core import HomeAssistant, async_get_hass_or_none
19 from homeassistant.exceptions import HomeAssistantError
20 from homeassistant.loader import (
21  Integration,
22  async_get_issue_integration,
23  async_suggest_report_issue,
24 )
25 
# Module-level logger; the report helpers in this module log through it.
_LOGGER = logging.getLogger(__name__)

# Keep track of integrations already reported to prevent flooding.
# Holds the de-duplication keys built by the report helpers in this module:
# "<domain>:<what>" for domain-based reports and "<filename>:<line number>"
# for frame-based reports.
_REPORTED_INTEGRATIONS: set[str] = set()
30 
31 
32 @dataclass(kw_only=True)
34  """Integration frame container."""
35 
36  custom_integration: bool
37  integration: str
38  module: str | None
39  relative_filename: str
40  frame: FrameType
41 
42  @cached_property
43  def line_number(self) -> int:
44  """Return the line number of the frame."""
45  return self.frame.f_lineno
46 
47  @cached_property
48  def filename(self) -> str:
49  """Return the filename of the frame."""
50  return self.frame.f_code.co_filename
51 
52  @cached_property
53  def line(self) -> str:
54  """Return the line of the frame."""
55  return (linecache.getline(self.filenamefilename, self.line_numberline_number) or "?").strip()
56 
57 
def get_integration_logger(fallback_name: str) -> logging.Logger:
    """Return the logger of the integration found on the current call stack.

    The logger name is derived from the integration frame located by
    get_integration_frame(). When no such frame exists (for example because
    Python cannot access the source files and the stack lacks the needed
    information), the logger named ``fallback_name`` is returned instead.
    https://github.com/home-assistant/core/issues/24982
    """
    try:
        caller = get_integration_frame()
    except MissingIntegrationFrame:
        # No integration on the stack: use the name supplied by the caller.
        return logging.getLogger(fallback_name)

    if not caller.custom_integration:
        return logging.getLogger(f"homeassistant.components.{caller.integration}")
    return logging.getLogger(f"custom_components.{caller.integration}")
76 
77 
def get_current_frame(depth: int = 0) -> FrameType:
    """Return the frame of the caller, or of an ancestor ``depth`` levels up.

    ``depth=0`` is the frame that called get_current_frame().
    """
    # This function occupies one stack level itself, so skip past it.
    levels_up = depth + 1
    return sys._getframe(levels_up)  # noqa: SLF001
82 
83 
def get_integration_frame(exclude_integrations: set | None = None) -> IntegrationFrame:
    """Return the frame, integration and integration path of the current stack frame.

    Walks outwards from this function's own frame and returns the first frame
    whose source path contains ``custom_components/<name>/`` or
    ``homeassistant/components/<name>/`` with ``<name>`` not listed in
    ``exclude_integrations``.

    Raises MissingIntegrationFrame when the stack is exhausted without a match.
    """
    skipped = exclude_integrations or set()

    # get_current_frame() must be called right here so the walk starts at
    # this function's own frame.
    candidate: FrameType | None = get_current_frame()
    while candidate is not None:
        source_file = candidate.f_code.co_filename

        for marker in ("custom_components/", "homeassistant/components/"):
            marker_pos = source_file.find(marker)
            if marker_pos == -1:
                continue
            name_start = marker_pos + len(marker)
            name_end = source_file.find("/", name_start)
            if name_end == -1:
                # Marker present but no "<name>/" segment after it.
                continue

            domain = source_file[name_start:name_end]
            if domain in skipped:
                # Excluded integration: stop inspecting this frame.
                break

            # Find the loaded module backed by this source file, if any.
            # Iterate over a copy of sys.modules rather than the live dict.
            owning_module: str | None = next(
                (
                    loaded_name
                    for loaded_name, loaded in dict(sys.modules).items()
                    if hasattr(loaded, "__file__") and loaded.__file__ == source_file
                ),
                None,
            )
            return IntegrationFrame(
                custom_integration=marker == "custom_components/",
                integration=domain,
                module=owning_module,
                relative_filename=source_file[marker_pos:],
                frame=candidate,
            )

        candidate = candidate.f_back

    raise MissingIntegrationFrame
130 
131 
class MissingIntegrationFrame(HomeAssistantError):
    """Raised when no integration is found in the frame."""
134 
135 
def report(
    what: str,
    *,
    exclude_integrations: set[str] | None = None,
    error_if_core: bool = True,
    error_if_integration: bool = False,
    level: int = logging.WARNING,
    log_custom_component_only: bool = False,
) -> None:
    """Report incorrect usage.

    Translates the boolean flags into ReportBehavior values and delegates to
    report_usage().

    If error_if_core is True, raise instead of log if an integration is not found
    when unwinding the stack frame.
    If error_if_integration is True, raise instead of log if an integration is found
    when unwinding the stack frame.
    If log_custom_component_only is True, anything that would only have been
    logged for core or a built-in integration is ignored instead; custom
    integrations are unaffected.
    """

    def _behavior(should_raise: bool, *, silence_logs: bool) -> ReportBehavior:
        """Map one flag pair to the matching ReportBehavior."""
        if should_raise:
            return ReportBehavior.ERROR
        if silence_logs:
            return ReportBehavior.IGNORE
        return ReportBehavior.LOG

    report_usage(
        what,
        core_behavior=_behavior(error_if_core, silence_logs=log_custom_component_only),
        core_integration_behavior=_behavior(
            error_if_integration, silence_logs=log_custom_component_only
        ),
        # Custom integrations are never silenced by log_custom_component_only.
        custom_integration_behavior=_behavior(error_if_integration, silence_logs=False),
        exclude_integrations=exclude_integrations,
        level=level,
    )
172 
173 
class ReportBehavior(enum.Enum):
    """Enum for behavior on code usage.

    Selects what the report helpers do once incorrect usage is detected.
    """

    # Ignore the code usage.
    IGNORE = enum.auto()

    # Log the code usage.
    LOG = enum.auto()

    # Raise an error on code usage.
    ERROR = enum.auto()
183 
184 
def report_usage(
    what: str,
    *,
    breaks_in_ha_version: str | None = None,
    core_behavior: ReportBehavior = ReportBehavior.ERROR,
    core_integration_behavior: ReportBehavior = ReportBehavior.LOG,
    custom_integration_behavior: ReportBehavior = ReportBehavior.LOG,
    exclude_integrations: set[str] | None = None,
    integration_domain: str | None = None,
    level: int = logging.WARNING,
) -> None:
    """Report incorrect code usage.

    :param what: will be wrapped with "Detected that integration 'integration' {what}.
    Please create a bug report at https://..."
    :param breaks_in_ha_version: if set, the report will be adjusted to specify the
    breaking version
    :param core_behavior: what to do when no integration can be identified
    :param core_integration_behavior: what to do when a built-in integration is
    identified
    :param custom_integration_behavior: what to do when a custom integration is
    identified
    :param exclude_integrations: skip specified integration when reviewing the stack.
    If no integration is found, the core behavior will be applied
    :param integration_domain: fallback for identifying the integration if the
    frame is not found
    :param level: logging level used for integration reports
    :raises RuntimeError: when the applicable behavior is ReportBehavior.ERROR
    """
    try:
        integration_frame = get_integration_frame(
            exclude_integrations=exclude_integrations
        )
    except MissingIntegrationFrame as err:
        # No integration frame on the stack: try the domain fallback first.
        if integration := async_get_issue_integration(
            hass := async_get_hass_or_none(), integration_domain
        ):
            _report_integration_domain(
                hass,
                what,
                breaks_in_ha_version,
                integration,
                core_integration_behavior,
                custom_integration_behavior,
                level,
            )
            return
        # Nothing identifies an integration, so apply the core behavior.
        msg = f"Detected code that {what}. Please report this issue"
        if core_behavior is ReportBehavior.ERROR:
            raise RuntimeError(msg) from err
        if core_behavior is ReportBehavior.LOG:
            if breaks_in_ha_version:
                msg = (
                    f"Detected code that {what}. This will stop working in Home "
                    f"Assistant {breaks_in_ha_version}, please report this issue"
                )
            _LOGGER.warning(msg, stack_info=True)
        # Also reached for ReportBehavior.IGNORE: there is no frame to report on.
        return

    integration_behavior = core_integration_behavior
    if integration_frame.custom_integration:
        integration_behavior = custom_integration_behavior

    if integration_behavior is not ReportBehavior.IGNORE:
        _report_integration_frame(
            what,
            breaks_in_ha_version,
            integration_frame,
            level,
            integration_behavior is ReportBehavior.ERROR,
        )
249 
250 
def _report_integration_domain(
    hass: HomeAssistant | None,
    what: str,
    breaks_in_ha_version: str | None,
    integration: Integration,
    core_integration_behavior: ReportBehavior,
    custom_integration_behavior: ReportBehavior,
    level: int,
) -> None:
    """Report incorrect usage in an integration (identified via domain).

    Picks the built-in or custom behavior from ``integration.is_built_in``,
    logs the report at ``level`` and, for ReportBehavior.ERROR, raises
    RuntimeError afterwards. Logged-only reports are emitted once per
    ``domain:what`` pair.

    Async friendly.
    """
    integration_behavior = core_integration_behavior
    if not integration.is_built_in:
        integration_behavior = custom_integration_behavior

    if integration_behavior is ReportBehavior.IGNORE:
        return

    # Keep track of integrations already reported to prevent flooding
    key = f"{integration.domain}:{what}"
    if (
        integration_behavior is not ReportBehavior.ERROR
        and key in _REPORTED_INTEGRATIONS
    ):
        return
    _REPORTED_INTEGRATIONS.add(key)

    report_issue = async_suggest_report_issue(hass, integration=integration)
    integration_type = "" if integration.is_built_in else "custom "
    _LOGGER.log(
        level,
        "Detected that %sintegration '%s' %s. %s %s",
        integration_type,
        integration.domain,
        what,
        f"This will stop working in Home Assistant {breaks_in_ha_version}, please"
        if breaks_in_ha_version
        else "Please",
        report_issue,
    )

    # Errors are logged first and then raised, so they are never de-duplicated.
    if integration_behavior is ReportBehavior.ERROR:
        raise RuntimeError(
            f"Detected that {integration_type}integration "
            f"'{integration.domain}' {what}. Please {report_issue}"
        )
299 
300 
def _report_integration_frame(
    what: str,
    breaks_in_ha_version: str | None,
    integration_frame: IntegrationFrame,
    level: int = logging.WARNING,
    error: bool = False,
) -> None:
    """Report incorrect usage in an integration (identified via frame).

    Logs the report at ``level`` including file, line number and source line
    of the offending frame. When ``error`` is true a RuntimeError carrying the
    same details is raised after logging. Logged-only reports are emitted once
    per ``filename:line number`` pair.

    Async friendly.
    """
    # Keep track of integrations already reported to prevent flooding
    key = f"{integration_frame.filename}:{integration_frame.line_number}"
    if not error and key in _REPORTED_INTEGRATIONS:
        return
    _REPORTED_INTEGRATIONS.add(key)

    report_issue = async_suggest_report_issue(
        async_get_hass_or_none(),
        integration_domain=integration_frame.integration,
        module=integration_frame.module,
    )
    integration_type = "custom " if integration_frame.custom_integration else ""
    _LOGGER.log(
        level,
        "Detected that %sintegration '%s' %s at %s, line %s: %s. %s %s",
        integration_type,
        integration_frame.integration,
        what,
        integration_frame.relative_filename,
        integration_frame.line_number,
        integration_frame.line,
        f"This will stop working in Home Assistant {breaks_in_ha_version}, please"
        if breaks_in_ha_version
        else "Please",
        report_issue,
    )
    if not error:
        return
    raise RuntimeError(
        f"Detected that {integration_type}integration "
        f"'{integration_frame.integration}' {what} at "
        f"{integration_frame.relative_filename}, line "
        f"{integration_frame.line_number}: {integration_frame.line}. "
        f"Please {report_issue}"
    )
347 
348 
349 def warn_use[_CallableT: Callable](func: _CallableT, what: str) -> _CallableT:
350  """Mock a function to warn when it was about to be used."""
351  if asyncio.iscoroutinefunction(func):
352 
353  @functools.wraps(func)
354  async def report_use(*args: Any, **kwargs: Any) -> None:
355  report(what)
356 
357  else:
358 
359  @functools.wraps(func)
360  def report_use(*args: Any, **kwargs: Any) -> None:
361  report(what)
362 
363  return cast(_CallableT, report_use)
364 
365 
def report_non_thread_safe_operation(what: str) -> None:
    """Report a non-thread safe operation.

    Builds a message that links to the asyncio thread-safety developer
    documentation and hands it to report() with both error flags enabled.
    """
    # The documentation anchor is the operation name with the dots removed.
    docs_anchor = what.replace(".", "")
    message = (
        f"calls {what} from a thread other than the event loop, "
        "which may cause Home Assistant to crash or data to corrupt. "
        "For more information, see "
        "https://developers.home-assistant.io/docs/asyncio_thread_safety/"
        f"#{docs_anchor}"
    )
    report(message, error_if_core=True, error_if_integration=True)
HomeAssistant|None async_get_hass_or_none()
Definition: core.py:299
None _report_integration_frame(str what, str|None breaks_in_ha_version, IntegrationFrame integration_frame, int level=logging.WARNING, bool error=False)
Definition: frame.py:307
None report_usage(str what, *, str|None breaks_in_ha_version=None, ReportBehavior core_behavior=ReportBehavior.ERROR, ReportBehavior core_integration_behavior=ReportBehavior.LOG, ReportBehavior custom_integration_behavior=ReportBehavior.LOG, set[str]|None exclude_integrations=None, str|None integration_domain=None, int level=logging.WARNING)
Definition: frame.py:195
FrameType get_current_frame(int depth=0)
Definition: frame.py:78
None report(str what, *, set[str]|None exclude_integrations=None, bool error_if_core=True, bool error_if_integration=False, int level=logging.WARNING, bool log_custom_component_only=False)
Definition: frame.py:144
None report_non_thread_safe_operation(str what)
Definition: frame.py:366
logging.Logger get_integration_logger(str fallback_name)
Definition: frame.py:58
IntegrationFrame get_integration_frame(set|None exclude_integrations=None)
Definition: frame.py:84
None _report_integration_domain(HomeAssistant|None hass, str what, str|None breaks_in_ha_version, Integration integration, ReportBehavior core_integration_behavior, ReportBehavior custom_integration_behavior, int level)
Definition: frame.py:259
Integration|None async_get_issue_integration(HomeAssistant|None hass, str|None integration_domain)
Definition: loader.py:1693
str async_suggest_report_issue(HomeAssistant|None hass, *, Integration|None integration=None, str|None integration_domain=None, str|None module=None)
Definition: loader.py:1752