# Home Assistant Unofficial Reference 2024.12.1 — db_schema.py
# (documentation-site page header; the module source begins below)
1 """Models for SQLAlchemy."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable
6 from datetime import datetime, timedelta
7 import logging
8 import time
9 from typing import Any, Self, cast
10 
11 import ciso8601
12 from fnv_hash_fast import fnv1a_32
13 from sqlalchemy import (
14  CHAR,
15  JSON,
16  BigInteger,
17  Boolean,
18  ColumnElement,
19  DateTime,
20  Float,
21  ForeignKey,
22  Identity,
23  Index,
24  Integer,
25  LargeBinary,
26  SmallInteger,
27  String,
28  Text,
29  case,
30  type_coerce,
31 )
32 from sqlalchemy.dialects import mysql, oracle, postgresql, sqlite
33 from sqlalchemy.engine.interfaces import Dialect
34 from sqlalchemy.ext.compiler import compiles
35 from sqlalchemy.orm import DeclarativeBase, Mapped, aliased, mapped_column, relationship
36 from sqlalchemy.types import TypeDecorator
37 
38 from homeassistant.components.sensor import ATTR_STATE_CLASS
39 from homeassistant.const import (
40  ATTR_DEVICE_CLASS,
41  ATTR_FRIENDLY_NAME,
42  ATTR_UNIT_OF_MEASUREMENT,
43  MATCH_ALL,
44  MAX_LENGTH_EVENT_EVENT_TYPE,
45  MAX_LENGTH_STATE_ENTITY_ID,
46  MAX_LENGTH_STATE_STATE,
47 )
48 from homeassistant.core import Context, Event, EventOrigin, EventStateChangedData, State
49 from homeassistant.helpers.json import JSON_DUMP, json_bytes, json_bytes_strip_null
50 import homeassistant.util.dt as dt_util
51 from homeassistant.util.json import (
52  JSON_DECODE_EXCEPTIONS,
53  json_loads,
54  json_loads_object,
55 )
56 
57 from .const import ALL_DOMAIN_EXCLUDE_ATTRS, SupportedDialect
58 from .models import (
59  StatisticData,
60  StatisticDataTimestamp,
61  StatisticMetaData,
62  bytes_to_ulid_or_none,
63  bytes_to_uuid_hex_or_none,
64  datetime_to_timestamp_or_none,
65  process_timestamp,
66  ulid_to_bytes_or_none,
67  uuid_hex_to_bytes_or_none,
68 )
69 
70 
71 # SQLAlchemy Schema
class Base(DeclarativeBase):
    """Base class for tables.

    All live recorder tables derive their metadata from this declarative base.
    """
75 
class LegacyBase(DeclarativeBase):
    """Base class for tables, used for schema migration.

    Kept separate from ``Base`` so migration-only table variants (e.g. the
    Legacy* classes below) do not collide with the live table metadata.
    """
79 
# Bumped whenever the table layout changes; drives the migration path.
SCHEMA_VERSION = 47

_LOGGER = logging.getLogger(__name__)

# Database table names.
TABLE_EVENTS = "events"
TABLE_EVENT_DATA = "event_data"
TABLE_EVENT_TYPES = "event_types"
TABLE_STATES = "states"
TABLE_STATE_ATTRIBUTES = "state_attributes"
TABLE_STATES_META = "states_meta"
TABLE_RECORDER_RUNS = "recorder_runs"
TABLE_SCHEMA_CHANGES = "schema_changes"
TABLE_STATISTICS = "statistics"
TABLE_STATISTICS_META = "statistics_meta"
TABLE_STATISTICS_RUNS = "statistics_runs"
TABLE_STATISTICS_SHORT_TERM = "statistics_short_term"
TABLE_MIGRATION_CHANGES = "migration_changes"

STATISTICS_TABLES = ("statistics", "statistics_short_term")

# Payloads larger than these limits are not stored (see the
# shared_*_bytes_from_event helpers, which substitute b"{}").
MAX_STATE_ATTRS_BYTES = 16384
MAX_EVENT_DATA_BYTES = 32768

PSQL_DIALECT = SupportedDialect.POSTGRESQL

ALL_TABLES = [
    TABLE_STATES,
    TABLE_STATE_ATTRIBUTES,
    TABLE_EVENTS,
    TABLE_EVENT_DATA,
    TABLE_EVENT_TYPES,
    TABLE_RECORDER_RUNS,
    TABLE_SCHEMA_CHANGES,
    TABLE_MIGRATION_CHANGES,
    TABLE_STATES_META,
    TABLE_STATISTICS,
    TABLE_STATISTICS_META,
    TABLE_STATISTICS_RUNS,
    TABLE_STATISTICS_SHORT_TERM,
]

TABLES_TO_CHECK = [
    TABLE_STATES,
    TABLE_EVENTS,
    TABLE_RECORDER_RUNS,
    TABLE_SCHEMA_CHANGES,
]

# Index names referenced by the table definitions and by migration code.
LAST_UPDATED_INDEX_TS = "ix_states_last_updated_ts"
METADATA_ID_LAST_UPDATED_INDEX_TS = "ix_states_metadata_id_last_updated_ts"
EVENTS_CONTEXT_ID_BIN_INDEX = "ix_events_context_id_bin"
STATES_CONTEXT_ID_BIN_INDEX = "ix_states_context_id_bin"
LEGACY_STATES_EVENT_ID_INDEX = "ix_states_event_id"
LEGACY_STATES_ENTITY_ID_LAST_UPDATED_INDEX = "ix_states_entity_id_last_updated_ts"
CONTEXT_ID_BIN_MAX_LENGTH = 16

MYSQL_COLLATE = "utf8mb4_unicode_ci"
MYSQL_DEFAULT_CHARSET = "utf8mb4"
MYSQL_ENGINE = "InnoDB"

# Table kwargs applied to every table; MySQL and MariaDB take them under
# separate prefixes, so each setting appears twice.
_DEFAULT_TABLE_ARGS = {
    "mysql_default_charset": MYSQL_DEFAULT_CHARSET,
    "mysql_collate": MYSQL_COLLATE,
    "mysql_engine": MYSQL_ENGINE,
    "mariadb_default_charset": MYSQL_DEFAULT_CHARSET,
    "mariadb_collate": MYSQL_COLLATE,
    "mariadb_engine": MYSQL_ENGINE,
}

# Attributes always recorded even when MATCH_ALL is in unrecorded_attributes
# (see StateAttributes.shared_attrs_bytes_from_event).
_MATCH_ALL_KEEP = {
    ATTR_DEVICE_CLASS,
    ATTR_STATE_CLASS,
    ATTR_UNIT_OF_MEASUREMENT,
    ATTR_FRIENDLY_NAME,
}
155 
156 
class UnusedDateTime(DateTime):
    """An unused column type that behaves like a datetime.

    Compiled to CHAR(0) on mysql/mariadb/sqlite (see compile_char_zero) so
    retired legacy datetime columns occupy minimal space.
    """
160 
class Unused(CHAR):
    """An unused column type that behaves like a string.

    Compiled to CHAR(0)/CHAR(1) depending on dialect (see the @compiles
    handlers below) so retired legacy string columns occupy minimal space.
    """
164 
@compiles(UnusedDateTime, "mysql", "mariadb", "sqlite")
@compiles(Unused, "mysql", "mariadb", "sqlite")
def compile_char_zero(type_: TypeDecorator, compiler: Any, **kw: Any) -> str:
    """Compile UnusedDateTime and Unused as CHAR(0) on mysql, mariadb, and sqlite."""
    return "CHAR(0)"  # Uses 1 byte on MySQL (no change on sqlite)
170 
171 
@compiles(Unused, "postgresql")
def compile_char_one(type_: TypeDecorator, compiler: Any, **kw: Any) -> str:
    """Compile Unused as CHAR(1) on postgresql."""
    return "CHAR(1)"  # Uses 1 byte
176 
177 
class FAST_PYSQLITE_DATETIME(sqlite.DATETIME):
    """Use ciso8601 to parse datetimes instead of sqlalchemy built-in regex."""

    def result_processor(self, dialect: Dialect, coltype: Any) -> Callable | None:
        """Offload the datetime parsing to ciso8601."""

        def _parse(value: Any) -> datetime | None:
            # NULL columns pass through unchanged; everything else is an
            # ISO-8601 string handed to the C parser.
            if value is None:
                return None
            return ciso8601.parse_datetime(value)

        return _parse
184 
185 
class NativeLargeBinary(LargeBinary):
    """A faster version of LargeBinary for engines that support python bytes natively."""

    def result_processor(self, dialect: Dialect, coltype: Any) -> Callable | None:
        """No conversion needed for engines that support native bytes."""
        # Returning None tells SQLAlchemy to skip per-row result processing.
        return None
192 
193 
# Although all integers are same in SQLite, it does not allow an identity column to be BIGINT
# https://sqlite.org/forum/info/2dfa968a702e1506e885cb06d92157d492108b22bf39459506ab9f7125bca7fd
ID_TYPE = BigInteger().with_variant(sqlite.INTEGER, "sqlite")
# For MariaDB and MySQL we can use an unsigned integer type since it will fit 2**32
# for sqlite and postgresql we use a bigint
UINT_32_TYPE = BigInteger().with_variant(
    mysql.INTEGER(unsigned=True),  # type: ignore[no-untyped-call]
    "mysql",
    "mariadb",
)
# Text columns that are cast to JSON/JSONB in queries (see *_JSON below).
JSON_VARIANT_CAST = Text().with_variant(
    postgresql.JSON(none_as_null=True),  # type: ignore[no-untyped-call]
    "postgresql",
)
JSONB_VARIANT_CAST = Text().with_variant(
    postgresql.JSONB(none_as_null=True),  # type: ignore[no-untyped-call]
    "postgresql",
)
# Timezone-aware datetime; microsecond precision on MySQL/MariaDB (fsp=6)
# and ciso8601-accelerated parsing on SQLite.
DATETIME_TYPE = (
    DateTime(timezone=True)
    .with_variant(mysql.DATETIME(timezone=True, fsp=6), "mysql", "mariadb")  # type: ignore[no-untyped-call]
    .with_variant(FAST_PYSQLITE_DATETIME(), "sqlite")  # type: ignore[no-untyped-call]
)
DOUBLE_TYPE = (
    Float()
    .with_variant(mysql.DOUBLE(asdecimal=False), "mysql", "mariadb")  # type: ignore[no-untyped-call]
    .with_variant(oracle.DOUBLE_PRECISION(), "oracle")
    .with_variant(postgresql.DOUBLE_PRECISION(), "postgresql")
)
# Column types for retired legacy columns (kept only for schema compatibility).
UNUSED_LEGACY_COLUMN = Unused(0)
UNUSED_LEGACY_DATETIME_COLUMN = UnusedDateTime(timezone=True)
UNUSED_LEGACY_INTEGER_COLUMN = SmallInteger()
DOUBLE_PRECISION_TYPE_SQL = "DOUBLE PRECISION"
BIG_INTEGER_SQL = "BIGINT"
# 16-byte binary context ids (ULID/UUID); native bytes passthrough where possible.
CONTEXT_BINARY_TYPE = LargeBinary(CONTEXT_ID_BIN_MAX_LENGTH).with_variant(
    NativeLargeBinary(CONTEXT_ID_BIN_MAX_LENGTH), "mysql", "mariadb", "sqlite"
)

# Unix-epoch float seconds; stored as a double on all dialects.
TIMESTAMP_TYPE = DOUBLE_TYPE
233 
234 
class JSONLiteral(JSON):
    """Teach SA how to literalize json."""

    def literal_processor(self, dialect: Dialect) -> Callable[[Any], str]:
        """Processor to convert a value to JSON."""
        # JSON_DUMP already returns the serialized string, so the literal
        # processor is a thin delegation.
        return lambda value: JSON_DUMP(value)
246 
247 
248 EVENT_ORIGIN_ORDER = [EventOrigin.local, EventOrigin.remote]
249 
250 
class Events(Base):
    """Event history data."""

    __table_args__ = (
        # Used for fetching events at a specific time
        # see logbook
        Index(
            "ix_events_event_type_id_time_fired_ts", "event_type_id", "time_fired_ts"
        ),
        Index(
            EVENTS_CONTEXT_ID_BIN_INDEX,
            "context_id_bin",
            mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
            mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
        ),
        _DEFAULT_TABLE_ARGS,
    )
    __tablename__ = TABLE_EVENTS
    event_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
    # Columns typed UNUSED_LEGACY_* are retired: from_event always writes None
    # into them; the live data is in the *_ts, *_bin and *_id columns below.
    event_type: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    event_data: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    origin: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    origin_idx: Mapped[int | None] = mapped_column(SmallInteger)
    time_fired: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
    time_fired_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True)
    context_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    context_user_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    context_parent_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    data_id: Mapped[int | None] = mapped_column(
        ID_TYPE, ForeignKey("event_data.data_id"), index=True
    )
    # Binary (ULID/UUID byte) forms of the context identifiers.
    context_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE)
    context_user_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE)
    context_parent_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE)
    event_type_id: Mapped[int | None] = mapped_column(
        ID_TYPE, ForeignKey("event_types.event_type_id")
    )
    event_data_rel: Mapped[EventData | None] = relationship("EventData")
    event_type_rel: Mapped[EventTypes | None] = relationship("EventTypes")

    def __repr__(self) -> str:
        """Return string representation of instance for debugging."""
        return (
            "<recorder.Events("
            f"id={self.event_id}, event_type_id='{self.event_type_id}', "
            f"origin_idx='{self.origin_idx}', time_fired='{self._time_fired_isotime}'"
            f", data_id={self.data_id})>"
        )

    @property
    def _time_fired_isotime(self) -> str | None:
        """Return time_fired as an isotime string."""
        date_time: datetime | None
        if self.time_fired_ts is not None:
            date_time = dt_util.utc_from_timestamp(self.time_fired_ts)
        else:
            # Fall back to the legacy datetime column for pre-migration rows.
            date_time = process_timestamp(self.time_fired)
        if date_time is None:
            return None
        return date_time.isoformat(sep=" ", timespec="seconds")

    @staticmethod
    def from_event(event: Event) -> Events:
        """Create an event database object from a native event."""
        context = event.context
        # Legacy columns are explicitly set to None; context ids are stored
        # in their compact binary forms.
        return Events(
            event_type=None,
            event_data=None,
            origin_idx=event.origin.idx,
            time_fired=None,
            time_fired_ts=event.time_fired_timestamp,
            context_id=None,
            context_id_bin=ulid_to_bytes_or_none(context.id),
            context_user_id=None,
            context_user_id_bin=uuid_hex_to_bytes_or_none(context.user_id),
            context_parent_id=None,
            context_parent_id_bin=ulid_to_bytes_or_none(context.parent_id),
        )

    def to_native(self, validate_entity_id: bool = True) -> Event | None:
        """Convert to a native HA Event."""
        context = Context(
            id=bytes_to_ulid_or_none(self.context_id_bin),
            user_id=bytes_to_uuid_hex_or_none(self.context_user_id_bin),
            parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin),
        )
        try:
            return Event(
                self.event_type or "",
                json_loads_object(self.event_data) if self.event_data else {},
                # Prefer the legacy string origin when present; otherwise map
                # origin_idx through EVENT_ORIGIN_ORDER (0=local, 1=remote).
                EventOrigin(self.origin)
                if self.origin
                else EVENT_ORIGIN_ORDER[self.origin_idx or 0],
                self.time_fired_ts or 0,
                context=context,
            )
        except JSON_DECODE_EXCEPTIONS:
            # When json_loads fails
            _LOGGER.exception("Error converting to event: %s", self)
            return None
351 
352 
354  """Event data history."""
355 
356  __table_args__ = (_DEFAULT_TABLE_ARGS,)
357  __tablename__ = TABLE_EVENT_DATA
358  data_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
359  hash: Mapped[int | None] = mapped_column(UINT_32_TYPE, index=True)
360  # Note that this is not named attributes to avoid confusion with the states table
361  shared_data: Mapped[str | None] = mapped_column(
362  Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
363  )
364 
365  def __repr__(self) -> str:
366  """Return string representation of instance for debugging."""
367  return (
368  "<recorder.EventData("
369  f"id={self.data_id}, hash='{self.hash}', data='{self.shared_data}'"
370  ")>"
371  )
372 
373  @staticmethod
375  event: Event, dialect: SupportedDialect | None
376  ) -> bytes:
377  """Create shared_data from an event."""
378  encoder = json_bytes_strip_null if dialect == PSQL_DIALECT else json_bytes
379  bytes_result = encoder(event.data)
380  if len(bytes_result) > MAX_EVENT_DATA_BYTES:
381  _LOGGER.warning(
382  "Event data for %s exceed maximum size of %s bytes. "
383  "This can cause database performance issues; Event data "
384  "will not be stored",
385  event.event_type,
386  MAX_EVENT_DATA_BYTES,
387  )
388  return b"{}"
389  return bytes_result
390 
391  @staticmethod
392  def hash_shared_data_bytes(shared_data_bytes: bytes) -> int:
393  """Return the hash of json encoded shared data."""
394  return fnv1a_32(shared_data_bytes)
395 
396  def to_native(self) -> dict[str, Any]:
397  """Convert to an event data dictionary."""
398  shared_data = self.shared_data
399  if shared_data is None:
400  return {}
401  try:
402  return cast(dict[str, Any], json_loads(shared_data))
403  except JSON_DECODE_EXCEPTIONS:
404  _LOGGER.exception("Error converting row to event data: %s", self)
405  return {}
406 
407 
409  """Event type history."""
410 
411  __table_args__ = (_DEFAULT_TABLE_ARGS,)
412  __tablename__ = TABLE_EVENT_TYPES
413  event_type_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
414  event_type: Mapped[str | None] = mapped_column(
415  String(MAX_LENGTH_EVENT_EVENT_TYPE), index=True, unique=True
416  )
417 
418  def __repr__(self) -> str:
419  """Return string representation of instance for debugging."""
420  return (
421  "<recorder.EventTypes("
422  f"id={self.event_type_id}, event_type='{self.event_type}'"
423  ")>"
424  )
425 
426 
class States(Base):
    """State change history."""

    __table_args__ = (
        # Used for fetching the state of entities at a specific time
        # (get_states in history.py)
        Index(METADATA_ID_LAST_UPDATED_INDEX_TS, "metadata_id", "last_updated_ts"),
        Index(
            STATES_CONTEXT_ID_BIN_INDEX,
            "context_id_bin",
            mysql_length=CONTEXT_ID_BIN_MAX_LENGTH,
            mariadb_length=CONTEXT_ID_BIN_MAX_LENGTH,
        ),
        _DEFAULT_TABLE_ARGS,
    )
    __tablename__ = TABLE_STATES
    state_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
    # Columns typed UNUSED_LEGACY_* are retired: from_event always writes None
    # into them; live data is in the *_ts, *_bin and metadata_id columns.
    entity_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    state: Mapped[str | None] = mapped_column(String(MAX_LENGTH_STATE_STATE))
    attributes: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    event_id: Mapped[int | None] = mapped_column(UNUSED_LEGACY_INTEGER_COLUMN)
    last_changed: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
    last_changed_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
    last_reported_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
    last_updated: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
    last_updated_ts: Mapped[float | None] = mapped_column(
        TIMESTAMP_TYPE, default=time.time, index=True
    )
    old_state_id: Mapped[int | None] = mapped_column(
        ID_TYPE, ForeignKey("states.state_id"), index=True
    )
    attributes_id: Mapped[int | None] = mapped_column(
        ID_TYPE, ForeignKey("state_attributes.attributes_id"), index=True
    )
    context_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    context_user_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    context_parent_id: Mapped[str | None] = mapped_column(UNUSED_LEGACY_COLUMN)
    origin_idx: Mapped[int | None] = mapped_column(
        SmallInteger
    )  # 0 is local, 1 is remote
    old_state: Mapped[States | None] = relationship("States", remote_side=[state_id])
    state_attributes: Mapped[StateAttributes | None] = relationship("StateAttributes")
    context_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE)
    context_user_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE)
    context_parent_id_bin: Mapped[bytes | None] = mapped_column(CONTEXT_BINARY_TYPE)
    metadata_id: Mapped[int | None] = mapped_column(
        ID_TYPE, ForeignKey("states_meta.metadata_id")
    )
    states_meta_rel: Mapped[StatesMeta | None] = relationship("StatesMeta")

    def __repr__(self) -> str:
        """Return string representation of instance for debugging."""
        return (
            f"<recorder.States(id={self.state_id}, entity_id='{self.entity_id}'"
            f" metadata_id={self.metadata_id},"
            f" state='{self.state}', event_id='{self.event_id}',"
            f" last_updated='{self._last_updated_isotime}',"
            f" old_state_id={self.old_state_id}, attributes_id={self.attributes_id})>"
        )

    @property
    def _last_updated_isotime(self) -> str | None:
        """Return last_updated as an isotime string."""
        date_time: datetime | None
        if self.last_updated_ts is not None:
            date_time = dt_util.utc_from_timestamp(self.last_updated_ts)
        else:
            # Fall back to the legacy datetime column for pre-migration rows.
            date_time = process_timestamp(self.last_updated)
        if date_time is None:
            return None
        return date_time.isoformat(sep=" ", timespec="seconds")

    @staticmethod
    def from_event(event: Event[EventStateChangedData]) -> States:
        """Create object from a state_changed event."""
        state = event.data["new_state"]
        # None state means the state was removed from the state machine
        if state is None:
            state_value = ""
            last_updated_ts = event.time_fired_timestamp
            last_changed_ts = None
            last_reported_ts = None
        else:
            state_value = state.state
            last_updated_ts = state.last_updated_timestamp
            # Store NULL when equal to last_updated; to_native reconstructs
            # the equal value on read, saving a column write per row.
            if state.last_updated == state.last_changed:
                last_changed_ts = None
            else:
                last_changed_ts = state.last_changed_timestamp
            if state.last_updated == state.last_reported:
                last_reported_ts = None
            else:
                last_reported_ts = state.last_reported_timestamp
        context = event.context
        return States(
            state=state_value,
            entity_id=event.data["entity_id"],
            attributes=None,
            context_id=None,
            context_id_bin=ulid_to_bytes_or_none(context.id),
            context_user_id=None,
            context_user_id_bin=uuid_hex_to_bytes_or_none(context.user_id),
            context_parent_id=None,
            context_parent_id_bin=ulid_to_bytes_or_none(context.parent_id),
            origin_idx=event.origin.idx,
            last_updated=None,
            last_changed=None,
            last_updated_ts=last_updated_ts,
            last_changed_ts=last_changed_ts,
            last_reported_ts=last_reported_ts,
        )

    def to_native(self, validate_entity_id: bool = True) -> State | None:
        """Convert to an HA state object.

        Returns None when the legacy attributes column contains invalid JSON.
        """
        context = Context(
            id=bytes_to_ulid_or_none(self.context_id_bin),
            user_id=bytes_to_uuid_hex_or_none(self.context_user_id_bin),
            parent_id=bytes_to_ulid_or_none(self.context_parent_id_bin),
        )
        try:
            attrs = json_loads_object(self.attributes) if self.attributes else {}
        except JSON_DECODE_EXCEPTIONS:
            # When json_loads fails
            _LOGGER.exception("Error converting row to state: %s", self)
            return None
        last_updated = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
        # NULL last_changed_ts / last_reported_ts means "same as last_updated"
        # (see from_event).
        if self.last_changed_ts is None or self.last_changed_ts == self.last_updated_ts:
            last_changed = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
        else:
            last_changed = dt_util.utc_from_timestamp(self.last_changed_ts or 0)
        if (
            self.last_reported_ts is None
            or self.last_reported_ts == self.last_updated_ts
        ):
            last_reported = dt_util.utc_from_timestamp(self.last_updated_ts or 0)
        else:
            last_reported = dt_util.utc_from_timestamp(self.last_reported_ts or 0)
        return State(
            self.entity_id or "",
            self.state,  # type: ignore[arg-type]
            # Join the state_attributes table on attributes_id to get the attributes
            # for newer states
            attrs,
            last_changed=last_changed,
            last_reported=last_reported,
            last_updated=last_updated,
            context=context,
            validate_entity_id=validate_entity_id,
        )
576 
577 
579  """State attribute change history."""
580 
581  __table_args__ = (_DEFAULT_TABLE_ARGS,)
582  __tablename__ = TABLE_STATE_ATTRIBUTES
583  attributes_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
584  hash: Mapped[int | None] = mapped_column(UINT_32_TYPE, index=True)
585  # Note that this is not named attributes to avoid confusion with the states table
586  shared_attrs: Mapped[str | None] = mapped_column(
587  Text().with_variant(mysql.LONGTEXT, "mysql", "mariadb")
588  )
589 
590  def __repr__(self) -> str:
591  """Return string representation of instance for debugging."""
592  return (
593  f"<recorder.StateAttributes(id={self.attributes_id}, hash='{self.hash}',"
594  f" attributes='{self.shared_attrs}')>"
595  )
596 
597  @staticmethod
599  event: Event[EventStateChangedData],
600  dialect: SupportedDialect | None,
601  ) -> bytes:
602  """Create shared_attrs from a state_changed event."""
603  # None state means the state was removed from the state machine
604  if (state := event.data["new_state"]) is None:
605  return b"{}"
606  if state_info := state.state_info:
607  unrecorded_attributes = state_info["unrecorded_attributes"]
608  exclude_attrs = {
609  *ALL_DOMAIN_EXCLUDE_ATTRS,
610  *unrecorded_attributes,
611  }
612  if MATCH_ALL in unrecorded_attributes:
613  # Don't exclude device class, state class, unit of measurement
614  # or friendly name when using the MATCH_ALL exclude constant
615  exclude_attrs.update(state.attributes)
616  exclude_attrs -= _MATCH_ALL_KEEP
617  else:
618  exclude_attrs = ALL_DOMAIN_EXCLUDE_ATTRS
619  encoder = json_bytes_strip_null if dialect == PSQL_DIALECT else json_bytes
620  bytes_result = encoder(
621  {k: v for k, v in state.attributes.items() if k not in exclude_attrs}
622  )
623  if len(bytes_result) > MAX_STATE_ATTRS_BYTES:
624  _LOGGER.warning(
625  "State attributes for %s exceed maximum size of %s bytes. "
626  "This can cause database performance issues; Attributes "
627  "will not be stored",
628  state.entity_id,
629  MAX_STATE_ATTRS_BYTES,
630  )
631  return b"{}"
632  return bytes_result
633 
634  @staticmethod
635  def hash_shared_attrs_bytes(shared_attrs_bytes: bytes) -> int:
636  """Return the hash of json encoded shared attributes."""
637  return fnv1a_32(shared_attrs_bytes)
638 
639  def to_native(self) -> dict[str, Any]:
640  """Convert to a state attributes dictionary."""
641  shared_attrs = self.shared_attrs
642  if shared_attrs is None:
643  return {}
644  try:
645  return cast(dict[str, Any], json_loads(shared_attrs))
646  except JSON_DECODE_EXCEPTIONS:
647  # When json_loads fails
648  _LOGGER.exception("Error converting row to state attributes: %s", self)
649  return {}
650 
651 
653  """Metadata for states."""
654 
655  __table_args__ = (_DEFAULT_TABLE_ARGS,)
656  __tablename__ = TABLE_STATES_META
657  metadata_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
658  entity_id: Mapped[str | None] = mapped_column(
659  String(MAX_LENGTH_STATE_ENTITY_ID), index=True, unique=True
660  )
661 
662  def __repr__(self) -> str:
663  """Return string representation of instance for debugging."""
664  return (
665  "<recorder.StatesMeta("
666  f"id={self.metadata_id}, entity_id='{self.entity_id}'"
667  ")>"
668  )
669 
670 
672  """Statistics base class."""
673 
674  id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
675  created: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
676  created_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, default=time.time)
677  metadata_id: Mapped[int | None] = mapped_column(
678  ID_TYPE,
679  ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
680  )
681  start: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
682  start_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE, index=True)
683  mean: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
684  min: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
685  max: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
686  last_reset: Mapped[datetime | None] = mapped_column(UNUSED_LEGACY_DATETIME_COLUMN)
687  last_reset_ts: Mapped[float | None] = mapped_column(TIMESTAMP_TYPE)
688  state: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
689  sum: Mapped[float | None] = mapped_column(DOUBLE_TYPE)
690 
691  duration: timedelta
692 
693  @classmethod
694  def from_stats(cls, metadata_id: int, stats: StatisticData) -> Self:
695  """Create object from a statistics with datetime objects."""
696  return cls( # type: ignore[call-arg]
697  metadata_id=metadata_id,
698  created=None,
699  created_ts=time.time(),
700  start=None,
701  start_ts=stats["start"].timestamp(),
702  mean=stats.get("mean"),
703  min=stats.get("min"),
704  max=stats.get("max"),
705  last_reset=None,
706  last_reset_ts=datetime_to_timestamp_or_none(stats.get("last_reset")),
707  state=stats.get("state"),
708  sum=stats.get("sum"),
709  )
710 
711  @classmethod
712  def from_stats_ts(cls, metadata_id: int, stats: StatisticDataTimestamp) -> Self:
713  """Create object from a statistics with timestamps."""
714  return cls( # type: ignore[call-arg]
715  metadata_id=metadata_id,
716  created=None,
717  created_ts=time.time(),
718  start=None,
719  start_ts=stats["start_ts"],
720  mean=stats.get("mean"),
721  min=stats.get("min"),
722  max=stats.get("max"),
723  last_reset=None,
724  last_reset_ts=stats.get("last_reset_ts"),
725  state=stats.get("state"),
726  sum=stats.get("sum"),
727  )
728 
729 
731  """Long term statistics."""
732 
733  duration = timedelta(hours=1)
734 
735  __table_args__ = (
736  # Used for fetching statistics for a certain entity at a specific time
737  Index(
738  "ix_statistics_statistic_id_start_ts",
739  "metadata_id",
740  "start_ts",
741  unique=True,
742  ),
743  _DEFAULT_TABLE_ARGS,
744  )
745  __tablename__ = TABLE_STATISTICS
746 
747 
749  """Short term statistics."""
750 
751  duration = timedelta(minutes=5)
752 
753  __tablename__ = TABLE_STATISTICS_SHORT_TERM
754 
755 
757  """Short term statistics."""
758 
759  __table_args__ = (
760  # Used for fetching statistics for a certain entity at a specific time
761  Index(
762  "ix_statistics_short_term_statistic_id_start_ts",
763  "metadata_id",
764  "start_ts",
765  unique=True,
766  ),
767  _DEFAULT_TABLE_ARGS,
768  )
769 
770 
772  """Short term statistics with 32-bit index, used for schema migration."""
773 
774  __table_args__ = (
775  # Used for fetching statistics for a certain entity at a specific time
776  Index(
777  "ix_statistics_short_term_statistic_id_start_ts",
778  "metadata_id",
779  "start_ts",
780  unique=True,
781  ),
782  _DEFAULT_TABLE_ARGS,
783  )
784 
785  metadata_id: Mapped[int | None] = mapped_column(
786  Integer,
787  ForeignKey(f"{TABLE_STATISTICS_META}.id", ondelete="CASCADE"),
788  use_existing_column=True,
789  )
790 
791 
793  """Statistics meta data."""
794 
795  __table_args__ = (_DEFAULT_TABLE_ARGS,)
796  __tablename__ = TABLE_STATISTICS_META
797  id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
798  statistic_id: Mapped[str | None] = mapped_column(
799  String(255), index=True, unique=True
800  )
801  source: Mapped[str | None] = mapped_column(String(32))
802  unit_of_measurement: Mapped[str | None] = mapped_column(String(255))
803  has_mean: Mapped[bool | None] = mapped_column(Boolean)
804  has_sum: Mapped[bool | None] = mapped_column(Boolean)
805  name: Mapped[str | None] = mapped_column(String(255))
806 
807  @staticmethod
808  def from_meta(meta: StatisticMetaData) -> StatisticsMeta:
809  """Create object from meta data."""
810  return StatisticsMeta(**meta)
811 
812 
814  """Statistics meta data."""
815 
816 
class LegacyStatisticsMeta(LegacyBase, _StatisticsMeta):
    """Statistics meta data with 32-bit index, used for schema migration."""

    # Overrides the mixin id column with a plain 32-bit Integer to match the
    # pre-migration schema; use_existing_column reuses the mapped column.
    id: Mapped[int] = mapped_column(
        Integer,
        Identity(),
        primary_key=True,
        use_existing_column=True,
    )
826 
827 
829  """Representation of recorder run."""
830 
831  __table_args__ = (
832  Index("ix_recorder_runs_start_end", "start", "end"),
833  _DEFAULT_TABLE_ARGS,
834  )
835  __tablename__ = TABLE_RECORDER_RUNS
836  run_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
837  start: Mapped[datetime] = mapped_column(DATETIME_TYPE, default=dt_util.utcnow)
838  end: Mapped[datetime | None] = mapped_column(DATETIME_TYPE)
839  closed_incorrect: Mapped[bool] = mapped_column(Boolean, default=False)
840  created: Mapped[datetime] = mapped_column(DATETIME_TYPE, default=dt_util.utcnow)
841 
842  def __repr__(self) -> str:
843  """Return string representation of instance for debugging."""
844  end = (
845  f"'{self.end.isoformat(sep=' ', timespec='seconds')}'" if self.end else None
846  )
847  return (
848  f"<recorder.RecorderRuns(id={self.run_id},"
849  f" start='{self.start.isoformat(sep=' ', timespec='seconds')}', end={end},"
850  f" closed_incorrect={self.closed_incorrect},"
851  f" created='{self.created.isoformat(sep=' ', timespec='seconds')}')>"
852  )
853 
854  def to_native(self, validate_entity_id: bool = True) -> Self:
855  """Return self, native format is this model."""
856  return self
857 
858 
860  """Representation of migration changes."""
861 
862  __tablename__ = TABLE_MIGRATION_CHANGES
863  __table_args__ = (_DEFAULT_TABLE_ARGS,)
864 
865  migration_id: Mapped[str] = mapped_column(String(255), primary_key=True)
866  version: Mapped[int] = mapped_column(SmallInteger)
867 
868 
870  """Representation of schema version changes."""
871 
872  __tablename__ = TABLE_SCHEMA_CHANGES
873  __table_args__ = (_DEFAULT_TABLE_ARGS,)
874 
875  change_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
876  schema_version: Mapped[int | None] = mapped_column(Integer)
877  changed: Mapped[datetime] = mapped_column(DATETIME_TYPE, default=dt_util.utcnow)
878 
879  def __repr__(self) -> str:
880  """Return string representation of instance for debugging."""
881  return (
882  "<recorder.SchemaChanges("
883  f"id={self.change_id}, schema_version={self.schema_version}, "
884  f"changed='{self.changed.isoformat(sep=' ', timespec='seconds')}'"
885  ")>"
886  )
887 
888 
890  """Representation of statistics run."""
891 
892  __tablename__ = TABLE_STATISTICS_RUNS
893  __table_args__ = (_DEFAULT_TABLE_ARGS,)
894 
895  run_id: Mapped[int] = mapped_column(ID_TYPE, Identity(), primary_key=True)
896  start: Mapped[datetime] = mapped_column(DATETIME_TYPE, index=True)
897 
898  def __repr__(self) -> str:
899  """Return string representation of instance for debugging."""
900  return (
901  f"<recorder.StatisticsRuns(id={self.run_id},"
902  f" start='{self.start.isoformat(sep=' ', timespec='seconds')}', )>"
903  )
904 
905 
# Typed JSON views over the text payload columns so queries can index into
# the payload (JSONB cast on PostgreSQL for event data, JSON for attributes).
EVENT_DATA_JSON = type_coerce(
    EventData.shared_data.cast(JSONB_VARIANT_CAST), JSONLiteral(none_as_null=True)
)
OLD_FORMAT_EVENT_DATA_JSON = type_coerce(
    Events.event_data.cast(JSONB_VARIANT_CAST), JSONLiteral(none_as_null=True)
)

SHARED_ATTRS_JSON = type_coerce(
    StateAttributes.shared_attrs.cast(JSON_VARIANT_CAST), JSON(none_as_null=True)
)
OLD_FORMAT_ATTRS_JSON = type_coerce(
    States.attributes.cast(JSON_VARIANT_CAST), JSON(none_as_null=True)
)

# Column expressions extracting specific keys from the event payload.
ENTITY_ID_IN_EVENT: ColumnElement = EVENT_DATA_JSON["entity_id"]
OLD_ENTITY_ID_IN_EVENT: ColumnElement = OLD_FORMAT_EVENT_DATA_JSON["entity_id"]
DEVICE_ID_IN_EVENT: ColumnElement = EVENT_DATA_JSON["device_id"]
OLD_STATE = aliased(States, name="old_state")

# Prefer the new shared_* columns, falling back to the legacy columns for
# rows written before the schema migration.
SHARED_ATTR_OR_LEGACY_ATTRIBUTES = case(
    (StateAttributes.shared_attrs.is_(None), States.attributes),
    else_=StateAttributes.shared_attrs,
).label("attributes")
SHARED_DATA_OR_LEGACY_EVENT_DATA = case(
    (EventData.shared_data.is_(None), Events.event_data), else_=EventData.shared_data
).label("event_data")
# --- Documentation-generator cross-reference index (not executable code) ---
# int hash_shared_data_bytes(bytes shared_data_bytes) — Definition: db_schema.py:392
# bytes shared_data_bytes_from_event(Event event, SupportedDialect|None dialect) — Definition: db_schema.py:376
# Event|None to_native(self, bool validate_entity_id=True) — Definition: db_schema.py:330
# Callable|None result_processor(self, Dialect dialect, Any coltype) — Definition: db_schema.py:181
# Callable[[Any], str] literal_processor(self, Dialect dialect) — Definition: db_schema.py:238
# Callable|None result_processor(self, Dialect dialect, Any coltype) — Definition: db_schema.py:189
# Self to_native(self, bool validate_entity_id=True) — Definition: db_schema.py:854
# bytes shared_attrs_bytes_from_event(Event[EventStateChangedData] event, SupportedDialect|None dialect) — Definition: db_schema.py:601
# State|None to_native(self, bool validate_entity_id=True) — Definition: db_schema.py:539
# States from_event(Event[EventStateChangedData] event) — Definition: db_schema.py:500
# Self from_stats(cls, int metadata_id, StatisticData stats) — Definition: db_schema.py:694
# Self from_stats_ts(cls, int metadata_id, StatisticDataTimestamp stats) — Definition: db_schema.py:712
# StatisticsMeta from_meta(StatisticMetaData meta) — Definition: db_schema.py:808
# str compile_char_zero(TypeDecorator type_, Any compiler, **Any kw) — Definition: db_schema.py:167
# str compile_char_one(TypeDecorator type_, Any compiler, **Any kw) — Definition: db_schema.py:173
# str|None bytes_to_uuid_hex_or_none(bytes|None _bytes) — Definition: context.py:31
# bytes|None uuid_hex_to_bytes_or_none(str|None uuid_hex) — Definition: context.py:21
# float|None datetime_to_timestamp_or_none(datetime|None dt) — Definition: time.py:55
# JsonObjectType json_loads_object(bytes|bytearray|memoryview|str obj) — Definition: json.py:54