Home Assistant Unofficial Reference 2024.12.1
migration.py
Go to the documentation of this file.
1 """Schema migration helpers."""
2 
3 from __future__ import annotations
4 
5 from abc import ABC, abstractmethod
6 from collections.abc import Callable, Iterable
7 import contextlib
8 from dataclasses import dataclass, replace as dataclass_replace
9 from datetime import timedelta
10 import logging
11 from time import time
12 from typing import TYPE_CHECKING, Any, cast, final
13 from uuid import UUID
14 
15 import sqlalchemy
16 from sqlalchemy import ForeignKeyConstraint, MetaData, Table, func, text, update
17 from sqlalchemy.engine import CursorResult, Engine
18 from sqlalchemy.exc import (
19  DatabaseError,
20  IntegrityError,
21  InternalError,
22  OperationalError,
23  ProgrammingError,
24  SQLAlchemyError,
25 )
26 from sqlalchemy.orm.session import Session
27 from sqlalchemy.schema import AddConstraint, CreateTable, DropConstraint
28 from sqlalchemy.sql.expression import true
29 from sqlalchemy.sql.lambdas import StatementLambdaElement
30 
31 from homeassistant.core import HomeAssistant
32 from homeassistant.util.enum import try_parse_enum
33 from homeassistant.util.ulid import ulid_at_time, ulid_to_bytes
34 
35 from .auto_repairs.events.schema import (
36  correct_db_schema as events_correct_db_schema,
37  validate_db_schema as events_validate_db_schema,
38 )
39 from .auto_repairs.states.schema import (
40  correct_db_schema as states_correct_db_schema,
41  validate_db_schema as states_validate_db_schema,
42 )
43 from .auto_repairs.statistics.duplicates import (
44  delete_statistics_duplicates,
45  delete_statistics_meta_duplicates,
46 )
47 from .auto_repairs.statistics.schema import (
48  correct_db_schema as statistics_correct_db_schema,
49  validate_db_schema as statistics_validate_db_schema,
50 )
51 from .const import (
52  CONTEXT_ID_AS_BINARY_SCHEMA_VERSION,
53  EVENT_TYPE_IDS_SCHEMA_VERSION,
54  LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
55  STATES_META_SCHEMA_VERSION,
56  SupportedDialect,
57 )
58 from .db_schema import (
59  BIG_INTEGER_SQL,
60  CONTEXT_ID_BIN_MAX_LENGTH,
61  DOUBLE_PRECISION_TYPE_SQL,
62  LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX,
63  LEGACY_STATES_EVENT_ID_INDEX,
64  MYSQL_COLLATE,
65  MYSQL_DEFAULT_CHARSET,
66  SCHEMA_VERSION,
67  STATISTICS_TABLES,
68  TABLE_STATES,
69  Base,
70  Events,
71  EventTypes,
72  LegacyBase,
73  MigrationChanges,
74  SchemaChanges,
75  States,
76  StatesMeta,
77  Statistics,
78  StatisticsMeta,
79  StatisticsRuns,
80  StatisticsShortTerm,
81 )
82 from .models import process_timestamp
83 from .models.time import datetime_to_timestamp_or_none
84 from .queries import (
85  batch_cleanup_entity_ids,
86  delete_duplicate_short_term_statistics_row,
87  delete_duplicate_statistics_row,
88  find_entity_ids_to_migrate,
89  find_event_type_to_migrate,
90  find_events_context_ids_to_migrate,
91  find_states_context_ids_to_migrate,
92  find_unmigrated_short_term_statistics_rows,
93  find_unmigrated_statistics_rows,
94  get_migration_changes,
95  has_entity_ids_to_migrate,
96  has_event_type_to_migrate,
97  has_events_context_ids_to_migrate,
98  has_states_context_ids_to_migrate,
99  has_used_states_entity_ids,
100  has_used_states_event_ids,
101  migrate_single_short_term_statistics_row_to_timestamp,
102  migrate_single_statistics_row_to_timestamp,
103 )
104 from .statistics import cleanup_statistics_timestamp_migration, get_start_time
105 from .tasks import RecorderTask
106 from .util import (
107  database_job_retry_wrapper,
108  database_job_retry_wrapper_method,
109  execute_stmt_lambda_element,
110  get_index_by_name,
111  retryable_database_job_method,
112  session_scope,
113 )
114 
115 if TYPE_CHECKING:
116  from . import Recorder
117 
118 # Live schema migration supported starting from schema version 42 or newer
119 # Schema version 41 was introduced in HA Core 2023.4
120 # Schema version 42 was introduced in HA Core 2023.11
121 LIVE_MIGRATION_MIN_SCHEMA_VERSION = 42
122 
123 MIGRATION_NOTE_OFFLINE = (
124  "Note: this may take several hours on large databases and slow machines. "
125  "Home Assistant will not start until the upgrade is completed. Please be patient "
126  "and do not turn off or restart Home Assistant while the upgrade is in progress!"
127 )
128 MIGRATION_NOTE_MINUTES = (
129  "Note: this may take several minutes on large databases and slow machines. "
130  "Please be patient!"
131 )
132 MIGRATION_NOTE_WHILE = "This will take a while; please be patient!"
133 
134 _EMPTY_ENTITY_ID = "missing.entity_id"
135 _EMPTY_EVENT_TYPE = "missing_event_type"
136 
137 _LOGGER = logging.getLogger(__name__)
138 
139 
@dataclass
class _ColumnTypesForDialect:
    """SQL column type names to use for a specific database dialect."""

    big_int_type: str  # 64-bit integer column type
    timestamp_type: str  # floating point column type used for timestamps
    context_bin_type: str  # binary column type used for context ids
146 
# Per-dialect SQL type names used when creating or altering columns during
# schema migration.
_MYSQL_COLUMN_TYPES = _ColumnTypesForDialect(
    big_int_type="INTEGER(20)",
    timestamp_type=DOUBLE_PRECISION_TYPE_SQL,
    context_bin_type=f"BLOB({CONTEXT_ID_BIN_MAX_LENGTH})",
)

_POSTGRESQL_COLUMN_TYPES = _ColumnTypesForDialect(
    big_int_type="INTEGER",
    timestamp_type=DOUBLE_PRECISION_TYPE_SQL,
    context_bin_type="BYTEA",
)

_SQLITE_COLUMN_TYPES = _ColumnTypesForDialect(
    big_int_type="INTEGER",
    timestamp_type="FLOAT",
    context_bin_type="BLOB",
)

# Lookup table keyed by detected dialect; callers fall back to the SQLite
# types when the dialect is unknown (see _SchemaVersionMigrator.__init__).
_COLUMN_TYPES_FOR_DIALECT: dict[SupportedDialect | None, _ColumnTypesForDialect] = {
    SupportedDialect.MYSQL: _MYSQL_COLUMN_TYPES,
    SupportedDialect.POSTGRESQL: _POSTGRESQL_COLUMN_TYPES,
    SupportedDialect.SQLITE: _SQLITE_COLUMN_TYPES,
}
170 
171 
def raise_if_exception_missing_str(ex: Exception, match_substrs: Iterable[str]) -> None:
    """Re-raise *ex* unless it (or its cause) mentions one of *match_substrs*.

    Both the exception text and its ``__cause__`` text are searched
    case-insensitively; if any substring matches, the exception is swallowed.
    """
    haystacks = (str(ex).lower(), str(ex.__cause__).lower())
    matched = any(
        needle in haystack
        for needle in match_substrs
        for haystack in haystacks
        if haystack
    )
    if not matched:
        raise ex
181 
182 
def _get_schema_version(session: Session) -> int | None:
    """Return the newest schema version recorded in SchemaChanges, if any."""
    newest_change = (
        session.query(SchemaChanges.schema_version)
        .order_by(SchemaChanges.change_id.desc())
        .first()
    )
    if newest_change is None:
        # Empty SchemaChanges table: version is unknown.
        return None
    return newest_change.schema_version
191 
192 
def get_schema_version(session_maker: Callable[[], Session]) -> int | None:
    """Get the schema version, or None if it cannot be determined."""
    try:
        with session_scope(session=session_maker(), read_only=True) as session:
            version = _get_schema_version(session)
    except Exception:
        # Broad catch on purpose: any failure here just means we cannot
        # determine the version; the caller treats None as "unknown".
        _LOGGER.exception("Error when determining DB schema version")
        return None
    return version
201 
202 
203 @dataclass(frozen=True, kw_only=True)
205  """Store schema validation status."""
206 
207  current_version: int
208  migration_needed: bool
209  non_live_data_migration_needed: bool
210  schema_errors: set[str]
211  start_version: int
212 
213 
def _schema_is_current(current_version: int) -> bool:
    """Return True when the database schema matches this release's schema."""
    return SCHEMA_VERSION == current_version
