1 """Provide pre-made queries on top of the recorder component."""
3 from __future__
import annotations
5 from collections
import defaultdict
6 from collections.abc
import Callable, Iterable, Iterator
7 from datetime
import datetime
8 from itertools
import groupby
9 from operator
import attrgetter
11 from typing
import Any, cast
13 from sqlalchemy
import Column, Text, and_, func, lambda_stmt, or_, select
14 from sqlalchemy.engine.row
import Row
15 from sqlalchemy.orm.properties
import MappedColumn
16 from sqlalchemy.orm.session
import Session
17 from sqlalchemy.sql.expression
import literal
18 from sqlalchemy.sql.lambdas
import StatementLambdaElement
25 from ..db_schema
import StateAttributes, States
26 from ..filters
import Filters
27 from ..models
import process_timestamp_to_utc_isoformat
28 from ..models.legacy
import LegacyLazyState, legacy_row_to_compressed_state
29 from ..util
import execute_stmt_lambda_element, session_scope
32 NEED_ATTRIBUTE_DOMAINS,
34 SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE,
# Column projections shared by the history queries below.  Each tuple is a
# fixed SELECT list; the *_NO_LAST_CHANGED variants replace last_changed with
# a NULL literal (callers then treat last_changed as equal to last_updated),
# and the *_NO_ATTR variants replace the attribute columns with NULL literals
# so the state_attributes table never needs to be joined.
# NOTE(review): parts of these tuples were reconstructed — confirm field
# order against the States model before relying on positional indexes.
_BASE_STATES = (
    States.entity_id,
    States.state,
    States.last_changed_ts,
    States.last_updated_ts,
)
_BASE_STATES_NO_LAST_CHANGED = (
    States.entity_id,
    States.state,
    literal(value=None).label("last_changed_ts"),
    States.last_updated_ts,
)
_QUERY_STATE_NO_ATTR = (
    *_BASE_STATES,
    literal(value=None, type_=Text).label("attributes"),
    literal(value=None, type_=Text).label("shared_attrs"),
)
_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = (
    *_BASE_STATES_NO_LAST_CHANGED,
    literal(value=None, type_=Text).label("attributes"),
    literal(value=None, type_=Text).label("shared_attrs"),
)
# Pre-schema-31 variants select the legacy datetime columns instead of the
# *_ts float columns.
_BASE_STATES_PRE_SCHEMA_31 = (
    States.entity_id,
    States.state,
    States.last_changed,
    States.last_updated,
)
_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31 = (
    States.entity_id,
    States.state,
    literal(value=None, type_=Text).label("last_changed"),
    States.last_updated,
)
_QUERY_STATE_NO_ATTR_PRE_SCHEMA_31 = (
    *_BASE_STATES_PRE_SCHEMA_31,
    literal(value=None, type_=Text).label("attributes"),
    literal(value=None, type_=Text).label("shared_attrs"),
)
_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED_PRE_SCHEMA_31 = (
    *_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
    literal(value=None, type_=Text).label("attributes"),
    literal(value=None, type_=Text).label("shared_attrs"),
)
# Pre-schema-25: attributes still live inline on the states table, so
# shared_attrs is selected as a NULL literal.
_QUERY_STATES_PRE_SCHEMA_25 = (
    *_BASE_STATES_PRE_SCHEMA_31,
    States.attributes,
    literal(value=None, type_=Text).label("shared_attrs"),
)
_QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED = (
    *_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
    States.attributes,
    literal(value=None, type_=Text).label("shared_attrs"),
)
_QUERY_STATES_PRE_SCHEMA_31 = (
    *_BASE_STATES_PRE_SCHEMA_31,
    # States.attributes remains selected while rows may predate the move of
    # attributes into StateAttributes.shared_attrs.
    States.attributes,
    StateAttributes.shared_attrs,
)
_QUERY_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31 = (
    *_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
    States.attributes,
    StateAttributes.shared_attrs,
)
_QUERY_STATES = (
    *_BASE_STATES,
    States.attributes,
    StateAttributes.shared_attrs,
)
_QUERY_STATES_NO_LAST_CHANGED = (
    *_BASE_STATES_NO_LAST_CHANGED,
    States.attributes,
    StateAttributes.shared_attrs,
)
# Map a selected column name to its positional index in the result Row so the
# minimal-response path can index rows without attribute access.
_FIELD_MAP = {
    cast(MappedColumn, field).name: idx
    for idx, field in enumerate(_QUERY_STATE_NO_ATTR)
}
_FIELD_MAP_PRE_SCHEMA_31 = {
    cast(MappedColumn, field).name: idx
    for idx, field in enumerate(_QUERY_STATES_PRE_SCHEMA_31)
}
def _lambda_stmt_and_join_attributes(
    no_attributes: bool, include_last_changed: bool = True
) -> tuple[StatementLambdaElement, bool]:
    """Return the lambda_stmt and if StateAttributes should be joined.

    Because these are lambda_stmt the values inside the lambdas need
    to be explicitly written out to avoid caching the wrong values.
    """
    # With no_attributes the projection carries NULL placeholders for the
    # attribute columns, so the caller must not join state_attributes
    # (second element of the tuple is False).
    if no_attributes:
        if include_last_changed:
            return (
                lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR)),
                False,
            )
        return (
            lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)),
            False,
        )

    # Attributes requested: caller must outerjoin StateAttributes.
    if include_last_changed:
        return lambda_stmt(lambda: select(*_QUERY_STATES)), True
    return lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED)), True
