1 """Database executor helpers."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from concurrent.futures.thread
import _threads_queues, _worker
15 shutdown_hook: Callable[[],
None],
16 recorder_and_worker_thread_ids: set[int],
20 """Create a worker that calls a function after it has finished."""
21 recorder_and_worker_thread_ids.add(threading.get_ident())
22 _worker(*args, **kwargs)
27 """A database instance that will not deadlock on shutdown."""
30 self, recorder_and_worker_thread_ids: set[int], *args: Any, **kwargs: Any
32 """Init the executor with shutdown hook support."""
33 self._shutdown_hook: Callable[[],
None] = kwargs.pop(
"shutdown_hook")
38 """Overridden to add support for shutdown hook.
40 Based on the CPython 3.10 implementation.
43 if self._idle_semaphore.acquire(
56 num_threads = len(self._threads)
57 if num_threads < self._max_workers:
58 thread_name = f
"{self._thread_name_prefix or self}_{num_threads}"
59 executor_thread = threading.Thread(
61 target=_worker_with_shutdown_hook,
65 weakref.ref(self, weakref_cb),
71 executor_thread.start()
72 self._threads.
add(executor_thread)
73 _threads_queues[executor_thread] = self._work_queue
recorder_and_worker_thread_ids
None __init__(self, set[int] recorder_and_worker_thread_ids, *Any args, **Any kwargs)
None _adjust_thread_count(self)
bool add(self, _T matcher)
None _worker_with_shutdown_hook(Callable[[], None] shutdown_hook, set[int] recorder_and_worker_thread_ids, *Any args, **Any kwargs)