Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data coordinator for WeatherFlow Cloud Data."""
2 
3 from datetime import timedelta
4 
5 from aiohttp import ClientResponseError
6 from weatherflow4py.api import WeatherFlowRestAPI
7 from weatherflow4py.models.rest.unified import WeatherFlowDataREST
8 
9 from homeassistant.core import HomeAssistant
10 from homeassistant.exceptions import ConfigEntryAuthFailed
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import DOMAIN, LOGGER
14 
15 
17  DataUpdateCoordinator[dict[int, WeatherFlowDataREST]]
18 ):
19  """Class to manage fetching REST Based WeatherFlow Forecast data."""
20 
21  def __init__(self, hass: HomeAssistant, api_token: str) -> None:
22  """Initialize global WeatherFlow forecast data updater."""
23  self.weather_apiweather_api = WeatherFlowRestAPI(api_token=api_token)
24  super().__init__(
25  hass,
26  LOGGER,
27  name=DOMAIN,
28  update_interval=timedelta(seconds=60),
29  )
30 
31  async def _async_update_data(self) -> dict[int, WeatherFlowDataREST]:
32  """Fetch data from WeatherFlow Forecast."""
33  try:
34  async with self.weather_apiweather_api:
35  return await self.weather_apiweather_api.get_all_data()
36  except ClientResponseError as err:
37  if err.status == 401:
38  raise ConfigEntryAuthFailed(err) from err
39  raise UpdateFailed(f"Update failed: {err}") from err