Home Assistant Unofficial Reference 2024.12.1
match.py
Go to the documentation of this file.
1 """The bluetooth integration matchers."""
2 
3 from __future__ import annotations
4 
5 from collections import defaultdict
6 from dataclasses import dataclass
7 from fnmatch import translate
8 from functools import lru_cache
9 import re
10 from typing import TYPE_CHECKING, Final, TypedDict
11 
12 from lru import LRU
13 
14 from homeassistant.core import callback
15 from homeassistant.loader import BluetoothMatcher, BluetoothMatcherOptional
16 
17 from .models import BluetoothCallback, BluetoothServiceInfoBleak
18 
19 if TYPE_CHECKING:
20  from bleak.backends.scanner import AdvertisementData
21 
22 
# Capacity of each per-address match-history LRU.
MAX_REMEMBER_ADDRESSES: Final = 2048

# Keys that may appear in a matcher dict.
CALLBACK: Final = "callback"
DOMAIN: Final = "domain"
ADDRESS: Final = "address"
CONNECTABLE: Final = "connectable"
LOCAL_NAME: Final = "local_name"
SERVICE_UUID: Final = "service_uuid"
SERVICE_DATA_UUID: Final = "service_data_uuid"
MANUFACTURER_ID: Final = "manufacturer_id"
MANUFACTURER_DATA_START: Final = "manufacturer_data_start"

# Number of leading local-name characters used as the index key; these
# characters may not contain "*" or "[" in a matcher's local name.
LOCAL_NAME_MIN_MATCH_LENGTH: Final = 3
36 
37 
class BluetoothCallbackMatcherOptional(TypedDict, total=False):
    """Matcher for the bluetooth integration for callback optional fields."""

    # Optional key (the TypedDict is declared with total=False).
    address: str
42 
43 
class BluetoothCallbackMatcher(
    BluetoothMatcherOptional,
    BluetoothCallbackMatcherOptional,
):
    """Callback matcher for the bluetooth integration.

    Combines the keys of both base matchers; declares none of its own.
    """
49 
50 
class _BluetoothCallbackMatcherWithCallback(TypedDict):
    """Callback for the bluetooth integration."""

    # NOTE(review): the class statement was missing from the listing; the
    # plain TypedDict base is assumed because this class is combined with
    # other TypedDict classes -- confirm.
    callback: BluetoothCallback
55 
56 
class BluetoothCallbackMatcherWithCallback(
    _BluetoothCallbackMatcherWithCallback,
    BluetoothCallbackMatcher,
):
    """Callback matcher for the bluetooth integration that stores the callback."""
62 
63 
64 @dataclass(slots=True, frozen=False)
66  """Track which fields have been seen."""
67 
68  manufacturer_data: bool
69  service_data: set[str]
70  service_uuids: set[str]
71 
72 
def seen_all_fields(
    previous_match: IntegrationMatchHistory, advertisement_data: AdvertisementData
) -> bool:
    """Return if we have seen all fields.

    Returns False as soon as the advertisement carries manufacturer data,
    a service data UUID, or a service UUID that is not yet recorded in
    previous_match; returns True otherwise.
    """
    if advertisement_data.manufacturer_data and not previous_match.manufacturer_data:
        return False

    service_data = advertisement_data.service_data
    # The emptiness check on the recorded set short-circuits before the
    # superset comparison is attempted.
    if service_data and (
        not previous_match.service_data
        or not previous_match.service_data.issuperset(service_data)
    ):
        return False

    service_uuids = advertisement_data.service_uuids
    if service_uuids and (
        not previous_match.service_uuids
        or not previous_match.service_uuids.issuperset(service_uuids)
    ):
        return False

    return True
