1 """Statistics duplication repairs."""
3 from __future__
import annotations
8 from typing
import TYPE_CHECKING
10 from sqlalchemy
import func
11 from sqlalchemy.engine.row
import Row
12 from sqlalchemy.orm.session
import Session
13 from sqlalchemy.sql.expression
import literal_column
20 from ...const
import SQLITE_MAX_BIND_VARS
21 from ...db_schema
import Statistics, StatisticsBase, StatisticsMeta, StatisticsShortTerm
22 from ...util
import database_job_retry_wrapper, execute
25 from ...
import Recorder
# Module-level logger, named after this module per the stdlib logging convention.
_LOGGER = logging.getLogger(__name__)
def _find_duplicates(
    session: Session, table: type[StatisticsBase]
) -> tuple[list[int], list[dict]]:
    """Find duplicated statistics.

    Returns a tuple of (ids of duplicate rows to delete, list of duplicates
    whose column values differ from the row that is kept).
    """
    # Rows sharing (metadata_id, start) more than once are duplicates; mark
    # each such group with a literal is_duplicate flag in a subquery.
    subquery = (
        session.query(
            table.start,
            table.metadata_id,
            literal_column("1").label("is_duplicate"),
        )
        .group_by(table.metadata_id, table.start)
        .having(func.count() > 1)
        .subquery()
    )
    # Join the flagged groups back to the table; newest id first within each
    # group so the oldest row survives. Cap the batch to keep bind vars sane.
    query = (
        session.query(table)
        .outerjoin(
            subquery,
            (subquery.c.metadata_id == table.metadata_id)
            & (subquery.c.start == table.start),
        )
        .filter(subquery.c.is_duplicate == 1)
        .order_by(table.metadata_id, table.start, table.id.desc())
        .limit(1000 * SQLITE_MAX_BIND_VARS)
    )
    duplicates = execute(query)
    original_as_dict: dict = {}
    start = None
    metadata_id = None
    duplicate_ids: list[int] = []
    non_identical_duplicates_as_dict: list[dict] = []

    if not duplicates:
        return (duplicate_ids, non_identical_duplicates_as_dict)

    def columns_to_dict(duplicate: Row) -> dict:
        """Convert a SQLAlchemy row to dict."""
        dict_: dict = {}
        for key in duplicate.__mapper__.c.keys():
            dict_[key] = getattr(duplicate, key)
        return dict_

    def compare_statistic_rows(row1: dict, row2: dict) -> bool:
        """Compare two statistics rows, ignoring id and created."""
        ignore_keys = {"id", "created"}
        keys1 = set(row1).difference(ignore_keys)
        keys2 = set(row2).difference(ignore_keys)
        return keys1 == keys2 and all(row1[k] == row2[k] for k in keys1)

    for duplicate in duplicates:
        # First row of each (metadata_id, start) group is the keeper; every
        # subsequent row in the group is recorded for deletion.
        if start != duplicate.start or metadata_id != duplicate.metadata_id:
            original_as_dict = columns_to_dict(duplicate)
            start = duplicate.start
            metadata_id = duplicate.metadata_id
            continue
        duplicate_as_dict = columns_to_dict(duplicate)
        duplicate_ids.append(duplicate.id)
        # Keep a record of duplicates that are not byte-identical to the
        # keeper so they can be backed up before deletion.
        if not compare_statistic_rows(original_as_dict, duplicate_as_dict):
            non_identical_duplicates_as_dict.append(
                {"duplicate": duplicate_as_dict, "original": original_as_dict}
            )

    return (duplicate_ids, non_identical_duplicates_as_dict)
def _delete_duplicates_from_table(
    session: Session, table: type[StatisticsBase]
) -> tuple[int, list[dict]]:
    """Identify and delete duplicated statistics from a specified table."""
    all_non_identical_duplicates: list[dict] = []
    total_deleted_rows = 0
    # _find_duplicates is batch-limited, so loop until it finds nothing more.
    while True:
        duplicate_ids, non_identical_duplicates = _find_duplicates(session, table)
        if not duplicate_ids:
            break
        all_non_identical_duplicates.extend(non_identical_duplicates)
        # Delete in chunks so the IN clause never exceeds SQLite's bind limit.
        for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
            deleted_rows = (
                session.query(table)
                .filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]))
                .delete(synchronize_session=False)
            )
            total_deleted_rows += deleted_rows
    return (total_deleted_rows, all_non_identical_duplicates)