217 
218 
def validate_db_schema(
    hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> SchemaValidationStatus | None:
    """Check if the schema is valid.

    This checks that the schema is the current version as well as for some common schema
    errors caused by manual migration between database engines, for example importing an
    SQLite database to MariaDB.

    Returns None when the schema version cannot be determined.
    """
    schema_errors: set[str] = set()

    current_version = get_schema_version(session_maker)
    if current_version is None:
        return None

    if is_current := _schema_is_current(current_version):
        # We can only check for further errors if the schema is current, because
        # columns may otherwise not exist etc.
        schema_errors = _find_schema_errors(hass, instance, session_maker)

    schema_migration_needed = not is_current
    _non_live_data_migration_needed = non_live_data_migration_needed(
        instance, session_maker, current_version
    )

    return SchemaValidationStatus(
        current_version=current_version,
        non_live_data_migration_needed=_non_live_data_migration_needed,
        migration_needed=schema_migration_needed or _non_live_data_migration_needed,
        schema_errors=schema_errors,
        start_version=current_version,
    )
251 
252 
def _find_schema_errors(
    hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
) -> set[str]:
    """Find schema errors.

    Combines the statistics, states and events schema validators into a single
    set of error identifiers. NOTE(review): hass and session_maker are not used
    by the current validators but are kept for signature parity with callers.
    """
    schema_errors: set[str] = set()
    schema_errors |= statistics_validate_db_schema(instance)
    schema_errors |= states_validate_db_schema(instance)
    schema_errors |= events_validate_db_schema(instance)
    return schema_errors
262 
263 
def live_migration(schema_status: SchemaValidationStatus) -> bool:
    """Check if live migration is possible."""
    if schema_status.non_live_data_migration_needed:
        # Offline data migration forces a non-live upgrade.
        return False
    return schema_status.current_version >= LIVE_MIGRATION_MIN_SCHEMA_VERSION
270 
271 
def pre_migrate_schema(engine: Engine) -> None:
    """Prepare for migration.

    This function is called before calling Base.metadata.create_all.
    """
    inspector = sqlalchemy.inspect(engine)

    if not inspector.has_table("statistics_meta"):
        return
    if inspector.has_table("statistics_short_term"):
        return

    # Prepare for migration from schema with statistics_meta table but no
    # statistics_short_term table
    LegacyBase.metadata.create_all(
        engine, (LegacyBase.metadata.tables["statistics_short_term"],)
    )
287 
288 
def _migrate_schema(
    instance: Recorder,
    hass: HomeAssistant,
    engine: Engine,
    session_maker: Callable[[], Session],
    schema_status: SchemaValidationStatus,
    end_version: int,
) -> SchemaValidationStatus:
    """Upgrade the schema one version at a time up to end_version.

    Returns a copy of schema_status with current_version updated.
    """
    current_version = schema_status.current_version
    start_version = schema_status.start_version

    if current_version < end_version:
        _LOGGER.warning(
            "The database is about to upgrade from schema version %s to %s%s",
            current_version,
            end_version,
            (
                f". {MIGRATION_NOTE_OFFLINE}"
                if current_version < LIVE_MIGRATION_MIN_SCHEMA_VERSION
                else ""
            ),
        )
        schema_status = dataclass_replace(schema_status, current_version=end_version)

    for version in range(current_version, end_version):
        new_version = version + 1
        _LOGGER.info("Upgrading recorder db schema to version %s", new_version)
        _apply_update(instance, hass, engine, session_maker, new_version, start_version)
        # Record the new schema version after each successful step so a
        # restart resumes from the last completed version.
        with session_scope(session=session_maker()) as session:
            session.add(SchemaChanges(schema_version=new_version))

        # Log at the same level as the long schema changes
        # so its clear that the upgrade is done
        _LOGGER.warning("Upgrade to version %s done", new_version)

    return schema_status
326 
327 
def migrate_schema_non_live(
    instance: Recorder,
    hass: HomeAssistant,
    engine: Engine,
    session_maker: Callable[[], Session],
    schema_status: SchemaValidationStatus,
) -> SchemaValidationStatus:
    """Check if the schema needs to be upgraded.

    Upgrades the schema up to LIVE_MIGRATION_MIN_SCHEMA_VERSION; later
    versions are applied live (see migrate_schema_live).
    """
    end_version = LIVE_MIGRATION_MIN_SCHEMA_VERSION
    return _migrate_schema(
        instance, hass, engine, session_maker, schema_status, end_version
    )
340 
341 
def migrate_schema_live(
    instance: Recorder,
    hass: HomeAssistant,
    engine: Engine,
    session_maker: Callable[[], Session],
    schema_status: SchemaValidationStatus,
) -> SchemaValidationStatus:
    """Check if the schema needs to be upgraded.

    Upgrades the schema up to the current SCHEMA_VERSION and then corrects any
    schema errors found during validation.
    """
    end_version = SCHEMA_VERSION
    schema_status = _migrate_schema(
        instance, hass, engine, session_maker, schema_status, end_version
    )

    # Repairs are currently done during the live migration
    if schema_errors := schema_status.schema_errors:
        _LOGGER.warning(
            "Database is about to correct DB schema errors: %s",
            ", ".join(sorted(schema_errors)),
        )
        statistics_correct_db_schema(instance, schema_errors)
        states_correct_db_schema(instance, schema_errors)
        events_correct_db_schema(instance, schema_errors)

    return schema_status
366 
367 
def _get_migration_changes(session: Session) -> dict[str, int]:
    """Return migration changes as a dict mapping migration id to version."""
    migration_changes: dict[str, int] = {
        row[0]: row[1]
        for row in execute_stmt_lambda_element(session, get_migration_changes())
    }
    return migration_changes
375 
376 
def non_live_data_migration_needed(
    instance: Recorder,
    session_maker: Callable[[], Session],
    schema_version: int,
) -> bool:
    """Return True if non-live data migration is needed.

    This must only be called if database schema is current.
    """
    migration_needed = False
    with session_scope(session=session_maker()) as session:
        migration_changes = _get_migration_changes(session)
        for migrator_cls in NON_LIVE_DATA_MIGRATORS:
            migrator = migrator_cls(schema_version, migration_changes)
            migration_needed |= migrator.needs_migrate(instance, session)

    return migration_needed
394 
395 
def migrate_data_non_live(
    instance: Recorder,
    session_maker: Callable[[], Session],
    schema_status: SchemaValidationStatus,
) -> None:
    """Do non-live data migration.

    This must be called after non-live schema migration is completed.
    """
    with session_scope(session=session_maker()) as session:
        migration_changes = _get_migration_changes(session)

    for migrator_cls in NON_LIVE_DATA_MIGRATORS:
        migrator = migrator_cls(schema_status.start_version, migration_changes)
        migrator.migrate_all(instance, session_maker)
411 
412 
def migrate_data_live(
    instance: Recorder,
    session_maker: Callable[[], Session],
    schema_status: SchemaValidationStatus,
) -> None:
    """Queue live schema migration tasks.

    This must be called after live schema migration is completed.
    """
    with session_scope(session=session_maker()) as session:
        migration_changes = _get_migration_changes(session)

        for migrator_cls in LIVE_DATA_MIGRATORS:
            migrator = migrator_cls(schema_status.start_version, migration_changes)
            migrator.queue_migration(instance, session)
428 
429 
def _create_index(
    session_maker: Callable[[], Session], table_name: str, index_name: str
) -> None:
    """Create an index for the specified table.

    The index name should match the name given for the index
    within the table definition described in the models
    """
    table = Table(table_name, Base.metadata)
    _LOGGER.debug("Looking up index %s for table %s", index_name, table_name)
    # Look up the index object by name from the table in the models
    index_list = [idx for idx in table.indexes if idx.name == index_name]
    if not index_list:
        _LOGGER.debug("The index %s no longer exists", index_name)
        return
    index = index_list[0]
    _LOGGER.debug("Creating %s index", index_name)
    _LOGGER.warning(
        "Adding index `%s` to table `%s`. %s",
        index_name,
        table_name,
        MIGRATION_NOTE_MINUTES,
    )
    with session_scope(session=session_maker()) as session:
        try:
            connection = session.connection()
            index.create(connection)
        except (InternalError, OperationalError, ProgrammingError) as err:
            # An existing index is fine; re-raise anything else.
            raise_if_exception_missing_str(err, ["already exists", "duplicate"])
            _LOGGER.warning(
                "Index %s already exists on %s, continuing", index_name, table_name
            )

    _LOGGER.warning("Finished adding index `%s` to table `%s`", index_name, table_name)
464 
465 
def _execute_or_collect_error(
    session_maker: Callable[[], Session], query: str, errors: list[str]
) -> bool:
    """Execute a query, returning True on success.

    On failure the error text is appended to *errors* and False is returned.
    """
    with session_scope(session=session_maker()) as session:
        try:
            session.connection().execute(text(query))
        except SQLAlchemyError as err:
            errors.append(str(err))
            return False
        return True
477 
478 
def _drop_index(
    session_maker: Callable[[], Session],
    table_name: str,
    index_name: str,
    quiet: bool | None = None,
) -> None:
    """Drop an index from a specified table.

    There is no universal way to do something like `DROP INDEX IF EXISTS`
    so we will simply execute the DROP command and ignore any exceptions

    WARNING: Due to some engines (MySQL at least) being unable to use bind
    parameters in a DROP INDEX statement (at least via SQLAlchemy), the query
    string here is generated from the method parameters without sanitizing.
    DO NOT USE THIS FUNCTION IN ANY OPERATION THAT TAKES USER INPUT.
    """
    _LOGGER.warning(
        "Dropping index `%s` from table `%s`. %s",
        index_name,
        table_name,
        MIGRATION_NOTE_MINUTES,
    )
    index_to_drop: str | None = None
    with session_scope(session=session_maker()) as session:
        index_to_drop = get_index_by_name(session, table_name, index_name)

    if index_to_drop is None:
        _LOGGER.warning(
            "The index `%s` on table `%s` no longer exists", index_name, table_name
        )
        return

    errors: list[str] = []
    for query in (
        # Engines like DB2/Oracle
        f"DROP INDEX {index_name}",
        # Engines like SQLite, SQL Server
        f"DROP INDEX {table_name}.{index_name}",
        # Engines like MySQL, MS Access
        f"DROP INDEX {index_name} ON {table_name}",
        # Engines like postgresql may have a prefix
        # ex idx_16532_ix_events_event_type_time_fired
        f"DROP INDEX {index_to_drop}",
    ):
        if _execute_or_collect_error(session_maker, query, errors):
            _LOGGER.warning(
                "Finished dropping index `%s` from table `%s`", index_name, table_name
            )
            return

    if not quiet:
        _LOGGER.warning(
            "Failed to drop index `%s` from table `%s`. Schema "
            "Migration will continue; this is not a "
            "critical operation: %s",
            index_name,
            table_name,
            errors,
        )
538 
539 
def _add_columns(
    session_maker: Callable[[], Session], table_name: str, columns_def: list[str]
) -> None:
    """Add columns to a table.

    Each entry in *columns_def* is a "<name> <type...>" SQL fragment. A single
    multi-column ALTER is attempted first; engines which reject it fall back
    to adding the columns one at a time.
    """
    _LOGGER.warning(
        "Adding columns %s to table %s. %s",
        ", ".join(column.split(" ")[0] for column in columns_def),
        table_name,
        MIGRATION_NOTE_MINUTES,
    )

    columns_def = [f"ADD {col_def}" for col_def in columns_def]

    with session_scope(session=session_maker()) as session:
        try:
            connection = session.connection()
            connection.execute(
                text(f"ALTER TABLE {table_name} {', '.join(columns_def)}")
            )
        except (InternalError, OperationalError, ProgrammingError):
            # Some engines support adding all columns at once,
            # this error is when they don't
            _LOGGER.info("Unable to use quick column add. Adding 1 by 1")
        else:
            return

    for column_def in columns_def:
        with session_scope(session=session_maker()) as session:
            try:
                connection = session.connection()
                connection.execute(text(f"ALTER TABLE {table_name} {column_def}"))
            except (InternalError, OperationalError, ProgrammingError) as err:
                raise_if_exception_missing_str(err, ["already exists", "duplicate"])
                _LOGGER.warning(
                    "Column %s already exists on %s, continuing",
                    column_def.split(" ")[1],
                    table_name,
                )
578 
579 
def _modify_columns(
    session_maker: Callable[[], Session],
    engine: Engine,
    table_name: str,
    columns_def: list[str],
) -> None:
    """Modify columns in a table.

    No-op on SQLite; other dialects get their own ALTER syntax. A combined
    ALTER is attempted first, then column-by-column as a fallback.
    """
    if engine.dialect.name == SupportedDialect.SQLITE:
        _LOGGER.debug(
            (
                "Skipping to modify columns %s in table %s; "
                "Modifying column length in SQLite is unnecessary, "
                "it does not impose any length restrictions"
            ),
            ", ".join(column.split(" ")[0] for column in columns_def),
            table_name,
        )
        return

    _LOGGER.warning(
        "Modifying columns %s in table %s. %s",
        ", ".join(column.split(" ")[0] for column in columns_def),
        table_name,
        MIGRATION_NOTE_MINUTES,
    )

    if engine.dialect.name == SupportedDialect.POSTGRESQL:
        columns_def = [
            f"ALTER {column} TYPE {type_}"
            for column, type_ in (col_def.split(" ", 1) for col_def in columns_def)
        ]
    elif engine.dialect.name == "mssql":
        columns_def = [f"ALTER COLUMN {col_def}" for col_def in columns_def]
    else:
        columns_def = [f"MODIFY {col_def}" for col_def in columns_def]

    with session_scope(session=session_maker()) as session:
        try:
            connection = session.connection()
            connection.execute(
                text(f"ALTER TABLE {table_name} {', '.join(columns_def)}")
            )
        except (InternalError, OperationalError):
            _LOGGER.info("Unable to use quick column modify. Modifying 1 by 1")
        else:
            return

    for column_def in columns_def:
        with session_scope(session=session_maker()) as session:
            try:
                connection = session.connection()
                connection.execute(text(f"ALTER TABLE {table_name} {column_def}"))
            except (InternalError, OperationalError):
                _LOGGER.exception(
                    "Could not modify column %s in table %s", column_def, table_name
                )
                raise
637 
638 
def _update_states_table_with_foreign_key_options(
    session_maker: Callable[[], Session], engine: Engine
) -> None:
    """Add the options to foreign key constraints.

    This is not supported for SQLite because it does not support
    dropping constraints.
    """

    if engine.dialect.name not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
        raise RuntimeError(
            "_update_states_table_with_foreign_key_options not supported for "
            f"{engine.dialect.name}"
        )

    inspector = sqlalchemy.inspect(engine)
    tmp_states_table = Table(TABLE_STATES, MetaData())
    alters = [
        {
            "old_fk": ForeignKeyConstraint(
                (), (), name=foreign_key["name"], table=tmp_states_table
            ),
            "columns": foreign_key["constrained_columns"],
        }
        for foreign_key in inspector.get_foreign_keys(TABLE_STATES)
        if foreign_key["name"]  # It's not possible to drop an unnamed constraint
        and (
            # MySQL/MariaDB will have empty options
            not foreign_key.get("options")
            # Postgres will have ondelete set to None
            or foreign_key.get("options", {}).get("ondelete") is None
        )
    ]

    if not alters:
        return

    states_key_constraints = Base.metadata.tables[TABLE_STATES].foreign_key_constraints

    for alter in alters:
        with session_scope(session=session_maker()) as session:
            try:
                connection = session.connection()
                connection.execute(DropConstraint(alter["old_fk"]))  # type: ignore[no-untyped-call]
                for fkc in states_key_constraints:
                    if fkc.column_keys == alter["columns"]:
                        # AddConstraint mutates the constraint passed to it, we need to
                        # undo that to avoid changing the behavior of the table schema.
                        # https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/sql/ddl.py#L746-L748
                        create_rule = fkc._create_rule  # noqa: SLF001
                        add_constraint = AddConstraint(fkc)  # type: ignore[no-untyped-call]
                        fkc._create_rule = create_rule  # noqa: SLF001
                        connection.execute(add_constraint)
            except (InternalError, OperationalError):
                _LOGGER.exception(
                    "Could not update foreign options in %s table", TABLE_STATES
                )
                raise
697 
698 
def _drop_foreign_key_constraints(
    session_maker: Callable[[], Session], engine: Engine, table: str, column: str
) -> None:
    """Drop foreign key constraints for a table on specific columns.

    This is not supported for SQLite because it does not support
    dropping constraints.
    """

    if engine.dialect.name not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
        raise RuntimeError(
            f"_drop_foreign_key_constraints not supported for {engine.dialect.name}"
        )

    inspector = sqlalchemy.inspect(engine)

    ## Find matching named constraints and bind the ForeignKeyConstraints to the table
    tmp_table = Table(table, MetaData())
    drops = [
        ForeignKeyConstraint((), (), name=foreign_key["name"], table=tmp_table)
        for foreign_key in inspector.get_foreign_keys(table)
        if foreign_key["name"] and foreign_key["constrained_columns"] == [column]
    ]

    for drop in drops:
        with session_scope(session=session_maker()) as session:
            try:
                connection = session.connection()
                connection.execute(DropConstraint(drop))  # type: ignore[no-untyped-call]
            except (InternalError, OperationalError):
                _LOGGER.exception(
                    "Could not drop foreign constraints in %s table on %s",
                    TABLE_STATES,
                    column,
                )
                raise
735 
736 
def _restore_foreign_key_constraints(
    session_maker: Callable[[], Session],
    engine: Engine,
    foreign_columns: list[tuple[str, str, str | None, str | None]],
) -> None:
    """Restore foreign key constraints.

    *foreign_columns* is a list of (table, column, foreign_table,
    foreign_column) tuples. If adding a constraint fails with an
    IntegrityError, violating rows are removed and the add is retried once.
    """
    for table, column, foreign_table, foreign_column in foreign_columns:
        constraints = Base.metadata.tables[table].foreign_key_constraints
        for constraint in constraints:
            if constraint.column_keys == [column]:
                break
        else:
            _LOGGER.info("Did not find a matching constraint for %s.%s", table, column)
            continue

        inspector = sqlalchemy.inspect(engine)
        if any(
            foreign_key["name"] and foreign_key["constrained_columns"] == [column]
            for foreign_key in inspector.get_foreign_keys(table)
        ):
            _LOGGER.info(
                "The database already has a matching constraint for %s.%s",
                table,
                column,
            )
            continue

        if TYPE_CHECKING:
            assert foreign_table is not None
            assert foreign_column is not None

        # AddConstraint mutates the constraint passed to it, we need to
        # undo that to avoid changing the behavior of the table schema.
        # https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/sql/ddl.py#L746-L748
        create_rule = constraint._create_rule  # noqa: SLF001
        add_constraint = AddConstraint(constraint)  # type: ignore[no-untyped-call]
        constraint._create_rule = create_rule  # noqa: SLF001
        try:
            _add_constraint(session_maker, add_constraint, table, column)
        except IntegrityError:
            _LOGGER.exception(
                (
                    "Could not update foreign options in %s table, will delete "
                    "violations and try again"
                ),
                table,
            )
            _delete_foreign_key_violations(
                session_maker, engine, table, column, foreign_table, foreign_column
            )
            _add_constraint(session_maker, add_constraint, table, column)
788 
789 
def _add_constraint(
    session_maker: Callable[[], Session],
    add_constraint: AddConstraint,
    table: str,
    column: str,
) -> None:
    """Add a foreign key constraint, re-raising database errors."""
    _LOGGER.warning(
        "Adding foreign key constraint to %s.%s. "
        "Note: this can take several minutes on large databases and slow "
        "machines. Please be patient!",
        table,
        column,
    )
    with session_scope(session=session_maker()) as session:
        try:
            connection = session.connection()
            connection.execute(add_constraint)
        except (InternalError, OperationalError):
            _LOGGER.exception("Could not update foreign options in %s table", table)
            raise
811 
812 
def _delete_foreign_key_violations(
    session_maker: Callable[[], Session],
    engine: Engine,
    table: str,
    column: str,
    foreign_table: str,
    foreign_column: str,
) -> None:
    """Remove rows which violate the constraints.

    Self-referencing violations are NULLed instead of deleted; work is done in
    batches of 100000 rows until no rows remain.
    """
    if engine.dialect.name not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
        raise RuntimeError(
            f"_delete_foreign_key_violations not supported for {engine.dialect.name}"
        )

    _LOGGER.warning(
        "Rows in table %s where %s references non existing %s.%s will be %s. "
        "Note: this can take several minutes on large databases and slow "
        "machines. Please be patient!",
        table,
        column,
        foreign_table,
        foreign_column,
        "set to NULL" if table == foreign_table else "deleted",
    )

    result: CursorResult | None = None
    if table == foreign_table:
        # In case of a foreign reference to the same table, we set invalid
        # references to NULL instead of deleting as deleting rows may
        # cause additional invalid references to be created. This is to handle
        # old_state_id referencing a missing state.
        if engine.dialect.name == SupportedDialect.MYSQL:
            while result is None or result.rowcount > 0:
                with session_scope(session=session_maker()) as session:
                    # The subquery (SELECT {foreign_column} from {foreign_table}) is
                    # to be compatible with old MySQL versions which do not allow
                    # referencing the table being updated in the WHERE clause.
                    result = session.connection().execute(
                        text(
                            f"UPDATE {table} as t1 "  # noqa: S608
                            f"SET {column} = NULL "
                            "WHERE ("
                            f"t1.{column} IS NOT NULL AND "
                            "NOT EXISTS "
                            "(SELECT 1 "
                            f"FROM (SELECT {foreign_column} from {foreign_table}) AS t2 "
                            f"WHERE t2.{foreign_column} = t1.{column})) "
                            "LIMIT 100000;"
                        )
                    )
        elif engine.dialect.name == SupportedDialect.POSTGRESQL:
            while result is None or result.rowcount > 0:
                with session_scope(session=session_maker()) as session:
                    # PostgreSQL does not support LIMIT in UPDATE clauses, so we
                    # update matches from a limited subquery instead.
                    result = session.connection().execute(
                        text(
                            f"UPDATE {table} "  # noqa: S608
                            f"SET {column} = NULL "
                            f"WHERE {column} in "
                            f"(SELECT {column} from {table} as t1 "
                            "WHERE ("
                            f"t1.{column} IS NOT NULL AND "
                            "NOT EXISTS "
                            "(SELECT 1 "
                            f"FROM {foreign_table} AS t2 "
                            f"WHERE t2.{foreign_column} = t1.{column})) "
                            "LIMIT 100000);"
                        )
                    )
        return

    if engine.dialect.name == SupportedDialect.MYSQL:
        while result is None or result.rowcount > 0:
            with session_scope(session=session_maker()) as session:
                result = session.connection().execute(
                    # We don't use an alias for the table we're deleting from,
                    # support of the form `DELETE FROM table AS t1` was added in
                    # MariaDB 11.6 and is not supported by MySQL. MySQL and older
                    # MariaDB instead support the form `DELETE t1 from table AS t1`
                    # which is undocumented for MariaDB.
                    text(
                        f"DELETE FROM {table} "  # noqa: S608
                        "WHERE ("
                        f"{table}.{column} IS NOT NULL AND "
                        "NOT EXISTS "
                        "(SELECT 1 "
                        f"FROM {foreign_table} AS t2 "
                        f"WHERE t2.{foreign_column} = {table}.{column})) "
                        "LIMIT 100000;"
                    )
                )
    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
        while result is None or result.rowcount > 0:
            with session_scope(session=session_maker()) as session:
                # PostgreSQL does not support LIMIT in DELETE clauses, so we
                # delete matches from a limited subquery instead.
                result = session.connection().execute(
                    text(
                        f"DELETE FROM {table} "  # noqa: S608
                        f"WHERE {column} in "
                        f"(SELECT {column} from {table} as t1 "
                        "WHERE ("
                        f"t1.{column} IS NOT NULL AND "
                        "NOT EXISTS "
                        "(SELECT 1 "
                        f"FROM {foreign_table} AS t2 "
                        f"WHERE t2.{foreign_column} = t1.{column})) "
                        "LIMIT 100000);"
                    )
                )
924 
925 
@database_job_retry_wrapper("Apply migration update", 10)
def _apply_update(
    instance: Recorder,
    hass: HomeAssistant,
    engine: Engine,
    session_maker: Callable[[], Session],
    new_version: int,
    old_version: int,
) -> None:
    """Perform operations to bring schema up to date."""
    # Dispatch to the migrator registered for the target schema version.
    migrator_cls = _SchemaVersionMigrator.get_migrator(new_version)
    migrator_cls(instance, hass, engine, session_maker, old_version).apply_update()
938 
939 
class _SchemaVersionMigrator(ABC):
    """Perform operations to bring schema up to date."""

    # Registry of migrator classes keyed by the schema version they produce.
    # Populated automatically by __init_subclass__.
    __migrators: dict[int, type[_SchemaVersionMigrator]] = {}

    def __init_subclass__(cls, target_version: int, **kwargs: Any) -> None:
        """Register the subclass as the migrator for target_version."""
        super().__init_subclass__(**kwargs)
        if target_version in _SchemaVersionMigrator.__migrators:
            raise ValueError("Duplicated version")
        _SchemaVersionMigrator.__migrators[target_version] = cls

    def __init__(
        self,
        instance: Recorder,
        hass: HomeAssistant,
        engine: Engine,
        session_maker: Callable[[], Session],
        old_version: int,
    ) -> None:
        """Initialize."""
        self.instance = instance
        self.hass = hass
        self.engine = engine
        self.session_maker = session_maker
        self.old_version = old_version
        assert engine.dialect.name is not None, "Dialect name must be set"
        dialect = try_parse_enum(SupportedDialect, engine.dialect.name)
        # Fall back to the SQLite column types for dialects not in the map.
        self.column_types = _COLUMN_TYPES_FOR_DIALECT.get(dialect, _SQLITE_COLUMN_TYPES)

    @classmethod
    def get_migrator(cls, target_version: int) -> type[_SchemaVersionMigrator]:
        """Return a migrator for a specific schema version.

        :raises ValueError: if no migrator is registered for target_version
        """
        try:
            return cls.__migrators[target_version]
        except KeyError as err:
            raise ValueError(
                f"No migrator for schema version {target_version}"
            ) from err

    @final
    def apply_update(self) -> None:
        """Perform operations to bring schema up to date."""
        self._apply_update()

    @abstractmethod
    def _apply_update(self) -> None:
        """Version specific update method."""
989 
class _SchemaVersion1Migrator(_SchemaVersionMigrator, target_version=1):
    """Migrate to schema version 1."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This used to create ix_events_time_fired, but it was removed in version 32


class _SchemaVersion2Migrator(_SchemaVersionMigrator, target_version=2):
    """Migrate to schema version 2."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Create compound start/end index for recorder_runs
        _create_index(self.session_maker, "recorder_runs", "ix_recorder_runs_start_end")
        # This used to create ix_states_last_updated but it was removed in version 32


class _SchemaVersion3Migrator(_SchemaVersionMigrator, target_version=3):
    """Migrate to schema version 3."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # There used to be a new index here, but it was removed in version 4.


class _SchemaVersion4Migrator(_SchemaVersionMigrator, target_version=4):
    """Migrate to schema version 4."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Queries were rewritten in this schema release. Most indexes from
        # earlier versions of the schema are no longer needed.

        if self.old_version == 3:
            # Remove index that was added in version 3
            _drop_index(self.session_maker, "states", "ix_states_created_domain")
        if self.old_version == 2:
            # Remove index that was added in version 2
            _drop_index(self.session_maker, "states", "ix_states_entity_id_created")

        # Remove indexes that were added in version 0
        _drop_index(self.session_maker, "states", "states__state_changes")
        _drop_index(self.session_maker, "states", "states__significant_changes")
        _drop_index(self.session_maker, "states", "ix_states_entity_id_created")
        # This used to create ix_states_entity_id_last_updated,
        # but it was removed in version 32
1029 
1030 
class _SchemaVersion5Migrator(_SchemaVersionMigrator, target_version=5):
    """Migrate to schema version 5."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Create supporting index for States.event_id foreign key
        _create_index(self.session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)


class _SchemaVersion6Migrator(_SchemaVersionMigrator, target_version=6):
    """Migrate to schema version 6."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(
            self.session_maker,
            "events",
            ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"],
        )
        _create_index(self.session_maker, "events", "ix_events_context_id")
        # This used to create ix_events_context_user_id,
        # but it was removed in version 28
        _add_columns(
            self.session_maker,
            "states",
            ["context_id CHARACTER(36)", "context_user_id CHARACTER(36)"],
        )
        _create_index(self.session_maker, "states", "ix_states_context_id")
        # This used to create ix_states_context_user_id,
        # but it was removed in version 28


