Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Coordinator for easyEnergy."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from typing import NamedTuple
7 
8 from easyenergy import (
9  EasyEnergy,
10  EasyEnergyConnectionError,
11  EasyEnergyNoDataError,
12  Electricity,
13  Gas,
14 )
15 
16 from homeassistant.config_entries import ConfigEntry
17 from homeassistant.core import HomeAssistant
18 from homeassistant.helpers.aiohttp_client import async_get_clientsession
19 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
20 from homeassistant.util import dt as dt_util
21 
22 from .const import DOMAIN, LOGGER, SCAN_INTERVAL, THRESHOLD_HOUR
23 
24 
25 class EasyEnergyData(NamedTuple):
26  """Class for defining data in dict."""
27 
28  energy_today: Electricity
29  energy_tomorrow: Electricity | None
30  gas_today: Gas | None
31 
32 
34  """Class to manage fetching easyEnergy data from single endpoint."""
35 
36  config_entry: ConfigEntry
37 
38  def __init__(self, hass: HomeAssistant) -> None:
39  """Initialize global easyEnergy data updater."""
40  super().__init__(
41  hass,
42  LOGGER,
43  name=DOMAIN,
44  update_interval=SCAN_INTERVAL,
45  )
46 
47  self.easyenergyeasyenergy = EasyEnergy(session=async_get_clientsession(hass))
48 
49  async def _async_update_data(self) -> EasyEnergyData:
50  """Fetch data from easyEnergy."""
51  today = dt_util.now().date()
52  gas_today = None
53  energy_tomorrow = None
54 
55  try:
56  energy_today = await self.easyenergyeasyenergy.energy_prices(
57  start_date=today, end_date=today
58  )
59  try:
60  gas_today = await self.easyenergyeasyenergy.gas_prices(
61  start_date=today, end_date=today
62  )
63  except EasyEnergyNoDataError:
64  LOGGER.debug("No data for gas prices for easyEnergy integration")
65  # Energy for tomorrow only after 14:00 UTC
66  if dt_util.utcnow().hour >= THRESHOLD_HOUR:
67  tomorrow = today + timedelta(days=1)
68  try:
69  energy_tomorrow = await self.easyenergyeasyenergy.energy_prices(
70  start_date=tomorrow, end_date=tomorrow
71  )
72  except EasyEnergyNoDataError:
73  LOGGER.debug(
74  "No electricity data for tomorrow for easyEnergy integration"
75  )
76 
77  except EasyEnergyConnectionError as err:
78  raise UpdateFailed("Error communicating with easyEnergy API") from err
79 
80  return EasyEnergyData(
81  energy_today=energy_today,
82  energy_tomorrow=energy_tomorrow,
83  gas_today=gas_today,
84  )
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)