@database_job_retry_wrapper("delete statistics duplicates", 3)
def delete_statistics_duplicates(
    instance: Recorder, hass: HomeAssistant, session: Session
) -> None:
    """Identify and delete duplicated statistics.

    A backup will be made of duplicated statistics before it is deleted.
    """
    deleted_statistics_rows, non_identical_duplicates = _delete_duplicates_from_table(
        session, Statistics
    )
    if deleted_statistics_rows:
        _LOGGER.info("Deleted %s duplicated statistics rows", deleted_statistics_rows)

    if non_identical_duplicates:
        # Non-identical duplicates carry data we are about to destroy; dump
        # them to a timestamped JSON file in storage before deleting.
        isotime = dt_util.utcnow().isoformat()
        backup_file_name = f"deleted_statistics.{isotime}.json"
        backup_path = hass.config.path(STORAGE_DIR, backup_file_name)

        os.makedirs(os.path.dirname(backup_path), exist_ok=True)
        with open(backup_path, "w", encoding="utf8") as backup_file:
            json.dump(
                non_identical_duplicates,
                backup_file,
                indent=4,
                sort_keys=True,
                cls=JSONEncoder,
            )
        _LOGGER.warning(
            "Deleted %s non identical duplicated %s rows, a backup of the deleted"
            " rows has been saved to %s",
            len(non_identical_duplicates),
            Statistics.__tablename__,
            backup_path,
        )

    # Short-term statistics should never contain duplicates; if they do it is
    # a bug worth reporting upstream, so log loudly.
    deleted_short_term_statistics_rows, _ = _delete_duplicates_from_table(
        session, StatisticsShortTerm
    )
    if deleted_short_term_statistics_rows:
        _LOGGER.warning(
            "Deleted duplicated short term statistic rows, please report at %s",
            "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
        )
def _find_statistics_meta_duplicates(session: Session) -> list[int]:
    """Find duplicated statistics_meta."""
    # Flag every statistic_id that appears more than once.
    subquery = (
        session.query(
            StatisticsMeta.statistic_id,
            literal_column("1").label("is_duplicate"),
        )
        .group_by(StatisticsMeta.statistic_id)
        .having(func.count() > 1)
        .subquery()
    )
    # Newest id first so the oldest row of each statistic_id is kept.
    query = (
        session.query(StatisticsMeta.statistic_id, StatisticsMeta.id)
        .outerjoin(
            subquery,
            (subquery.c.statistic_id == StatisticsMeta.statistic_id),
        )
        .filter(subquery.c.is_duplicate == 1)
        .order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc())
        .limit(1000 * SQLITE_MAX_BIND_VARS)
    )
    duplicates = execute(query)
    statistic_id = None
    duplicate_ids: list[int] = []

    if not duplicates:
        return duplicate_ids

    for duplicate in duplicates:
        # First row in each statistic_id group is the keeper.
        if statistic_id != duplicate.statistic_id:
            statistic_id = duplicate.statistic_id
            continue
        duplicate_ids.append(duplicate.id)

    return duplicate_ids
def _delete_statistics_meta_duplicates(session: Session) -> int:
    """Identify and delete duplicated statistics from a specified table."""
    total_deleted_rows = 0
    # The finder is batch-limited; keep going until no duplicates remain.
    while True:
        duplicate_ids = _find_statistics_meta_duplicates(session)
        if not duplicate_ids:
            break
        # Chunk the deletes so the IN clause stays under SQLite's bind limit.
        for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
            deleted_rows = (
                session.query(StatisticsMeta)
                .filter(
                    StatisticsMeta.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS])
                )
                .delete(synchronize_session=False)
            )
            total_deleted_rows += deleted_rows
    return total_deleted_rows
@database_job_retry_wrapper("delete statistics meta duplicates", 3)
def delete_statistics_meta_duplicates(instance: Recorder, session: Session) -> None:
    """Identify and delete duplicated statistics_meta.

    This is used when migrating from schema version 28 to schema version 29.
    Note: If this needs to be called during live schema migration it needs to
    be modified to reload the statistics_meta_manager.
    """
    deleted_statistics_rows = _delete_statistics_meta_duplicates(session)
    if deleted_statistics_rows:
        _LOGGER.info(
            "Deleted %s duplicated statistics_meta rows", deleted_statistics_rows
        )
web.Response delete(self, web.Request request, str config_key)
None open(self, **Any kwargs)
def execute(hass, filename, source, data=None, return_response=False)
tuple[int, list[dict]] _delete_duplicates_from_table(Session session, type[StatisticsBase] table)
tuple[list[int], list[dict]] _find_duplicates(Session session, type[StatisticsBase] table)
list[int] _find_statistics_meta_duplicates(Session session)
None delete_statistics_duplicates(Recorder instance, HomeAssistant hass, Session session)
int _delete_statistics_meta_duplicates(Session session)
None delete_statistics_meta_duplicates(Recorder instance, Session session)