1 """Advanced timeout handling.
3 Set of helper classes to handle timeouts of tasks with advanced options
4 like zones and freezing of timeouts.
7 from __future__
import annotations
11 from types
import TracebackType
12 from typing
import Any, Self
14 from .async_
import run_callback_threadsafe
16 ZONE_GLOBAL =
"global"
20 """States of a task."""
29 """Context manager that freezes the global timeout."""
31 def __init__(self, manager: TimeoutManager) ->
None:
32 """Initialize internal timeout context manager."""
33 self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
34 self._manager: TimeoutManager = manager
42 exc_type: type[BaseException] |
None,
43 exc_val: BaseException |
None,
44 exc_tb: TracebackType |
None,
50 self._loop.call_soon_threadsafe(self.
_enter_enter)
55 exc_type: type[BaseException] |
None,
56 exc_val: BaseException |
None,
57 exc_tb: TracebackType |
None,
59 self._loop.call_soon_threadsafe(self.
_exit_exit)
64 if self._manager.freezes_done:
66 for task
in self._manager.global_tasks:
70 for zone
in self._manager.zones.values():
71 if not zone.freezes_done:
75 self._manager.global_freezes.append(self)
79 self._manager.global_freezes.remove(self)
80 if not self._manager.freezes_done:
84 for task
in self._manager.global_tasks:
88 for zone
in self._manager.zones.values():
89 if not zone.freezes_done:
95 """Context manager that freezes a zone timeout."""
97 def __init__(self, zone: _ZoneTimeoutManager) ->
None:
98 """Initialize internal timeout context manager."""
99 self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
100 self._zone: _ZoneTimeoutManager = zone
108 exc_type: type[BaseException] |
None,
109 exc_val: BaseException |
None,
110 exc_tb: TracebackType |
None,
116 self._loop.call_soon_threadsafe(self.
_enter_enter)
121 exc_type: type[BaseException] |
None,
122 exc_val: BaseException |
None,
123 exc_tb: TracebackType |
None,
125 self._loop.call_soon_threadsafe(self.
_exit_exit)
130 if self._zone.freezes_done:
132 self._zone.enter_freeze(self)
136 self._zone.exit_freeze(self)
137 if not self._zone.freezes_done:
143 """Context manager that tracks a global task."""
147 manager: TimeoutManager,
148 task: asyncio.Task[Any],
152 """Initialize internal timeout context manager."""
153 self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
154 self._manager: TimeoutManager = manager
155 self._task: asyncio.Task[Any] = task
160 self._wait_zone: asyncio.Event = asyncio.Event()
161 self.
_state_state: _State = _State.INIT
162 self._cool_down: float = cool_down
166 self._manager.global_tasks.append(self)
177 exc_type: type[BaseException] |
None,
178 exc_val: BaseException |
None,
179 exc_tb: TracebackType |
None,
182 self._manager.global_tasks.remove(self)
185 if exc_type
is asyncio.CancelledError
and self.
statestate
is _State.TIMEOUT:
189 if self._task.uncancel() > self.
_cancelling_cancelling:
196 self.
_state_state = _State.EXIT
197 self._wait_zone.set()
202 """Return state of the Global task."""
206 """Signal that all zones are done."""
207 self._wait_zone.set()
210 """Start timeout handler."""
220 """Stop zone timer."""
231 """Process timeout."""
232 self.
_state_state = _State.TIMEOUT
236 if not self._manager.zones_done:
242 """Cancel own task."""
243 if self._task.done():
245 self._task.cancel(
"Global task timeout")
248 """Pause timers while it freeze."""
252 """Reset timer after freeze."""
256 """Wait until zones are done."""
257 await self._wait_zone.wait()
258 await asyncio.sleep(self._cool_down)
260 if self.
statestate != _State.TIMEOUT:
266 """Context manager that tracks an active task for a zone."""
270 zone: _ZoneTimeoutManager,
271 task: asyncio.Task[Any],
274 """Initialize internal timeout context manager."""
275 self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
276 self._zone: _ZoneTimeoutManager = zone
277 self._task: asyncio.Task[Any] = task
278 self.
_state_state: _State = _State.INIT
286 """Return state of the Zone task."""
290 self._zone.enter_task(self)
294 if self._zone.freezes_done:
306 exc_type: type[BaseException] |
None,
307 exc_val: BaseException |
None,
308 exc_tb: TracebackType |
None,
310 self._zone.exit_task(self)
314 if exc_type
is asyncio.CancelledError
and self.
statestate
is _State.TIMEOUT:
318 if self._task.uncancel() > self.
_cancelling_cancelling:
325 self.
_state_state = _State.EXIT
329 """Start timeout handler."""
339 """Stop zone timer."""
350 """Process timeout."""
351 self.
_state_state = _State.TIMEOUT
355 if self._task.done():
357 self._task.cancel(
"Zone timeout")
360 """Pause timers while it freeze."""
364 """Reset timer after freeze."""
369 """Manage the timeouts for a zone."""
371 def __init__(self, manager: TimeoutManager, zone: str) ->
None:
372 """Initialize internal timeout context manager."""
373 self._manager: TimeoutManager = manager
374 self._zone: str = zone
375 self._tasks: list[_ZoneTaskContext] = []
376 self._freezes: list[_ZoneFreezeContext] = []
379 """Representation of a zone."""
380 return f
"<{self.name}: {len(self._tasks)} / {len(self._freezes)}>"
384 """Return Zone name."""
389 """Return True if zone is active."""
390 return len(self._tasks) > 0
or len(self._freezes) > 0
394 """Return True if all freeze are done."""
395 return len(self._freezes) == 0
and self._manager.freezes_done
398 """Start into new Task."""
399 self._tasks.append(task)
402 """Exit a running Task."""
407 self._manager.drop_zone(self.
namename)
410 """Start into new freeze."""
411 self._freezes.append(freeze)
414 """Exit a running Freeze."""
415 self._freezes.
remove(freeze)
419 self._manager.drop_zone(self.
namename)
422 """Stop timers while it freeze."""
427 for task
in self._tasks:
431 """Reset timer after freeze."""
436 for task
in self._tasks:
441 """Class to manage timeouts over different zones.
443 Manages both global and zone based timeouts.
447 """Initialize TimeoutManager."""
448 self._loop: asyncio.AbstractEventLoop = asyncio.get_running_loop()
449 self._zones: dict[str, _ZoneTimeoutManager] = {}
450 self._globals: list[_GlobalTaskContext] = []
451 self._freezes: list[_GlobalFreezeContext] = []
455 """Return True if all zones are finished."""
456 return not bool(self._zones)
460 """Return True if all freezes are finished."""
461 return not self._freezes
464 def zones(self) -> dict[str, _ZoneTimeoutManager]:
465 """Return all Zones."""
470 """Return all global Tasks."""
475 """Return all global Freezes."""
479 """Drop a zone out of scope."""
480 self._zones.pop(zone_name,
None)
485 for task
in self._globals:
486 task.zones_done_signal()
489 self, timeout: float, zone_name: str = ZONE_GLOBAL, cool_down: float = 0
490 ) -> _ZoneTaskContext | _GlobalTaskContext:
491 """Timeout based on a zone.
493 For using as Async Context Manager.
495 current_task: asyncio.Task[Any] |
None = asyncio.current_task()
499 if zone_name == ZONE_GLOBAL:
503 if zone_name
in self.
zoneszones:
504 zone: _ZoneTimeoutManager = self.
zoneszones[zone_name]
512 self, zone_name: str = ZONE_GLOBAL
513 ) -> _ZoneFreezeContext | _GlobalFreezeContext:
514 """Freeze all timer until job is done.
516 For using as Async Context Manager.
519 if zone_name == ZONE_GLOBAL:
523 if zone_name
in self.
zoneszones:
524 zone: _ZoneTimeoutManager = self.
zoneszones[zone_name]
531 self, zone_name: str = ZONE_GLOBAL
532 ) -> _ZoneFreezeContext | _GlobalFreezeContext:
533 """Freeze all timer until job is done.
535 For using as Context Manager.
537 return run_callback_threadsafe(
_ZoneFreezeContext|_GlobalFreezeContext async_freeze(self, str zone_name=ZONE_GLOBAL)
dict[str, _ZoneTimeoutManager] zones(self)
list[_GlobalFreezeContext] global_freezes(self)
_ZoneFreezeContext|_GlobalFreezeContext freeze(self, str zone_name=ZONE_GLOBAL)
list[_GlobalTaskContext] global_tasks(self)
None drop_zone(self, str zone_name)
_ZoneTaskContext|_GlobalTaskContext async_timeout(self, float timeout, str zone_name=ZONE_GLOBAL, float cool_down=0)
None __init__(self, TimeoutManager manager)
bool|None __exit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
None zones_done_signal(self)
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
None __init__(self, TimeoutManager manager, asyncio.Task[Any] task, float timeout, float cool_down)
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
None __init__(self, _ZoneTimeoutManager zone)
bool|None __exit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
None __init__(self, _ZoneTimeoutManager zone, asyncio.Task[Any] task, float timeout)
bool|None __aexit__(self, type[BaseException]|None exc_type, BaseException|None exc_val, TracebackType|None exc_tb)
None __init__(self, TimeoutManager manager, str zone)
None enter_task(self, _ZoneTaskContext task)
None enter_freeze(self, _ZoneFreezeContext freeze)
None exit_task(self, _ZoneTaskContext task)
None exit_freeze(self, _ZoneFreezeContext freeze)
bool remove(self, _T matcher)
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
bool cancelling(Future[Any] task)