def get_significant_states(
    hass: HomeAssistant,
    start_time: datetime,
    end_time: datetime | None = None,
    entity_ids: list[str] | None = None,
    filters: Filters | None = None,
    include_start_time_state: bool = True,
    significant_changes_only: bool = True,
    minimal_response: bool = False,
    no_attributes: bool = False,
    compressed_state_format: bool = False,
) -> dict[str, list[State | dict[str, Any]]]:
    """Wrap get_significant_states_with_session with an sql session."""
    # Open a read-only recorder session and delegate; all arguments are
    # forwarded positionally in the same order.
    with session_scope(hass=hass, read_only=True) as session:
        return get_significant_states_with_session(
            hass,
            session,
            start_time,
            end_time,
            entity_ids,
            filters,
            include_start_time_state,
            significant_changes_only,
            minimal_response,
            no_attributes,
            compressed_state_format,
        )
def _significant_states_stmt(
    start_time: datetime,
    end_time: datetime | None,
    entity_ids: list[str],
    significant_changes_only: bool,
    no_attributes: bool,
) -> StatementLambdaElement:
    """Query the database for significant state changes."""
    stmt, join_attributes = _lambda_stmt_and_join_attributes(
        no_attributes, include_last_changed=not significant_changes_only
    )
    # NOTE(review): the single-entity fast path below was reconstructed —
    # confirm the SIGNIFICANT_DOMAINS membership test against the caller.
    if (
        len(entity_ids) == 1
        and significant_changes_only
        and split_entity_id(entity_ids[0])[0] not in SIGNIFICANT_DOMAINS
    ):
        # Only rows where the state actually changed are significant.
        stmt += lambda q: q.filter(
            (States.last_changed_ts == States.last_updated_ts)
            | States.last_changed_ts.is_(None)
        )
    elif significant_changes_only:
        # Keep every row from the significant domains, plus state changes
        # from all other domains.
        stmt += lambda q: q.filter(
            or_(
                *[
                    States.entity_id.like(entity_domain)
                    for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
                ],
                (
                    (States.last_changed_ts == States.last_updated_ts)
                    | States.last_changed_ts.is_(None)
                ),
            )
        )
    stmt += lambda q: q.filter(States.entity_id.in_(entity_ids))

    # Timestamps are hoisted into plain floats so the lambda cache keys on
    # values, not datetime objects.
    start_time_ts = start_time.timestamp()
    stmt += lambda q: q.filter(States.last_updated_ts > start_time_ts)
    if end_time:
        end_time_ts = end_time.timestamp()
        stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)

    if join_attributes:
        stmt += lambda q: q.outerjoin(
            StateAttributes, States.attributes_id == StateAttributes.attributes_id
        )
    stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
    return stmt
