Home Assistant Unofficial Reference 2024.12.1
util.py
Go to the documentation of this file.
1 """SQLAlchemy util functions."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Generator, Sequence
6 import contextlib
7 from contextlib import contextmanager
8 from datetime import date, datetime, timedelta
9 import functools
10 import logging
11 import os
12 import time
13 from typing import TYPE_CHECKING, Any, Concatenate, NoReturn
14 
15 from awesomeversion import (
16  AwesomeVersion,
17  AwesomeVersionException,
18  AwesomeVersionStrategy,
19 )
20 import ciso8601
21 from sqlalchemy import inspect, text
22 from sqlalchemy.engine import Result, Row
23 from sqlalchemy.engine.interfaces import DBAPIConnection
24 from sqlalchemy.exc import OperationalError, SQLAlchemyError, StatementError
25 from sqlalchemy.orm.query import Query
26 from sqlalchemy.orm.session import Session
27 from sqlalchemy.sql.lambdas import StatementLambdaElement
28 import voluptuous as vol
29 
30 from homeassistant.core import HomeAssistant, callback
31 from homeassistant.helpers import config_validation as cv, issue_registry as ir
32 from homeassistant.helpers.recorder import ( # noqa: F401
33  DATA_INSTANCE,
34  get_instance,
35  session_scope,
36 )
37 import homeassistant.util.dt as dt_util
38 
39 from .const import (
40  DEFAULT_MAX_BIND_VARS,
41  DOMAIN,
42  SQLITE_MAX_BIND_VARS,
43  SQLITE_MODERN_MAX_BIND_VARS,
44  SQLITE_URL_PREFIX,
45  SupportedDialect,
46 )
47 from .db_schema import (
48  TABLE_RECORDER_RUNS,
49  TABLE_SCHEMA_CHANGES,
50  TABLES_TO_CHECK,
51  RecorderRuns,
52 )
53 from .models import (
54  DatabaseEngine,
55  DatabaseOptimizer,
56  StatisticPeriod,
57  UnsupportedDialect,
58  process_timestamp,
59 )
60 
61 if TYPE_CHECKING:
62  from sqlite3.dbapi2 import Cursor as SQLiteCursor
63 
64  from . import Recorder
65 
66 _LOGGER = logging.getLogger(__name__)
67 
68 RETRIES = 3
69 QUERY_RETRY_WAIT = 0.1
70 SQLITE3_POSTFIXES = ["", "-wal", "-shm"]
71 DEFAULT_YIELD_STATES_ROWS = 32768
72 
73 
74 # Our minimum versions for each database
75 #
76 # Older MariaDB suffers https://jira.mariadb.org/browse/MDEV-25020
77 # which is fixed in 10.5.17, 10.6.9, 10.7.5, 10.8.4
78 #
def _simple_version(version: str) -> AwesomeVersion:
    """Parse *version* with the SIMPLEVER strategy."""
    return AwesomeVersion(
        version, ensure_strategy=AwesomeVersionStrategy.SIMPLEVER
    )
82 
83 
84 MIN_VERSION_MARIA_DB = _simple_version("10.3.0")
85 RECOMMENDED_MIN_VERSION_MARIA_DB = _simple_version("10.5.17")
86 MARIADB_WITH_FIXED_IN_QUERIES_105 = _simple_version("10.5.17")
87 MARIA_DB_106 = _simple_version("10.6.0")
88 MARIADB_WITH_FIXED_IN_QUERIES_106 = _simple_version("10.6.9")
89 RECOMMENDED_MIN_VERSION_MARIA_DB_106 = _simple_version("10.6.9")
90 MARIA_DB_107 = _simple_version("10.7.0")
91 RECOMMENDED_MIN_VERSION_MARIA_DB_107 = _simple_version("10.7.5")
92 MARIADB_WITH_FIXED_IN_QUERIES_107 = _simple_version("10.7.5")
93 MARIA_DB_108 = _simple_version("10.8.0")
94 RECOMMENDED_MIN_VERSION_MARIA_DB_108 = _simple_version("10.8.4")
95 MARIADB_WITH_FIXED_IN_QUERIES_108 = _simple_version("10.8.4")
96 MIN_VERSION_MYSQL = _simple_version("8.0.0")
97 MIN_VERSION_PGSQL = _simple_version("12.0")
98 MIN_VERSION_SQLITE = _simple_version("3.31.0")
99 UPCOMING_MIN_VERSION_SQLITE = _simple_version("3.40.1")
100 MIN_VERSION_SQLITE_MODERN_BIND_VARS = _simple_version("3.32.0")
101 
102 
103 # This is the maximum time after the recorder ends the session
104 # before we no longer consider startup to be a "restart" and we
105 # should do a check on the sqlite3 database.
106 MAX_RESTART_TIME = timedelta(minutes=10)
107 
108 # Retry when one of the following MySQL errors occurred:
109 RETRYABLE_MYSQL_ERRORS = (1205, 1206, 1213)
110 # 1205: Lock wait timeout exceeded; try restarting transaction
111 # 1206: The total number of locks exceeds the lock table size
112 # 1213: Deadlock found when trying to get lock; try restarting transaction
113 
114 FIRST_POSSIBLE_SUNDAY = 8
115 SUNDAY_WEEKDAY = 6
116 DAYS_IN_WEEK = 7
117 
118 
def execute(
    qry: Query, to_native: bool = False, validate_entity_ids: bool = True
) -> list[Row]:
    """Query the database and convert the objects to HA native form.

    This method also retries a few times in the case of stale connections.

    :param qry: the ORM query to run.
    :param to_native: convert each row via ``row.to_native`` and drop rows
        that fail conversion (``None``).
    :param validate_entity_ids: forwarded to ``to_native``.
    """
    debug = _LOGGER.isEnabledFor(logging.DEBUG)
    for tryno in range(RETRIES):
        try:
            if debug:
                timer_start = time.perf_counter()

            if to_native:
                result = [
                    row
                    for row in (
                        row.to_native(validate_entity_id=validate_entity_ids)
                        for row in qry
                    )
                    if row is not None
                ]
            else:
                result = qry.all()

            if debug:
                elapsed = time.perf_counter() - timer_start
                if to_native:
                    _LOGGER.debug(
                        "converting %d rows to native objects took %fs",
                        len(result),
                        elapsed,
                    )
                else:
                    _LOGGER.debug(
                        "querying %d rows took %fs",
                        len(result),
                        elapsed,
                    )

        except SQLAlchemyError as err:
            _LOGGER.error("Error executing query: %s", err)

            # Re-raise on the last attempt; otherwise back off and retry.
            if tryno == RETRIES - 1:
                raise
            time.sleep(QUERY_RETRY_WAIT)
        else:
            return result

    # Unreachable
    raise RuntimeError  # pragma: no cover
170 
171 
def execute_stmt_lambda_element(
    session: Session,
    stmt: StatementLambdaElement,
    start_time: datetime | None = None,
    end_time: datetime | None = None,
    yield_per: int = DEFAULT_YIELD_STATES_ROWS,
    orm_rows: bool = True,
) -> Sequence[Row] | Result:
    """Execute a StatementLambdaElement.

    If the time window passed is greater than one day
    the execution method will switch to yield_per to
    reduce memory pressure.

    It is not recommended to pass a time window
    when selecting non-ranged rows (ie selecting
    specific entities) since they are usually faster
    with .all().
    """
    # Small (<= 1 day) or unbounded windows are fetched eagerly.
    use_all = not start_time or ((end_time or dt_util.utcnow()) - start_time).days <= 1
    for tryno in range(RETRIES):
        try:
            if orm_rows:
                executed = session.execute(stmt)
            else:
                # Raw connection rows avoid ORM overhead for large scans.
                executed = session.connection().execute(stmt)
            if use_all:
                return executed.all()
            return executed.yield_per(yield_per)
        except SQLAlchemyError as err:
            _LOGGER.error("Error executing query: %s", err)
            if tryno == RETRIES - 1:
                raise
            time.sleep(QUERY_RETRY_WAIT)

    # Unreachable
    raise RuntimeError  # pragma: no cover
209 
210 
def validate_or_move_away_sqlite_database(dburl: str) -> bool:
    """Ensure that the database is valid or move it away.

    Returns True when the database is usable (or absent, so a new one
    will be created); False when it was corrupt and has been renamed.
    """
    dbpath = dburl_to_path(dburl)

    if not os.path.exists(dbpath):
        # Database does not exist yet, this is OK
        return True

    if not validate_sqlite_database(dbpath):
        # Corrupt database: rename it (and its sidecar files) so a fresh
        # database can be created at startup.
        move_away_broken_database(dbpath)
        return False

    return True
224 
225 
def dburl_to_path(dburl: str) -> str:
    """Convert the db url into a filesystem path."""
    if dburl.startswith(SQLITE_URL_PREFIX):
        return dburl[len(SQLITE_URL_PREFIX):]
    return dburl
229 
230 
def last_run_was_recently_clean(cursor: SQLiteCursor) -> bool:
    """Verify the last recorder run was recently clean."""
    cursor.execute("SELECT end FROM recorder_runs ORDER BY start DESC LIMIT 1;")
    row = cursor.fetchone()

    if not row or not row[0]:
        # No recorded runs, or the newest run never wrote an end time.
        return False

    last_run_end = process_timestamp(dt_util.parse_datetime(row[0]))
    assert last_run_end is not None
    now = dt_util.utcnow()

    _LOGGER.debug("The last run ended at: %s (now: %s)", last_run_end, now)

    # Only a recent clean shutdown counts as a "restart".
    return last_run_end + MAX_RESTART_TIME >= now
250 
251 
def basic_sanity_check(cursor: SQLiteCursor) -> bool:
    """Check tables to make sure select does not fail."""
    full_scan_tables = (TABLE_RECORDER_RUNS, TABLE_SCHEMA_CHANGES)
    for table in TABLES_TO_CHECK:
        # The two small bookkeeping tables are scanned completely; the
        # potentially huge data tables are probed with a single row.
        if table in full_scan_tables:
            statement = f"SELECT * FROM {table};"  # noqa: S608 # not injection
        else:
            statement = f"SELECT * FROM {table} LIMIT 1;"  # noqa: S608 # not injection
        cursor.execute(statement)

    return True
264 
265 
def validate_sqlite_database(dbpath: str) -> bool:
    """Run a quick check on an sqlite database to see if it is corrupt.

    Returns False (after logging) when sqlite reports the file is corrupt
    or malformed; True otherwise.
    """
    import sqlite3  # pylint: disable=import-outside-toplevel

    try:
        conn = sqlite3.connect(dbpath)
        try:
            run_checks_on_open_db(dbpath, conn.cursor())
        finally:
            # Always release the connection, even when a check raises;
            # the original code leaked it on the error path.
            conn.close()
    except sqlite3.DatabaseError:
        _LOGGER.exception("The database at %s is corrupt or malformed", dbpath)
        return False

    return True
279 
280 
def run_checks_on_open_db(dbpath: str, cursor: SQLiteCursor) -> None:
    """Run checks that will generate a sqlite3 exception if there is corruption."""
    sanity_ok = basic_sanity_check(cursor)
    clean_shutdown = last_run_was_recently_clean(cursor)

    if sanity_ok and clean_shutdown:
        _LOGGER.debug(
            "The system was restarted cleanly and passed the basic sanity check"
        )
        return

    if not sanity_ok:
        _LOGGER.warning(
            "The database sanity check failed to validate the sqlite3 database at %s",
            dbpath,
        )

    if not clean_shutdown:
        _LOGGER.warning(
            (
                "The system could not validate that the sqlite3 database at %s was"
                " shutdown cleanly"
            ),
            dbpath,
        )
306 
307 
def move_away_broken_database(dbfile: str) -> None:
    """Move away a broken sqlite3 database."""
    corrupt_postfix = f".corrupt.{dt_util.utcnow().isoformat()}"

    _LOGGER.error(
        (
            "The system will rename the corrupt database file %s to %s in order to"
            " allow startup to proceed"
        ),
        dbfile,
        f"{dbfile}{corrupt_postfix}",
    )

    # Rename the main db file together with its -wal/-shm sidecars.
    for postfix in SQLITE3_POSTFIXES:
        path = f"{dbfile}{postfix}"
        if os.path.exists(path):
            os.rename(path, f"{path}{corrupt_postfix}")
328 
329 
def execute_on_connection(dbapi_connection: DBAPIConnection, statement: str) -> None:
    """Run a single statement on a dbapi connection, discarding any result."""
    cur = dbapi_connection.cursor()
    cur.execute(statement)
    cur.close()
335 
336 
def query_on_connection(dbapi_connection: DBAPIConnection, statement: str) -> Any:
    """Run a single statement on a dbapi connection and return all rows."""
    cur = dbapi_connection.cursor()
    cur.execute(statement)
    rows = cur.fetchall()
    cur.close()
    return rows
344 
345 
def _fail_unsupported_dialect(dialect_name: str) -> NoReturn:
    """Warn about unsupported database version."""
    supported = "MariaDB ≥ 10.3, MySQL ≥ 8.0, PostgreSQL ≥ 12, SQLite ≥ 3.31.0"
    _LOGGER.error(
        (
            "Database %s is not supported; Home Assistant supports %s. "
            "Starting with Home Assistant 2022.6 this prevents the recorder from "
            "starting. Please migrate your database to a supported software"
        ),
        dialect_name,
        supported,
    )
    raise UnsupportedDialect
358 
359 
def _raise_if_version_unsupported(
    server_version: str, dialect_name: str, minimum_version: str
) -> NoReturn:
    """Warn about unsupported database version and raise UnsupportedDialect."""
    _LOGGER.error(
        (
            "Version %s of %s is not supported; minimum supported version is %s. "
            "Starting with Home Assistant 2022.6 this prevents the recorder from "
            "starting. Please upgrade your database software"
        ),
        server_version,
        dialect_name,
        minimum_version,
    )
    raise UnsupportedDialect
375 
376 
@callback
def _async_delete_issue_deprecated_version(
    hass: HomeAssistant, dialect_name: str
) -> None:
    """Delete the issue about upcoming unsupported database version."""
    ir.async_delete_issue(hass, DOMAIN, f"{dialect_name}_too_old")
383 
384 
@callback
def _async_create_issue_deprecated_version(
    hass: HomeAssistant,
    server_version: AwesomeVersion,
    dialect_name: str,
    min_version: AwesomeVersion,
) -> None:
    """Warn about upcoming unsupported database version."""
    ir.async_create_issue(
        hass,
        DOMAIN,
        f"{dialect_name}_too_old",
        is_fixable=False,
        severity=ir.IssueSeverity.CRITICAL,
        translation_key=f"{dialect_name}_too_old",
        translation_placeholders={
            "server_version": str(server_version),
            "min_version": str(min_version),
        },
        breaks_in_ha_version="2025.2.0",
    )
406 
407 
def _extract_version_from_server_response_or_raise(
    server_response: str,
) -> AwesomeVersion:
    """Extract version from server response.

    Raises AwesomeVersionException when no version can be found.
    """
    return AwesomeVersion(
        server_response,
        ensure_strategy=AwesomeVersionStrategy.SIMPLEVER,
        find_first_match=True,
    )
417 
418 
def _extract_version_from_server_response(
    server_response: str,
) -> AwesomeVersion | None:
    """Attempt to extract version from server response, or None on failure."""
    try:
        return _extract_version_from_server_response_or_raise(server_response)
    except AwesomeVersionException:
        return None
428 
def _datetime_or_none(value: str) -> datetime | None:
    """Fast version of mysqldb DateTime_or_None.

    https://github.com/PyMySQL/mysqlclient/blob/v2.1.0/MySQLdb/times.py#L66
    """
    with contextlib.suppress(ValueError):
        return ciso8601.parse_datetime(value)
    return None
438 
439 
def build_mysqldb_conv() -> dict:
    """Build a MySQLdb conv dict that uses ciso8601 to parse datetimes."""
    # Late imports since we only call this if they are using mysqldb
    # pylint: disable=import-outside-toplevel
    from MySQLdb.constants import FIELD_TYPE
    from MySQLdb.converters import conversions

    # Override only the DATETIME converter; keep stock converters otherwise.
    return {**conversions, FIELD_TYPE.DATETIME: _datetime_or_none}
448 
449 
@callback
def _async_create_mariadb_range_index_regression_issue(
    hass: HomeAssistant, version: AwesomeVersion
) -> None:
    """Create an issue for the index range regression in older MariaDB.

    The range scan issue was fixed in MariaDB 10.5.17, 10.6.9, 10.7.5, 10.8.4 and later.
    """
    # Pick the recommended fixed release for the user's minor series.
    if version >= MARIA_DB_108:
        min_version = RECOMMENDED_MIN_VERSION_MARIA_DB_108
    elif version >= MARIA_DB_107:
        min_version = RECOMMENDED_MIN_VERSION_MARIA_DB_107
    elif version >= MARIA_DB_106:
        min_version = RECOMMENDED_MIN_VERSION_MARIA_DB_106
    else:
        min_version = RECOMMENDED_MIN_VERSION_MARIA_DB
    ir.async_create_issue(
        hass,
        DOMAIN,
        "maria_db_range_index_regression",
        is_fixable=False,
        severity=ir.IssueSeverity.CRITICAL,
        learn_more_url="https://jira.mariadb.org/browse/MDEV-25020",
        translation_key="maria_db_range_index_regression",
        translation_placeholders={"min_version": str(min_version)},
    )
476 
477 
@callback
def async_create_backup_failure_issue(
    hass: HomeAssistant,
    local_start_time: datetime,
) -> None:
    """Create an issue when the backup fails because we run out of resources."""
    ir.async_create_issue(
        hass,
        DOMAIN,
        "backup_failed_out_of_resources",
        is_fixable=False,
        severity=ir.IssueSeverity.CRITICAL,
        learn_more_url="https://www.home-assistant.io/integrations/recorder",
        translation_key="backup_failed_out_of_resources",
        translation_placeholders={"start_time": local_start_time.strftime("%H:%M:%S")},
    )
494 
495 
def setup_connection_for_dialect(
    instance: Recorder,
    dialect_name: str,
    dbapi_connection: DBAPIConnection,
    first_connection: bool,
) -> DatabaseEngine | None:
    """Execute statements needed for dialect connection.

    Returns a DatabaseEngine description on the first connection, None for
    subsequent connections. Raises UnsupportedDialect for unknown dialects
    or versions below the supported minimum.
    """
    version: AwesomeVersion | None = None
    slow_range_in_select = False
    if dialect_name == SupportedDialect.SQLITE:
        max_bind_vars = SQLITE_MAX_BIND_VARS
        if first_connection:
            old_isolation = dbapi_connection.isolation_level  # type: ignore[attr-defined]
            dbapi_connection.isolation_level = None  # type: ignore[attr-defined]
            execute_on_connection(dbapi_connection, "PRAGMA journal_mode=WAL")
            dbapi_connection.isolation_level = old_isolation  # type: ignore[attr-defined]
            # WAL mode only needs to be setup once
            # instead of every time we open the sqlite connection
            # as its persistent and isn't free to call every time.
            result = query_on_connection(dbapi_connection, "SELECT sqlite_version()")
            version_string = result[0][0]
            version = _extract_version_from_server_response_or_raise(version_string)

            if version < MIN_VERSION_SQLITE:
                _raise_if_version_unsupported(
                    version or version_string, "SQLite", MIN_VERSION_SQLITE
                )

            # No elif here since _raise_if_version_unsupported raises
            if version < UPCOMING_MIN_VERSION_SQLITE:
                instance.hass.add_job(
                    _async_create_issue_deprecated_version,
                    instance.hass,
                    version or version_string,
                    dialect_name,
                    UPCOMING_MIN_VERSION_SQLITE,
                )
            else:
                instance.hass.add_job(
                    _async_delete_issue_deprecated_version, instance.hass, dialect_name
                )

        if version and version > MIN_VERSION_SQLITE_MODERN_BIND_VARS:
            max_bind_vars = SQLITE_MODERN_MAX_BIND_VARS

        # The upper bound on the cache size is approximately 16MiB of memory
        execute_on_connection(dbapi_connection, "PRAGMA cache_size = -16384")

        #
        # Enable FULL synchronous if they have a commit interval of 0
        # or NORMAL if they do not.
        #
        # https://sqlite.org/pragma.html#pragma_synchronous
        # The synchronous=NORMAL setting is a good choice for most applications
        # running in WAL mode.
        #
        synchronous = "NORMAL" if instance.commit_interval else "FULL"
        execute_on_connection(dbapi_connection, f"PRAGMA synchronous={synchronous}")

        # enable support for foreign keys
        execute_on_connection(dbapi_connection, "PRAGMA foreign_keys=ON")

    elif dialect_name == SupportedDialect.MYSQL:
        max_bind_vars = DEFAULT_MAX_BIND_VARS
        execute_on_connection(dbapi_connection, "SET session wait_timeout=28800")
        if first_connection:
            result = query_on_connection(dbapi_connection, "SELECT VERSION()")
            version_string = result[0][0]
            version = _extract_version_from_server_response(version_string)
            is_maria_db = "mariadb" in version_string.lower()

            if is_maria_db:
                if not version or version < MIN_VERSION_MARIA_DB:
                    _raise_if_version_unsupported(
                        version or version_string, "MariaDB", MIN_VERSION_MARIA_DB
                    )
                if version and (
                    (version < RECOMMENDED_MIN_VERSION_MARIA_DB)
                    or (MARIA_DB_106 <= version < RECOMMENDED_MIN_VERSION_MARIA_DB_106)
                    or (MARIA_DB_107 <= version < RECOMMENDED_MIN_VERSION_MARIA_DB_107)
                    or (MARIA_DB_108 <= version < RECOMMENDED_MIN_VERSION_MARIA_DB_108)
                ):
                    instance.hass.add_job(
                        _async_create_mariadb_range_index_regression_issue,
                        instance.hass,
                        version,
                    )

            elif not version or version < MIN_VERSION_MYSQL:
                _raise_if_version_unsupported(
                    version or version_string, "MySQL", MIN_VERSION_MYSQL
                )

            slow_range_in_select = bool(
                not version
                or version < MARIADB_WITH_FIXED_IN_QUERIES_105
                or MARIA_DB_106 <= version < MARIADB_WITH_FIXED_IN_QUERIES_106
                or MARIA_DB_107 <= version < MARIADB_WITH_FIXED_IN_QUERIES_107
                or MARIA_DB_108 <= version < MARIADB_WITH_FIXED_IN_QUERIES_108
            )

        # Ensure all times are using UTC to avoid issues with daylight savings
        execute_on_connection(dbapi_connection, "SET time_zone = '+00:00'")
    elif dialect_name == SupportedDialect.POSTGRESQL:
        max_bind_vars = DEFAULT_MAX_BIND_VARS
        if first_connection:
            # server_version_num was added in 2006
            result = query_on_connection(dbapi_connection, "SHOW server_version")
            version_string = result[0][0]
            version = _extract_version_from_server_response(version_string)
            if not version or version < MIN_VERSION_PGSQL:
                _raise_if_version_unsupported(
                    version or version_string, "PostgreSQL", MIN_VERSION_PGSQL
                )

    else:
        _fail_unsupported_dialect(dialect_name)

    if not first_connection:
        return None

    return DatabaseEngine(
        dialect=SupportedDialect(dialect_name),
        version=version,
        optimizer=DatabaseOptimizer(slow_range_in_select=slow_range_in_select),
        max_bind_vars=max_bind_vars,
    )
623 
624 
def end_incomplete_runs(session: Session, start_time: datetime) -> None:
    """End any incomplete recorder runs."""
    unfinished_runs = session.query(RecorderRuns).filter_by(end=None)
    for run in unfinished_runs:
        # Flag the run so we know it did not shut down cleanly.
        run.closed_incorrect = True
        run.end = start_time
        _LOGGER.warning(
            "Ended unfinished session (id=%s from %s)", run.run_id, run.start
        )
        session.add(run)
634 
635 
def _is_retryable_error(instance: Recorder, err: OperationalError) -> bool:
    """Return True if the error is retryable."""
    assert instance.engine is not None
    if instance.engine.dialect.name != SupportedDialect.MYSQL:
        # Only MySQL/MariaDB lock errors are worth retrying.
        return False
    orig = err.orig
    return bool(
        isinstance(orig, BaseException)
        and orig.args
        and orig.args[0] in RETRYABLE_MYSQL_ERRORS
    )
645 
646 
647 type _FuncType[**P, R] = Callable[Concatenate[Recorder, P], R]
648 type _MethType[Self, **P, R] = Callable[Concatenate[Self, Recorder, P], R]
649 type _FuncOrMethType[**_P, _R] = Callable[_P, _R]
650 
651 
652 def retryable_database_job[**_P](
653  description: str,
654 ) -> Callable[[_FuncType[_P, bool]], _FuncType[_P, bool]]:
655  """Execute a database job repeatedly until it succeeds.
656 
657  The job should return True if it finished, and False if it needs to be rescheduled.
658  """
659 
660  def decorator(job: _FuncType[_P, bool]) -> _FuncType[_P, bool]:
661  return _wrap_retryable_database_job_func_or_meth(job, description, False)
662 
663  return decorator
664 
665 
666 def retryable_database_job_method[_Self, **_P](
667  description: str,
668 ) -> Callable[[_MethType[_Self, _P, bool]], _MethType[_Self, _P, bool]]:
669  """Execute a database job repeatedly until it succeeds.
670 
671  The job should return True if it finished, and False if it needs to be rescheduled.
672  """
673 
674  def decorator(job: _MethType[_Self, _P, bool]) -> _MethType[_Self, _P, bool]:
675  return _wrap_retryable_database_job_func_or_meth(job, description, True)
676 
677  return decorator
678 
679 
def _wrap_retryable_database_job_func_or_meth[**_P](
    job: _FuncOrMethType[_P, bool], description: str, method: bool
) -> _FuncOrMethType[_P, bool]:
    """Wrap *job* so retryable MySQL errors yield False (reschedule).

    When *method* is True the recorder instance is the second positional
    argument (after self); otherwise it is the first.
    """
    recorder_pos = 1 if method else 0

    @functools.wraps(job)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> bool:
        instance: Recorder = args[recorder_pos]  # type: ignore[assignment]
        try:
            return job(*args, **kwargs)
        except OperationalError as err:
            if _is_retryable_error(instance, err):
                assert isinstance(err.orig, BaseException)  # noqa: PT017
                _LOGGER.info(
                    "%s; %s not completed, retrying", err.orig.args[1], description
                )
                # Back off before the caller reschedules the job.
                time.sleep(instance.db_retry_wait)
                # Failed with retryable error
                return False

            _LOGGER.warning("Error executing %s: %s", description, err)

        # Failed with permanent error
        return True

    return wrapper
706 
707 
708 def database_job_retry_wrapper[**_P, _R](
709  description: str, attempts: int
710 ) -> Callable[[_FuncType[_P, _R]], _FuncType[_P, _R]]:
711  """Execute a database job repeatedly until it succeeds, at most attempts times.
712 
713  This wrapper handles InnoDB deadlocks and lock timeouts.
714 
715  This is different from retryable_database_job in that it will retry the job
716  attempts number of times instead of returning False if the job fails.
717  """
718 
719  def decorator(
720  job: _FuncType[_P, _R],
721  ) -> _FuncType[_P, _R]:
722  return _database_job_retry_wrapper_func_or_meth(
723  job, description, attempts, False
724  )
725 
726  return decorator
727 
728 
729 def database_job_retry_wrapper_method[_Self, **_P, _R](
730  description: str, attempts: int
731 ) -> Callable[[_MethType[_Self, _P, _R]], _MethType[_Self, _P, _R]]:
732  """Execute a database job repeatedly until it succeeds, at most attempts times.
733 
734  This wrapper handles InnoDB deadlocks and lock timeouts.
735 
736  This is different from retryable_database_job in that it will retry the job
737  attempts number of times instead of returning False if the job fails.
738  """
739 
740  def decorator(
741  job: _MethType[_Self, _P, _R],
742  ) -> _MethType[_Self, _P, _R]:
743  return _database_job_retry_wrapper_func_or_meth(
744  job, description, attempts, True
745  )
746 
747  return decorator
748 
749 
def _database_job_retry_wrapper_func_or_meth[**_P, _R](
    job: _FuncOrMethType[_P, _R],
    description: str,
    attempts: int,
    method: bool,
) -> _FuncOrMethType[_P, _R]:
    """Wrap *job* to retry up to *attempts* times on retryable MySQL errors.

    Non-retryable errors, and the final failed attempt, propagate to the
    caller unchanged.
    """
    # Recorder instance position: after self for methods, first otherwise.
    recorder_pos = 1 if method else 0

    @functools.wraps(job)
    def wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
        instance: Recorder = args[recorder_pos]  # type: ignore[assignment]
        for attempt in range(attempts):
            try:
                return job(*args, **kwargs)
            except OperationalError as err:
                # Failed with retryable error
                if attempt == attempts - 1 or not _is_retryable_error(instance, err):
                    raise
                assert isinstance(err.orig, BaseException)  # noqa: PT017
                _LOGGER.info("%s; %s failed, retrying", err.orig.args[1], description)
                time.sleep(instance.db_retry_wait)

        # Only reachable when the loop body never ran (attempts <= 0).
        raise ValueError("attempts must be a positive integer")

    return wrapper
775 
776 
def periodic_db_cleanups(instance: Recorder) -> None:
    """Run any database cleanups that need to happen periodically.

    These cleanups will happen nightly or after any purge.
    """
    assert instance.engine is not None
    if instance.engine.dialect.name != SupportedDialect.SQLITE:
        return
    # Execute sqlite to create a wal checkpoint and free up disk space
    _LOGGER.debug("WAL checkpoint")
    with instance.engine.connect() as connection:
        connection.execute(text("PRAGMA wal_checkpoint(TRUNCATE);"))
        connection.execute(text("PRAGMA OPTIMIZE;"))
789 
790 
@contextmanager
def write_lock_db_sqlite(instance: Recorder) -> Generator[None]:
    """Lock database for writes.

    Holds a single connection with an open IMMEDIATE transaction for the
    duration of the context, blocking other writers (used during backups).
    """
    assert instance.engine is not None
    with instance.engine.connect() as connection:
        # Execute sqlite to create a wal checkpoint
        # This is optional but makes sure the backup is going to be minimal
        connection.execute(text("PRAGMA wal_checkpoint(TRUNCATE)"))
        # Create write lock
        _LOGGER.debug("Lock database")
        connection.execute(text("BEGIN IMMEDIATE;"))
        try:
            yield
        finally:
            # Always release the lock, even if the caller raised.
            _LOGGER.debug("Unlock database")
            connection.execute(text("END;"))
807 
808 
def async_migration_in_progress(hass: HomeAssistant) -> bool:
    """Determine if a migration is in progress.

    This is a thin wrapper that allows us to change
    out the implementation later.
    """
    data = hass.data
    if DATA_INSTANCE not in data:
        # Recorder not set up yet.
        return False
    return data[DATA_INSTANCE].migration_in_progress
818 
819 
def async_migration_is_live(hass: HomeAssistant) -> bool:
    """Determine if a migration is live.

    This is a thin wrapper that allows us to change
    out the implementation later.
    """
    data = hass.data
    if DATA_INSTANCE not in data:
        # Recorder not set up yet.
        return False
    return data[DATA_INSTANCE].migration_is_live
829 
830 
def second_sunday(year: int, month: int) -> date:
    """Return the datetime.date for the second sunday of a month."""
    # Day 8 is the earliest date a second Sunday can fall on.
    candidate = date(year, month, FIRST_POSSIBLE_SUNDAY)
    offset = (SUNDAY_WEEKDAY - candidate.weekday()) % DAYS_IN_WEEK
    if offset == 0:
        return candidate
    return candidate.replace(day=FIRST_POSSIBLE_SUNDAY + offset)
840 
841 
def is_second_sunday(date_time: datetime) -> bool:
    """Check if a time is the second sunday of the month."""
    return second_sunday(date_time.year, date_time.month).day == date_time.day
845 
846 
847 PERIOD_SCHEMA = vol.Schema(
848  {
849  vol.Exclusive("calendar", "period"): vol.Schema(
850  {
851  vol.Required("period"): vol.Any("hour", "day", "week", "month", "year"),
852  vol.Optional("offset"): int,
853  }
854  ),
855  vol.Exclusive("fixed_period", "period"): vol.Schema(
856  {
857  vol.Optional("start_time"): vol.All(cv.datetime, dt_util.as_utc),
858  vol.Optional("end_time"): vol.All(cv.datetime, dt_util.as_utc),
859  }
860  ),
861  vol.Exclusive("rolling_window", "period"): vol.Schema(
862  {
863  vol.Required("duration"): cv.time_period_dict,
864  vol.Optional("offset"): cv.time_period_dict,
865  }
866  ),
867  }
868 )
869 
870 
def resolve_period(
    period_def: StatisticPeriod,
) -> tuple[datetime | None, datetime | None]:
    """Return start and end datetimes for a statistic period definition.

    Exactly one of "calendar", "fixed_period" or "rolling_window" is
    expected in *period_def* (enforced by PERIOD_SCHEMA); both values are
    None when none is present.
    """
    start_time = None
    end_time = None

    if "calendar" in period_def:
        calendar_period = period_def["calendar"]["period"]
        start_of_day = dt_util.start_of_local_day()
        cal_offset = period_def["calendar"].get("offset", 0)
        if calendar_period == "hour":
            start_time = dt_util.now().replace(minute=0, second=0, microsecond=0)
            start_time += timedelta(hours=cal_offset)
            end_time = start_time + timedelta(hours=1)
        elif calendar_period == "day":
            start_time = start_of_day
            start_time += timedelta(days=cal_offset)
            end_time = start_time + timedelta(days=1)
        elif calendar_period == "week":
            start_time = start_of_day - timedelta(days=start_of_day.weekday())
            start_time += timedelta(days=cal_offset * 7)
            end_time = start_time + timedelta(weeks=1)
        elif calendar_period == "month":
            start_time = start_of_day.replace(day=28)
            # This works for up to 48 months of offset
            start_time = (start_time + timedelta(days=cal_offset * 31)).replace(day=1)
            end_time = (start_time + timedelta(days=31)).replace(day=1)
        else:  # calendar_period = "year"
            start_time = start_of_day.replace(month=12, day=31)
            # This works for 100+ years of offset
            start_time = (start_time + timedelta(days=cal_offset * 366)).replace(
                month=1, day=1
            )
            end_time = (start_time + timedelta(days=366)).replace(day=1)

        start_time = dt_util.as_utc(start_time)
        end_time = dt_util.as_utc(end_time)

    elif "fixed_period" in period_def:
        start_time = period_def["fixed_period"].get("start_time")
        end_time = period_def["fixed_period"].get("end_time")

    elif "rolling_window" in period_def:
        duration = period_def["rolling_window"]["duration"]
        now = dt_util.utcnow()
        start_time = now - duration
        end_time = start_time + duration

        if offset := period_def["rolling_window"].get("offset"):
            start_time += offset
            end_time += offset

    return (start_time, end_time)
925 
926 
def get_index_by_name(session: Session, table_name: str, index_name: str) -> str | None:
    """Get an index by name, matching exact or suffixed names."""
    inspector = inspect(session.connection())
    for possible_index in inspector.get_indexes(table_name):
        name = possible_index["name"]
        # Some dialects prefix index names, so accept a "_<index_name>" suffix.
        if name and (name == index_name or name.endswith(f"_{index_name}")):
            return name
    return None
944 
945 
def filter_unique_constraint_integrity_error(
    instance: Recorder, row_type: str
) -> Callable[[Exception], bool]:
    """Create a filter for unique constraint integrity errors.

    The returned callable reports (and asks the caller to ignore) duplicate
    row insert errors for the current dialect; all other errors pass through.
    """

    def _filter_unique_constraint_integrity_error(err: Exception) -> bool:
        """Handle unique constraint integrity errors."""
        if not isinstance(err, StatementError):
            return False

        assert instance.engine is not None
        dialect_name = instance.engine.dialect.name

        ignore = False
        if (
            dialect_name == SupportedDialect.SQLITE
            and "UNIQUE constraint failed" in str(err)
        ):
            ignore = True
        if (
            dialect_name == SupportedDialect.POSTGRESQL
            and err.orig
            and hasattr(err.orig, "pgcode")
            and err.orig.pgcode == "23505"
        ):
            ignore = True
        if (
            dialect_name == SupportedDialect.MYSQL
            and err.orig
            and hasattr(err.orig, "args")
        ):
            # MySQL error 1062: duplicate entry for a unique key.
            with contextlib.suppress(TypeError):
                if err.orig.args[0] == 1062:
                    ignore = True

        if ignore:
            _LOGGER.warning(
                (
                    "Blocked attempt to insert duplicated %s rows, please report"
                    " at %s"
                ),
                row_type,
                "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
                exc_info=err,
            )

        return ignore

    return _filter_unique_constraint_integrity_error
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
bool _is_retryable_error(Recorder instance, OperationalError err)
Definition: util.py:636
tuple[datetime|None, datetime|None] resolve_period(StatisticPeriod period_def)
Definition: util.py:873
DatabaseEngine|None setup_connection_for_dialect(Recorder instance, str dialect_name, DBAPIConnection dbapi_connection, bool first_connection)
Definition: util.py:501
None async_create_backup_failure_issue(HomeAssistant hass, datetime local_start_time)
Definition: util.py:482
None end_incomplete_runs(Session session, datetime start_time)
Definition: util.py:625
None execute_on_connection(DBAPIConnection dbapi_connection, str statement)
Definition: util.py:330
bool basic_sanity_check(SQLiteCursor cursor)
Definition: util.py:252
date second_sunday(int year, int month)
Definition: util.py:831
bool validate_or_move_away_sqlite_database(str dburl)
Definition: util.py:211
bool last_run_was_recently_clean(SQLiteCursor cursor)
Definition: util.py:231
None _async_delete_issue_deprecated_version(HomeAssistant hass, str dialect_name)
Definition: util.py:380
None periodic_db_cleanups(Recorder instance)
Definition: util.py:777
NoReturn _raise_if_version_unsupported(str server_version, str dialect_name, str minimum_version)
Definition: util.py:362
Generator[None] write_lock_db_sqlite(Recorder instance)
Definition: util.py:792
AwesomeVersion|None _extract_version_from_server_response(str server_response)
Definition: util.py:421
Any query_on_connection(DBAPIConnection dbapi_connection, str statement)
Definition: util.py:337
None _async_create_issue_deprecated_version(HomeAssistant hass, AwesomeVersion server_version, str dialect_name, AwesomeVersion min_version)
Definition: util.py:391
bool async_migration_in_progress(HomeAssistant hass)
Definition: util.py:809
list[Row] execute(Query qry, bool to_native=False, bool validate_entity_ids=True)
Definition: util.py:121
bool validate_sqlite_database(str dbpath)
Definition: util.py:266
None _async_create_mariadb_range_index_regression_issue(HomeAssistant hass, AwesomeVersion version)
Definition: util.py:453
None move_away_broken_database(str dbfile)
Definition: util.py:308
str|None get_index_by_name(Session session, str table_name, str index_name)
Definition: util.py:927
AwesomeVersion _extract_version_from_server_response_or_raise(str server_response)
Definition: util.py:410
None run_checks_on_open_db(str dbpath, SQLiteCursor cursor)
Definition: util.py:281
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
bool is_second_sunday(datetime date_time)
Definition: util.py:842
datetime|None _datetime_or_none(str value)
Definition: util.py:429
Callable[[Exception], bool] filter_unique_constraint_integrity_error(Recorder instance, str row_type)
Definition: util.py:948
AwesomeVersion _simple_version(str version)
Definition: util.py:79
NoReturn _fail_unsupported_dialect(str dialect_name)
Definition: util.py:346
bool async_migration_is_live(HomeAssistant hass)
Definition: util.py:820