1 """Schema migration helpers."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
6 from collections.abc
import Callable, Iterable
8 from dataclasses
import dataclass, replace
as dataclass_replace
9 from datetime
import timedelta
12 from typing
import TYPE_CHECKING, Any, cast, final
16 from sqlalchemy
import ForeignKeyConstraint, MetaData, Table, func, text, update
17 from sqlalchemy.engine
import CursorResult, Engine
18 from sqlalchemy.exc
import (
26 from sqlalchemy.orm.session
import Session
27 from sqlalchemy.schema
import AddConstraint, CreateTable, DropConstraint
28 from sqlalchemy.sql.expression
import true
29 from sqlalchemy.sql.lambdas
import StatementLambdaElement
35 from .auto_repairs.events.schema
import (
36 correct_db_schema
as events_correct_db_schema,
37 validate_db_schema
as events_validate_db_schema,
39 from .auto_repairs.states.schema
import (
40 correct_db_schema
as states_correct_db_schema,
41 validate_db_schema
as states_validate_db_schema,
43 from .auto_repairs.statistics.duplicates
import (
44 delete_statistics_duplicates,
45 delete_statistics_meta_duplicates,
47 from .auto_repairs.statistics.schema
import (
48 correct_db_schema
as statistics_correct_db_schema,
49 validate_db_schema
as statistics_validate_db_schema,
52 CONTEXT_ID_AS_BINARY_SCHEMA_VERSION,
53 EVENT_TYPE_IDS_SCHEMA_VERSION,
54 LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION,
55 STATES_META_SCHEMA_VERSION,
58 from .db_schema
import (
60 CONTEXT_ID_BIN_MAX_LENGTH,
61 DOUBLE_PRECISION_TYPE_SQL,
62 LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX,
63 LEGACY_STATES_EVENT_ID_INDEX,
65 MYSQL_DEFAULT_CHARSET,
82 from .models
import process_timestamp
83 from .models.time
import datetime_to_timestamp_or_none
84 from .queries
import (
85 batch_cleanup_entity_ids,
86 delete_duplicate_short_term_statistics_row,
87 delete_duplicate_statistics_row,
88 find_entity_ids_to_migrate,
89 find_event_type_to_migrate,
90 find_events_context_ids_to_migrate,
91 find_states_context_ids_to_migrate,
92 find_unmigrated_short_term_statistics_rows,
93 find_unmigrated_statistics_rows,
94 get_migration_changes,
95 has_entity_ids_to_migrate,
96 has_event_type_to_migrate,
97 has_events_context_ids_to_migrate,
98 has_states_context_ids_to_migrate,
99 has_used_states_entity_ids,
100 has_used_states_event_ids,
101 migrate_single_short_term_statistics_row_to_timestamp,
102 migrate_single_statistics_row_to_timestamp,
104 from .statistics
import cleanup_statistics_timestamp_migration, get_start_time
105 from .tasks
import RecorderTask
107 database_job_retry_wrapper,
108 database_job_retry_wrapper_method,
109 execute_stmt_lambda_element,
111 retryable_database_job_method,
116 from .
import Recorder
121 LIVE_MIGRATION_MIN_SCHEMA_VERSION = 42
123 MIGRATION_NOTE_OFFLINE = (
124 "Note: this may take several hours on large databases and slow machines. "
125 "Home Assistant will not start until the upgrade is completed. Please be patient "
126 "and do not turn off or restart Home Assistant while the upgrade is in progress!"
128 MIGRATION_NOTE_MINUTES = (
129 "Note: this may take several minutes on large databases and slow machines. "
132 MIGRATION_NOTE_WHILE =
"This will take a while; please be patient!"
134 _EMPTY_ENTITY_ID =
"missing.entity_id"
135 _EMPTY_EVENT_TYPE =
"missing_event_type"
137 _LOGGER = logging.getLogger(__name__)
144 context_bin_type: str
148 big_int_type=
"INTEGER(20)",
149 timestamp_type=DOUBLE_PRECISION_TYPE_SQL,
150 context_bin_type=f
"BLOB({CONTEXT_ID_BIN_MAX_LENGTH})",
154 big_int_type=
"INTEGER",
155 timestamp_type=DOUBLE_PRECISION_TYPE_SQL,
156 context_bin_type=
"BYTEA",
160 big_int_type=
"INTEGER",
161 timestamp_type=
"FLOAT",
162 context_bin_type=
"BLOB",
165 _COLUMN_TYPES_FOR_DIALECT: dict[SupportedDialect |
None, _ColumnTypesForDialect] = {
166 SupportedDialect.MYSQL: _MYSQL_COLUMN_TYPES,
167 SupportedDialect.POSTGRESQL: _POSTGRESQL_COLUMN_TYPES,
168 SupportedDialect.SQLITE: _SQLITE_COLUMN_TYPES,
173 """Raise if the exception and cause do not contain the match substrs."""
174 lower_ex_strs = [
str(ex).lower(),
str(ex.__cause__).lower()]
175 for str_sub
in match_substrs:
176 for exc_str
in lower_ex_strs:
177 if exc_str
and str_sub
in exc_str:
184 """Get the schema version."""
186 session.query(SchemaChanges.schema_version)
187 .order_by(SchemaChanges.change_id.desc())
190 return getattr(res,
"schema_version",
None)
194 """Get the schema version."""
196 with session_scope(session=session_maker(), read_only=
True)
as session:
199 _LOGGER.exception(
"Error when determining DB schema version")
203 @dataclass(frozen=True, kw_only=True)
205 """Store schema validation status."""
208 migration_needed: bool
209 non_live_data_migration_needed: bool
210 schema_errors: set[str]
215 """Check if the schema is current."""
216 return current_version == SCHEMA_VERSION
220 hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
221 ) -> SchemaValidationStatus |
None:
222 """Check if the schema is valid.
224 This checks that the schema is the current version as well as for some common schema
225 errors caused by manual migration between database engines, for example importing an
226 SQLite database to MariaDB.
228 schema_errors: set[str] = set()
231 if current_version
is None:
239 schema_migration_needed =
not is_current
241 instance, session_maker, current_version
245 current_version=current_version,
246 non_live_data_migration_needed=_non_live_data_migration_needed,
247 migration_needed=schema_migration_needed
or _non_live_data_migration_needed,
248 schema_errors=schema_errors,
249 start_version=current_version,
254 hass: HomeAssistant, instance: Recorder, session_maker: Callable[[], Session]
256 """Find schema errors."""
257 schema_errors: set[str] = set()
258 schema_errors |= statistics_validate_db_schema(instance)
259 schema_errors |= states_validate_db_schema(instance)
260 schema_errors |= events_validate_db_schema(instance)
265 """Check if live migration is possible."""
267 schema_status.current_version >= LIVE_MIGRATION_MIN_SCHEMA_VERSION
268 and not schema_status.non_live_data_migration_needed
273 """Prepare for migration.
275 This function is called before calling Base.metadata.create_all.
277 inspector = sqlalchemy.inspect(engine)
279 if inspector.has_table(
"statistics_meta")
and not inspector.has_table(
280 "statistics_short_term"
284 LegacyBase.metadata.create_all(
285 engine, (LegacyBase.metadata.tables[
"statistics_short_term"],)
293 session_maker: Callable[[], Session],
294 schema_status: SchemaValidationStatus,
296 ) -> SchemaValidationStatus:
297 """Check if the schema needs to be upgraded."""
298 current_version = schema_status.current_version
299 start_version = schema_status.start_version
301 if current_version < end_version:
303 "The database is about to upgrade from schema version %s to %s%s",
307 f
". {MIGRATION_NOTE_OFFLINE}"
308 if current_version < LIVE_MIGRATION_MIN_SCHEMA_VERSION
312 schema_status = dataclass_replace(schema_status, current_version=end_version)
314 for version
in range(current_version, end_version):
315 new_version = version + 1
316 _LOGGER.info(
"Upgrading recorder db schema to version %s", new_version)
317 _apply_update(instance, hass, engine, session_maker, new_version, start_version)
323 _LOGGER.warning(
"Upgrade to version %s done", new_version)
332 session_maker: Callable[[], Session],
333 schema_status: SchemaValidationStatus,
334 ) -> SchemaValidationStatus:
335 """Check if the schema needs to be upgraded."""
336 end_version = LIVE_MIGRATION_MIN_SCHEMA_VERSION
338 instance, hass, engine, session_maker, schema_status, end_version
346 session_maker: Callable[[], Session],
347 schema_status: SchemaValidationStatus,
348 ) -> SchemaValidationStatus:
349 """Check if the schema needs to be upgraded."""
350 end_version = SCHEMA_VERSION
352 instance, hass, engine, session_maker, schema_status, end_version
356 if schema_errors := schema_status.schema_errors:
358 "Database is about to correct DB schema errors: %s",
359 ", ".join(sorted(schema_errors)),
361 statistics_correct_db_schema(instance, schema_errors)
362 states_correct_db_schema(instance, schema_errors)
363 events_correct_db_schema(instance, schema_errors)
369 """Return migration changes as a dict."""
370 migration_changes: dict[str, int] = {
374 return migration_changes
379 session_maker: Callable[[], Session],
382 """Return True if non-live data migration is needed.
384 This must only be called if database schema is current.
386 migration_needed =
False
389 for migrator_cls
in NON_LIVE_DATA_MIGRATORS:
390 migrator = migrator_cls(schema_version, migration_changes)
391 migration_needed |= migrator.needs_migrate(instance, session)
393 return migration_needed
398 session_maker: Callable[[], Session],
399 schema_status: SchemaValidationStatus,
401 """Do non-live data migration.
403 This must be called after non-live schema migration is completed.
408 for migrator_cls
in NON_LIVE_DATA_MIGRATORS:
409 migrator = migrator_cls(schema_status.start_version, migration_changes)
410 migrator.migrate_all(instance, session_maker)
415 session_maker: Callable[[], Session],
416 schema_status: SchemaValidationStatus,
418 """Queue live schema migration tasks.
420 This must be called after live schema migration is completed.
425 for migrator_cls
in LIVE_DATA_MIGRATORS:
426 migrator = migrator_cls(schema_status.start_version, migration_changes)
427 migrator.queue_migration(instance, session)
431 session_maker: Callable[[], Session], table_name: str, index_name: str
433 """Create an index for the specified table.
435 The index name should match the name given for the index
436 within the table definition described in the models
438 table = Table(table_name, Base.metadata)
439 _LOGGER.debug(
"Looking up index %s for table %s", index_name, table_name)
441 index_list = [idx
for idx
in table.indexes
if idx.name == index_name]
443 _LOGGER.debug(
"The index %s no longer exists", index_name)
445 index = index_list[0]
446 _LOGGER.debug(
"Creating %s index", index_name)
448 "Adding index `%s` to table `%s`. %s",
451 MIGRATION_NOTE_MINUTES,
455 connection = session.connection()
456 index.create(connection)
457 except (InternalError, OperationalError, ProgrammingError)
as err:
460 "Index %s already exists on %s, continuing", index_name, table_name
463 _LOGGER.warning(
"Finished adding index `%s` to table `%s`", index_name, table_name)
467 session_maker: Callable[[], Session], query: str, errors: list[str]
469 """Execute a query or collect an error."""
472 session.connection().
execute(text(query))
473 except SQLAlchemyError
as err:
474 errors.append(
str(err))
480 session_maker: Callable[[], Session],
483 quiet: bool |
None =
None,
485 """Drop an index from a specified table.
487 There is no universal way to do something like `DROP INDEX IF EXISTS`
488 so we will simply execute the DROP command and ignore any exceptions
490 WARNING: Due to some engines (MySQL at least) being unable to use bind
491 parameters in a DROP INDEX statement (at least via SQLAlchemy), the query
492 string here is generated from the method parameters without sanitizing.
493 DO NOT USE THIS FUNCTION IN ANY OPERATION THAT TAKES USER INPUT.
496 "Dropping index `%s` from table `%s`. %s",
499 MIGRATION_NOTE_MINUTES,
501 index_to_drop: str |
None =
None
505 if index_to_drop
is None:
507 "The index `%s` on table `%s` no longer exists", index_name, table_name
511 errors: list[str] = []
514 f
"DROP INDEX {index_name}",
516 f
"DROP INDEX {table_name}.{index_name}",
518 f
"DROP INDEX {index_name} ON {table_name}",
521 f
"DROP INDEX {index_to_drop}",
525 "Finished dropping index `%s` from table `%s`", index_name, table_name
531 "Failed to drop index `%s` from table `%s`. Schema "
532 "Migration will continue; this is not a "
533 "critical operation: %s",
541 session_maker: Callable[[], Session], table_name: str, columns_def: list[str]
543 """Add columns to a table."""
545 "Adding columns %s to table %s. %s",
546 ", ".join(column.split(
" ")[0]
for column
in columns_def),
548 MIGRATION_NOTE_MINUTES,
551 columns_def = [f
"ADD {col_def}" for col_def
in columns_def]
555 connection = session.connection()
557 text(f
"ALTER TABLE {table_name} {', '.join(columns_def)}")
559 except (InternalError, OperationalError, ProgrammingError):
562 _LOGGER.info(
"Unable to use quick column add. Adding 1 by 1")
566 for column_def
in columns_def:
569 connection = session.connection()
570 connection.execute(text(f
"ALTER TABLE {table_name} {column_def}"))
571 except (InternalError, OperationalError, ProgrammingError)
as err:
574 "Column %s already exists on %s, continuing",
575 column_def.split(
" ")[1],
581 session_maker: Callable[[], Session],
584 columns_def: list[str],
586 """Modify columns in a table."""
587 if engine.dialect.name == SupportedDialect.SQLITE:
590 "Skipping to modify columns %s in table %s; "
591 "Modifying column length in SQLite is unnecessary, "
592 "it does not impose any length restrictions"
594 ", ".join(column.split(
" ")[0]
for column
in columns_def),
600 "Modifying columns %s in table %s. %s",
601 ", ".join(column.split(
" ")[0]
for column
in columns_def),
603 MIGRATION_NOTE_MINUTES,
606 if engine.dialect.name == SupportedDialect.POSTGRESQL:
608 f
"ALTER {column} TYPE {type_}"
609 for column, type_
in (col_def.split(
" ", 1)
for col_def
in columns_def)
611 elif engine.dialect.name ==
"mssql":
612 columns_def = [f
"ALTER COLUMN {col_def}" for col_def
in columns_def]
614 columns_def = [f
"MODIFY {col_def}" for col_def
in columns_def]
618 connection = session.connection()
620 text(f
"ALTER TABLE {table_name} {', '.join(columns_def)}")
622 except (InternalError, OperationalError):
623 _LOGGER.info(
"Unable to use quick column modify. Modifying 1 by 1")
627 for column_def
in columns_def:
630 connection = session.connection()
631 connection.execute(text(f
"ALTER TABLE {table_name} {column_def}"))
632 except (InternalError, OperationalError):
634 "Could not modify column %s in table %s", column_def, table_name
640 session_maker: Callable[[], Session], engine: Engine
642 """Add the options to foreign key constraints.
644 This is not supported for SQLite because it does not support
645 dropping constraints.
648 if engine.dialect.name
not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
650 "_update_states_table_with_foreign_key_options not supported for "
651 f
"{engine.dialect.name}"
654 inspector = sqlalchemy.inspect(engine)
655 tmp_states_table = Table(TABLE_STATES, MetaData())
658 "old_fk": ForeignKeyConstraint(
659 (), (), name=foreign_key[
"name"], table=tmp_states_table
661 "columns": foreign_key[
"constrained_columns"],
663 for foreign_key
in inspector.get_foreign_keys(TABLE_STATES)
664 if foreign_key[
"name"]
667 not foreign_key.get(
"options")
669 or foreign_key.get(
"options", {}).
get(
"ondelete")
is None
676 states_key_constraints = Base.metadata.tables[TABLE_STATES].foreign_key_constraints
681 connection = session.connection()
682 connection.execute(DropConstraint(alter[
"old_fk"]))
683 for fkc
in states_key_constraints:
684 if fkc.column_keys == alter[
"columns"]:
688 create_rule = fkc._create_rule
689 add_constraint = AddConstraint(fkc)
690 fkc._create_rule = create_rule
691 connection.execute(add_constraint)
692 except (InternalError, OperationalError):
694 "Could not update foreign options in %s table", TABLE_STATES
700 session_maker: Callable[[], Session], engine: Engine, table: str, column: str
702 """Drop foreign key constraints for a table on specific columns.
704 This is not supported for SQLite because it does not support
705 dropping constraints.
708 if engine.dialect.name
not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
710 f
"_drop_foreign_key_constraints not supported for {engine.dialect.name}"
713 inspector = sqlalchemy.inspect(engine)
716 tmp_table = Table(table, MetaData())
718 ForeignKeyConstraint((), (), name=foreign_key[
"name"], table=tmp_table)
719 for foreign_key
in inspector.get_foreign_keys(table)
720 if foreign_key[
"name"]
and foreign_key[
"constrained_columns"] == [column]
726 connection = session.connection()
727 connection.execute(DropConstraint(drop))
728 except (InternalError, OperationalError):
730 "Could not drop foreign constraints in %s table on %s",
738 session_maker: Callable[[], Session],
740 foreign_columns: list[tuple[str, str, str |
None, str |
None]],
742 """Restore foreign key constraints."""
743 for table, column, foreign_table, foreign_column
in foreign_columns:
744 constraints = Base.metadata.tables[table].foreign_key_constraints
745 for constraint
in constraints:
746 if constraint.column_keys == [column]:
749 _LOGGER.info(
"Did not find a matching constraint for %s.%s", table, column)
752 inspector = sqlalchemy.inspect(engine)
754 foreign_key[
"name"]
and foreign_key[
"constrained_columns"] == [column]
755 for foreign_key
in inspector.get_foreign_keys(table)
758 "The database already has a matching constraint for %s.%s",
765 assert foreign_table
is not None
766 assert foreign_column
is not None
771 create_rule = constraint._create_rule
772 add_constraint = AddConstraint(constraint)
773 constraint._create_rule = create_rule
776 except IntegrityError:
779 "Could not update foreign options in %s table, will delete "
780 "violations and try again"
785 session_maker, engine, table, column, foreign_table, foreign_column
791 session_maker: Callable[[], Session],
792 add_constraint: AddConstraint,
796 """Add a foreign key constraint."""
798 "Adding foreign key constraint to %s.%s. "
799 "Note: this can take several minutes on large databases and slow "
800 "machines. Please be patient!",
806 connection = session.connection()
807 connection.execute(add_constraint)
808 except (InternalError, OperationalError):
809 _LOGGER.exception(
"Could not update foreign options in %s table", table)
814 session_maker: Callable[[], Session],
821 """Remove rows which violate the constraints."""
822 if engine.dialect.name
not in (SupportedDialect.MYSQL, SupportedDialect.POSTGRESQL):
824 f
"_delete_foreign_key_violations not supported for {engine.dialect.name}"
828 "Rows in table %s where %s references non existing %s.%s will be %s. "
829 "Note: this can take several minutes on large databases and slow "
830 "machines. Please be patient!",
835 "set to NULL" if table == foreign_table
else "deleted",
838 result: CursorResult |
None =
None
839 if table == foreign_table:
844 if engine.dialect.name == SupportedDialect.MYSQL:
845 while result
is None or result.rowcount > 0:
850 result = session.connection().
execute(
852 f
"UPDATE {table} as t1 "
853 f
"SET {column} = NULL "
855 f
"t1.{column} IS NOT NULL AND "
858 f
"FROM (SELECT {foreign_column} from {foreign_table}) AS t2 "
859 f
"WHERE t2.{foreign_column} = t1.{column})) "
863 elif engine.dialect.name == SupportedDialect.POSTGRESQL:
864 while result
is None or result.rowcount > 0:
868 result = session.connection().
execute(
871 f
"SET {column} = NULL "
872 f
"WHERE {column} in "
873 f
"(SELECT {column} from {table} as t1 "
875 f
"t1.{column} IS NOT NULL AND "
878 f
"FROM {foreign_table} AS t2 "
879 f
"WHERE t2.{foreign_column} = t1.{column})) "
885 if engine.dialect.name == SupportedDialect.MYSQL:
886 while result
is None or result.rowcount > 0:
888 result = session.connection().
execute(
895 f
"DELETE FROM {table} "
897 f
"{table}.{column} IS NOT NULL AND "
900 f
"FROM {foreign_table} AS t2 "
901 f
"WHERE t2.{foreign_column} = {table}.{column})) "
905 elif engine.dialect.name == SupportedDialect.POSTGRESQL:
906 while result
is None or result.rowcount > 0:
910 result = session.connection().
execute(
912 f
"DELETE FROM {table} "
913 f
"WHERE {column} in "
914 f
"(SELECT {column} from {table} as t1 "
916 f
"t1.{column} IS NOT NULL AND "
919 f
"FROM {foreign_table} AS t2 "
920 f
"WHERE t2.{foreign_column} = t1.{column})) "
926 @database_job_retry_wrapper("Apply migration update", 10)
931 session_maker: Callable[[], Session],
935 """Perform operations to bring schema up to date."""
936 migrator_cls = _SchemaVersionMigrator.get_migrator(new_version)
937 migrator_cls(instance, hass, engine, session_maker, old_version).apply_update()
941 """Perform operations to bring schema up to date."""
943 __migrators: dict[int, type[_SchemaVersionMigrator]] = {}
946 """Post initialisation processing."""
948 if target_version
in _SchemaVersionMigrator.__migrators:
949 raise ValueError(
"Duplicated version")
950 _SchemaVersionMigrator.__migrators[target_version] = cls
957 session_maker: Callable[[], Session],
966 assert engine.dialect.name
is not None,
"Dialect name must be set"
967 dialect = try_parse_enum(SupportedDialect, engine.dialect.name)
968 self.
column_typescolumn_types = _COLUMN_TYPES_FOR_DIALECT.get(dialect, _SQLITE_COLUMN_TYPES)
971 def get_migrator(cls, target_version: int) -> type[_SchemaVersionMigrator]:
972 """Return a migrator for a specific schema version."""
974 return cls.__migrators[target_version]
975 except KeyError
as err:
977 f
"No migrator for schema version {target_version}"
982 """Perform operations to bring schema up to date."""
987 """Version specific update method."""
990 class _SchemaVersion1Migrator(_SchemaVersionMigrator, target_version=1):
992 """Version specific update method."""
998 """Version specific update method."""
1006 """Version specific update method."""
1010 class _SchemaVersion4Migrator(_SchemaVersionMigrator, target_version=4):
1012 """Version specific update method."""
1033 """Version specific update method."""
1040 """Version specific update method."""
1044 [
"context_id CHARACTER(36)",
"context_user_id CHARACTER(36)"],
1052 [
"context_id CHARACTER(36)",
"context_user_id CHARACTER(36)"],
1061 """Version specific update method."""
1066 class _SchemaVersion8Migrator(_SchemaVersionMigrator, target_version=8):
1068 """Version specific update method."""
1077 """Version specific update method."""
1101 """Version specific update method."""
1105 class _SchemaVersion11Migrator(_SchemaVersionMigrator, target_version=11):
1107 """Version specific update method."""
1113 if self.
engineengine.dialect.name
in (
1114 SupportedDialect.MYSQL,
1115 SupportedDialect.POSTGRESQL,
1124 """Version specific update method."""
1125 if self.
engineengine.dialect.name == SupportedDialect.MYSQL:
1136 """Version specific update method."""
1137 if self.
engineengine.dialect.name == SupportedDialect.MYSQL:
1142 [
"time_fired DATETIME(6)",
"created DATETIME(6)"],
1149 "last_changed DATETIME(6)",
1150 "last_updated DATETIME(6)",
1151 "created DATETIME(6)",
1158 """Version specific update method."""
1166 """Version specific update method."""
1170 class _SchemaVersion16Migrator(_SchemaVersionMigrator, target_version=16):
1172 """Version specific update method."""
1174 if self.
engineengine.dialect.name
in (
1175 SupportedDialect.MYSQL,
1176 SupportedDialect.POSTGRESQL,
1189 """Version specific update method."""
1193 class _SchemaVersion18Migrator(_SchemaVersionMigrator, target_version=18):
1195 """Version specific update method."""
1204 Base.metadata.drop_all(
1207 cast(Table, StatisticsShortTerm.__table__),
1208 cast(Table, Statistics.__table__),
1209 cast(Table, StatisticsMeta.__table__),
1213 cast(Table, StatisticsMeta.__table__).
create(self.
engineengine)
1214 cast(Table, StatisticsShortTerm.__table__).
create(self.
engineengine)
1215 cast(Table, Statistics.__table__).
create(self.
engineengine)
1220 """Version specific update method."""
1229 """Version specific update method."""
1231 if self.
engineengine.dialect.name
in [
1232 SupportedDialect.MYSQL,
1233 SupportedDialect.POSTGRESQL,
1240 f
"{column} {DOUBLE_PRECISION_TYPE_SQL}"
1241 for column
in (
"max",
"mean",
"min",
"state",
"sum")
1248 """Version specific update method."""
1250 if self.
engineengine.dialect.name == SupportedDialect.MYSQL:
1251 for table
in (
"events",
"states",
"statistics_meta"):
1257 """Version specific update method."""
1263 if self.
engineengine.dialect.name ==
"oracle":
1266 Base.metadata.drop_all(
1269 cast(Table, StatisticsShortTerm.__table__),
1270 cast(Table, Statistics.__table__),
1271 cast(Table, StatisticsMeta.__table__),
1272 cast(Table, StatisticsRuns.__table__),
1276 cast(Table, StatisticsRuns.__table__).
create(self.
engineengine)
1277 cast(Table, StatisticsMeta.__table__).
create(self.
engineengine)
1278 cast(Table, StatisticsShortTerm.__table__).
create(self.
engineengine)
1279 cast(Table, Statistics.__table__).
create(self.
engineengine)
1285 if session.query(Statistics.id).count()
and (
1286 last_run_string := session.query(
1287 func.max(StatisticsRuns.start)
1291 if last_run_start_time:
1292 fake_start_time = last_run_start_time +
timedelta(minutes=5)
1293 while fake_start_time < last_run_start_time +
timedelta(hours=1):
1301 for sum_statistic
in session.query(StatisticsMeta.id).filter_by(
1307 Statistics.last_reset,
1311 .filter_by(metadata_id=sum_statistic.id)
1312 .order_by(Statistics.start.desc())
1318 metadata_id=sum_statistic.id,
1319 start=last_statistic.start,
1320 last_reset=last_statistic.last_reset,
1321 state=last_statistic.state,
1322 sum=last_statistic.sum,
1329 """Version specific update method."""
1336 """Version specific update method."""
1342 class _SchemaVersion25Migrator(_SchemaVersionMigrator, target_version=25):
1344 """Version specific update method."""
1348 [f
"attributes_id {self.column_types.big_int_type}"],
1355 """Version specific update method."""
1361 """Version specific update method."""
1363 self.
session_makersession_maker,
"events", [f
"data_id {self.column_types.big_int_type}"]
1370 """Version specific update method."""
1379 "origin_idx INTEGER",
1380 "context_id VARCHAR(36)",
1381 "context_user_id VARCHAR(36)",
1382 "context_parent_id VARCHAR(36)",
1392 """Version specific update method."""
1395 self.
session_makersession_maker,
"statistics_meta",
"ix_statistics_meta_statistic_id"
1397 if self.
engineengine.dialect.name == SupportedDialect.MYSQL:
1401 contextlib.suppress(SQLAlchemyError),
1404 connection = session.connection()
1408 text(
"ALTER TABLE statistics_meta ROW_FORMAT=DYNAMIC")
1412 self.
session_makersession_maker,
"statistics_meta",
"ix_statistics_meta_statistic_id"
1414 except DatabaseError:
1420 self.
session_makersession_maker,
"statistics_meta",
"ix_statistics_meta_statistic_id"
1426 """Version specific update method."""
1434 class _SchemaVersion31Migrator(_SchemaVersionMigrator, target_version=31):
1436 """Version specific update method."""
1444 [f
"time_fired_ts {self.column_types.timestamp_type}"],
1450 f
"last_updated_ts {self.column_types.timestamp_type}",
1451 f
"last_changed_ts {self.column_types.timestamp_type}",
1456 self.
session_makersession_maker,
"events",
"ix_events_event_type_time_fired_ts"
1459 self.
session_makersession_maker,
"states",
"ix_states_entity_id_last_updated_ts"
1467 """Version specific update method."""
1478 assert self.
instanceinstance.engine
is not None,
"engine should never be None"
1484 """Version specific update method."""
1491 class _SchemaVersion34Migrator(_SchemaVersionMigrator, target_version=34):
1493 """Version specific update method."""
1505 f
"created_ts {self.column_types.timestamp_type}",
1506 f
"start_ts {self.column_types.timestamp_type}",
1507 f
"last_reset_ts {self.column_types.timestamp_type}",
1512 "statistics_short_term",
1514 f
"created_ts {self.column_types.timestamp_type}",
1515 f
"start_ts {self.column_types.timestamp_type}",
1516 f
"last_reset_ts {self.column_types.timestamp_type}",
1521 self.
session_makersession_maker,
"statistics",
"ix_statistics_statistic_id_start_ts"
1525 "statistics_short_term",
1526 "ix_statistics_short_term_start_ts",
1530 "statistics_short_term",
1531 "ix_statistics_short_term_statistic_id_start_ts",
1540 """Version specific update method."""
1546 "ix_statistics_statistic_id_start",
1551 "statistics_short_term",
1552 "ix_statistics_short_term_statistic_id_start",
1567 """Version specific update method."""
1568 for table
in (
"states",
"events"):
1573 f
"context_id_bin {self.column_types.context_bin_type}",
1574 f
"context_user_id_bin {self.column_types.context_bin_type}",
1575 f
"context_parent_id_bin {self.column_types.context_bin_type}",
1584 """Version specific update method."""
1588 [f
"event_type_id {self.column_types.big_int_type}"],
1593 self.
session_makersession_maker,
"events",
"ix_events_event_type_id_time_fired_ts"
1599 """Version specific update method."""
1603 [f
"metadata_id {self.column_types.big_int_type}"],
1607 self.
session_makersession_maker,
"states",
"ix_states_metadata_id_last_updated_ts"
1613 """Version specific update method."""
1619 "ix_events_event_type_time_fired_ts",
1624 self.
session_makersession_maker,
"events",
"ix_events_event_type_time_fired", quiet=
True
1628 self.
session_makersession_maker,
"events",
"ix_events_context_user_id", quiet=
True
1631 self.
session_makersession_maker,
"events",
"ix_events_context_parent_id", quiet=
True
1634 self.
session_makersession_maker,
"states",
"ix_states_entity_id_last_updated", quiet=
True
1639 self.
session_makersession_maker,
"states",
"ix_states_context_user_id", quiet=
True
1642 self.
session_makersession_maker,
"states",
"ix_states_context_parent_id", quiet=
True
1645 self.
session_makersession_maker,
"states",
"ix_states_created_domain", quiet=
True
1648 self.
session_makersession_maker,
"states",
"ix_states_entity_id_created", quiet=
True
1652 self.
session_makersession_maker,
"states",
"states__significant_changes", quiet=
True
1655 self.
session_makersession_maker,
"states",
"ix_states_entity_id_created", quiet=
True
1660 "ix_statistics_statistic_id_start",
1665 "statistics_short_term",
1666 "ix_statistics_short_term_statistic_id_start",
1673 """Version specific update method."""
1683 "statistics_short_term",
1684 "ix_statistics_short_term_metadata_id",
1690 """Version specific update method."""
1697 """Version specific update method."""
1710 """Version specific update method."""
1714 [f
"last_reported_ts {self.column_types.timestamp_type}"],
1720 """Version specific update method."""
1727 class _SchemaVersion45Migrator(_SchemaVersionMigrator, target_version=45):
1729 """Version specific update method."""
1739 (
"data_id",
"event_type_id"),
1741 (
"data_id",
"event_data",
"data_id"),
1742 (
"event_type_id",
"event_types",
"event_type_id"),
1747 (
"event_id",
"old_state_id",
"attributes_id",
"metadata_id"),
1749 (
"event_id",
None,
None),
1750 (
"old_state_id",
"states",
"state_id"),
1751 (
"attributes_id",
"state_attributes",
"attributes_id"),
1752 (
"metadata_id",
"states_meta",
"metadata_id"),
1758 ((
"metadata_id",
"statistics_meta",
"id"),),
1761 "statistics_short_term",
1763 ((
"metadata_id",
"statistics_meta",
"id"),),
1770 """Version specific update method."""
1772 if self.
engineengine.dialect.name == SupportedDialect.SQLITE:
1775 "NOT NULL AUTO_INCREMENT"
1776 if self.
engineengine.dialect.name == SupportedDialect.MYSQL
1780 for table, columns, _
in FOREIGN_COLUMNS:
1781 for column
in columns:
1787 for table, columns, _
in FOREIGN_COLUMNS:
1792 [f
"{column} {BIG_INTEGER_SQL}" for column
in columns],
1797 (
"events",
"event_id"),
1798 (
"event_data",
"data_id"),
1799 (
"event_types",
"event_type_id"),
1800 (
"states",
"state_id"),
1801 (
"state_attributes",
"attributes_id"),
1802 (
"states_meta",
"metadata_id"),
1803 (
"statistics",
"id"),
1804 (
"statistics_short_term",
"id"),
1805 (
"statistics_meta",
"id"),
1806 (
"recorder_runs",
"run_id"),
1807 (
"schema_changes",
"change_id"),
1808 (
"statistics_runs",
"run_id"),
1810 for table, column
in id_columns:
1815 [f
"{column} {BIG_INTEGER_SQL} {identity_sql}"],
1821 """Version specific update method."""
1823 if self.
engineengine.dialect.name == SupportedDialect.SQLITE:
1831 (table, column, foreign_table, foreign_column)
1832 for table, _, foreign_mappings
in FOREIGN_COLUMNS
1833 for column, foreign_table, foreign_column
in foreign_mappings
1839 hass: HomeAssistant,
1841 session_maker: Callable[[], Session],
1844 """Migrate statistics columns to timestamp or cleanup duplicates."""
1847 except IntegrityError
as ex:
1849 "Statistics table contains duplicate entries: %s; "
1850 "Cleaning up duplicates and trying again; %s",
1852 MIGRATION_NOTE_WHILE,
1860 except IntegrityError:
1862 "Statistics table still contains duplicate entries after cleanup; "
1863 "Falling back to a one by one migration"
1869 "Statistics migration successfully recovered after statistics table duplicate cleanup"
1875 session_maker: Callable[[], Session],
1877 """Correct issues detected by validate_db_schema."""
1880 "Updating character set and collation of table %s to utf8mb4. %s",
1882 MIGRATION_NOTE_MINUTES,
1885 contextlib.suppress(SQLAlchemyError),
1888 connection = session.connection()
1893 f
"ALTER TABLE {table} CONVERT TO CHARACTER SET "
1894 f
"{MYSQL_DEFAULT_CHARSET} "
1895 f
"COLLATE {MYSQL_COLLATE}, LOCK=EXCLUSIVE"
1900 @database_job_retry_wrapper("Wipe old string time columns", 3)
1902 instance: Recorder, engine: Engine, session: Session
1904 """Wipe old string time columns to save space."""
1909 if engine.dialect.name == SupportedDialect.SQLITE:
1910 session.execute(text(
"UPDATE events set time_fired=NULL;"))
1912 session.execute(text(
"UPDATE states set last_updated=NULL, last_changed=NULL;"))
1914 elif engine.dialect.name == SupportedDialect.MYSQL:
1921 session.execute(text(
"UPDATE events set time_fired=NULL LIMIT 100000;"))
1925 "UPDATE states set last_updated=NULL, last_changed=NULL "
1930 elif engine.dialect.name == SupportedDialect.POSTGRESQL:
1939 "UPDATE events set time_fired=NULL "
1940 "where event_id in "
1941 "(select event_id from events where time_fired_ts is NOT NULL LIMIT 100000);"
1947 "UPDATE states set last_updated=NULL, last_changed=NULL "
1948 "where state_id in "
1949 "(select state_id from states where last_updated_ts is NOT NULL LIMIT 100000);"
1955 @database_job_retry_wrapper("Migrate columns to timestamp", 3)
1957 instance: Recorder, session_maker: Callable[[], Session], engine: Engine
1959 """Migrate columns to use timestamp."""
1963 result: CursorResult |
None =
None
1964 if engine.dialect.name == SupportedDialect.SQLITE:
1967 connection = session.connection()
1970 'UPDATE events set time_fired_ts=strftime("%s",time_fired) + '
1971 "cast(substr(time_fired,-7) AS FLOAT);"
1976 'UPDATE states set last_updated_ts=strftime("%s",last_updated) + '
1977 "cast(substr(last_updated,-7) AS FLOAT), "
1978 'last_changed_ts=strftime("%s",last_changed) + '
1979 "cast(substr(last_changed,-7) AS FLOAT);"
1982 elif engine.dialect.name == SupportedDialect.MYSQL:
1986 while result
is None or result.rowcount > 0:
1988 result = session.connection().
execute(
1990 "UPDATE events set time_fired_ts="
1991 "IF(time_fired is NULL or UNIX_TIMESTAMP(time_fired) is NULL,0,"
1992 "UNIX_TIMESTAMP(time_fired)"
1994 "where time_fired_ts is NULL "
1999 while result
is None or result.rowcount > 0:
2001 result = session.connection().
execute(
2003 "UPDATE states set last_updated_ts="
2004 "IF(last_updated is NULL or UNIX_TIMESTAMP(last_updated) is NULL,0,"
2005 "UNIX_TIMESTAMP(last_updated) "
2008 "UNIX_TIMESTAMP(last_changed) "
2009 "where last_updated_ts is NULL "
2013 elif engine.dialect.name == SupportedDialect.POSTGRESQL:
2017 while result
is None or result.rowcount > 0:
2019 result = session.connection().
execute(
2021 "UPDATE events SET "
2023 "(case when time_fired is NULL then 0 else EXTRACT(EPOCH FROM time_fired::timestamptz) end) "
2024 "WHERE event_id IN ( "
2025 "SELECT event_id FROM events where time_fired_ts is NULL LIMIT 100000 "
2030 while result
is None or result.rowcount > 0:
2032 result = session.connection().
execute(
2034 "UPDATE states set last_updated_ts="
2035 "(case when last_updated is NULL then 0 else EXTRACT(EPOCH FROM last_updated::timestamptz) end), "
2036 "last_changed_ts=EXTRACT(EPOCH FROM last_changed::timestamptz) "
2037 "where state_id IN ( "
2038 "SELECT state_id FROM states where last_updated_ts is NULL LIMIT 100000 "
2044 @database_job_retry_wrapper("Migrate statistics columns to timestamp one by one", 3)
2046 instance: Recorder, session_maker: Callable[[], Session]
2048 """Migrate statistics columns to use timestamp on by one.
2050 If something manually inserted data into the statistics table
2051 in the past it may have inserted duplicate rows.
2053 Before we had the unique index on (statistic_id, start) this
2054 the data could have been inserted without any errors and we
2055 could end up with duplicate rows that go undetected (even by
2056 our current duplicate cleanup code) until we try to migrate the
2057 data to use timestamps.
2059 This will migrate the data one by one to ensure we do not hit any
2060 duplicate rows, and remove the duplicate rows as they are found.
2062 for find_func, migrate_func, delete_func
in (
2064 find_unmigrated_statistics_rows,
2065 migrate_single_statistics_row_to_timestamp,
2066 delete_duplicate_statistics_row,
2069 find_unmigrated_short_term_statistics_rows,
2070 migrate_single_short_term_statistics_row_to_timestamp,
2071 delete_duplicate_short_term_statistics_row,
2075 while stats := session.execute(find_func(instance.max_bind_vars)).all():
2076 for statistic_id, start, created, last_reset
in stats:
2087 statistic_id, start_ts, created_ts, last_reset_ts
2090 except IntegrityError:
2093 session.execute(delete_func(statistic_id))
2097 @database_job_retry_wrapper("Migrate statistics columns to timestamp", 3)
2099 instance: Recorder, session_maker: Callable[[], Session], engine: Engine
2101 """Migrate statistics columns to use timestamp."""
2108 result: CursorResult |
None =
None
2109 if engine.dialect.name == SupportedDialect.SQLITE:
2111 for table
in STATISTICS_TABLES:
2115 f
"UPDATE {table} set start_ts=strftime('%s',start) + "
2116 "cast(substr(start,-7) AS FLOAT), "
2117 f
"created_ts=strftime('%s',created) + "
2118 "cast(substr(created,-7) AS FLOAT), "
2119 f
"last_reset_ts=strftime('%s',last_reset) + "
2120 "cast(substr(last_reset,-7) AS FLOAT) where start_ts is NULL;"
2123 elif engine.dialect.name == SupportedDialect.MYSQL:
2127 for table
in STATISTICS_TABLES:
2129 while result
is None or result.rowcount > 0:
2131 result = session.connection().
execute(
2133 f
"UPDATE {table} set start_ts="
2134 "IF(start is NULL or UNIX_TIMESTAMP(start) is NULL,0,"
2135 "UNIX_TIMESTAMP(start) "
2138 "UNIX_TIMESTAMP(created), "
2140 "UNIX_TIMESTAMP(last_reset) "
2141 "where start_ts is NULL "
2145 elif engine.dialect.name == SupportedDialect.POSTGRESQL:
2149 for table
in STATISTICS_TABLES:
2151 while result
is None or result.rowcount > 0:
2153 result = session.connection().
execute(
2155 f
"UPDATE {table} set start_ts="
2156 "(case when start is NULL then 0 else EXTRACT(EPOCH FROM start::timestamptz) end), "
2157 "created_ts=EXTRACT(EPOCH FROM created::timestamptz), "
2158 "last_reset_ts=EXTRACT(EPOCH FROM last_reset::timestamptz) "
2160 f
"SELECT id FROM {table} where start_ts is NULL LIMIT 100000"
2167 """Convert a context_id to bytes."""
2168 if context_id
is None:
2170 with contextlib.suppress(ValueError):
2176 if len(context_id) == 26:
2177 return ulid_to_bytes(context_id)
2178 return UUID(context_id).bytes
def _generate_ulid_bytes_at_time(timestamp: float | None) -> bytes:
    """Generate ULID bytes for a specific timestamp.

    Falls back to the current time when *timestamp* is None.
    NOTE(review): ``or`` also treats a 0.0 timestamp as falsy, so a
    zeroed timestamp yields a current-time ULID — confirm with callers
    that this is intended.
    """
    return ulid_to_bytes(ulid_at_time(timestamp or time()))
2188 """Remove old entity_id strings from states.
2190 We cannot do this in migrate_entity_ids since the history queries
2191 still need to work while the migration is in progress.
2193 session_maker = instance.get_session
2194 _LOGGER.debug(
"Cleanup legacy entity_ids")
2197 is_done =
not cursor_result
or cursor_result.rowcount == 0
2201 _LOGGER.debug(
"Cleanup legacy entity_ids done=%s", is_done)
2206 """Initialize a new database.
2208 The function determines the schema version by inspecting the db structure.
2210 When the schema version is not present in the db, either db was just
2211 created with the correct schema, or this is a db created before schema
2212 versions were tracked. For now, we'll test if the changes for schema
2213 version 1 are present to make the determination. Eventually this logic
2214 can be removed and we can assume a new db is being created.
2216 inspector = sqlalchemy.inspect(session.connection())
2217 indexes = inspector.get_indexes(
"events")
2219 for index
in indexes:
2220 if index[
"column_names"]
in ([
"time_fired"], [
"time_fired_ts"]):
2228 session.add(current_version)
2233 """Initialize a new database."""
2235 with session_scope(session=session_maker(), read_only=
True)
as session:
2243 _LOGGER.exception(
"Error when initialise database")
@dataclass(slots=True)
class MigrationTask(RecorderTask):
    """Base class for migration tasks."""

    # The data migrator this task drives one step of.
    migrator: BaseRunTimeMigration
    commit_before = False

    def run(self, instance: Recorder) -> None:
        """Run one migration step, requeueing until migration completes."""
        if not self.migrator.migrate_data(instance):
            # Not finished yet — schedule another pass of the same migrator.
            instance.queue_task(self.migrator.task(self.migrator))
@dataclass(slots=True)
class CommitBeforeMigrationTask(MigrationTask):
    """Base class for migration tasks which commit first."""

    # Ask the task runner to commit pending work before this task runs.
    commit_before = True
2268 @dataclass(frozen=True, kw_only=True)
2270 """Container for data migrator status."""
2273 migration_done: bool
2277 """Base class for migrations."""
2279 index_to_drop: tuple[str, str] |
None =
None
2280 required_schema_version = 0
2281 migration_version = 1
2284 def __init__(self, schema_version: int, migration_changes: dict[str, int]) ->
None:
2285 """Initialize a new BaseRunTimeMigration."""
2291 """Migrate some data, return True if migration is completed."""
2294 """Migrate some data, returns True if migration is completed."""
2296 if status.migration_done:
2297 if self.index_to_drop
is not None:
2298 table, index = self.index_to_drop
2300 with session_scope(session=instance.get_session())
as session:
2303 return not status.needs_migrate
2307 """Migrate some data, return if the migration needs to run and if it is done."""
2310 """Will be called after migrate returns True or if migration is not needed."""
2314 self, instance: Recorder, session: Session
2315 ) -> DataMigrationStatus:
2316 """Return if the migration needs to run and if it is done."""
2319 """Return if the migration needs to run.
2321 If the migration needs to run, it will return True.
2323 If the migration does not need to run, it will return False and
2324 mark the migration as done in the database if its not already
2337 self.index_to_drop
is not None
2343 if needs_migrate.migration_done:
2345 return needs_migrate.needs_migrate
2349 """Base class for off line migrations."""
2352 self, instance: Recorder, session_maker: Callable[[], Session]
2354 """Migrate all data."""
2362 @database_job_retry_wrapper_method("migrate data", 10)
2364 """Migrate some data, returns True if migration is completed."""
2369 """Base class for run time migrations."""
2371 task = MigrationTask
2374 """Start migration if needed."""
2376 instance.queue_task(self.
tasktask(self))
2380 @retryable_database_job_method("migrate data")
2382 """Migrate some data, returns True if migration is completed."""
2387 """Base class for run time migrations."""
2391 """Return the query to check if the migration needs to run."""
2394 self, instance: Recorder, session: Session
2395 ) -> DataMigrationStatus:
2396 """Return if the migration needs to run."""
2399 needs_migrate=bool(needs_migrate), migration_done=
not needs_migrate
2404 """Migration to migrate states context_ids to binary format."""
2406 required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
2407 migration_id =
"state_context_id_as_binary"
2408 migration_version = 2
2409 index_to_drop = (
"states",
"ix_states_context_id")
2412 """Migrate states context_ids to use binary format, return True if completed."""
2413 _to_bytes = _context_id_to_bytes
2414 session_maker = instance.get_session
2415 _LOGGER.debug(
"Migrating states context_ids to binary format")
2417 if states := session.execute(
2424 "state_id": state_id,
2426 "context_id_bin": _to_bytes(context_id)
2428 "context_user_id":
None,
2429 "context_user_id_bin": _to_bytes(context_user_id),
2430 "context_parent_id":
None,
2431 "context_parent_id_bin": _to_bytes(context_parent_id),
2433 for state_id, last_updated_ts, context_id, context_user_id, context_parent_id
in states
2436 is_done =
not states
2438 _LOGGER.debug(
"Migrating states context_ids to binary format: done=%s", is_done)
2442 """Return the query to check if the migration needs to run."""
2447 """Migration to migrate events context_ids to binary format."""
2449 required_schema_version = CONTEXT_ID_AS_BINARY_SCHEMA_VERSION
2450 migration_id =
"event_context_id_as_binary"
2451 migration_version = 2
2452 index_to_drop = (
"events",
"ix_events_context_id")
2455 """Migrate events context_ids to use binary format, return True if completed."""
2456 _to_bytes = _context_id_to_bytes
2457 session_maker = instance.get_session
2458 _LOGGER.debug(
"Migrating context_ids to binary format")
2460 if events := session.execute(
2467 "event_id": event_id,
2469 "context_id_bin": _to_bytes(context_id)
2471 "context_user_id":
None,
2472 "context_user_id_bin": _to_bytes(context_user_id),
2473 "context_parent_id":
None,
2474 "context_parent_id_bin": _to_bytes(context_parent_id),
2476 for event_id, time_fired_ts, context_id, context_user_id, context_parent_id
in events
2479 is_done =
not events
2481 _LOGGER.debug(
"Migrating events context_ids to binary format: done=%s", is_done)
2485 """Return the query to check if the migration needs to run."""
2490 """Migration to migrate event_type to event_type_ids."""
2492 required_schema_version = EVENT_TYPE_IDS_SCHEMA_VERSION
2493 migration_id =
"event_type_id_migration"
2494 task = CommitBeforeMigrationTask
2500 """Migrate event_type to event_type_ids, return True if completed."""
2501 session_maker = instance.get_session
2502 _LOGGER.debug(
"Migrating event_types")
2503 event_type_manager = instance.event_type_manager
2505 if events := session.execute(
2508 event_types = {event_type
for _, event_type
in events}
2509 if None in event_types:
2512 event_types.remove(
None)
2513 event_types.add(_EMPTY_EVENT_TYPE)
2515 event_type_to_id = event_type_manager.get_many(event_types, session)
2516 if missing_event_types := {
2518 for event_type, event_id
in event_type_to_id.items()
2521 missing_db_event_types = [
2523 for event_type
in missing_event_types
2525 session.add_all(missing_db_event_types)
2527 for db_event_type
in missing_db_event_types:
2531 db_event_type.event_type
is not None
2532 ),
"event_type should never be None"
2533 event_type_to_id[db_event_type.event_type] = (
2534 db_event_type.event_type_id
2536 event_type_manager.clear_non_existent(db_event_type.event_type)
2542 "event_id": event_id,
2544 "event_type_id": event_type_to_id[
2545 _EMPTY_EVENT_TYPE
if event_type
is None else event_type
2548 for event_id, event_type
in events
2552 is_done =
not events
2554 _LOGGER.debug(
"Migrating event_types done=%s", is_done)
2558 """Will be called after migrate returns True."""
2559 _LOGGER.debug(
"Activating event_types manager as all data is migrated")
2560 instance.event_type_manager.active =
True
2563 """Check if the data is migrated."""
2568 """Migration to migrate entity_ids to states_meta."""
2570 required_schema_version = STATES_META_SCHEMA_VERSION
2571 migration_id =
"entity_id_migration"
2572 task = CommitBeforeMigrationTask
2578 """Migrate entity_ids to states_meta, return True if completed.
2580 We do this in two steps because we need the history queries to work
2581 while we are migrating.
2583 1. Link the states to the states_meta table
2584 2. Remove the entity_id column from the states table (in post_migrate_entity_ids)
2586 _LOGGER.debug(
"Migrating entity_ids")
2587 states_meta_manager = instance.states_meta_manager
2588 with session_scope(session=instance.get_session())
as session:
2589 if states := session.execute(
2592 entity_ids = {entity_id
for _, entity_id
in states}
2593 if None in entity_ids:
2596 entity_ids.remove(
None)
2597 entity_ids.add(_EMPTY_ENTITY_ID)
2599 entity_id_to_metadata_id = states_meta_manager.get_many(
2600 entity_ids, session,
True
2602 if missing_entity_ids := {
2604 for entity_id, metadata_id
in entity_id_to_metadata_id.items()
2605 if metadata_id
is None
2607 missing_states_metadata = [
2609 for entity_id
in missing_entity_ids
2611 session.add_all(missing_states_metadata)
2613 for db_states_metadata
in missing_states_metadata:
2617 db_states_metadata.entity_id
is not None
2618 ),
"entity_id should never be None"
2619 entity_id_to_metadata_id[db_states_metadata.entity_id] = (
2620 db_states_metadata.metadata_id
2627 "state_id": state_id,
2632 "metadata_id": entity_id_to_metadata_id[
2633 _EMPTY_ENTITY_ID
if entity_id
is None else entity_id
2636 for state_id, entity_id
in states
2640 is_done =
not states
2642 _LOGGER.debug(
"Migrating entity_ids done=%s", is_done)
2646 """Will be called after migrate returns True."""
2651 _LOGGER.debug(
"Activating states_meta manager as all data is migrated")
2652 instance.states_meta_manager.active =
True
2653 with contextlib.suppress(SQLAlchemyError):
2655 migrate.queue_migration(instance, session)
2658 """Check if the data is migrated."""
2663 """Migration to remove old event_id index from states."""
2665 migration_id =
"event_id_post_migration"
2666 task = MigrationTask
2667 migration_version = 2
2670 """Remove old event_id index from states, returns True if completed.
2672 We used to link states to events using the event_id column but we no
2673 longer store state changed events in the events table.
2675 If all old states have been purged and existing states are in the new
2676 format we can drop the index since it can take up ~10MB per 1M rows.
2678 session_maker = instance.get_session
2679 _LOGGER.debug(
"Cleanup legacy entity_ids")
2685 all_gone =
not result
2687 fk_remove_ok =
False
2691 assert instance.engine
is not None,
"engine should never be None"
2692 if instance.dialect_name == SupportedDialect.SQLITE:
2696 session_maker, instance.engine, States
2701 session_maker, instance.engine, TABLE_STATES,
"event_id"
2703 except (InternalError, OperationalError):
2704 fk_remove_ok =
False
2708 _drop_index(session_maker,
"states", LEGACY_STATES_EVENT_ID_INDEX)
2709 instance.use_legacy_events_index =
False
2715 """Check if the legacy event_id foreign key exists."""
2716 engine = instance.engine
2717 assert engine
is not None
2718 inspector = sqlalchemy.inspect(engine)
2723 for fk
in inspector.get_foreign_keys(TABLE_STATES)
2724 if fk[
"constrained_columns"] == [
"event_id"]
2731 self, instance: Recorder, session: Session
2732 ) -> DataMigrationStatus:
2733 """Return if the migration needs to run."""
2734 if self.
schema_versionschema_version <= LEGACY_STATES_EVENT_ID_INDEX_SCHEMA_VERSION:
2737 session, TABLE_STATES, LEGACY_STATES_EVENT_ID_INDEX
2739 instance.use_legacy_events_index =
True
2745 """Migration to remove old entity_id strings from states."""
2747 migration_id =
"entity_id_post_migration"
2748 task = MigrationTask
2749 index_to_drop = (TABLE_STATES, LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX)
2752 """Migrate some data, returns True if migration is completed."""
2757 """Check if the data is migrated."""
# Migrators that must complete before the recorder accepts live writes.
NON_LIVE_DATA_MIGRATORS = (
    StatesContextIDMigration,
    EventsContextIDMigration,
)

# Migrators that may run while the recorder is live.
LIVE_DATA_MIGRATORS = (
    EventTypeIDMigration,
    EntityIDMigration,
    EventIDPostMigration,
)
2774 """Mark a migration as done in the database."""
2777 migration_id=migration.migration_id, version=migration.migration_version
2783 session_maker: Callable[[], Session], engine: Engine, table: type[Base]
2785 """Rebuild an SQLite table.
2787 This must only be called after all migrations are complete
2788 and the database is in a consistent state.
2790 If the table is not migrated to the current schema this
2793 table_table = cast(Table, table.__table__)
2794 orig_name = table_table.name
2795 temp_name = f
"{table_table.name}_temp_{int(time())}"
2797 _LOGGER.warning(
"Rebuilding SQLite table %s; %s", orig_name, MIGRATION_NOTE_WHILE)
2804 session.connection().
execute(text(
"PRAGMA foreign_keys=OFF"))
2808 new_sql =
str(CreateTable(table_table).compile(engine)).strip(
"\n") +
";"
2809 source_sql = f
"CREATE TABLE {orig_name}"
2810 replacement_sql = f
"CREATE TABLE {temp_name}"
2811 assert source_sql
in new_sql, f
"{source_sql} should be in new_sql"
2812 new_sql = new_sql.replace(source_sql, replacement_sql)
2814 session.execute(text(new_sql))
2815 column_names =
",".join([column.name
for column
in table_table.columns])
2817 sql = f
"INSERT INTO {temp_name} SELECT {column_names} FROM {orig_name};"
2818 session.execute(text(sql))
2820 session.execute(text(f
"DROP TABLE {orig_name}"))
2822 session.execute(text(f
"ALTER TABLE {temp_name} RENAME TO {orig_name}"))
2824 for index
in table_table.indexes:
2825 index.create(session.connection())
2828 session.execute(text(
"PRAGMA foreign_key_check"))
2831 except SQLAlchemyError:
2832 _LOGGER.exception(
"Error recreating SQLite table %s", table_table.name)
2838 _LOGGER.warning(
"Rebuilding SQLite table %s finished", orig_name)
2843 session.connection().
execute(text(
"PRAGMA foreign_keys=ON"))
StatementLambdaElement needs_migrate_query(self)
DataMigrationStatus needs_migrate_impl(self, Recorder instance, Session session)
int required_schema_version
None __init__(self, int schema_version, dict[str, int] migration_changes)
bool migrate_data(self, Recorder instance)
bool needs_migrate(self, Recorder instance, Session session)
DataMigrationStatus migrate_data_impl(self, Recorder instance)
DataMigrationStatus needs_migrate_impl(self, Recorder instance, Session session)
None migration_done(self, Recorder instance, Session session)
bool _migrate_data(self, Recorder instance)
bool migrate_data(self, Recorder instance)
None migrate_all(self, Recorder instance, Callable[[], Session] session_maker)
bool migrate_data(self, Recorder instance)
None queue_migration(self, Recorder instance, Session session)
DataMigrationStatus migrate_data_impl(self, Recorder instance)
StatementLambdaElement needs_migrate_query(self)
None migration_done(self, Recorder instance, Session session)
DataMigrationStatus migrate_data_impl(self, Recorder instance)
StatementLambdaElement needs_migrate_query(self)
DataMigrationStatus needs_migrate_impl(self, Recorder instance, Session session)
bool _legacy_event_id_foreign_key_exists(Recorder instance)
DataMigrationStatus migrate_data_impl(self, Recorder instance)
DataMigrationStatus migrate_data_impl(self, Recorder instance)
StatementLambdaElement needs_migrate_query(self)
None migration_done(self, Recorder instance, Session session)
StatementLambdaElement needs_migrate_query(self)
DataMigrationStatus migrate_data_impl(self, Recorder instance)
None run(self, Recorder instance)
StatementLambdaElement needs_migrate_query(self)
DataMigrationStatus migrate_data_impl(self, Recorder instance)
None __init_subclass__(cls, int target_version, **Any kwargs)
None __init__(self, Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, int old_version)
type[_SchemaVersionMigrator] get_migrator(cls, int target_version)
web.Response get(self, web.Request request, str config_key)
IssData update(pyiss.ISS iss)
None create(HomeAssistant hass, str message, str|None title=None, str|None notification_id=None)
def execute(hass, filename, source, data=None, return_response=False)
None delete_statistics_duplicates(Recorder instance, HomeAssistant hass, Session session)
None delete_statistics_meta_duplicates(Recorder instance, Session session)
int|None get_schema_version(Callable[[], Session] session_maker)
None _apply_update(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, int new_version, int old_version)
None _add_constraint(Callable[[], Session] session_maker, AddConstraint add_constraint, str table, str column)
None _modify_columns(Callable[[], Session] session_maker, Engine engine, str table_name, list[str] columns_def)
bytes|None _context_id_to_bytes(str|None context_id)
None _create_index(Callable[[], Session] session_maker, str table_name, str index_name)
None raise_if_exception_missing_str(Exception ex, Iterable[str] match_substrs)
None _add_columns(Callable[[], Session] session_maker, str table_name, list[str] columns_def)
bool rebuild_sqlite_table(Callable[[], Session] session_maker, Engine engine, type[Base] table)
bytes _generate_ulid_bytes_at_time(float|None timestamp)
None _migrate_statistics_columns_to_timestamp(Recorder instance, Callable[[], Session] session_maker, Engine engine)
None _correct_table_character_set_and_collation(str table, Callable[[], Session] session_maker)
bool non_live_data_migration_needed(Recorder instance, Callable[[], Session] session_maker, int schema_version)
SchemaValidationStatus migrate_schema_live(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
None _drop_foreign_key_constraints(Callable[[], Session] session_maker, Engine engine, str table, str column)
set[str] _find_schema_errors(HomeAssistant hass, Recorder instance, Callable[[], Session] session_maker)
None _delete_foreign_key_violations(Callable[[], Session] session_maker, Engine engine, str table, str column, str foreign_table, str foreign_column)
bool _schema_is_current(int current_version)
None _mark_migration_done(Session session, type[BaseMigration] migration)
bool _execute_or_collect_error(Callable[[], Session] session_maker, str query, list[str] errors)
None _wipe_old_string_time_columns(Recorder instance, Engine engine, Session session)
SchemaValidationStatus _migrate_schema(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, SchemaValidationStatus schema_status, int end_version)
None _drop_index(Callable[[], Session] session_maker, str table_name, str index_name, bool|None quiet=None)
None _migrate_statistics_columns_to_timestamp_one_by_one(Recorder instance, Callable[[], Session] session_maker)
bool post_migrate_entity_ids(Recorder instance)
dict[str, int] _get_migration_changes(Session session)
None _migrate_statistics_columns_to_timestamp_removing_duplicates(HomeAssistant hass, Recorder instance, Callable[[], Session] session_maker, Engine engine)
None pre_migrate_schema(Engine engine)
bool _initialize_database(Session session)
None _restore_foreign_key_constraints(Callable[[], Session] session_maker, Engine engine, list[tuple[str, str, str|None, str|None]] foreign_columns)
int|None _get_schema_version(Session session)
bool initialize_database(Callable[[], Session] session_maker)
None migrate_data_live(Recorder instance, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
SchemaValidationStatus|None validate_db_schema(HomeAssistant hass, Recorder instance, Callable[[], Session] session_maker)
None migrate_data_non_live(Recorder instance, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
None _update_states_table_with_foreign_key_options(Callable[[], Session] session_maker, Engine engine)
None _migrate_columns_to_timestamp(Recorder instance, Callable[[], Session] session_maker, Engine engine)
bool live_migration(SchemaValidationStatus schema_status)
SchemaValidationStatus migrate_schema_non_live(Recorder instance, HomeAssistant hass, Engine engine, Callable[[], Session] session_maker, SchemaValidationStatus schema_status)
None process_timestamp(None ts)
float|None datetime_to_timestamp_or_none(datetime|None dt)
StatementLambdaElement batch_cleanup_entity_ids()
StatementLambdaElement has_used_states_entity_ids()
StatementLambdaElement has_states_context_ids_to_migrate()
StatementLambdaElement has_used_states_event_ids()
StatementLambdaElement has_events_context_ids_to_migrate()
StatementLambdaElement find_states_context_ids_to_migrate(int max_bind_vars)
StatementLambdaElement find_entity_ids_to_migrate(int max_bind_vars)
StatementLambdaElement find_events_context_ids_to_migrate(int max_bind_vars)
StatementLambdaElement has_entity_ids_to_migrate()
StatementLambdaElement find_event_type_to_migrate(int max_bind_vars)
StatementLambdaElement has_event_type_to_migrate()
StatementLambdaElement get_migration_changes()
datetime get_start_time()
bool cleanup_statistics_timestamp_migration(Recorder instance)
str|None get_index_by_name(Session session, str table_name, str index_name)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
bool time(HomeAssistant hass, dt_time|str|None before=None, dt_time|str|None after=None, str|Container[str]|None weekday=None)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)