1 """The bluetooth integration matchers."""
3 from __future__
import annotations
5 from collections
import defaultdict
6 from dataclasses
import dataclass
7 from fnmatch
import translate
8 from functools
import lru_cache
10 from typing
import TYPE_CHECKING, Final, TypedDict
17 from .models
import BluetoothCallback, BluetoothServiceInfoBleak
20 from bleak.backends.scanner
import AdvertisementData
23 MAX_REMEMBER_ADDRESSES: Final = 2048
25 CALLBACK: Final =
"callback"
26 DOMAIN: Final =
"domain"
27 ADDRESS: Final =
"address"
28 CONNECTABLE: Final =
"connectable"
29 LOCAL_NAME: Final =
"local_name"
30 SERVICE_UUID: Final =
"service_uuid"
31 SERVICE_DATA_UUID: Final =
"service_data_uuid"
32 MANUFACTURER_ID: Final =
"manufacturer_id"
33 MANUFACTURER_DATA_START: Final =
"manufacturer_data_start"
35 LOCAL_NAME_MIN_MATCH_LENGTH = 3
39 """Matcher for the bluetooth integration for callback optional fields."""
45 BluetoothMatcherOptional,
46 BluetoothCallbackMatcherOptional,
48 """Callback matcher for the bluetooth integration."""
52 """Callback for the bluetooth integration."""
54 callback: BluetoothCallback
58 _BluetoothCallbackMatcherWithCallback,
59 BluetoothCallbackMatcher,
61 """Callback matcher for the bluetooth integration that stores the callback."""
64 @dataclass(slots=True, frozen=False)
66 """Track which fields have been seen."""
68 manufacturer_data: bool
69 service_data: set[str]
70 service_uuids: set[str]
74 previous_match: IntegrationMatchHistory, advertisement_data: AdvertisementData
76 """Return if we have seen all fields."""
77 if not previous_match.manufacturer_data
and advertisement_data.manufacturer_data:
79 if advertisement_data.service_data
and (
80 not previous_match.service_data
81 or not previous_match.service_data.issuperset(advertisement_data.service_data)
84 if advertisement_data.service_uuids
and (
85 not previous_match.service_uuids
86 or not previous_match.service_uuids.issuperset(advertisement_data.service_uuids)
93 """Integration matcher for the bluetooth integration."""
95 __slots__ = (
"_integration_matchers",
"_matched",
"_matched_connectable",
"_index")
97 def __init__(self, integration_matchers: list[BluetoothMatcher]) ->
None:
98 """Initialize the matcher."""
102 self._matched: LRU[str, IntegrationMatchHistory] = LRU(MAX_REMEMBER_ADDRESSES)
103 self._matched_connectable: LRU[str, IntegrationMatchHistory] = LRU(
104 MAX_REMEMBER_ADDRESSES
110 """Set up the matcher."""
116 """Clear the history matches for a set of domains."""
117 self._matched.pop(address,
None)
118 self._matched_connectable.pop(address,
None)
120 def match_domains(self, service_info: BluetoothServiceInfoBleak) -> set[str]:
121 """Return the domains that are matched."""
122 device = service_info.device
123 advertisement_data = service_info.advertisement
124 connectable = service_info.connectable
125 matched = self._matched_connectable
if connectable
else self._matched
126 matched_domains: set[str] = set()
127 if (previous_match := matched.get(device.address))
and seen_all_fields(
128 previous_match, advertisement_data
131 return matched_domains
133 matcher[DOMAIN]
for matcher
in self.
_index_index.
match(service_info)
135 if not matched_domains:
136 return matched_domains
138 previous_match.manufacturer_data |= bool(
139 advertisement_data.manufacturer_data
141 previous_match.service_data |= set(advertisement_data.service_data)
142 previous_match.service_uuids |= set(advertisement_data.service_uuids)
145 manufacturer_data=bool(advertisement_data.manufacturer_data),
146 service_data=set(advertisement_data.service_data),
147 service_uuids=set(advertisement_data.service_uuids),
149 return matched_domains
153 _T: (BluetoothMatcher, BluetoothCallbackMatcherWithCallback)
155 """Bluetooth matcher base for the bluetooth integration.
157 The indexer puts each matcher in the bucket that it is most
158 likely to match. This allows us to only check the service infos
159 against each bucket to see if we should match against the data.
161 This is optimized for cases when no service infos will be matched in
162 any bucket and we can quickly reject the service info as not matching.
171 "service_data_uuid_set",
172 "manufacturer_id_set",
176 """Initialize the matcher index."""
177 self.local_name: defaultdict[str, list[_T]] = defaultdict(list)
178 self.service_uuid: defaultdict[str, list[_T]] = defaultdict(list)
179 self.service_data_uuid: defaultdict[str, list[_T]] = defaultdict(list)
180 self.manufacturer_id: defaultdict[int, list[_T]] = defaultdict(list)
181 self.service_uuid_set: set[str] = set()
182 self.service_data_uuid_set: set[str] = set()
183 self.manufacturer_id_set: set[int] = set()
185 def add(self, matcher: _T) -> bool:
186 """Add a matcher to the index.
188 Matchers must end up only in one bucket.
190 We put them in the bucket that they are most likely to match.
193 if LOCAL_NAME
in matcher:
200 if MANUFACTURER_ID
in matcher:
201 self.manufacturer_id[matcher[MANUFACTURER_ID]].append(matcher)
204 if SERVICE_UUID
in matcher:
205 self.service_uuid[matcher[SERVICE_UUID]].append(matcher)
208 if SERVICE_DATA_UUID
in matcher:
209 self.service_data_uuid[matcher[SERVICE_DATA_UUID]].append(matcher)
215 """Remove a matcher from the index.
217 Matchers only end up in one bucket, so once we have
218 removed one, we are done.
220 if LOCAL_NAME
in matcher:
226 if MANUFACTURER_ID
in matcher:
227 self.manufacturer_id[matcher[MANUFACTURER_ID]].
remove(matcher)
230 if SERVICE_UUID
in matcher:
231 self.service_uuid[matcher[SERVICE_UUID]].
remove(matcher)
234 if SERVICE_DATA_UUID
in matcher:
235 self.service_data_uuid[matcher[SERVICE_DATA_UUID]].
remove(matcher)
241 """Rebuild the index sets."""
242 self.service_uuid_set = set(self.service_uuid)
243 self.service_data_uuid_set = set(self.service_data_uuid)
244 self.manufacturer_id_set = set(self.manufacturer_id)
246 def match(self, service_info: BluetoothServiceInfoBleak) -> list[_T]:
247 """Check for a match."""
248 matches: list[_T] = []
249 if (name := service_info.name)
and (
250 local_name_matchers := self.local_name.
get(
251 name[:LOCAL_NAME_MIN_MATCH_LENGTH]
256 for matcher
in local_name_matchers
261 (service_data_uuid_set := self.service_data_uuid_set)
262 and (service_data := service_info.service_data)
263 and (matched_uuids := service_data_uuid_set.intersection(service_data))
267 for service_data_uuid
in matched_uuids
268 for matcher
in self.service_data_uuid[service_data_uuid]
273 (manufacturer_id_set := self.manufacturer_id_set)
274 and (manufacturer_data := service_info.manufacturer_data)
275 and (matched_ids := manufacturer_id_set.intersection(manufacturer_data))
279 for manufacturer_id
in matched_ids
280 for matcher
in self.manufacturer_id[manufacturer_id]
285 (service_uuid_set := self.service_uuid_set)
286 and (service_uuids := service_info.service_uuids)
287 and (matched_uuids := service_uuid_set.intersection(service_uuids))
291 for service_uuid
in matched_uuids
292 for matcher
in self.service_uuid[service_uuid]
300 """Bluetooth matcher for the bluetooth integration."""
304 BluetoothMatcherIndexBase[BluetoothCallbackMatcherWithCallback]
306 """Bluetooth matcher for the bluetooth integration.
308 Supports matching on addresses.
311 __slots__ = (
"address",
"connectable")
314 """Initialize the matcher index."""
316 self.address: defaultdict[str, list[BluetoothCallbackMatcherWithCallback]] = (
319 self.connectable: list[BluetoothCallbackMatcherWithCallback] = []
322 self, matcher: BluetoothCallbackMatcherWithCallback
324 """Add a matcher to the index.
326 Matchers must end up only in one bucket.
328 We put them in the bucket that they are most likely to match.
330 if ADDRESS
in matcher:
331 self.address[matcher[ADDRESS]].append(matcher)
334 if super().
add(matcher):
338 if CONNECTABLE
in matcher:
339 self.connectable.append(matcher)
343 self, matcher: BluetoothCallbackMatcherWithCallback
345 """Remove a matcher from the index.
347 Matchers only end up in one bucket, so once we have
348 removed one, we are done.
350 if ADDRESS
in matcher:
351 self.address[matcher[ADDRESS]].
remove(matcher)
354 if super().
remove(matcher):
358 if CONNECTABLE
in matcher:
359 self.connectable.
remove(matcher)
363 self, service_info: BluetoothServiceInfoBleak
364 ) -> list[BluetoothCallbackMatcherWithCallback]:
365 """Check for a match."""
366 matches = self.match(service_info)
367 for matcher
in self.address.
get(service_info.address, []):
369 matches.append(matcher)
370 for matcher
in self.connectable:
372 matches.append(matcher)
377 """Convert a local name to an index.
379 We check the local name matchers here and raise a ValueError
380 if they try to setup a matcher that will is overly broad
381 as would match too many devices and cause a performance hit.
383 match_part = local_name[:LOCAL_NAME_MIN_MATCH_LENGTH]
384 if "*" in match_part
or "[" in match_part:
386 "Local name matchers may not have patterns in the first "
387 f
"{LOCAL_NAME_MIN_MATCH_LENGTH} characters because they "
388 f
"would match too broadly ({local_name})"
394 matcher: BluetoothMatcherOptional,
395 service_info: BluetoothServiceInfoBleak,
397 """Check if a ble device and advertisement_data matches the matcher."""
401 if matcher.get(CONNECTABLE,
True)
and not service_info.connectable:
405 service_uuid := matcher.get(SERVICE_UUID)
406 )
and service_uuid
not in service_info.service_uuids:
410 service_data_uuid := matcher.get(SERVICE_DATA_UUID)
411 )
and service_data_uuid
not in service_info.service_data:
414 if manufacturer_id := matcher.get(MANUFACTURER_ID):
415 if manufacturer_id
not in service_info.manufacturer_data:
418 if manufacturer_data_start := matcher.get(MANUFACTURER_DATA_START):
419 if not service_info.manufacturer_data[manufacturer_id].startswith(
420 bytes(manufacturer_data_start)
433 @lru_cache(maxsize=4096, typed=True)
435 """Compile a fnmatch pattern."""
436 return re.compile(translate(pattern))
439 @lru_cache(maxsize=1024, typed=True)
441 """Memorized version of fnmatch that has a larger lru_cache.
443 The default version of fnmatch only has a lru_cache of 256 entries.
444 With many devices we quickly reach that limit and end up compiling
445 the same pattern over and over again.
447 Bluetooth has its own memorized fnmatch with its own lru_cache
448 since the data is going to be relatively the same
449 since the devices will not change frequently.
None remove_callback_matcher(self, BluetoothCallbackMatcherWithCallback matcher)
list[BluetoothCallbackMatcherWithCallback] match_callbacks(self, BluetoothServiceInfoBleak service_info)
None add_callback_matcher(self, BluetoothCallbackMatcherWithCallback matcher)
None __init__(self, list[BluetoothMatcher] integration_matchers)
None async_clear_address(self, str address)
set[str] match_domains(self, BluetoothServiceInfoBleak service_info)
bool seen_all_fields(IntegrationMatchHistory previous_match, AdvertisementData advertisement_data)
bool ble_device_matches(BluetoothMatcherOptional matcher, BluetoothServiceInfoBleak service_info)
re.Pattern _compile_fnmatch(str pattern)
bool _memorized_fnmatch(str name, str pattern)
bool add(self, _T matcher)
list[_T] match(self, BluetoothServiceInfoBleak service_info)
str _local_name_to_index_key(str local_name)
bool remove(self, _T matcher)
web.Response get(self, web.Request request, str config_key)