Home Assistant Unofficial Reference 2024.12.1
common.py
Go to the documentation of this file.
1 """Queries for logbook."""
2 
3 from __future__ import annotations
4 
5 from typing import Final
6 
7 import sqlalchemy
8 from sqlalchemy import select
9 from sqlalchemy.sql.elements import BooleanClauseList, ColumnElement
10 from sqlalchemy.sql.expression import literal
11 from sqlalchemy.sql.selectable import Select
12 
14  EVENTS_CONTEXT_ID_BIN_INDEX,
15  OLD_FORMAT_ATTRS_JSON,
16  OLD_STATE,
17  SHARED_ATTRS_JSON,
18  SHARED_DATA_OR_LEGACY_EVENT_DATA,
19  STATES_CONTEXT_ID_BIN_INDEX,
20  EventData,
21  Events,
22  EventTypes,
23  StateAttributes,
24  States,
25  StatesMeta,
26 )
27 from homeassistant.components.recorder.filters import like_domain_matchers
28 
29 from ..const import ALWAYS_CONTINUOUS_DOMAINS, CONDITIONALLY_CONTINUOUS_DOMAINS
30 
# Entity-id LIKE patterns for domains that only count as continuous
# when the entity has a unit of measurement (UOM) set.
CONDITIONALLY_CONTINUOUS_ENTITY_ID_LIKE = like_domain_matchers(
    CONDITIONALLY_CONTINUOUS_DOMAINS
)
# Entity-id LIKE patterns for domains that are always continuous.
ALWAYS_CONTINUOUS_ENTITY_ID_LIKE = like_domain_matchers(ALWAYS_CONTINUOUS_DOMAINS)

# JSON key fragment that marks a UOM inside serialized attributes, and the
# LIKE pattern that finds that fragment anywhere in the attribute text.
UNIT_OF_MEASUREMENT_JSON = '"unit_of_measurement":'
UNIT_OF_MEASUREMENT_JSON_LIKE = "%" + UNIT_OF_MEASUREMENT_JSON + "%"

# Read the icon from the shared attributes; when that value is NULL fall
# back to the icon stored in the old-format attributes.
ICON_OR_OLD_FORMAT_ICON_JSON = sqlalchemy.case(
    (
        SHARED_ATTRS_JSON["icon"].is_(None),
        OLD_FORMAT_ATTRS_JSON["icon"].as_string(),
    ),
    else_=SHARED_ATTRS_JSON["icon"].as_string(),
).label("icon")
45 
# state_changed events are not stored in the events table, so rows that are
# synthesized from the states table use NULL (None) as their event_type.
# Reusing the event_type column this way avoids sending another column
# in the payload.
PSEUDO_EVENT_STATE_CHANGED: Final = None
53 
# Columns selected for rows that come from the events table.
EVENT_COLUMNS = (
    Events.event_id.label("row_id"),
    EventTypes.event_type.label("event_type"),
    SHARED_DATA_OR_LEGACY_EVENT_DATA,
    Events.time_fired_ts.label("time_fired_ts"),
    Events.context_id_bin.label("context_id_bin"),
    Events.context_user_id_bin.label("context_user_id_bin"),
    Events.context_parent_id_bin.label("context_parent_id_bin"),
)

# State-specific columns appended to rows that come from the states table.
STATE_COLUMNS = (
    States.state.label("state"),
    StatesMeta.entity_id.label("entity_id"),
    ICON_OR_OLD_FORMAT_ICON_JSON,
)

# Same shape as STATE_COLUMNS, but the icon is a NULL literal instead of
# being looked up in the attributes.
STATE_CONTEXT_ONLY_COLUMNS = (
    States.state.label("state"),
    StatesMeta.entity_id.label("entity_id"),
    literal(value=None, type_=sqlalchemy.String).label("icon"),
)
75 
# Event-shaped columns for rows selected from the states table; the labels
# line up one-to-one with EVENT_COLUMNS.
EVENT_COLUMNS_FOR_STATE_SELECT = (
    States.state_id.label("row_id"),
    # Every row has to be marked with an event_type. For state_changed
    # rows we send PSEUDO_EVENT_STATE_CHANGED (None) because it takes up
    # less space in the response.
    literal(
        value=PSEUDO_EVENT_STATE_CHANGED,
        type_=sqlalchemy.String,
    ).label("event_type"),
    literal(value=None, type_=sqlalchemy.Text).label("event_data"),
    States.last_updated_ts.label("time_fired_ts"),
    States.context_id_bin.label("context_id_bin"),
    States.context_user_id_bin.label("context_user_id_bin"),
    States.context_parent_id_bin.label("context_parent_id_bin"),
)
91 
# NULL placeholders for the state columns, used by selects that do not
# read from the states table.
EMPTY_STATE_COLUMNS = tuple(
    literal(value=None, type_=sqlalchemy.String).label(column_name)
    for column_name in ("state", "entity_id", "icon")
)


