Home Assistant Unofficial Reference 2024.12.1
sensor.py
Go to the documentation of this file.
1 """Support for Skybeacon temperature/humidity Bluetooth LE sensors."""
2 
3 from __future__ import annotations
4 
5 import logging
6 import threading
7 from uuid import UUID
8 
9 from pygatt import BLEAddressType
10 from pygatt.backends import Characteristic, GATTToolBackend
11 from pygatt.exceptions import BLEError, NotConnectedError, NotificationTimeout
12 import voluptuous as vol
13 
15  PLATFORM_SCHEMA as SENSOR_PLATFORM_SCHEMA,
16  SensorDeviceClass,
17  SensorEntity,
18 )
19 from homeassistant.const import (
20  CONF_MAC,
21  CONF_NAME,
22  EVENT_HOMEASSISTANT_STOP,
23  PERCENTAGE,
24  STATE_UNKNOWN,
25  UnitOfTemperature,
26 )
27 from homeassistant.core import HomeAssistant
29 from homeassistant.helpers.entity_platform import AddEntitiesCallback
30 from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
31 
32 _LOGGER = logging.getLogger(__name__)
33 
34 ATTR_DEVICE = "device"
35 ATTR_MODEL = "model"
36 
37 BLE_TEMP_HANDLE = 0x24
38 BLE_TEMP_UUID = "0000ff92-0000-1000-8000-00805f9b34fb"
39 
40 CONNECT_LOCK = threading.Lock()
41 CONNECT_TIMEOUT = 30
42 
43 DEFAULT_NAME = "Skybeacon"
44 
45 SKIP_HANDLE_LOOKUP = True
46 
47 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
48  {
49  vol.Required(CONF_MAC): cv.string,
50  vol.Optional(CONF_NAME, default=DEFAULT_NAME): cv.string,
51  }
52 )
53 
54 
56  hass: HomeAssistant,
57  config: ConfigType,
58  add_entities: AddEntitiesCallback,
59  discovery_info: DiscoveryInfoType | None = None,
60 ) -> None:
61  """Set up the Skybeacon sensor."""
62  name = config.get(CONF_NAME)
63  mac = config.get(CONF_MAC)
64  _LOGGER.debug("Setting up")
65 
66  mon = Monitor(hass, mac, name)
67  add_entities([SkybeaconTemp(name, mon)])
68  add_entities([SkybeaconHumid(name, mon)])
69 
70  def monitor_stop(_service_or_event):
71  """Stop the monitor thread."""
72  _LOGGER.debug("Stopping monitor for %s", name)
73  mon.terminate()
74 
75  hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, monitor_stop)
76  mon.start()
77 
78 
80  """Representation of a Skybeacon humidity sensor."""
81 
82  _attr_native_unit_of_measurement = PERCENTAGE
83 
84  def __init__(self, name, mon):
85  """Initialize a sensor."""
86  self.monmon = mon
87  self._name_name = name
88 
89  @property
90  def name(self):
91  """Return the name of the sensor."""
92  return self._name_name
93 
94  @property
95  def native_value(self):
96  """Return the state of the device."""
97  return self.monmon.data["humid"]
98 
99  @property
101  """Return the state attributes of the sensor."""
102  return {ATTR_DEVICE: "SKYBEACON", ATTR_MODEL: 1}
103 
104 
106  """Representation of a Skybeacon temperature sensor."""
107 
108  _attr_device_class = SensorDeviceClass.TEMPERATURE
109  _attr_native_unit_of_measurement = UnitOfTemperature.CELSIUS
110 
111  def __init__(self, name, mon):
112  """Initialize a sensor."""
113  self.monmon = mon
114  self._name_name = name
115 
116  @property
117  def name(self):
118  """Return the name of the sensor."""
119  return self._name_name
120 
121  @property
122  def native_value(self):
123  """Return the state of the device."""
124  return self.monmon.data["temp"]
125 
126  @property
128  """Return the state attributes of the sensor."""
129  return {ATTR_DEVICE: "SKYBEACON", ATTR_MODEL: 1}
130 
131 
132 class Monitor(threading.Thread, SensorEntity):
133  """Connection handling."""
134 
135  def __init__(self, hass, mac, name):
136  """Construct interface object."""
137  threading.Thread.__init__(self)
138  self.daemondaemon = False
139  self.hasshasshass = hass
140  self.macmac = mac
141  self.namenamename = name
142  self.datadata = {"temp": STATE_UNKNOWN, "humid": STATE_UNKNOWN}
143  self.keep_goingkeep_going = True
144  self.eventevent = threading.Event()
145 
146  def run(self):
147  """Thread that keeps connection alive."""
148  cached_char = Characteristic(BLE_TEMP_UUID, BLE_TEMP_HANDLE)
149  adapter = GATTToolBackend()
150  while True:
151  try:
152  _LOGGER.debug("Connecting to %s", self.namenamename)
153  # We need concurrent connect, so lets not reset the device
154  adapter.start(reset_on_start=False)
155  # Seems only one connection can be initiated at a time
156  with CONNECT_LOCK:
157  device = adapter.connect(
158  self.macmac, CONNECT_TIMEOUT, BLEAddressType.random
159  )
160  if SKIP_HANDLE_LOOKUP:
161  # HACK: inject handle mapping collected offline
162  device._characteristics[UUID(BLE_TEMP_UUID)] = cached_char # noqa: SLF001
163  # Magic: writing this makes device happy
164  device.char_write_handle(0x1B, bytearray([255]), False)
165  device.subscribe(BLE_TEMP_UUID, self._update_update)
166  _LOGGER.debug("Subscribed to %s", self.namenamename)
167  while self.keep_goingkeep_going:
168  # protect against stale connections, just read temperature
169  device.char_read(BLE_TEMP_UUID, timeout=CONNECT_TIMEOUT)
170  self.eventevent.wait(60)
171  break
172  except (BLEError, NotConnectedError, NotificationTimeout) as ex:
173  _LOGGER.error("Exception: %s ", str(ex))
174  finally:
175  adapter.stop()
176 
177  def _update(self, handle, value):
178  """Notification callback from pygatt."""
179  _LOGGER.debug(
180  "%s: %15s temperature = %-2d.%-2d, humidity = %3d",
181  handle,
182  self.namenamename,
183  value[0],
184  value[2],
185  value[1],
186  )
187  self.datadata["temp"] = float(f"{value[0]}.{value[2]}")
188  self.datadata["humid"] = value[1]
189 
190  def terminate(self):
191  """Signal runner to stop and join thread."""
192  self.keep_goingkeep_going = False
193  self.eventevent.set()
194  self.join()
str|UndefinedType|None name(self)
Definition: entity.py:738
def add_entities(account, async_add_entities, tracked)
Definition: sensor.py:40
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)
Definition: sensor.py:60