Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The pvpc_hourly_pricing integration to collect Spain official electric prices."""
2 
3 from datetime import timedelta
4 import logging
5 
6 from aiopvpc import BadApiTokenAuthError, EsiosApiData, PVPCData
7 
8 from homeassistant.config_entries import ConfigEntry
9 from homeassistant.const import CONF_API_TOKEN
10 from homeassistant.core import HomeAssistant
11 from homeassistant.exceptions import ConfigEntryAuthFailed
12 from homeassistant.helpers.aiohttp_client import async_get_clientsession
13 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
14 from homeassistant.util import dt as dt_util
15 
16 from .const import ATTR_POWER, ATTR_POWER_P3, ATTR_TARIFF, DOMAIN
17 
18 _LOGGER = logging.getLogger(__name__)
19 
20 
22  """Class to manage fetching Electricity prices data from API."""
23 
def __init__(
    self, hass: HomeAssistant, entry: ConfigEntry, sensor_keys: set[str]
) -> None:
    """Create the PVPC API client and register a 30-minute refresh interval.

    Args:
        hass: Home Assistant instance; used to obtain the aiohttp client
            session and the configured time zone.
        entry: Config entry whose ``data`` mapping supplies the tariff, the
            two power values and, if present, the API token.
        sensor_keys: Keys of the sensors to request; passed to the client
            as a tuple.
    """
    options = entry.data
    http_session = async_get_clientsession(hass)
    client_kwargs = {
        "session": http_session,
        "tariff": options[ATTR_TARIFF],
        "local_timezone": hass.config.time_zone,
        "power": options[ATTR_POWER],
        "power_valley": options[ATTR_POWER_P3],
        # ``.get`` so a missing token becomes ``None`` instead of a KeyError.
        "api_token": options.get(CONF_API_TOKEN),
        "sensor_keys": tuple(sensor_keys),
    }
    # NOTE(review): "apiapi" and "_entry_entry" look like doubled tokens;
    # they are kept because other code in view reads them under these names.
    # Confirm the intended attribute names against the callers.
    self.apiapi = PVPCData(**client_kwargs)

    refresh_every = timedelta(minutes=30)
    super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=refresh_every)

    self._entry_entry = entry
41 
@property
def entry_id(self) -> str:
    """Return the ``entry_id`` of the config entry stored at construction."""
    stored_entry = self._entry_entry
    return stored_entry.entry_id
46 
async def _async_update_data(self) -> EsiosApiData:
    """Update electricity prices from the ESIOS API.

    Returns:
        The data object produced by the client's ``async_update_all`` call.

    Raises:
        ConfigEntryAuthFailed: The client raised ``BadApiTokenAuthError``.
        UpdateFailed: The client returned a falsy result, a result without
            sensors, or a result in which no availability flag is truthy.
    """
    try:
        # Hand the previously stored coordinator data back to the client.
        # The attribute is ``self.data`` (maintained by DataUpdateCoordinator);
        # the former ``self.datadata`` is never assigned and would raise
        # AttributeError on every refresh.
        api_data = await self.apiapi.async_update_all(self.data, dt_util.utcnow())
    except BadApiTokenAuthError as exc:
        # Re-raise as the Home Assistant auth failure, keeping the cause chained.
        raise ConfigEntryAuthFailed from exc
    if (
        not api_data
        or not api_data.sensors
        or not any(api_data.availability.values())
    ):
        # Nothing usable came back: report the refresh as failed.
        raise UpdateFailed
    return api_data
None __init__(self, HomeAssistant hass, ConfigEntry entry, set[str] sensor_keys)
Definition: coordinator.py:26
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)