1 """Tracking for bluetooth low energy devices."""
3 from __future__
import annotations
5 from datetime
import datetime, timedelta
9 from bleak
import BleakClient, BleakError
10 import voluptuous
as vol
16 PLATFORM_SCHEMA
as DEVICE_TRACKER_PLATFORM_SCHEMA,
32 _LOGGER = logging.getLogger(__name__)
36 BATTERY_CHARACTERISTIC_UUID =
UUID(
"00002a19-0000-1000-8000-00805f9b34fb")
37 CONF_TRACK_BATTERY =
"track_battery"
38 CONF_TRACK_BATTERY_INTERVAL =
"track_battery_interval"
41 DATA_BLE_ADAPTER =
"ADAPTER"
45 PLATFORM_SCHEMA = DEVICE_TRACKER_PLATFORM_SCHEMA.extend(
47 vol.Optional(CONF_TRACK_BATTERY, default=
False): cv.boolean,
49 CONF_TRACK_BATTERY_INTERVAL, default=DEFAULT_TRACK_BATTERY_INTERVAL
58 async_see: AsyncSeeCallback,
59 discovery_info: DiscoveryInfoType |
None =
None,
61 """Set up the Bluetooth LE Scanner."""
63 new_devices: dict[str, dict] = {}
65 if config[CONF_TRACK_BATTERY]:
66 battery_track_interval = config[CONF_TRACK_BATTERY_INTERVAL]
70 yaml_path = hass.config.path(YAML_DEVICES)
71 devs_to_track: set[str] = set()
72 devs_no_track: set[str] = set()
73 devs_advertise_time: dict[str, float] = {}
74 devs_track_battery = {}
75 interval: timedelta = config.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL)
78 track_new = config.get(CONF_TRACK_NEW)
80 async
def async_see_device(address, name, new_device=False, battery=None):
81 """Mark a device as seen."""
83 name = name.strip(
"\x00")
86 if address
in new_devices:
87 new_devices[address][
"seen"] += 1
89 new_devices[address][
"name"] = name
91 name = new_devices[address][
"name"]
92 _LOGGER.debug(
"Seen %s %s times", address, new_devices[address][
"seen"])
93 if new_devices[address][
"seen"] < MIN_SEEN_NEW:
95 _LOGGER.debug(
"Adding %s to tracked devices", address)
96 devs_to_track.add(address)
98 devs_track_battery[address] = dt_util.as_utc(
99 datetime.fromtimestamp(0)
102 _LOGGER.debug(
"Seen %s for the first time", address)
103 new_devices[address] = {
"seen": 1,
"name": name}
107 mac=BLE_PREFIX + address,
109 source_type=SourceType.BLUETOOTH_LE,
118 if device.mac
and device.mac[:4].upper() == BLE_PREFIX:
119 address = device.mac[4:]
121 _LOGGER.debug(
"Adding %s to BLE tracker", device.mac)
122 devs_to_track.add(address)
123 if battery_track_interval >
timedelta(0):
124 devs_track_battery[address] = dt_util.as_utc(
125 datetime.fromtimestamp(0)
128 _LOGGER.debug(
"Adding %s to BLE do not track", device.mac)
129 devs_no_track.add(address)
131 if not devs_to_track
and not track_new:
132 _LOGGER.warning(
"No Bluetooth LE devices to track!")
135 async
def _async_see_update_ble_battery(
138 service_info: bluetooth.BluetoothServiceInfoBleak,
140 """Lookup Bluetooth LE devices and update status."""
144 if service_info.connectable:
145 device = service_info.device
146 elif connectable_device := bluetooth.async_ble_device_from_address(
147 hass, service_info.device.address,
True
149 device = connectable_device
155 async
with BleakClient(device)
as client:
156 bat_char = await client.read_gatt_char(BATTERY_CHARACTERISTIC_UUID)
157 battery = ord(bat_char)
160 "Timeout when trying to get battery status for %s", service_info.name
165 except (AttributeError, BleakError)
as err:
166 _LOGGER.debug(
"Could not read battery status: %s", err)
170 del devs_track_battery[mac]
172 await async_see_device(mac, service_info.name, battery=battery)
175 def _async_update_ble(
176 service_info: bluetooth.BluetoothServiceInfoBleak,
177 change: bluetooth.BluetoothChange,
179 """Update from a ble callback."""
180 mac = service_info.address
181 if mac
in devs_to_track:
182 devs_advertise_time[mac] = service_info.time
183 now = dt_util.utcnow()
184 hass.async_create_task(async_see_device(mac, service_info.name))
186 mac
in devs_track_battery
187 and now > devs_track_battery[mac] + battery_track_interval
189 devs_track_battery[mac] = now
190 hass.async_create_background_task(
191 _async_see_update_ble_battery(mac, now, service_info),
192 "bluetooth_le_tracker.device_tracker-see_update_ble_battery",
196 if mac
not in devs_to_track
and mac
not in devs_no_track:
197 _LOGGER.debug(
"Discovered Bluetooth LE device %s", mac)
198 hass.async_create_task(
199 async_see_device(mac, service_info.name, new_device=
True)
203 def _async_refresh_ble(now: datetime) ->
None:
204 """Refresh BLE devices from the discovered service info."""
209 for service_info
in bluetooth.async_discovered_service_info(hass,
False):
211 if service_info.time != devs_advertise_time.get(service_info.address):
212 _async_update_ble(service_info, bluetooth.BluetoothChange.ADVERTISEMENT)
215 bluetooth.async_register_callback(
221 bluetooth.BluetoothScanningMode.ACTIVE,
227 def _async_handle_stop(event: Event) ->
None:
228 """Cancel the callback."""
229 for cancel
in cancels:
232 hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STOP, _async_handle_stop)
234 _async_refresh_ble(dt_util.now())
bool async_setup_scanner(HomeAssistant hass, ConfigType config, AsyncSeeCallback async_see, DiscoveryInfoType|None discovery_info=None)
list[Device] async_load_config(str path, HomeAssistant hass, timedelta consider_home)
CALLBACK_TYPE async_track_time_interval(HomeAssistant hass, Callable[[datetime], Coroutine[Any, Any, None]|None] action, timedelta interval, *str|None name=None, bool|None cancel_on_shutdown=None)