class _SchemaVersion7Migrator(_SchemaVersionMigrator, target_version=7):
    """Migrate to schema version 7."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # There used to be a ix_states_entity_id index here,
        # but it was removed in later schema


class _SchemaVersion8Migrator(_SchemaVersionMigrator, target_version=8):
    """Migrate to schema version 8."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(self.session_maker, "events", ["context_parent_id CHARACTER(36)"])
        _add_columns(self.session_maker, "states", ["old_state_id INTEGER"])
        # This used to create ix_events_context_parent_id,
        # but it was removed in version 28
1073 
1074 
class _SchemaVersion9Migrator(_SchemaVersionMigrator, target_version=9):
    """Migrate to schema version 9."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # We now get the context from events with a join
        # since its always there on state_changed events
        #
        # Ideally we would drop the columns from the states
        # table as well but sqlite doesn't support that
        # and we would have to move to something like
        # sqlalchemy alembic to make that work
        #
        # no longer dropping ix_states_context_id since its recreated in 28
        _drop_index(self.session_maker, "states", "ix_states_context_user_id")
        # This index won't be there if they were not running
        # nightly but we don't treat that as a critical issue
        _drop_index(self.session_maker, "states", "ix_states_context_parent_id")
        # Redundant keys on composite index:
        # We already have ix_states_entity_id_last_updated
        _drop_index(self.session_maker, "states", "ix_states_entity_id")
        # This used to create ix_events_event_type_time_fired,
        # but it was removed in version 32
        _drop_index(self.session_maker, "events", "ix_events_event_type")


class _SchemaVersion10Migrator(_SchemaVersionMigrator, target_version=10):
    """Migrate to schema version 10."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Now done in step 11


class _SchemaVersion11Migrator(_SchemaVersionMigrator, target_version=11):
    """Migrate to schema version 11."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _create_index(self.session_maker, "states", "ix_states_old_state_id")

        # _update_states_table_with_foreign_key_options first drops foreign
        # key constraints, and then re-adds them with the correct settings.
        # This is not supported by SQLite
        if self.engine.dialect.name in (
            SupportedDialect.MYSQL,
            SupportedDialect.POSTGRESQL,
        ):
            _update_states_table_with_foreign_key_options(
                self.session_maker, self.engine
            )
1120 
1121 
class _SchemaVersion12Migrator(_SchemaVersionMigrator, target_version=12):
    """Migrate to schema version 12."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Widen the serialized payload columns on MySQL.
        if self.engine.dialect.name == SupportedDialect.MYSQL:
            _modify_columns(
                self.session_maker, self.engine, "events", ["event_data LONGTEXT"]
            )
            _modify_columns(
                self.session_maker, self.engine, "states", ["attributes LONGTEXT"]
            )


class _SchemaVersion13Migrator(_SchemaVersionMigrator, target_version=13):
    """Migrate to schema version 13."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # DATETIME(6) gives microsecond precision on MySQL.
        if self.engine.dialect.name == SupportedDialect.MYSQL:
            _modify_columns(
                self.session_maker,
                self.engine,
                "events",
                ["time_fired DATETIME(6)", "created DATETIME(6)"],
            )
            _modify_columns(
                self.session_maker,
                self.engine,
                "states",
                [
                    "last_changed DATETIME(6)",
                    "last_updated DATETIME(6)",
                    "created DATETIME(6)",
                ],
            )


class _SchemaVersion14Migrator(_SchemaVersionMigrator, target_version=14):
    """Migrate to schema version 14."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _modify_columns(
            self.session_maker, self.engine, "events", ["event_type VARCHAR(64)"]
        )


