Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Read Your Meter Pro integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 from pyrympro import CannotConnectError, OperationError, RymPro, UnauthorizedError
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import DOMAIN
14 
# Polling period in seconds: 60 * 60 = one hour between refreshes.
SCAN_INTERVAL = 60 * 60

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
18 
19 
21  """Class to manage fetching RYM Pro data."""
22 
# NOTE(review): the enclosing ``class`` header is missing from this listing;
# both methods below belong to that class and must be indented under it.
def __init__(self, hass: HomeAssistant, rympro: RymPro) -> None:
    """Initialize global RymPro data updater.

    Args:
        hass: Home Assistant instance, forwarded to the base coordinator.
        rympro: RYM Pro API client used by ``_async_update_data``.
    """
    # Keep the client on the instance; the update method reads it back.
    self.rympro = rympro
    super().__init__(
        hass,
        _LOGGER,
        name=DOMAIN,
        update_interval=timedelta(seconds=SCAN_INTERVAL),
    )

async def _async_update_data(self) -> dict[int, dict]:
    """Fetch data from Rym Pro.

    Reads the latest value of every meter, then stores each meter's
    consumption forecast in its entry under ``"consumption_forecast"``.

    Returns:
        The mapping returned by ``last_read()``, keyed by meter id, with
        the forecast added to every meter entry.

    Raises:
        UpdateFailed: If the API reports an authorization, connection or
            operation error. On an authorization error the config entry
            is reloaded before the exception is raised.
    """
    try:
        meters = await self.rympro.last_read()
        # One forecast request per meter, awaited sequentially.
        for meter_id, meter in meters.items():
            meter["consumption_forecast"] = await self.rympro.consumption_forecast(
                meter_id
            )
    except UnauthorizedError as error:
        # NOTE(review): presumably narrows an optional ``config_entry``
        # for the type checker; confirm against the base class.
        assert self.config_entry
        # Reload the entry so the integration is set up again before
        # this refresh is reported as failed.
        await self.hass.config_entries.async_reload(self.config_entry.entry_id)
        raise UpdateFailed(error) from error
    except (CannotConnectError, OperationError) as error:
        raise UpdateFailed(error) from error
    return meters
None __init__(self, HomeAssistant hass, RymPro rympro)
Definition: coordinator.py:23