1 """Helpers for script and condition tracing."""
3 from __future__
import annotations
5 from collections
import deque
6 from collections.abc
import Callable, Coroutine, Generator
7 from contextlib
import contextmanager
8 from contextvars
import ContextVar
9 from functools
import wraps
10 from typing
import Any
15 from .typing
import TemplateVarsType
19 """Container for trace data."""
33 def __init__(self, variables: TemplateVarsType, path: str) ->
None:
34 """Container for trace data."""
37 self.
_error_error: BaseException |
None =
None
39 self.
_result_result: dict[str, Any] |
None =
None
47 """Container for trace data."""
51 """Set trace id of a nested script run."""
55 def set_error(self, ex: BaseException |
None) ->
None:
65 old_result = self.
_result_result
or {}
66 self.
_result_result = {**old_result, **kwargs}
69 """Update variables."""
73 variables_cv.set(
dict(variables))
76 for key, value
in variables.items()
77 if key
not in last_variables
or last_variables[key] != value
82 """Return dictionary version of this TraceElement."""
83 result: dict[str, Any] = {
"path": self.path,
"timestamp": self.
_timestamp_timestamp}
85 domain, _, item_id = self.
_child_key_child_key.partition(
".")
86 result[
"child_id"] = {
92 result[
"changed_variables"] = self.
_variables_variables
93 if self.
_error_error
is not None:
94 result[
"error"] =
str(self.
_error_error)
or self.
_error_error.__class__.__name__
95 if self.
_result_result
is not None:
96 result[
"result"] = self.
_result_result
102 trace_cv: ContextVar[dict[str, deque[TraceElement]] |
None] = ContextVar(
103 "trace_cv", default=
None
106 trace_stack_cv: ContextVar[list[TraceElement] |
None] = ContextVar(
107 "trace_stack_cv", default=
None
110 trace_path_stack_cv: ContextVar[list[str] |
None] = ContextVar(
111 "trace_path_stack_cv", default=
None
114 variables_cv: ContextVar[Any |
None] = ContextVar(
"variables_cv", default=
None)
116 trace_id_cv: ContextVar[tuple[str, str] |
None] = ContextVar(
117 "trace_id_cv", default=
None
120 script_execution_cv: ContextVar[StopReason |
None] = ContextVar(
121 "script_execution_cv", default=
None
126 """Set id of the current trace."""
127 trace_id_cv.set(trace_id)
131 """Get id if the current trace."""
132 return trace_id_cv.get()
135 def trace_stack_push[_T](
136 trace_stack_var: ContextVar[list[_T] |
None], node: _T
138 """Push an element to the top of a trace stack."""
139 trace_stack: list[_T] |
None
140 if (trace_stack := trace_stack_var.get())
is None:
142 trace_stack_var.set(trace_stack)
143 trace_stack.append(node)
147 """Remove the top element from a trace stack."""
148 trace_stack = trace_stack_var.get()
149 if trace_stack
is not None:
153 def trace_stack_top[_T](trace_stack_var: ContextVar[list[_T] |
None]) -> _T |
None:
154 """Return the element at the top of a trace stack."""
155 trace_stack = trace_stack_var.get()
156 return trace_stack[-1]
if trace_stack
else None
160 """Go deeper in the config tree."""
161 if isinstance(suffix, str):
164 trace_stack_push(trace_path_stack_cv, node)
169 """Go n levels up in the config tree."""
170 for _
in range(count):
175 """Return a string representing the current location in the config tree."""
176 if not (path := trace_path_stack_cv.get()):
178 return "/".join(path)
182 trace_element: TraceElement,
183 maxlen: int |
None =
None,
185 """Append a TraceElement to trace[path]."""
186 if (trace := trace_cv.get())
is None:
189 if (path := trace_element.path)
not in trace:
190 trace[path] = deque(maxlen=maxlen)
191 trace[path].append(trace_element)
194 def trace_get(clear: bool =
True) -> dict[str, deque[TraceElement]] |
None:
195 """Return the current trace."""
198 return trace_cv.get()
202 """Clear the trace."""
204 trace_stack_cv.set(
None)
205 trace_path_stack_cv.set(
None)
206 variables_cv.set(
None)
211 """Set child trace_id of TraceElement at the top of the stack."""
212 if node := trace_stack_top(trace_stack_cv):
213 node.set_child_id(child_key, child_run_id)
217 """Set the result of TraceElement at the top of the stack."""
218 if node := trace_stack_top(trace_stack_cv):
219 node.set_result(**kwargs)
223 """Update the result of TraceElement at the top of the stack."""
224 if node := trace_stack_top(trace_stack_cv):
225 node.update_result(**kwargs)
229 """Mutable container class for script_execution."""
231 script_execution: str |
None =
None
232 response: ServiceResponse =
None
236 """Set stop reason."""
237 if (data := script_execution_cv.get())
is None:
239 data.script_execution = reason
240 data.response = response
244 """Return the stop reason."""
245 if (data := script_execution_cv.get())
is None:
247 return data.script_execution
252 """Go deeper in the config tree.
254 Can not be used as a decorator on couroutine functions.
263 def async_trace_path[*_Ts](
264 suffix: str | list[str],
266 [Callable[[*_Ts], Coroutine[Any, Any,
None]]],
267 Callable[[*_Ts], Coroutine[Any, Any,
None]],
269 """Go deeper in the config tree.
271 To be used as a decorator on coroutine functions.
274 def _trace_path_decorator(
275 func: Callable[[*_Ts], Coroutine[Any, Any,
None]],
276 ) -> Callable[[*_Ts], Coroutine[Any, Any,
None]]:
277 """Decorate a coroutine function."""
280 async
def async_wrapper(*args: *_Ts) ->
None:
281 """Catch and log exception."""
287 return _trace_path_decorator
None set_result(self, **Any kwargs)
dict[str, Any] as_dict(self)
None update_result(self, **Any kwargs)
None __init__(self, TemplateVarsType variables, str path)
None update_variables(self, TemplateVarsType variables)
None set_error(self, BaseException|None ex)
None set_child_id(self, str child_key, str child_run_id)
None trace_id_set(tuple[str, str] trace_id)
None script_execution_set(str reason, ServiceResponse response=None)
None trace_stack_pop(ContextVar[list[Any]|None] trace_stack_var)
Generator[None] trace_path(str|list[str] suffix)
None trace_update_result(**Any kwargs)
tuple[str, str]|None trace_id_get()
dict[str, deque[TraceElement]]|None trace_get(bool clear=True)
None trace_set_result(**Any kwargs)
str|None script_execution_get()
int trace_path_push(str|list[str] suffix)
None trace_set_child_id(str child_key, str child_run_id)
None trace_path_pop(int count)
None trace_append_element(TraceElement trace_element, int|None maxlen=None)