# Full row layout for events that carry no state information.
EVENT_ROWS_NO_STATES = EVENT_COLUMNS + EMPTY_STATE_COLUMNS

# Virtual "context_only" column. The value "1" tells logbook it should
# avoid processing the row because it is only used to link contexts;
# NULL marks a row that is processed normally.
CONTEXT_ONLY = literal(value="1", type_=sqlalchemy.String).label("context_only")
NOT_CONTEXT_ONLY = literal(value=None, type_=sqlalchemy.String).label("context_only")
108 
109 
def select_events_context_id_subquery(
    start_day: float,
    end_day: float,
    event_type_ids: tuple[int, ...],
) -> Select:
    """Generate the select for a context_id subquery.

    Selects ``Events.context_id_bin`` for events fired strictly between
    ``start_day`` and ``end_day`` whose ``event_type_id`` is one of
    ``event_type_ids``.
    """
    return (
        select(Events.context_id_bin)
        # Both bounds are exclusive.
        .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
        .where(Events.event_type_id.in_(event_type_ids))
        .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
        .outerjoin(EventData, (Events.data_id == EventData.data_id))
    )
123 
124 
# NOTE(review): the `def` line for this body (listing line 125) is absent from
# this listing - restore the function header from the upstream file.
126  """Generate an events query that marks its rows as context_only.
127 
128  By marking them as context_only we know they are only for
129  linking context ids and we can avoid processing them.
130  """
131  return select(*EVENT_ROWS_NO_STATES, CONTEXT_ONLY)
132 
133 
# NOTE(review): the `def` line for this body (listing line 134) is absent from
# this listing - restore the function header from the upstream file.
135  """Generate a states query that marks its rows as context_only.
136 
137  By marking them as context_only we know they are only for
138  linking context ids and we can avoid processing them.
139  """
140  return select(
141  *EVENT_COLUMNS_FOR_STATE_SELECT, *STATE_CONTEXT_ONLY_COLUMNS, CONTEXT_ONLY
142  )
143 
144 
def select_events_without_states(
    start_day: float, end_day: float, event_type_ids: tuple[int, ...]
) -> Select:
    """Generate an events select that does not join states.

    Rows use the EVENT_ROWS_NO_STATES layout (state columns are NULL) and
    are marked NOT_CONTEXT_ONLY. Only events fired strictly between
    ``start_day`` and ``end_day`` whose ``event_type_id`` is one of
    ``event_type_ids`` are selected.
    """
    return (
        select(*EVENT_ROWS_NO_STATES, NOT_CONTEXT_ONLY)
        # Both bounds are exclusive.
        .where((Events.time_fired_ts > start_day) & (Events.time_fired_ts < end_day))
        .where(Events.event_type_id.in_(event_type_ids))
        .outerjoin(EventTypes, (Events.event_type_id == EventTypes.event_type_id))
        .outerjoin(EventData, (Events.data_id == EventData.data_id))
    )
156 
157 
def select_states() -> Select:
    """Build a select over the states table shaped like event rows.

    The row is made of the event-style columns derived from States, the
    state columns, and the NOT_CONTEXT_ONLY marker.
    """
    row_columns = (
        *EVENT_COLUMNS_FOR_STATE_SELECT,
        *STATE_COLUMNS,
        NOT_CONTEXT_ONLY,
    )
    return select(*row_columns)
165 
166 
def apply_states_filters(sel: Select, start_day: float, end_day: float) -> Select:
    """Filter states by time range.

    Filters states that do not have an old state or new state (added / removed)
    Filters states that are in a continuous domain with a UOM.
    Filters states that do not have matching last_updated_ts and last_changed_ts.
    """
    return (
        sel.filter(
            # Both bounds are exclusive.
            (States.last_updated_ts > start_day) & (States.last_updated_ts < end_day)
        )
        # OLD_STATE is joined so the previous state can be compared
        # with the new one in _missing_state_matcher.
        .outerjoin(OLD_STATE, (States.old_state_id == OLD_STATE.state_id))
        .where(_missing_state_matcher())
        # Drop continuous entities, as promised by the docstring; without
        # this step _not_continuous_entity_matcher is never applied.
        .where(_not_continuous_entity_matcher())
        # Keep only real state changes: last_changed_ts is either equal to
        # last_updated_ts or NULL.
        .where(
            (States.last_updated_ts == States.last_changed_ts)
            | States.last_changed_ts.is_(None)
        )
        .outerjoin(
            StateAttributes, (States.attributes_id == StateAttributes.attributes_id)
        )
        .outerjoin(StatesMeta, (States.metadata_id == StatesMeta.metadata_id))
    )
