1 """Asyncio utilities."""
3 from __future__
import annotations
14 from collections.abc
import Awaitable, Callable, Coroutine
15 import concurrent.futures
18 from typing
import Any
20 _LOGGER = logging.getLogger(__name__)
22 _SHUTDOWN_RUN_CALLBACK_THREADSAFE =
"_shutdown_run_callback_threadsafe"
25 def create_eager_task[_T](
26 coro: Coroutine[Any, Any, _T],
28 name: str |
None =
None,
29 loop: AbstractEventLoop |
None =
None,
31 """Create a task from a coroutine and schedule it to run immediately."""
34 loop = get_running_loop()
42 frame.report_usage(
"attempted to create an asyncio task from a thread")
45 return Task(coro, loop=loop, name=name, eager_start=
True)
49 """Return True if task is cancelling."""
50 return bool((cancelling_ := getattr(task,
"cancelling",
None))
and cancelling_())
53 def run_callback_threadsafe[_T, *_Ts](
54 loop: AbstractEventLoop, callback: Callable[[*_Ts], _T], *args: *_Ts
55 ) -> concurrent.futures.Future[_T]:
56 """Submit a callback object to a given event loop.
58 Return a concurrent.futures.Future to access the result.
60 if (ident := loop.__dict__.get(
"_thread_id"))
and ident == threading.get_ident():
61 raise RuntimeError(
"Cannot be called from within the event loop")
63 future: concurrent.futures.Future[_T] = concurrent.futures.Future()
65 def run_callback() -> None:
66 """Run callback and store result."""
68 future.set_result(callback(*args))
69 except Exception
as exc:
70 if future.set_running_or_notify_cancel():
71 future.set_exception(exc)
73 _LOGGER.warning(
"Exception on lost future: ", exc_info=
True)
75 loop.call_soon_threadsafe(run_callback)
77 if hasattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE):
96 raise RuntimeError(
"The event loop is in the process of shutting down.")
102 limit: int, *tasks: Any, return_exceptions: bool =
False
104 """Wrap asyncio.gather to limit the number of concurrent tasks.
106 From: https://stackoverflow.com/a/61478547/9127614
108 semaphore = Semaphore(limit)
110 async
def sem_task(task: Awaitable[Any]) -> Any:
111 async
with semaphore:
115 *(create_eager_task(sem_task(task))
for task
in tasks),
116 return_exceptions=return_exceptions,
121 """Call when run_callback_threadsafe should prevent creating new futures.
123 We must finish all callbacks before the executor is shutdown
124 or we can end up in a deadlock state where:
126 `executor.result()` is waiting for its `._condition`
127 and the executor shutdown is trying to `.join()` the
130 This function is considered irreversible and should only ever
131 be called when Home Assistant is going to shutdown and
132 python is going to exit.
134 setattr(loop, _SHUTDOWN_RUN_CALLBACK_THREADSAFE,
True)
138 """Return a list of scheduled TimerHandles."""
139 handles: list[TimerHandle] = loop._scheduled
list[TimerHandle] get_scheduled_timer_handles(AbstractEventLoop loop)
bool cancelling(Future[Any] task)
Any gather_with_limited_concurrency(int limit, *Any tasks, bool return_exceptions=False)
None shutdown_run_callback_threadsafe(AbstractEventLoop loop)