90 
91 
class IntegrationMatcher:
    """Integration matcher for the bluetooth integration.

    Remembers, per device address, which advertisement fields were present
    on advertisements that matched at least one domain, so that a later
    advertisement carrying no new fields can skip the matcher index.
    """

    __slots__ = ("_integration_matchers", "_matched", "_matched_connectable", "_index")

    def __init__(self, integration_matchers: list[BluetoothMatcher]) -> None:
        """Initialize the matcher."""
        self._integration_matchers = integration_matchers
        # Some devices use a random address so we need to use
        # an LRU to avoid memory issues.
        self._matched: LRU[str, IntegrationMatchHistory] = LRU(MAX_REMEMBER_ADDRESSES)
        self._matched_connectable: LRU[str, IntegrationMatchHistory] = LRU(
            MAX_REMEMBER_ADDRESSES
        )
        # NOTE(review): this assignment was missing from the listing; it is
        # required by __slots__ and by async_setup()/match_domains() --
        # confirm the index class name.
        self._index = BluetoothMatcherIndex()

    @callback
    def async_setup(self) -> None:
        """Set up the matcher by indexing every integration matcher."""
        for matcher in self._integration_matchers:
            self._index.add(matcher)
        self._index.build()

    def async_clear_address(self, address: str) -> None:
        """Clear the match history stored for an address.

        Both the connectable and the non-connectable history are dropped;
        an unknown address is ignored.
        """
        self._matched.pop(address, None)
        self._matched_connectable.pop(address, None)

    def match_domains(self, service_info: BluetoothServiceInfoBleak) -> set[str]:
        """Return the domains that are matched.

        An empty set is returned both when no matcher applies and when
        every field of this advertisement was already recorded for the
        address.
        """
        device = service_info.device
        advertisement_data = service_info.advertisement
        # Connectable and non-connectable sightings keep separate histories.
        matched = self._matched_connectable if service_info.connectable else self._matched
        previous_match = matched.get(device.address)
        if previous_match and seen_all_fields(previous_match, advertisement_data):
            # We have seen all fields so we can skip the rest of the matchers
            return set()

        matched_domains: set[str] = {
            matcher[DOMAIN] for matcher in self._index.match(service_info)
        }
        if not matched_domains:
            # Nothing matched: the history is left untouched.
            return matched_domains

        if previous_match:
            # Fold the fields of this advertisement into the existing record.
            previous_match.manufacturer_data |= bool(
                advertisement_data.manufacturer_data
            )
            previous_match.service_data.update(advertisement_data.service_data)
            previous_match.service_uuids.update(advertisement_data.service_uuids)
        else:
            matched[device.address] = IntegrationMatchHistory(
                manufacturer_data=bool(advertisement_data.manufacturer_data),
                service_data=set(advertisement_data.service_data),
                service_uuids=set(advertisement_data.service_uuids),
            )
        return matched_domains
150 
151 
153  _T: (BluetoothMatcher, BluetoothCallbackMatcherWithCallback)
154 ]:
155  """Bluetooth matcher base for the bluetooth integration.
156 
157  The indexer puts each matcher in the bucket that it is most
158  likely to match. This allows us to only check the service infos
159  against each bucket to see if we should match against the data.
160 
161  This is optimized for cases when no service infos will be matched in
162  any bucket and we can quickly reject the service info as not matching.
163  """
164 
165  __slots__ = (
166  "local_name",
167  "service_uuid",
168  "service_data_uuid",
169  "manufacturer_id",
170  "service_uuid_set",
171  "service_data_uuid_set",
172  "manufacturer_id_set",
173  )
174 
175  def __init__(self) -> None:
176  """Initialize the matcher index."""
177  self.local_name: defaultdict[str, list[_T]] = defaultdict(list)
178  self.service_uuid: defaultdict[str, list[_T]] = defaultdict(list)
179  self.service_data_uuid: defaultdict[str, list[_T]] = defaultdict(list)
180  self.manufacturer_id: defaultdict[int, list[_T]] = defaultdict(list)
181  self.service_uuid_set: set[str] = set()
182  self.service_data_uuid_set: set[str] = set()
183  self.manufacturer_id_set: set[int] = set()
184 
185  def add(self, matcher: _T) -> bool:
186  """Add a matcher to the index.
187 
188  Matchers must end up only in one bucket.
189 
190  We put them in the bucket that they are most likely to match.
191  """
192  # Local name is the cheapest to match since its just a dict lookup
193  if LOCAL_NAME in matcher:
194  self.local_name[_local_name_to_index_key(matcher[LOCAL_NAME])].append(
195  matcher
196  )
197  return True
198 
199  # Manufacturer data is 2nd cheapest since its all ints
200  if MANUFACTURER_ID in matcher:
201  self.manufacturer_id[matcher[MANUFACTURER_ID]].append(matcher)
202  return True
203 
204  if SERVICE_UUID in matcher:
205  self.service_uuid[matcher[SERVICE_UUID]].append(matcher)
206  return True
207 
208  if SERVICE_DATA_UUID in matcher:
209  self.service_data_uuid[matcher[SERVICE_DATA_UUID]].append(matcher)
210  return True
211 
212  return False
213 
214  def remove(self, matcher: _T) -> bool:
215  """Remove a matcher from the index.
216 
217  Matchers only end up in one bucket, so once we have
218  removed one, we are done.
219  """
220  if LOCAL_NAME in matcher:
221  self.local_name[_local_name_to_index_key(matcher[LOCAL_NAME])].remove(
222  matcher
223  )
224  return True
225 
226  if MANUFACTURER_ID in matcher:
227  self.manufacturer_id[matcher[MANUFACTURER_ID]].remove(matcher)
228  return True
229 
230  if SERVICE_UUID in matcher:
231  self.service_uuid[matcher[SERVICE_UUID]].remove(matcher)
232  return True
233 
234  if SERVICE_DATA_UUID in matcher:
235  self.service_data_uuid[matcher[SERVICE_DATA_UUID]].remove(matcher)
236  return True
237 
238  return False
239 
240  def build(self) -> None:
241  """Rebuild the index sets."""
242  self.service_uuid_set = set(self.service_uuid)
243  self.service_data_uuid_set = set(self.service_data_uuid)
244  self.manufacturer_id_set = set(self.manufacturer_id)
245 
246  def match(self, service_info: BluetoothServiceInfoBleak) -> list[_T]:
247  """Check for a match."""
248  matches: list[_T] = []
249  if (name := service_info.name) and (
250  local_name_matchers := self.local_name.get(
251  name[:LOCAL_NAME_MIN_MATCH_LENGTH]
252  )
253  ):
254  matches.extend(
255  matcher
256  for matcher in local_name_matchers
257  if ble_device_matches(matcher, service_info)
258  )
259 
260  if (
261  (service_data_uuid_set := self.service_data_uuid_set)
262  and (service_data := service_info.service_data)
263  and (matched_uuids := service_data_uuid_set.intersection(service_data))
264  ):
265  matches.extend(
266  matcher
267  for service_data_uuid in matched_uuids
268  for matcher in self.service_data_uuid[service_data_uuid]
269  if ble_device_matches(matcher, service_info)
270  )
271 
272  if (
273  (manufacturer_id_set := self.manufacturer_id_set)
274  and (manufacturer_data := service_info.manufacturer_data)
275  and (matched_ids := manufacturer_id_set.intersection(manufacturer_data))
276  ):
277  matches.extend(
278  matcher
279  for manufacturer_id in matched_ids
280  for matcher in self.manufacturer_id[manufacturer_id]
281  if ble_device_matches(matcher, service_info)
282  )
283 
284  if (
285  (service_uuid_set := self.service_uuid_set)
286  and (service_uuids := service_info.service_uuids)
287  and (matched_uuids := service_uuid_set.intersection(service_uuids))
288  ):
289  matches.extend(
290  matcher
291  for service_uuid in matched_uuids
292  for matcher in self.service_uuid[service_uuid]
293  if ble_device_matches(matcher, service_info)
294  )
295 
296  return matches
297 
298 
# NOTE(review): the class statement was missing from the listing; the name
# and base below are restored from the upstream module -- confirm.
class BluetoothMatcherIndex(BluetoothMatcherIndexBase[BluetoothMatcher]):
    """Bluetooth matcher for the bluetooth integration."""
