1 """Provide an OpenExchangeRates data coordinator."""
3 from __future__
import annotations
6 from datetime
import timedelta
8 from aiohttp
import ClientSession
9 from aioopenexchangerates
import (
12 OpenExchangeRatesAuthError,
13 OpenExchangeRatesClientError,
20 from .const
import CLIENT_TIMEOUT, DOMAIN, LOGGER
24 """Represent a coordinator for Open Exchange Rates API."""
29 session: ClientSession,
32 update_interval: timedelta,
34 """Initialize the coordinator."""
36 hass, LOGGER, name=f
"{DOMAIN} base {base}", update_interval=update_interval
39 self.
clientclient = Client(api_key, session)
42 """Update data from Open Exchange Rates."""
44 async
with asyncio.timeout(CLIENT_TIMEOUT):
45 latest = await self.
clientclient.get_latest(base=self.
basebase)
46 except OpenExchangeRatesAuthError
as err:
48 except OpenExchangeRatesClientError
as err:
51 LOGGER.debug(
"Result: %s", latest)
Latest _async_update_data(self)
None __init__(self, HomeAssistant hass, ClientSession session, str api_key, str base, timedelta update_interval)