1 """Models for Recorder."""
3 from __future__
import annotations
5 from datetime
import datetime
8 from sqlalchemy.engine.row
import Row
11 COMPRESSED_STATE_ATTRIBUTES,
12 COMPRESSED_STATE_LAST_CHANGED,
13 COMPRESSED_STATE_LAST_UPDATED,
14 COMPRESSED_STATE_STATE,
19 from .state_attributes
import decode_attributes_from_source
20 from .time
import process_timestamp
24 """A lazy version of core State after schema 31."""
39 attr_cache: dict[str, dict[str, Any]],
40 start_time: datetime |
None,
41 entity_id: str |
None =
None,
43 """Init the lazy state."""
47 self.
_attributes_attributes: dict[str, Any] |
None =
None
49 dt_util.utc_to_timestamp(start_time)
if start_time
else None
55 self.
_context_context: Context |
None =
None
60 """State attributes."""
80 def context(self, value: Context) ->
None:
86 """Last changed datetime."""
92 """Set last changed datetime."""
97 """Last reported datetime."""
101 @last_reported.setter
103 """Set last reported datetime."""
108 """Last updated datetime."""
114 """Set last updated datetime."""
118 """Return a dict representation of the LazyState.
121 To be used for JSON serialization.
125 last_changed_isoformat = last_updated_isoformat
132 "last_changed": last_changed_isoformat,
133 "last_updated": last_updated_isoformat,
139 attr_cache: dict[str, dict[str, Any]],
140 start_time: datetime |
None,
141 entity_id: str |
None =
None,
143 """Convert a database row to a compressed state schema 31 and later."""
145 COMPRESSED_STATE_STATE: row.state,
149 comp_state[COMPRESSED_STATE_LAST_UPDATED] = dt_util.utc_to_timestamp(start_time)
151 row_last_updated_ts: float = row.last_updated_ts
152 comp_state[COMPRESSED_STATE_LAST_UPDATED] = row_last_updated_ts
154 row_last_changed_ts := row.last_changed_ts
155 )
and row_last_updated_ts != row_last_changed_ts:
156 comp_state[COMPRESSED_STATE_LAST_CHANGED] = row_last_changed_ts
161 row: Row, attr_cache: dict[str, dict[str, Any]]
163 """Decode attributes from a database row."""
165 getattr(row,
"shared_attrs",
None)
or getattr(row,
"attributes",
None),
datetime last_changed(self)
None __init__(self, Row row, dict[str, dict[str, Any]] attr_cache, datetime|None start_time, str|None entity_id=None)
datetime last_updated(self)
None attributes(self, dict[str, Any] value)
dict[str, Any] attributes(self)
None last_changed(self, datetime value)
dict[str, Any] as_dict(self)
None last_updated(self, datetime value)
dict[str, Any] legacy_row_to_compressed_state(Row row, dict[str, dict[str, Any]] attr_cache, datetime|None start_time, str|None entity_id=None)
dict[str, Any] decode_attributes_from_row_legacy(Row row, dict[str, dict[str, Any]] attr_cache)
dict[str, Any] decode_attributes_from_source(Any source, dict[str, dict[str, Any]] attr_cache)
None process_timestamp(None ts)