Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """IoTaWatt DataUpdateCoordinator."""
2 
3 from __future__ import annotations
4 
5 from datetime import datetime, timedelta
6 import logging
7 
8 from iotawattpy.iotawatt import Iotawatt
9 
10 from homeassistant.config_entries import ConfigEntry
11 from homeassistant.const import CONF_HOST, CONF_PASSWORD, CONF_USERNAME
12 from homeassistant.core import HomeAssistant
13 from homeassistant.helpers import httpx_client
14 from homeassistant.helpers.debounce import Debouncer
15 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
16 
17 from .const import CONNECTION_ERRORS
18 
19 _LOGGER = logging.getLogger(__name__)
20 
21 # Matches iotwatt data log interval
22 REQUEST_REFRESH_DEFAULT_COOLDOWN = 5
23 
24 
26  """Class to manage fetching update data from the IoTaWatt Energy Device."""
27 
28  api: Iotawatt | None = None
29 
30  def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
31  """Initialize IotaWattUpdater object."""
32  self.entryentry = entry
33  super().__init__(
34  hass=hass,
35  logger=_LOGGER,
36  name=entry.title,
37  update_interval=timedelta(seconds=30),
38  request_refresh_debouncer=Debouncer(
39  hass,
40  _LOGGER,
41  cooldown=REQUEST_REFRESH_DEFAULT_COOLDOWN,
42  immediate=True,
43  ),
44  )
45 
46  self._last_run_last_run: datetime | None = None
47 
48  def update_last_run(self, last_run: datetime) -> None:
49  """Notify coordinator of a sensor last update time."""
50  # We want to fetch the data from the iotawatt since HA was last shutdown.
51  # We retrieve from the sensor last updated.
52  # This method is called from each sensor upon their state being restored.
53  if self._last_run_last_run is None or last_run > self._last_run_last_run:
54  self._last_run_last_run = last_run
55 
56  async def _async_update_data(self):
57  """Fetch sensors from IoTaWatt device."""
58  if self.apiapi is None:
59  api = Iotawatt(
60  self.entryentry.title,
61  self.entryentry.data[CONF_HOST],
62  httpx_client.get_async_client(self.hasshass),
63  self.entryentry.data.get(CONF_USERNAME),
64  self.entryentry.data.get(CONF_PASSWORD),
65  integratedInterval="d",
66  includeNonTotalSensors=False,
67  )
68  try:
69  is_authenticated = await api.connect()
70  except CONNECTION_ERRORS as err:
71  raise UpdateFailed("Connection failed") from err
72 
73  if not is_authenticated:
74  raise UpdateFailed("Authentication error")
75 
76  self.apiapi = api
77 
78  await self.apiapi.update(lastUpdate=self._last_run_last_run)
79  self._last_run_last_run = None
80  return self.apiapi.getSensors()
None __init__(self, HomeAssistant hass, ConfigEntry entry)
Definition: coordinator.py:30
IssData update(pyiss.ISS iss)
Definition: __init__.py:33