Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Provide an OpenExchangeRates data coordinator."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from datetime import timedelta
7 
8 from aiohttp import ClientSession
9 from aioopenexchangerates import (
10  Client,
11  Latest,
12  OpenExchangeRatesAuthError,
13  OpenExchangeRatesClientError,
14 )
15 
16 from homeassistant.core import HomeAssistant
17 from homeassistant.exceptions import ConfigEntryAuthFailed
18 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
19 
20 from .const import CLIENT_TIMEOUT, DOMAIN, LOGGER
21 
22 
24  """Represent a coordinator for Open Exchange Rates API."""
25 
26  def __init__(
27  self,
28  hass: HomeAssistant,
29  session: ClientSession,
30  api_key: str,
31  base: str,
32  update_interval: timedelta,
33  ) -> None:
34  """Initialize the coordinator."""
35  super().__init__(
36  hass, LOGGER, name=f"{DOMAIN} base {base}", update_interval=update_interval
37  )
38  self.basebase = base
39  self.clientclient = Client(api_key, session)
40 
41  async def _async_update_data(self) -> Latest:
42  """Update data from Open Exchange Rates."""
43  try:
44  async with asyncio.timeout(CLIENT_TIMEOUT):
45  latest = await self.clientclient.get_latest(base=self.basebase)
46  except OpenExchangeRatesAuthError as err:
47  raise ConfigEntryAuthFailed(err) from err
48  except OpenExchangeRatesClientError as err:
49  raise UpdateFailed(err) from err
50 
51  LOGGER.debug("Result: %s", latest)
52  return latest
None __init__(self, HomeAssistant hass, ClientSession session, str api_key, str base, timedelta update_interval)
Definition: coordinator.py:33