def get_significant_states_with_session(
    hass: HomeAssistant,
    session: Session,
    start_time: datetime,
    end_time: datetime | None = None,
    entity_ids: list[str] | None = None,
    filters: Filters | None = None,
    include_start_time_state: bool = True,
    significant_changes_only: bool = True,
    minimal_response: bool = False,
    no_attributes: bool = False,
    compressed_state_format: bool = False,
) -> dict[str, list[State | dict[str, Any]]]:
    """Return states changes during UTC period start_time - end_time.

    entity_ids is an optional iterable of entities to include in the results.

    filters is an optional SQLAlchemy filter which will be applied to the database
    queries unless entity_ids is given, in which case its ignored.

    Significant states are all states where there is a state change,
    as well as all states from certain domains (for instance
    thermostat so that we get current temperature in our graphs).
    """
    # The legacy Filters mechanism has been removed; passing one is an error.
    if filters is not None:
        raise NotImplementedError("Filters are no longer supported")
    if not entity_ids:
        raise ValueError("entity_ids must be provided")
    stmt = _significant_states_stmt(
        start_time,
        end_time,
        entity_ids,
        significant_changes_only,
        no_attributes,
    )
    states = execute_stmt_lambda_element(session, stmt, None, end_time)
    return _sorted_states_to_dict(
        hass,
        session,
        states,
        start_time,
        entity_ids,
        include_start_time_state,
        minimal_response,
        no_attributes,
        compressed_state_format,
    )
def get_full_significant_states_with_session(
    hass: HomeAssistant,
    session: Session,
    start_time: datetime,
    end_time: datetime | None = None,
    entity_ids: list[str] | None = None,
    filters: Filters | None = None,
    include_start_time_state: bool = True,
    significant_changes_only: bool = True,
    no_attributes: bool = False,
) -> dict[str, list[State]]:
    """Variant of get_significant_states_with_session.

    Difference with get_significant_states_with_session is that it does not
    return minimal responses.
    """
    # With minimal_response forced to False every entry is a full State, so
    # the narrower return type is safe to cast to.
    return cast(
        dict[str, list[State]],
        get_significant_states_with_session(
            hass=hass,
            session=session,
            start_time=start_time,
            end_time=end_time,
            entity_ids=entity_ids,
            filters=filters,
            include_start_time_state=include_start_time_state,
            significant_changes_only=significant_changes_only,
            minimal_response=False,
            no_attributes=no_attributes,
        ),
    )
def _state_changed_during_period_stmt(
    start_time: datetime,
    end_time: datetime | None,
    entity_id: str,
    no_attributes: bool,
    descending: bool,
    limit: int | None,
) -> StatementLambdaElement:
    """Build the statement for state changes of one entity in a period."""
    stmt, join_attributes = _lambda_stmt_and_join_attributes(
        no_attributes, include_last_changed=False
    )
    start_time_ts = start_time.timestamp()
    # Only rows that represent an actual state change inside the window.
    stmt += lambda q: q.filter(
        (
            (States.last_changed_ts == States.last_updated_ts)
            | States.last_changed_ts.is_(None)
        )
        & (States.last_updated_ts > start_time_ts)
    )
    if end_time:
        end_time_ts = end_time.timestamp()
        stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
    stmt += lambda q: q.filter(States.entity_id == entity_id)
    if join_attributes:
        stmt += lambda q: q.outerjoin(
            StateAttributes, States.attributes_id == StateAttributes.attributes_id
        )
    if descending:
        stmt += lambda q: q.order_by(
            States.entity_id, States.last_updated_ts.desc()
        )
    else:
        stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
    if limit:
        stmt += lambda q: q.limit(limit)
    return stmt
