Home Assistant Unofficial Reference 2024.12.1
timeout.py
Go to the documentation of this file.
1 """Advanced timeout handling.
2 
3 Set of helper classes to handle timeouts of tasks with advanced options
4 like zones and freezing of timeouts.
5 """
6 
7 from __future__ import annotations
8 
9 import asyncio
10 import enum
11 from types import TracebackType
12 from typing import Any, Self
13 
14 from .async_ import run_callback_threadsafe
15 
16 ZONE_GLOBAL = "global"
17 
18 
class _State(enum.Enum):
    """States of a task."""

    # Assigned when a task context object is constructed.
    INIT = "INIT"
    # Assigned when the task context is entered.
    ACTIVE = "ACTIVE"
    # Assigned by the timeout callback when the timer expires.
    TIMEOUT = "TIMEOUT"
    # Assigned when the task context is left without a timeout.
    EXIT = "EXIT"
26 
27 
29  """Context manager that freezes the global timeout."""
30 
31  def __init__(self, manager: TimeoutManager) -> None:
32  """Initialize internal timeout context manager."""
33  self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
34  self._manager: TimeoutManager = manager
35 
36  async def __aenter__(self) -> Self:
37  self._enter_enter()
38  return self
39 
40  async def __aexit__(
41  self,
42  exc_type: type[BaseException] | None,
43  exc_val: BaseException | None,
44  exc_tb: TracebackType | None,
45  ) -> bool | None:
46  self._exit_exit()
47  return None
48 
49  def __enter__(self) -> Self:
50  self._loop.call_soon_threadsafe(self._enter_enter)
51  return self
52 
53  def __exit__(
54  self,
55  exc_type: type[BaseException] | None,
56  exc_val: BaseException | None,
57  exc_tb: TracebackType | None,
58  ) -> bool | None:
59  self._loop.call_soon_threadsafe(self._exit_exit)
60  return None
61 
62  def _enter(self) -> None:
63  """Run freeze."""
64  if self._manager.freezes_done:
65  # Global reset
66  for task in self._manager.global_tasks:
67  task.pause()
68 
69  # Zones reset
70  for zone in self._manager.zones.values():
71  if not zone.freezes_done:
72  continue
73  zone.pause()
74 
75  self._manager.global_freezes.append(self)
76 
77  def _exit(self) -> None:
78  """Finish freeze."""
79  self._manager.global_freezes.remove(self)
80  if not self._manager.freezes_done:
81  return
82 
83  # Global reset
84  for task in self._manager.global_tasks:
85  task.reset()
86 
87  # Zones reset
88  for zone in self._manager.zones.values():
89  if not zone.freezes_done:
90  continue
91  zone.reset()
92 
93 
95  """Context manager that freezes a zone timeout."""
96 
97  def __init__(self, zone: _ZoneTimeoutManager) -> None:
98  """Initialize internal timeout context manager."""
99  self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
100  self._zone: _ZoneTimeoutManager = zone
101 
102  async def __aenter__(self) -> Self:
103  self._enter_enter()
104  return self
105 
106  async def __aexit__(
107  self,
108  exc_type: type[BaseException] | None,
109  exc_val: BaseException | None,
110  exc_tb: TracebackType | None,
111  ) -> bool | None:
112  self._exit_exit()
113  return None
114 
115  def __enter__(self) -> Self:
116  self._loop.call_soon_threadsafe(self._enter_enter)
117  return self
118 
119  def __exit__(
120  self,
121  exc_type: type[BaseException] | None,
122  exc_val: BaseException | None,
123  exc_tb: TracebackType | None,
124  ) -> bool | None:
125  self._loop.call_soon_threadsafe(self._exit_exit)
126  return None
127 
128  def _enter(self) -> None:
129  """Run freeze."""
130  if self._zone.freezes_done:
131  self._zone.pause()
132  self._zone.enter_freeze(self)
133 
134  def _exit(self) -> None:
135  """Finish freeze."""
136  self._zone.exit_freeze(self)
137  if not self._zone.freezes_done:
138  return
139  self._zone.reset()
140 
141 
143  """Context manager that tracks a global task."""
144 
145  def __init__(
146  self,
147  manager: TimeoutManager,
148  task: asyncio.Task[Any],
149  timeout: float,
150  cool_down: float,
151  ) -> None:
152  """Initialize internal timeout context manager."""
153  self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
154  self._manager: TimeoutManager = manager
155  self._task: asyncio.Task[Any] = task
156  self._time_left_time_left: float = timeout
157  self._expiration_time_expiration_time: float | None = None
158  self._timeout_handler_timeout_handler: asyncio.Handle | None = None
159  self._on_wait_task_on_wait_task: asyncio.Task | None = None
160  self._wait_zone: asyncio.Event = asyncio.Event()
161  self._state_state: _State = _State.INIT
162  self._cool_down: float = cool_down
163  self._cancelling_cancelling = 0
164 
165  async def __aenter__(self) -> Self:
166  self._manager.global_tasks.append(self)
167  self._start_timer_start_timer()
168  self._state_state = _State.ACTIVE
169  # Remember if the task was already cancelling
170  # so when we __aexit__ we can decide if we should
171  # raise asyncio.TimeoutError or let the cancellation propagate
172  self._cancelling_cancelling = self._task.cancelling()
173  return self
174 
175  async def __aexit__(
176  self,
177  exc_type: type[BaseException] | None,
178  exc_val: BaseException | None,
179  exc_tb: TracebackType | None,
180  ) -> bool | None:
181  self._stop_timer_stop_timer()
182  self._manager.global_tasks.remove(self)
183 
184  # Timeout on exit
185  if exc_type is asyncio.CancelledError and self.statestate is _State.TIMEOUT:
186  # The timeout was hit, and the task was cancelled
187  # so we need to uncancel the task since the cancellation
188  # should not leak out of the context manager
189  if self._task.uncancel() > self._cancelling_cancelling:
190  # If the task was already cancelling don't raise
191  # asyncio.TimeoutError and instead return None
192  # to allow the cancellation to propagate
193  return None
194  raise TimeoutError
195 
196  self._state_state = _State.EXIT
197  self._wait_zone.set()
198  return None
199 
200  @property
201  def state(self) -> _State:
202  """Return state of the Global task."""
203  return self._state_state
204 
205  def zones_done_signal(self) -> None:
206  """Signal that all zones are done."""
207  self._wait_zone.set()
208 
209  def _start_timer(self) -> None:
210  """Start timeout handler."""
211  if self._timeout_handler_timeout_handler:
212  return
213 
214  self._expiration_time_expiration_time = self._loop.time() + self._time_left_time_left
215  self._timeout_handler_timeout_handler = self._loop.call_at(
216  self._expiration_time_expiration_time, self._on_timeout_on_timeout
217  )
218 
219  def _stop_timer(self) -> None:
220  """Stop zone timer."""
221  if self._timeout_handler_timeout_handler is None:
222  return
223 
224  self._timeout_handler_timeout_handler.cancel()
225  self._timeout_handler_timeout_handler = None
226  # Calculate new timeout
227  assert self._expiration_time_expiration_time
228  self._time_left_time_left = self._expiration_time_expiration_time - self._loop.time()
229 
230  def _on_timeout(self) -> None:
231  """Process timeout."""
232  self._state_state = _State.TIMEOUT
233  self._timeout_handler_timeout_handler = None
234 
235  # Reset timer if zones are running
236  if not self._manager.zones_done:
237  self._on_wait_task_on_wait_task = asyncio.create_task(self._on_wait_on_wait())
238  else:
239  self._cancel_task_cancel_task()
240 
241  def _cancel_task(self) -> None:
242  """Cancel own task."""
243  if self._task.done():
244  return
245  self._task.cancel("Global task timeout")
246 
247  def pause(self) -> None:
248  """Pause timers while it freeze."""
249  self._stop_timer_stop_timer()
250 
251  def reset(self) -> None:
252  """Reset timer after freeze."""
253  self._start_timer_start_timer()
254 
255  async def _on_wait(self) -> None:
256  """Wait until zones are done."""
257  await self._wait_zone.wait()
258  await asyncio.sleep(self._cool_down) # Allow context switch
259  self._on_wait_task_on_wait_task = None
260  if self.statestate != _State.TIMEOUT:
261  return
262  self._cancel_task_cancel_task()
263 
264 
266  """Context manager that tracks an active task for a zone."""
267 
268  def __init__(
269  self,
270  zone: _ZoneTimeoutManager,
271  task: asyncio.Task[Any],
272  timeout: float,
273  ) -> None:
274  """Initialize internal timeout context manager."""
275  self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
276  self._zone: _ZoneTimeoutManager = zone
277  self._task: asyncio.Task[Any] = task
278  self._state_state: _State = _State.INIT
279  self._time_left_time_left: float = timeout
280  self._expiration_time_expiration_time: float | None = None
281  self._timeout_handler_timeout_handler: asyncio.Handle | None = None
282  self._cancelling_cancelling = 0
283 
284  @property
285  def state(self) -> _State:
286  """Return state of the Zone task."""
287  return self._state_state
288 
289  async def __aenter__(self) -> Self:
290  self._zone.enter_task(self)
291  self._state_state = _State.ACTIVE
292 
293  # Zone is on freeze
294  if self._zone.freezes_done:
295  self._start_timer_start_timer()
296 
297  # Remember if the task was already cancelling
298  # so when we __aexit__ we can decide if we should
299  # raise asyncio.TimeoutError or let the cancellation propagate
300  self._cancelling_cancelling = self._task.cancelling()
301 
302  return self
303 
304  async def __aexit__(
305  self,
306  exc_type: type[BaseException] | None,
307  exc_val: BaseException | None,
308  exc_tb: TracebackType | None,
309  ) -> bool | None:
310  self._zone.exit_task(self)
311  self._stop_timer_stop_timer()
312 
313  # Timeout on exit
314  if exc_type is asyncio.CancelledError and self.statestate is _State.TIMEOUT:
315  # The timeout was hit, and the task was cancelled
316  # so we need to uncancel the task since the cancellation
317  # should not leak out of the context manager
318  if self._task.uncancel() > self._cancelling_cancelling:
319  # If the task was already cancelling don't raise
320  # asyncio.TimeoutError and instead return None
321  # to allow the cancellation to propagate
322  return None
323  raise TimeoutError
324 
325  self._state_state = _State.EXIT
326  return None
327 
328  def _start_timer(self) -> None:
329  """Start timeout handler."""
330  if self._timeout_handler_timeout_handler:
331  return
332 
333  self._expiration_time_expiration_time = self._loop.time() + self._time_left_time_left
334  self._timeout_handler_timeout_handler = self._loop.call_at(
335  self._expiration_time_expiration_time, self._on_timeout_on_timeout
336  )
337 
338  def _stop_timer(self) -> None:
339  """Stop zone timer."""
340  if self._timeout_handler_timeout_handler is None:
341  return
342 
343  self._timeout_handler_timeout_handler.cancel()
344  self._timeout_handler_timeout_handler = None
345  # Calculate new timeout
346  assert self._expiration_time_expiration_time
347  self._time_left_time_left = self._expiration_time_expiration_time - self._loop.time()
348 
349  def _on_timeout(self) -> None:
350  """Process timeout."""
351  self._state_state = _State.TIMEOUT
352  self._timeout_handler_timeout_handler = None
353 
354  # Timeout
355  if self._task.done():
356  return
357  self._task.cancel("Zone timeout")
358 
359  def pause(self) -> None:
360  """Pause timers while it freeze."""
361  self._stop_timer_stop_timer()
362 
363  def reset(self) -> None:
364  """Reset timer after freeze."""
365  self._start_timer_start_timer()
366 
367 
class _ZoneTimeoutManager:
    """Manage the timeouts for a zone."""

    def __init__(self, manager: TimeoutManager, zone: str) -> None:
        """Initialize internal timeout context manager."""
        self._manager: TimeoutManager = manager
        self._zone: str = zone
        self._tasks: list[_ZoneTaskContext] = []
        self._freezes: list[_ZoneFreezeContext] = []

    def __repr__(self) -> str:
        """Representation of a zone: name, task count and freeze count."""
        return f"<{self.name}: {len(self._tasks)} / {len(self._freezes)}>"

    @property
    def name(self) -> str:
        """Return Zone name."""
        return self._zone

    @property
    def active(self) -> bool:
        """Return True if zone is active."""
        return len(self._tasks) > 0 or len(self._freezes) > 0

    @property
    def freezes_done(self) -> bool:
        """Return True if all freeze are done.

        Both the zone's own freezes and the manager's global freezes count.
        """
        return len(self._freezes) == 0 and self._manager.freezes_done

    def enter_task(self, task: _ZoneTaskContext) -> None:
        """Start into new Task."""
        self._tasks.append(task)

    def exit_task(self, task: _ZoneTaskContext) -> None:
        """Exit a running Task."""
        self._tasks.remove(task)

        # On latest listener
        if not self.active:
            self._manager.drop_zone(self.name)

    def enter_freeze(self, freeze: _ZoneFreezeContext) -> None:
        """Start into new freeze."""
        self._freezes.append(freeze)

    def exit_freeze(self, freeze: _ZoneFreezeContext) -> None:
        """Exit a running Freeze."""
        self._freezes.remove(freeze)

        # On latest listener
        if not self.active:
            self._manager.drop_zone(self.name)

    def pause(self) -> None:
        """Stop timers while it freeze."""
        if not self.active:
            return

        # Forward pause
        for task in self._tasks:
            task.pause()

    def reset(self) -> None:
        """Reset timer after freeze."""
        if not self.active:
            return

        # Forward reset
        for task in self._tasks:
            task.reset()
