Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Polling coordinator for the Sensoterra integration."""
2 
3 from collections.abc import Callable
4 from datetime import timedelta
5 
6 from sensoterra.customerapi import (
7  CustomerApi,
8  InvalidAuth as ApiAuthError,
9  Timeout as ApiTimeout,
10 )
11 from sensoterra.probe import Probe, Sensor
12 
13 from homeassistant.core import HomeAssistant
14 from homeassistant.exceptions import ConfigEntryError
15 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
16 
17 from .const import LOGGER, SCAN_INTERVAL_MINUTES
18 
19 
21  """Sensoterra coordinator."""
22 
23  def __init__(self, hass: HomeAssistant, api: CustomerApi) -> None:
24  """Initialize Sensoterra coordinator."""
25  super().__init__(
26  hass,
27  LOGGER,
28  name="Sensoterra probe",
29  update_interval=timedelta(minutes=SCAN_INTERVAL_MINUTES),
30  )
31  self.apiapi = api
32  self.add_devices_callback: Callable[[list[Probe]], None] | None = None
33 
34  async def _async_update_data(self) -> list[Probe]:
35  """Fetch data from Sensoterra Customer API endpoint."""
36  try:
37  probes = await self.apiapi.poll()
38  except ApiAuthError as err:
39  raise ConfigEntryError(err) from err
40  except ApiTimeout as err:
41  raise UpdateFailed("Timeout communicating with Sensotera API") from err
42 
43  if self.add_devices_callback is not None:
44  self.add_devices_callback(probes)
45 
46  return probes
47 
48  def get_sensor(self, id: str | None) -> Sensor | None:
49  """Try to find the sensor in the API result."""
50  for probe in self.datadata:
51  for sensor in probe.sensors():
52  if sensor.id == id:
53  return sensor
54  return None
None __init__(self, HomeAssistant hass, CustomerApi api)
Definition: coordinator.py:23