1 """Purge old data helper."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import datetime
7 from itertools
import zip_longest
10 from typing
import TYPE_CHECKING
12 from sqlalchemy.orm.session
import Session
16 from .db_schema
import Events, States, StatesMeta
17 from .models
import DatabaseEngine
18 from .queries
import (
19 attributes_ids_exist_in_states,
20 attributes_ids_exist_in_states_with_fast_in_distinct,
21 data_ids_exist_in_events,
22 data_ids_exist_in_events_with_fast_in_distinct,
23 delete_event_data_rows,
25 delete_event_types_rows,
26 delete_recorder_runs_rows,
27 delete_states_attributes_rows,
28 delete_states_meta_rows,
30 delete_statistics_runs_rows,
31 delete_statistics_short_term_rows,
32 disconnect_states_rows,
33 find_entity_ids_to_purge,
34 find_event_types_to_purge,
36 find_latest_statistics_runs_run_id,
37 find_legacy_detached_states_and_attributes_to_purge,
38 find_legacy_event_state_and_attributes_and_data_ids_to_purge,
40 find_short_term_statistics_to_purge,
42 find_statistics_runs_to_purge,
44 from .repack
import repack_database
45 from .util
import retryable_database_job, session_scope
48 from .
import Recorder
50 _LOGGER = logging.getLogger(__name__)
53 DEFAULT_STATES_BATCHES_PER_PURGE = 20
54 DEFAULT_EVENTS_BATCHES_PER_PURGE = 15
@retryable_database_job("purge")
def purge_old_data(
    instance: Recorder,
    purge_before: datetime,
    repack: bool,
    apply_filter: bool = False,
    events_batch_size: int = DEFAULT_EVENTS_BATCHES_PER_PURGE,
    states_batch_size: int = DEFAULT_STATES_BATCHES_PER_PURGE,
) -> bool:
    """Purge events and states older than purge_before.

    Cleans up an timeframe of an hour, based on the oldest record.

    Returns True when the purge is fully complete, False when there is
    more work to do (the caller is expected to schedule another pass).
    """
    _LOGGER.debug(
        "Purging states and events before target %s",
        purge_before.isoformat(sep=" ", timespec="seconds"),
    )
    with session_scope(session=instance.get_session()) as session:
        # Purge a max of max_bind_vars, based on the oldest states or events record
        has_more_to_purge = False
        if instance.use_legacy_events_index and _purging_legacy_format(session):
            _LOGGER.debug(
                "Purge running in legacy format as there are states with event_id"
                " remaining"
            )
            has_more_to_purge |= _purge_legacy_format(instance, session, purge_before)
        else:
            _LOGGER.debug(
                "Purge running in new format as there are NO states with event_id"
                " remaining"
            )
            # Once we are done purging legacy rows, we use the new method
            has_more_to_purge |= _purge_states_and_attributes_ids(
                instance, session, states_batch_size, purge_before
            )
            has_more_to_purge |= _purge_events_and_data_ids(
                instance, session, events_batch_size, purge_before
            )

        statistics_runs = _select_statistics_runs_to_purge(
            session, purge_before, instance.max_bind_vars
        )
        short_term_statistics = _select_short_term_statistics_to_purge(
            session, purge_before, instance.max_bind_vars
        )
        if statistics_runs:
            _purge_statistics_runs(session, statistics_runs)

        if short_term_statistics:
            _purge_short_term_statistics(session, short_term_statistics)

        if has_more_to_purge or statistics_runs or short_term_statistics:
            # Return false, as we might not be done yet.
            _LOGGER.debug("Purging hasn't fully completed yet")
            return False

        if apply_filter and not _purge_filtered_data(instance, session):
            _LOGGER.debug("Cleanup filtered data hasn't fully completed yet")
            return False

        # This purge cycle is finished, clean up old event types and
        # recorder runs
        if instance.event_type_manager.active:
            _purge_old_event_types(instance, session)

        if instance.states_meta_manager.active:
            _purge_old_entity_ids(instance, session)

        _purge_old_recorder_runs(instance, session, purge_before)
    # Refresh the in-memory caches from the database in a read-only session
    # now that rows have been removed.
    with session_scope(session=instance.get_session(), read_only=True) as session:
        instance.recorder_runs_manager.load_from_db(session)
        instance.states_manager.load_from_db(session)
    if repack:
        repack_database(instance)
    return True
135 """Check if there are any legacy event_id linked states rows remaining."""
def _purge_legacy_format(
    instance: Recorder, session: Session, purge_before: datetime
) -> bool:
    """Purge rows that are still linked by the event_ids.

    Returns True when there may be more legacy rows to purge.
    """
    (
        event_ids,
        state_ids,
        attributes_ids,
        data_ids,
    ) = _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
        session, purge_before, instance.max_bind_vars
    )
    _purge_state_ids(instance, session, state_ids)
    _purge_unused_attributes_ids(instance, session, attributes_ids)
    _purge_event_ids(session, event_ids)
    _purge_unused_data_ids(instance, session, data_ids)

    # The database may still have some rows that have an event_id but are not
    # linked to any event because the event was deleted. We must purge these
    # detached rows as well or the recorder will never switch to the new
    # (non-event_id) purge format.
    (
        detached_state_ids,
        detached_attributes_ids,
    ) = _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
        session, purge_before, instance.max_bind_vars
    )
    _purge_state_ids(instance, session, detached_state_ids)
    _purge_unused_attributes_ids(instance, session, detached_attributes_ids)
    return bool(
        event_ids
        or state_ids
        or attributes_ids
        or data_ids
        or detached_state_ids
        or detached_attributes_ids
    )
def _purge_states_and_attributes_ids(
    instance: Recorder,
    session: Session,
    states_batch_size: int,
    purge_before: datetime,
) -> bool:
    """Purge states and linked attributes id in a batch.

    Returns true if there are more states to purge.
    """
    database_engine = instance.database_engine
    assert database_engine is not None
    has_remaining_state_ids_to_purge = True
    # There are more states relative to attributes_ids, so we purge states in
    # several passes, accumulating their attributes_ids into one batch that is
    # checked for unused ids a single time at the end.
    attributes_ids_batch: set[int] = set()
    max_bind_vars = instance.max_bind_vars
    for _ in range(states_batch_size):
        state_ids, attributes_ids = _select_state_attributes_ids_to_purge(
            session, purge_before, max_bind_vars
        )
        if not state_ids:
            # Nothing left to select — purge of states is complete.
            has_remaining_state_ids_to_purge = False
            break
        _purge_state_ids(instance, session, state_ids)
        attributes_ids_batch = attributes_ids_batch | attributes_ids

    _purge_unused_attributes_ids(instance, session, attributes_ids_batch)
    _LOGGER.debug(
        "After purging states and attributes_ids remaining=%s",
        has_remaining_state_ids_to_purge,
    )
    return has_remaining_state_ids_to_purge
def _purge_events_and_data_ids(
    instance: Recorder,
    session: Session,
    events_batch_size: int,
    purge_before: datetime,
) -> bool:
    """Purge states and linked attributes id in a batch.

    Returns true if there are more states to purge.
    """
    has_remaining_event_ids_to_purge = True
    # There are more events relative to data_ids, so we purge events in
    # several passes, accumulating their data_ids into one batch that is
    # checked for unused ids a single time at the end.
    data_ids_batch: set[int] = set()
    max_bind_vars = instance.max_bind_vars
    for _ in range(events_batch_size):
        event_ids, data_ids = _select_event_data_ids_to_purge(
            session, purge_before, max_bind_vars
        )
        if not event_ids:
            # Nothing left to select — purge of events is complete.
            has_remaining_event_ids_to_purge = False
            break
        _purge_event_ids(session, event_ids)
        data_ids_batch = data_ids_batch | data_ids

    _purge_unused_data_ids(instance, session, data_ids_batch)
    _LOGGER.debug(
        "After purging event and data_ids remaining=%s",
        has_remaining_event_ids_to_purge,
    )
    return has_remaining_event_ids_to_purge
def _select_state_attributes_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
    """Return sets of state and attribute ids to purge."""
    state_ids = set()
    attributes_ids = set()
    for state_id, attributes_id in session.execute(
        find_states_to_purge(purge_before.timestamp(), max_bind_vars)
    ).all():
        state_ids.add(state_id)
        if attributes_id:
            # attributes_id can be NULL for states without attributes
            attributes_ids.add(attributes_id)
    _LOGGER.debug(
        "Selected %s state ids and %s attributes_ids to remove",
        len(state_ids),
        len(attributes_ids),
    )
    return state_ids, attributes_ids
def _select_event_data_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
    """Return sets of event and data ids to purge."""
    event_ids = set()
    data_ids = set()
    for event_id, data_id in session.execute(
        find_events_to_purge(purge_before.timestamp(), max_bind_vars)
    ).all():
        event_ids.add(event_id)
        if data_id:
            # data_id can be NULL for events without payload data
            data_ids.add(data_id)
    _LOGGER.debug(
        "Selected %s event ids and %s data_ids to remove", len(event_ids), len(data_ids)
    )
    return event_ids, data_ids
def _select_unused_attributes_ids(
    instance: Recorder,
    session: Session,
    attributes_ids: set[int],
    database_engine: DatabaseEngine,
) -> set[int]:
    """Return a set of attributes ids that are not used by any states in the db."""
    if not attributes_ids:
        return set()

    seen_ids: set[int] = set()
    if not database_engine.optimizer.slow_range_in_select:
        # The fast path: the engine can efficiently run a SELECT DISTINCT
        # with a large IN clause, so check up to max_bind_vars ids at once.
        for attributes_ids_chunk in chunked_or_all(
            attributes_ids, instance.max_bind_vars
        ):
            seen_ids.update(
                state[0]
                for state in session.execute(
                    attributes_ids_exist_in_states_with_fast_in_distinct(
                        attributes_ids_chunk
                    )
                ).all()
            )
    else:
        # The slow path: the engine cannot optimize large IN clauses, so fall
        # back to a fixed-arity query that checks 100 ids per statement,
        # padding the final group with None via zip_longest.
        groups = [iter(attributes_ids)] * 100
        for attr_ids in zip_longest(*groups, fillvalue=None):
            seen_ids |= {
                attrs_id[0]
                for attrs_id in session.execute(
                    attributes_ids_exist_in_states(*attr_ids)  # type: ignore[arg-type]
                ).all()
                if attrs_id[0] is not None
            }
    to_remove = attributes_ids - seen_ids
    _LOGGER.debug(
        "Selected %s shared attributes to remove",
        len(to_remove),
    )
    return to_remove
def _purge_unused_attributes_ids(
    instance: Recorder,
    session: Session,
    attributes_ids_batch: set[int],
) -> None:
    """Purge unused attributes ids."""
    database_engine = instance.database_engine
    assert database_engine is not None
    if unused_attribute_ids_set := _select_unused_attributes_ids(
        instance, session, attributes_ids_batch, database_engine
    ):
        _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
def _select_unused_event_data_ids(
    instance: Recorder,
    session: Session,
    data_ids: set[int],
    database_engine: DatabaseEngine,
) -> set[int]:
    """Return a set of event data ids that are not used by any events in the db."""
    if not data_ids:
        return set()

    seen_ids: set[int] = set()
    # Same fast/slow split as _select_unused_attributes_ids: engines that can
    # optimize a large IN run one big DISTINCT query per chunk; others use a
    # fixed-arity query over groups of 100 ids padded with None.
    if not database_engine.optimizer.slow_range_in_select:
        for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
            seen_ids.update(
                state[0]
                for state in session.execute(
                    data_ids_exist_in_events_with_fast_in_distinct(data_ids_chunk)
                ).all()
            )
    else:
        groups = [iter(data_ids)] * 100
        for data_ids_group in zip_longest(*groups, fillvalue=None):
            seen_ids |= {
                data_id[0]
                for data_id in session.execute(
                    data_ids_exist_in_events(*data_ids_group)  # type: ignore[arg-type]
                ).all()
                if data_id[0] is not None
            }
    to_remove = data_ids - seen_ids
    _LOGGER.debug("Selected %s shared event data to remove", len(to_remove))
    return to_remove
def _purge_unused_data_ids(
    instance: Recorder, session: Session, data_ids_batch: set[int]
) -> None:
    """Purge unused event data ids."""
    database_engine = instance.database_engine
    assert database_engine is not None
    if unused_data_ids_set := _select_unused_event_data_ids(
        instance, session, data_ids_batch, database_engine
    ):
        _purge_batch_data_ids(instance, session, unused_data_ids_set)
def _select_statistics_runs_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> list[int]:
    """Return a list of statistic runs to purge.

    Takes care to keep the newest run.
    """
    statistic_runs = session.execute(
        find_statistics_runs_to_purge(purge_before, max_bind_vars)
    ).all()
    statistic_runs_list = [run_id for (run_id,) in statistic_runs]
    # Exclude the newest statistics run from the purge so the latest run
    # record is always preserved.
    if (
        last_run := session.execute(find_latest_statistics_runs_run_id()).scalar()
    ) and last_run in statistic_runs_list:
        statistic_runs_list.remove(last_run)

    _LOGGER.debug("Selected %s statistic runs to remove", len(statistic_runs))
    return statistic_runs_list
def _select_short_term_statistics_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> list[int]:
    """Return a list of short term statistics to purge."""
    statistics = session.execute(
        find_short_term_statistics_to_purge(purge_before, max_bind_vars)
    ).all()
    _LOGGER.debug("Selected %s short term statistics to remove", len(statistics))
    return [statistic_id for (statistic_id,) in statistics]
def _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int]]:
    """Return a list of state, and attribute ids to purge.

    We do not link these anymore since state_change events
    do not exist in the events table anymore, however we
    still need to be able to purge them.
    """
    states = session.execute(
        find_legacy_detached_states_and_attributes_to_purge(
            purge_before.timestamp(), max_bind_vars
        )
    ).all()
    _LOGGER.debug("Selected %s state ids to remove", len(states))
    state_ids = set()
    attributes_ids = set()
    for state_id, attributes_id in states:
        if state_id:
            state_ids.add(state_id)
        if attributes_id:
            attributes_ids.add(attributes_id)
    return state_ids, attributes_ids
def _select_legacy_event_state_and_attributes_and_data_ids_to_purge(
    session: Session, purge_before: datetime, max_bind_vars: int
) -> tuple[set[int], set[int], set[int], set[int]]:
    """Return a list of event, state, and attribute ids to purge linked by the event_id.

    We do not link these anymore since state_change events
    do not exist in the events table anymore, however we
    still need to be able to purge them.
    """
    events = session.execute(
        find_legacy_event_state_and_attributes_and_data_ids_to_purge(
            purge_before.timestamp(), max_bind_vars
        )
    ).all()
    _LOGGER.debug("Selected %s event ids to remove", len(events))
    event_ids = set()
    state_ids = set()
    attributes_ids = set()
    data_ids = set()
    for event_id, data_id, state_id, attributes_id in events:
        event_ids.add(event_id)
        # state/attributes/data ids can all be NULL in the legacy join
        if state_id:
            state_ids.add(state_id)
        if attributes_id:
            attributes_ids.add(attributes_id)
        if data_id:
            data_ids.add(data_id)
    return event_ids, state_ids, attributes_ids, data_ids
516 """Disconnect states and delete by state id."""
525 _LOGGER.debug(
"Updated %s states to remove old_state_id", disconnected_rows)
528 _LOGGER.debug(
"Deleted %s states", deleted_rows)
531 instance.states_manager.evict_purged_state_ids(state_ids)
def _purge_batch_attributes_ids(
    instance: Recorder, session: Session, attributes_ids: set[int]
) -> None:
    """Delete old attributes ids in batches of max_bind_vars."""
    for attributes_ids_chunk in chunked_or_all(attributes_ids, instance.max_bind_vars):
        deleted_rows = session.execute(
            delete_states_attributes_rows(attributes_ids_chunk)
        )
        _LOGGER.debug("Deleted %s attribute states", deleted_rows)

    # Evict any cached entries that refer to the purged attributes ids.
    instance.state_attributes_manager.evict_purged(attributes_ids)
def _purge_batch_data_ids(
    instance: Recorder, session: Session, data_ids: set[int]
) -> None:
    """Delete old event data ids in batches of max_bind_vars."""
    for data_ids_chunk in chunked_or_all(data_ids, instance.max_bind_vars):
        deleted_rows = session.execute(delete_event_data_rows(data_ids_chunk))
        _LOGGER.debug("Deleted %s data events", deleted_rows)

    # Evict any cached entries that refer to the purged data ids.
    instance.event_data_manager.evict_purged(data_ids)
561 """Delete by run_id."""
563 _LOGGER.debug(
"Deleted %s statistic runs", deleted_rows)
def _purge_short_term_statistics(
    session: Session, short_term_statistics: list[int]
) -> None:
    """Delete short term statistics by id."""
    deleted_rows = session.execute(
        delete_statistics_short_term_rows(short_term_statistics)
    )
    _LOGGER.debug("Deleted %s short term statistics", deleted_rows)
577 """Delete by event id."""
581 _LOGGER.debug(
"Deleted %s events", deleted_rows)
def _purge_old_recorder_runs(
    instance: Recorder, session: Session, purge_before: datetime
) -> None:
    """Purge all old recorder runs."""
    # Recorder runs is small, no need to batch run it; the current run is
    # excluded so it is never deleted out from under the running recorder.
    deleted_rows = session.execute(
        delete_recorder_runs_rows(
            purge_before, instance.recorder_runs_manager.current.run_id
        )
    )
    _LOGGER.debug("Deleted %s recorder_runs", deleted_rows)
598 """Purge all old event types."""
600 purge_event_types = set()
601 event_type_ids = set()
603 purge_event_types.add(event_type)
604 event_type_ids.add(event_type_id)
606 if not event_type_ids:
610 _LOGGER.debug(
"Deleted %s event types", deleted_rows)
613 instance.event_type_manager.evict_purged(purge_event_types)
617 """Purge all old entity_ids."""
619 purge_entity_ids = set()
620 states_metadata_ids = set()
622 purge_entity_ids.add(entity_id)
623 states_metadata_ids.add(metadata_id)
625 if not states_metadata_ids:
629 _LOGGER.debug(
"Deleted %s states meta", deleted_rows)
632 instance.states_meta_manager.evict_purged(purge_entity_ids)
633 instance.states_manager.evict_purged_entity_ids(purge_entity_ids)
637 """Remove filtered states and events that shouldn't be in the database.
639 Returns true if all states and events are purged.
641 _LOGGER.debug(
"Cleanup filtered data")
642 database_engine = instance.database_engine
643 assert database_engine
is not None
644 now_timestamp = time.time()
647 entity_filter = instance.entity_filter
648 has_more_to_purge =
False
649 excluded_metadata_ids: list[str] = [
651 for (metadata_id, entity_id)
in session.query(
652 StatesMeta.metadata_id, StatesMeta.entity_id
654 if entity_filter
and not entity_filter(entity_id)
656 if excluded_metadata_ids:
658 instance, session, excluded_metadata_ids, database_engine, now_timestamp
663 event_type_to_event_type_ids := instance.event_type_manager.get_many(
664 instance.exclude_event_types, session
667 excluded_event_type_ids := [
669 for event_type_id
in event_type_to_event_type_ids.values()
670 if event_type_id
is not None
674 instance, session, excluded_event_type_ids, now_timestamp
678 return not has_more_to_purge
def _purge_filtered_states(
    instance: Recorder,
    session: Session,
    metadata_ids_to_purge: list[str],
    database_engine: DatabaseEngine,
    purge_before_timestamp: float,
) -> bool:
    """Remove filtered states and linked events.

    Return true if all states are purged
    """
    state_ids: tuple[int, ...]
    attributes_ids: tuple[int, ...]
    event_ids: tuple[int, ...]
    to_purge = list(
        session.query(States.state_id, States.attributes_id, States.event_id)
        .filter(States.metadata_id.in_(metadata_ids_to_purge))
        .filter(States.last_updated_ts < purge_before_timestamp)
        .limit(instance.max_bind_vars)
        .all()
    )
    if not to_purge:
        return True
    state_ids, attributes_ids, event_ids = zip(*to_purge, strict=False)
    # event_id is NULL for states recorded in the new format.
    filtered_event_ids = {id_ for id_ in event_ids if id_ is not None}
    _LOGGER.debug(
        "Selected %s state_ids to remove that should be filtered", len(state_ids)
    )
    _purge_state_ids(instance, session, set(state_ids))
    # Legacy events linked to a state are always purged with the state.
    _purge_event_ids(session, filtered_event_ids)
    unused_attribute_ids_set = _select_unused_attributes_ids(
        instance,
        session,
        {id_ for id_ in attributes_ids if id_ is not None},
        database_engine,
    )
    _purge_batch_attributes_ids(instance, session, unused_attribute_ids_set)
    return False
def _purge_filtered_events(
    instance: Recorder,
    session: Session,
    excluded_event_type_ids: list[int],
    purge_before_timestamp: float,
) -> bool:
    """Remove filtered events and linked states.

    Return true if all events are purged.
    """
    database_engine = instance.database_engine
    assert database_engine is not None
    to_purge = list(
        session.query(Events.event_id, Events.data_id)
        .filter(Events.event_type_id.in_(excluded_event_type_ids))
        .filter(Events.time_fired_ts < purge_before_timestamp)
        .limit(instance.max_bind_vars)
        .all()
    )
    if not to_purge:
        return True
    event_ids, data_ids = zip(*to_purge, strict=False)
    event_ids_set = set(event_ids)
    _LOGGER.debug(
        "Selected %s event_ids to remove that should be filtered", len(event_ids_set)
    )
    if (
        instance.use_legacy_events_index
        and (
            states := session.query(States.state_id)
            .filter(States.event_id.in_(event_ids_set))
            .all()
        )
        and (state_ids := {state_id for (state_id,) in states})
    ):
        # Legacy states linked to an event are always purged with the event.
        _purge_state_ids(instance, session, state_ids)
    _purge_event_ids(session, event_ids_set)
    if unused_data_ids_set := _select_unused_event_data_ids(
        instance, session, set(data_ids), database_engine
    ):
        _purge_batch_data_ids(instance, session, unused_data_ids_set)
    return False
@retryable_database_job("purge_entity_data")
def purge_entity_data(
    instance: Recorder,
    entity_filter: Callable[[str], bool] | None,
    purge_before: datetime,
) -> bool:
    """Purge states and events of specified entities.

    Returns True when all matching rows are purged; False when another
    pass is still needed.
    """
    database_engine = instance.database_engine
    assert database_engine is not None
    purge_before_timestamp = purge_before.timestamp()
    with session_scope(session=instance.get_session()) as session:
        selected_metadata_ids: list[str] = [
            metadata_id
            for (metadata_id, entity_id) in session.query(
                StatesMeta.metadata_id, StatesMeta.entity_id
            ).all()
            if entity_filter and entity_filter(entity_id)
        ]
        _LOGGER.debug("Purging entity data for %s", selected_metadata_ids)
        if not selected_metadata_ids:
            return True

        # Purge a max of max_bind_vars rows, based on the oldest states
        # or events record.
        if not _purge_filtered_states(
            instance,
            session,
            selected_metadata_ids,
            database_engine,
            purge_before_timestamp,
        ):
            _LOGGER.debug("Purging entity data hasn't fully completed yet")
            return False

    return True
bool _purge_filtered_events(Recorder instance, Session session, list[int] excluded_event_type_ids, float purge_before_timestamp)
None _purge_short_term_statistics(Session session, list[int] short_term_statistics)
tuple[set[int], set[int]] _select_state_attributes_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
tuple[set[int], set[int]] _select_legacy_detached_state_and_attributes_and_data_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
tuple[set[int], set[int]] _select_event_data_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
bool _purging_legacy_format(Session session)
bool _purge_filtered_states(Recorder instance, Session session, list[str] metadata_ids_to_purge, DatabaseEngine database_engine, float purge_before_timestamp)
bool purge_old_data(Recorder instance, datetime purge_before, bool repack, bool apply_filter=False, int events_batch_size=DEFAULT_EVENTS_BATCHES_PER_PURGE, int states_batch_size=DEFAULT_STATES_BATCHES_PER_PURGE)
None _purge_unused_attributes_ids(Recorder instance, Session session, set[int] attributes_ids_batch)
None _purge_unused_data_ids(Recorder instance, Session session, set[int] data_ids_batch)
None _purge_state_ids(Recorder instance, Session session, set[int] state_ids)
bool purge_entity_data(Recorder instance, Callable[[str], bool]|None entity_filter, datetime purge_before)
None _purge_event_ids(Session session, set[int] event_ids)
bool _purge_events_and_data_ids(Recorder instance, Session session, int events_batch_size, datetime purge_before)
bool _purge_filtered_data(Recorder instance, Session session)
bool _purge_states_and_attributes_ids(Recorder instance, Session session, int states_batch_size, datetime purge_before)
None _purge_old_recorder_runs(Recorder instance, Session session, datetime purge_before)
None _purge_old_event_types(Recorder instance, Session session)
None _purge_statistics_runs(Session session, list[int] statistics_runs)
None _purge_batch_data_ids(Recorder instance, Session session, set[int] data_ids)
None _purge_batch_attributes_ids(Recorder instance, Session session, set[int] attributes_ids)
set[int] _select_unused_event_data_ids(Recorder instance, Session session, set[int] data_ids, DatabaseEngine database_engine)
bool _purge_legacy_format(Recorder instance, Session session, datetime purge_before)
tuple[set[int], set[int], set[int], set[int]] _select_legacy_event_state_and_attributes_and_data_ids_to_purge(Session session, datetime purge_before, int max_bind_vars)
list[int] _select_statistics_runs_to_purge(Session session, datetime purge_before, int max_bind_vars)
set[int] _select_unused_attributes_ids(Recorder instance, Session session, set[int] attributes_ids, DatabaseEngine database_engine)
list[int] _select_short_term_statistics_to_purge(Session session, datetime purge_before, int max_bind_vars)
None _purge_old_entity_ids(Recorder instance, Session session)
StatementLambdaElement delete_states_meta_rows(Iterable[int] metadata_ids)
StatementLambdaElement find_legacy_detached_states_and_attributes_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement find_short_term_statistics_to_purge(datetime purge_before, int max_bind_vars)
StatementLambdaElement attributes_ids_exist_in_states(int attr1, int|None attr2, int|None attr3, int|None attr4, int|None attr5, int|None attr6, int|None attr7, int|None attr8, int|None attr9, int|None attr10, int|None attr11, int|None attr12, int|None attr13, int|None attr14, int|None attr15, int|None attr16, int|None attr17, int|None attr18, int|None attr19, int|None attr20, int|None attr21, int|None attr22, int|None attr23, int|None attr24, int|None attr25, int|None attr26, int|None attr27, int|None attr28, int|None attr29, int|None attr30, int|None attr31, int|None attr32, int|None attr33, int|None attr34, int|None attr35, int|None attr36, int|None attr37, int|None attr38, int|None attr39, int|None attr40, int|None attr41, int|None attr42, int|None attr43, int|None attr44, int|None attr45, int|None attr46, int|None attr47, int|None attr48, int|None attr49, int|None attr50, int|None attr51, int|None attr52, int|None attr53, int|None attr54, int|None attr55, int|None attr56, int|None attr57, int|None attr58, int|None attr59, int|None attr60, int|None attr61, int|None attr62, int|None attr63, int|None attr64, int|None attr65, int|None attr66, int|None attr67, int|None attr68, int|None attr69, int|None attr70, int|None attr71, int|None attr72, int|None attr73, int|None attr74, int|None attr75, int|None attr76, int|None attr77, int|None attr78, int|None attr79, int|None attr80, int|None attr81, int|None attr82, int|None attr83, int|None attr84, int|None attr85, int|None attr86, int|None attr87, int|None attr88, int|None attr89, int|None attr90, int|None attr91, int|None attr92, int|None attr93, int|None attr94, int|None attr95, int|None attr96, int|None attr97, int|None attr98, int|None attr99, int|None attr100)
StatementLambdaElement delete_recorder_runs_rows(datetime purge_before, int current_run_id)
StatementLambdaElement find_legacy_event_state_and_attributes_and_data_ids_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement find_latest_statistics_runs_run_id()
StatementLambdaElement delete_event_data_rows(Iterable[int] data_ids)
StatementLambdaElement delete_event_rows(Iterable[int] event_ids)
StatementLambdaElement find_statistics_runs_to_purge(datetime purge_before, int max_bind_vars)
StatementLambdaElement delete_event_types_rows(Iterable[int] event_type_ids)
StatementLambdaElement find_events_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement delete_statistics_runs_rows(Iterable[int] statistics_runs)
StatementLambdaElement delete_states_rows(Iterable[int] state_ids)
StatementLambdaElement delete_states_attributes_rows(Iterable[int] attributes_ids)
StatementLambdaElement find_states_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement find_legacy_row()
StatementLambdaElement find_event_types_to_purge()
StatementLambdaElement attributes_ids_exist_in_states_with_fast_in_distinct(Iterable[int] attributes_ids)
StatementLambdaElement disconnect_states_rows(Iterable[int] state_ids)
StatementLambdaElement data_ids_exist_in_events_with_fast_in_distinct(Iterable[int] data_ids)
StatementLambdaElement delete_statistics_short_term_rows(Iterable[int] short_term_statistics)
StatementLambdaElement data_ids_exist_in_events(int id1, int|None id2, int|None id3, int|None id4, int|None id5, int|None id6, int|None id7, int|None id8, int|None id9, int|None id10, int|None id11, int|None id12, int|None id13, int|None id14, int|None id15, int|None id16, int|None id17, int|None id18, int|None id19, int|None id20, int|None id21, int|None id22, int|None id23, int|None id24, int|None id25, int|None id26, int|None id27, int|None id28, int|None id29, int|None id30, int|None id31, int|None id32, int|None id33, int|None id34, int|None id35, int|None id36, int|None id37, int|None id38, int|None id39, int|None id40, int|None id41, int|None id42, int|None id43, int|None id44, int|None id45, int|None id46, int|None id47, int|None id48, int|None id49, int|None id50, int|None id51, int|None id52, int|None id53, int|None id54, int|None id55, int|None id56, int|None id57, int|None id58, int|None id59, int|None id60, int|None id61, int|None id62, int|None id63, int|None id64, int|None id65, int|None id66, int|None id67, int|None id68, int|None id69, int|None id70, int|None id71, int|None id72, int|None id73, int|None id74, int|None id75, int|None id76, int|None id77, int|None id78, int|None id79, int|None id80, int|None id81, int|None id82, int|None id83, int|None id84, int|None id85, int|None id86, int|None id87, int|None id88, int|None id89, int|None id90, int|None id91, int|None id92, int|None id93, int|None id94, int|None id95, int|None id96, int|None id97, int|None id98, int|None id99, int|None id100)
StatementLambdaElement find_entity_ids_to_purge()
None repack_database(Recorder instance)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
Iterable[Any] chunked_or_all(Collection[Any] iterable, int chunked_num)