class _SchemaVersion15Migrator(_SchemaVersionMigrator, target_version=15):
    """Migrate to schema version 15."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This dropped the statistics table, done again in version 18.
1168 
1169 
class _SchemaVersion16Migrator(_SchemaVersionMigrator, target_version=16):
    """Migrate to schema version 16."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Dropping foreign key constraints is not supported by SQLite
        if self.engine.dialect.name in (
            SupportedDialect.MYSQL,
            SupportedDialect.POSTGRESQL,
        ):
            # Version 16 changes settings for the foreign key constraint on
            # states.old_state_id. Dropping the constraint is not really correct
            # we should have recreated it instead. Recreating the constraint now
            # happens in the migration to schema version 47.
            _drop_foreign_key_constraints(
                self.session_maker, self.engine, TABLE_STATES, "old_state_id"
            )


class _SchemaVersion17Migrator(_SchemaVersionMigrator, target_version=17):
    """Migrate to schema version 17."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This dropped the statistics table, done again in version 18.
1191 
1192 
class _SchemaVersion18Migrator(_SchemaVersionMigrator, target_version=18):
    """Migrate to schema version 18."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Recreate the statistics and statistics meta tables.
        #
        # Order matters! Statistics and StatisticsShortTerm have a relation with
        # StatisticsMeta, so statistics need to be deleted before meta (or in pair
        # depending on the SQL backend); and meta needs to be created before statistics.

        # We need to cast __table__ to Table, explanation in
        # https://github.com/sqlalchemy/sqlalchemy/issues/9130
        Base.metadata.drop_all(
            bind=self.engine,
            tables=[
                cast(Table, StatisticsShortTerm.__table__),
                cast(Table, Statistics.__table__),
                cast(Table, StatisticsMeta.__table__),
            ],
        )

        cast(Table, StatisticsMeta.__table__).create(self.engine)
        cast(Table, StatisticsShortTerm.__table__).create(self.engine)
        cast(Table, Statistics.__table__).create(self.engine)


class _SchemaVersion19Migrator(_SchemaVersionMigrator, target_version=19):
    """Migrate to schema version 19."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This adds the statistic runs table, insert a fake run to prevent duplicating
        # statistics.
        with session_scope(session=self.session_maker()) as session:
            session.add(StatisticsRuns(start=get_start_time()))
1225 
1226 
class _SchemaVersion20Migrator(_SchemaVersionMigrator, target_version=20):
    """Migrate to schema version 20."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This changed the precision of statistics from float to double
        if self.engine.dialect.name in [
            SupportedDialect.MYSQL,
            SupportedDialect.POSTGRESQL,
        ]:
            _modify_columns(
                self.session_maker,
                self.engine,
                "statistics",
                [
                    f"{column} {DOUBLE_PRECISION_TYPE_SQL}"
                    for column in ("max", "mean", "min", "state", "sum")
                ],
            )


class _SchemaVersion21Migrator(_SchemaVersionMigrator, target_version=21):
    """Migrate to schema version 21."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Try to change the character set of the statistic_meta table
        if self.engine.dialect.name == SupportedDialect.MYSQL:
            for table in ("events", "states", "statistics_meta"):
                _correct_table_character_set_and_collation(table, self.session_maker)
1253 
1254 
class _SchemaVersion22Migrator(_SchemaVersionMigrator, target_version=22):
    """Migrate to schema version 22."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Recreate the all statistics tables for Oracle DB with Identity columns
        #
        # Order matters! Statistics has a relation with StatisticsMeta,
        # so statistics need to be deleted before meta (or in pair depending
        # on the SQL backend); and meta needs to be created before statistics.
        if self.engine.dialect.name == "oracle":
            # We need to cast __table__ to Table, explanation in
            # https://github.com/sqlalchemy/sqlalchemy/issues/9130
            Base.metadata.drop_all(
                bind=self.engine,
                tables=[
                    cast(Table, StatisticsShortTerm.__table__),
                    cast(Table, Statistics.__table__),
                    cast(Table, StatisticsMeta.__table__),
                    cast(Table, StatisticsRuns.__table__),
                ],
            )

            cast(Table, StatisticsRuns.__table__).create(self.engine)
            cast(Table, StatisticsMeta.__table__).create(self.engine)
            cast(Table, StatisticsShortTerm.__table__).create(self.engine)
            cast(Table, Statistics.__table__).create(self.engine)

        # Block 5-minute statistics for one hour from the last run, or it will overlap
        # with existing hourly statistics. Don't block on a database with no existing
        # statistics.
        with session_scope(session=self.session_maker()) as session:
            if session.query(Statistics.id).count() and (
                last_run_string := session.query(
                    func.max(StatisticsRuns.start)
                ).scalar()
            ):
                last_run_start_time = process_timestamp(last_run_string)
                if last_run_start_time:
                    fake_start_time = last_run_start_time + timedelta(minutes=5)
                    while fake_start_time < last_run_start_time + timedelta(hours=1):
                        session.add(StatisticsRuns(start=fake_start_time))
                        fake_start_time += timedelta(minutes=5)

        # When querying the database, be careful to only explicitly query for columns
        # which were present in schema version 22. If querying the table, SQLAlchemy
        # will refer to future columns.
        with session_scope(session=self.session_maker()) as session:
            for sum_statistic in session.query(StatisticsMeta.id).filter_by(
                has_sum=true()
            ):
                last_statistic = (
                    session.query(
                        Statistics.start,
                        Statistics.last_reset,
                        Statistics.state,
                        Statistics.sum,
                    )
                    .filter_by(metadata_id=sum_statistic.id)
                    .order_by(Statistics.start.desc())
                    .first()
                )
                if last_statistic:
                    session.add(
                        StatisticsShortTerm(
                            metadata_id=sum_statistic.id,
                            start=last_statistic.start,
                            last_reset=last_statistic.last_reset,
                            state=last_statistic.state,
                            sum=last_statistic.sum,
                        )
                    )
1325 
1326 
class _SchemaVersion23Migrator(_SchemaVersionMigrator, target_version=23):
    """Migrate to schema version 23."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Add name column to StatisticsMeta
        _add_columns(self.session_maker, "statistics_meta", ["name VARCHAR(255)"])


class _SchemaVersion24Migrator(_SchemaVersionMigrator, target_version=24):
    """Migrate to schema version 24."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This used to create the unique indices for start and statistic_id
        # but we changed the format in schema 34 which will now take care
        # of removing any duplicate if they still exist.


class _SchemaVersion25Migrator(_SchemaVersionMigrator, target_version=25):
    """Migrate to schema version 25."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(
            self.session_maker,
            "states",
            [f"attributes_id {self.column_types.big_int_type}"],
        )
        _create_index(self.session_maker, "states", "ix_states_attributes_id")
1351 
1352 
class _SchemaVersion26Migrator(_SchemaVersionMigrator, target_version=26):
    """Migrate to schema version 26."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _create_index(self.session_maker, "statistics_runs", "ix_statistics_runs_start")


class _SchemaVersion27Migrator(_SchemaVersionMigrator, target_version=27):
    """Migrate to schema version 27."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(
            self.session_maker, "events", [f"data_id {self.column_types.big_int_type}"]
        )
        _create_index(self.session_maker, "events", "ix_events_data_id")


class _SchemaVersion28Migrator(_SchemaVersionMigrator, target_version=28):
    """Migrate to schema version 28."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(self.session_maker, "events", ["origin_idx INTEGER"])
        # We never use the user_id or parent_id index
        _drop_index(self.session_maker, "events", "ix_events_context_user_id")
        _drop_index(self.session_maker, "events", "ix_events_context_parent_id")
        _add_columns(
            self.session_maker,
            "states",
            [
                "origin_idx INTEGER",
                "context_id VARCHAR(36)",
                "context_user_id VARCHAR(36)",
                "context_parent_id VARCHAR(36)",
            ],
        )
        _create_index(self.session_maker, "states", "ix_states_context_id")
        # Once there are no longer any state_changed events
        # in the events table we can drop the index on states.event_id
1388 
1389 
class _SchemaVersion29Migrator(_SchemaVersionMigrator, target_version=29):
    """Migrate to schema version 29."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Recreate statistics_meta index to block duplicated statistic_id
        _drop_index(
            self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id"
        )
        if self.engine.dialect.name == SupportedDialect.MYSQL:
            # Ensure the row format is dynamic or the index
            # unique will be too large
            with (
                contextlib.suppress(SQLAlchemyError),
                session_scope(session=self.session_maker()) as session,
            ):
                connection = session.connection()
                # This is safe to run multiple times and fast
                # since the table is small.
                connection.execute(
                    text("ALTER TABLE statistics_meta ROW_FORMAT=DYNAMIC")
                )
        try:
            _create_index(
                self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id"
            )
        except DatabaseError:
            # There may be duplicated statistics_meta entries, delete duplicates
            # and try again
            with session_scope(session=self.session_maker()) as session:
                delete_statistics_meta_duplicates(self.instance, session)
            _create_index(
                self.session_maker, "statistics_meta", "ix_statistics_meta_statistic_id"
            )


class _SchemaVersion30Migrator(_SchemaVersionMigrator, target_version=30):
    """Migrate to schema version 30."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This added a column to the statistics_meta table, removed again before
        # release of HA Core 2022.10.0
        # SQLite 3.31.0 does not support dropping columns.
        # Once we require SQLite >= 3.35.5, we should drop the column:
        # ALTER TABLE statistics_meta DROP COLUMN state_unit_of_measurement
1432 
1433 
class _SchemaVersion31Migrator(_SchemaVersionMigrator, target_version=31):
    """Migrate to schema version 31."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Once we require SQLite >= 3.35.5, we should drop the column:
        # ALTER TABLE events DROP COLUMN time_fired
        # ALTER TABLE states DROP COLUMN last_updated
        # ALTER TABLE states DROP COLUMN last_changed
        _add_columns(
            self.session_maker,
            "events",
            [f"time_fired_ts {self.column_types.timestamp_type}"],
        )
        _add_columns(
            self.session_maker,
            "states",
            [
                f"last_updated_ts {self.column_types.timestamp_type}",
                f"last_changed_ts {self.column_types.timestamp_type}",
            ],
        )
        _create_index(self.session_maker, "events", "ix_events_time_fired_ts")
        _create_index(
            self.session_maker, "events", "ix_events_event_type_time_fired_ts"
        )
        _create_index(
            self.session_maker, "states", "ix_states_entity_id_last_updated_ts"
        )
        _create_index(self.session_maker, "states", "ix_states_last_updated_ts")
        _migrate_columns_to_timestamp(self.instance, self.session_maker, self.engine)
1463 
1464 
class _SchemaVersion32Migrator(_SchemaVersionMigrator, target_version=32):
    """Migrate to schema version 32."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Migration is done in two steps to ensure we can start using
        # the new columns before we wipe the old ones.
        _drop_index(self.session_maker, "states", "ix_states_entity_id_last_updated")
        _drop_index(self.session_maker, "events", "ix_events_event_type_time_fired")
        _drop_index(self.session_maker, "states", "ix_states_last_updated")
        _drop_index(self.session_maker, "events", "ix_events_time_fired")
        with session_scope(session=self.session_maker()) as session:
            # In version 31 we migrated all the time_fired, last_updated, and last_changed
            # columns to be timestamps. In version 32 we need to wipe the old columns
            # since they are no longer used and take up a significant amount of space.
            assert self.instance.engine is not None, "engine should never be None"
            _wipe_old_string_time_columns(self.instance, self.instance.engine, session)


class _SchemaVersion33Migrator(_SchemaVersionMigrator, target_version=33):
    """Migrate to schema version 33."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # This index is no longer used and can cause MySQL to use the wrong index
        # when querying the states table.
        # https://github.com/home-assistant/core/issues/83787
        # There was an index cleanup here but its now done in schema 39
1489 
1490 
class _SchemaVersion34Migrator(_SchemaVersionMigrator, target_version=34):
    """Migrate to schema version 34."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Once we require SQLite >= 3.35.5, we should drop the columns:
        # ALTER TABLE statistics DROP COLUMN created
        # ALTER TABLE statistics DROP COLUMN start
        # ALTER TABLE statistics DROP COLUMN last_reset
        # ALTER TABLE statistics_short_term DROP COLUMN created
        # ALTER TABLE statistics_short_term DROP COLUMN start
        # ALTER TABLE statistics_short_term DROP COLUMN last_reset
        _add_columns(
            self.session_maker,
            "statistics",
            [
                f"created_ts {self.column_types.timestamp_type}",
                f"start_ts {self.column_types.timestamp_type}",
                f"last_reset_ts {self.column_types.timestamp_type}",
            ],
        )
        _add_columns(
            self.session_maker,
            "statistics_short_term",
            [
                f"created_ts {self.column_types.timestamp_type}",
                f"start_ts {self.column_types.timestamp_type}",
                f"last_reset_ts {self.column_types.timestamp_type}",
            ],
        )
        _create_index(self.session_maker, "statistics", "ix_statistics_start_ts")
        _create_index(
            self.session_maker, "statistics", "ix_statistics_statistic_id_start_ts"
        )
        _create_index(
            self.session_maker,
            "statistics_short_term",
            "ix_statistics_short_term_start_ts",
        )
        _create_index(
            self.session_maker,
            "statistics_short_term",
            "ix_statistics_short_term_statistic_id_start_ts",
        )
        _migrate_statistics_columns_to_timestamp_removing_duplicates(
            self.hass, self.instance, self.session_maker, self.engine
        )
1536 
1537 
class _SchemaVersion35Migrator(_SchemaVersionMigrator, target_version=35):
    """Migrate to schema version 35."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Migration is done in two steps to ensure we can start using
        # the new columns before we wipe the old ones.
        _drop_index(
            self.session_maker,
            "statistics",
            "ix_statistics_statistic_id_start",
            quiet=True,
        )
        _drop_index(
            self.session_maker,
            "statistics_short_term",
            "ix_statistics_short_term_statistic_id_start",
            quiet=True,
        )
        # ix_statistics_start and ix_statistics_statistic_id_start are still used
        # for the post migration cleanup and can be removed in a future version.

        # In version 34 we migrated all the created, start, and last_reset
        # columns to be timestamps. In version 35 we need to wipe the old columns
        # since they are no longer used and take up a significant amount of space.
        while not cleanup_statistics_timestamp_migration(self.instance):
            pass
1563 
1564 
class _SchemaVersion36Migrator(_SchemaVersionMigrator, target_version=36):
    """Migrate to schema version 36."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        for table in ("states", "events"):
            _add_columns(
                self.session_maker,
                table,
                [
                    f"context_id_bin {self.column_types.context_bin_type}",
                    f"context_user_id_bin {self.column_types.context_bin_type}",
                    f"context_parent_id_bin {self.column_types.context_bin_type}",
                ],
            )
        _create_index(self.session_maker, "events", "ix_events_context_id_bin")
        _create_index(self.session_maker, "states", "ix_states_context_id_bin")


