Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for fetching data from fitbit API."""
2 
3 import asyncio
4 from dataclasses import dataclass
5 import datetime
6 import logging
7 from typing import Final
8 
9 from homeassistant.core import HomeAssistant
10 from homeassistant.exceptions import ConfigEntryAuthFailed
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .api import FitbitApi
14 from .exceptions import FitbitApiException, FitbitAuthException
15 from .model import FitbitDevice
16 
17 _LOGGER = logging.getLogger(__name__)
18 
19 UPDATE_INTERVAL: Final = datetime.timedelta(minutes=30)
20 TIMEOUT = 10
21 
22 
23 class FitbitDeviceCoordinator(DataUpdateCoordinator[dict[str, FitbitDevice]]):
24  """Coordinator for fetching fitbit devices from the API."""
25 
26  def __init__(self, hass: HomeAssistant, api: FitbitApi) -> None:
27  """Initialize FitbitDeviceCoordinator."""
28  super().__init__(hass, _LOGGER, name="Fitbit", update_interval=UPDATE_INTERVAL)
29  self._api_api = api
30 
31  async def _async_update_data(self) -> dict[str, FitbitDevice]:
32  """Fetch data from API endpoint."""
33  async with asyncio.timeout(TIMEOUT):
34  try:
35  devices = await self._api_api.async_get_devices()
36  except FitbitAuthException as err:
37  raise ConfigEntryAuthFailed(err) from err
38  except FitbitApiException as err:
39  raise UpdateFailed(err) from err
40  return {device.id: device for device in devices}
41 
42 
43 @dataclass
44 class FitbitData:
45  """Config Entry global data."""
46 
47  api: FitbitApi
48  device_coordinator: FitbitDeviceCoordinator | None
None __init__(self, HomeAssistant hass, FitbitApi api)
Definition: coordinator.py:26
Generator[ProtectAdoptableDeviceModel] async_get_devices(Bootstrap bootstrap, Iterable[ModelType] model_type)
Definition: utils.py:85