def state_changes_during_period(
    hass: HomeAssistant,
    start_time: datetime,
    end_time: datetime | None = None,
    entity_id: str | None = None,
    no_attributes: bool = False,
    descending: bool = False,
    limit: int | None = None,
    include_start_time_state: bool = True,
) -> dict[str, list[State]]:
    """Return states changes during UTC period start_time - end_time."""
    if not entity_id:
        raise ValueError("entity_id must be provided")
    # Entity ids are stored lower-case in the database.
    entity_ids = [entity_id.lower()]

    with session_scope(hass=hass, read_only=True) as session:
        stmt = _state_changed_during_period_stmt(
            start_time,
            end_time,
            entity_id,
            no_attributes,
            descending,
            limit,
        )
        states = execute_stmt_lambda_element(session, stmt, None, end_time)
        # No minimal_response here, so every entry is a full State.
        return cast(
            dict[str, list[State]],
            _sorted_states_to_dict(
                hass,
                session,
                states,
                start_time,
                entity_ids,
                include_start_time_state=include_start_time_state,
            ),
        )
def _get_last_state_changes_stmt(
    number_of_states: int, entity_id: str
) -> StatementLambdaElement:
    """Build the statement for the most recent states of one entity."""
    stmt, join_attributes = _lambda_stmt_and_join_attributes(
        False, include_last_changed=False
    )
    # Inner subquery selects the ids of the newest number_of_states rows for
    # the entity; the outer query then fetches those full rows.
    stmt += lambda q: q.where(
        States.state_id
        == (
            select(States.state_id)
            .filter(States.entity_id == entity_id)
            .order_by(States.last_updated_ts.desc())
            .limit(number_of_states)
            .subquery()
        ).c.state_id
    )
    if join_attributes:
        stmt += lambda q: q.outerjoin(
            StateAttributes, States.attributes_id == StateAttributes.attributes_id
        )
    # Newest first; the caller reverses into chronological order.
    stmt += lambda q: q.order_by(States.state_id.desc())
    return stmt
def get_last_state_changes(
    hass: HomeAssistant, number_of_states: int, entity_id: str
) -> dict[str, list[State]]:
    """Return the last number_of_states."""
    entity_id_lower = entity_id.lower()
    entity_ids = [entity_id_lower]

    with session_scope(hass=hass, read_only=True) as session:
        stmt = _get_last_state_changes_stmt(number_of_states, entity_id_lower)
        states = list(execute_stmt_lambda_element(session, stmt))
        # Rows come back newest-first; reverse so the result is chronological.
        return cast(
            dict[str, list[State]],
            _sorted_states_to_dict(
                hass,
                session,
                reversed(states),
                dt_util.utcnow(),
                entity_ids,
                include_start_time_state=False,
            ),
        )
def _get_states_for_entities_stmt(
    run_start_ts: float,
    utc_point_in_time: datetime,
    entity_ids: list[str],
    no_attributes: bool,
) -> StatementLambdaElement:
    """Baked query to get states for specific entities."""
    stmt, join_attributes = _lambda_stmt_and_join_attributes(
        no_attributes, include_last_changed=True
    )
    # We have an include-list of entities, so the inner aggregate query can
    # filter on entity_id already instead of scanning every row.
    utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
    stmt += lambda q: q.join(
        (
            most_recent_states_for_entities_by_date := (
                select(
                    States.entity_id.label("max_entity_id"),
                    func.max(States.last_updated_ts).label("max_last_updated"),
                )
                .filter(
                    (States.last_updated_ts >= run_start_ts)
                    & (States.last_updated_ts < utc_point_in_time_ts)
                )
                .filter(States.entity_id.in_(entity_ids))
                .group_by(States.entity_id)
                .subquery()
            )
        ),
        and_(
            States.entity_id
            == most_recent_states_for_entities_by_date.c.max_entity_id,
            States.last_updated_ts
            == most_recent_states_for_entities_by_date.c.max_last_updated,
        ),
    )
    if join_attributes:
        stmt += lambda q: q.outerjoin(
            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
        )
    return stmt
