1 """Passive update coordinator for the Bluetooth integration."""
3 from __future__
import annotations
5 from typing
import TYPE_CHECKING, Any
7 from typing_extensions
import TypeVar
11 BaseCoordinatorEntity,
12 BaseDataUpdateCoordinatorProtocol,
15 from .update_coordinator
import BasePassiveBluetoothCoordinator
18 from collections.abc
import Callable, Generator
21 from .
import BluetoothChange, BluetoothScanningMode, BluetoothServiceInfoBleak
23 _PassiveBluetoothDataUpdateCoordinatorT = TypeVar(
24 "_PassiveBluetoothDataUpdateCoordinatorT",
25 bound=
"PassiveBluetoothDataUpdateCoordinator",
26 default=
"PassiveBluetoothDataUpdateCoordinator",
31 BasePassiveBluetoothCoordinator, BaseDataUpdateCoordinatorProtocol
33 """Class to manage passive bluetooth advertisements.
35 This coordinator is responsible for dispatching the bluetooth data
42 logger: logging.Logger,
44 mode: BluetoothScanningMode,
45 connectable: bool =
False,
47 """Initialize PassiveBluetoothDataUpdateCoordinator."""
48 super().
__init__(hass, logger, address, mode, connectable)
49 self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object |
None]] = {}
53 """Return if device is available."""
58 """Update all registered listeners."""
59 for update_callback, _
in list(self._listeners.values()):
64 self, service_info: BluetoothServiceInfoBleak
66 """Handle the device going unavailable."""
72 self, update_callback: CALLBACK_TYPE, context: Any =
None
73 ) -> Callable[[],
None]:
74 """Listen for data updates."""
77 def remove_listener() -> None:
78 """Remove update listener."""
79 self._listeners.pop(remove_listener)
81 self._listeners[remove_listener] = (update_callback, context)
82 return remove_listener
85 """Return all registered contexts."""
87 context
for _, context
in self._listeners.values()
if context
is not None
93 service_info: BluetoothServiceInfoBleak,
94 change: BluetoothChange,
96 """Handle a Bluetooth event."""
102 BaseCoordinatorEntity[_PassiveBluetoothDataUpdateCoordinatorT]
104 """A class for entities using DataUpdateCoordinator."""
107 """All updates are passive."""
111 """Return if entity is available."""
112 return self.coordinator.available
None _async_handle_unavailable(self, BluetoothServiceInfoBleak service_info)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
Generator[Any] async_contexts(self)
None __init__(self, HomeAssistant hass, logging.Logger logger, str address, BluetoothScanningMode mode, bool connectable=False)
None _async_handle_bluetooth_event(self, BluetoothServiceInfoBleak service_info, BluetoothChange change)
None async_update_listeners(self)