class _SchemaVersion37Migrator(_SchemaVersionMigrator, target_version=37):
    """Migrate to schema version 37."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(
            self.session_maker,
            "events",
            [f"event_type_id {self.column_types.big_int_type}"],
        )
        _create_index(self.session_maker, "events", "ix_events_event_type_id")
        _drop_index(self.session_maker, "events", "ix_events_event_type_time_fired_ts")
        _create_index(
            self.session_maker, "events", "ix_events_event_type_id_time_fired_ts"
        )


class _SchemaVersion38Migrator(_SchemaVersionMigrator, target_version=38):
    """Migrate to schema version 38."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(
            self.session_maker,
            "states",
            [f"metadata_id {self.column_types.big_int_type}"],
        )
        _create_index(self.session_maker, "states", "ix_states_metadata_id")
        _create_index(
            self.session_maker, "states", "ix_states_metadata_id_last_updated_ts"
        )
1609 
1610 
class _SchemaVersion39Migrator(_SchemaVersionMigrator, target_version=39):
    """Migrate to schema version 39."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # Dropping indexes with PostgreSQL never worked correctly if there was a prefix
        # so we need to cleanup leftover indexes.
        _drop_index(
            self.session_maker,
            "events",
            "ix_events_event_type_time_fired_ts",
            quiet=True,
        )
        _drop_index(self.session_maker, "events", "ix_events_event_type", quiet=True)
        _drop_index(
            self.session_maker, "events", "ix_events_event_type_time_fired", quiet=True
        )
        _drop_index(self.session_maker, "events", "ix_events_time_fired", quiet=True)
        _drop_index(
            self.session_maker, "events", "ix_events_context_user_id", quiet=True
        )
        _drop_index(
            self.session_maker, "events", "ix_events_context_parent_id", quiet=True
        )
        _drop_index(
            self.session_maker, "states", "ix_states_entity_id_last_updated", quiet=True
        )
        _drop_index(self.session_maker, "states", "ix_states_last_updated", quiet=True)
        _drop_index(self.session_maker, "states", "ix_states_entity_id", quiet=True)
        _drop_index(
            self.session_maker, "states", "ix_states_context_user_id", quiet=True
        )
        _drop_index(
            self.session_maker, "states", "ix_states_context_parent_id", quiet=True
        )
        _drop_index(
            self.session_maker, "states", "ix_states_created_domain", quiet=True
        )
        _drop_index(
            self.session_maker, "states", "ix_states_entity_id_created", quiet=True
        )
        _drop_index(self.session_maker, "states", "states__state_changes", quiet=True)
        _drop_index(
            self.session_maker, "states", "states__significant_changes", quiet=True
        )
        _drop_index(
            self.session_maker, "states", "ix_states_entity_id_created", quiet=True
        )
        _drop_index(
            self.session_maker,
            "statistics",
            "ix_statistics_statistic_id_start",
            quiet=True,
        )
        _drop_index(
            self.session_maker,
            "statistics_short_term",
            "ix_statistics_short_term_statistic_id_start",
            quiet=True,
        )
1669 
1670 
class _SchemaVersion40Migrator(_SchemaVersionMigrator, target_version=40):
    """Migrate to schema version 40."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # ix_events_event_type_id is a left-prefix of ix_events_event_type_id_time_fired_ts
        _drop_index(self.session_maker, "events", "ix_events_event_type_id")
        # ix_states_metadata_id is a left-prefix of ix_states_metadata_id_last_updated_ts
        _drop_index(self.session_maker, "states", "ix_states_metadata_id")
        # ix_statistics_metadata_id is a left-prefix of ix_statistics_statistic_id_start_ts
        _drop_index(self.session_maker, "statistics", "ix_statistics_metadata_id")
        # ix_statistics_short_term_metadata_id is a left-prefix of ix_statistics_short_term_statistic_id_start_ts
        _drop_index(
            self.session_maker,
            "statistics_short_term",
            "ix_statistics_short_term_metadata_id",
        )


class _SchemaVersion41Migrator(_SchemaVersionMigrator, target_version=41):
    """Migrate to schema version 41."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _create_index(self.session_maker, "event_types", "ix_event_types_event_type")
        _create_index(self.session_maker, "states_meta", "ix_states_meta_entity_id")


class _SchemaVersion42Migrator(_SchemaVersionMigrator, target_version=42):
    """Migrate to schema version 42."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # If the user had a previously failed migration, or they
        # downgraded from 2023.3.x to an older version we will have
        # unmigrated statistics columns so we want to clean this up
        # one last time since compiling the statistics will be slow
        # or fail if we have unmigrated statistics.
        _migrate_statistics_columns_to_timestamp_removing_duplicates(
            self.hass, self.instance, self.session_maker, self.engine
        )


class _SchemaVersion43Migrator(_SchemaVersionMigrator, target_version=43):
    """Migrate to schema version 43."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        _add_columns(
            self.session_maker,
            "states",
            [f"last_reported_ts {self.column_types.timestamp_type}"],
        )


class _SchemaVersion44Migrator(_SchemaVersionMigrator, target_version=44):
    """Migrate to schema version 44."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # The changes in this version are identical to the changes in version
        # 46. We apply the same changes again because the migration code previously
        # swallowed errors which caused some users' databases to end up in an
        # undefined state after the migration.