438 
439 
class TimeoutManager:
    """Class to manage timeouts over different zones.

    Manages both global and zone based timeouts.
    """

    def __init__(self) -> None:
        """Initialize TimeoutManager.

        Must be called while an event loop is running; the running loop is
        captured for the thread-safe ``freeze`` entry point.
        """
        self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
        self._zones: dict[str, _ZoneTimeoutManager] = {}
        self._globals: list[_GlobalTaskContext] = []
        self._freezes: list[_GlobalFreezeContext] = []

    @property
    def zones_done(self) -> bool:
        """Return True if all zones are finished."""
        return not bool(self._zones)

    @property
    def freezes_done(self) -> bool:
        """Return True if all freezes are finished."""
        return not self._freezes

    @property
    def zones(self) -> dict[str, _ZoneTimeoutManager]:
        """Return all Zones."""
        return self._zones

    @property
    def global_tasks(self) -> list[_GlobalTaskContext]:
        """Return all global Tasks."""
        return self._globals

    @property
    def global_freezes(self) -> list[_GlobalFreezeContext]:
        """Return all global Freezes."""
        return self._freezes

    def drop_zone(self, zone_name: str) -> None:
        """Drop a zone out of scope."""
        # Unknown zone names are ignored.
        self._zones.pop(zone_name, None)
        if self._zones:
            return

        # Signal Global task, all zones are done
        for task in self._globals:
            task.zones_done_signal()

    def async_timeout(
        self, timeout: float, zone_name: str = ZONE_GLOBAL, cool_down: float = 0
    ) -> _ZoneTaskContext | _GlobalTaskContext:
        """Timeout based on a zone.

        For using as Async Context Manager.
        """
        current_task: asyncio.Task[Any] | None = asyncio.current_task()
        assert current_task

        # Global Zone
        if zone_name == ZONE_GLOBAL:
            return _GlobalTaskContext(self, current_task, timeout, cool_down)

        # Zone Handling
        if zone_name in self.zones:
            zone: _ZoneTimeoutManager = self.zones[zone_name]
        else:
            # First use of this zone name: create and register the zone.
            self.zones[zone_name] = zone = _ZoneTimeoutManager(self, zone_name)

        # Create Task
        return _ZoneTaskContext(zone, current_task, timeout)

    def async_freeze(
        self, zone_name: str = ZONE_GLOBAL
    ) -> _ZoneFreezeContext | _GlobalFreezeContext:
        """Freeze all timer until job is done.

        For using as Async Context Manager.
        """
        # Global Freeze
        if zone_name == ZONE_GLOBAL:
            return _GlobalFreezeContext(self)

        # Zone Freeze
        if zone_name in self.zones:
            zone: _ZoneTimeoutManager = self.zones[zone_name]
        else:
            # First use of this zone name: create and register the zone.
            self.zones[zone_name] = zone = _ZoneTimeoutManager(self, zone_name)

        return _ZoneFreezeContext(zone)

    def freeze(
        self, zone_name: str = ZONE_GLOBAL
    ) -> _ZoneFreezeContext | _GlobalFreezeContext:
        """Freeze all timer until job is done.

        For using as Context Manager.
        """
        # async_freeze is handed to run_callback_threadsafe together with the
        # manager's loop, and this call blocks on the returned .result().
        return run_callback_threadsafe(
            self._loop, self.async_freeze, zone_name
        ).result()