301 
302 
class BluetoothCallbackMatcherIndex(
    BluetoothMatcherIndexBase[BluetoothCallbackMatcherWithCallback]
):
    """Bluetooth matcher for the bluetooth integration.

    Supports matching on addresses.
    """

    __slots__ = ("address", "connectable")

    def __init__(self) -> None:
        """Initialize the matcher index."""
        super().__init__()
        # Matchers keyed by the address they name.
        self.address: defaultdict[str, list[BluetoothCallbackMatcherWithCallback]] = (
            defaultdict(list)
        )
        # Matchers that fit no other bucket but carry a connectable key.
        self.connectable: list[BluetoothCallbackMatcherWithCallback] = []

    def add_callback_matcher(
        self, matcher: BluetoothCallbackMatcherWithCallback
    ) -> None:
        """Add a matcher to the index.

        Matchers must end up only in one bucket.

        We put them in the bucket that they are most likely to match.
        A matcher with none of the recognised keys is not stored.
        """
        if ADDRESS in matcher:
            self.address[matcher[ADDRESS]].append(matcher)
            return

        if super().add(matcher):
            # Keep the lookup sets in step with the buckets.
            self.build()
            return

        if CONNECTABLE in matcher:
            self.connectable.append(matcher)

    def remove_callback_matcher(
        self, matcher: BluetoothCallbackMatcherWithCallback
    ) -> None:
        """Remove a matcher from the index.

        Matchers only end up in one bucket, so once we have
        removed one, we are done.
        """
        if ADDRESS in matcher:
            self.address[matcher[ADDRESS]].remove(matcher)
            return

        if super().remove(matcher):
            # Keep the lookup sets in step with the buckets.
            self.build()
            return

        if CONNECTABLE in matcher:
            self.connectable.remove(matcher)

    def match_callbacks(
        self, service_info: BluetoothServiceInfoBleak
    ) -> list[BluetoothCallbackMatcherWithCallback]:
        """Check for a match.

        Returns the base index matches followed by matching address
        matchers and then matching connectable matchers.
        """
        matches = self.match(service_info)
        matches.extend(
            matcher
            for matcher in self.address.get(service_info.address, ())
            if ble_device_matches(matcher, service_info)
        )
        matches.extend(
            matcher
            for matcher in self.connectable
            if ble_device_matches(matcher, service_info)
        )
        return matches