class _SchemaVersion45Migrator(_SchemaVersionMigrator, target_version=45):
    """Migrate to schema version 45."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # The changes in this version are identical to the changes in version
        # 47. We apply the same changes again because the migration code previously
        # swallowed errors which caused some users' databases to end up in an
        # undefined state after the migration.
1734 
1735 
# Foreign key metadata used by the schema version 46/47 migrations below:
# each entry is (table, columns to convert to 64-bit integers,
# ((column, referenced table, referenced column), ...)).
# NOTE(review): the (column, None, None) mapping appears to mark a constraint
# that is dropped but never restored -- confirm against
# _restore_foreign_key_constraints before relying on this.
FOREIGN_COLUMNS = (
    (
        "events",
        ("data_id", "event_type_id"),
        (
            ("data_id", "event_data", "data_id"),
            ("event_type_id", "event_types", "event_type_id"),
        ),
    ),
    (
        "states",
        ("event_id", "old_state_id", "attributes_id", "metadata_id"),
        (
            ("event_id", None, None),
            ("old_state_id", "states", "state_id"),
            ("attributes_id", "state_attributes", "attributes_id"),
            ("metadata_id", "states_meta", "metadata_id"),
        ),
    ),
    (
        "statistics",
        ("metadata_id",),
        (("metadata_id", "statistics_meta", "id"),),
    ),
    (
        "statistics_short_term",
        ("metadata_id",),
        (("metadata_id", "statistics_meta", "id"),),
    ),
)
1766 
1767 
class _SchemaVersion46Migrator(_SchemaVersionMigrator, target_version=46):
    """Migrate to schema version 46."""

    def _apply_update(self) -> None:
        """Version specific update method."""
        # We skip this step for SQLITE, it doesn't have differently sized integers
        if self.engine.dialect.name == SupportedDialect.SQLITE:
            return
        identity_sql = (
            "NOT NULL AUTO_INCREMENT"
            if self.engine.dialect.name == SupportedDialect.MYSQL
            else ""
        )
        # First drop foreign key constraints
        for table, columns, _ in FOREIGN_COLUMNS:
            for column in columns:
                _drop_foreign_key_constraints(
                    self.session_maker, self.engine, table, column
                )

        # Then modify the constrained columns
        for table, columns, _ in FOREIGN_COLUMNS:
            _modify_columns(
                self.session_maker,
                self.engine,
                table,
                [f"{column} {BIG_INTEGER_SQL}" for column in columns],
            )

        # Then modify the ID columns
        id_columns = (
            ("events", "event_id"),
            ("event_data", "data_id"),
            ("event_types", "event_type_id"),
            ("states", "state_id"),
            ("state_attributes", "attributes_id"),
            ("states_meta", "metadata_id"),
            ("statistics", "id"),
            ("statistics_short_term", "id"),
            ("statistics_meta", "id"),
            ("recorder_runs", "run_id"),
            ("schema_changes", "change_id"),
            ("statistics_runs", "run_id"),
        )
        for table, column in id_columns:
            _modify_columns(
                self.session_maker,
                self.engine,
                table,
                [f"{column} {BIG_INTEGER_SQL} {identity_sql}"],
            )
1817 
1818 
1820  def _apply_update(self) -> None:
1821  """Version specific update method."""
1822  # We skip this step for SQLITE, it doesn't have differently sized integers
1823  if self.engineengine.dialect.name == SupportedDialect.SQLITE:
1824  return
1825 
1826  # Restore constraints dropped in migration to schema version 46
1828  self.session_makersession_maker,
1829  self.engineengine,
1830  [
1831  (table, column, foreign_table, foreign_column)
1832  for table, _, foreign_mappings in FOREIGN_COLUMNS
1833  for column, foreign_table, foreign_column in foreign_mappings
1834  ],
1835  )
1836 
1837 
def _migrate_statistics_columns_to_timestamp_removing_duplicates(
    hass: HomeAssistant,
    instance: Recorder,
    session_maker: Callable[[], Session],
    engine: Engine,
) -> None:
    """Migrate statistics columns to timestamp or cleanup duplicates.

    Tries the bulk migration first; if it trips over duplicate rows the
    duplicates are removed and the bulk migration retried, falling back to
    a row-by-row migration as a last resort.
    """
    try:
        _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
    except IntegrityError as ex:
        _LOGGER.error(
            "Statistics table contains duplicate entries: %s; "
            "Cleaning up duplicates and trying again; %s",
            ex,
            MIGRATION_NOTE_WHILE,
        )
        # There may be duplicated statistics entries, delete duplicates
        # and try again
        with session_scope(session=session_maker()) as session:
            delete_statistics_duplicates(instance, hass, session)
        try:
            _migrate_statistics_columns_to_timestamp(instance, session_maker, engine)
        except IntegrityError:
            _LOGGER.warning(
                "Statistics table still contains duplicate entries after cleanup; "
                "Falling back to a one by one migration"
            )
            _migrate_statistics_columns_to_timestamp_one_by_one(instance, session_maker)
        # Log at error level to ensure the user sees this message in the log
        # since we logged the error above.
        _LOGGER.error(
            "Statistics migration successfully recovered after statistics table duplicate cleanup"
        )
1871 
1872 
def _correct_table_character_set_and_collation(
    table: str,
    session_maker: Callable[[], Session],
) -> None:
    """Correct issues detected by validate_db_schema.

    Convert a MySQL table to the utf8mb4 character set and the default
    collation. Any SQLAlchemy error is suppressed: this is a best-effort
    repair and validation will flag the table again on the next start.
    """
    # Attempt to convert the table to utf8mb4
    _LOGGER.warning(
        "Updating character set and collation of table %s to utf8mb4. %s",
        table,
        MIGRATION_NOTE_MINUTES,
    )
    with (
        contextlib.suppress(SQLAlchemyError),
        session_scope(session=session_maker()) as session,
    ):
        connection = session.connection()
        connection.execute(
            # Using LOCK=EXCLUSIVE to prevent the database from corrupting
            # https://github.com/home-assistant/core/issues/56104
            text(
                f"ALTER TABLE {table} CONVERT TO CHARACTER SET "
                f"{MYSQL_DEFAULT_CHARSET} "
                f"COLLATE {MYSQL_COLLATE}, LOCK=EXCLUSIVE"
            )
        )
1898 
1899 
@database_job_retry_wrapper("Wipe old string time columns", 3)
def _wipe_old_string_time_columns(
    instance: Recorder, engine: Engine, session: Session
) -> None:
    """Wipe old string time columns to save space."""
    # Wipe Events.time_fired since its been replaced by Events.time_fired_ts
    # Wipe States.last_updated since its been replaced by States.last_updated_ts
    # Wipe States.last_changed since its been replaced by States.last_changed_ts
    #
    if engine.dialect.name == SupportedDialect.SQLITE:
        session.execute(text("UPDATE events set time_fired=NULL;"))
        session.commit()
        session.execute(text("UPDATE states set last_updated=NULL, last_changed=NULL;"))
        session.commit()
    elif engine.dialect.name == SupportedDialect.MYSQL:
        #
        # Since this is only to save space we limit the number of rows we update
        # to 100,000 per table since we do not want to block the database for too long
        # or run out of innodb_buffer_pool_size on MySQL. The old data will eventually
        # be cleaned up by the recorder purge if we do not do it now.
        #
        session.execute(text("UPDATE events set time_fired=NULL LIMIT 100000;"))
        session.commit()
        session.execute(
            text(
                "UPDATE states set last_updated=NULL, last_changed=NULL "
                " LIMIT 100000;"
            )
        )
        session.commit()
    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
        #
        # Since this is only to save space we limit the number of rows we update
        # to 100,000 per table since we do not want to block the database for too long
        # or run out ram with postgresql. The old data will eventually
        # be cleaned up by the recorder purge if we do not do it now.
        #
        session.execute(
            text(
                "UPDATE events set time_fired=NULL "
                "where event_id in "
                "(select event_id from events where time_fired_ts is NOT NULL LIMIT 100000);"
            )
        )
        session.commit()
        session.execute(
            text(
                "UPDATE states set last_updated=NULL, last_changed=NULL "
                "where state_id in "
                "(select state_id from states where last_updated_ts is NOT NULL LIMIT 100000);"
            )
        )
        session.commit()
1953 
1954 
@database_job_retry_wrapper("Migrate columns to timestamp", 3)
def _migrate_columns_to_timestamp(
    instance: Recorder, session_maker: Callable[[], Session], engine: Engine
) -> None:
    """Migrate columns to use timestamp."""
    # Migrate all data in Events.time_fired to Events.time_fired_ts
    # Migrate all data in States.last_updated to States.last_updated_ts
    # Migrate all data in States.last_changed to States.last_changed_ts
    result: CursorResult | None = None
    if engine.dialect.name == SupportedDialect.SQLITE:
        # With SQLite we do this in one go since it is faster
        with session_scope(session=session_maker()) as session:
            connection = session.connection()
            connection.execute(
                text(
                    'UPDATE events set time_fired_ts=strftime("%s",time_fired) + '
                    "cast(substr(time_fired,-7) AS FLOAT);"
                )
            )
            connection.execute(
                text(
                    'UPDATE states set last_updated_ts=strftime("%s",last_updated) + '
                    "cast(substr(last_updated,-7) AS FLOAT), "
                    'last_changed_ts=strftime("%s",last_changed) + '
                    "cast(substr(last_changed,-7) AS FLOAT);"
                )
            )
    elif engine.dialect.name == SupportedDialect.MYSQL:
        # With MySQL we do this in chunks to avoid hitting the `innodb_buffer_pool_size` limit
        # We also need to do this in a loop since we can't be sure that we have
        # updated all rows in the table until the rowcount is 0
        while result is None or result.rowcount > 0:
            with session_scope(session=session_maker()) as session:
                result = session.connection().execute(
                    text(
                        "UPDATE events set time_fired_ts="
                        "IF(time_fired is NULL or UNIX_TIMESTAMP(time_fired) is NULL,0,"
                        "UNIX_TIMESTAMP(time_fired)"
                        ") "
                        "where time_fired_ts is NULL "
                        "LIMIT 100000;"
                    )
                )
        result = None
        while result is None or result.rowcount > 0:  # type: ignore[unreachable]
            with session_scope(session=session_maker()) as session:
                result = session.connection().execute(
                    text(
                        "UPDATE states set last_updated_ts="
                        "IF(last_updated is NULL or UNIX_TIMESTAMP(last_updated) is NULL,0,"
                        "UNIX_TIMESTAMP(last_updated) "
                        "), "
                        "last_changed_ts="
                        "UNIX_TIMESTAMP(last_changed) "
                        "where last_updated_ts is NULL "
                        "LIMIT 100000;"
                    )
                )
    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
        # With Postgresql we do this in chunks to avoid using too much memory
        # We also need to do this in a loop since we can't be sure that we have
        # updated all rows in the table until the rowcount is 0
        while result is None or result.rowcount > 0:
            with session_scope(session=session_maker()) as session:
                result = session.connection().execute(
                    text(
                        "UPDATE events SET "
                        "time_fired_ts= "
                        "(case when time_fired is NULL then 0 else EXTRACT(EPOCH FROM time_fired::timestamptz) end) "
                        "WHERE event_id IN ( "
                        "SELECT event_id FROM events where time_fired_ts is NULL LIMIT 100000 "
                        " );"
                    )
                )
        result = None
        while result is None or result.rowcount > 0:  # type: ignore[unreachable]
            with session_scope(session=session_maker()) as session:
                result = session.connection().execute(
                    text(
                        "UPDATE states set last_updated_ts="
                        "(case when last_updated is NULL then 0 else EXTRACT(EPOCH FROM last_updated::timestamptz) end), "
                        "last_changed_ts=EXTRACT(EPOCH FROM last_changed::timestamptz) "
                        "where state_id IN ( "
                        "SELECT state_id FROM states where last_updated_ts is NULL LIMIT 100000 "
                        " );"
                    )
                )
2042 
2043 
@database_job_retry_wrapper("Migrate statistics columns to timestamp one by one", 3)
def _migrate_statistics_columns_to_timestamp_one_by_one(
    instance: Recorder, session_maker: Callable[[], Session]
) -> None:
    """Migrate statistics columns to use timestamp on by one.

    If something manually inserted data into the statistics table
    in the past it may have inserted duplicate rows.

    Before we had the unique index on (statistic_id, start) this
    the data could have been inserted without any errors and we
    could end up with duplicate rows that go undetected (even by
    our current duplicate cleanup code) until we try to migrate the
    data to use timestamps.

    This will migrate the data one by one to ensure we do not hit any
    duplicate rows, and remove the duplicate rows as they are found.
    """
    for find_func, migrate_func, delete_func in (
        (
            find_unmigrated_statistics_rows,
            migrate_single_statistics_row_to_timestamp,
            delete_duplicate_statistics_row,
        ),
        (
            find_unmigrated_short_term_statistics_rows,
            migrate_single_short_term_statistics_row_to_timestamp,
            delete_duplicate_short_term_statistics_row,
        ),
    ):
        with session_scope(session=session_maker()) as session:
            while stats := session.execute(find_func(instance.max_bind_vars)).all():
                for statistic_id, start, created, last_reset in stats:
                    # Convert the three datetime columns to epoch timestamps
                    start_ts = datetime_to_timestamp_or_none(
                        process_timestamp(start)
                    )
                    created_ts = datetime_to_timestamp_or_none(
                        process_timestamp(created)
                    )
                    last_reset_ts = datetime_to_timestamp_or_none(
                        process_timestamp(last_reset)
                    )
                    try:
                        session.execute(
                            migrate_func(
                                statistic_id, start_ts, created_ts, last_reset_ts
                            )
                        )
                    except IntegrityError:
                        # This can happen if we have duplicate rows
                        # in the statistics table.
                        session.execute(delete_func(statistic_id))
                    session.commit()
2095 
2096 
@database_job_retry_wrapper("Migrate statistics columns to timestamp", 3)
def _migrate_statistics_columns_to_timestamp(
    instance: Recorder, session_maker: Callable[[], Session], engine: Engine
) -> None:
    """Migrate statistics columns to use timestamp."""
    # Migrate all data in statistics.start to statistics.start_ts
    # Migrate all data in statistics.created to statistics.created_ts
    # Migrate all data in statistics.last_reset to statistics.last_reset_ts
    # Migrate all data in statistics_short_term.start to statistics_short_term.start_ts
    # Migrate all data in statistics_short_term.created to statistics_short_term.created_ts
    # Migrate all data in statistics_short_term.last_reset to statistics_short_term.last_reset_ts
    result: CursorResult | None = None
    if engine.dialect.name == SupportedDialect.SQLITE:
        # With SQLite we do this in one go since it is faster
        for table in STATISTICS_TABLES:
            with session_scope(session=session_maker()) as session:
                session.connection().execute(
                    text(
                        f"UPDATE {table} set start_ts=strftime('%s',start) + "  # noqa: S608
                        "cast(substr(start,-7) AS FLOAT), "
                        f"created_ts=strftime('%s',created) + "
                        "cast(substr(created,-7) AS FLOAT), "
                        f"last_reset_ts=strftime('%s',last_reset) + "
                        "cast(substr(last_reset,-7) AS FLOAT) where start_ts is NULL;"
                    )
                )
    elif engine.dialect.name == SupportedDialect.MYSQL:
        # With MySQL we do this in chunks to avoid hitting the `innodb_buffer_pool_size` limit
        # We also need to do this in a loop since we can't be sure that we have
        # updated all rows in the table until the rowcount is 0
        for table in STATISTICS_TABLES:
            result = None
            while result is None or result.rowcount > 0:  # type: ignore[unreachable]
                with session_scope(session=session_maker()) as session:
                    result = session.connection().execute(
                        text(
                            f"UPDATE {table} set start_ts="  # noqa: S608
                            "IF(start is NULL or UNIX_TIMESTAMP(start) is NULL,0,"
                            "UNIX_TIMESTAMP(start) "
                            "), "
                            "created_ts="
                            "UNIX_TIMESTAMP(created), "
                            "last_reset_ts="
                            "UNIX_TIMESTAMP(last_reset) "
                            "where start_ts is NULL "
                            "LIMIT 100000;"
                        )
                    )
    elif engine.dialect.name == SupportedDialect.POSTGRESQL:
        # With Postgresql we do this in chunks to avoid using too much memory
        # We also need to do this in a loop since we can't be sure that we have
        # updated all rows in the table until the rowcount is 0
        for table in STATISTICS_TABLES:
            result = None
            while result is None or result.rowcount > 0:  # type: ignore[unreachable]
                with session_scope(session=session_maker()) as session:
                    result = session.connection().execute(
                        text(
                            f"UPDATE {table} set start_ts="  # noqa: S608
                            "(case when start is NULL then 0 else EXTRACT(EPOCH FROM start::timestamptz) end), "
                            "created_ts=EXTRACT(EPOCH FROM created::timestamptz), "
                            "last_reset_ts=EXTRACT(EPOCH FROM last_reset::timestamptz) "
                            "where id IN ("
                            f"SELECT id FROM {table} where start_ts is NULL LIMIT 100000"
                            ");"
                        )
                    )
2164 
2165 
def _context_id_to_bytes(context_id: str | None) -> bytes | None:
    """Convert a context_id string to its binary form, or None."""
    if context_id is None:
        return None
    # There may be garbage in the context_id column from custom
    # integrations that is neither a ULID nor a UUID and may have
    # filled the column to its max length, so any parse failure is
    # treated as "no usable context id".
    try:
        if len(context_id) == 26:
            # 26 characters is the canonical ULID text length
            return ulid_to_bytes(context_id)
        return UUID(context_id).bytes
    except ValueError:
        return None
2180 
2181 
def _generate_ulid_bytes_at_time(timestamp: float | None) -> bytes:
    """Generate a ulid with a specific timestamp."""
    # Fall back to "now" when the timestamp is missing or zero/falsy,
    # matching the original `timestamp or time()` behavior.
    ts = timestamp if timestamp else time()
    return ulid_to_bytes(ulid_at_time(ts))
2185 
2186 
def post_migrate_entity_ids(instance: Recorder) -> bool:
    """Remove old entity_id strings from states.

    We cannot do this in migrate_entity_ids since the history queries
    still need to work while the migration is in progress.
    """
    _LOGGER.debug("Cleanup legacy entity_ids")
    with session_scope(session=instance.get_session()) as session:
        result = session.connection().execute(batch_cleanup_entity_ids())
        # Finished only once a batch touches no rows; otherwise the
        # caller should schedule this again.
        is_done = not result or result.rowcount == 0

    _LOGGER.debug("Cleanup legacy entity_ids done=%s", is_done)
    return is_done
2203 
2204 
def _initialize_database(session: Session) -> bool:
    """Initialize a new database.

    The function determines the schema version by inspecting the db structure.

    When the schema version is not present in the db, either db was just
    created with the correct schema, or this is a db created before schema
    versions were tracked. For now, we'll test if the changes for schema
    version 1 are present to make the determination. Eventually this logic
    can be removed and we can assume a new db is being created.
    """
    inspector = sqlalchemy.inspect(session.connection())
    if any(
        index["column_names"] in (["time_fired"], ["time_fired_ts"])
        for index in inspector.get_indexes("events")
    ):
        # Schema addition from version 1 detected. New DB.
        session.add(StatisticsRuns(start=get_start_time()))
        session.add(SchemaChanges(schema_version=SCHEMA_VERSION))
        return True

    # Version 1 schema changes not found, this db needs to be migrated.
    session.add(SchemaChanges(schema_version=0))
    return True
2230 
2231 
def initialize_database(session_maker: Callable[[], Session]) -> bool:
    """Initialize a new database.

    Returns True if the database already has a schema version or was
    initialized successfully, False if initialization failed.
    """
    try:
        # Fast path: an existing schema version means nothing to do.
        with session_scope(session=session_maker(), read_only=True) as session:
            if _get_schema_version(session) is not None:
                return True

        with session_scope(session=session_maker()) as session:
            return _initialize_database(session)

    except Exception:
        # Fixed grammar of the log message ("when initialise" -> "while initializing")
        _LOGGER.exception("Error while initializing database")
        return False
2245 
2246 
@dataclass(slots=True)
class MigrationTask(RecorderTask):
    """Base class for migration tasks."""

    # The migrator that performs the actual data migration work.
    migrator: BaseRunTimeMigration
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Run migration task."""
        if not self.migrator.migrate_data(instance):
            # Schedule a new migration task if this one didn't finish
            instance.queue_task(MigrationTask(self.migrator))
2259 
2260 
@dataclass(slots=True)
class CommitBeforeMigrationTask(MigrationTask):
    """Base class for migration tasks which commit first."""

    commit_before = True
2266 
2267 
@dataclass(frozen=True, kw_only=True)
class DataMigrationStatus:
    """Container for data migrator status."""

    # True if there is (still) data left to migrate.
    needs_migrate: bool
    # True if the migration has fully completed.
    migration_done: bool
