Home Assistant Unofficial Reference 2024.12.1
executor.py
Go to the documentation of this file.
1 """Executor util helpers."""
2 
3 from __future__ import annotations
4 
5 from concurrent.futures import ThreadPoolExecutor
6 import contextlib
7 import logging
8 import sys
9 from threading import Thread
10 import time
11 import traceback
12 from typing import Any
13 
14 from .thread import async_raise
15 
16 _LOGGER = logging.getLogger(__name__)
17 
18 MAX_LOG_ATTEMPTS = 2
19 
20 _JOIN_ATTEMPTS = 10
21 
22 EXECUTOR_SHUTDOWN_TIMEOUT = 10
23 
24 
25 def _log_thread_running_at_shutdown(name: str, ident: int) -> None:
26  """Log the stack of a thread that was still running at shutdown."""
27  frames = sys._current_frames() # noqa: SLF001
28  stack = frames.get(ident)
29  formatted_stack = traceback.format_stack(stack)
30  _LOGGER.warning(
31  "Thread[%s] is still running at shutdown: %s",
32  name,
33  "".join(formatted_stack).strip(),
34  )
35 
36 
38  threads: set[Thread], timeout: float, log: bool
39 ) -> set[Thread]:
40  """Attempt to join or interrupt a set of threads."""
41  joined = set()
42  timeout_per_thread = timeout / len(threads)
43 
44  for thread in threads:
45  thread.join(timeout=timeout_per_thread)
46 
47  if not thread.is_alive() or thread.ident is None:
48  joined.add(thread)
49  continue
50 
51  if log:
52  _log_thread_running_at_shutdown(thread.name, thread.ident)
53 
54  with contextlib.suppress(SystemError):
55  # SystemError at this stage is usually a race condition
56  # where the thread happens to die right before we force
57  # it to raise the exception
58  async_raise(thread.ident, SystemExit)
59 
60  return joined
61 
62 
63 class InterruptibleThreadPoolExecutor(ThreadPoolExecutor):
64  """A ThreadPoolExecutor instance that will not deadlock on shutdown."""
65 
66  def shutdown(
67  self, *args: Any, join_threads_or_timeout: bool = True, **kwargs: Any
68  ) -> None:
69  """Shutdown with interrupt support added.
70 
71  By default shutdown will wait for threads to finish up
72  to the timeout before forcefully stopping them. This can
73  be disabled by setting `join_threads_or_timeout` to False.
74  """
75  super().shutdown(wait=False, cancel_futures=True)
76  if join_threads_or_timeout:
77  self.join_threads_or_timeoutjoin_threads_or_timeout()
78 
79  def join_threads_or_timeout(self) -> None:
80  """Join threads or timeout."""
81  remaining_threads = set(self._threads)
82  start_time = time.monotonic()
83  timeout_remaining: float = EXECUTOR_SHUTDOWN_TIMEOUT
84  attempt = 0
85 
86  while True:
87  if not remaining_threads:
88  return
89 
90  attempt += 1
91 
92  remaining_threads -= join_or_interrupt_threads(
93  remaining_threads,
94  timeout_remaining / _JOIN_ATTEMPTS,
95  attempt <= MAX_LOG_ATTEMPTS,
96  )
97 
98  timeout_remaining = EXECUTOR_SHUTDOWN_TIMEOUT - (
99  time.monotonic() - start_time
100  )
101  if timeout_remaining <= 0:
102  return
None shutdown(self, *Any args, bool join_threads_or_timeout=True, **Any kwargs)
Definition: executor.py:68
set[Thread] join_or_interrupt_threads(set[Thread] threads, float timeout, bool log)
Definition: executor.py:39
None _log_thread_running_at_shutdown(str name, int ident)
Definition: executor.py:25
None async_raise(int tid, Any exctype)
Definition: thread.py:38