Home Assistant Unofficial Reference 2024.12.1
duplicates.py
Go to the documentation of this file.
1 """Statistics duplication repairs."""
2 
3 from __future__ import annotations
4 
5 import json
6 import logging
7 import os
8 from typing import TYPE_CHECKING
9 
10 from sqlalchemy import func
11 from sqlalchemy.engine.row import Row
12 from sqlalchemy.orm.session import Session
13 from sqlalchemy.sql.expression import literal_column
14 
15 from homeassistant.core import HomeAssistant
16 from homeassistant.helpers.json import JSONEncoder
17 from homeassistant.helpers.storage import STORAGE_DIR
18 from homeassistant.util import dt as dt_util
19 
20 from ...const import SQLITE_MAX_BIND_VARS
21 from ...db_schema import Statistics, StatisticsBase, StatisticsMeta, StatisticsShortTerm
22 from ...util import database_job_retry_wrapper, execute
23 
24 if TYPE_CHECKING:
25  from ... import Recorder
26 
27 _LOGGER = logging.getLogger(__name__)
28 
29 
def _find_duplicates(
    session: Session, table: type[StatisticsBase]
) -> tuple[list[int], list[dict]]:
    """Find duplicated statistics rows in the given statistics table.

    Rows are duplicates when they share the same (metadata_id, start) pair.
    Within each group the row with the highest id is kept as the original
    (the query orders ids descending, so it arrives first); every other
    row's id is collected for deletion.  Duplicates whose payload differs
    from the original (ignoring id and created) are additionally returned
    as dicts so the caller can back them up before deleting.

    Returns a tuple of (ids to delete, non-identical duplicate payloads).
    """
    # Subquery flagging every (start, metadata_id) pair that occurs more
    # than once in the table.
    subquery = (
        session.query(
            table.start,
            table.metadata_id,
            literal_column("1").label("is_duplicate"),
        )
        .group_by(table.metadata_id, table.start)
        .having(func.count() > 1)
        .subquery()
    )
    # Fetch all rows belonging to flagged pairs.  Descending id order makes
    # the first row of each group the one that is kept.
    query = (
        session.query(
            table.id,
            table.metadata_id,
            table.created,
            table.start,
            table.mean,
            table.min,
            table.max,
            table.last_reset,
            table.state,
            table.sum,
        )
        .outerjoin(
            subquery,
            (subquery.c.metadata_id == table.metadata_id)
            & (subquery.c.start == table.start),
        )
        .filter(subquery.c.is_duplicate == 1)
        .order_by(table.metadata_id, table.start, table.id.desc())
        # Bound one pass so the caller's batched deletes stay manageable;
        # the caller loops until no duplicates remain.
        .limit(1000 * SQLITE_MAX_BIND_VARS)
    )
    duplicates = execute(query)
    original_as_dict: dict = {}
    start = None
    metadata_id = None
    duplicate_ids: list[int] = []
    non_identical_duplicates_as_dict: list[dict] = []

    if not duplicates:
        return (duplicate_ids, non_identical_duplicates_as_dict)

    def columns_to_dict(duplicate: Row) -> dict:
        """Convert a SQLAlchemy row to dict."""
        dict_ = {}
        for key in (
            "id",
            "metadata_id",
            "start",
            "created",
            "mean",
            "min",
            "max",
            "last_reset",
            "state",
            "sum",
        ):
            dict_[key] = getattr(duplicate, key)
        return dict_

    def compare_statistic_rows(row1: dict, row2: dict) -> bool:
        """Compare two statistics rows, ignoring id and created."""
        ignore_keys = {"id", "created"}
        keys1 = set(row1).difference(ignore_keys)
        keys2 = set(row2).difference(ignore_keys)
        return keys1 == keys2 and all(row1[k] == row2[k] for k in keys1)

    for duplicate in duplicates:
        if start != duplicate.start or metadata_id != duplicate.metadata_id:
            # First row of a new (metadata_id, start) group: keep it as the
            # original and move on.
            original_as_dict = columns_to_dict(duplicate)
            start = duplicate.start
            metadata_id = duplicate.metadata_id
            continue
        duplicate_as_dict = columns_to_dict(duplicate)
        duplicate_ids.append(duplicate.id)
        if not compare_statistic_rows(original_as_dict, duplicate_as_dict):
            non_identical_duplicates_as_dict.append(
                {"duplicate": duplicate_as_dict, "original": original_as_dict}
            )

    return (duplicate_ids, non_identical_duplicates_as_dict)
115 
116 
def _delete_duplicates_from_table(
    session: Session, table: type[StatisticsBase]
) -> tuple[int, list[dict]]:
    """Identify and delete duplicated statistics from a specified table.

    Repeatedly calls _find_duplicates (each pass is bounded) and deletes the
    returned ids in chunks of SQLITE_MAX_BIND_VARS, until a pass finds no
    duplicates.  Returns the total number of deleted rows and the collected
    non-identical duplicate payloads so the caller can back them up.
    """
    all_non_identical_duplicates: list[dict] = []
    total_deleted_rows = 0
    while True:
        duplicate_ids, non_identical_duplicates = _find_duplicates(session, table)
        if not duplicate_ids:
            break
        all_non_identical_duplicates.extend(non_identical_duplicates)
        # Delete in chunks to stay under the SQLite bind-variable limit.
        for i in range(0, len(duplicate_ids), SQLITE_MAX_BIND_VARS):
            deleted_rows = (
                session.query(table)
                .filter(table.id.in_(duplicate_ids[i : i + SQLITE_MAX_BIND_VARS]))
                .delete(synchronize_session=False)
            )
            total_deleted_rows += deleted_rows
    return (total_deleted_rows, all_non_identical_duplicates)