2274 
2275 
class BaseMigration(ABC):
    """Base class for migrations."""

    # Optional (table, index) dropped once the migration finishes.
    index_to_drop: tuple[str, str] | None = None
    required_schema_version = 0
    migration_version = 1
    migration_id: str

    def __init__(self, schema_version: int, migration_changes: dict[str, int]) -> None:
        """Initialize a new BaseRunTimeMigration."""
        self.schema_version = schema_version
        self.migration_changes = migration_changes

    @abstractmethod
    def migrate_data(self, instance: Recorder) -> bool:
        """Migrate some data, return True if migration is completed."""

    def _migrate_data(self, instance: Recorder) -> bool:
        """Migrate some data, returns True if migration is completed."""
        status = self.migrate_data_impl(instance)
        if status.migration_done:
            if self.index_to_drop is not None:
                table, index = self.index_to_drop
                _drop_index(instance.get_session, table, index)
            with session_scope(session=instance.get_session()) as session:
                self.migration_done(instance, session)
                _mark_migration_done(session, self.__class__)
        return not status.needs_migrate

    @abstractmethod
    def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
        """Migrate some data, return if the migration needs to run and if it is done."""

    def migration_done(self, instance: Recorder, session: Session) -> None:
        """Will be called after migrate returns True or if migration is not needed."""

    @abstractmethod
    def needs_migrate_impl(
        self, instance: Recorder, session: Session
    ) -> DataMigrationStatus:
        """Return if the migration needs to run and if it is done."""

    def needs_migrate(self, instance: Recorder, session: Session) -> bool:
        """Return if the migration needs to run.

        If the migration needs to run, it will return True.

        If the migration does not need to run, it will return False and
        mark the migration as done in the database if its not already
        marked as done.
        """
        if self.schema_version < self.required_schema_version:
            # Schema is too old, we must have to migrate
            return True
        if self.migration_changes.get(self.migration_id, -1) >= self.migration_version:
            # The migration changes table indicates that the migration has been done
            return False
        # We do not know if the migration is done from the
        # migration changes table so we must check the index and data
        # This is the slow path
        if (
            self.index_to_drop is not None
            and get_index_by_name(session, self.index_to_drop[0], self.index_to_drop[1])
            is not None
        ):
            return True
        needs_migrate = self.needs_migrate_impl(instance, session)
        if needs_migrate.migration_done:
            _mark_migration_done(session, self.__class__)
        return needs_migrate.needs_migrate
2346 
2347 
class BaseOffLineMigration(BaseMigration):
    """Base class for off line migrations."""

    def migrate_all(
        self, instance: Recorder, session_maker: Callable[[], Session]
    ) -> None:
        """Migrate all data."""
        with session_scope(session=session_maker()) as session:
            if not self.needs_migrate(instance, session):
                self.migration_done(instance, session)
                return
        # Keep migrating batches until the migration reports completion.
        while not self.migrate_data(instance):
            pass

    @database_job_retry_wrapper_method("migrate data", 10)
    def migrate_data(self, instance: Recorder) -> bool:
        """Migrate some data, returns True if migration is completed."""
        return self._migrate_data(instance)
2366 
2367 
class BaseRunTimeMigration(BaseMigration):
    """Base class for run time migrations."""

    # Task class queued on the recorder to run one migration batch.
    task = MigrationTask

    def queue_migration(self, instance: Recorder, session: Session) -> None:
        """Start migration if needed."""
        if self.needs_migrate(instance, session):
            instance.queue_task(self.task(self))
        else:
            self.migration_done(instance, session)

    @retryable_database_job_method("migrate data")
    def migrate_data(self, instance: Recorder) -> bool:
        """Migrate some data, returns True if migration is completed."""
        return self._migrate_data(instance)
2384 
2385 
class BaseMigrationWithQuery(BaseMigration):
    """Base class for migrations which check for remaining work with a query."""

    @abstractmethod
    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""

    def needs_migrate_impl(
        self, instance: Recorder, session: Session
    ) -> DataMigrationStatus:
        """Return if the migration needs to run."""
        needs_migrate = execute_stmt_lambda_element(session, self.needs_migrate_query())
        return DataMigrationStatus(
            needs_migrate=bool(needs_migrate), migration_done=not needs_migrate
        )
2401 
2402 
class StatesContextIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
    """Migration to migrate states context_ids to binary format."""

    required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
    migration_id = "state_context_id_as_binary"
    migration_version = 2
    index_to_drop = ("states", "ix_states_context_id")

    def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
        """Migrate states context_ids to use binary format, return True if completed."""
        _to_bytes = _context_id_to_bytes
        session_maker = instance.get_session
        _LOGGER.debug("Migrating states context_ids to binary format")
        with session_scope(session=session_maker()) as session:
            if states := session.execute(
                find_states_context_ids_to_migrate(instance.max_bind_vars)
            ).all():
                session.execute(
                    update(States),
                    [
                        {
                            "state_id": state_id,
                            "context_id": None,
                            # Fall back to a generated ULID when the stored
                            # context_id cannot be parsed
                            "context_id_bin": _to_bytes(context_id)
                            or _generate_ulid_bytes_at_time(last_updated_ts),
                            "context_user_id": None,
                            "context_user_id_bin": _to_bytes(context_user_id),
                            "context_parent_id": None,
                            "context_parent_id_bin": _to_bytes(context_parent_id),
                        }
                        for state_id, last_updated_ts, context_id, context_user_id, context_parent_id in states
                    ],
                )
            is_done = not states

        _LOGGER.debug("Migrating states context_ids to binary format: done=%s", is_done)
        return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""
        return has_states_context_ids_to_migrate()
2445 
class EventsContextIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
    """Migration to migrate events context_ids to binary format."""

    required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
    migration_id = "event_context_id_as_binary"
    migration_version = 2
    index_to_drop = ("events", "ix_events_context_id")

    def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
        """Migrate events context_ids to use binary format, return True if completed."""
        _to_bytes = _context_id_to_bytes
        session_maker = instance.get_session
        _LOGGER.debug("Migrating context_ids to binary format")
        with session_scope(session=session_maker()) as session:
            if events := session.execute(
                find_events_context_ids_to_migrate(instance.max_bind_vars)
            ).all():
                session.execute(
                    update(Events),
                    [
                        {
                            "event_id": event_id,
                            "context_id": None,
                            # Fall back to a generated ULID when the stored
                            # context_id cannot be parsed
                            "context_id_bin": _to_bytes(context_id)
                            or _generate_ulid_bytes_at_time(time_fired_ts),
                            "context_user_id": None,
                            "context_user_id_bin": _to_bytes(context_user_id),
                            "context_parent_id": None,
                            "context_parent_id_bin": _to_bytes(context_parent_id),
                        }
                        for event_id, time_fired_ts, context_id, context_user_id, context_parent_id in events
                    ],
                )
            is_done = not events

        _LOGGER.debug("Migrating events context_ids to binary format: done=%s", is_done)
        return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Return the query to check if the migration needs to run."""
        return has_events_context_ids_to_migrate()
2488 
class EventTypeIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
    """Migration to migrate event_type to event_type_ids."""

    required_schema_version = EVENT_TYPE_IDS_SCHEMA_VERSION
    migration_id = "event_type_id_migration"
    task = CommitBeforeMigrationTask
    # We have to commit before to make sure there are
    # no new pending event_types about to be added to
    # the db since this happens live

    def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
        """Migrate event_type to event_type_ids, return True if completed."""
        session_maker = instance.get_session
        _LOGGER.debug("Migrating event_types")
        event_type_manager = instance.event_type_manager
        with session_scope(session=session_maker()) as session:
            if events := session.execute(
                find_event_type_to_migrate(instance.max_bind_vars)
            ).all():
                event_types = {event_type for _, event_type in events}
                if None in event_types:
                    # event_type should never be None but we need to be defensive
                    # so we don't fail the migration because of a bad state
                    event_types.remove(None)
                    event_types.add(_EMPTY_EVENT_TYPE)

                event_type_to_id = event_type_manager.get_many(event_types, session)
                if missing_event_types := {
                    event_type
                    for event_type, event_id in event_type_to_id.items()
                    if event_id is None
                }:
                    missing_db_event_types = [
                        EventTypes(event_type=event_type)
                        for event_type in missing_event_types
                    ]
                    session.add_all(missing_db_event_types)
                    session.flush()  # Assign ids
                    for db_event_type in missing_db_event_types:
                        # We cannot add the assigned ids to the event_type_manager
                        # because the commit could get rolled back
                        assert (
                            db_event_type.event_type is not None
                        ), "event_type should never be None"
                        event_type_to_id[db_event_type.event_type] = (
                            db_event_type.event_type_id
                        )
                        event_type_manager.clear_non_existent(db_event_type.event_type)

                session.execute(
                    update(Events),
                    [
                        {
                            "event_id": event_id,
                            "event_type": None,
                            "event_type_id": event_type_to_id[
                                _EMPTY_EVENT_TYPE if event_type is None else event_type
                            ],
                        }
                        for event_id, event_type in events
                    ],
                )

            is_done = not events

        _LOGGER.debug("Migrating event_types done=%s", is_done)
        return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)

    def migration_done(self, instance: Recorder, session: Session) -> None:
        """Will be called after migrate returns True."""
        _LOGGER.debug("Activating event_types manager as all data is migrated")
        instance.event_type_manager.active = True

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Check if the data is migrated."""
        return has_event_type_to_migrate()
2565 
2566 
class EntityIDMigration(BaseMigrationWithQuery, BaseRunTimeMigration):
    """Migration to migrate entity_ids to states_meta."""

    required_schema_version = STATES_META_SCHEMA_VERSION
    migration_id = "entity_id_migration"
    task = CommitBeforeMigrationTask
    # We have to commit before to make sure there are
    # no new pending states_meta about to be added to
    # the db since this happens live

    def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
        """Migrate entity_ids to states_meta, return True if completed.

        We do this in two steps because we need the history queries to work
        while we are migrating.

        1. Link the states to the states_meta table
        2. Remove the entity_id column from the states table (in post_migrate_entity_ids)
        """
        _LOGGER.debug("Migrating entity_ids")
        states_meta_manager = instance.states_meta_manager
        with session_scope(session=instance.get_session()) as session:
            if states := session.execute(
                find_entity_ids_to_migrate(instance.max_bind_vars)
            ).all():
                entity_ids = {entity_id for _, entity_id in states}
                if None in entity_ids:
                    # entity_id should never be None but we need to be defensive
                    # so we don't fail the migration because of a bad state
                    entity_ids.remove(None)
                    entity_ids.add(_EMPTY_ENTITY_ID)

                entity_id_to_metadata_id = states_meta_manager.get_many(
                    entity_ids, session, True
                )
                if missing_entity_ids := {
                    entity_id
                    for entity_id, metadata_id in entity_id_to_metadata_id.items()
                    if metadata_id is None
                }:
                    missing_states_metadata = [
                        StatesMeta(entity_id=entity_id)
                        for entity_id in missing_entity_ids
                    ]
                    session.add_all(missing_states_metadata)
                    session.flush()  # Assign ids
                    for db_states_metadata in missing_states_metadata:
                        # We cannot add the assigned ids to the event_type_manager
                        # because the commit could get rolled back
                        assert (
                            db_states_metadata.entity_id is not None
                        ), "entity_id should never be None"
                        entity_id_to_metadata_id[db_states_metadata.entity_id] = (
                            db_states_metadata.metadata_id
                        )

                session.execute(
                    update(States),
                    [
                        {
                            "state_id": state_id,
                            # We cannot set "entity_id": None yet since
                            # the history queries still need to work while the
                            # migration is in progress and we will do this in
                            # post_migrate_entity_ids
                            "metadata_id": entity_id_to_metadata_id[
                                _EMPTY_ENTITY_ID if entity_id is None else entity_id
                            ],
                        }
                        for state_id, entity_id in states
                    ],
                )

            is_done = not states

        _LOGGER.debug("Migrating entity_ids done=%s", is_done)
        return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)

    def migration_done(self, instance: Recorder, session: Session) -> None:
        """Will be called after migrate returns True."""
        # The migration has finished, now we start the post migration
        # to remove the old entity_id data from the states table
        # at this point we can also start using the StatesMeta table
        # so we set active to True
        _LOGGER.debug("Activating states_meta manager as all data is migrated")
        instance.states_meta_manager.active = True
        with contextlib.suppress(SQLAlchemyError):
            migrate = EntityIDPostMigration(self.schema_version, self.migration_changes)
            migrate.queue_migration(instance, session)

    def needs_migrate_query(self) -> StatementLambdaElement:
        """Check if the data is migrated."""
        return has_entity_ids_to_migrate()
2660 
2661 
# NOTE(review): the class statement was lost in extraction; the base class
# below is reconstructed from the task/migration_version attributes used
# here -- confirm against upstream before merging.
class EventIDPostMigration(BaseRunTimeMigration):
    """Migration to remove old event_id index from states."""

    migration_id = "event_id_post_migration"
    task = MigrationTask
    migration_version = 2
2668 
2669  def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
2670  """Remove old event_id index from states, returns True if completed.
2671 
2672  We used to link states to events using the event_id column but we no
2673  longer store state changed events in the events table.
2674 
2675  If all old states have been purged and existing states are in the new
2676  format we can drop the index since it can take up ~10MB per 1M rows.
2677  """
2678  session_maker = instance.get_session
2679  _LOGGER.debug("Cleanup legacy entity_ids")
2680  with session_scope(session=session_maker()) as session:
2681  result = session.execute(has_used_states_event_ids()).scalar()
2682  # In the future we may migrate existing states to the new format
2683  # but in practice very few of these still exist in production and
2684  # removing the index is the likely all that needs to happen.
2685  all_gone = not result
2686 
2687  fk_remove_ok = False
2688  if all_gone:
2689  # Only drop the index if there are no more event_ids in the states table
2690  # ex all NULL
2691  assert instance.engine is not None, "engine should never be None"
2692  if instance.dialect_name == SupportedDialect.SQLITE:
2693  # SQLite does not support dropping foreign key constraints
2694  # so we have to rebuild the table
2695  fk_remove_ok = rebuild_sqlite_table(
2696  session_maker, instance.engine, States
2697  )
2698  else:
2699  try:
2701  session_maker, instance.engine, TABLE_STATES, "event_id"
2702  )
2703  except (InternalError, OperationalError):
2704  fk_remove_ok = False
2705  else:
2706  fk_remove_ok = True
2707  if fk_remove_ok:
2708  _drop_index(session_maker, "states", LEGACY_STATES_EVENT_ID_INDEX)
2709  instance.use_legacy_events_index = False
2710 
2711  return DataMigrationStatus(needs_migrate=False, migration_done=fk_remove_ok)
2712 
2713  @staticmethod
2714  def _legacy_event_id_foreign_key_exists(instance: Recorder) -> bool:
2715  """Check if the legacy event_id foreign key exists."""
2716  engine = instance.engine
2717  assert engine is not None
2718  inspector = sqlalchemy.inspect(engine)
2719  return bool(
2720  next(
2721  (
2722  fk
2723  for fk in inspector.get_foreign_keys(TABLE_STATES)
2724  if fk["constrained_columns"] == ["event_id"]
2725  ),
2726  None,
2727  )
2728  )
2729 
2731  self, instance: Recorder, session: Session
2732  ) -> DataMigrationStatus:
2733  """Return if the migration needs to run."""
2734  if self.schema_versionschema_version <= LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION:
2735  return DataMigrationStatus(needs_migrate=False, migration_done=False)
2736  if get_index_by_name(
2737  session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
2738  ) is not None or self._legacy_event_id_foreign_key_exists_legacy_event_id_foreign_key_exists(instance):
2739  instance.use_legacy_events_index = True
2740  return DataMigrationStatus(needs_migrate=True, migration_done=False)
2741  return DataMigrationStatus(needs_migrate=False, migration_done=True)
2742 
2743 
# NOTE(review): the class statement was lost in extraction; the bases below
# are reconstructed from the needs_migrate_query/index_to_drop attributes
# used here -- confirm against upstream before merging.
class EntityIDPostMigration(BaseMigrationWithQuery, BaseOffLineMigration):
    """Migration to remove old entity_id strings from states."""

    migration_id = "entity_id_post_migration"
    task = MigrationTask
    index_to_drop = (TABLE_STATES, LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX)
