1 """A pool for sqlite connections."""
3 from __future__
import annotations
11 from sqlalchemy.exc
import SQLAlchemyError
12 from sqlalchemy.pool
import (
22 _LOGGER = logging.getLogger(__name__)
25 DEBUG_MUTEX_POOL =
True
26 DEBUG_MUTEX_POOL_TRACE =
False
31 "Use homeassistant.components.recorder.get_instance(hass).async_add_executor_job()"
36 """A hybrid of NullPool and SingletonThreadPool.
38 When called from the creating thread or db executor acts like SingletonThreadPool
39 When called from any other thread, acts like NullPool
45 recorder_and_worker_thread_ids: set[int] |
None =
None,
48 """Create the pool."""
49 kw[
"pool_size"] = POOL_SIZE
51 recorder_and_worker_thread_ids
is not None
52 ),
"recorder_and_worker_thread_ids is required"
54 SingletonThreadPool.__init__(self, creator, **kw)
57 """Recreate the pool."""
58 self.logger.info(
"Pool recreating")
59 return self.__class__(
62 recycle=self._recycle,
64 pre_ping=self._pre_ping,
65 logging_name=self._orig_logging_name,
66 reset_on_return=self._reset_on_return,
67 _dispatch=self.dispatch,
68 dialect=self._dialect,
79 """Close the connection."""
83 and hasattr(self._conn,
"current")
84 and (conn := self._conn.current())
89 """Dispose of the connection."""
97 asyncio.get_running_loop()
106 advise_msg=ADVISE_MSG,
113 "accesses the database without the database executor; "
115 "for faster database operations"
117 exclude_integrations={
"recorder"},
118 core_behavior=ReportBehavior.LOG,
120 return NullPool._create_connection(self)
124 """A pool which prevents concurrent accesses from multiple threads.
126 This is used in tests to prevent unsafe concurrent accesses to in-memory SQLite
130 _reference_counter = 0
131 pool_lock: threading.RLock
134 if DEBUG_MUTEX_POOL_TRACE:
135 trace = traceback.extract_stack()
136 trace_msg =
"\n" +
"".join(traceback.format_list(trace[:-1]))
144 "%s return conn ctr: %s%s",
145 threading.current_thread().name,
149 MutexPool.pool_lock.release()
152 if DEBUG_MUTEX_POOL_TRACE:
153 trace = traceback.extract_stack()
154 trace_msg =
"".join(traceback.format_list(trace[:-1]))
159 _LOGGER.debug(
"%s wait conn%s", threading.current_thread().name, trace_msg)
161 got_lock = MutexPool.pool_lock.acquire(timeout=10)
163 raise SQLAlchemyError
168 "%s get conn: ctr: %s",
169 threading.current_thread().name,
None _do_return_conn(self, ConnectionPoolEntry record)
ConnectionPoolEntry _do_get(self)
None _do_return_conn(self, ConnectionPoolEntry record)
ConnectionPoolEntry _do_get_db_connection_protected(self)
ConnectionPoolEntry _do_get(self)
None __init__(self, Any creator, set[int]|None recorder_and_worker_thread_ids=None, **Any kw)
recorder_and_worker_thread_ids
RecorderPool recreate(self)
None report_usage(str what, *str|None breaks_in_ha_version=None, ReportBehavior core_behavior=ReportBehavior.ERROR, ReportBehavior core_integration_behavior=ReportBehavior.LOG, ReportBehavior custom_integration_behavior=ReportBehavior.LOG, set[str]|None exclude_integrations=None, str|None integration_domain=None, int level=logging.WARNING)
None raise_for_blocking_call(Callable[..., Any] func, Callable[[dict[str, Any]], bool]|None check_allowed=None, bool strict=True, bool strict_core=True, **Any mapped_args)