def _get_rows_with_session(
    hass: HomeAssistant,
    session: Session,
    utc_point_in_time: datetime,
    entity_ids: list[str],
    *,
    no_attributes: bool = False,
) -> Iterable[Row]:
    """Return the states at a specific point in time."""
    if len(entity_ids) == 1:
        # Single entity: use the dedicated fast-path statement.
        return execute_stmt_lambda_element(
            session,
            _get_single_entity_states_stmt(
                utc_point_in_time, entity_ids[0], no_attributes
            ),
        )

    # NOTE(review): oldest_ts source reconstructed from the recorder
    # instance — confirm the attribute path against the Recorder API.
    oldest_ts = get_instance(hass).states_manager.oldest_ts
    if oldest_ts is None or oldest_ts > utc_point_in_time.timestamp():
        # No recorded data exists at or before the requested point in time.
        return []

    # Multiple entities: one grouped query over states recorded since the
    # oldest available run.
    stmt = _get_states_for_entities_stmt(
        oldest_ts, utc_point_in_time, entity_ids, no_attributes
    )
    return execute_stmt_lambda_element(session, stmt)
def _get_single_entity_states_stmt(
    utc_point_in_time: datetime,
    entity_id: str,
    no_attributes: bool = False,
) -> StatementLambdaElement:
    """Build the fast single-entity statement for a point in time."""
    # A single entity only needs the newest row before the cutoff, which the
    # entity_id/last_updated_ts index answers without any aggregate.
    stmt, join_attributes = _lambda_stmt_and_join_attributes(
        no_attributes, include_last_changed=True
    )
    utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
    stmt += lambda q: (
        q.filter(
            States.last_updated_ts < utc_point_in_time_ts,
            States.entity_id == entity_id,
        )
        .order_by(States.last_updated_ts.desc())
        .limit(1)
    )
    if join_attributes:
        stmt += lambda q: q.outerjoin(
            StateAttributes, States.attributes_id == StateAttributes.attributes_id
        )
    return stmt
def _sorted_states_to_dict(
    hass: HomeAssistant,
    session: Session,
    states: Iterable[Row],
    start_time: datetime,
    entity_ids: list[str],
    include_start_time_state: bool = True,
    minimal_response: bool = False,
    no_attributes: bool = False,
    compressed_state_format: bool = False,
) -> dict[str, list[State | dict[str, Any]]]:
    """Convert SQL results into JSON friendly data structure.

    This takes our state list and turns it into a JSON friendly data
    structure {'entity_id': [list of states], 'entity_id2': [list of states]}

    States must be sorted by entity_id and last_updated

    We also need to go back and create a synthetic zero data point for
    each list of states, otherwise our graphs won't start on the Y
    axis correctly.
    """
    state_class: Callable[
        [Row, dict[str, dict[str, Any]], datetime | None], State | dict[str, Any]
    ]
    # Pick the row->state converter and the dict keys used by the
    # minimal-response fast paths below.
    if compressed_state_format:
        state_class = legacy_row_to_compressed_state
        attr_time = COMPRESSED_STATE_LAST_UPDATED
        attr_state = COMPRESSED_STATE_STATE
    else:
        state_class = LegacyLazyState
        attr_time = LAST_CHANGED_KEY
        attr_state = STATE_KEY

    # Pre-seed every requested entity so the output preserves request order.
    result: dict[str, list[State | dict[str, Any]]] = defaultdict(list)
    for ent_id in entity_ids:
        result[ent_id] = []

    # Fetch the state of each entity at start_time for the synthetic
    # zero data point.
    initial_states: dict[str, Row] = {}
    if include_start_time_state:
        initial_states = {
            row.entity_id: row
            for row in _get_rows_with_session(
                hass,
                session,
                start_time,
                entity_ids,
                no_attributes=no_attributes,
            )
        }

    if len(entity_ids) == 1:
        # Single entity: skip groupby entirely.
        states_iter: Iterable[tuple[str, Iterator[Row]]] = (
            (entity_ids[0], iter(states)),
        )
    else:
        key_func = attrgetter("entity_id")
        states_iter = groupby(states, key_func)

    for ent_id, group in states_iter:
        attr_cache: dict[str, dict[str, Any]] = {}
        prev_state: Column | str
        ent_results = result[ent_id]
        if row := initial_states.pop(ent_id, None):
            prev_state = row.state
            ent_results.append(state_class(row, attr_cache, start_time))

        if (
            not minimal_response
            or split_entity_id(ent_id)[0] in NEED_ATTRIBUTE_DOMAINS
        ):
            # Full response: convert every row.
            ent_results.extend(
                state_class(db_state, attr_cache, None) for db_state in group
            )
            continue

        # Minimal response: only the first state is a full object; the rest
        # are small dicts carrying just the state and its timestamp.
        if not ent_results:
            if (first_state := next(group, None)) is None:
                continue
            prev_state = first_state.state
            ent_results.append(state_class(first_state, attr_cache, None))

        state_idx = _FIELD_MAP["state"]
        last_updated_ts_idx = _FIELD_MAP["last_updated_ts"]

        # Duplicate consecutive states are dropped since attribute changes
        # are irrelevant in minimal responses.
        if compressed_state_format:
            # Compressed format keeps the raw float timestamp.
            for row in group:
                if (state := row[state_idx]) != prev_state:
                    ent_results.append(
                        {
                            attr_state: state,
                            attr_time: row[last_updated_ts_idx],
                        }
                    )
                    prev_state = state
            continue

        for row in group:
            if (state := row[state_idx]) != prev_state:
                ent_results.append(
                    {
                        attr_state: state,
                        attr_time: process_timestamp_to_utc_isoformat(
                            dt_util.utc_from_timestamp(row[last_updated_ts_idx])
                        ),
                    }
                )
                prev_state = state

    # Entities that had no changes in the window never had their initial
    # state popped above; append it now.
    for ent_id, row in initial_states.items():
        result[ent_id].append(state_class(row, {}, start_time))

    # Drop entities that ended up with no states at all.
    return {key: val for key, val in result.items() if val}
