Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Epion data coordinator."""
2 
3 import logging
4 from typing import Any
5 
6 from epion import Epion, EpionAuthenticationError, EpionConnectionError
7 
8 from homeassistant.core import HomeAssistant
9 from homeassistant.exceptions import ConfigEntryAuthFailed
10 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
11 
12 from .const import REFRESH_INTERVAL
13 
14 _LOGGER = logging.getLogger(__name__)
15 
16 
class EpionCoordinator(DataUpdateCoordinator[dict[str, Any]]):
    """Epion data update coordinator.

    Polls the Epion API every ``REFRESH_INTERVAL`` and exposes the result as
    a dictionary keyed by device ID.
    """

    def __init__(self, hass: HomeAssistant, epion_api: Epion) -> None:
        """Initialize the Epion coordinator.

        Args:
            hass: The Home Assistant instance, forwarded to the base
                coordinator.
            epion_api: Epion client whose ``get_current`` method is called
                on every refresh.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="Epion",
            update_interval=REFRESH_INTERVAL,
        )
        self.epion_api = epion_api

    async def _async_update_data(self) -> dict[str, Any]:
        """Fetch data from Epion API and construct a dictionary with device IDs as keys.

        Returns:
            A mapping of each entry's ``"deviceId"`` to the full entry, built
            from the ``"devices"`` list of the API response.

        Raises:
            ConfigEntryAuthFailed: If the API raises
                ``EpionAuthenticationError``.
            UpdateFailed: If the API raises ``EpionConnectionError``.
        """
        try:
            # get_current is handed to the executor rather than called
            # directly, so it does not run inside the event loop.
            response = await self.hass.async_add_executor_job(
                self.epion_api.get_current
            )
        except EpionAuthenticationError as err:
            _LOGGER.error("Authentication error with Epion API")
            raise ConfigEntryAuthFailed from err
        except EpionConnectionError as err:
            _LOGGER.error("Epion API connection problem")
            raise UpdateFailed(f"Error communicating with API: {err}") from err

        # Re-key the device list by device ID; a later duplicate ID replaces
        # an earlier one, exactly as sequential assignment would.
        return {
            epion_device["deviceId"]: epion_device
            for epion_device in response["devices"]
        }
None __init__(self, HomeAssistant hass, Epion epion_api)
Definition: coordinator.py:20