Home Assistant Unofficial Reference 2024.12.1
async_.py
Go to the documentation of this file.
1 """Asyncio utilities."""
2 
3 from __future__ import annotations
4 
5 from asyncio import (
6  AbstractEventLoop,
7  Future,
8  Semaphore,
9  Task,
10  TimerHandle,
11  gather,
12  get_running_loop,
13 )
14 from collections.abc import Awaitable, Callable, Coroutine
15 import concurrent.futures
16 import logging
17 import threading
18 from typing import Any
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 _SHUTDOWN_RUN_CALLBACK_THREADSAFE = "_shutdown_run_callback_threadsafe"
23 
24 
25 def create_eager_task[_T](
26  coro: Coroutine[Any, Any, _T],
27  *,
28  name: str | None = None,
29  loop: AbstractEventLoop | None = None,
30 ) -> Task[_T]:
31  """Create a task from a coroutine and schedule it to run immediately."""
32  if not loop:
33  try:
34  loop = get_running_loop()
35  except RuntimeError:
36  # If there is no running loop, create_eager_task is being called from
37  # the wrong thread.
38  # Late import to avoid circular dependencies
39  # pylint: disable-next=import-outside-toplevel
40  from homeassistant.helpers import frame
41 
42  frame.report_usage("attempted to create an asyncio task from a thread")
43  raise
44 
45  return Task(coro, loop=loop, name=name, eager_start=True)
46 
47 
48 def cancelling(task: Future[Any]) -> bool:
49  """Return True if task is cancelling."""
50  return bool((cancelling_ := getattr(task, "cancelling", None)) and cancelling_())
51 
52 
53 def run_callback_threadsafe[_T, *_Ts](
54  loop: AbstractEventLoop, callback: Callable[[*_Ts], _T], *args: *_Ts
55 ) -> concurrent.futures.Future[_T]:
56  """Submit a callback object to a given event loop.
57 
58  Return a concurrent.futures.Future to access the result.
59  """
60  if (ident := loop.__dict__.get("_thread_id")) and ident == threading.get_ident():
61  raise RuntimeError("Cannot be called from within the event loop")
62 
63  future: concurrent.futures.Future[_T] = concurrent.futures.Future()
64 
65  def run_callback() -> None:
66  """Run callback and store result."""
67  try:
68  future.set_result(callback(*args))
69  except Exception as exc: # noqa: BLE001
70  if future.set_running_or_notify_cancel():
71  future.set_exception(exc)
72  else:
73  _LOGGER.warning("Exception on lost future: ", exc_info=True)
74 
75  loop.call_soon_threadsafe(run_callback)
76 
77  if hasattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE):
78  #
79  # If the final `HomeAssistant.async_block_till_done` in
80  # `HomeAssistant.async_stop` has already been called, the callback
81  # will never run and, `future.result()` will block forever which
82  # will prevent the thread running this code from shutting down which
83  # will result in a deadlock when the main thread attempts to shutdown
84  # the executor and `.join()` the thread running this code.
85  #
86  # To prevent this deadlock we do the following on shutdown:
87  #
88  # 1. Set the _SHUTDOWN_RUN_CALLBACK_THREADSAFE attr on this function
89  # by calling `shutdown_run_callback_threadsafe`
90  # 2. Call `hass.async_block_till_done` at least once after shutdown
91  # to ensure all callbacks have run
92  # 3. Raise an exception here to ensure `future.result()` can never be
93  # called and hit the deadlock since once `shutdown_run_callback_threadsafe`
94  # we cannot promise the callback will be executed.
95  #
96  raise RuntimeError("The event loop is in the process of shutting down.")
97 
98  return future
99 
100 
102  limit: int, *tasks: Any, return_exceptions: bool = False
103 ) -> Any:
104  """Wrap asyncio.gather to limit the number of concurrent tasks.
105 
106  From: https://stackoverflow.com/a/61478547/9127614
107  """
108  semaphore = Semaphore(limit)
109 
110  async def sem_task(task: Awaitable[Any]) -> Any:
111  async with semaphore:
112  return await task
113 
114  return await gather(
115  *(create_eager_task(sem_task(task)) for task in tasks),
116  return_exceptions=return_exceptions,
117  )
118 
119 
120 def shutdown_run_callback_threadsafe(loop: AbstractEventLoop) -> None:
121  """Call when run_callback_threadsafe should prevent creating new futures.
122 
123  We must finish all callbacks before the executor is shutdown
124  or we can end up in a deadlock state where:
125 
126  `executor.result()` is waiting for its `._condition`
127  and the executor shutdown is trying to `.join()` the
128  executor thread.
129 
130  This function is considered irreversible and should only ever
131  be called when Home Assistant is going to shutdown and
132  python is going to exit.
133  """
134  setattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE, True)
135 
136 
137 def get_scheduled_timer_handles(loop: AbstractEventLoop) -> list[TimerHandle]:
138  """Return a list of scheduled TimerHandles."""
139  handles: list[TimerHandle] = loop._scheduled # type: ignore[attr-defined] # noqa: SLF001
140  return handles
list[TimerHandle] get_scheduled_timer_handles(AbstractEventLoop loop)
Definition: async_.py:137
bool cancelling(Future[Any] task)
Definition: async_.py:48
Any gather_with_limited_concurrency(int limit, *Any tasks, bool return_exceptions=False)
Definition: async_.py:103
None shutdown_run_callback_threadsafe(AbstractEventLoop loop)
Definition: async_.py:120