Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Coordinator for EnergyZero."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 from typing import NamedTuple
7 
8 from energyzero import (
9  Electricity,
10  EnergyZero,
11  EnergyZeroConnectionError,
12  EnergyZeroNoDataError,
13  Gas,
14 )
15 
16 from homeassistant.config_entries import ConfigEntry
17 from homeassistant.core import HomeAssistant
18 from homeassistant.helpers.aiohttp_client import async_get_clientsession
19 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
20 from homeassistant.util import dt as dt_util
21 
22 from .const import DOMAIN, LOGGER, SCAN_INTERVAL, THRESHOLD_HOUR
23 
24 
class EnergyZeroData(NamedTuple):
    """Immutable bundle of the price data gathered in one coordinator refresh.

    Field order is part of the tuple's interface and must not change.
    """

    # Electricity prices for the current day; always populated.
    energy_today: Electricity
    # Electricity prices for the next day, or None when not available.
    energy_tomorrow: Electricity | None
    # Gas prices for the current day, or None when not available.
    gas_today: Gas | None

31 
32 
34  """Class to manage fetching EnergyZero data from single endpoint."""
35 
36  config_entry: ConfigEntry
37 
def __init__(self, hass: HomeAssistant) -> None:
    """Set up the coordinator and the EnergyZero API client.

    The parent initializer receives the shared integration logger, the
    integration domain as the coordinator name, and the configured scan
    interval as the refresh interval.
    """
    super().__init__(hass, LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL)

    # Build the API client on top of the client session obtained for this
    # Home Assistant instance.
    http_session = async_get_clientsession(hass)
    self.energyzeroenergyzero = EnergyZero(session=http_session)
48 
async def _async_update_data(self) -> EnergyZeroData:
    """Collect today's electricity and gas prices and, late in the day, tomorrow's.

    Returns:
        An EnergyZeroData tuple. ``gas_today`` and ``energy_tomorrow`` stay
        ``None`` when the API reports no data for them (or, for tomorrow,
        when the threshold hour has not been reached yet).

    Raises:
        UpdateFailed: when any request fails with EnergyZeroConnectionError.

    Note:
        EnergyZeroNoDataError is only handled for the gas and tomorrow
        requests; if today's electricity request raises it, it propagates
        to the caller unchanged.
    """
    client = self.energyzeroenergyzero
    current_day = dt_util.now().date()
    prices_tomorrow = None
    gas_prices_today = None

    try:
        prices_today = await client.energy_prices(
            start_date=current_day, end_date=current_day
        )

        # Missing gas data is tolerated: log it and keep the None default.
        try:
            gas_prices_today = await client.gas_prices(
                start_date=current_day, end_date=current_day
            )
        except EnergyZeroNoDataError:
            LOGGER.debug("No data for gas prices for EnergyZero integration")

        # Energy for tomorrow only after 14:00 UTC
        past_threshold = dt_util.utcnow().hour >= THRESHOLD_HOUR
        if past_threshold:
            next_day = current_day + timedelta(days=1)
            try:
                prices_tomorrow = await client.energy_prices(
                    start_date=next_day, end_date=next_day
                )
            except EnergyZeroNoDataError:
                LOGGER.debug("No data for tomorrow for EnergyZero integration")

    except EnergyZeroConnectionError as exc:
        # Surface connectivity problems through the coordinator's own
        # failure type, keeping the original error as the cause.
        raise UpdateFailed("Error communicating with EnergyZero API") from exc

    return EnergyZeroData(
        energy_today=prices_today,
        energy_tomorrow=prices_tomorrow,
        gas_today=gas_prices_today,
    )
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)