136 
137 
@database_job_retry_wrapper("delete statistics duplicates", 3)
def delete_statistics_duplicates(
    instance: Recorder, hass: HomeAssistant, session: Session
) -> None:
    """Identify and delete duplicated statistics.

    A backup will be made of duplicated statistics before it is deleted.
    Handles both the long-term Statistics table and StatisticsShortTerm;
    only non-identical long-term duplicates are backed up to a JSON file
    in the storage directory.
    """
    deleted_statistics_rows, non_identical_duplicates = _delete_duplicates_from_table(
        session, Statistics
    )
    if deleted_statistics_rows:
        _LOGGER.info("Deleted %s duplicated statistics rows", deleted_statistics_rows)

    if non_identical_duplicates:
        # Rows with conflicting payloads are saved to a timestamped JSON
        # backup before the user is warned about the deletion.
        isotime = dt_util.utcnow().isoformat()
        backup_file_name = f"deleted_statistics.{isotime}.json"
        backup_path = hass.config.path(STORAGE_DIR, backup_file_name)

        os.makedirs(os.path.dirname(backup_path), exist_ok=True)
        with open(backup_path, "w", encoding="utf8") as backup_file:
            json.dump(
                non_identical_duplicates,
                backup_file,
                indent=4,
                sort_keys=True,
                cls=JSONEncoder,  # serializes HA/SQLAlchemy values (e.g. datetimes)
            )
        _LOGGER.warning(
            (
                "Deleted %s non identical duplicated %s rows, a backup of the deleted"
                " rows has been saved to %s"
            ),
            len(non_identical_duplicates),
            Statistics.__tablename__,
            backup_path,
        )

    deleted_short_term_statistics_rows, _ = _delete_duplicates_from_table(
        session, StatisticsShortTerm
    )
    if deleted_short_term_statistics_rows:
        # Short-term duplicates are unexpected; ask the user to report them.
        _LOGGER.warning(
            "Deleted duplicated short term statistic rows, please report at %s",
            "https://github.com/home-assistant/core/issues?q=is%3Aopen+is%3Aissue+label%3A%22integration%3A+recorder%22",
        )
184 
185 
def _find_statistics_meta_duplicates(session: Session) -> list[int]:
    """Find duplicated statistics_meta."""
    # When querying the database, be careful to only explicitly query for columns
    # which were present in schema version 29. If querying the table, SQLAlchemy
    # will refer to future columns.
    dupe_flag = (
        session.query(
            StatisticsMeta.statistic_id,
            literal_column("1").label("is_duplicate"),
        )
        .group_by(StatisticsMeta.statistic_id)
        .having(func.count() > 1)
        .subquery()
    )
    dupe_query = (
        session.query(StatisticsMeta.statistic_id, StatisticsMeta.id)
        .outerjoin(
            dupe_flag,
            (dupe_flag.c.statistic_id == StatisticsMeta.statistic_id),
        )
        .filter(dupe_flag.c.is_duplicate == 1)
        .order_by(StatisticsMeta.statistic_id, StatisticsMeta.id.desc())
        .limit(1000 * SQLITE_MAX_BIND_VARS)
    )
    duplicate_ids: list[int] = []
    rows = execute(dupe_query)
    if not rows:
        return duplicate_ids

    # Rows arrive grouped by statistic_id with ids descending: the first row
    # of each group (highest id) is kept, the rest are marked for deletion.
    current_statistic_id = None
    for row in rows:
        if row.statistic_id == current_statistic_id:
            duplicate_ids.append(row.id)
        else:
            current_statistic_id = row.statistic_id

    return duplicate_ids
224 
225 
def _delete_statistics_meta_duplicates(session: Session) -> int:
    """Delete duplicated statistics_meta rows, returning the deleted count."""
    deleted_total = 0
    # Keep sweeping until a pass finds nothing left to delete (each find is
    # bounded, so several passes may be needed).
    while ids_to_delete := _find_statistics_meta_duplicates(session):
        # Delete in chunks to stay under the SQLite bind-variable limit.
        for offset in range(0, len(ids_to_delete), SQLITE_MAX_BIND_VARS):
            batch = ids_to_delete[offset : offset + SQLITE_MAX_BIND_VARS]
            deleted_total += (
                session.query(StatisticsMeta)
                .filter(StatisticsMeta.id.in_(batch))
                .delete(synchronize_session=False)
            )
    return deleted_total
243 
244 
@database_job_retry_wrapper("delete statistics meta duplicates", 3)
def delete_statistics_meta_duplicates(instance: Recorder, session: Session) -> None:
    """Identify and delete duplicated statistics_meta.

    This is used when migrating from schema version 28 to schema version 29.
    Note: If this needs to be called during live schema migration it needs to
    be modified to reload the statistics_meta_manager.
    """
    if deleted_rows := _delete_statistics_meta_duplicates(session):
        _LOGGER.info(
            "Deleted %s duplicated statistics_meta rows", deleted_rows
        )
web.Response delete(self, web.Request request, str config_key)
Definition: view.py:144
None open(self, **Any kwargs)
Definition: lock.py:86
def execute(hass, filename, source, data=None, return_response=False)
Definition: __init__.py:194
tuple[int, list[dict]] _delete_duplicates_from_table(Session session, type[StatisticsBase] table)
Definition: duplicates.py:119
tuple[list[int], list[dict]] _find_duplicates(Session session, type[StatisticsBase] table)
Definition: duplicates.py:32
None delete_statistics_duplicates(Recorder instance, HomeAssistant hass, Session session)
Definition: duplicates.py:141
None delete_statistics_meta_duplicates(Recorder instance, Session session)
Definition: duplicates.py:246