1 """Executor util helpers."""
3 from __future__
import annotations
5 from concurrent.futures
import ThreadPoolExecutor
9 from threading
import Thread
12 from typing
import Any
14 from .thread
import async_raise
16 _LOGGER = logging.getLogger(__name__)
22 EXECUTOR_SHUTDOWN_TIMEOUT = 10
26 """Log the stack of a thread that was still running at shutdown."""
27 frames = sys._current_frames()
28 stack = frames.get(ident)
29 formatted_stack = traceback.format_stack(stack)
31 "Thread[%s] is still running at shutdown: %s",
33 "".join(formatted_stack).strip(),
38 threads: set[Thread], timeout: float, log: bool
40 """Attempt to join or interrupt a set of threads."""
42 timeout_per_thread = timeout / len(threads)
44 for thread
in threads:
45 thread.join(timeout=timeout_per_thread)
47 if not thread.is_alive()
or thread.ident
is None:
54 with contextlib.suppress(SystemError):
64 """A ThreadPoolExecutor instance that will not deadlock on shutdown."""
67 self, *args: Any, join_threads_or_timeout: bool =
True, **kwargs: Any
69 """Shutdown with interrupt support added.
71 By default shutdown will wait for threads to finish up
72 to the timeout before forcefully stopping them. This can
73 be disabled by setting `join_threads_or_timeout` to False.
75 super().
shutdown(wait=
False, cancel_futures=
True)
76 if join_threads_or_timeout:
80 """Join threads or timeout."""
81 remaining_threads = set(self._threads)
82 start_time = time.monotonic()
83 timeout_remaining: float = EXECUTOR_SHUTDOWN_TIMEOUT
87 if not remaining_threads:
94 timeout_remaining / _JOIN_ATTEMPTS,
95 attempt <= MAX_LOG_ATTEMPTS,
98 timeout_remaining = EXECUTOR_SHUTDOWN_TIMEOUT - (
99 time.monotonic() - start_time
101 if timeout_remaining <= 0:
None join_threads_or_timeout(self)
None shutdown(self, *Any args, bool join_threads_or_timeout=True, **Any kwargs)
set[Thread] join_or_interrupt_threads(set[Thread] threads, float timeout, bool log)
None _log_thread_running_at_shutdown(str name, int ident)
None async_raise(int tid, Any exctype)