1 """The pvpc_hourly_pricing integration to collect Spain official electric prices."""
3 from datetime
import timedelta
6 from aiopvpc
import BadApiTokenAuthError, EsiosApiData, PVPCData
16 from .const
import ATTR_POWER, ATTR_POWER_P3, ATTR_TARIFF, DOMAIN
18 _LOGGER = logging.getLogger(__name__)
22 """Class to manage fetching Electricity prices data from API."""
25 self, hass: HomeAssistant, entry: ConfigEntry, sensor_keys: set[str]
28 self.
apiapi = PVPCData(
30 tariff=entry.data[ATTR_TARIFF],
31 local_timezone=hass.config.time_zone,
32 power=entry.data[ATTR_POWER],
33 power_valley=entry.data[ATTR_POWER_P3],
34 api_token=entry.data.get(CONF_API_TOKEN),
35 sensor_keys=
tuple(sensor_keys),
38 hass, _LOGGER, name=DOMAIN, update_interval=
timedelta(minutes=30)
44 """Return entry ID."""
45 return self.
_entry_entry.entry_id
48 """Update electricity prices from the ESIOS API."""
50 api_data = await self.
apiapi.async_update_all(self.
datadata, dt_util.utcnow())
51 except BadApiTokenAuthError
as exc:
52 raise ConfigEntryAuthFailed
from exc
55 or not api_data.sensors
56 or not any(api_data.availability.values())
EsiosApiData _async_update_data(self)
None __init__(self, HomeAssistant hass, ConfigEntry entry, set[str] sensor_keys)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)