Home Assistant Unofficial Reference 2024.12.1
filters.py
Go to the documentation of this file.
1 """Provide pre-made queries on top of the recorder component."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Collection, Iterable
6 from typing import Any
7 
8 from sqlalchemy import Column, Text, cast, not_, or_
9 from sqlalchemy.sql.elements import ColumnElement
10 
11 from homeassistant.const import CONF_DOMAINS, CONF_ENTITIES, CONF_EXCLUDE, CONF_INCLUDE
12 from homeassistant.helpers.entityfilter import CONF_ENTITY_GLOBS
13 from homeassistant.helpers.json import json_dumps
14 from homeassistant.helpers.typing import ConfigType
15 
16 from .db_schema import ENTITY_ID_IN_EVENT, OLD_ENTITY_ID_IN_EVENT, States, StatesMeta
17 
18 DOMAIN = "history"
19 HISTORY_FILTERS = "history_filters"
20 JSON_NULL = json_dumps(None)
21 
# Translation table for ``str.translate`` that rewrites a glob pattern as a
# SQL LIKE pattern: the glob wildcards ``*`` and ``?`` become the LIKE
# wildcards ``%`` and ``_``, while characters that are special to LIKE
# (``%``, ``_``) and the escape character itself are backslash-escaped.
GLOB_TO_SQL_CHARS = str.maketrans(
    {
        "*": "%",
        "?": "_",
        "%": "\\%",
        "_": "\\_",
        "\\": "\\\\",
    }
)
29 
30 FILTER_TYPES = (CONF_EXCLUDE, CONF_INCLUDE)
31 FITLER_MATCHERS = (CONF_ENTITIES, CONF_DOMAINS, CONF_ENTITY_GLOBS)
32 
33 
def extract_include_exclude_filter_conf(conf: ConfigType) -> dict[str, Any]:
    """Pull the include/exclude filter sections out of a configuration.

    The result always has one entry per filter type, each holding one set
    per matcher; sections or matchers that are missing (or empty) in
    ``conf`` yield empty sets. New sets are built, so the original
    configuration data is not altered by later changes to the result.
    """
    extracted: dict[str, Any] = {}
    for filter_type in FILTER_TYPES:
        section = conf.get(filter_type, {})
        extracted[filter_type] = {
            matcher: set(section.get(matcher) or []) for matcher in FITLER_MATCHERS
        }
    return extracted
46 
47 
def merge_include_exclude_filters(
    base_filter: dict[str, Any], add_filter: dict[str, Any]
) -> dict[str, Any]:
    """Merge two filters.

    Both arguments must contain every filter type and, below it, every
    matcher key. The per-matcher values are combined with the ``|``
    operator, so they are expected to be sets (the shape produced by
    ``extract_include_exclude_filter_conf``).

    This makes a copy so we do not alter the original data.
    """
    return {
        filter_type: {
            matcher: base_filter[filter_type][matcher]
            | add_filter[filter_type][matcher]
            for matcher in FITLER_MATCHERS
        }
        for filter_type in FILTER_TYPES
    }
63 
64 
def sqlalchemy_filter_from_include_exclude_conf(conf: ConfigType) -> Filters | None:
    """Create a ``Filters`` object from an include/exclude configuration.

    Returns ``None`` when the configuration does not define any include or
    exclude entry, so callers can skip filtering altogether.
    """
    include_conf = conf.get(CONF_INCLUDE, {})
    exclude_conf = conf.get(CONF_EXCLUDE, {})
    candidate = Filters(
        excluded_entities=exclude_conf.get(CONF_ENTITIES, []),
        excluded_domains=exclude_conf.get(CONF_DOMAINS, []),
        excluded_entity_globs=exclude_conf.get(CONF_ENTITY_GLOBS, []),
        included_entities=include_conf.get(CONF_ENTITIES, []),
        included_domains=include_conf.get(CONF_DOMAINS, []),
        included_entity_globs=include_conf.get(CONF_ENTITY_GLOBS, []),
    )
    if not candidate.has_config:
        return None
    return candidate
78 
79 
class Filters:
    """Container for the configured include and exclude filters.

    A filter must never change after it is created since it is used in a
    cache key.
    """

    def __init__(
        self,
        excluded_entities: Collection[str] | None = None,
        excluded_domains: Collection[str] | None = None,
        excluded_entity_globs: Collection[str] | None = None,
        included_entities: Collection[str] | None = None,
        included_domains: Collection[str] | None = None,
        included_entity_globs: Collection[str] | None = None,
    ) -> None:
        """Initialise the include and exclude filters.

        Every argument is optional; ``None`` (or any empty collection) is
        stored as an empty list.
        """
        self._excluded_entities = excluded_entities or []
        self._excluded_domains = excluded_domains or []
        self._excluded_entity_globs = excluded_entity_globs or []
        self._included_entities = included_entities or []
        self._included_domains = included_domains or []
        self._included_entity_globs = included_entity_globs or []

    def __repr__(self) -> str:
        """Return human readable excludes/includes."""
        return (
            "<Filters"
            f" excluded_entities={self._excluded_entities}"
            f" excluded_domains={self._excluded_domains}"
            f" excluded_entity_globs={self._excluded_entity_globs}"
            f" included_entities={self._included_entities}"
            f" included_domains={self._included_domains}"
            f" included_entity_globs={self._included_entity_globs}"
            ">"
        )

    @property
    def has_config(self) -> bool:
        """Determine if there is any filter configuration."""
        return bool(self._have_exclude or self._have_include)

    @property
    def _have_exclude(self) -> bool:
        """Return True if any exclude entity, domain or glob is configured."""
        return bool(
            self._excluded_entities
            or self._excluded_domains
            or self._excluded_entity_globs
        )

    @property
    def _have_include(self) -> bool:
        """Return True if any include entity, domain or glob is configured."""
        return bool(
            self._included_entities
            or self._included_domains
            or self._included_entity_globs
        )

    def _generate_filter_for_columns(
        self, columns: Iterable[Column], encoder: Callable[[Any], Any]
    ) -> ColumnElement:
        """Generate a filter from pre-computed sets and pattern lists.

        ``columns`` are the columns the entity id is matched against and
        ``encoder`` is applied to every entity id, domain pattern and glob
        before it is compared with a column.

        Raises RuntimeError when neither includes nor excludes are
        configured; check ``has_config`` before calling.

        This must match exactly how homeassistant.helpers.entityfilter works.
        """
        i_domains = _domain_matcher(self._included_domains, columns, encoder)
        i_entities = _entity_matcher(self._included_entities, columns, encoder)
        i_entity_globs = _globs_to_like(self._included_entity_globs, columns, encoder)
        includes = [i_domains, i_entities, i_entity_globs]

        e_domains = _domain_matcher(self._excluded_domains, columns, encoder)
        e_entities = _entity_matcher(self._excluded_entities, columns, encoder)
        e_entity_globs = _globs_to_like(self._excluded_entity_globs, columns, encoder)
        excludes = [e_domains, e_entities, e_entity_globs]

        have_exclude = self._have_exclude
        have_include = self._have_include

        # Case 1 - No filter
        # - All entities included, so there is nothing to generate; the
        #   caller is expected to have checked has_config first.
        if not have_include and not have_exclude:
            raise RuntimeError(
                "No filter configuration provided, check has_config before calling this method."
            )

        # Case 2 - Only includes
        # - Entity listed in entities include: include
        # - Otherwise, entity matches domain include: include
        # - Otherwise, entity matches glob include: include
        # - Otherwise: exclude
        if have_include and not have_exclude:
            return or_(*includes).self_group()

        # Case 3 - Only excludes
        # - Entity listed in exclude: exclude
        # - Otherwise, entity matches domain exclude: exclude
        # - Otherwise, entity matches glob exclude: exclude
        # - Otherwise: include
        if not have_include and have_exclude:
            return not_(or_(*excludes).self_group())

        # Case 4 - Domain and/or glob includes (may also have excludes)
        # - Entity listed in entities include: include
        # - Otherwise, entity listed in entities exclude: exclude
        # - Otherwise, entity matches glob include: include
        # - Otherwise, entity matches glob exclude: exclude
        # - Otherwise, entity matches domain include: include
        # - Otherwise: exclude
        if self._included_domains or self._included_entity_globs:
            return or_(
                i_entities,
                (~e_entities & (i_entity_globs | (~e_entity_globs & i_domains))),
            ).self_group()

        # Case 5 - Domain and/or glob excludes (no domain and/or glob includes)
        # - Entity listed in entities include: include
        # - Otherwise, entity listed in exclude: exclude
        # - Otherwise, entity matches glob exclude: exclude
        # - Otherwise, entity matches domain exclude: exclude
        # - Otherwise: include
        if self._excluded_domains or self._excluded_entity_globs:
            return (not_(or_(*excludes)) | i_entities).self_group()

        # Case 6 - No Domain and/or glob includes or excludes
        # - Entity listed in entities include: include
        # - Otherwise: exclude
        return i_entities

    def states_entity_filter(self) -> ColumnElement:
        """Generate the States.entity_id filter query.

        This is no longer used except by the legacy queries.
        """

        def _encoder(data: Any) -> Any:
            """Nothing to encode for states since there is no json."""
            return data

        # The type annotation should be improved so the type ignore can be removed
        return self._generate_filter_for_columns((States.entity_id,), _encoder)  # type: ignore[arg-type]

    def states_metadata_entity_filter(self) -> ColumnElement:
        """Generate the StatesMeta.entity_id filter query."""

        def _encoder(data: Any) -> Any:
            """Nothing to encode for states since there is no json."""
            return data

        # The type annotation should be improved so the type ignore can be removed
        return self._generate_filter_for_columns((StatesMeta.entity_id,), _encoder)  # type: ignore[arg-type]

    def events_entity_filter(self) -> ColumnElement:
        """Generate the entity filter query.

        Matches events where both the entity id and the old entity id are
        empty, or where either of those columns passes the configured
        include/exclude filter. Values are JSON-encoded before comparison.
        """
        _encoder = json_dumps
        return or_(
            # sqlalchemy's SQLite json implementation always
            # wraps everything with JSON_QUOTE so it resolves to 'null'
            # when its empty
            #
            # For MySQL and PostgreSQL it will resolve to a literal
            # NULL when its empty
            #
            ((ENTITY_ID_IN_EVENT == JSON_NULL) | ENTITY_ID_IN_EVENT.is_(None))
            & (
                (OLD_ENTITY_ID_IN_EVENT == JSON_NULL) | OLD_ENTITY_ID_IN_EVENT.is_(None)
            ),
            # Needs https://github.com/bdraco/home-assistant/commit/bba91945006a46f3a01870008eb048e4f9cbb1ef
            self._generate_filter_for_columns(
                (ENTITY_ID_IN_EVENT, OLD_ENTITY_ID_IN_EVENT),  # type: ignore[arg-type]
                _encoder,
            ).self_group(),
        )
252 
253 
def _globs_to_like(
    glob_strs: Iterable[str], columns: Iterable[Column], encoder: Callable[[Any], Any]
) -> ColumnElement:
    """Translate glob to sql.

    Each glob is passed through ``encoder``, converted to a LIKE pattern
    with ``GLOB_TO_SQL_CHARS`` and compared against every column (cast to
    text, with backslash as the LIKE escape character). NULL columns never
    match. The individual matchers are OR-ed together; with no matchers the
    result is ``or_(False)``.
    """
    matchers = [
        (
            column.is_not(None)
            & cast(column, Text()).like(
                encoder(glob_str).translate(GLOB_TO_SQL_CHARS), escape="\\"
            )
        )
        for glob_str in glob_strs
        for column in columns
    ]
    return or_(*matchers) if matchers else or_(False)
269 
270 
def _entity_matcher(
    entity_ids: Iterable[str], columns: Iterable[Column], encoder: Callable[[Any], Any]
) -> ColumnElement:
    """Build an IN matcher for exact entity ids.

    Every entity id is passed through ``encoder`` and each column (cast to
    text) is tested for membership in the encoded list. NULL columns never
    match. The per-column matchers are OR-ed together; with no columns the
    result is ``or_(False)``.
    """
    # Encode once up front: the list is the same for every column, and this
    # keeps a one-shot iterable from being exhausted by the first column.
    encoded_entity_ids = [encoder(entity_id) for entity_id in entity_ids]
    matchers = [
        (column.is_not(None) & cast(column, Text()).in_(encoded_entity_ids))
        for column in columns
    ]
    return or_(*matchers) if matchers else or_(False)
282 
283 
def _domain_matcher(
    domains: Iterable[str], columns: Iterable[Column], encoder: Callable[[Any], Any]
) -> ColumnElement:
    """Build a LIKE matcher for entity domains.

    Each domain is turned into a LIKE pattern by ``like_domain_matchers``,
    passed through ``encoder`` and compared against every column (cast to
    text). NULL columns never match. The individual matchers are OR-ed
    together; with no matchers the result is ``or_(False)``.
    """
    matchers = [
        (column.is_not(None) & cast(column, Text()).like(encoder(domain_matcher)))
        for domain_matcher in like_domain_matchers(domains)
        for column in columns
    ]
    return or_(*matchers) if matchers else or_(False)
293 
294 
def like_domain_matchers(domains: Iterable[str]) -> list[str]:
    """Return one SQL LIKE pattern per domain.

    Each pattern is the domain followed by ``.%``, i.e. it matches any
    value that starts with ``<domain>.``.
    """
    patterns: list[str] = []
    for domain in domains:
        patterns.append(f"{domain}.%")
    return patterns
None __init__(self, Collection[str]|None excluded_entities=None, Collection[str]|None excluded_domains=None, Collection[str]|None excluded_entity_globs=None, Collection[str]|None included_entities=None, Collection[str]|None included_domains=None, Collection[str]|None included_entity_globs=None)
Definition: filters.py:95
ColumnElement _generate_filter_for_columns(self, Iterable[Column] columns, Callable[[Any], Any] encoder)
Definition: filters.py:140
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
ColumnElement _globs_to_like(Iterable[str] glob_strs, Iterable[Column] columns, Callable[[Any], Any] encoder)
Definition: filters.py:256
ColumnElement _domain_matcher(Iterable[str] domains, Iterable[Column] columns, Callable[[Any], Any] encoder)
Definition: filters.py:286
dict[str, Any] merge_include_exclude_filters(dict[str, Any] base_filter, dict[str, Any] add_filter)
Definition: filters.py:50
dict[str, Any] extract_include_exclude_filter_conf(ConfigType conf)
Definition: filters.py:34
list[str] like_domain_matchers(Iterable[str] domains)
Definition: filters.py:295
Filters|None sqlalchemy_filter_from_include_exclude_conf(ConfigType conf)
Definition: filters.py:65
ColumnElement _entity_matcher(Iterable[str] entity_ids, Iterable[Column] columns, Callable[[Any], Any] encoder)
Definition: filters.py:273
str json_dumps(Any data)
Definition: json.py:149