190 
191 
def _missing_state_matcher() -> ColumnElement[bool]:
    """Match state changes that have both an old and a new state.

    Rows are rejected when the old state is missing (newly added
    entities), when the new state is missing (removed entities), or
    when the state value did not change.
    """
    has_old_state = OLD_STATE.state_id.is_not(None)
    state_differs = States.state != OLD_STATE.state
    has_new_state = States.state.is_not(None)
    return sqlalchemy.and_(has_old_state, state_differs, has_new_state)
201 
202 
def _not_continuous_entity_matcher() -> ColumnElement[bool]:
    """Match non continuous entities.

    An entity matches when its domain is neither always continuous nor
    conditionally continuous, or when its domain is conditionally
    continuous but no unit of measurement (UOM) is found in its attributes.
    """
    return sqlalchemy.or_(
        # First exclude domains that may be continuous
        _not_possible_continuous_domain_matcher(),
        # But let in the entities in the possible continuous domains
        # that are not actually continuous sensors because they lack a UOM
        sqlalchemy.and_(
            # The matcher must be called: and_() needs the clause it
            # returns, not the function object itself.
            _conditionally_continuous_domain_matcher(),
            _not_uom_attributes_matcher(),
        ).self_group(),
    )
214 
215 
def _not_possible_continuous_domain_matcher() -> ColumnElement[bool]:
    """Match entities whose domain can never be continuous.

    The clause requires that the entity_id matches none of the LIKE
    patterns of the always continuous domains and none of the patterns
    of the conditionally (if they have a UOM) continuous domains.
    """
    possible_continuous_patterns = [
        *ALWAYS_CONTINUOUS_ENTITY_ID_LIKE,
        *CONDITIONALLY_CONTINUOUS_ENTITY_ID_LIKE,
    ]
    not_like_clauses = [
        ~StatesMeta.entity_id.like(pattern)
        for pattern in possible_continuous_patterns
    ]
    return sqlalchemy.and_(*not_like_clauses).self_group()
232 
233 
def _conditionally_continuous_domain_matcher() -> ColumnElement[bool]:
    """Match entities in conditionally continuous domains.

    These are the domains that are only considered continuous
    when a UOM is set; the entity_id has to match at least one
    of their LIKE patterns.
    """
    like_clauses = [
        StatesMeta.entity_id.like(pattern)
        for pattern in CONDITIONALLY_CONTINUOUS_ENTITY_ID_LIKE
    ]
    return sqlalchemy.or_(*like_clauses).self_group()
246 
247 
def _not_uom_attributes_matcher() -> BooleanClauseList:
    """Prefilter ATTR_UNIT_OF_MEASUREMENT in SQL, where it is much faster.

    Matches when the UOM JSON key is not found in the shared attributes
    or is not found in the attributes column of the states table.
    """
    no_uom_in_shared_attrs = ~StateAttributes.shared_attrs.like(
        UNIT_OF_MEASUREMENT_JSON_LIKE
    )
    no_uom_in_state_attrs = ~States.attributes.like(UNIT_OF_MEASUREMENT_JSON_LIKE)
    return no_uom_in_shared_attrs | no_uom_in_state_attrs
253 
254 
def apply_states_context_hints(sel: Select) -> Select:
    """Add a FORCE INDEX hint for large context_id selects on States.

    The hint names the states context_id index and is registered for the
    mysql and mariadb dialects only.
    """
    index_hint = f"FORCE INDEX ({STATES_CONTEXT_ID_BIN_INDEX})"
    for dialect in ("mysql", "mariadb"):
        sel = sel.with_hint(States, index_hint, dialect_name=dialect)
    return sel
262 
263 
def apply_events_context_hints(sel: Select) -> Select:
    """Add a FORCE INDEX hint for large context_id selects on Events.

    The hint names the events context_id index and is registered for the
    mysql and mariadb dialects only.
    """
    index_hint = f"FORCE INDEX ({EVENTS_CONTEXT_ID_BIN_INDEX})"
    for dialect in ("mysql", "mariadb"):
        sel = sel.with_hint(Events, index_hint, dialect_name=dialect)
    return sel
Select apply_states_filters(Select sel, float start_day, float end_day)
Definition: common.py:167
ColumnElement[bool] _conditionally_continuous_domain_matcher()
Definition: common.py:234
ColumnElement[bool] _not_continuous_entity_matcher()
Definition: common.py:203
BooleanClauseList _not_uom_attributes_matcher()
Definition: common.py:248
Select select_events_without_states(float start_day, float end_day, tuple[int,...] event_type_ids)
Definition: common.py:147
Select select_events_context_id_subquery(float start_day, float end_day, tuple[int,...] event_type_ids)
Definition: common.py:114
ColumnElement[bool] _not_possible_continuous_domain_matcher()
Definition: common.py:216
ColumnElement[bool] _missing_state_matcher()
Definition: common.py:192
list[str] like_domain_matchers(Iterable[str] domains)
Definition: filters.py:295