2750 
2751  def migrate_data_impl(self, instance: Recorder) -> DataMigrationStatus:
2752  """Migrate some data, returns True if migration is completed."""
2753  is_done = post_migrate_entity_ids(instance)
2754  return DataMigrationStatus(needs_migrate=not is_done, migration_done=is_done)
2755 
2756  def needs_migrate_query(self) -> StatementLambdaElement:
2757  """Check if the data is migrated."""
2759 
2760 
# Data migrators run while the database is offline
# (presumably before normal recorder startup -- confirm against callers).
NON_LIVE_DATA_MIGRATORS = (
    StatesContextIDMigration,  # Introduced in HA Core 2023.4
    EventsContextIDMigration,  # Introduced in HA Core 2023.4
)

# Data migrators that run while the recorder is live.
LIVE_DATA_MIGRATORS = (
    EventTypeIDMigration,
    EntityIDMigration,
    EventIDPostMigration,
)
2771 
2772 
def _mark_migration_done(session: Session, migration: type[BaseMigration]) -> None:
    """Mark a migration as done in the database.

    Uses merge so re-marking an already-finished migration upserts the
    existing row instead of inserting a duplicate.
    """
    session.merge(
        # Restored constructor lost in extraction: the row type recording a
        # completed migration id/version.
        # NOTE(review): reconstructed as MigrationChanges -- confirm the model
        # name against the db_schema module.
        MigrationChanges(
            migration_id=migration.migration_id, version=migration.migration_version
        )
    )
2780 
2781 
def rebuild_sqlite_table(
    session_maker: Callable[[], Session], engine: Engine, table: type[Base]
) -> bool:
    """Rebuild an SQLite table.

    This must only be called after all migrations are complete
    and the database is in a consistent state.

    If the table is not migrated to the current schema this
    will likely fail.

    Returns True when the rebuild succeeded, False when it was rolled
    back because of a database error.
    """
    table_table = cast(Table, table.__table__)
    orig_name = table_table.name
    # Unique temp name so a crashed previous rebuild cannot collide.
    temp_name = f"{table_table.name}_temp_{int(time())}"

    _LOGGER.warning("Rebuilding SQLite table %s; %s", orig_name, MIGRATION_NOTE_WHILE)

    try:
        # 12 step SQLite table rebuild
        # https://www.sqlite.org/lang_altertable.html
        with session_scope(session=session_maker()) as session:
            # Step 1 - Disable foreign keys
            session.connection().execute(text("PRAGMA foreign_keys=OFF"))
        # Step 2 - create a transaction
        with session_scope(session=session_maker()) as session:
            # Step 3 - we know all the indexes, triggers, and views associated with table X
            new_sql = str(CreateTable(table_table).compile(engine)).strip("\n") + ";"
            source_sql = f"CREATE TABLE {orig_name}"
            replacement_sql = f"CREATE TABLE {temp_name}"
            assert source_sql in new_sql, f"{source_sql} should be in new_sql"
            new_sql = new_sql.replace(source_sql, replacement_sql)
            # Step 4 - Create temp table
            session.execute(text(new_sql))
            column_names = ",".join([column.name for column in table_table.columns])
            # Step 5 - Transfer content
            sql = f"INSERT INTO {temp_name} SELECT {column_names} FROM {orig_name};"  # noqa: S608
            session.execute(text(sql))
            # Step 6 - Drop the original table
            session.execute(text(f"DROP TABLE {orig_name}"))
            # Step 7 - Rename the temp table
            session.execute(text(f"ALTER TABLE {temp_name} RENAME TO {orig_name}"))
            # Step 8 - Recreate indexes
            for index in table_table.indexes:
                index.create(session.connection())
            # Step 9 - Recreate views (there are none)
            # Step 10 - Check foreign keys
            session.execute(text("PRAGMA foreign_key_check"))
            # Step 11 - Commit transaction
            session.commit()
    except SQLAlchemyError:
        _LOGGER.exception("Error recreating SQLite table %s", table_table.name)
        # Swallow the exception since we do not want to ever raise
        # an integrity error as it would cause the database
        # to be discarded and recreated from scratch
        return False
    else:
        _LOGGER.warning("Rebuilding SQLite table %s finished", orig_name)
        return True
    finally:
        with session_scope(session=session_maker()) as session:
            # Step 12 - Re-enable foreign keys
            session.connection().execute(text("PRAGMA foreign_keys=ON"))
DataMigrationStatus needs_migrate_impl(self, Recorder instance, Session session)
Definition: migration.py:2395
None __init__(self, int schema_version, dict[str, int] migration_changes)
Definition: migration.py:2284
bool needs_migrate(self, Recorder instance, Session session)
Definition: migration.py:2318
DataMigrationStatus migrate_data_impl(self, Recorder instance)
Definition: migration.py:2306
DataMigrationStatus needs_migrate_impl(self, Recorder instance, Session session)
Definition: migration.py:2315
None migration_done(self, Recorder instance, Session session)
Definition: migration.py:2309
None migrate_all(self, Recorder instance, Callable[[], Session] session_maker)
Definition: migration.py:2353
None queue_migration(self, Recorder instance, Session session)
Definition: migration.py:2373
DataMigrationStatus migrate_data_impl(self, Recorder instance)
Definition: migration.py:2577
None migration_done(self, Recorder instance, Session session)
Definition: migration.py:2645
DataMigrationStatus migrate_data_impl(self, Recorder instance)
Definition: migration.py:2751
DataMigrationStatus needs_migrate_impl(self, Recorder instance, Session session)
Definition: migration.py:2732
DataMigrationStatus migrate_data_impl(self, Recorder instance)
Definition: migration.py:2669
DataMigrationStatus migrate_data_impl(self, Recorder instance)
Definition: migration.py:2499
None migration_done(self, Recorder instance, Session session)
Definition: migration.py:2557
DataMigrationStatus migrate_data_impl(self, Recorder instance)
Definition: migration.py:2454
DataMigrationStatus migrate_data_impl(self, Recorder instance)
Definition: migration.py:2411
None __init_subclass__(cls, int target_version, **Any kwargs)
Definition: migration.py:945
None __init__(self, Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, int old_version)
Definition: migration.py:959
type[_SchemaVersionMigrator] get_migrator(cls, int target_version)
Definition: migration.py:971
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
None create(HomeAssistant hass, str message, str|None title=None, str|None notification_id=None)
Definition: __init__.py:84
def execute(hass, filename, source, data=None, return_response=False)
Definition: __init__.py:194
None delete_statistics_duplicates(Recorder instance, HomeAssistant hass, Session session)
Definition: duplicates.py:141
None delete_statistics_meta_duplicates(Recorder instance, Session session)
Definition: duplicates.py:246
int|None get_schema_version(Callable[[], Session] session_maker)
Definition: migration.py:193
None _apply_update(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, int new_version, int old_version)
Definition: migration.py:934
None _add_constraint(Callable[[], Session] session_maker, AddConstraint add_constraint, str table, str column)
Definition: migration.py:795
None _modify_columns(Callable[[], Session] session_maker, Engine engine, str table_name, list[str] columns_def)
Definition: migration.py:585
bytes|None _context_id_to_bytes(str|None context_id)
Definition: migration.py:2166
None _create_index(Callable[[], Session] session_maker, str table_name, str index_name)
Definition: migration.py:432
None raise_if_exception_missing_str(Exception ex, Iterable[str] match_substrs)
Definition: migration.py:172
None _add_columns(Callable[[], Session] session_maker, str table_name, list[str] columns_def)
Definition: migration.py:542
bool rebuild_sqlite_table(Callable[[], Session] session_maker, Engine engine, type[Base] table)
Definition: migration.py:2784
bytes _generate_ulid_bytes_at_time(float|None timestamp)
Definition: migration.py:2182
None _migrate_statistics_columns_to_timestamp(Recorder instance, Callable[[], Session] session_maker, Engine engine)
Definition: migration.py:2100
None _correct_table_character_set_and_collation(str table, Callable[[], Session] session_maker)
Definition: migration.py:1876
bool non_live_data_migration_needed(Recorder instance, Callable[[], Session] session_maker, int schema_version)
Definition: migration.py:381
SchemaValidationStatus migrate_schema_live(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
Definition: migration.py:348
None _drop_foreign_key_constraints(Callable[[], Session] session_maker, Engine engine, str table, str column)
Definition: migration.py:701
set[str] _find_schema_errors(HomeAssistant hass, Recorder instance, Callable[[], Session] session_maker)
Definition: migration.py:255
None _delete_foreign_key_violations(Callable[[], Session] session_maker, Engine engine, str table, str column, str foreign_table, str foreign_column)
Definition: migration.py:820
bool _schema_is_current(int current_version)
Definition: migration.py:214
None _mark_migration_done(Session session, type[BaseMigration] migration)
Definition: migration.py:2773
bool _execute_or_collect_error(Callable[[], Session] session_maker, str query, list[str] errors)
Definition: migration.py:468
None _wipe_old_string_time_columns(Recorder instance, Engine engine, Session session)
Definition: migration.py:1903
SchemaValidationStatus _migrate_schema(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, SchemaValidationStatus schema_status, int end_version)
Definition: migration.py:296
None _drop_index(Callable[[], Session] session_maker, str table_name, str index_name, bool|None quiet=None)
Definition: migration.py:484
None _migrate_statistics_columns_to_timestamp_one_by_one(Recorder instance, Callable[[], Session] session_maker)
Definition: migration.py:2047
bool post_migrate_entity_ids(Recorder instance)
Definition: migration.py:2187
dict[str, int] _get_migration_changes(Session session)
Definition: migration.py:368
None _migrate_statistics_columns_to_timestamp_removing_duplicates(HomeAssistant hass, Recorder instance, Callable[[], Session] session_maker, Engine engine)
Definition: migration.py:1843
bool _initialize_database(Session session)
Definition: migration.py:2205
None _restore_foreign_key_constraints(Callable[[], Session] session_maker, Engine engine, list[tuple[str, str, str|None, str|None]] foreign_columns)
Definition: migration.py:741
int|None _get_schema_version(Session session)
Definition: migration.py:183
bool initialize_database(Callable[[], Session] session_maker)
Definition: migration.py:2232
None migrate_data_live(Recorder instance, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
Definition: migration.py:417
SchemaValidationStatus|None validate_db_schema(HomeAssistant hass, Recorder instance, Callable[[], Session] session_maker)
Definition: migration.py:221
None migrate_data_non_live(Recorder instance, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
Definition: migration.py:400
None _update_states_table_with_foreign_key_options(Callable[[], Session] session_maker, Engine engine)
Definition: migration.py:641
None _migrate_columns_to_timestamp(Recorder instance, Callable[[], Session] session_maker, Engine engine)
Definition: migration.py:1958
bool live_migration(SchemaValidationStatus schema_status)
Definition: migration.py:264
SchemaValidationStatus migrate_schema_non_live(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
Definition: migration.py:334
float|None datetime_to_timestamp_or_none(datetime|None dt)
Definition: time.py:55
StatementLambdaElement batch_cleanup_entity_ids()
Definition: queries.py:751
StatementLambdaElement has_used_states_entity_ids()
Definition: queries.py:776
StatementLambdaElement has_states_context_ids_to_migrate()
Definition: queries.py:797
StatementLambdaElement has_used_states_event_ids()
Definition: queries.py:783
StatementLambdaElement has_events_context_ids_to_migrate()
Definition: queries.py:790
StatementLambdaElement find_states_context_ids_to_migrate(int max_bind_vars)
Definition: queries.py:818
StatementLambdaElement find_entity_ids_to_migrate(int max_bind_vars)
Definition: queries.py:739
StatementLambdaElement find_events_context_ids_to_migrate(int max_bind_vars)
Definition: queries.py:712
StatementLambdaElement has_entity_ids_to_migrate()
Definition: queries.py:811
StatementLambdaElement find_event_type_to_migrate(int max_bind_vars)
Definition: queries.py:727
StatementLambdaElement has_event_type_to_migrate()
Definition: queries.py:804
StatementLambdaElement get_migration_changes()
Definition: queries.py:833
bool cleanup_statistics_timestamp_migration(Recorder instance)
Definition: statistics.py:2576
str|None get_index_by_name(Session session, str table_name, str index_name)
Definition: util.py:927
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
Definition: condition.py:802
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
Definition: recorder.py:86