# Home Assistant recorder purge helper (unofficial reference, 2024.12.1).
# Reconstructed from homeassistant/components/recorder/purge.py.
1 """Purge old data helper."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from datetime import datetime
7 from itertools import zip_longest
8 import logging
9 import time
10 from typing import TYPE_CHECKING
11 
12 from sqlalchemy.orm.session import Session
13 
14 from homeassistant.util.collection import chunked_or_all
15 
16 from .db_schema import Events, States, StatesMeta
17 from .models import DatabaseEngine
18 from .queries import (
19  attributes_ids_exist_in_states,
20  attributes_ids_exist_in_states_with_fast_in_distinct,
21  data_ids_exist_in_events,
22  data_ids_exist_in_events_with_fast_in_distinct,
23  delete_event_data_rows,
24  delete_event_rows,
25  delete_event_types_rows,
26  delete_recorder_runs_rows,
27  delete_states_attributes_rows,
28  delete_states_meta_rows,
29  delete_states_rows,
30  delete_statistics_runs_rows,
31  delete_statistics_short_term_rows,
32  disconnect_states_rows,
33  find_entity_ids_to_purge,
34  find_event_types_to_purge,
35  find_events_to_purge,
36  find_latest_statistics_runs_run_id,
37  find_legacy_detached_states_and_attributes_to_purge,
38  find_legacy_event_state_and_attributes_and_data_ids_to_purge,
39  find_legacy_row,
40  find_short_term_statistics_to_purge,
41  find_states_to_purge,
42  find_statistics_runs_to_purge,
43 )
44 from .repack import repack_database
45 from .util import retryable_database_job, session_scope
46 
47 if TYPE_CHECKING:
48  from . import Recorder
49 
50 _LOGGER = logging.getLogger(__name__)
51 
52 
# Number of state/event select-and-delete batches to run per purge cycle.
# Each batch is capped at max_bind_vars rows; the de-dupe rates below mean
# most attributes_ids/data_ids repeat across batches, so several state/event
# batches are needed to fill one batch of unique attribute/data ids.
DEFAULT_STATES_BATCHES_PER_PURGE = 20  # We expect ~95% de-dupe rate
DEFAULT_EVENTS_BATCHES_PER_PURGE = 15  # We expect ~92% de-dupe rate
55 
56 
@retryable_database_job("purge")
def purge_old_data(
    instance: Recorder,
    purge_before: datetime,
    repack: bool,
    apply_filter: bool = False,
    events_batch_size: int = DEFAULT_EVENTS_BATCHES_PER_PURGE,
    states_batch_size: int = DEFAULT_STATES_BATCHES_PER_PURGE,
) -> bool:
    """Purge events and states older than purge_before.

    Cleans up a timeframe of an hour, based on the oldest record.
    Returns True when purging has fully completed, False when there is
    still more work to do (the caller is expected to run it again).
    """
    _LOGGER.debug(
        "Purging states and events before target %s",
        purge_before.isoformat(sep=" ", timespec="seconds"),
    )
    with session_scope(session=instance.get_session()) as session:
        # Purge a max of max_bind_vars, based on the oldest states or events record
        has_more_to_purge = False
        if instance.use_legacy_events_index and _purging_legacy_format(session):
            _LOGGER.debug(
                "Purge running in legacy format as there are states with event_id"
                " remaining"
            )
            has_more_to_purge |= _purge_legacy_format(instance, session, purge_before)
        else:
            _LOGGER.debug(
                "Purge running in new format as there are NO states with event_id"
                " remaining"
            )
            # Once we are done purging legacy rows, we use the new method
            has_more_to_purge |= _purge_states_and_attributes_ids(
                instance, session, states_batch_size, purge_before
            )
            has_more_to_purge |= _purge_events_and_data_ids(
                instance, session, events_batch_size, purge_before
            )

        statistics_runs = _select_statistics_runs_to_purge(
            session, purge_before, instance.max_bind_vars
        )
        short_term_statistics = _select_short_term_statistics_to_purge(
            session, purge_before, instance.max_bind_vars
        )
        if statistics_runs:
            _purge_statistics_runs(session, statistics_runs)

        if short_term_statistics:
            _purge_short_term_statistics(session, short_term_statistics)

        if has_more_to_purge or statistics_runs or short_term_statistics:
            # Return false, as we might not be done yet.
            _LOGGER.debug("Purging hasn't fully completed yet")
            return False

        if apply_filter and not _purge_filtered_data(instance, session):
            _LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
            return False

        # This purge cycle is finished, clean up old event types and
        # recorder runs
        if instance.event_type_manager.active:
            _purge_old_event_types(instance, session)

        if instance.states_meta_manager.active:
            _purge_old_entity_ids(instance, session)

        _purge_old_recorder_runs(instance, session, purge_before)
    # Reload the in-memory caches from the database in a fresh read-only
    # session now that rows have been removed.
    with session_scope(session=instance.get_session(), read_only=True) as session:
        instance.recorder_runs_manager.load_from_db(session)
        instance.states_manager.load_from_db(session)
    if repack:
        repack_database(instance)
    return True
132 
133 
def _purging_legacy_format(session: Session) -> bool:
    """Check if there are any legacy event_id linked states rows remaining."""
    legacy_row = session.execute(find_legacy_row()).scalar()
    return bool(legacy_row)
137 
138 
def _purge_legacy_format(
    instance: Recorder, session: Session, purge_before: datetime
) -> bool:
    """Purge rows that are still linked by the event_ids.

    Returns True if any rows were found (and purged), meaning another
    pass is required.
    """
    (
        event_ids,
        state_ids,
        attributes_ids,
        data_ids,
    ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
        session, purge_before, instance.max_bind_vars
    )
    # States must be purged before their attributes; events before their data.
    _purge_state_ids(instance, session, state_ids)
    _purge_unused_attributes_ids(instance, session, attributes_ids)
    _purge_event_ids(session, event_ids)
    _purge_unused_data_ids(instance, session, data_ids)

    # The database may still have some rows that have an event_id but are not
    # linked to any event. These rows are not linked to any event because the
    # event was deleted. We need to purge these rows as well or we will never
    # switch to the new format which will prevent us from purging any events
    # that happened after the detached states.
    (
        detached_state_ids,
        detached_attributes_ids,
    ) = _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
        session, purge_before, instance.max_bind_vars
    )
    _purge_state_ids(instance, session, detached_state_ids)
    _purge_unused_attributes_ids(instance, session, detached_attributes_ids)
    return bool(
        event_ids
        or state_ids
        or attributes_ids
        or data_ids
        or detached_state_ids
        or detached_attributes_ids
    )
177 
178 
def _purge_states_and_attributes_ids(
    instance: Recorder,
    session: Session,
    states_batch_size: int,
    purge_before: datetime,
) -> bool:
    """Purge states and linked attributes id in a batch.

    Returns true if there are more states to purge.
    """
    database_engine = instance.database_engine
    assert database_engine is not None
    has_remaining_state_ids_to_purge = True
    # There are more states relative to attributes_ids so
    # we purge enough state_ids to try to generate a full
    # size batch of attributes_ids that will be around the size
    # max_bind_vars
    attributes_ids_batch: set[int] = set()
    max_bind_vars = instance.max_bind_vars
    for _ in range(states_batch_size):
        state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
            session, purge_before, max_bind_vars
        )
        if not state_ids:
            # Nothing left older than purge_before; stop early.
            has_remaining_state_ids_to_purge = False
            break
        _purge_state_ids(instance, session, state_ids)
        attributes_ids_batch = attributes_ids_batch | attributes_ids

    # Attributes are shared between states, so they can only be deleted
    # once we know no remaining state references them.
    _purge_unused_attributes_ids(instance, session, attributes_ids_batch)
    _LOGGER.debug(
        "After purging states and attributes_ids remaining=%s",
        has_remaining_state_ids_to_purge,
    )
    return has_remaining_state_ids_to_purge
214 
215 
def _purge_events_and_data_ids(
    instance: Recorder,
    session: Session,
    events_batch_size: int,
    purge_before: datetime,
) -> bool:
    """Purge events and linked data ids in a batch.

    Returns true if there are more events to purge.
    """
    has_remaining_event_ids_to_purge = True
    # There are more events relative to data_ids so
    # we purge enough event_ids to try to generate a full
    # size batch of data_ids that will be around the size
    # max_bind_vars
    data_ids_batch: set[int] = set()
    max_bind_vars = instance.max_bind_vars
    for _ in range(events_batch_size):
        event_ids, data_ids = _select_event_data_ids_to_purge(
            session, purge_before, max_bind_vars
        )
        if not event_ids:
            # Nothing left older than purge_before; stop early.
            has_remaining_event_ids_to_purge = False
            break
        _purge_event_ids(session, event_ids)
        data_ids_batch = data_ids_batch | data_ids

    # Event data rows are shared between events, so they can only be
    # deleted once we know no remaining event references them.
    _purge_unused_data_ids(instance, session, data_ids_batch)
    _LOGGER.debug(
        "After purging event and data_ids remaining=%s",
        has_remaining_event_ids_to_purge,
    )
    return has_remaining_event_ids_to_purge
249 
250 
def _select_state_attributes_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
    """Return sets of state and attribute ids to purge."""
    state_ids = set()
    attributes_ids = set()
    for state_id, attributes_id in session.execute(
        find_states_to_purge(purge_before.timestamp(), max_bind_vars)
    ).all():
        state_ids.add(state_id)
        # attributes_id may be NULL for states without attributes.
        if attributes_id:
            attributes_ids.add(attributes_id)
    _LOGGER.debug(
        "Selected %s state ids and %s attributes_ids to remove",
        len(state_ids),
        len(attributes_ids),
    )
    return state_ids, attributes_ids
269 
270 
def _select_event_data_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
    """Return sets of event and data ids to purge."""
    event_ids = set()
    data_ids = set()
    for event_id, data_id in session.execute(
        find_events_to_purge(purge_before.timestamp(), max_bind_vars)
    ).all():
        event_ids.add(event_id)
        # data_id may be NULL for events without data.
        if data_id:
            data_ids.add(data_id)
    _LOGGER.debug(
        "Selected %s event ids and %s data_ids to remove", len(event_ids), len(data_ids)
    )
    return event_ids, data_ids
287 
288 
def _select_unused_attributes_ids(
    instance: Recorder,
    session: Session,
    attributes_ids: set[int],
    database_engine: DatabaseEngine,
) -> set[int]:
    """Return a set of attributes ids that are not used by any states in the db."""
    if not attributes_ids:
        return set()

    seen_ids: set[int] = set()
    if not database_engine.optimizer.slow_range_in_select:
        #
        # SQLite has a superior query optimizer for the distinct query below as it uses
        # the covering index without having to examine the rows directly for both of the
        # queries below.
        #
        # We use the distinct query for SQLite since the query in the other branch can
        # generate more than 500 unions which SQLite does not support.
        #
        # How MariaDB's query optimizer handles this query:
        # > explain select distinct attributes_id from states where attributes_id in
        # (136723);
        # ...Using index
        #
        for attributes_ids_chunk in chunked_or_all(
            attributes_ids, instance.max_bind_vars
        ):
            seen_ids.update(
                state[0]
                for state in session.execute(
                    attributes_ids_exist_in_states_with_fast_in_distinct(
                        attributes_ids_chunk
                    )
                ).all()
            )
    else:
        #
        # This branch is for DBMS that cannot optimize the distinct query well and has
        # to examine all the rows that match.
        #
        # This branch uses a union of simple queries, as each query is optimized away
        # as the answer to the query can be found in the index.
        #
        # The below query works for SQLite as long as there are no more than 500
        # attributes_id to be selected. We currently do not have MySQL or PostgreSQL
        # servers running in the test suite; we test this path using SQLite when there
        # are less than 500 attributes_id.
        #
        # How MariaDB's query optimizer handles this query:
        # > explain select min(attributes_id) from states where attributes_id = 136723;
        # ...Select tables optimized away
        #
        # We used to generate a query based on how many attribute_ids to find but
        # that meant sqlalchemy Transparent SQL Compilation Caching was working against
        # us by cached up to max_bind_vars different statements which could be
        # up to 500MB for large database due to the complexity of the ORM objects.
        #
        # We now break the query into groups of 100 and use a lambda_stmt to ensure
        # that the query is only cached once.
        #
        groups = [iter(attributes_ids)] * 100
        for attr_ids in zip_longest(*groups, fillvalue=None):
            seen_ids |= {
                attrs_id[0]
                for attrs_id in session.execute(
                    attributes_ids_exist_in_states(*attr_ids)  # type: ignore[arg-type]
                ).all()
                if attrs_id[0] is not None
            }
    to_remove = attributes_ids - seen_ids
    _LOGGER.debug(
        "Selected %s shared attributes to remove",
        len(to_remove),
    )
    return to_remove
365 
366 
def _purge_unused_attributes_ids(
    instance: Recorder,
    session: Session,
    attributes_ids_batch: set[int],
) -> None:
    """Purge unused attributes ids."""
    database_engine = instance.database_engine
    assert database_engine is not None
    # Only delete attribute rows no remaining state references.
    if unused_attribute_ids_set := _select_unused_attributes_ids(
        instance, session, attributes_ids_batch, database_engine
    ):
        _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
379 
380 
def _select_unused_event_data_ids(
    instance: Recorder,
    session: Session,
    data_ids: set[int],
    database_engine: DatabaseEngine,
) -> set[int]:
    """Return a set of event data ids that are not used by any events in the db."""
    if not data_ids:
        return set()

    seen_ids: set[int] = set()
    # See _select_unused_attributes_ids for why this function
    # branches for non-sqlite databases.
    if not database_engine.optimizer.slow_range_in_select:
        for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
            seen_ids.update(
                state[0]
                for state in session.execute(
                    data_ids_exist_in_events_with_fast_in_distinct(data_ids_chunk)
                ).all()
            )
    else:
        # Batches of 100 keep the lambda_stmt cache to a single statement.
        groups = [iter(data_ids)] * 100
        for data_ids_group in zip_longest(*groups, fillvalue=None):
            seen_ids |= {
                data_id[0]
                for data_id in session.execute(
                    data_ids_exist_in_events(*data_ids_group)  # type: ignore[arg-type]
                ).all()
                if data_id[0] is not None
            }
    to_remove = data_ids - seen_ids
    _LOGGER.debug("Selected %s shared event data to remove", len(to_remove))
    return to_remove
415 
416 
def _purge_unused_data_ids(
    instance: Recorder, session: Session, data_ids_batch: set[int]
) -> None:
    """Purge unused event data ids."""
    database_engine = instance.database_engine
    assert database_engine is not None
    # Only delete event data rows no remaining event references.
    if unused_data_ids_set := _select_unused_event_data_ids(
        instance, session, data_ids_batch, database_engine
    ):
        _purge_batch_data_ids(instance, session, unused_data_ids_set)
426 
427 
def _select_statistics_runs_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> list[int]:
    """Return a list of statistic runs to purge.

    Takes care to keep the newest run.
    """
    statistic_runs = session.execute(
        find_statistics_runs_to_purge(purge_before, max_bind_vars)
    ).all()
    statistic_runs_list = [run_id for (run_id,) in statistic_runs]
    # Exclude the newest statistics run
    if (
        last_run := session.execute(find_latest_statistics_runs_run_id()).scalar()
    ) and last_run in statistic_runs_list:
        statistic_runs_list.remove(last_run)

    _LOGGER.debug("Selected %s statistic runs to remove", len(statistic_runs))
    return statistic_runs_list
447 
448 
def _select_short_term_statistics_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> list[int]:
    """Return a list of short term statistics to purge."""
    statistics = session.execute(
        find_short_term_statistics_to_purge(purge_before, max_bind_vars)
    ).all()
    _LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
    return [statistic_id for (statistic_id,) in statistics]
458 
459 
def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
    """Return a list of state, and attribute ids to purge.

    We do not link these anymore since state_change events
    do not exist in the events table anymore, however we
    still need to be able to purge them.
    """
    states = session.execute(
        find_legacy_detached_states_and_attributes_to_purge(
            purge_before.timestamp(), max_bind_vars
        )
    ).all()
    _LOGGER.debug("Selected %s state ids to remove", len(states))
    state_ids = set()
    attributes_ids = set()
    for state_id, attributes_id in states:
        # Either column can be NULL; only collect real ids.
        if state_id:
            state_ids.add(state_id)
        if attributes_id:
            attributes_ids.add(attributes_id)
    return state_ids, attributes_ids
483 
484 
def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int], set[int], set[int]]:
    """Return a list of event, state, and attribute ids to purge linked by the event_id.

    We do not link these anymore since state_change events
    do not exist in the events table anymore, however we
    still need to be able to purge them.
    """
    events = session.execute(
        find_legacy_event_state_and_attributes_and_data_ids_to_purge(
            purge_before.timestamp(), max_bind_vars
        )
    ).all()
    _LOGGER.debug("Selected %s event ids to remove", len(events))
    event_ids = set()
    state_ids = set()
    attributes_ids = set()
    data_ids = set()
    for event_id, data_id, state_id, attributes_id in events:
        event_ids.add(event_id)
        # The joined columns can be NULL; only collect real ids.
        if state_id:
            state_ids.add(state_id)
        if attributes_id:
            attributes_ids.add(attributes_id)
        if data_id:
            data_ids.add(data_id)
    return event_ids, state_ids, attributes_ids, data_ids
513 
514 
def _purge_state_ids(instance: Recorder, session: Session, state_ids: set[int]) -> None:
    """Disconnect states and delete by state id."""
    if not state_ids:
        return

    # Update old_state_id to NULL before deleting to ensure
    # the delete does not fail due to a foreign key constraint
    # since some databases (MSSQL) cannot do the ON DELETE SET NULL
    # for us.
    disconnected_rows = session.execute(disconnect_states_rows(state_ids))
    _LOGGER.debug("Updated %s states to remove old_state_id", disconnected_rows)

    deleted_rows = session.execute(delete_states_rows(state_ids))
    _LOGGER.debug("Deleted %s states", deleted_rows)

    # Evict any entries in the old_states cache referring to a purged state
    instance.states_manager.evict_purged_state_ids(state_ids)
532 
533 
def _purge_batch_attributes_ids(
    instance: Recorder, session: Session, attributes_ids: set[int]
) -> None:
    """Delete old attributes ids in batches of max_bind_vars."""
    for attributes_ids_chunk in chunked_or_all(attributes_ids, instance.max_bind_vars):
        deleted_rows = session.execute(
            delete_states_attributes_rows(attributes_ids_chunk)
        )
        _LOGGER.debug("Deleted %s attribute states", deleted_rows)

    # Evict any entries in the state_attributes_ids cache referring to a purged state
    instance.state_attributes_manager.evict_purged(attributes_ids)
546 
547 
def _purge_batch_data_ids(
    instance: Recorder, session: Session, data_ids: set[int]
) -> None:
    """Delete old event data ids in batches of max_bind_vars."""
    for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
        deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
        _LOGGER.debug("Deleted %s data events", deleted_rows)

    # Evict any entries in the event_data_ids cache referring to a purged state
    instance.event_data_manager.evict_purged(data_ids)
558 
559 
def _purge_statistics_runs(session: Session, statistics_runs: list[int]) -> None:
    """Delete statistics_runs rows by run_id."""
    result = session.execute(delete_statistics_runs_rows(statistics_runs))
    _LOGGER.debug("Deleted %s statistic runs", result)
564 
565 
def _purge_short_term_statistics(
    session: Session, short_term_statistics: list[int]
) -> None:
    """Delete short term statistics by id."""
    deleted_rows = session.execute(
        delete_statistics_short_term_rows(short_term_statistics)
    )
    _LOGGER.debug("Deleted %s short term statistics", deleted_rows)
574 
575 
def _purge_event_ids(session: Session, event_ids: set[int]) -> None:
    """Delete events by event id."""
    if event_ids:
        result = session.execute(delete_event_rows(event_ids))
        _LOGGER.debug("Deleted %s events", result)
582 
583 
def _purge_old_recorder_runs(
    instance: Recorder, session: Session, purge_before: datetime
) -> None:
    """Purge all old recorder runs."""
    # Recorder runs is small, no need to batch run it
    deleted_rows = session.execute(
        delete_recorder_runs_rows(
            purge_before, instance.recorder_runs_manager.current.run_id
        )
    )
    _LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
595 
596 
def _purge_old_event_types(instance: Recorder, session: Session) -> None:
    """Purge all old event types."""
    # The event_types table is small; a single unbatched pass is fine.
    purge_event_types = set()
    event_type_ids = set()
    for type_id, type_name in session.execute(find_event_types_to_purge()):
        purge_event_types.add(type_name)
        event_type_ids.add(type_id)

    if not event_type_ids:
        return

    result = session.execute(delete_event_types_rows(event_type_ids))
    _LOGGER.debug("Deleted %s event types", result)

    # Keep the event_type cache consistent with the rows just removed.
    instance.event_type_manager.evict_purged(purge_event_types)
614 
615 
def _purge_old_entity_ids(instance: Recorder, session: Session) -> None:
    """Purge all old entity_ids."""
    # The states_meta table is small; a single unbatched pass is fine.
    purge_entity_ids = set()
    states_metadata_ids = set()
    for meta_id, entity_id in session.execute(find_entity_ids_to_purge()):
        purge_entity_ids.add(entity_id)
        states_metadata_ids.add(meta_id)

    if not states_metadata_ids:
        return

    result = session.execute(delete_states_meta_rows(states_metadata_ids))
    _LOGGER.debug("Deleted %s states meta", result)

    # Keep both entity_id caches consistent with the rows just removed.
    instance.states_meta_manager.evict_purged(purge_entity_ids)
    instance.states_manager.evict_purged_entity_ids(purge_entity_ids)
634 
635 
def _purge_filtered_data(instance: Recorder, session: Session) -> bool:
    """Remove filtered states and events that shouldn't be in the database.

    Returns true if all states and events are purged.
    """
    _LOGGER.debug("Cleanup filtered data")
    database_engine = instance.database_engine
    assert database_engine is not None
    now_timestamp = time.time()

    has_more_to_purge = False

    # Check if excluded entity_ids are in database
    entity_filter = instance.entity_filter
    excluded_metadata_ids: list[str] = [
        metadata_id
        for (metadata_id, entity_id) in session.query(
            StatesMeta.metadata_id, StatesMeta.entity_id
        ).all()
        if entity_filter is not None and not entity_filter(entity_id)
    ]
    if excluded_metadata_ids:
        states_fully_purged = _purge_filtered_states(
            instance, session, excluded_metadata_ids, database_engine, now_timestamp
        )
        has_more_to_purge = has_more_to_purge or not states_fully_purged

    # Check if excluded event_types are in database
    event_type_to_event_type_ids = instance.event_type_manager.get_many(
        instance.exclude_event_types, session
    )
    if event_type_to_event_type_ids:
        excluded_event_type_ids = [
            event_type_id
            for event_type_id in event_type_to_event_type_ids.values()
            if event_type_id is not None
        ]
        if excluded_event_type_ids:
            events_fully_purged = _purge_filtered_events(
                instance, session, excluded_event_type_ids, now_timestamp
            )
            has_more_to_purge = has_more_to_purge or not events_fully_purged

    # Purge has completed if there are no more states or events to purge
    return not has_more_to_purge
679 
680 
def _purge_filtered_states(
    instance: Recorder,
    session: Session,
    metadata_ids_to_purge: list[str],
    database_engine: DatabaseEngine,
    purge_before_timestamp: float,
) -> bool:
    """Remove filtered states and linked events.

    Return true if all states are purged
    """
    state_ids: tuple[int, ...]
    attributes_ids: tuple[int, ...]
    event_ids: tuple[int, ...]
    to_purge = list(
        session.query(States.state_id, States.attributes_id, States.event_id)
        .filter(States.metadata_id.in_(metadata_ids_to_purge))
        .filter(States.last_updated_ts < purge_before_timestamp)
        .limit(instance.max_bind_vars)
        .all()
    )
    if not to_purge:
        return True
    state_ids, attributes_ids, event_ids = zip(*to_purge, strict=False)
    filtered_event_ids = {id_ for id_ in event_ids if id_ is not None}
    _LOGGER.debug(
        "Selected %s state_ids to remove that should be filtered", len(state_ids)
    )
    _purge_state_ids(instance, session, set(state_ids))
    # These are legacy events that are linked to a state that are no longer
    # created but since we did not remove them when we stopped adding new ones
    # we will need to purge them here.
    _purge_event_ids(session, filtered_event_ids)
    unused_attribute_ids_set = _select_unused_attributes_ids(
        instance,
        session,
        {id_ for id_ in attributes_ids if id_ is not None},
        database_engine,
    )
    _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
    # A full batch was found, so there may be more rows to purge.
    return False
722 
723 
def _purge_filtered_events(
    instance: Recorder,
    session: Session,
    excluded_event_type_ids: list[int],
    purge_before_timestamp: float,
) -> bool:
    """Remove filtered events and linked states.

    Return true if all events are purged.
    """
    database_engine = instance.database_engine
    assert database_engine is not None
    to_purge = list(
        session.query(Events.event_id, Events.data_id)
        .filter(Events.event_type_id.in_(excluded_event_type_ids))
        .filter(Events.time_fired_ts < purge_before_timestamp)
        .limit(instance.max_bind_vars)
        .all()
    )
    if not to_purge:
        return True
    event_ids, data_ids = zip(*to_purge, strict=False)
    event_ids_set = set(event_ids)
    _LOGGER.debug(
        "Selected %s event_ids to remove that should be filtered", len(event_ids_set)
    )
    if (
        instance.use_legacy_events_index
        and (
            states := session.query(States.state_id)
            .filter(States.event_id.in_(event_ids_set))
            .all()
        )
        and (state_ids := {state_id for (state_id,) in states})
    ):
        # These are legacy states that are linked to an event that are no longer
        # created but since we did not remove them when we stopped adding new ones
        # we will need to purge them here.
        _purge_state_ids(instance, session, state_ids)
    _purge_event_ids(session, event_ids_set)
    if unused_data_ids_set := _select_unused_event_data_ids(
        instance, session, set(data_ids), database_engine
    ):
        _purge_batch_data_ids(instance, session, unused_data_ids_set)
    # A full batch was found, so there may be more rows to purge.
    return False
769 
770 
@retryable_database_job("purge_entity_data")
def purge_entity_data(
    instance: Recorder,
    entity_filter: Callable[[str], bool] | None,
    purge_before: datetime,
) -> bool:
    """Purge states and events of specified entities.

    Returns True when purging has fully completed, False when there is
    still more work to do (the caller is expected to run it again).
    """
    database_engine = instance.database_engine
    assert database_engine is not None
    purge_before_timestamp = purge_before.timestamp()
    with session_scope(session=instance.get_session()) as session:
        # entity_filter selects which entities to purge (True == purge).
        selected_metadata_ids: list[str] = [
            metadata_id
            for (metadata_id, entity_id) in session.query(
                StatesMeta.metadata_id, StatesMeta.entity_id
            ).all()
            if entity_filter is not None and entity_filter(entity_id)
        ]
        _LOGGER.debug("Purging entity data for %s", selected_metadata_ids)
        if not selected_metadata_ids:
            return True

        # Purge a max of max_bind_vars, based on the oldest states
        # or events record.
        if not _purge_filtered_states(
            instance,
            session,
            selected_metadata_ids,
            database_engine,
            purge_before_timestamp,
        ):
            _LOGGER.debug("Purging entity data hasn't fully completed yet")
            return False

        # All matching states are gone; drop the now-orphaned metadata rows.
        _purge_old_entity_ids(instance, session)

    return True
bool _purge_filtered_events(Recorder instance, Session session, list[int] excluded_event_type_ids, float purge_before_timestamp)
Definition: purge.py:729
None _purge_short_term_statistics(Session session, list[int] short_term_statistics)
Definition: purge.py:568
tuple[set[int], set[int]] _select_state_attributes_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
Definition: purge.py:253
tuple[set[int], set[int]] _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
Definition: purge.py:462
tuple[set[int], set[int]] _select_event_data_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
Definition: purge.py:273
bool _purging_legacy_format(Session session)
Definition: purge.py:134
bool _purge_filtered_states(Recorder instance, Session session, list[str] metadata_ids_to_purge, DatabaseEngine database_engine, float purge_before_timestamp)
Definition: purge.py:687
bool purge_old_data(Recorder instance, datetime purge_before, bool repack, bool apply_filter=False, int events_batch_size=DEFAULT_EVENTS_BATCHES_PER_PURGE, int states_batch_size=DEFAULT_STATES_BATCHES_PER_PURGE)
Definition: purge.py:65
None _purge_unused_attributes_ids(Recorder instance, Session session, set[int] attributes_ids_batch)
Definition: purge.py:371
None _purge_unused_data_ids(Recorder instance, Session session, set[int] data_ids_batch)
Definition: purge.py:419
None _purge_state_ids(Recorder instance, Session session, set[int] state_ids)
Definition: purge.py:515
bool purge_entity_data(Recorder instance, Callable[[str], bool]|None entity_filter, datetime purge_before)
Definition: purge.py:776
None _purge_event_ids(Session session, set[int] event_ids)
Definition: purge.py:576
bool _purge_events_and_data_ids(Recorder instance, Session session, int events_batch_size, datetime purge_before)
Definition: purge.py:221
bool _purge_filtered_data(Recorder instance, Session session)
Definition: purge.py:636
bool _purge_states_and_attributes_ids(Recorder instance, Session session, int states_batch_size, datetime purge_before)
Definition: purge.py:184
None _purge_old_recorder_runs(Recorder instance, Session session, datetime purge_before)
Definition: purge.py:586
None _purge_old_event_types(Recorder instance, Session session)
Definition: purge.py:597
None _purge_statistics_runs(Session session, list[int] statistics_runs)
Definition: purge.py:560
None _purge_batch_data_ids(Recorder instance, Session session, set[int] data_ids)
Definition: purge.py:550
None _purge_batch_attributes_ids(Recorder instance, Session session, set[int] attributes_ids)
Definition: purge.py:536
set[int] _select_unused_event_data_ids(Recorder instance, Session session, set[int] data_ids, DatabaseEngine database_engine)
Definition: purge.py:386
bool _purge_legacy_format(Recorder instance, Session session, datetime purge_before)
Definition: purge.py:141
tuple[set[int], set[int], set[int], set[int]] _select_legacy_event_state_and_attributes_and_data_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
Definition: purge.py:487
list[int] _select_statistics_runs_to_purge(Session session, datetime purge_before, int max_bind_vars)
Definition: purge.py:430
set[int] _select_unused_attributes_ids(Recorder instance, Session session, set[int] attributes_ids, DatabaseEngine database_engine)
Definition: purge.py:294
list[int] _select_short_term_statistics_to_purge(Session session, datetime purge_before, int max_bind_vars)
Definition: purge.py:451
None _purge_old_entity_ids(Recorder instance, Session session)
Definition: purge.py:616
StatementLambdaElement delete_states_meta_rows(Iterable[int] metadata_ids)
Definition: queries.py:883
StatementLambdaElement find_legacy_detached_states_and_attributes_to_purge(float purge_before, int max_bind_vars)
Definition: queries.py:693
StatementLambdaElement find_short_term_statistics_to_purge(datetime purge_before, int max_bind_vars)
Definition: queries.py:651
StatementLambdaElement attributes_ids_exist_in_states(int attr1, int|None attr2, int|None attr3, int|None attr4, int|None attr5, int|None attr6, int|None attr7, int|None attr8, int|None attr9, int|None attr10, int|None attr11, int|None attr12, int|None attr13, int|None attr14, int|None attr15, int|None attr16, int|None attr17, int|None attr18, int|None attr19, int|None attr20, int|None attr21, int|None attr22, int|None attr23, int|None attr24, int|None attr25, int|None attr26, int|None attr27, int|None attr28, int|None attr29, int|None attr30, int|None attr31, int|None attr32, int|None attr33, int|None attr34, int|None attr35, int|None attr36, int|None attr37, int|None attr38, int|None attr39, int|None attr40, int|None attr41, int|None attr42, int|None attr43, int|None attr44, int|None attr45, int|None attr46, int|None attr47, int|None attr48, int|None attr49, int|None attr50, int|None attr51, int|None attr52, int|None attr53, int|None attr54, int|None attr55, int|None attr56, int|None attr57, int|None attr58, int|None attr59, int|None attr60, int|None attr61, int|None attr62, int|None attr63, int|None attr64, int|None attr65, int|None attr66, int|None attr67, int|None attr68, int|None attr69, int|None attr70, int|None attr71, int|None attr72, int|None attr73, int|None attr74, int|None attr75, int|None attr76, int|None attr77, int|None attr78, int|None attr79, int|None attr80, int|None attr81, int|None attr82, int|None attr83, int|None attr84, int|None attr85, int|None attr86, int|None attr87, int|None attr88, int|None attr89, int|None attr90, int|None attr91, int|None attr92, int|None attr93, int|None attr94, int|None attr95, int|None attr96, int|None attr97, int|None attr98, int|None attr99, int|None attr100)
Definition: queries.py:196
StatementLambdaElement delete_recorder_runs_rows(datetime purge_before, int current_run_id)
Definition: queries.py:607
StatementLambdaElement find_legacy_event_state_and_attributes_and_data_ids_to_purge(float purge_before, int max_bind_vars)
Definition: queries.py:679
StatementLambdaElement find_latest_statistics_runs_run_id()
Definition: queries.py:672
StatementLambdaElement delete_event_data_rows(Iterable[int] data_ids)
Definition: queries.py:552
StatementLambdaElement delete_event_rows(Iterable[int] event_ids)
Definition: queries.py:596
StatementLambdaElement find_statistics_runs_to_purge(datetime purge_before, int max_bind_vars)
Definition: queries.py:663
StatementLambdaElement delete_event_types_rows(Iterable[int] event_type_ids)
Definition: queries.py:874
StatementLambdaElement find_events_to_purge(float purge_before, int max_bind_vars)
Definition: queries.py:620
StatementLambdaElement delete_statistics_runs_rows(Iterable[int] statistics_runs)
Definition: queries.py:574
StatementLambdaElement delete_states_rows(Iterable[int] state_ids)
Definition: queries.py:543
StatementLambdaElement delete_states_attributes_rows(Iterable[int] attributes_ids)
Definition: queries.py:563
StatementLambdaElement find_states_to_purge(float purge_before, int max_bind_vars)
Definition: queries.py:631
StatementLambdaElement find_legacy_row()
Definition: queries.py:707
StatementLambdaElement find_event_types_to_purge()
Definition: queries.py:840
StatementLambdaElement attributes_ids_exist_in_states_with_fast_in_distinct(Iterable[int] attributes_ids)
Definition: queries.py:86
StatementLambdaElement disconnect_states_rows(Iterable[int] state_ids)
Definition: queries.py:533
StatementLambdaElement data_ids_exist_in_events_with_fast_in_distinct(Iterable[int] data_ids)
Definition: queries.py:309
StatementLambdaElement delete_statistics_short_term_rows(Iterable[int] short_term_statistics)
Definition: queries.py:585
StatementLambdaElement data_ids_exist_in_events(int id1, int|None id2, int|None id3, int|None id4, int|None id5, int|None id6, int|None id7, int|None id8, int|None id9, int|None id10, int|None id11, int|None id12, int|None id13, int|None id14, int|None id15, int|None id16, int|None id17, int|None id18, int|None id19, int|None id20, int|None id21, int|None id22, int|None id23, int|None id24, int|None id25, int|None id26, int|None id27, int|None id28, int|None id29, int|None id30, int|None id31, int|None id32, int|None id33, int|None id34, int|None id35, int|None id36, int|None id37, int|None id38, int|None id39, int|None id40, int|None id41, int|None id42, int|None id43, int|None id44, int|None id45, int|None id46, int|None id47, int|None id48, int|None id49, int|None id50, int|None id51, int|None id52, int|None id53, int|None id54, int|None id55, int|None id56, int|None id57, int|None id58, int|None id59, int|None id60, int|None id61, int|None id62, int|None id63, int|None id64, int|None id65, int|None id66, int|None id67, int|None id68, int|None id69, int|None id70, int|None id71, int|None id72, int|None id73, int|None id74, int|None id75, int|None id76, int|None id77, int|None id78, int|None id79, int|None id80, int|None id81, int|None id82, int|None id83, int|None id84, int|None id85, int|None id86, int|None id87, int|None id88, int|None id89, int|None id90, int|None id91, int|None id92, int|None id93, int|None id94, int|None id95, int|None id96, int|None id97, int|None id98, int|None id99, int|None id100)
Definition: queries.py:422
StatementLambdaElement find_entity_ids_to_purge()
Definition: queries.py:857
None repack_database(Recorder instance)
Definition: repack.py:19
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
Definition: recorder.py:86
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)
Definition: collection.py:25