Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Tankerkoenig update coordinator."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 from math import ceil
8 
9 from aiotankerkoenig import (
10  PriceInfo,
11  Station,
12  Tankerkoenig,
13  TankerkoenigConnectionError,
14  TankerkoenigError,
15  TankerkoenigInvalidKeyError,
16  TankerkoenigRateLimitError,
17 )
18 
19 from homeassistant.config_entries import ConfigEntry
20 from homeassistant.const import ATTR_ID, CONF_API_KEY, CONF_SHOW_ON_MAP
21 from homeassistant.core import HomeAssistant
22 from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
23 from homeassistant.helpers import device_registry as dr, entity_registry as er
24 from homeassistant.helpers.aiohttp_client import async_get_clientsession
25 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
26 
27 from .const import CONF_FUEL_TYPES, CONF_STATIONS
28 
# Module-level logger: passed to the coordinator's base-class initializer and
# used for every debug/warning/error message emitted in this file.
29 _LOGGER = logging.getLogger(__name__)
30 
# PEP 695 type alias: a ConfigEntry parameterized with
# TankerkoenigDataUpdateCoordinator. It is used below to annotate the
# coordinator's `config_entry` attribute.
31 type TankerkoenigConfigEntry = ConfigEntry[TankerkoenigDataUpdateCoordinator]
32 
33 
class TankerkoenigDataUpdateCoordinator(DataUpdateCoordinator[dict[str, PriceInfo]]):
    """Get the latest data from the API.

    The coordinator keeps the details of the configured stations in
    ``stations`` (filled by ``async_setup``) and fetches their current prices
    on every refresh via ``_async_update_data``.
    """

    config_entry: TankerkoenigConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        name: str,
        update_interval: int,
    ) -> None:
        """Initialize the data object.

        Args:
            hass: The Home Assistant instance; also used to obtain the shared
                aiohttp client session for the API client.
            name: Name forwarded to the base coordinator.
            update_interval: Polling interval in minutes.
        """

        super().__init__(
            hass=hass,
            logger=_LOGGER,
            name=name,
            update_interval=timedelta(minutes=update_interval),
        )

        # NOTE(review): no config entry is passed to the base initializer, so
        # the reads below rely on the base class having populated
        # `self.config_entry` by this point -- confirm against
        # DataUpdateCoordinator.
        self._selected_stations: list[str] = self.config_entry.data[CONF_STATIONS]
        # Station details keyed by station id; populated in async_setup().
        self.stations: dict[str, Station] = {}
        self.fuel_types: list[str] = self.config_entry.data[CONF_FUEL_TYPES]
        self.show_on_map: bool = self.config_entry.options[CONF_SHOW_ON_MAP]

        self._tankerkoenig = Tankerkoenig(
            api_key=self.config_entry.data[CONF_API_KEY],
            session=async_get_clientsession(hass),
        )

    async def async_setup(self) -> None:
        """Set up the tankerkoenig API.

        Fetches the details of every selected station, then removes entity and
        device registry entries of this config entry that no longer belong to
        a selected station.

        Raises:
            ConfigEntryAuthFailed: The API rejected the configured key.
            ConfigEntryNotReady: The API could not be reached.
        """
        for station_id in self._selected_stations:
            try:
                station = await self._tankerkoenig.station_details(station_id)
            except TankerkoenigInvalidKeyError as err:
                _LOGGER.debug(
                    "invalid key error occur during setup of station %s %s",
                    station_id,
                    err,
                )
                raise ConfigEntryAuthFailed(err) from err
            except TankerkoenigConnectionError as err:
                _LOGGER.debug(
                    "connection error occur during setup of station %s %s",
                    station_id,
                    err,
                )
                raise ConfigEntryNotReady(err) from err
            except TankerkoenigError as err:
                # Any other API error only affects this one station: log it
                # and carry on with the remaining stations.
                _LOGGER.error("Error when adding station %s %s", station_id, err)
                continue

            self.stations[station_id] = station

        # Drop entities whose unique_id prefix (the part before the first "_")
        # is not one of the selected station ids.
        entity_reg = er.async_get(self.hass)
        for entity in er.async_entries_for_config_entry(
            entity_reg, self.config_entry.entry_id
        ):
            if entity.unique_id.split("_")[0] not in self._selected_stations:
                _LOGGER.debug("Removing obsolete entity entry %s", entity.entity_id)
                entity_reg.async_remove(entity.entity_id)

        # Detach this config entry from devices that are not identified by any
        # selected station id.
        device_reg = dr.async_get(self.hass)
        for device in dr.async_entries_for_config_entry(
            device_reg, self.config_entry.entry_id
        ):
            if not any(
                (ATTR_ID, station_id) in device.identifiers
                for station_id in self._selected_stations
            ):
                _LOGGER.debug("Removing obsolete device entry %s", device.name)
                device_reg.async_update_device(
                    device.id, remove_config_entry_id=self.config_entry.entry_id
                )

        if len(self.stations) > 10:
            _LOGGER.warning(
                "Found more than 10 stations to check. "
                "This might invalidate your api-key on the long run. "
                "Try using a smaller radius"
            )

    async def _async_update_data(self) -> dict[str, PriceInfo]:
        """Get the latest data from tankerkoenig.de.

        Returns:
            The merged price information of all stations in ``self.stations``,
            keyed as returned by the API client.

        Raises:
            ConfigEntryAuthFailed: The API rejected the configured key.
            UpdateFailed: The rate limit was hit, or any other API or
                connection error occurred.
        """
        station_ids = list(self.stations)

        prices = {}
        # The API seems to only return at most 10 results, so split the list in chunks of 10
        # and merge it together.
        for index in range(ceil(len(station_ids) / 10)):
            stations = station_ids[index * 10 : (index + 1) * 10]
            try:
                data = await self._tankerkoenig.prices(stations)
            except TankerkoenigInvalidKeyError as err:
                _LOGGER.debug(
                    "invalid key error occur during update of stations %s %s",
                    stations,
                    err,
                )
                raise ConfigEntryAuthFailed(err) from err
            except TankerkoenigRateLimitError as err:
                _LOGGER.warning(
                    "API rate limit reached, consider to increase polling interval"
                )
                raise UpdateFailed(err) from err
            except (TankerkoenigError, TankerkoenigConnectionError) as err:
                _LOGGER.debug(
                    "error occur during update of stations %s %s",
                    stations,
                    err,
                )
                raise UpdateFailed(err) from err

            prices.update(data)

        return prices
None __init__(self, HomeAssistant hass, str name, int update_interval)
Definition: coordinator.py:44
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)