Home Assistant Unofficial Reference 2024.12.1
legacy.py
Go to the documentation of this file.
1 """Provide pre-made queries on top of the recorder component."""
2 
3 from __future__ import annotations
4 
5 from collections import defaultdict
6 from collections.abc import Callable, Iterable, Iterator
7 from datetime import datetime
8 from itertools import groupby
9 from operator import attrgetter
10 import time
11 from typing import Any, cast
12 
13 from sqlalchemy import Column, Text, and_, func, lambda_stmt, or_, select
14 from sqlalchemy.engine.row import Row
15 from sqlalchemy.orm.properties import MappedColumn
16 from sqlalchemy.orm.session import Session
17 from sqlalchemy.sql.expression import literal
18 from sqlalchemy.sql.lambdas import StatementLambdaElement
19 
20 from homeassistant.const import COMPRESSED_STATE_LAST_UPDATED, COMPRESSED_STATE_STATE
21 from homeassistant.core import HomeAssistant, State, split_entity_id
22 from homeassistant.helpers.recorder import get_instance
23 import homeassistant.util.dt as dt_util
24 
25 from ..db_schema import StateAttributes, States
26 from ..filters import Filters
27 from ..models import process_timestamp_to_utc_isoformat
28 from ..models.legacy import LegacyLazyState, legacy_row_to_compressed_state
29 from ..util import execute_stmt_lambda_element, session_scope
30 from .const import (
31  LAST_CHANGED_KEY,
32  NEED_ATTRIBUTE_DOMAINS,
33  SIGNIFICANT_DOMAINS,
34  SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE,
35  STATE_KEY,
36 )
37 
38 _BASE_STATES = (
39  States.entity_id,
40  States.state,
41  States.last_changed_ts,
42  States.last_updated_ts,
43 )
44 _BASE_STATES_NO_LAST_CHANGED = (
45  States.entity_id,
46  States.state,
47  literal(value=None).label("last_changed_ts"),
48  States.last_updated_ts,
49 )
50 _QUERY_STATE_NO_ATTR = (
51  *_BASE_STATES,
52  literal(value=None, type_=Text).label("attributes"),
53  literal(value=None, type_=Text).label("shared_attrs"),
54 )
55 _QUERY_STATE_NO_ATTR_NO_LAST_CHANGED = (
56  *_BASE_STATES_NO_LAST_CHANGED,
57  literal(value=None, type_=Text).label("attributes"),
58  literal(value=None, type_=Text).label("shared_attrs"),
59 )
60 _BASE_STATES_PRE_SCHEMA_31 = (
61  States.entity_id,
62  States.state,
63  States.last_changed,
64  States.last_updated,
65 )
66 _BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31 = (
67  States.entity_id,
68  States.state,
69  literal(value=None, type_=Text).label("last_changed"),
70  States.last_updated,
71 )
72 _QUERY_STATE_NO_ATTR_PRE_SCHEMA_31 = (
73  *_BASE_STATES_PRE_SCHEMA_31,
74  literal(value=None, type_=Text).label("attributes"),
75  literal(value=None, type_=Text).label("shared_attrs"),
76 )
77 _QUERY_STATE_NO_ATTR_NO_LAST_CHANGED_PRE_SCHEMA_31 = (
78  *_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
79  literal(value=None, type_=Text).label("attributes"),
80  literal(value=None, type_=Text).label("shared_attrs"),
81 )
82 # Remove QUERY_STATES_PRE_SCHEMA_25
83 # and the migration_in_progress check
84 # once schema 26 is created
85 _QUERY_STATES_PRE_SCHEMA_25 = (
86  *_BASE_STATES_PRE_SCHEMA_31,
87  States.attributes,
88  literal(value=None, type_=Text).label("shared_attrs"),
89 )
90 _QUERY_STATES_PRE_SCHEMA_25_NO_LAST_CHANGED = (
91  *_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
92  States.attributes,
93  literal(value=None, type_=Text).label("shared_attrs"),
94 )
95 _QUERY_STATES_PRE_SCHEMA_31 = (
96  *_BASE_STATES_PRE_SCHEMA_31,
97  # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
98  States.attributes,
99  StateAttributes.shared_attrs,
100 )
101 _QUERY_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31 = (
102  *_BASE_STATES_NO_LAST_CHANGED_PRE_SCHEMA_31,
103  # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
104  States.attributes,
105  StateAttributes.shared_attrs,
106 )
107 _QUERY_STATES = (
108  *_BASE_STATES,
109  # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
110  States.attributes,
111  StateAttributes.shared_attrs,
112 )
113 _QUERY_STATES_NO_LAST_CHANGED = (
114  *_BASE_STATES_NO_LAST_CHANGED,
115  # Remove States.attributes once all attributes are in StateAttributes.shared_attrs
116  States.attributes,
117  StateAttributes.shared_attrs,
118 )
119 _FIELD_MAP = {
120  cast(MappedColumn, field).name: idx
121  for idx, field in enumerate(_QUERY_STATE_NO_ATTR)
122 }
123 _FIELD_MAP_PRE_SCHEMA_31 = {
124  cast(MappedColumn, field).name: idx
125  for idx, field in enumerate(_QUERY_STATES_PRE_SCHEMA_31)
126 }
127 
128 
130  no_attributes: bool, include_last_changed: bool = True
131 ) -> tuple[StatementLambdaElement, bool]:
132  """Return the lambda_stmt and if StateAttributes should be joined.
133 
134  Because these are lambda_stmt the values inside the lambdas need
135  to be explicitly written out to avoid caching the wrong values.
136  """
137  # If no_attributes was requested we do the query
138  # without the attributes fields and do not join the
139  # state_attributes table
140  if no_attributes:
141  if include_last_changed:
142  return (
143  lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR)),
144  False,
145  )
146  return (
147  lambda_stmt(lambda: select(*_QUERY_STATE_NO_ATTR_NO_LAST_CHANGED)),
148  False,
149  )
150 
151  if include_last_changed:
152  return lambda_stmt(lambda: select(*_QUERY_STATES)), True
153  return lambda_stmt(lambda: select(*_QUERY_STATES_NO_LAST_CHANGED)), True
154 
155 
157  hass: HomeAssistant,
158  start_time: datetime,
159  end_time: datetime | None = None,
160  entity_ids: list[str] | None = None,
161  filters: Filters | None = None,
162  include_start_time_state: bool = True,
163  significant_changes_only: bool = True,
164  minimal_response: bool = False,
165  no_attributes: bool = False,
166  compressed_state_format: bool = False,
167 ) -> dict[str, list[State | dict[str, Any]]]:
168  """Wrap get_significant_states_with_session with an sql session."""
169  with session_scope(hass=hass, read_only=True) as session:
171  hass,
172  session,
173  start_time,
174  end_time,
175  entity_ids,
176  filters,
177  include_start_time_state,
178  significant_changes_only,
179  minimal_response,
180  no_attributes,
181  compressed_state_format,
182  )
183 
184 
186  start_time: datetime,
187  end_time: datetime | None,
188  entity_ids: list[str],
189  significant_changes_only: bool,
190  no_attributes: bool,
191 ) -> StatementLambdaElement:
192  """Query the database for significant state changes."""
193  stmt, join_attributes = _lambda_stmt_and_join_attributes(
194  no_attributes, include_last_changed=not significant_changes_only
195  )
196  if (
197  len(entity_ids) == 1
198  and significant_changes_only
199  and split_entity_id(entity_ids[0])[0] not in SIGNIFICANT_DOMAINS
200  ):
201  stmt += lambda q: q.filter(
202  (States.last_changed_ts == States.last_updated_ts)
203  | States.last_changed_ts.is_(None)
204  )
205  elif significant_changes_only:
206  stmt += lambda q: q.filter(
207  or_(
208  *[
209  States.entity_id.like(entity_domain)
210  for entity_domain in SIGNIFICANT_DOMAINS_ENTITY_ID_LIKE
211  ],
212  (
213  (States.last_changed_ts == States.last_updated_ts)
214  | States.last_changed_ts.is_(None)
215  ),
216  )
217  )
218  stmt += lambda q: q.filter(States.entity_id.in_(entity_ids))
219 
220  start_time_ts = start_time.timestamp()
221  stmt += lambda q: q.filter(States.last_updated_ts > start_time_ts)
222  if end_time:
223  end_time_ts = end_time.timestamp()
224  stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
225 
226  if join_attributes:
227  stmt += lambda q: q.outerjoin(
228  StateAttributes, States.attributes_id == StateAttributes.attributes_id
229  )
230  stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
231  return stmt
232 
233 
235  hass: HomeAssistant,
236  session: Session,
237  start_time: datetime,
238  end_time: datetime | None = None,
239  entity_ids: list[str] | None = None,
240  filters: Filters | None = None,
241  include_start_time_state: bool = True,
242  significant_changes_only: bool = True,
243  minimal_response: bool = False,
244  no_attributes: bool = False,
245  compressed_state_format: bool = False,
246 ) -> dict[str, list[State | dict[str, Any]]]:
247  """Return states changes during UTC period start_time - end_time.
248 
249  entity_ids is an optional iterable of entities to include in the results.
250 
251  filters is an optional SQLAlchemy filter which will be applied to the database
252  queries unless entity_ids is given, in which case its ignored.
253 
254  Significant states are all states where there is a state change,
255  as well as all states from certain domains (for instance
256  thermostat so that we get current temperature in our graphs).
257  """
258  if filters is not None:
259  raise NotImplementedError("Filters are no longer supported")
260  if not entity_ids:
261  raise ValueError("entity_ids must be provided")
263  start_time,
264  end_time,
265  entity_ids,
266  significant_changes_only,
267  no_attributes,
268  )
269  states = execute_stmt_lambda_element(session, stmt, None, end_time)
270  return _sorted_states_to_dict(
271  hass,
272  session,
273  states,
274  start_time,
275  entity_ids,
276  include_start_time_state,
277  minimal_response,
278  no_attributes,
279  compressed_state_format,
280  )
281 
282 
284  hass: HomeAssistant,
285  session: Session,
286  start_time: datetime,
287  end_time: datetime | None = None,
288  entity_ids: list[str] | None = None,
289  filters: Filters | None = None,
290  include_start_time_state: bool = True,
291  significant_changes_only: bool = True,
292  no_attributes: bool = False,
293 ) -> dict[str, list[State]]:
294  """Variant of get_significant_states_with_session.
295 
296  Difference with get_significant_states_with_session is that it does not
297  return minimal responses.
298  """
299  return cast(
300  dict[str, list[State]],
302  hass=hass,
303  session=session,
304  start_time=start_time,
305  end_time=end_time,
306  entity_ids=entity_ids,
307  filters=filters,
308  include_start_time_state=include_start_time_state,
309  significant_changes_only=significant_changes_only,
310  minimal_response=False,
311  no_attributes=no_attributes,
312  ),
313  )
314 
315 
317  start_time: datetime,
318  end_time: datetime | None,
319  entity_id: str,
320  no_attributes: bool,
321  descending: bool,
322  limit: int | None,
323 ) -> StatementLambdaElement:
324  stmt, join_attributes = _lambda_stmt_and_join_attributes(
325  no_attributes, include_last_changed=False
326  )
327  start_time_ts = start_time.timestamp()
328  stmt += lambda q: q.filter(
329  (
330  (States.last_changed_ts == States.last_updated_ts)
331  | States.last_changed_ts.is_(None)
332  )
333  & (States.last_updated_ts > start_time_ts)
334  )
335  if end_time:
336  end_time_ts = end_time.timestamp()
337  stmt += lambda q: q.filter(States.last_updated_ts < end_time_ts)
338  stmt += lambda q: q.filter(States.entity_id == entity_id)
339  if join_attributes:
340  stmt += lambda q: q.outerjoin(
341  StateAttributes, States.attributes_id == StateAttributes.attributes_id
342  )
343  if descending:
344  stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts.desc())
345  else:
346  stmt += lambda q: q.order_by(States.entity_id, States.last_updated_ts)
347 
348  if limit:
349  stmt += lambda q: q.limit(limit)
350  return stmt
351 
352 
354  hass: HomeAssistant,
355  start_time: datetime,
356  end_time: datetime | None = None,
357  entity_id: str | None = None,
358  no_attributes: bool = False,
359  descending: bool = False,
360  limit: int | None = None,
361  include_start_time_state: bool = True,
362 ) -> dict[str, list[State]]:
363  """Return states changes during UTC period start_time - end_time."""
364  if not entity_id:
365  raise ValueError("entity_id must be provided")
366  entity_ids = [entity_id.lower()]
367  with session_scope(hass=hass, read_only=True) as session:
369  start_time,
370  end_time,
371  entity_id,
372  no_attributes,
373  descending,
374  limit,
375  )
376  states = execute_stmt_lambda_element(session, stmt, None, end_time)
377  return cast(
378  dict[str, list[State]],
380  hass,
381  session,
382  states,
383  start_time,
384  entity_ids,
385  include_start_time_state=include_start_time_state,
386  ),
387  )
388 
389 
391  number_of_states: int, entity_id: str
392 ) -> StatementLambdaElement:
393  stmt, join_attributes = _lambda_stmt_and_join_attributes(
394  False, include_last_changed=False
395  )
396  stmt += lambda q: q.where(
397  States.state_id
398  == (
399  select(States.state_id)
400  .filter(States.entity_id == entity_id)
401  .order_by(States.last_updated_ts.desc())
402  .limit(number_of_states)
403  .subquery()
404  ).c.state_id
405  )
406  if join_attributes:
407  stmt += lambda q: q.outerjoin(
408  StateAttributes, States.attributes_id == StateAttributes.attributes_id
409  )
410 
411  stmt += lambda q: q.order_by(States.state_id.desc())
412  return stmt
413 
414 
416  hass: HomeAssistant, number_of_states: int, entity_id: str
417 ) -> dict[str, list[State]]:
418  """Return the last number_of_states."""
419  entity_id_lower = entity_id.lower()
420  entity_ids = [entity_id_lower]
421 
422  with session_scope(hass=hass, read_only=True) as session:
423  stmt = _get_last_state_changes_stmt(number_of_states, entity_id_lower)
424  states = list(execute_stmt_lambda_element(session, stmt))
425  return cast(
426  dict[str, list[State]],
428  hass,
429  session,
430  reversed(states),
431  dt_util.utcnow(),
432  entity_ids,
433  include_start_time_state=False,
434  ),
435  )
436 
437 
439  run_start_ts: float,
440  utc_point_in_time: datetime,
441  entity_ids: list[str],
442  no_attributes: bool,
443 ) -> StatementLambdaElement:
444  """Baked query to get states for specific entities."""
445  stmt, join_attributes = _lambda_stmt_and_join_attributes(
446  no_attributes, include_last_changed=True
447  )
448  # We got an include-list of entities, accelerate the query by filtering already
449  # in the inner query.
450  utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
451  stmt += lambda q: q.join(
452  (
453  most_recent_states_for_entities_by_date := (
454  select(
455  States.entity_id.label("max_entity_id"),
456  func.max(States.last_updated_ts).label("max_last_updated"),
457  )
458  .filter(
459  (States.last_updated_ts >= run_start_ts)
460  & (States.last_updated_ts < utc_point_in_time_ts)
461  )
462  .filter(States.entity_id.in_(entity_ids))
463  .group_by(States.entity_id)
464  .subquery()
465  )
466  ),
467  and_(
468  States.entity_id == most_recent_states_for_entities_by_date.c.max_entity_id,
469  States.last_updated_ts
470  == most_recent_states_for_entities_by_date.c.max_last_updated,
471  ),
472  )
473  if join_attributes:
474  stmt += lambda q: q.outerjoin(
475  StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
476  )
477  return stmt
478 
479 
481  hass: HomeAssistant,
482  session: Session,
483  utc_point_in_time: datetime,
484  entity_ids: list[str],
485  *,
486  no_attributes: bool = False,
487 ) -> Iterable[Row]:
488  """Return the states at a specific point in time."""
489  if len(entity_ids) == 1:
491  session,
493  utc_point_in_time, entity_ids[0], no_attributes
494  ),
495  )
496 
497  oldest_ts = get_instance(hass).states_manager.oldest_ts
498 
499  if oldest_ts is None or oldest_ts > utc_point_in_time.timestamp():
500  # We don't have any states for the requested time
501  return []
502 
503  # We have more than one entity to look at so we need to do a query on states
504  # since the last recorder run started.
506  oldest_ts, utc_point_in_time, entity_ids, no_attributes
507  )
508  return execute_stmt_lambda_element(session, stmt)
509 
510 
512  utc_point_in_time: datetime,
513  entity_id: str,
514  no_attributes: bool = False,
515 ) -> StatementLambdaElement:
516  # Use an entirely different (and extremely fast) query if we only
517  # have a single entity id
518  stmt, join_attributes = _lambda_stmt_and_join_attributes(
519  no_attributes, include_last_changed=True
520  )
521  utc_point_in_time_ts = dt_util.utc_to_timestamp(utc_point_in_time)
522  stmt += (
523  lambda q: q.filter(
524  States.last_updated_ts < utc_point_in_time_ts,
525  States.entity_id == entity_id,
526  )
527  .order_by(States.last_updated_ts.desc())
528  .limit(1)
529  )
530  if join_attributes:
531  stmt += lambda q: q.outerjoin(
532  StateAttributes, States.attributes_id == StateAttributes.attributes_id
533  )
534  return stmt
535 
536 
538  hass: HomeAssistant,
539  session: Session,
540  states: Iterable[Row],
541  start_time: datetime,
542  entity_ids: list[str],
543  include_start_time_state: bool = True,
544  minimal_response: bool = False,
545  no_attributes: bool = False,
546  compressed_state_format: bool = False,
547 ) -> dict[str, list[State | dict[str, Any]]]:
548  """Convert SQL results into JSON friendly data structure.
549 
550  This takes our state list and turns it into a JSON friendly data
551  structure {'entity_id': [list of states], 'entity_id2': [list of states]}
552 
553  States must be sorted by entity_id and last_updated
554 
555  We also need to go back and create a synthetic zero data point for
556  each list of states, otherwise our graphs won't start on the Y
557  axis correctly.
558  """
559  state_class: Callable[
560  [Row, dict[str, dict[str, Any]], datetime | None], State | dict[str, Any]
561  ]
562  if compressed_state_format:
563  state_class = legacy_row_to_compressed_state
564  attr_time = COMPRESSED_STATE_LAST_UPDATED
565  attr_state = COMPRESSED_STATE_STATE
566  else:
567  state_class = LegacyLazyState
568  attr_time = LAST_CHANGED_KEY
569  attr_state = STATE_KEY
570 
571  result: dict[str, list[State | dict[str, Any]]] = defaultdict(list)
572  # Set all entity IDs to empty lists in result set to maintain the order
573  for ent_id in entity_ids:
574  result[ent_id] = []
575 
576  # Get the states at the start time
577  time.perf_counter()
578  initial_states: dict[str, Row] = {}
579  if include_start_time_state:
580  initial_states = {
581  row.entity_id: row
582  for row in _get_rows_with_session(
583  hass,
584  session,
585  start_time,
586  entity_ids,
587  no_attributes=no_attributes,
588  )
589  }
590 
591  if len(entity_ids) == 1:
592  states_iter: Iterable[tuple[str, Iterator[Row]]] = (
593  (entity_ids[0], iter(states)),
594  )
595  else:
596  key_func = attrgetter("entity_id")
597  states_iter = groupby(states, key_func)
598 
599  # Append all changes to it
600  for ent_id, group in states_iter:
601  attr_cache: dict[str, dict[str, Any]] = {}
602  prev_state: Column | str
603  ent_results = result[ent_id]
604  if row := initial_states.pop(ent_id, None):
605  prev_state = row.state
606  ent_results.append(state_class(row, attr_cache, start_time))
607 
608  if not minimal_response or split_entity_id(ent_id)[0] in NEED_ATTRIBUTE_DOMAINS:
609  ent_results.extend(
610  state_class(db_state, attr_cache, None) for db_state in group
611  )
612  continue
613 
614  # With minimal response we only provide a native
615  # State for the first and last response. All the states
616  # in-between only provide the "state" and the
617  # "last_changed".
618  if not ent_results:
619  if (first_state := next(group, None)) is None:
620  continue
621  prev_state = first_state.state
622  ent_results.append(state_class(first_state, attr_cache, None))
623 
624  state_idx = _FIELD_MAP["state"]
625 
626  #
627  # minimal_response only makes sense with last_updated == last_updated
628  #
629  # We use last_updated for for last_changed since its the same
630  #
631  # With minimal response we do not care about attribute
632  # changes so we can filter out duplicate states
633  last_updated_ts_idx = _FIELD_MAP["last_updated_ts"]
634  if compressed_state_format:
635  for row in group:
636  if (state := row[state_idx]) != prev_state:
637  ent_results.append(
638  {
639  attr_state: state,
640  attr_time: row[last_updated_ts_idx],
641  }
642  )
643  prev_state = state
644  continue
645 
646  for row in group:
647  if (state := row[state_idx]) != prev_state:
648  ent_results.append(
649  {
650  attr_state: state,
652  dt_util.utc_from_timestamp(row[last_updated_ts_idx])
653  ),
654  }
655  )
656  prev_state = state
657 
658  # If there are no states beyond the initial state,
659  # the state a was never popped from initial_states
660  for ent_id, row in initial_states.items():
661  result[ent_id].append(state_class(row, {}, start_time))
662 
663  # Filter out the empty lists if some states had 0 results.
664  return {key: val for key, val in result.items() if val}
StatementLambdaElement _significant_states_stmt(datetime start_time, datetime|None end_time, list[str] entity_ids, bool significant_changes_only, bool no_attributes)
Definition: legacy.py:191
dict[str, list[State|dict[str, Any]]] _sorted_states_to_dict(HomeAssistant hass, Session session, Iterable[Row] states, datetime start_time, list[str] entity_ids, bool include_start_time_state=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
Definition: legacy.py:547
StatementLambdaElement _state_changed_during_period_stmt(datetime start_time, datetime|None end_time, str entity_id, bool no_attributes, bool descending, int|None limit)
Definition: legacy.py:323
dict[str, list[State]] get_full_significant_states_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool no_attributes=False)
Definition: legacy.py:293
dict[str, list[State|dict[str, Any]]] get_significant_states(HomeAssistant hass, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
Definition: legacy.py:167
dict[str, list[State|dict[str, Any]]] get_significant_states_with_session(HomeAssistant hass, Session session, datetime start_time, datetime|None end_time=None, list[str]|None entity_ids=None, Filters|None filters=None, bool include_start_time_state=True, bool significant_changes_only=True, bool minimal_response=False, bool no_attributes=False, bool compressed_state_format=False)
Definition: legacy.py:246
StatementLambdaElement _get_last_state_changes_stmt(int number_of_states, str entity_id)
Definition: legacy.py:392
Iterable[Row] _get_rows_with_session(HomeAssistant hass, Session session, datetime utc_point_in_time, list[str] entity_ids, *bool no_attributes=False)
Definition: legacy.py:487
dict[str, list[State]] state_changes_during_period(HomeAssistant hass, datetime start_time, datetime|None end_time=None, str|None entity_id=None, bool no_attributes=False, bool descending=False, int|None limit=None, bool include_start_time_state=True)
Definition: legacy.py:362
StatementLambdaElement _get_states_for_entities_stmt(float run_start_ts, datetime utc_point_in_time, list[str] entity_ids, bool no_attributes)
Definition: legacy.py:443
StatementLambdaElement _get_single_entity_states_stmt(datetime utc_point_in_time, str entity_id, bool no_attributes=False)
Definition: legacy.py:515
tuple[StatementLambdaElement, bool] _lambda_stmt_and_join_attributes(bool no_attributes, bool include_last_changed=True)
Definition: legacy.py:131
dict[str, list[State]] get_last_state_changes(HomeAssistant hass, int number_of_states, str entity_id)
Definition: legacy.py:417
None process_timestamp_to_utc_isoformat(None ts)
Definition: time.py:37
Sequence[Row]|Result execute_stmt_lambda_element(Session session, StatementLambdaElement stmt, datetime|None start_time=None, datetime|None end_time=None, int yield_per=DEFAULT_YIELD_STATES_ROWS, bool orm_rows=True)
Definition: util.py:179
tuple[str, str] split_entity_id(str entity_id)
Definition: core.py:214
Recorder get_instance(HomeAssistant hass)
Definition: recorder.py:74
Generator[Session] session_scope(*HomeAssistant|None hass=None, Session|None session=None, Callable[[Exception], bool]|None exception_filter=None, bool read_only=False)
Definition: recorder.py:86