1 """Queries for the recorder."""
3 from __future__
import annotations
5 from collections.abc
import Iterable
6 from datetime
import datetime
8 from sqlalchemy
import delete, distinct, func, lambda_stmt, select, union_all, update
9 from sqlalchemy.sql.lambdas
import StatementLambdaElement
10 from sqlalchemy.sql.selectable
import Select
12 from .db_schema
import (
28 """Generate a select for event type ids.
30 This query is intentionally not a lambda statement as it is used inside
31 other lambda statements.
33 return select(EventTypes.event_type_id).where(
34 EventTypes.event_type.in_(event_types)
39 """Load shared attributes from the database."""
42 StateAttributes.attributes_id, StateAttributes.shared_attrs
43 ).where(StateAttributes.hash.in_(hashes))
48 """Load shared event data from the database."""
50 lambda: select(EventData.data_id, EventData.shared_data).where(
51 EventData.hash.in_(hashes)
57 """Find an event_type id by event_type."""
59 lambda: select(EventTypes.event_type_id, EventTypes.event_type).filter(
60 EventTypes.event_type.in_(event_types)
66 """Find all metadata_ids and entity_ids."""
67 return lambda_stmt(
lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id))
71 """Find metadata_ids by entity_ids."""
73 lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id).filter(
74 StatesMeta.entity_id.in_(entity_ids)
80 """Check if a state attributes id exists in the states table."""
81 return select(func.min(States.attributes_id)).where(States.attributes_id == attr)
85 attributes_ids: Iterable[int],
86 ) -> StatementLambdaElement:
87 """Find attributes ids that exist in the states table."""
89 lambda: select(distinct(States.attributes_id)).filter(
90 States.attributes_id.in_(attributes_ids)
196 ) -> StatementLambdaElement:
197 """Generate the find attributes select only once.
199 https://docs.sqlalchemy.org/en/14/core/connections.html#quick-guidelines-for-lambdas
308 data_ids: Iterable[int],
309 ) -> StatementLambdaElement:
310 """Find data ids that exist in the events table."""
312 lambda: select(distinct(Events.data_id)).filter(Events.data_id.in_(data_ids))
317 """Check if a event data id exists in the events table."""
318 return select(func.min(Events.data_id)).where(Events.data_id == data_id)
422 ) -> StatementLambdaElement:
423 """Generate the find event data select only once.
425 https://docs.sqlalchemy.org/en/14/core/connections.html#quick-guidelines-for-lambdas
534 """Disconnect states rows."""
537 .where(States.old_state_id.in_(state_ids))
538 .values(old_state_id=
None)
539 .execution_options(synchronize_session=
False)
544 """Delete states rows."""
547 .where(States.state_id.in_(state_ids))
548 .execution_options(synchronize_session=
False)
553 """Delete event_data rows."""
556 .where(EventData.data_id.in_(data_ids))
557 .execution_options(synchronize_session=
False)
562 attributes_ids: Iterable[int],
563 ) -> StatementLambdaElement:
564 """Delete states_attributes rows."""
566 lambda:
delete(StateAttributes)
567 .where(StateAttributes.attributes_id.in_(attributes_ids))
568 .execution_options(synchronize_session=
False)
573 statistics_runs: Iterable[int],
574 ) -> StatementLambdaElement:
575 """Delete statistics_runs rows."""
577 lambda:
delete(StatisticsRuns)
578 .where(StatisticsRuns.run_id.in_(statistics_runs))
579 .execution_options(synchronize_session=
False)
584 short_term_statistics: Iterable[int],
585 ) -> StatementLambdaElement:
586 """Delete statistics_short_term rows."""
588 lambda:
delete(StatisticsShortTerm)
589 .where(StatisticsShortTerm.id.in_(short_term_statistics))
590 .execution_options(synchronize_session=
False)
595 event_ids: Iterable[int],
596 ) -> StatementLambdaElement:
597 """Delete event rows."""
600 .where(Events.event_id.in_(event_ids))
601 .execution_options(synchronize_session=
False)
606 purge_before: datetime, current_run_id: int
607 ) -> StatementLambdaElement:
608 """Delete recorder_runs rows."""
610 lambda:
delete(RecorderRuns)
611 .filter(RecorderRuns.end.is_not(
None))
612 .filter(RecorderRuns.end < purge_before)
613 .filter(RecorderRuns.run_id != current_run_id)
614 .execution_options(synchronize_session=
False)
619 purge_before: float, max_bind_vars: int
620 ) -> StatementLambdaElement:
621 """Find events to purge."""
623 lambda: select(Events.event_id, Events.data_id)
624 .filter(Events.time_fired_ts < purge_before)
625 .limit(max_bind_vars)
630 purge_before: float, max_bind_vars: int
631 ) -> StatementLambdaElement:
632 """Find states to purge."""
634 lambda: select(States.state_id, States.attributes_id)
635 .filter(States.last_updated_ts < purge_before)
636 .limit(max_bind_vars)
641 """Find the last_updated_ts of the oldest state."""
643 lambda: select(States.last_updated_ts).where(
644 States.state_id.in_(select(func.min(States.state_id)))
650 purge_before: datetime, max_bind_vars: int
651 ) -> StatementLambdaElement:
652 """Find short term statistics to purge."""
653 purge_before_ts = purge_before.timestamp()
655 lambda: select(StatisticsShortTerm.id)
656 .filter(StatisticsShortTerm.start_ts < purge_before_ts)
657 .limit(max_bind_vars)
662 purge_before: datetime, max_bind_vars: int
663 ) -> StatementLambdaElement:
664 """Find statistics_runs to purge."""
666 lambda: select(StatisticsRuns.run_id)
667 .filter(StatisticsRuns.start < purge_before)
668 .limit(max_bind_vars)
673 """Find the latest statistics_runs run_id."""
674 return lambda_stmt(
lambda: select(func.max(StatisticsRuns.run_id)))
678 purge_before: float, max_bind_vars: int
679 ) -> StatementLambdaElement:
680 """Find the latest row in the legacy format to purge."""
683 Events.event_id, Events.data_id, States.state_id, States.attributes_id
685 .outerjoin(States, Events.event_id == States.event_id)
686 .filter(Events.time_fired_ts < purge_before)
687 .limit(max_bind_vars)
692 purge_before: float, max_bind_vars: int
693 ) -> StatementLambdaElement:
694 """Find states rows with event_id set but not linked event_id in Events."""
696 lambda: select(States.state_id, States.attributes_id)
697 .outerjoin(Events, States.event_id == Events.event_id)
698 .filter(States.event_id.isnot(
None))
700 (States.last_updated_ts < purge_before) | States.last_updated_ts.is_(
None)
702 .filter(Events.event_id.is_(
None))
703 .limit(max_bind_vars)
708 """Check if there are still states in the table with an event_id."""
709 return lambda_stmt(
lambda: select(func.max(States.event_id)))
713 """Find events context_ids to migrate."""
717 Events.time_fired_ts,
719 Events.context_user_id,
720 Events.context_parent_id,
722 .filter(Events.context_id_bin.is_(
None))
723 .limit(max_bind_vars)
728 """Find events event_type to migrate."""
734 .filter(Events.event_type_id.is_(
None))
735 .limit(max_bind_vars)
740 """Find entity_id to migrate."""
746 .filter(States.metadata_id.is_(
None))
747 .limit(max_bind_vars)
752 """Find entity_id to cleanup."""
758 select(States.state_id)
760 states_with_entity_ids := select(
761 States.state_id.label(
"state_id_with_entity_id")
763 .filter(States.entity_id.is_not(
None))
766 States.state_id == states_with_entity_ids.c.state_id_with_entity_id,
768 .alias(
"states_with_entity_ids")
772 .values(entity_id=
None)
777 """Check if there are used entity_ids in the states table."""
779 lambda: select(States.state_id).filter(States.entity_id.isnot(
None)).limit(1)
784 """Check if there are used event_ids in the states table."""
786 lambda: select(States.state_id).filter(States.event_id.isnot(
None)).limit(1)
791 """Check if there are events context ids to migrate."""
793 lambda: select(Events.event_id).filter(Events.context_id_bin.is_(
None)).limit(1)
798 """Check if there are states context ids to migrate."""
800 lambda: select(States.state_id).filter(States.context_id_bin.is_(
None)).limit(1)
805 """Check if there are event_types to migrate."""
807 lambda: select(Events.event_id).filter(Events.event_type_id.is_(
None)).limit(1)
812 """Check if there are entity_id to migrate."""
814 lambda: select(States.state_id).filter(States.metadata_id.is_(
None)).limit(1)
819 """Find events context_ids to migrate."""
823 States.last_updated_ts,
825 States.context_user_id,
826 States.context_parent_id,
828 .filter(States.context_id_bin.is_(
None))
829 .limit(max_bind_vars)
834 """Query the database for previous migration changes."""
836 lambda: select(MigrationChanges.migration_id, MigrationChanges.version)
841 """Find event_type_ids to purge."""
843 lambda: select(EventTypes.event_type_id, EventTypes.event_type).where(
844 EventTypes.event_type_id.not_in(
845 select(EventTypes.event_type_id).join(
846 used_event_type_ids := select(
847 distinct(Events.event_type_id).label(
"used_event_type_id")
849 EventTypes.event_type_id
850 == used_event_type_ids.c.used_event_type_id,
858 """Find entity_ids to purge."""
860 lambda: select(StatesMeta.metadata_id, StatesMeta.entity_id).where(
861 StatesMeta.metadata_id.not_in(
862 select(StatesMeta.metadata_id).join(
863 used_states_metadata_id := select(
864 distinct(States.metadata_id).label(
"used_states_metadata_id")
866 StatesMeta.metadata_id
867 == used_states_metadata_id.c.used_states_metadata_id,
875 """Delete EventTypes rows."""
877 lambda:
delete(EventTypes)
878 .where(EventTypes.event_type_id.in_(event_type_ids))
879 .execution_options(synchronize_session=
False)
884 """Delete StatesMeta rows."""
886 lambda:
delete(StatesMeta)
887 .where(StatesMeta.metadata_id.in_(metadata_ids))
888 .execution_options(synchronize_session=
False)
894 ) -> StatementLambdaElement:
895 """Find unmigrated short term statistics rows."""
898 StatisticsShortTerm.id,
899 StatisticsShortTerm.start,
900 StatisticsShortTerm.created,
901 StatisticsShortTerm.last_reset,
903 .filter(StatisticsShortTerm.start_ts.is_(
None))
904 .filter(StatisticsShortTerm.start.isnot(
None))
905 .limit(max_bind_vars)
910 """Find unmigrated statistics rows."""
913 Statistics.id, Statistics.start, Statistics.created, Statistics.last_reset
915 .filter(Statistics.start_ts.is_(
None))
916 .filter(Statistics.start.isnot(
None))
917 .limit(max_bind_vars)
923 start_ts: float |
None,
924 created_ts: float |
None,
925 last_reset_ts: float |
None,
926 ) -> StatementLambdaElement:
927 """Migrate a single short term statistics row to timestamp."""
929 lambda:
update(StatisticsShortTerm)
930 .where(StatisticsShortTerm.id == statistic_id)
934 created_ts=created_ts,
936 last_reset_ts=last_reset_ts,
939 .execution_options(synchronize_session=
False)
945 start_ts: float |
None,
946 created_ts: float |
None,
947 last_reset_ts: float |
None,
948 ) -> StatementLambdaElement:
949 """Migrate a single statistics row to timestamp."""
951 lambda:
update(Statistics)
952 .where(Statistics.id == statistic_id)
956 created_ts=created_ts,
958 last_reset_ts=last_reset_ts,
961 .execution_options(synchronize_session=
False)
967 ) -> StatementLambdaElement:
968 """Delete a single duplicate short term statistics row."""
970 lambda:
delete(StatisticsShortTerm)
971 .where(StatisticsShortTerm.id == statistic_id)
972 .execution_options(synchronize_session=
False)
977 """Delete a single duplicate statistics row."""
979 lambda:
delete(Statistics)
980 .where(Statistics.id == statistic_id)
981 .execution_options(synchronize_session=
False)
web.Response delete(self, web.Request request, str config_key)
IssData update(pyiss.ISS iss)
Select _state_attrs_exist(int|None attr)
StatementLambdaElement find_states_metadata_ids(Iterable[str] entity_ids)
StatementLambdaElement delete_states_meta_rows(Iterable[int] metadata_ids)
StatementLambdaElement find_oldest_state()
StatementLambdaElement get_shared_attributes(list[int] hashes)
StatementLambdaElement find_event_type_ids(Iterable[str] event_types)
StatementLambdaElement find_legacy_detached_states_and_attributes_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement find_short_term_statistics_to_purge(datetime purge_before, int max_bind_vars)
StatementLambdaElement attributes_ids_exist_in_states(int attr1, int|None attr2, int|None attr3, int|None attr4, int|None attr5, int|None attr6, int|None attr7, int|None attr8, int|None attr9, int|None attr10, int|None attr11, int|None attr12, int|None attr13, int|None attr14, int|None attr15, int|None attr16, int|None attr17, int|None attr18, int|None attr19, int|None attr20, int|None attr21, int|None attr22, int|None attr23, int|None attr24, int|None attr25, int|None attr26, int|None attr27, int|None attr28, int|None attr29, int|None attr30, int|None attr31, int|None attr32, int|None attr33, int|None attr34, int|None attr35, int|None attr36, int|None attr37, int|None attr38, int|None attr39, int|None attr40, int|None attr41, int|None attr42, int|None attr43, int|None attr44, int|None attr45, int|None attr46, int|None attr47, int|None attr48, int|None attr49, int|None attr50, int|None attr51, int|None attr52, int|None attr53, int|None attr54, int|None attr55, int|None attr56, int|None attr57, int|None attr58, int|None attr59, int|None attr60, int|None attr61, int|None attr62, int|None attr63, int|None attr64, int|None attr65, int|None attr66, int|None attr67, int|None attr68, int|None attr69, int|None attr70, int|None attr71, int|None attr72, int|None attr73, int|None attr74, int|None attr75, int|None attr76, int|None attr77, int|None attr78, int|None attr79, int|None attr80, int|None attr81, int|None attr82, int|None attr83, int|None attr84, int|None attr85, int|None attr86, int|None attr87, int|None attr88, int|None attr89, int|None attr90, int|None attr91, int|None attr92, int|None attr93, int|None attr94, int|None attr95, int|None attr96, int|None attr97, int|None attr98, int|None attr99, int|None attr100)
StatementLambdaElement delete_recorder_runs_rows(datetime purge_before, int current_run_id)
StatementLambdaElement migrate_single_statistics_row_to_timestamp(int statistic_id, float|None start_ts, float|None created_ts, float|None last_reset_ts)
StatementLambdaElement delete_duplicate_short_term_statistics_row(int statistic_id)
StatementLambdaElement migrate_single_short_term_statistics_row_to_timestamp(int statistic_id, float|None start_ts, float|None created_ts, float|None last_reset_ts)
StatementLambdaElement find_legacy_event_state_and_attributes_and_data_ids_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement find_latest_statistics_runs_run_id()
StatementLambdaElement find_all_states_metadata_ids()
StatementLambdaElement delete_event_data_rows(Iterable[int] data_ids)
StatementLambdaElement delete_event_rows(Iterable[int] event_ids)
StatementLambdaElement batch_cleanup_entity_ids()
StatementLambdaElement get_shared_event_datas(list[int] hashes)
StatementLambdaElement has_used_states_entity_ids()
StatementLambdaElement find_unmigrated_short_term_statistics_rows(int max_bind_vars)
StatementLambdaElement has_states_context_ids_to_migrate()
StatementLambdaElement has_used_states_event_ids()
StatementLambdaElement has_events_context_ids_to_migrate()
StatementLambdaElement find_statistics_runs_to_purge(datetime purge_before, int max_bind_vars)
StatementLambdaElement find_states_context_ids_to_migrate(int max_bind_vars)
StatementLambdaElement find_entity_ids_to_migrate(int max_bind_vars)
StatementLambdaElement find_events_context_ids_to_migrate(int max_bind_vars)
StatementLambdaElement delete_event_types_rows(Iterable[int] event_type_ids)
StatementLambdaElement find_events_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement delete_statistics_runs_rows(Iterable[int] statistics_runs)
StatementLambdaElement delete_states_rows(Iterable[int] state_ids)
StatementLambdaElement delete_states_attributes_rows(Iterable[int] attributes_ids)
Select _event_data_id_exist(int|None data_id)
StatementLambdaElement find_states_to_purge(float purge_before, int max_bind_vars)
StatementLambdaElement find_legacy_row()
Select select_event_type_ids(tuple[str,...] event_types)
StatementLambdaElement find_unmigrated_statistics_rows(int max_bind_vars)
StatementLambdaElement find_event_types_to_purge()
StatementLambdaElement has_entity_ids_to_migrate()
StatementLambdaElement attributes_ids_exist_in_states_with_fast_in_distinct(Iterable[int] attributes_ids)
StatementLambdaElement disconnect_states_rows(Iterable[int] state_ids)
StatementLambdaElement find_event_type_to_migrate(int max_bind_vars)
StatementLambdaElement data_ids_exist_in_events_with_fast_in_distinct(Iterable[int] data_ids)
StatementLambdaElement delete_statistics_short_term_rows(Iterable[int] short_term_statistics)
StatementLambdaElement delete_duplicate_statistics_row(int statistic_id)
StatementLambdaElement data_ids_exist_in_events(int id1, int|None id2, int|None id3, int|None id4, int|None id5, int|None id6, int|None id7, int|None id8, int|None id9, int|None id10, int|None id11, int|None id12, int|None id13, int|None id14, int|None id15, int|None id16, int|None id17, int|None id18, int|None id19, int|None id20, int|None id21, int|None id22, int|None id23, int|None id24, int|None id25, int|None id26, int|None id27, int|None id28, int|None id29, int|None id30, int|None id31, int|None id32, int|None id33, int|None id34, int|None id35, int|None id36, int|None id37, int|None id38, int|None id39, int|None id40, int|None id41, int|None id42, int|None id43, int|None id44, int|None id45, int|None id46, int|None id47, int|None id48, int|None id49, int|None id50, int|None id51, int|None id52, int|None id53, int|None id54, int|None id55, int|None id56, int|None id57, int|None id58, int|None id59, int|None id60, int|None id61, int|None id62, int|None id63, int|None id64, int|None id65, int|None id66, int|None id67, int|None id68, int|None id69, int|None id70, int|None id71, int|None id72, int|None id73, int|None id74, int|None id75, int|None id76, int|None id77, int|None id78, int|None id79, int|None id80, int|None id81, int|None id82, int|None id83, int|None id84, int|None id85, int|None id86, int|None id87, int|None id88, int|None id89, int|None id90, int|None id91, int|None id92, int|None id93, int|None id94, int|None id95, int|None id96, int|None id97, int|None id98, int|None id99, int|None id100)
StatementLambdaElement has_event_type_to_migrate()
StatementLambdaElement get_migration_changes()
StatementLambdaElement find_entity_ids_to_purge()