1 """Passive update processors for the Bluetooth integration."""
3 from __future__
import annotations
6 from datetime
import timedelta
7 from functools
import cache
9 from typing
import TYPE_CHECKING, Any, Self, TypedDict, cast
11 from habluetooth
import BluetoothScanningMode
13 from homeassistant
import config_entries
19 EVENT_HOMEASSISTANT_STOP,
31 from .const
import DOMAIN
32 from .update_coordinator
import BasePassiveBluetoothCoordinator
35 from collections.abc
import Callable
39 from .models
import BluetoothChange, BluetoothServiceInfoBleak
41 STORAGE_KEY =
"bluetooth.passive_update_processor"
44 PASSIVE_UPDATE_PROCESSOR =
"passive_update_processor"
47 @dataclasses.dataclass(slots=True, frozen=True)
49 """Key for a passive bluetooth entity.
53 device_id: outdoor_sensor_1
61 """Convert the key to a string which can be used as JSON key."""
62 return f
"{self.key}___{self.device_id or ''}"
66 """Convert a string (from JSON) to a key."""
67 key, device_id = key.split(
"___")
68 return cls(key, device_id
or None)
71 @dataclasses.dataclass(slots=True, frozen=False)
73 """Data for the passive bluetooth processor."""
75 coordinators: set[PassiveBluetoothProcessorCoordinator[Any]]
76 all_restore_data: dict[str, dict[str, RestoredPassiveBluetoothDataUpdate]]
80 """Restored PassiveBluetoothDataUpdate."""
82 devices: dict[str, DeviceInfo]
83 entity_descriptions: dict[str, dict[str, Any]]
84 entity_names: dict[str, str |
None]
85 entity_data: dict[str, Any]
90 cached_fields = cache(dataclasses.fields)
94 descriptions_class: type[EntityDescription], data: dict[str, Any]
95 ) -> EntityDescription:
96 """Deserialize an entity description."""
97 result: dict[str, Any] = {}
98 if hasattr(descriptions_class,
"_dataclass"):
99 descriptions_class = descriptions_class._dataclass
101 field_name = field.name
107 if field_name == CONF_ENTITY_CATEGORY:
108 value = try_parse_enum(EntityCategory, data.get(field_name))
110 value = data.get(field_name)
111 result[field_name] = value
112 return descriptions_class(**result)
116 """Serialize an entity description."""
120 if (value := getattr(description, field.name)) != field.default
124 @dataclasses.dataclass(slots=True, frozen=False)
126 """Generic bluetooth data."""
128 devices: dict[str |
None, DeviceInfo] = dataclasses.field(default_factory=dict)
129 entity_descriptions: dict[PassiveBluetoothEntityKey, EntityDescription] = (
130 dataclasses.field(default_factory=dict)
132 entity_names: dict[PassiveBluetoothEntityKey, str |
None] = dataclasses.field(
135 entity_data: dict[PassiveBluetoothEntityKey, _T] = dataclasses.field(
140 self, new_data: PassiveBluetoothDataUpdate[_T]
141 ) -> set[PassiveBluetoothEntityKey] |
None:
142 """Update the data and returned changed PassiveBluetoothEntityKey or None on device change.
144 The changed PassiveBluetoothEntityKey can be used to filter
145 which listeners are called.
147 device_change =
False
148 changed_entity_keys: set[PassiveBluetoothEntityKey] = set()
149 for device_key, device_info
in new_data.devices.items():
150 if device_change
or self.devices.
get(device_key, UNDEFINED) != device_info:
152 self.devices[device_key] = device_info
153 for incoming, current
in (
154 (new_data.entity_descriptions, self.entity_descriptions),
155 (new_data.entity_names, self.entity_names),
156 (new_data.entity_data, self.entity_data),
158 for key, data
in incoming.items():
159 if current.get(key, UNDEFINED) != data:
160 changed_entity_keys.add(key)
164 return None if device_change
else changed_entity_keys
167 """Serialize restore data to storage."""
170 key
or "": device_info
for key, device_info
in self.devices.items()
172 "entity_descriptions": {
174 for key, description
in self.entity_descriptions.items()
177 key.to_string(): name
for key, name
in self.entity_names.items()
180 key.to_string(): data
for key, data
in self.entity_data.items()
187 restore_data: RestoredPassiveBluetoothDataUpdate,
188 entity_description_class: type[EntityDescription],
190 """Set the restored data from storage."""
193 key
or None: device_info
194 for key, device_info
in restore_data[
"devices"].items()
197 self.entity_descriptions.
update(
199 PassiveBluetoothEntityKey.from_string(
202 for key, description
in restore_data[
"entity_descriptions"].items()
208 PassiveBluetoothEntityKey.from_string(key): name
209 for key, name
in restore_data[
"entity_names"].items()
214 PassiveBluetoothEntityKey.from_string(key): cast(_T, data)
215 for key, data
in restore_data[
"entity_data"].items()
221 hass: HomeAssistant, coordinator: PassiveBluetoothProcessorCoordinator[Any]
223 """Register a coordinator to have its processors data restored."""
224 data: PassiveBluetoothProcessorData = hass.data[PASSIVE_UPDATE_PROCESSOR]
225 coordinators = data.coordinators
226 coordinators.add(coordinator)
227 if restore_key := coordinator.restore_key:
228 coordinator.restore_data = data.all_restore_data.setdefault(restore_key, {})
231 def _unregister_coordinator_for_restore() -> None:
232 """Unregister a coordinator."""
233 coordinators.remove(coordinator)
235 return _unregister_coordinator_for_restore
239 """Set up the passive update processor coordinators."""
240 storage: Store[dict[str, dict[str, RestoredPassiveBluetoothDataUpdate]]] =
Store(
241 hass, STORAGE_VERSION, STORAGE_KEY
243 coordinators: set[PassiveBluetoothProcessorCoordinator[Any]] = set()
244 all_restore_data: dict[str, dict[str, RestoredPassiveBluetoothDataUpdate]] = (
245 await storage.async_load()
or {}
248 coordinators, all_restore_data
251 async
def _async_save_processor_data(_: Any) ->
None:
252 """Save the processor data."""
253 await storage.async_save(
255 coordinator.restore_key: coordinator.async_get_restore_data()
256 for coordinator
in coordinators
257 if coordinator.restore_key
262 hass, _async_save_processor_data, STORAGE_SAVE_INTERVAL
265 async
def _async_save_processor_data_at_stop(_event: Event) ->
None:
266 """Save the processor data at shutdown."""
268 await _async_save_processor_data(
None)
270 hass.bus.async_listen_once(
271 EVENT_HOMEASSISTANT_STOP,
272 _async_save_processor_data_at_stop,
277 """Passive bluetooth processor coordinator for bluetooth advertisements.
279 The coordinator is responsible for dispatching the bluetooth data,
280 to each processor, and tracking devices.
282 The update_method should return the data that is dispatched to each processor.
283 This is normally a parsed form of the data, but you can just forward the
284 BluetoothServiceInfoBleak if needed.
290 logger: logging.Logger,
292 mode: BluetoothScanningMode,
293 update_method: Callable[[BluetoothServiceInfoBleak], _DataT],
294 connectable: bool =
False,
296 """Initialize the coordinator."""
297 super().
__init__(hass, logger, address, mode, connectable)
298 self._processors: list[PassiveBluetoothDataProcessor[Any, _DataT]] = []
301 self.restore_data: dict[str, RestoredPassiveBluetoothDataUpdate] = {}
303 if config_entry := config_entries.current_entry.get():
304 self.
restore_keyrestore_key = config_entry.entry_id
309 """Return if the device is available."""
315 ) -> dict[str, RestoredPassiveBluetoothDataUpdate]:
316 """Generate the restore data."""
318 processor.restore_key: processor.data.async_get_restore_data()
319 for processor
in self._processors
320 if processor.restore_key
326 processor: PassiveBluetoothDataProcessor[Any, _DataT],
327 entity_description_class: type[EntityDescription] |
None =
None,
328 ) -> Callable[[],
None]:
329 """Register a processor that subscribes to updates."""
334 processor.async_register_coordinator(self, entity_description_class)
337 def remove_processor() -> None:
338 """Remove a processor."""
341 if restore_key := processor.restore_key:
342 self.restore_data[restore_key] = processor.data.async_get_restore_data()
344 self._processors.
remove(processor)
346 self._processors.append(processor)
347 return remove_processor
351 self, service_info: BluetoothServiceInfoBleak
353 """Handle the device going unavailable."""
355 for processor
in self._processors:
356 processor.async_handle_unavailable()
361 service_info: BluetoothServiceInfoBleak,
362 change: BluetoothChange,
364 """Handle a Bluetooth event."""
367 if self.
hasshass.is_stopping:
374 self.
loggerlogger.exception(
"Unexpected error updating %s data", self.
namename)
379 self.
loggerlogger.info(
"Coordinator %s recovered", self.
namename)
381 for processor
in self._processors:
382 processor.async_handle_update(update, was_available)
386 """Passive bluetooth data processor for bluetooth advertisements.
388 The processor is responsible for keeping track of the bluetooth data
389 and updating subscribers.
391 The update_method must return a PassiveBluetoothDataUpdate object. Callers
392 are responsible for formatting the data returned from their parser into
393 the appropriate format.
395 The processor will call the update_method every time the bluetooth device
396 receives a new advertisement data from the coordinator with the data
397 returned by the update_method of the coordinator.
399 As the size of each advertisement is limited, the update_method should
400 return a PassiveBluetoothDataUpdate object that contains only data that
401 should be updated. The coordinator will then dispatch subscribers based
402 on the data in the PassiveBluetoothDataUpdate object. The accumulated data
403 is available in the devices, entity_data, and entity_descriptions attributes.
406 coordinator: PassiveBluetoothProcessorCoordinator[_DataT]
407 data: PassiveBluetoothDataUpdate[_T]
408 entity_names: dict[PassiveBluetoothEntityKey, str |
None]
409 entity_data: dict[PassiveBluetoothEntityKey, _T]
410 entity_descriptions: dict[PassiveBluetoothEntityKey, EntityDescription]
411 devices: dict[str |
None, DeviceInfo]
412 restore_key: str |
None
416 update_method: Callable[[_DataT], PassiveBluetoothDataUpdate[_T]],
417 restore_key: str |
None =
None,
419 """Initialize the coordinator."""
424 self._listeners: list[
425 Callable[[PassiveBluetoothDataUpdate[_T] |
None],
None]
427 self._entity_key_listeners: dict[
428 PassiveBluetoothEntityKey,
429 list[Callable[[PassiveBluetoothDataUpdate[_T] |
None],
None]],
437 coordinator: PassiveBluetoothProcessorCoordinator[_DataT],
438 entity_description_class: type[EntityDescription] |
None,
440 """Register a coordinator."""
451 entity_description_class
453 and (restore_data := coordinator.restore_data)
454 and (restored_processor_data := restore_data.get(restore_key))
456 data.async_set_restore_data(
457 restored_processor_data,
458 entity_description_class,
464 """Return if the device is available."""
469 """Handle the device going unavailable."""
475 entity_class: type[PassiveBluetoothProcessorEntity[Self]],
476 async_add_entities: AddEntitiesCallback,
477 ) -> Callable[[],
None]:
478 """Add a listener for new entities."""
479 created: set[PassiveBluetoothEntityKey] = set()
482 def _async_add_or_update_entities(
483 data: PassiveBluetoothDataUpdate[_T] |
None,
485 """Listen for new entities."""
486 if data
is None or created.issuperset(data.entity_descriptions):
488 entities: list[PassiveBluetoothProcessorEntity[Self]] = []
489 for entity_key, description
in data.entity_descriptions.items():
490 if entity_key
not in created:
491 entities.append(entity_class(self, entity_key, description))
492 created.add(entity_key)
501 update_callback: Callable[[PassiveBluetoothDataUpdate[_T] |
None],
None],
502 ) -> Callable[[],
None]:
503 """Listen for all updates."""
506 def remove_listener() -> None:
507 """Remove update listener."""
508 self._listeners.
remove(update_callback)
510 self._listeners.append(update_callback)
511 return remove_listener
516 update_callback: Callable[[PassiveBluetoothDataUpdate[_T] |
None],
None],
517 entity_key: PassiveBluetoothEntityKey,
518 ) -> Callable[[],
None]:
519 """Listen for updates by device key."""
522 def remove_listener() -> None:
523 """Remove update listener."""
524 self._entity_key_listeners[entity_key].
remove(update_callback)
525 if not self._entity_key_listeners[entity_key]:
526 del self._entity_key_listeners[entity_key]
528 self._entity_key_listeners.setdefault(entity_key, []).append(update_callback)
529 return remove_listener
534 data: PassiveBluetoothDataUpdate[_T] |
None,
535 was_available: bool |
None =
None,
536 changed_entity_keys: set[PassiveBluetoothEntityKey] |
None =
None,
538 """Update all registered listeners."""
539 if was_available
is None:
540 was_available = self.
coordinatorcoordinator.available
543 for update_callback
in self._listeners:
544 update_callback(data)
546 if not was_available
or data
is None:
550 for listeners
in self._entity_key_listeners.values():
551 for update_callback
in listeners:
552 update_callback(data)
557 entity_key_listeners = self._entity_key_listeners
558 for entity_key
in data.entity_data:
561 and changed_entity_keys
is not None
562 and entity_key
not in changed_entity_keys
565 if maybe_listener := entity_key_listeners.get(entity_key):
566 for update_callback
in maybe_listener:
567 update_callback(data)
571 self, update: _DataT, was_available: bool |
None =
None
573 """Handle a Bluetooth event."""
579 "Unexpected error updating %s data", self.
coordinatorcoordinator.name
583 if not isinstance(new_data, PassiveBluetoothDataUpdate):
586 f
"The update_method for {self.coordinator.name} returned"
587 f
" {new_data} instead of a PassiveBluetoothDataUpdate"
593 "Processing %s data recovered", self.
coordinatorcoordinator.name
596 changed_entity_keys = self.
datadata.
update(new_data)
602 _PassiveBluetoothDataProcessorT: PassiveBluetoothDataProcessor[Any, Any]
604 """A class for entities using PassiveBluetoothDataProcessor."""
606 _attr_has_entity_name =
True
607 _attr_should_poll =
False
611 processor: _PassiveBluetoothDataProcessorT,
612 entity_key: PassiveBluetoothEntityKey,
613 description: EntityDescription,
616 """Create the entity with a PassiveBluetoothDataProcessor."""
617 self.entity_description = description
618 self.entity_key = entity_key
619 self.processor = processor
620 self.processor_context = context
621 address = processor.coordinator.address
622 device_id = entity_key.device_id
623 devices = processor.devices
625 if device_id
in devices:
626 base_device_info = devices[device_id]
631 {ATTR_IDENTIFIERS: {(DOMAIN, f
"{address}-{device_id}")}}
633 self._attr_unique_id = f
"{address}-{key}-{device_id}"
635 self._attr_device_info = base_device_info |
DeviceInfo(
636 {ATTR_IDENTIFIERS: {(DOMAIN, address)}}
638 self._attr_unique_id = f
"{address}-{key}"
639 if ATTR_NAME
not in self._attr_device_info:
640 self._attr_device_info[ATTR_NAME] = self.processor.coordinator.name
641 if device_id
is None:
642 self._attr_device_info[ATTR_CONNECTIONS] = {(CONNECTION_BLUETOOTH, address)}
643 if (name := processor.entity_names.get(entity_key))
is not None:
644 self._attr_name = name
648 """Return if entity is available."""
649 return self.processor.available
652 """When entity is added to hass."""
654 self.async_on_remove(
655 self.processor.async_add_entity_key_listener(
656 self._handle_processor_update, self.entity_key
663 new_data: PassiveBluetoothDataUpdate[_PassiveBluetoothDataProcessorT] |
None,
665 """Handle updated data from the processor."""
666 self.async_write_ha_state()
Callable[[], None] async_add_entity_key_listener(self, Callable[[PassiveBluetoothDataUpdate[_T]|None], None] update_callback, PassiveBluetoothEntityKey entity_key)
None __init__(self, Callable[[_DataT], PassiveBluetoothDataUpdate[_T]] update_method, str|None restore_key=None)
None async_update_listeners(self, PassiveBluetoothDataUpdate[_T]|None data, bool|None was_available=None, set[PassiveBluetoothEntityKey]|None changed_entity_keys=None)
Callable[[], None] async_add_listener(self, Callable[[PassiveBluetoothDataUpdate[_T]|None], None] update_callback)
Callable[[], None] async_add_entities_listener(self, type[PassiveBluetoothProcessorEntity[Self]] entity_class, AddEntitiesCallback async_add_entities)
None async_handle_unavailable(self)
None async_register_coordinator(self, PassiveBluetoothProcessorCoordinator[_DataT] coordinator, type[EntityDescription]|None entity_description_class)
None async_handle_update(self, _DataT update, bool|None was_available=None)
None async_set_restore_data(self, RestoredPassiveBluetoothDataUpdate restore_data, type[EntityDescription] entity_description_class)
set[PassiveBluetoothEntityKey]|None update(self, PassiveBluetoothDataUpdate[_T] new_data)
RestoredPassiveBluetoothDataUpdate async_get_restore_data(self)
PassiveBluetoothEntityKey from_string(cls, str key)
None __init__(self, HomeAssistant hass, logging.Logger logger, str address, BluetoothScanningMode mode, Callable[[BluetoothServiceInfoBleak], _DataT] update_method, bool connectable=False)
None _async_handle_unavailable(self, BluetoothServiceInfoBleak service_info)
dict[str, RestoredPassiveBluetoothDataUpdate] async_get_restore_data(self)
None _async_handle_bluetooth_event(self, BluetoothServiceInfoBleak service_info, BluetoothChange change)
Callable[[], None] async_register_processor(self, PassiveBluetoothDataProcessor[Any, _DataT] processor, type[EntityDescription]|None entity_description_class=None)
bool remove(self, _T matcher)
EntityDescription deserialize_entity_description(type[EntityDescription] descriptions_class, dict[str, Any] data)
None async_setup(HomeAssistant hass)
dict[str, Any] serialize_entity_description(EntityDescription description)
None _handle_processor_update(self, PassiveBluetoothDataUpdate[_PassiveBluetoothDataProcessorT]|None new_data)
None async_added_to_hass(self)
None __init__(self, _PassiveBluetoothDataProcessorT processor, PassiveBluetoothEntityKey entity_key, EntityDescription description, Any context=None)
CALLBACK_TYPE async_register_coordinator_for_restore(HomeAssistant hass, PassiveBluetoothProcessorCoordinator[Any] coordinator)
web.Response get(self, web.Request request, str config_key)
IssData update(pyiss.ISS iss)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)