1 """DataUpdateCoordinator for the co2signal integration."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from aioelectricitymaps
import (
9 CarbonIntensityResponse,
12 ElectricityMapsInvalidTokenError,
20 from .const
import DOMAIN
21 from .helpers
import fetch_latest_carbon_intensity
23 _LOGGER = logging.getLogger(__name__)
27 """Data update coordinator."""
29 config_entry: ConfigEntry
31 def __init__(self, hass: HomeAssistant, client: ElectricityMaps) ->
None:
32 """Initialize the coordinator."""
34 hass, _LOGGER, name=DOMAIN, update_interval=
timedelta(minutes=15)
40 """Return entry ID."""
44 """Fetch the latest data from the source."""
50 except ElectricityMapsInvalidTokenError
as err:
51 raise ConfigEntryAuthFailed
from err
52 except ElectricityMapsError
as err:
None __init__(self, HomeAssistant hass, ElectricityMaps client)
CarbonIntensityResponse _async_update_data(self)
CarbonIntensityResponse fetch_latest_carbon_intensity(HomeAssistant hass, ElectricityMaps em, Mapping[str, Any] config)