Home Assistant Unofficial Reference 2024.12.1
trace.py
Go to the documentation of this file.
1 """Helpers for script and condition tracing."""
2 
3 from __future__ import annotations
4 
5 from collections import deque
6 from collections.abc import Callable, Coroutine, Generator
7 from contextlib import contextmanager
8 from contextvars import ContextVar
9 from functools import wraps
10 from typing import Any
11 
12 from homeassistant.core import ServiceResponse
13 import homeassistant.util.dt as dt_util
14 
15 from .typing import TemplateVarsType
16 
17 
19  """Container for trace data."""
20 
21  __slots__ = (
22  "_child_key",
23  "_child_run_id",
24  "_error",
25  "_last_variables",
26  "path",
27  "_result",
28  "reuse_by_child",
29  "_timestamp",
30  "_variables",
31  )
32 
33  def __init__(self, variables: TemplateVarsType, path: str) -> None:
34  """Container for trace data."""
35  self._child_key_child_key: str | None = None
36  self._child_run_id_child_run_id: str | None = None
37  self._error_error: BaseException | None = None
38  self.path: str = path
39  self._result_result: dict[str, Any] | None = None
40  self.reuse_by_childreuse_by_child = False
41  self._timestamp_timestamp = dt_util.utcnow()
42 
43  self._last_variables_last_variables = variables_cv.get() or {}
44  self.update_variablesupdate_variables(variables)
45 
46  def __repr__(self) -> str:
47  """Container for trace data."""
48  return str(self.as_dictas_dict())
49 
50  def set_child_id(self, child_key: str, child_run_id: str) -> None:
51  """Set trace id of a nested script run."""
52  self._child_key_child_key = child_key
53  self._child_run_id_child_run_id = child_run_id
54 
55  def set_error(self, ex: BaseException | None) -> None:
56  """Set error."""
57  self._error_error = ex
58 
59  def set_result(self, **kwargs: Any) -> None:
60  """Set result."""
61  self._result_result = {**kwargs}
62 
63  def update_result(self, **kwargs: Any) -> None:
64  """Set result."""
65  old_result = self._result_result or {}
66  self._result_result = {**old_result, **kwargs}
67 
68  def update_variables(self, variables: TemplateVarsType) -> None:
69  """Update variables."""
70  if variables is None:
71  variables = {}
72  last_variables = self._last_variables_last_variables
73  variables_cv.set(dict(variables))
74  changed_variables = {
75  key: value
76  for key, value in variables.items()
77  if key not in last_variables or last_variables[key] != value
78  }
79  self._variables_variables = changed_variables
80 
81  def as_dict(self) -> dict[str, Any]:
82  """Return dictionary version of this TraceElement."""
83  result: dict[str, Any] = {"path": self.path, "timestamp": self._timestamp_timestamp}
84  if self._child_key_child_key is not None:
85  domain, _, item_id = self._child_key_child_key.partition(".")
86  result["child_id"] = {
87  "domain": domain,
88  "item_id": item_id,
89  "run_id": str(self._child_run_id_child_run_id),
90  }
91  if self._variables_variables:
92  result["changed_variables"] = self._variables_variables
93  if self._error_error is not None:
94  result["error"] = str(self._error_error) or self._error_error.__class__.__name__
95  if self._result_result is not None:
96  result["result"] = self._result_result
97  return result
98 
99 
100 # Context variables for tracing
101 # Current trace
102 trace_cv: ContextVar[dict[str, deque[TraceElement]] | None] = ContextVar(
103  "trace_cv", default=None
104 )
105 # Stack of TraceElements
106 trace_stack_cv: ContextVar[list[TraceElement] | None] = ContextVar(
107  "trace_stack_cv", default=None
108 )
109 # Current location in config tree
110 trace_path_stack_cv: ContextVar[list[str] | None] = ContextVar(
111  "trace_path_stack_cv", default=None
112 )
113 # Copy of last variables
114 variables_cv: ContextVar[Any | None] = ContextVar("variables_cv", default=None)
115 # (domain.item_id, Run ID)
116 trace_id_cv: ContextVar[tuple[str, str] | None] = ContextVar(
117  "trace_id_cv", default=None
118 )
119 # Reason for stopped script execution
120 script_execution_cv: ContextVar[StopReason | None] = ContextVar(
121  "script_execution_cv", default=None
122 )
123 
124 
125 def trace_id_set(trace_id: tuple[str, str]) -> None:
126  """Set id of the current trace."""
127  trace_id_cv.set(trace_id)
128 
129 
130 def trace_id_get() -> tuple[str, str] | None:
131  """Get id if the current trace."""
132  return trace_id_cv.get()
133 
134 
135 def trace_stack_push[_T](
136  trace_stack_var: ContextVar[list[_T] | None], node: _T
137 ) -> None:
138  """Push an element to the top of a trace stack."""
139  trace_stack: list[_T] | None
140  if (trace_stack := trace_stack_var.get()) is None:
141  trace_stack = []
142  trace_stack_var.set(trace_stack)
143  trace_stack.append(node)
144 
145 
146 def trace_stack_pop(trace_stack_var: ContextVar[list[Any] | None]) -> None:
147  """Remove the top element from a trace stack."""
148  trace_stack = trace_stack_var.get()
149  if trace_stack is not None:
150  trace_stack.pop()
151 
152 
153 def trace_stack_top[_T](trace_stack_var: ContextVar[list[_T] | None]) -> _T | None:
154  """Return the element at the top of a trace stack."""
155  trace_stack = trace_stack_var.get()
156  return trace_stack[-1] if trace_stack else None
157 
158 
159 def trace_path_push(suffix: str | list[str]) -> int:
160  """Go deeper in the config tree."""
161  if isinstance(suffix, str):
162  suffix = [suffix]
163  for node in suffix:
164  trace_stack_push(trace_path_stack_cv, node)
165  return len(suffix)
166 
167 
168 def trace_path_pop(count: int) -> None:
169  """Go n levels up in the config tree."""
170  for _ in range(count):
171  trace_stack_pop(trace_path_stack_cv)
172 
173 
174 def trace_path_get() -> str:
175  """Return a string representing the current location in the config tree."""
176  if not (path := trace_path_stack_cv.get()):
177  return ""
178  return "/".join(path)
179 
180 
182  trace_element: TraceElement,
183  maxlen: int | None = None,
184 ) -> None:
185  """Append a TraceElement to trace[path]."""
186  if (trace := trace_cv.get()) is None:
187  trace = {}
188  trace_cv.set(trace)
189  if (path := trace_element.path) not in trace:
190  trace[path] = deque(maxlen=maxlen)
191  trace[path].append(trace_element)
192 
193 
194 def trace_get(clear: bool = True) -> dict[str, deque[TraceElement]] | None:
195  """Return the current trace."""
196  if clear:
197  trace_clear()
198  return trace_cv.get()
199 
200 
201 def trace_clear() -> None:
202  """Clear the trace."""
203  trace_cv.set({})
204  trace_stack_cv.set(None)
205  trace_path_stack_cv.set(None)
206  variables_cv.set(None)
207  script_execution_cv.set(StopReason())
208 
209 
210 def trace_set_child_id(child_key: str, child_run_id: str) -> None:
211  """Set child trace_id of TraceElement at the top of the stack."""
212  if node := trace_stack_top(trace_stack_cv):
213  node.set_child_id(child_key, child_run_id)
214 
215 
216 def trace_set_result(**kwargs: Any) -> None:
217  """Set the result of TraceElement at the top of the stack."""
218  if node := trace_stack_top(trace_stack_cv):
219  node.set_result(**kwargs)
220 
221 
222 def trace_update_result(**kwargs: Any) -> None:
223  """Update the result of TraceElement at the top of the stack."""
224  if node := trace_stack_top(trace_stack_cv):
225  node.update_result(**kwargs)
226 
227 
229  """Mutable container class for script_execution."""
230 
231  script_execution: str | None = None
232  response: ServiceResponse = None
233 
234 
235 def script_execution_set(reason: str, response: ServiceResponse = None) -> None:
236  """Set stop reason."""
237  if (data := script_execution_cv.get()) is None:
238  return
239  data.script_execution = reason
240  data.response = response
241 
242 
243 def script_execution_get() -> str | None:
244  """Return the stop reason."""
245  if (data := script_execution_cv.get()) is None:
246  return None
247  return data.script_execution
248 
249 
250 @contextmanager
251 def trace_path(suffix: str | list[str]) -> Generator[None]:
252  """Go deeper in the config tree.
253 
254  Can not be used as a decorator on couroutine functions.
255  """
256  count = trace_path_push(suffix)
257  try:
258  yield
259  finally:
260  trace_path_pop(count)
261 
262 
263 def async_trace_path[*_Ts](
264  suffix: str | list[str],
265 ) -> Callable[
266  [Callable[[*_Ts], Coroutine[Any, Any, None]]],
267  Callable[[*_Ts], Coroutine[Any, Any, None]],
268 ]:
269  """Go deeper in the config tree.
270 
271  To be used as a decorator on coroutine functions.
272  """
273 
274  def _trace_path_decorator(
275  func: Callable[[*_Ts], Coroutine[Any, Any, None]],
276  ) -> Callable[[*_Ts], Coroutine[Any, Any, None]]:
277  """Decorate a coroutine function."""
278 
279  @wraps(func)
280  async def async_wrapper(*args: *_Ts) -> None:
281  """Catch and log exception."""
282  with trace_path(suffix):
283  await func(*args)
284 
285  return async_wrapper
286 
287  return _trace_path_decorator
None set_result(self, **Any kwargs)
Definition: trace.py:59
dict[str, Any] as_dict(self)
Definition: trace.py:81
None update_result(self, **Any kwargs)
Definition: trace.py:63
None __init__(self, TemplateVarsType variables, str path)
Definition: trace.py:33
None update_variables(self, TemplateVarsType variables)
Definition: trace.py:68
None set_error(self, BaseException|None ex)
Definition: trace.py:55
None set_child_id(self, str child_key, str child_run_id)
Definition: trace.py:50
None trace_id_set(tuple[str, str] trace_id)
Definition: trace.py:125
None script_execution_set(str reason, ServiceResponse response=None)
Definition: trace.py:235
None trace_stack_pop(ContextVar[list[Any]|None] trace_stack_var)
Definition: trace.py:146
Generator[None] trace_path(str|list[str] suffix)
Definition: trace.py:251
None trace_update_result(**Any kwargs)
Definition: trace.py:222
tuple[str, str]|None trace_id_get()
Definition: trace.py:130
dict[str, deque[TraceElement]]|None trace_get(bool clear=True)
Definition: trace.py:194
None trace_set_result(**Any kwargs)
Definition: trace.py:216
str|None script_execution_get()
Definition: trace.py:243
int trace_path_push(str|list[str] suffix)
Definition: trace.py:159
None trace_set_child_id(str child_key, str child_run_id)
Definition: trace.py:210
None trace_path_pop(int count)
Definition: trace.py:168
None trace_append_element(TraceElement trace_element, int|None maxlen=None)
Definition: trace.py:184