1 """Polling coordinator for the Sensoterra integration."""
3 from collections.abc
import Callable
4 from datetime
import timedelta
6 from sensoterra.customerapi
import (
8 InvalidAuth
as ApiAuthError,
11 from sensoterra.probe
import Probe, Sensor
17 from .const
import LOGGER, SCAN_INTERVAL_MINUTES
21 """Sensoterra coordinator."""
23 def __init__(self, hass: HomeAssistant, api: CustomerApi) ->
None:
24 """Initialize Sensoterra coordinator."""
28 name=
"Sensoterra probe",
29 update_interval=
timedelta(minutes=SCAN_INTERVAL_MINUTES),
32 self.add_devices_callback: Callable[[list[Probe]],
None] |
None =
None
35 """Fetch data from Sensoterra Customer API endpoint."""
37 probes = await self.
apiapi.poll()
38 except ApiAuthError
as err:
40 except ApiTimeout
as err:
41 raise UpdateFailed(
"Timeout communicating with Sensotera API")
from err
43 if self.add_devices_callback
is not None:
44 self.add_devices_callback(probes)
49 """Try to find the sensor in the API result."""
50 for probe
in self.
datadata:
51 for sensor
in probe.sensors():
Sensor|None get_sensor(self, str|None id)
None __init__(self, HomeAssistant hass, CustomerApi api)
list[Probe] _async_update_data(self)