Home Assistant Unofficial Reference 2024.12.1
core.py
1 """Support for recording details."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Callable, Iterable
7 from concurrent.futures import CancelledError
8 import contextlib
9 from datetime import datetime, timedelta
10 import logging
11 import queue
12 import sqlite3
13 import threading
14 import time
15 from typing import TYPE_CHECKING, Any, cast
16 
17 from propcache import cached_property
18 import psutil_home_assistant as ha_psutil
19 from sqlalchemy import create_engine, event as sqlalchemy_event, exc, select, update
20 from sqlalchemy.engine import Engine
21 from sqlalchemy.engine.interfaces import DBAPIConnection
22 from sqlalchemy.exc import SQLAlchemyError
23 from sqlalchemy.orm import scoped_session, sessionmaker
24 from sqlalchemy.orm.session import Session
25 
26 from homeassistant.components import persistent_notification
27 from homeassistant.const import (
28  ATTR_ENTITY_ID,
29  EVENT_HOMEASSISTANT_CLOSE,
30  EVENT_HOMEASSISTANT_FINAL_WRITE,
31  EVENT_STATE_CHANGED,
32  MATCH_ALL,
33 )
34 from homeassistant.core import (
35  CALLBACK_TYPE,
36  Event,
37  EventStateChangedData,
38  HomeAssistant,
39  callback,
40 )
41 from homeassistant.helpers.event import (
42  async_track_time_change,
43  async_track_time_interval,
44  async_track_utc_time_change,
45 )
46 from homeassistant.helpers.start import async_at_started
47 from homeassistant.helpers.typing import UNDEFINED, UndefinedType
48 import homeassistant.util.dt as dt_util
49 from homeassistant.util.enum import try_parse_enum
50 from homeassistant.util.event_type import EventType
51 
52 from . import migration, statistics
53 from .const import (
54  DB_WORKER_PREFIX,
55  DOMAIN,
56  KEEPALIVE_TIME,
57  LAST_REPORTED_SCHEMA_VERSION,
58  MARIADB_PYMYSQL_URL_PREFIX,
59  MARIADB_URL_PREFIX,
60  MAX_QUEUE_BACKLOG_MIN_VALUE,
61  MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG,
62  MYSQLDB_PYMYSQL_URL_PREFIX,
63  MYSQLDB_URL_PREFIX,
64  SQLITE_MAX_BIND_VARS,
65  SQLITE_URL_PREFIX,
66  SupportedDialect,
67 )
68 from .db_schema import (
69  SCHEMA_VERSION,
70  Base,
71  EventData,
72  Events,
73  EventTypes,
74  StateAttributes,
75  States,
76  StatesMeta,
77  Statistics,
78  StatisticsShortTerm,
79 )
80 from .executor import DBInterruptibleThreadPoolExecutor
81 from .models import DatabaseEngine, StatisticData, StatisticMetaData, UnsupportedDialect
82 from .pool import POOL_SIZE, MutexPool, RecorderPool
83 from .table_managers.event_data import EventDataManager
84 from .table_managers.event_types import EventTypeManager
85 from .table_managers.recorder_runs import RecorderRunsManager
86 from .table_managers.state_attributes import StateAttributesManager
87 from .table_managers.states import StatesManager
88 from .table_managers.states_meta import StatesMetaManager
89 from .table_managers.statistics_meta import StatisticsMetaManager
90 from .tasks import (
91  AdjustLRUSizeTask,
92  AdjustStatisticsTask,
93  ChangeStatisticsUnitTask,
94  ClearStatisticsTask,
95  CommitTask,
96  CompileMissingStatisticsTask,
97  DatabaseLockTask,
98  ImportStatisticsTask,
99  KeepAliveTask,
100  PerodicCleanupTask,
101  PurgeTask,
102  RecorderTask,
103  StatisticsTask,
104  StopTask,
105  SynchronizeTask,
106  UpdateStatesMetadataTask,
107  UpdateStatisticsMetadataTask,
108  WaitTask,
109 )
110 from .util import (
111  async_create_backup_failure_issue,
112  build_mysqldb_conv,
113  dburl_to_path,
114  end_incomplete_runs,
115  is_second_sunday,
116  move_away_broken_database,
117  session_scope,
118  setup_connection_for_dialect,
119  validate_or_move_away_sqlite_database,
120  write_lock_db_sqlite,
121 )
122 
123 _LOGGER = logging.getLogger(__name__)
124 
125 DEFAULT_URL = "sqlite:///{hass_config_path}"
126 
127 # Controls how often we clean up
128 # States and Events objects
129 EXPIRE_AFTER_COMMITS = 120
130 
131 SHUTDOWN_TASK = object()
132 
133 COMMIT_TASK = CommitTask()
134 KEEP_ALIVE_TASK = KeepAliveTask()
135 WAIT_TASK = WaitTask()
136 ADJUST_LRU_SIZE_TASK = AdjustLRUSizeTask()
137 
138 DB_LOCK_TIMEOUT = 30
139 DB_LOCK_QUEUE_CHECK_TIMEOUT = 10 # check every 10 seconds
140 
141 QUEUE_CHECK_INTERVAL = timedelta(minutes=5)
142 
143 INVALIDATED_ERR = "Database connection invalidated"
144 CONNECTIVITY_ERR = "Error in database connectivity during commit"
145 
146 # Pool size must accommodate Recorder thread + All db executors
147 MAX_DB_EXECUTOR_WORKERS = POOL_SIZE - 1
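# Added sizing note: MAX_DB_EXECUTOR_WORKERS is derived from POOL_SIZE so
# that the recorder thread plus every executor worker can hold a pooled
# connection at the same time, e.g. a pool of 5 connections yields 4 workers:
#
#     assert MAX_DB_EXECUTOR_WORKERS == POOL_SIZE - 1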


class Recorder(threading.Thread):
    """A threaded recorder class."""

    stop_requested: bool

    def __init__(
        self,
        hass: HomeAssistant,
        auto_purge: bool,
        auto_repack: bool,
        keep_days: int,
        commit_interval: int,
        uri: str,
        db_max_retries: int,
        db_retry_wait: int,
        entity_filter: Callable[[str], bool] | None,
        exclude_event_types: set[EventType[Any] | str],
    ) -> None:
        """Initialize the recorder."""
        threading.Thread.__init__(self, name="Recorder")

        self.hass = hass
        self.thread_id: int | None = None
        self.recorder_and_worker_thread_ids: set[int] = set()
        self.auto_purge = auto_purge
        self.auto_repack = auto_repack
        self.keep_days = keep_days
        self.is_running: bool = False
        self._hass_started: asyncio.Future[object] = hass.loop.create_future()
        self.commit_interval = commit_interval
        self._queue: queue.SimpleQueue[RecorderTask | Event] = queue.SimpleQueue()
        self.db_url = uri
        self.db_max_retries = db_max_retries
        self.db_retry_wait = db_retry_wait
        self.database_engine: DatabaseEngine | None = None
        # Database connection is ready, but non-live migration may be in progress
        db_connected: asyncio.Future[bool] = hass.data[DOMAIN].db_connected
        self.async_db_connected: asyncio.Future[bool] = db_connected
        # Database is ready to use but live migration may be in progress
        self.async_db_ready: asyncio.Future[bool] = hass.loop.create_future()
        # Database is ready to use and all migration steps completed (used by tests)
        self.async_recorder_ready = asyncio.Event()
        self._queue_watch = threading.Event()
        self.engine: Engine | None = None
        self.max_backlog: int = MAX_QUEUE_BACKLOG_MIN_VALUE
        self._psutil: ha_psutil.PsutilWrapper | None = None

        # The entity_filter is exposed on the recorder instance so that
        # it can be used to see if an entity is being recorded and is called
        # by is_entity_recorded and the sensor recorder.
        self.entity_filter = entity_filter
        self.exclude_event_types = exclude_event_types

        self.schema_version = 0
        self._commits_without_expire = 0
        self._event_session_has_pending_writes = False

        self.recorder_runs_manager = RecorderRunsManager()
        self.states_manager = StatesManager()
        self.event_data_manager = EventDataManager(self)
        self.event_type_manager = EventTypeManager(self)
        self.states_meta_manager = StatesMetaManager(self)
        self.state_attributes_manager = StateAttributesManager(self)
        self.statistics_meta_manager = StatisticsMetaManager(self)

        self.event_session: Session | None = None
        self._get_session: Callable[[], Session] | None = None
        self._completed_first_database_setup: bool | None = None
        self.migration_in_progress = False
        self.migration_is_live = False
        self.use_legacy_events_index = False
        self._database_lock_task: DatabaseLockTask | None = None
        self._db_executor: DBInterruptibleThreadPoolExecutor | None = None

        self._event_listener: CALLBACK_TYPE | None = None
        self._queue_watcher: CALLBACK_TYPE | None = None
        self._keep_alive_listener: CALLBACK_TYPE | None = None
        self._commit_listener: CALLBACK_TYPE | None = None
        self._periodic_listener: CALLBACK_TYPE | None = None
        self._nightly_listener: CALLBACK_TYPE | None = None
        self._dialect_name: SupportedDialect | None = None
        self.enabled = True

        # For safety we default to the lowest value for max_bind_vars
        # of all the DB types (SQLITE_MAX_BIND_VARS).
        #
        # We update the value once we connect to the DB
        # and determine what is actually supported.
        self.max_bind_vars = SQLITE_MAX_BIND_VARS

    @property
    def backlog(self) -> int:
        """Return the number of items in the recorder backlog."""
        return self._queue.qsize()

    @cached_property
    def dialect_name(self) -> SupportedDialect | None:
        """Return the dialect the recorder uses."""
        return self._dialect_name

    @property
    def _using_file_sqlite(self) -> bool:
        """Short version to check if we are using sqlite3 as a file."""
        return self.db_url != SQLITE_URL_PREFIX and self.db_url.startswith(
            SQLITE_URL_PREFIX
        )
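        # Added note: a bare "sqlite://" URL (SQLITE_URL_PREFIX) selects an
        # in-memory database, so only URLs that extend the prefix with a path,
        # e.g. "sqlite:////config/home-assistant_v2.db", count as file-backed.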

    @property
    def recording(self) -> bool:
        """Return if the recorder is recording."""
        return self._event_listener is not None

    def get_session(self) -> Session:
        """Get a new sqlalchemy session."""
        if self._get_session is None:
            raise RuntimeError("The database connection has not been established")
        return self._get_session()

    def queue_task(self, task: RecorderTask | Event) -> None:
        """Add a task to the recorder queue."""
        self._queue.put(task)

    def set_enable(self, enable: bool) -> None:
        """Enable or disable recording events and states."""
        self.enabled = enable

    @callback
    def async_start_executor(self) -> None:
        """Start the executor."""
        self._db_executor = DBInterruptibleThreadPoolExecutor(
            self.recorder_and_worker_thread_ids,
            thread_name_prefix=DB_WORKER_PREFIX,
            max_workers=MAX_DB_EXECUTOR_WORKERS,
            shutdown_hook=self._shutdown_pool,
        )

    def _shutdown_pool(self) -> None:
        """Close the dbpool connections in the current thread."""
        if self.engine and hasattr(self.engine.pool, "shutdown"):
            self.engine.pool.shutdown()

    @callback
    def async_initialize(self) -> None:
        """Initialize the recorder."""
        entity_filter = self.entity_filter
        exclude_event_types = self.exclude_event_types
        queue_put = self._queue.put_nowait

        @callback
        def _event_listener(event: Event) -> None:
            """Listen for new events and put them in the process queue."""
            if event.event_type in exclude_event_types:
                return

            if entity_filter is None or not (
                entity_id := event.data.get(ATTR_ENTITY_ID)
            ):
                queue_put(event)
                return

            if isinstance(entity_id, str):
                if entity_filter(entity_id):
                    queue_put(event)
                return

            if isinstance(entity_id, list):
                for eid in entity_id:
                    if entity_filter(eid):
                        queue_put(event)
                        return
                return

            # Unknown what it is.
            queue_put(event)
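            # Added illustration (hypothetical entity ids): with a filter
            # that accepts only "light." entities, an event carrying
            # entity_id="light.kitchen" is queued, entity_id=["sensor.a",
            # "light.b"] is queued because at least one id passes, and
            # entity_id="sensor.a" is dropped.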

        self._event_listener = self.hass.bus.async_listen(
            MATCH_ALL,
            _event_listener,
        )
        self._queue_watcher = async_track_time_interval(
            self.hass,
            self._async_check_queue,
            QUEUE_CHECK_INTERVAL,
            name="Recorder queue watcher",
        )

    @callback
    def _async_keep_alive(self, now: datetime) -> None:
        """Queue a keep alive."""
        if self._event_listener:
            self.queue_task(KEEP_ALIVE_TASK)

    @callback
    def _async_commit(self, now: datetime) -> None:
        """Queue a commit."""
        if (
            self._event_listener
            and not self._database_lock_task
            and self._event_session_has_pending_writes
        ):
            self.queue_task(COMMIT_TASK)

    @callback
    def async_add_executor_job[_T](
        self, target: Callable[..., _T], *args: Any
    ) -> asyncio.Future[_T]:
        """Add an executor job from within the event loop."""
        return self.hass.loop.run_in_executor(self._db_executor, target, *args)

    @callback
    def _async_check_queue(self, *_: Any) -> None:
        """Periodic check of the queue size to ensure we do not exhaust memory.

        The queue grows during migration or if something really goes wrong.
        """
        _LOGGER.debug("Recorder queue size is: %s", self.backlog)
        if not self._reached_max_backlog():
            return
        _LOGGER.error(
            (
                "The recorder backlog queue reached the maximum size of %s events; "
                "usually, the system is CPU bound, I/O bound, or the database "
                "is corrupt due to a disk problem; The recorder will stop "
                "recording events to avoid running out of memory"
            ),
            self.backlog,
        )
        self._async_stop_queue_watcher_and_event_listener()

    def _available_memory(self) -> int:
        """Return the available memory in bytes."""
        if not self._psutil:
            self._psutil = ha_psutil.PsutilWrapper()
        return cast(int, self._psutil.psutil.virtual_memory().available)

    def _reached_max_backlog(self) -> bool:
        """Check if the system has reached the max queue backlog and return True if it has."""
        # First check the minimum value since it's cheap
        if self.backlog < MAX_QUEUE_BACKLOG_MIN_VALUE:
            return False
        # If they have more RAM available, keep filling the backlog
        # since we do not want to stop recording events or give the
        # user a bad backup when they have plenty of RAM available.
        return self._available_memory() < MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG
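        # Added illustration (hypothetical numbers): a backlog of 50_000
        # events with gigabytes of RAM free keeps recording, while the same
        # backlog with free memory below
        # MIN_AVAILABLE_MEMORY_FOR_QUEUE_BACKLOG stops the listeners.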

    @callback
    def _async_stop_queue_watcher_and_event_listener(self) -> None:
        """Stop watching the queue and listening for events."""
        if self._queue_watcher:
            self._queue_watcher()
            self._queue_watcher = None
        if self._event_listener:
            self._event_listener()
            self._event_listener = None

    @callback
    def _async_stop_listeners(self) -> None:
        """Stop listeners."""
        self._async_stop_queue_watcher_and_event_listener()
        if self._keep_alive_listener:
            self._keep_alive_listener()
            self._keep_alive_listener = None
        if self._commit_listener:
            self._commit_listener()
            self._commit_listener = None
        if self._nightly_listener:
            self._nightly_listener()
            self._nightly_listener = None
        if self._periodic_listener:
            self._periodic_listener()
            self._periodic_listener = None

    async def _async_close(self, event: Event) -> None:
        """Empty the queue if it's still present at close."""

        # If the queue is full of events to be processed because
        # the database is so broken that every event results in a retry
        # we will never be able to get through the events to shutdown in time.
        #
        # We drain all the events in the queue and then insert
        # an empty one to ensure the next thing the recorder sees
        # is a request to shutdown.
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break
        self.queue_task(StopTask())
        await self.hass.async_add_executor_job(self.join)

    async def _async_shutdown(self, event: Event) -> None:
        """Shut down the Recorder at final write."""
        if not self._hass_started.done():
            self._hass_started.set_result(SHUTDOWN_TASK)
        self.queue_task(StopTask())
        self._async_stop_listeners()
        await self.hass.async_add_executor_job(self.join)

    @callback
    def _async_hass_started(self, hass: HomeAssistant) -> None:
        """Notify that hass has started."""
        self._hass_started.set_result(None)

    @callback
    def async_register(self) -> None:
        """Post connection initialize."""
        bus = self.hass.bus
        bus.async_listen_once(EVENT_HOMEASSISTANT_CLOSE, self._async_close)
        bus.async_listen_once(EVENT_HOMEASSISTANT_FINAL_WRITE, self._async_shutdown)
        async_at_started(self.hass, self._async_hass_started)

    @callback
    def _async_startup_done(self, startup_failed: bool) -> None:
        """Report startup failure."""
        # If a live migration failed, we were able to connect (async_db_connected
        # marked True) and the database was marked ready (async_db_ready marked
        # True), but the data in the queue cannot be written to the database
        # because the schema is not in the correct format, so we must stop the
        # listeners and report failure.
        if not self.async_db_connected.done():
            self.async_db_connected.set_result(False)
        if not self.async_db_ready.done():
            self.async_db_ready.set_result(False)
        if startup_failed:
            persistent_notification.async_create(
                self.hass,
                "The recorder could not start, check [the logs](/config/logs)",
                "Recorder",
            )
        self._async_stop_listeners()

    @callback
    def async_connection_success(self) -> None:
        """Connect to the database succeeded; schema version and migration status are not yet known.

        The database may not yet be ready for use in case of a non-live migration.
        """
        self.async_db_connected.set_result(True)

    @callback
    def async_set_db_ready(self) -> None:
        """Database live and ready for use.

        Called after non-live migration steps are finished.
        """
        if self.async_db_ready.done():
            return
        self.async_db_ready.set_result(True)
        self.async_start_executor()

    @callback
    def _async_set_recorder_ready_migration_done(self) -> None:
        """Finish start and mark recorder ready.

        Called after all migration steps are finished.
        """
        self._async_setup_periodic_tasks()
        self.async_recorder_ready.set()

    @callback
    def async_nightly_tasks(self, now: datetime) -> None:
        """Trigger the purge."""
        if self.auto_purge:
            # Purge will schedule the periodic cleanups
            # after it completes to ensure it does not happen
            # until after the database is vacuumed
            repack = self.auto_repack and is_second_sunday(now)
            purge_before = dt_util.utcnow() - timedelta(days=self.keep_days)
            self.queue_task(PurgeTask(purge_before, repack=repack, apply_filter=False))
        else:
            self.queue_task(PerodicCleanupTask())

    @callback
    def _async_five_minute_tasks(self, now: datetime) -> None:
        """Run tasks every five minutes."""
        self.queue_task(ADJUST_LRU_SIZE_TASK)
        self.async_periodic_statistics()

    def _adjust_lru_size(self) -> None:
        """Trigger the LRU adjustment.

        If the number of entities has increased, increase the size of the LRU
        cache to avoid thrashing.
        """
        if new_size := self.hass.states.async_entity_ids_count() * 2:
            self.state_attributes_manager.adjust_lru_size(new_size)
            self.states_meta_manager.adjust_lru_size(new_size)
            self.statistics_meta_manager.adjust_lru_size(new_size)

    @callback
    def async_periodic_statistics(self) -> None:
        """Trigger the statistics run.

        Short term statistics run every 5 minutes
        """
        start = statistics.get_start_time()
        self.queue_task(StatisticsTask(start, True))
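        # Added note: statistics.get_start_time() is expected to return the
        # start of the most recent completed 5-minute window, so a run
        # triggered at 12:07 compiles the 12:00-12:05 period.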

    @callback
    def async_adjust_statistics(
        self,
        statistic_id: str,
        start_time: datetime,
        sum_adjustment: float,
        adjustment_unit: str,
    ) -> None:
        """Adjust statistics."""
        self.queue_task(
            AdjustStatisticsTask(
                statistic_id, start_time, sum_adjustment, adjustment_unit
            )
        )

    @callback
    def async_clear_statistics(
        self, statistic_ids: list[str], *, on_done: Callable[[], None] | None = None
    ) -> None:
        """Clear statistics for a list of statistic_ids."""
        self.queue_task(ClearStatisticsTask(on_done, statistic_ids))

    @callback
    def async_update_statistics_metadata(
        self,
        statistic_id: str,
        *,
        new_statistic_id: str | UndefinedType = UNDEFINED,
        new_unit_of_measurement: str | None | UndefinedType = UNDEFINED,
        on_done: Callable[[], None] | None = None,
    ) -> None:
        """Update statistics metadata for a statistic_id."""
        self.queue_task(
            UpdateStatisticsMetadataTask(
                on_done, statistic_id, new_statistic_id, new_unit_of_measurement
            )
        )

    @callback
    def async_update_states_metadata(
        self,
        entity_id: str,
        new_entity_id: str,
    ) -> None:
        """Update states metadata for an entity_id."""
        self.queue_task(UpdateStatesMetadataTask(entity_id, new_entity_id))

    @callback
    def async_change_statistics_unit(
        self,
        statistic_id: str,
        *,
        new_unit_of_measurement: str,
        old_unit_of_measurement: str,
    ) -> None:
        """Change statistics unit for a statistic_id."""
        self.queue_task(
            ChangeStatisticsUnitTask(
                statistic_id, new_unit_of_measurement, old_unit_of_measurement
            )
        )

    @callback
    def async_import_statistics(
        self,
        metadata: StatisticMetaData,
        stats: Iterable[StatisticData],
        table: type[Statistics | StatisticsShortTerm],
    ) -> None:
        """Schedule import of statistics."""
        self.queue_task(ImportStatisticsTask(metadata, stats, table))

    @callback
    def _async_setup_periodic_tasks(self) -> None:
        """Prepare periodic tasks."""
        if self.hass.is_stopping or not self._get_session:
            # Home Assistant is shutting down
            return

        # If the db is using a socket connection, we need to keep alive
        # to prevent errors from unexpected disconnects
        if self.dialect_name != SupportedDialect.SQLITE:
            self._keep_alive_listener = async_track_time_interval(
                self.hass,
                self._async_keep_alive,
                timedelta(seconds=KEEPALIVE_TIME),
                name="Recorder keep alive",
            )

        # If the commit interval is not 0, we need to commit periodically
        if self.commit_interval:
            self._commit_listener = async_track_time_interval(
                self.hass,
                self._async_commit,
                timedelta(seconds=self.commit_interval),
                name="Recorder commit",
            )

        # Run nightly tasks at 4:12am
        self._nightly_listener = async_track_time_change(
            self.hass, self.async_nightly_tasks, hour=4, minute=12, second=0
        )

        # Compile short term statistics every 5 minutes
        self._periodic_listener = async_track_utc_time_change(
            self.hass, self._async_five_minute_tasks, minute=range(0, 60, 5), second=10
        )

    async def _async_wait_for_started(self) -> object | None:
        """Wait for the hass started future."""
        return await self._hass_started

    def _wait_startup_or_shutdown(self) -> object | None:
        """Wait for startup or shutdown before starting."""
        try:
            return asyncio.run_coroutine_threadsafe(
                self._async_wait_for_started(), self.hass.loop
            ).result()
        except CancelledError as ex:
            _LOGGER.warning(
                "Recorder startup was externally canceled before it could complete: %s",
                ex,
            )
            return SHUTDOWN_TASK

    def run(self) -> None:
        """Run the recorder thread."""
        self.is_running = True
        try:
            self._run()
        except Exception:
            _LOGGER.exception(
                "Recorder._run threw unexpected exception, recorder shutting down"
            )
        finally:
            # Ensure shutdown happens cleanly if
            # anything goes wrong in the run loop
            self.is_running = False
            self._shutdown()

    def _add_to_session(self, session: Session, obj: object) -> None:
        """Add an object to the session."""
        self._event_session_has_pending_writes = True
        session.add(obj)

    def _notify_migration_failed(self) -> None:
        """Notify the user schema migration failed."""
        persistent_notification.create(
            self.hass,
            "The database migration failed, check [the logs](/config/logs).",
            "Database Migration Failed",
            "recorder_database_migration",
        )

    def _dismiss_migration_in_progress(self) -> None:
        """Dismiss notification about migration in progress."""
        persistent_notification.dismiss(self.hass, "recorder_database_migration")

    def _run(self) -> None:
        """Start processing events to save."""
        thread_id = threading.get_ident()
        self.thread_id = thread_id
        self.recorder_and_worker_thread_ids.add(thread_id)

        setup_result = self._setup_recorder()

        if not setup_result:
            # Give up if we could not connect
            return

        schema_status = migration.validate_db_schema(self.hass, self, self.get_session)
        if schema_status is None:
            # Give up if we could not validate the schema
            return
        self.schema_version = schema_status.current_version

        if not schema_status.migration_needed and not schema_status.schema_errors:
            self._setup_run()
        else:
            self.migration_in_progress = True
            self.migration_is_live = migration.live_migration(schema_status)

        self.hass.add_job(self.async_connection_success)

        # First do non-live migration steps, if needed
        if schema_status.migration_needed:
            # Do non-live schema migration
            result, schema_status = self._migrate_schema_offline(schema_status)
            if not result:
                self._notify_migration_failed()
                self.migration_in_progress = False
                return
            self.schema_version = schema_status.current_version

            # Do non-live data migration
            self._migrate_data_offline(schema_status)

            # Non-live migration is now completed, remaining steps are live
            self.migration_is_live = True

        # After non-live migration, activate the recorder
        self._activate_and_set_db_ready(schema_status)
        # We wait to start a live migration until startup has finished
        # since it can be cpu intensive and we do not want it to compete
        # with startup which is also cpu intensive
        if self._wait_startup_or_shutdown() is SHUTDOWN_TASK:
            # Shutdown happened before Home Assistant finished starting
            self.migration_in_progress = False
            # Make sure we cleanly close the run if
            # we restart before startup finishes
            return

        # Do live migration steps and repairs, if needed
        if schema_status.migration_needed or schema_status.schema_errors:
            result, schema_status = self._migrate_schema_live(schema_status)
            if result:
                self.schema_version = SCHEMA_VERSION
                if not self._event_listener:
                    # If the schema migration takes so long that the end
                    # queue watcher safety kicks in because _reached_max_backlog
                    # was True, we need to reinitialize the listener.
                    self.hass.add_job(self.async_initialize)
            else:
                self.migration_in_progress = False
                self._dismiss_migration_in_progress()
                self._notify_migration_failed()
                return

        # Schema migration and repair is now completed
        if self.migration_in_progress:
            self.migration_in_progress = False
            self._dismiss_migration_in_progress()
            self._setup_run()

        # Catch up with missed statistics
        self._schedule_compile_missing_statistics()
        _LOGGER.debug("Recorder processing the queue")
        self._adjust_lru_size()
        self.hass.add_job(self._async_set_recorder_ready_migration_done)
        self._run_event_loop()

    def _activate_and_set_db_ready(
        self, schema_status: migration.SchemaValidationStatus
    ) -> None:
        """Activate the table managers or schedule migrations and mark the db as ready."""
        with session_scope(session=self.get_session()) as session:
            # Prime the statistics meta manager as soon as possible
            # since we want the frontend queries to avoid a thundering
            # herd of queries to find the statistics meta data if
            # there are a lot of statistics graphs on the frontend.
            self.statistics_meta_manager.load(session)

        migration.migrate_data_live(self, self.get_session, schema_status)

        # We must only set the db ready after we have set the table managers
        # to active if there is no data to migrate.
        #
        # This ensures that the history queries will use the new tables
        # and not the old ones as soon as the API is available.
        self.hass.add_job(self.async_set_db_ready)

    def _run_event_loop(self) -> None:
        """Run the event loop for the recorder."""
        # Use a session for the event read loop
        # with a commit every time the event time
        # has changed. This reduces the disk io.
        queue_ = self._queue
        startup_task_or_events: list[RecorderTask | Event] = []
        while not queue_.empty() and (task_or_event := queue_.get_nowait()):
            startup_task_or_events.append(task_or_event)
        self._pre_process_startup_events(startup_task_or_events)
        for task in startup_task_or_events:
            self._guarded_process_one_task_or_event_or_recover(task)

        # Clear startup tasks since this thread runs forever
        # and we don't want to hold them in memory
        del startup_task_or_events

        self.stop_requested = False
        while not self.stop_requested:
            self._guarded_process_one_task_or_event_or_recover(queue_.get())

    def _pre_process_startup_events(
        self, startup_task_or_events: list[RecorderTask | Event[Any]]
    ) -> None:
        """Pre process startup events."""
        # Prime all the state_attributes and event_data caches
        # before we start processing events
        state_change_events: list[Event[EventStateChangedData]] = []
        non_state_change_events: list[Event] = []

        for task_or_event in startup_task_or_events:
            # Event is never subclassed so we can
            # use a fast type check
            if type(task_or_event) is Event:
                event_ = task_or_event
                if event_.event_type == EVENT_STATE_CHANGED:
                    state_change_events.append(event_)
                else:
                    non_state_change_events.append(event_)

        assert self.event_session is not None
        session = self.event_session
        self.event_data_manager.load(non_state_change_events, session)
        self.event_type_manager.load(non_state_change_events, session)
        self.states_meta_manager.load(state_change_events, session)
        self.state_attributes_manager.load(state_change_events, session)

    def _guarded_process_one_task_or_event_or_recover(
        self, task: RecorderTask | Event
    ) -> None:
        """Process a task, guarding against exceptions to ensure the loop does not collapse."""
        _LOGGER.debug("Processing task: %s", task)
        try:
            self._process_one_task_or_event_or_recover(task)
        except Exception:
            _LOGGER.exception("Error while processing event %s", task)

    def _process_one_task_or_event_or_recover(self, task: RecorderTask | Event) -> None:
        """Process a task or event, reconnect, or recover a malformed database."""
        try:
            # Almost everything coming in via the queue
            # is an Event so we can process it directly
            # and since it's never subclassed, we can
            # use a fast type check
            if type(task) is Event:
                self._process_one_event(task)
                return
            # If it's not an event, commit everything
            # that is pending before running the task
            if TYPE_CHECKING:
                assert isinstance(task, RecorderTask)
            if task.commit_before:
                self._commit_event_session_or_retry()
            task.run(self)
        except exc.DatabaseError as err:
            if self._handle_database_error(err, setup_run=True):
                return
            _LOGGER.exception("Unhandled database error while processing task %s", task)
        except SQLAlchemyError:
            _LOGGER.exception("SQLAlchemy error while processing task %s", task)
        else:
            return

        # Reset the session if an SQLAlchemyError (including DatabaseError)
        # happens to rollback and recover
        self._reopen_event_session()

    def _setup_recorder(self) -> bool:
        """Create a connection to the database."""
        tries = 1

        while tries <= self.db_max_retries:
            try:
                self._setup_connection()
                return migration.initialize_database(self.get_session)
            except UnsupportedDialect:
                break
            except Exception:
                _LOGGER.exception(
                    "Error during connection setup: (retrying in %s seconds)",
                    self.db_retry_wait,
                )
            tries += 1

            if tries <= self.db_max_retries:
                self._close_connection()
                time.sleep(self.db_retry_wait)

        return False
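        # Added note on the retry budget: with the integration's documented
        # defaults (db_max_retries=10, db_retry_wait=3), an unreachable
        # database is retried for roughly 10 tries * 3 s ≈ 30 s before
        # _setup_recorder gives up and returns False.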

    def _migrate_data_offline(
        self, schema_status: migration.SchemaValidationStatus
    ) -> None:
        """Migrate data."""
        with self.hass.timeout.freeze(DOMAIN):
            migration.migrate_data_non_live(self, self.get_session, schema_status)

    def _migrate_schema_offline(
        self, schema_status: migration.SchemaValidationStatus
    ) -> tuple[bool, migration.SchemaValidationStatus]:
        """Migrate schema to the latest version."""
        with self.hass.timeout.freeze(DOMAIN):
            return self._migrate_schema(schema_status, False)

    def _migrate_schema_live(
        self, schema_status: migration.SchemaValidationStatus
    ) -> tuple[bool, migration.SchemaValidationStatus]:
        """Migrate schema to the latest version."""
        persistent_notification.create(
            self.hass,
            (
                "System performance will temporarily degrade during the database"
                " upgrade. Do not power down or restart the system until the upgrade"
                " completes. Integrations that read the database, such as logbook,"
                " history, and statistics may return inconsistent results until the"
                " upgrade completes. This notification will be automatically dismissed"
                " when the upgrade completes."
            ),
            "Database upgrade in progress",
            "recorder_database_migration",
        )
        return self._migrate_schema(schema_status, True)

    def _migrate_schema(
        self,
        schema_status: migration.SchemaValidationStatus,
        live: bool,
    ) -> tuple[bool, migration.SchemaValidationStatus]:
        """Migrate schema to the latest version."""
        assert self.engine is not None
        try:
            if live:
                migrator = migration.migrate_schema_live
            else:
                migrator = migration.migrate_schema_non_live
            new_schema_status = migrator(
                self, self.hass, self.engine, self.get_session, schema_status
            )
        except exc.DatabaseError as err:
            if self._handle_database_error(err, setup_run=False):
                # If _handle_database_error returns True, we have a new database
                # which does not need migration or repair.
                new_schema_status = migration.SchemaValidationStatus(
                    current_version=SCHEMA_VERSION,
                    migration_needed=False,
                    non_live_data_migration_needed=False,
                    schema_errors=set(),
                    start_version=SCHEMA_VERSION,
                )
                return (True, new_schema_status)
            _LOGGER.exception("Database error during schema migration")
            return (False, schema_status)
        except Exception:
            _LOGGER.exception("Error during schema migration")
            return (False, schema_status)
        else:
            return (True, new_schema_status)

    def _lock_database(self, task: DatabaseLockTask) -> None:
        @callback
        def _async_set_database_locked(task: DatabaseLockTask) -> None:
            task.database_locked.set()

        local_start_time = dt_util.now()
        hass = self.hass
        with write_lock_db_sqlite(self):
            # Notify that lock is being held, wait until database can be used again.
            hass.add_job(_async_set_database_locked, task)
            while not task.database_unlock.wait(timeout=DB_LOCK_QUEUE_CHECK_TIMEOUT):
                if self._reached_max_backlog():
                    _LOGGER.warning(
                        "Database queue backlog reached more than %s events "
                        "while waiting for backup to finish; recorder will now "
                        "resume writing to database. The backup cannot be trusted and "
                        "must be restarted",
                        self.backlog,
                    )
                    task.queue_overflow = True
                    hass.add_job(
                        async_create_backup_failure_issue, self.hass, local_start_time
                    )
                    break
        _LOGGER.info(
            "Database queue backlog reached %d entries during backup",
            self.backlog,
        )

    def _process_one_event(self, event: Event[Any]) -> None:
        if not self.enabled:
            return
        if event.event_type == EVENT_STATE_CHANGED:
            self._process_state_changed_event_into_session(event)
        else:
            self._process_non_state_changed_event_into_session(event)
        # Commit if the commit interval is zero
        if not self.commit_interval:
            self._commit_event_session_or_retry()

    def _process_non_state_changed_event_into_session(self, event: Event) -> None:
        """Process any event into the session except state changed."""
        session = self.event_session
        assert session is not None
        dbevent = Events.from_event(event)

        # Map the event_type to the EventTypes table
        event_type_manager = self.event_type_manager
        if pending_event_types := event_type_manager.get_pending(event.event_type):
            dbevent.event_type_rel = pending_event_types
        elif event_type_id := event_type_manager.get(event.event_type, session, True):
            dbevent.event_type_id = event_type_id
        else:
            event_types = EventTypes(event_type=event.event_type)
            event_type_manager.add_pending(event_types)
            self._add_to_session(session, event_types)
            dbevent.event_type_rel = event_types

        if not event.data:
            self._add_to_session(session, dbevent)
            return

        event_data_manager = self.event_data_manager
        if not (shared_data_bytes := event_data_manager.serialize_from_event(event)):
            return

        # Map the event data to the EventData table
        shared_data = shared_data_bytes.decode("utf-8")
        # Matching attributes found in the pending commit
        if pending_event_data := event_data_manager.get_pending(shared_data):
            dbevent.event_data_rel = pending_event_data
        # Matching attributes id found in the cache
        elif (data_id := event_data_manager.get_from_cache(shared_data)) or (
            (hash_ := EventData.hash_shared_data_bytes(shared_data_bytes))
            and (data_id := event_data_manager.get(shared_data, hash_, session))
        ):
            dbevent.data_id = data_id
        else:
            # No matching attributes found, save them in the DB
            dbevent_data = EventData(shared_data=shared_data, hash=hash_)
            event_data_manager.add_pending(dbevent_data)
            self._add_to_session(session, dbevent_data)
            dbevent.event_data_rel = dbevent_data

        self._add_to_session(session, dbevent)
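        # Added summary of the dedup tiers above: a payload is matched first
        # against rows pending in the current commit, then against the LRU
        # cache of committed ids, then against the database by content hash;
        # only a payload that misses all three creates a new EventData row.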

    def _process_state_changed_event_into_session(
        self, event: Event[EventStateChangedData]
    ) -> None:
        """Process a state_changed event into the session."""
        state_attributes_manager = self.state_attributes_manager
        states_meta_manager = self.states_meta_manager
        entity_removed = not event.data.get("new_state")
        entity_id = event.data["entity_id"]

        dbstate = States.from_event(event)
        old_state = event.data["old_state"]

        assert self.event_session is not None
        session = self.event_session

        states_manager = self.states_manager
        if pending_state := states_manager.pop_pending(entity_id):
            dbstate.old_state = pending_state
            if old_state:
                pending_state.last_reported_ts = old_state.last_reported_timestamp
        elif old_state_id := states_manager.pop_committed(entity_id):
            dbstate.old_state_id = old_state_id
            if old_state:
                states_manager.update_pending_last_reported(
                    old_state_id, old_state.last_reported_timestamp
                )
        if entity_removed:
            dbstate.state = None
        else:
            states_manager.add_pending(entity_id, dbstate)

        if states_meta_manager.active:
            dbstate.entity_id = None

        if entity_id is None or not (
            shared_attrs_bytes := state_attributes_manager.serialize_from_event(event)
        ):
            return

        # Map the entity_id to the StatesMeta table
        if pending_states_meta := states_meta_manager.get_pending(entity_id):
            dbstate.states_meta_rel = pending_states_meta
        elif metadata_id := states_meta_manager.get(entity_id, session, True):
            dbstate.metadata_id = metadata_id
        elif states_meta_manager.active and entity_removed:
            # If the entity was removed, we don't need to add it to the
            # StatesMeta table or record it in the pending commit
            # if it does not have a metadata_id allocated to it as
            # it either never existed or was just renamed.
            return
        else:
            states_meta = StatesMeta(entity_id=entity_id)
            states_meta_manager.add_pending(states_meta)
            self._add_to_session(session, states_meta)
            dbstate.states_meta_rel = states_meta

        # Map the event data to the StateAttributes table
        shared_attrs = shared_attrs_bytes.decode("utf-8")
        dbstate.attributes = None
        # Matching attributes found in the pending commit
        if pending_event_data := state_attributes_manager.get_pending(shared_attrs):
            dbstate.state_attributes = pending_event_data
        # Matching attributes id found in the cache
        elif (
            attributes_id := state_attributes_manager.get_from_cache(shared_attrs)
        ) or (
            (hash_ := StateAttributes.hash_shared_attrs_bytes(shared_attrs_bytes))
            and (
                attributes_id := state_attributes_manager.get(
                    shared_attrs, hash_, session
                )
            )
        ):
            dbstate.attributes_id = attributes_id
        else:
            # No matching attributes found, save them in the DB
            dbstate_attributes = StateAttributes(shared_attrs=shared_attrs, hash=hash_)
            state_attributes_manager.add_pending(dbstate_attributes)
            self._add_to_session(session, dbstate_attributes)
            dbstate.state_attributes = dbstate_attributes

        self._add_to_session(session, dbstate)

    def _handle_database_error(self, err: Exception, *, setup_run: bool) -> bool:
        """Handle a database error that may result in moving away the corrupt db."""
        if (
            (cause := err.__cause__)
            and isinstance(cause, sqlite3.DatabaseError)
            and (cause_str := str(cause))
            # Make sure we do not move away a database when it's only locked
            # externally by another process. sqlite does not give us a named
            # exception for this so we have to check the error message.
            and ("malformed" in cause_str or "not a database" in cause_str)
        ):
            _LOGGER.exception(
                "Unrecoverable sqlite3 database corruption detected: %s", err
            )
            self._handle_sqlite_corruption(setup_run)
            return True
        return False

    def _commit_event_session_or_retry(self) -> None:
        """Commit the event session if there is work to do."""
        if not self._event_session_has_pending_writes:
            return
        tries = 1
        while tries <= self.db_max_retries:
            try:
                self._commit_event_session()
            except (exc.InternalError, exc.OperationalError) as err:
                _LOGGER.error(
                    "%s: Error executing query: %s. (retrying in %s seconds)",
                    INVALIDATED_ERR if err.connection_invalidated else CONNECTIVITY_ERR,
                    err,
                    self.db_retry_wait,
                )
                if tries == self.db_max_retries:
                    raise

                tries += 1
                time.sleep(self.db_retry_wait)
            else:
                return

    def _commit_event_session(self) -> None:
        assert self.event_session is not None
        session = self.event_session
        self._commits_without_expire += 1

        if (
            pending_last_reported
            := self.states_manager.get_pending_last_reported_timestamp()
        ) and self.schema_version >= LAST_REPORTED_SCHEMA_VERSION:
            with session.no_autoflush:
                session.execute(
                    update(States),
                    [
                        {
                            "state_id": state_id,
                            "last_reported_ts": last_reported_timestamp,
                        }
                        for state_id, last_reported_timestamp in pending_last_reported.items()
                    ],
                )
        session.commit()

        self._event_session_has_pending_writes = False
        # We just committed the state attributes to the database
        # and we now know the attributes_ids. We can save
        # many selects for matching attributes by loading them
        # into the LRU or committed now.
        self.states_manager.post_commit_pending()
        self.state_attributes_manager.post_commit_pending()
        self.event_data_manager.post_commit_pending()
        self.event_type_manager.post_commit_pending()
        self.states_meta_manager.post_commit_pending()

        # Expire is an expensive operation (frequently more expensive
        # than the flush and commit itself) so we only
        # do it after EXPIRE_AFTER_COMMITS commits
        if self._commits_without_expire >= EXPIRE_AFTER_COMMITS:
            self._commits_without_expire = 0
            session.expire_all()
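        # Added cadence note: assuming the default 1 s commit interval, the
        # expire_all() above fires about once per EXPIRE_AFTER_COMMITS (120)
        # commits, i.e. roughly every two minutes of steady writing.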

    def _handle_sqlite_corruption(self, setup_run: bool) -> None:
        """Handle the sqlite3 database being corrupt."""
        try:
            self._close_event_session()
        finally:
            self._close_connection()
        move_away_broken_database(dburl_to_path(self.db_url))
        self.recorder_runs_manager.reset()
        self._setup_recorder()
        if setup_run:
            self._setup_run()

    def _close_event_session(self) -> None:
        """Close the event session."""
        self.states_manager.reset()
        self.state_attributes_manager.reset()
        self.event_data_manager.reset()
        self.event_type_manager.reset()
        self.states_meta_manager.reset()
        self.statistics_meta_manager.reset()

        if not self.event_session:
            return

        try:
            self.event_session.rollback()
            self.event_session.close()
        except SQLAlchemyError:
            _LOGGER.exception("Error while rolling back and closing the event session")

    def _reopen_event_session(self) -> None:
        """Rollback the event session and reopen it after a failure."""
        self._close_event_session()
        self._open_event_session()

    def _open_event_session(self) -> None:
        """Open the event session."""
        self.event_session = self.get_session()
        self.event_session.expire_on_commit = False

    def _send_keep_alive(self) -> None:
        """Send a keep alive to keep the db connection open."""
        assert self.event_session is not None
        _LOGGER.debug("Sending keepalive")
        self.event_session.connection().scalar(select(1))

    async def async_block_till_done(self) -> None:
        """Async version of block_till_done."""
        if self._queue.empty() and not self._event_session_has_pending_writes:
            return
        event = asyncio.Event()
        self.queue_task(SynchronizeTask(event))
        await event.wait()

    def block_till_done(self) -> None:
        """Block till all events processed.

        This is only called in tests.

        This only blocks until the queue is empty
        which does not mean the recorder is done.

        Call tests.common's wait_recording_done
        after calling this to ensure the data
        is in the database.
        """
        self._queue_watch.clear()
        self.queue_task(WAIT_TASK)
        self._queue_watch.wait()

    async def lock_database(self) -> bool:
        """Lock database so it can be backed up safely."""
        if self.dialect_name != SupportedDialect.SQLITE:
            _LOGGER.debug(
                "Not a SQLite database or not connected, locking not necessary"
            )
            return True

        if self._database_lock_task:
            _LOGGER.warning("Database already locked")
            return False

        database_locked = asyncio.Event()
        task = DatabaseLockTask(database_locked, threading.Event(), False)
        self.queue_task(task)
        try:
            async with asyncio.timeout(DB_LOCK_TIMEOUT):
                await database_locked.wait()
        except TimeoutError as err:
            task.database_unlock.set()
            raise TimeoutError(
                f"Could not lock database within {DB_LOCK_TIMEOUT} seconds."
            ) from err
        self._database_lock_task = task
        return True

    @callback
    def unlock_database(self) -> bool:
        """Unlock database.

        Returns true if database lock has been held throughout the process.
        """
        if self.dialect_name != SupportedDialect.SQLITE:
            _LOGGER.debug(
                "Not a SQLite database or not connected, unlocking not necessary"
            )
            return True

        if not self._database_lock_task:
            _LOGGER.warning("Database currently not locked")
            return False

        self._database_lock_task.database_unlock.set()
        success = not self._database_lock_task.queue_overflow

        self._database_lock_task = None

        return success
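    # Added backup sketch (illustrative, not part of this module):
    #
    #     if await instance.lock_database():
    #         try:
    #             ...  # copy the SQLite file while writes are held back
    #         finally:
    #             if not instance.unlock_database():
    #                 ...  # queue overflowed; discard the backup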

    def _setup_recorder_connection(
        self, dbapi_connection: DBAPIConnection, connection_record: Any
    ) -> None:
        """Dbapi specific connection settings."""
        assert self.engine is not None
        if database_engine := setup_connection_for_dialect(
            self,
            self.engine.dialect.name,
            dbapi_connection,
            not self._completed_first_database_setup,
        ):
            self.database_engine = database_engine
            self.max_bind_vars = database_engine.max_bind_vars
        self._completed_first_database_setup = True

    def _setup_connection(self) -> None:
        """Ensure database is ready to fly."""
        kwargs: dict[str, Any] = {}
        self._completed_first_database_setup = False

        if self.db_url == SQLITE_URL_PREFIX or ":memory:" in self.db_url:
            kwargs["connect_args"] = {"check_same_thread": False}
            kwargs["poolclass"] = MutexPool
            MutexPool.pool_lock = threading.RLock()
            kwargs["pool_reset_on_return"] = None
        elif self.db_url.startswith(SQLITE_URL_PREFIX):
            kwargs["poolclass"] = RecorderPool
            kwargs["recorder_and_worker_thread_ids"] = (
                self.recorder_and_worker_thread_ids
            )
        elif self.db_url.startswith(
            (
                MARIADB_URL_PREFIX,
                MARIADB_PYMYSQL_URL_PREFIX,
                MYSQLDB_URL_PREFIX,
                MYSQLDB_PYMYSQL_URL_PREFIX,
            )
        ):
            kwargs["connect_args"] = {"charset": "utf8mb4"}
            if self.db_url.startswith((MARIADB_URL_PREFIX, MYSQLDB_URL_PREFIX)):
                # If they have configured MySQLDB but don't have
                # the MySQLDB module installed this will throw
                # an ImportError which we suppress here since
                # sqlalchemy will give them a better error when
                # it tried to import it below.
                with contextlib.suppress(ImportError):
                    kwargs["connect_args"]["conv"] = build_mysqldb_conv()

        # Disable extended logging for non SQLite databases
        if not self.db_url.startswith(SQLITE_URL_PREFIX):
            kwargs["echo"] = False

        if self._using_file_sqlite:
            validate_or_move_away_sqlite_database(self.db_url)

        assert not self.engine
        self.engine = create_engine(self.db_url, **kwargs, future=True)
        self._dialect_name = try_parse_enum(SupportedDialect, self.engine.dialect.name)
        self.__dict__.pop("dialect_name", None)
        sqlalchemy_event.listen(self.engine, "connect", self._setup_recorder_connection)

        migration.pre_migrate_schema(self.engine)
        Base.metadata.create_all(self.engine)
        self._get_session = scoped_session(sessionmaker(bind=self.engine, future=True))
        _LOGGER.debug("Connected to recorder database")

    def _close_connection(self) -> None:
        """Close the connection."""
        if self.engine:
            self.engine.dispose()
            self.engine = None
        self._get_session = None

    def _setup_run(self) -> None:
        """Log the start of the current run and schedule any needed jobs."""
        with session_scope(session=self.get_session()) as session:
            end_incomplete_runs(session, self.recorder_runs_manager.recording_start)
            self.recorder_runs_manager.start(session)
            self.states_manager.load_from_db(session)

        self._open_event_session()

    def _schedule_compile_missing_statistics(self) -> None:
        """Add tasks for missing statistics runs."""
        self.queue_task(CompileMissingStatisticsTask())

    def _end_session(self) -> None:
        """End the recorder session."""
        if self.event_session is None:
            return
        if self.recorder_runs_manager.active:
            # .end will add to the event session
            self._event_session_has_pending_writes = True
            self.recorder_runs_manager.end(self.event_session)
        try:
            self._commit_event_session_or_retry()
        except Exception:
            _LOGGER.exception("Error saving the event session during shutdown")

        self.event_session.close()
        self.recorder_runs_manager.clear()

    def _shutdown(self) -> None:
        """Save end time for current run."""
        _LOGGER.debug("Shutting down recorder")

        # If the schema version is not set, we never had a working
        # connection to the database or the schema never reached a
        # good state.
        # In either case, we want to mark startup as failed.
        startup_failed = (
            not self.schema_version or self.schema_version != SCHEMA_VERSION
        )
        self.hass.add_job(self._async_startup_done, startup_failed)

        try:
            self._end_session()
        finally:
            if self._db_executor:
                # We shutdown the executor without forcefully
                # joining the threads until after we have tried
                # to cleanly close the connection.
                self._db_executor.shutdown(join_threads_or_timeout=False)
            self._close_connection()
            if self._db_executor:
                # After the connection is closed, we can join the threads
                # or forcefully shutdown the threads if they take too long.
                self._db_executor.join_threads_or_timeout()
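
# Added lifecycle sketch: the wiring below is approximate and is normally
# performed by the recorder integration's setup code, not by this module.
#
#     instance = Recorder(
#         hass,
#         auto_purge=True,
#         auto_repack=True,
#         keep_days=10,          # integration default
#         commit_interval=1,     # seconds; 0 commits on every event
#         uri="sqlite:////config/home-assistant_v2.db",
#         db_max_retries=10,
#         db_retry_wait=3,
#         entity_filter=None,    # record everything
#         exclude_event_types=set(),
#     )
#     instance.async_initialize()  # start the event listener and queue watcher
#     instance.async_register()    # hook shutdown and hass-started events
#     instance.start()             # threading.Thread.start() -> run()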