_ZoneFreezeContext|_GlobalFreezeContext async_freeze(self, str zone_name=ZONE_GLOBAL)
Definition: timeout.py:513
dict[str, _ZoneTimeoutManager] zones(self)
Definition: timeout.py:464
list[_GlobalFreezeContext] global_freezes(self)
Definition: timeout.py:474
_ZoneFreezeContext|_GlobalFreezeContext freeze(self, str zone_name=ZONE_GLOBAL)
Definition: timeout.py:532
list[_GlobalTaskContext] global_tasks(self)
Definition: timeout.py:469
None drop_zone(self, str zone_name)
Definition: timeout.py:478
_ZoneTaskContext|_GlobalTaskContext async_timeout(self, float timeout, str zone_name=ZONE_GLOBAL, float cool_down=0)
Definition: timeout.py:490
None __init__(self, TimeoutManager manager)
Definition: timeout.py:31
bool|None __exit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
Definition: timeout.py:58
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
Definition: timeout.py:45
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
Definition: timeout.py:180
None __init__(self, TimeoutManager manager, asyncio.Task[Any] task, float timeout, float cool_down)
Definition: timeout.py:151
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
Definition: timeout.py:111
None __init__(self, _ZoneTimeoutManager zone)
Definition: timeout.py:97
bool|None __exit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
Definition: timeout.py:124
None __init__(self, _ZoneTimeoutManager zone, asyncio.Task[Any] task, float timeout)
Definition: timeout.py:273
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
Definition: timeout.py:309
None __init__(self, TimeoutManager manager, str zone)
Definition: timeout.py:371
None enter_task(self, _ZoneTaskContext task)
Definition: timeout.py:397
None enter_freeze(self, _ZoneFreezeContext freeze)
Definition: timeout.py:409
None exit_task(self, _ZoneTaskContext task)
Definition: timeout.py:401
None exit_freeze(self, _ZoneFreezeContext freeze)
Definition: timeout.py:413
bool remove(self, _T matcher)
Definition: match.py:214
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
Definition: condition.py:802
bool cancelling(Future[Any] task)
Definition: async_.py:48