1 """Provide pre-made queries on top of the recorder component."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Iterable, Iterator
6 from datetime
import datetime
7 from itertools
import groupby
8 from operator
import itemgetter
9 from typing
import Any, cast
11 from sqlalchemy
import (
22 from sqlalchemy.engine.row
import Row
23 from sqlalchemy.orm.session
import Session
30 from ..const
import LAST_REPORTED_SCHEMA_VERSION
31 from ..db_schema
import SHARED_ATTR_OR_LEGACY_ATTRIBUTES, StateAttributes, States
32 from ..filters
import Filters
33 from ..models
import (
35 datetime_to_timestamp_or_none,
37 row_to_compressed_state,
39 from ..util
import execute_stmt_lambda_element, session_scope
42 NEED_ATTRIBUTE_DOMAINS,
56 include_last_changed: bool,
57 include_last_reported: bool,
59 """Return the select statement for the requested state columns."""
60 _select = select(States.metadata_id, States.state, States.last_updated_ts)
61 if include_last_changed:
62 _select = _select.add_columns(States.last_changed_ts)
63 if include_last_reported:
64 _select = _select.add_columns(States.last_reported_ts)
66 _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES)
72 include_last_changed: bool,
73 include_last_reported: bool,
75 """Return the start-state select statement with timestamp columns set to 0."""
76 _select = select(States.metadata_id, States.state)
77 _select = _select.add_columns(literal(value=0).label(
"last_updated_ts"))
78 if include_last_changed:
79 _select = _select.add_columns(literal(value=0).label(
"last_changed_ts"))
80 if include_last_reported:
81 _select = _select.add_columns(literal(value=0).label(
"last_reported_ts"))
83 _select = _select.add_columns(SHARED_ATTR_OR_LEGACY_ATTRIBUTES)
88 subquery: Subquery | CompoundSelect,
90 include_last_changed: bool,
91 include_last_reported: bool,
93 """Return the statement to select from the union."""
95 subquery.c.metadata_id,
97 subquery.c.last_updated_ts,
99 if include_last_changed:
100 base_select = base_select.add_columns(subquery.c.last_changed_ts)
101 if include_last_reported:
102 base_select = base_select.add_columns(subquery.c.last_reported_ts)
105 return base_select.add_columns(subquery.c.attributes)
110 start_time: datetime,
111 end_time: datetime |
None =
None,
112 entity_ids: list[str] |
None =
None,
113 filters: Filters |
None =
None,
114 include_start_time_state: bool =
True,
115 significant_changes_only: bool =
True,
116 minimal_response: bool =
False,
117 no_attributes: bool =
False,
118 compressed_state_format: bool =
False,
119 ) -> dict[str, list[State | dict[str, Any]]]:
120 """Wrap get_significant_states_with_session with an SQL session."""
129 include_start_time_state,
130 significant_changes_only,
133 compressed_state_format,
138 start_time_ts: float,
139 end_time_ts: float |
None,
140 single_metadata_id: int |
None,
141 metadata_ids: list[int],
142 metadata_ids_in_significant_domains: list[int],
143 significant_changes_only: bool,
145 include_start_time_state: bool,
146 run_start_ts: float |
None,
147 ) -> Select | CompoundSelect:
148 """Query the database for significant state changes."""
149 include_last_changed =
not significant_changes_only
151 if significant_changes_only:
155 if metadata_ids_in_significant_domains:
157 States.metadata_id.in_(metadata_ids_in_significant_domains)
158 | (States.last_changed_ts == States.last_updated_ts)
159 | States.last_changed_ts.is_(
None)
163 (States.last_changed_ts == States.last_updated_ts)
164 | States.last_changed_ts.is_(
None)
166 stmt = stmt.filter(States.metadata_id.in_(metadata_ids)).filter(
167 States.last_updated_ts > start_time_ts
170 stmt = stmt.filter(States.last_updated_ts < end_time_ts)
171 if not no_attributes:
172 stmt = stmt.outerjoin(
173 StateAttributes, States.attributes_id == StateAttributes.attributes_id
175 if not include_start_time_state
or not run_start_ts:
176 return stmt.order_by(States.metadata_id, States.last_updated_ts)
177 unioned_subquery = union_all(
185 include_last_changed,
188 include_last_changed,
192 stmt.subquery(), no_attributes, include_last_changed,
False
198 include_last_changed,
200 ).order_by(unioned_subquery.c.metadata_id, unioned_subquery.c.last_updated_ts)
206 start_time: datetime,
207 end_time: datetime |
None =
None,
208 entity_ids: list[str] |
None =
None,
209 filters: Filters |
None =
None,
210 include_start_time_state: bool =
True,
211 significant_changes_only: bool =
True,
212 minimal_response: bool =
False,
213 no_attributes: bool =
False,
214 compressed_state_format: bool =
False,
215 ) -> dict[str, list[State | dict[str, Any]]]:
216 """Return state changes during UTC period start_time - end_time.
218 entity_ids is the list of entities to include in the results; it must be provided.
220 filters is no longer supported: passing anything other than None raises
221 NotImplementedError.
223 Significant states are all states where there is a state change,
224 as well as all states from certain domains (for instance
225 thermostat so that we get current temperature in our graphs).
227 if filters
is not None:
228 raise NotImplementedError(
"Filters are no longer supported")
230 raise ValueError(
"entity_ids must be provided")
231 entity_id_to_metadata_id: dict[str, int |
None] |
None =
None
232 metadata_ids_in_significant_domains: list[int] = []
235 entity_id_to_metadata_id := instance.states_meta_manager.get_many(
236 entity_ids, session,
False
240 metadata_ids = possible_metadata_ids
241 if significant_changes_only:
242 metadata_ids_in_significant_domains = [
244 for entity_id, metadata_id
in entity_id_to_metadata_id.items()
245 if metadata_id
is not None
248 oldest_ts: float |
None =
None
249 if include_start_time_state
and not (
252 include_start_time_state =
False
253 start_time_ts = dt_util.utc_to_timestamp(start_time)
255 single_metadata_id = metadata_ids[0]
if len(metadata_ids) == 1
else None
262 metadata_ids_in_significant_domains,
263 significant_changes_only,
265 include_start_time_state,
269 bool(single_metadata_id),
270 bool(metadata_ids_in_significant_domains),
272 significant_changes_only,
274 include_start_time_state,
279 start_time_ts
if include_start_time_state
else None,
281 entity_id_to_metadata_id,
283 compressed_state_format,
284 no_attributes=no_attributes,
291 start_time: datetime,
292 end_time: datetime |
None =
None,
293 entity_ids: list[str] |
None =
None,
294 filters: Filters |
None =
None,
295 include_start_time_state: bool =
True,
296 significant_changes_only: bool =
True,
297 no_attributes: bool =
False,
298 ) -> dict[str, list[State]]:
299 """Variant of get_significant_states_with_session.
301 Unlike get_significant_states_with_session, it always passes
302 minimal_response=False and so never returns minimal responses.
305 dict[str, list[State]],
309 start_time=start_time,
311 entity_ids=entity_ids,
313 include_start_time_state=include_start_time_state,
314 significant_changes_only=significant_changes_only,
315 minimal_response=
False,
316 no_attributes=no_attributes,
322 start_time_ts: float,
323 end_time_ts: float |
None,
324 single_metadata_id: int,
327 include_start_time_state: bool,
328 run_start_ts: float |
None,
329 include_last_reported: bool,
330 ) -> Select | CompoundSelect:
335 (States.last_changed_ts == States.last_updated_ts)
336 | States.last_changed_ts.is_(
None)
338 & (States.last_updated_ts > start_time_ts)
340 .filter(States.metadata_id == single_metadata_id)
343 stmt = stmt.filter(States.last_updated_ts < end_time_ts)
344 if not no_attributes:
345 stmt = stmt.outerjoin(
346 StateAttributes, States.attributes_id == StateAttributes.attributes_id
349 stmt = stmt.limit(limit)
350 stmt = stmt.order_by(
352 States.last_updated_ts,
354 if not include_start_time_state
or not run_start_ts:
364 include_last_reported,
368 include_last_reported,
374 include_last_reported,
379 include_last_reported,
385 start_time: datetime,
386 end_time: datetime |
None =
None,
387 entity_id: str |
None =
None,
388 no_attributes: bool =
False,
389 descending: bool =
False,
390 limit: int |
None =
None,
391 include_start_time_state: bool =
True,
392 ) -> dict[str, list[State]]:
393 """Return state changes during UTC period start_time - end_time."""
394 has_last_reported = (
395 get_instance(hass).schema_version >= LAST_REPORTED_SCHEMA_VERSION
398 raise ValueError(
"entity_id must be provided")
399 entity_ids = [entity_id.lower()]
404 possible_metadata_id := instance.states_meta_manager.get(
405 entity_id, session,
False
409 single_metadata_id = possible_metadata_id
410 entity_id_to_metadata_id: dict[str, int |
None] = {
411 entity_id: single_metadata_id
413 oldest_ts: float |
None =
None
414 if include_start_time_state
and not (
417 include_start_time_state =
False
418 start_time_ts = dt_util.utc_to_timestamp(start_time)
427 include_start_time_state,
435 include_start_time_state,
440 dict[str, list[State]],
443 session, stmt,
None, end_time, orm_rows=
False
445 start_time_ts
if include_start_time_state
else None,
447 entity_id_to_metadata_id,
448 descending=descending,
449 no_attributes=no_attributes,
459 lastest_state_for_metadata_id := (
461 States.metadata_id.label(
"max_metadata_id"),
462 func.max(States.last_updated_ts).label(
"max_last_updated"),
464 .filter(States.metadata_id == metadata_id)
465 .group_by(States.metadata_id)
470 States.metadata_id == lastest_state_for_metadata_id.c.max_metadata_id,
471 States.last_updated_ts
472 == lastest_state_for_metadata_id.c.max_last_updated,
476 StateAttributes, States.attributes_id == StateAttributes.attributes_id
478 .order_by(States.state_id.desc())
483 number_of_states: int, metadata_id: int, include_last_reported: bool
490 select(States.state_id)
491 .filter(States.metadata_id == metadata_id)
492 .order_by(States.last_updated_ts.desc())
493 .limit(number_of_states)
498 StateAttributes, States.attributes_id == StateAttributes.attributes_id
500 .order_by(States.state_id.desc())
505 hass: HomeAssistant, number_of_states: int, entity_id: str
506 ) -> dict[str, list[State]]:
507 """Return the last number_of_states."""
508 has_last_reported = (
509 get_instance(hass).schema_version >= LAST_REPORTED_SCHEMA_VERSION
511 entity_id_lower = entity_id.lower()
512 entity_ids = [entity_id_lower]
521 possible_metadata_id := instance.states_meta_manager.get(
522 entity_id, session,
False
526 metadata_id = possible_metadata_id
527 entity_id_to_metadata_id: dict[str, int |
None] = {entity_id_lower: metadata_id}
528 if number_of_states == 1:
535 number_of_states, metadata_id, has_last_reported
537 track_on=[has_last_reported],
541 dict[str, list[State]],
546 entity_id_to_metadata_id,
555 metadata_ids: list[int],
557 include_last_changed: bool,
559 """Baked query to get states for specific entities."""
564 no_attributes, include_last_changed,
False
568 most_recent_states_for_entities_by_date := (
570 States.metadata_id.label(
"max_metadata_id"),
571 func.max(States.last_updated_ts).label(
"max_last_updated"),
574 (States.last_updated_ts >= run_start_ts)
575 & (States.last_updated_ts < epoch_time)
576 & States.metadata_id.in_(metadata_ids)
578 .group_by(States.metadata_id)
584 == most_recent_states_for_entities_by_date.c.max_metadata_id,
585 States.last_updated_ts
586 == most_recent_states_for_entities_by_date.c.max_last_updated,
590 (States.last_updated_ts >= run_start_ts)
591 & (States.last_updated_ts < epoch_time)
592 & States.metadata_id.in_(metadata_ids)
597 return stmt.outerjoin(
598 StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
603 hass: HomeAssistant, utc_point_in_time: datetime
605 """Return the oldest possible timestamp.
607 Returns None if there are no states as old as utc_point_in_time.
611 if oldest_ts
is not None and oldest_ts < utc_point_in_time.timestamp():
619 single_metadata_id: int |
None,
620 metadata_ids: list[int],
622 include_last_changed: bool,
624 """Return the states at a specific point in time."""
625 if single_metadata_id:
632 include_last_changed,
642 include_last_changed,
650 include_last_changed: bool,
651 include_last_reported: bool,
657 no_attributes, include_last_changed, include_last_reported
660 States.last_updated_ts < epoch_time,
661 States.metadata_id == metadata_id,
663 .order_by(States.last_updated_ts.desc())
668 return stmt.outerjoin(
669 StateAttributes, States.attributes_id == StateAttributes.attributes_id
674 states: Iterable[Row],
675 start_time_ts: float |
None,
676 entity_ids: list[str],
677 entity_id_to_metadata_id: dict[str, int |
None],
678 minimal_response: bool =
False,
679 compressed_state_format: bool =
False,
680 descending: bool =
False,
681 no_attributes: bool =
False,
682 ) -> dict[str, list[State | dict[str, Any]]]:
683 """Convert SQL results into JSON friendly data structure.
685 This takes our state list and turns it into a JSON friendly data
686 structure {'entity_id': [list of states], 'entity_id2': [list of states]}
688 States must be sorted by metadata_id and last_updated_ts
690 We also need to go back and create a synthetic zero data point for
691 each list of states, otherwise our graphs won't start on the Y
694 field_map = _FIELD_MAP
695 state_class: Callable[
696 [Row, dict[str, dict[str, Any]], float |
None, str, str, float |
None, bool],
697 State | dict[str, Any],
699 if compressed_state_format:
700 state_class = row_to_compressed_state
701 attr_time = COMPRESSED_STATE_LAST_UPDATED
702 attr_state = COMPRESSED_STATE_STATE
704 state_class = LazyState
705 attr_time = LAST_CHANGED_KEY
706 attr_state = STATE_KEY
709 result: dict[str, list[State | dict[str, Any]]] = {
710 entity_id: []
for entity_id
in entity_ids
712 metadata_id_to_entity_id: dict[int, str] = {}
713 metadata_id_to_entity_id = {
714 v: k
for k, v
in entity_id_to_metadata_id.items()
if v
is not None
717 if len(entity_ids) == 1:
718 metadata_id = entity_id_to_metadata_id[entity_ids[0]]
719 assert metadata_id
is not None
720 states_iter: Iterable[tuple[int, Iterator[Row]]] = (
721 (metadata_id, iter(states)),
724 key_func = itemgetter(field_map[
"metadata_id"])
725 states_iter = groupby(states, key_func)
727 state_idx = field_map[
"state"]
728 last_updated_ts_idx = field_map[
"last_updated_ts"]
731 for metadata_id, group
in states_iter:
732 entity_id = metadata_id_to_entity_id[metadata_id]
733 attr_cache: dict[str, dict[str, Any]] = {}
734 ent_results = result[entity_id]
747 db_state[last_updated_ts_idx],
750 for db_state
in group
755 prev_state: str |
None =
None
761 if (first_state := next(group,
None))
is None:
763 prev_state = first_state[state_idx]
771 first_state[last_updated_ts_idx],
783 if compressed_state_format:
788 attr_state: (prev_state := state),
789 attr_time: row[last_updated_ts_idx],
792 if (state := row[state_idx]) != prev_state
798 _utc_from_timestamp = dt_util.utc_from_timestamp
802 attr_state: (prev_state := state),
803 attr_time: _utc_from_timestamp(
804 row[last_updated_ts_idx]
808 if (state := row[state_idx]) != prev_state
813 for ent_results
in result.values():
814 ent_results.reverse()
817 return {key: val
for key, val
in result.items()
if val}
float|None _get_oldest_possible_ts(HomeAssistant hass, datetime utc_point_in_time)
dict[str, list[State|dict[str, Any]]] get_significant_states_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
dict[str, list[State]] get_full_significant_states_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool no_attributes=False)
Select _stmt_and_join_attributes(bool no_attributes, bool include_last_changed, bool include_last_reported)
Select _stmt_and_join_attributes_for_start_state(bool no_attributes, bool include_last_changed, bool include_last_reported)
dict[str, list[State|dict[str, Any]]] _sorted_states_to_dict(Iterable[Row] states, float|None start_time_ts, list[str] entity_ids, dict[str, int|None] entity_id_to_metadata_id, bool minimal_response=False, bool compressed_state_format=False, bool descending=False, bool no_attributes=False)
Select _select_from_subquery(Subquery|CompoundSelect subquery, bool no_attributes, bool include_last_changed, bool include_last_reported)
Select _get_last_state_changes_multiple_stmt(int number_of_states, int metadata_id, bool include_last_reported)
Select _get_single_entity_start_time_stmt(float epoch_time, int metadata_id, bool no_attributes, bool include_last_changed, bool include_last_reported)
Select|CompoundSelect _significant_states_stmt(float start_time_ts, float|None end_time_ts, int|None single_metadata_id, list[int] metadata_ids, list[int] metadata_ids_in_significant_domains, bool significant_changes_only, bool no_attributes, bool include_start_time_state, float|None run_start_ts)
Select _get_start_time_state_for_entities_stmt(float run_start_ts, float epoch_time, list[int] metadata_ids, bool no_attributes, bool include_last_changed)
dict[str, list[State]] state_changes_during_period(HomeAssistant hass, datetime start_time, datetime|None end_time=None, str|None entity_id=None, bool no_attributes=False, bool descending=False, int|None limit=None, bool include_start_time_state=True)
dict[str, list[State|dict[str, Any]]] get_significant_states(HomeAssistant hass, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
Select _get_start_time_state_stmt(float run_start_ts, float epoch_time, int|None single_metadata_id, list[int] metadata_ids, bool no_attributes, bool include_last_changed)
Select|CompoundSelect _state_changed_during_period_stmt(float start_time_ts, float|None end_time_ts, int single_metadata_id, bool no_attributes, int|None limit, bool include_start_time_state, float|None run_start_ts, bool include_last_reported)
Select _get_last_state_changes_single_stmt(int metadata_id)
dict[str, list[State]] get_last_state_changes(HomeAssistant hass, int number_of_states, str entity_id)
list[int] extract_metadata_ids(dict[str, int|None] entity_id_to_metadata_id)
float|None datetime_to_timestamp_or_none(datetime|None dt)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
tuple[str, str] split_entity_id(str entity_id)
Recorder get_instance(HomeAssistant hass)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)