374 
375 
def _local_name_to_index_key(local_name: str) -> str:
    """Convert a local name to an index key.

    The key is the first LOCAL_NAME_MIN_MATCH_LENGTH characters of the
    local name.

    We check the local name matchers here and raise a ValueError
    if they try to set up a matcher that is overly broad, as it
    would match too many devices and cause a performance hit.
    """
    match_part = local_name[:LOCAL_NAME_MIN_MATCH_LENGTH]
    if "*" in match_part or "[" in match_part:
        raise ValueError(
            "Local name matchers may not have patterns in the first "
            f"{LOCAL_NAME_MIN_MATCH_LENGTH} characters because they "
            f"would match too broadly ({local_name})"
        )
    return match_part
391 
392 
def ble_device_matches(
    matcher: BluetoothMatcherOptional,
    service_info: BluetoothServiceInfoBleak,
) -> bool:
    """Check if a ble device and advertisement_data matches the matcher.

    Every key present in the matcher must be satisfied by service_info;
    the first unmet condition returns False.
    """
    # Don't check address here since all callers already
    # check the address and we don't want to double check
    # since it would result in an unreachable reject case.

    # A matcher without a connectable key requires a connectable device.
    if matcher.get(CONNECTABLE, True) and not service_info.connectable:
        return False

    if (
        service_uuid := matcher.get(SERVICE_UUID)
    ) and service_uuid not in service_info.service_uuids:
        return False

    if (
        service_data_uuid := matcher.get(SERVICE_DATA_UUID)
    ) and service_data_uuid not in service_info.service_data:
        return False

    # Compare against None so that manufacturer id 0 is still checked.
    manufacturer_id = matcher.get(MANUFACTURER_ID)
    if (
        manufacturer_id is not None
        and manufacturer_id not in service_info.manufacturer_data
    ):
        return False

    if manufacturer_data_start := matcher.get(MANUFACTURER_DATA_START):
        # Look the payload up defensively: a missing entry is a non-match
        # rather than a KeyError.
        payload = service_info.manufacturer_data.get(manufacturer_id)
        if payload is None or not payload.startswith(bytes(manufacturer_data_start)):
            return False

    if (local_name := matcher.get(LOCAL_NAME)) and not _memorized_fnmatch(
        service_info.name,
        local_name,
    ):
        return False

    return True
431 
432 
433 @lru_cache(maxsize=4096, typed=True)
434 def _compile_fnmatch(pattern: str) -> re.Pattern:
435  """Compile a fnmatch pattern."""
436  return re.compile(translate(pattern))
437 
438 
@lru_cache(maxsize=1024, typed=True)
def _memorized_fnmatch(name: str, pattern: str) -> bool:
    """Memorized version of fnmatch that has a larger lru_cache.

    The default version of fnmatch only has a lru_cache of 256 entries.
    With many devices we quickly reach that limit and end up compiling
    the same pattern over and over again.

    Bluetooth has its own memorized fnmatch with its own lru_cache
    since the data is going to be relatively the same
    since the devices will not change frequently.

    Returns True if name matches the fnmatch-style pattern.
    """
    return bool(_compile_fnmatch(pattern).match(name))
None remove_callback_matcher(self, BluetoothCallbackMatcherWithCallback matcher)
Definition: match.py:344
list[BluetoothCallbackMatcherWithCallback] match_callbacks(self, BluetoothServiceInfoBleak service_info)
Definition: match.py:364
None add_callback_matcher(self, BluetoothCallbackMatcherWithCallback matcher)
Definition: match.py:323
None __init__(self, list[BluetoothMatcher] integration_matchers)
Definition: match.py:97
set[str] match_domains(self, BluetoothServiceInfoBleak service_info)
Definition: match.py:120
bool seen_all_fields(IntegrationMatchHistory previous_match, AdvertisementData advertisement_data)
Definition: match.py:75
bool ble_device_matches(BluetoothMatcherOptional matcher, BluetoothServiceInfoBleak service_info)
Definition: match.py:396
re.Pattern _compile_fnmatch(str pattern)
Definition: match.py:434
bool _memorized_fnmatch(str name, str pattern)
Definition: match.py:440
bool add(self, _T matcher)
Definition: match.py:185
list[_T] match(self, BluetoothServiceInfoBleak service_info)
Definition: match.py:246
str _local_name_to_index_key(str local_name)
Definition: match.py:376
bool remove(self, _T matcher)
Definition: match.py:214
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88