Home Assistant Unofficial Reference 2024.12.1
executor.py
Go to the documentation of this file.
1 """Database executor helpers."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from concurrent.futures.thread import _threads_queues, _worker
7 import threading
8 from typing import Any
9 import weakref
10 
11 from homeassistant.util.executor import InterruptibleThreadPoolExecutor
12 
13 
15  shutdown_hook: Callable[[], None],
16  recorder_and_worker_thread_ids: set[int],
17  *args: Any,
18  **kwargs: Any,
19 ) -> None:
20  """Create a worker that calls a function after its finished."""
21  recorder_and_worker_thread_ids.add(threading.get_ident())
22  _worker(*args, **kwargs)
23  shutdown_hook()
24 
25 
27  """A database instance that will not deadlock on shutdown."""
28 
29  def __init__(
30  self, recorder_and_worker_thread_ids: set[int], *args: Any, **kwargs: Any
31  ) -> None:
32  """Init the executor with a shutdown hook support."""
33  self._shutdown_hook: Callable[[], None] = kwargs.pop("shutdown_hook")
34  self.recorder_and_worker_thread_idsrecorder_and_worker_thread_ids = recorder_and_worker_thread_ids
35  super().__init__(*args, **kwargs)
36 
37  def _adjust_thread_count(self) -> None:
38  """Overridden to add support for shutdown hook.
39 
40  Based on the CPython 3.10 implementation.
41  """
42  # if idle threads are available, don't spin new threads
43  if self._idle_semaphore.acquire( # pylint: disable=consider-using-with
44  timeout=0
45  ):
46  return
47 
48  # When the executor gets lost, the weakref callback will wake up
49  # the worker threads.
50  def weakref_cb( # type: ignore[no-untyped-def]
51  _: Any,
52  q=self._work_queue,
53  ) -> None:
54  q.put(None)
55 
56  num_threads = len(self._threads)
57  if num_threads < self._max_workers:
58  thread_name = f"{self._thread_name_prefix or self}_{num_threads}"
59  executor_thread = threading.Thread(
60  name=thread_name,
61  target=_worker_with_shutdown_hook,
62  args=(
63  self._shutdown_hook,
64  self.recorder_and_worker_thread_idsrecorder_and_worker_thread_ids,
65  weakref.ref(self, weakref_cb),
66  self._work_queue,
67  self._initializer,
68  self._initargs,
69  ),
70  )
71  executor_thread.start()
72  self._threads.add(executor_thread) # type: ignore[attr-defined]
73  _threads_queues[executor_thread] = self._work_queue # type: ignore[index]
None __init__(self, set[int] recorder_and_worker_thread_ids, *Any args, **Any kwargs)
Definition: executor.py:31
bool add(self, _T matcher)
Definition: match.py:185
None _worker_with_shutdown_hook(Callable[[], None] shutdown_hook, set[int] recorder_and_worker_thread_ids, *Any args, **Any kwargs)
Definition: executor.py:19