StatementLambdaElement _significant_states_stmt(datetime start_time, datetime|None end_time, list[str] entity_ids, bool significant_changes_only, bool no_attributes)
dict[str, list[State|dict[str, Any]]] _sorted_states_to_dict(HomeAssistant hass, Session session, Iterable[Row] states, datetime start_time, list[str] entity_ids, bool include_start_time_state=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
StatementLambdaElement _state_changed_during_period_stmt(datetime start_time, datetime|None end_time, str entity_id, bool no_attributes, bool descending, int|None limit)
dict[str, list[State]] get_full_significant_states_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool no_attributes=False)
dict[str, list[State|dict[str, Any]]] get_significant_states(HomeAssistant hass, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
dict[str, list[State|dict[str, Any]]] get_significant_states_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
StatementLambdaElement _get_last_state_changes_stmt(int number_of_states, str entity_id)
Iterable[Row] _get_rows_with_session(HomeAssistant hass, Session session, datetime utc_point_in_time, list[str] entity_ids, *bool no_attributes=False)
dict[str, list[State]] state_changes_during_period(HomeAssistant hass, datetime start_time, datetime|None end_time=None, str|None entity_id=None, bool no_attributes=False, bool descending=False, int|None limit=None, bool include_start_time_state=True)
StatementLambdaElement _get_states_for_entities_stmt(float run_start_ts, datetime utc_point_in_time, list[str] entity_ids, bool no_attributes)
StatementLambdaElement _get_single_entity_states_stmt(datetime utc_point_in_time, str entity_id, bool no_attributes=False)
tuple[StatementLambdaElement, bool] _lambda_stmt_and_join_attributes(bool no_attributes, bool include_last_changed=True)
dict[str, list[State]] get_last_state_changes(HomeAssistant hass, int number_of_states, str entity_id)
None process_timestamp_to_utc_isoformat(None ts)
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
tuple[str, str] split_entity_id(str entity_id)
Recorder get_instance(HomeAssistant hass)
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)