Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Custom DataUpdateCoordinator for the laundrify integration."""
2 
3 import asyncio
4 from datetime import timedelta
5 import logging
6 
7 from laundrify_aio import LaundrifyAPI, LaundrifyDevice
8 from laundrify_aio.exceptions import ApiConnectionException, UnauthorizedException
9 
10 from homeassistant.core import HomeAssistant
11 from homeassistant.exceptions import ConfigEntryAuthFailed
12 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
13 
14 from .const import DOMAIN, REQUEST_TIMEOUT
15 
16 _LOGGER = logging.getLogger(__name__)  # module logger; handed to the coordinator base class in __init__
17 
18 
class LaundrifyUpdateCoordinator(DataUpdateCoordinator[dict[str, LaundrifyDevice]]):
    """Coordinator that polls the laundrify API and exposes machines keyed by id."""

    def __init__(
        self, hass: HomeAssistant, laundrify_api: LaundrifyAPI, poll_interval: int
    ) -> None:
        """Configure the polling schedule and keep a handle on the API client.

        Args:
            hass: Home Assistant instance forwarded to the base coordinator.
            laundrify_api: Client used to query the machines on every refresh.
            poll_interval: Seconds between two consecutive refreshes.
        """
        refresh_every = timedelta(seconds=poll_interval)
        super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=refresh_every)
        # NOTE(review): this attribute name looks like a duplicated
        # "laundrify_api" - confirm before renaming, since code outside this
        # class may read it.
        self.laundrify_apilaundrify_api = laundrify_api

    async def _async_update_data(self) -> dict[str, LaundrifyDevice]:
        """Query the laundrify API and index the returned machines by their id.

        Returns:
            Mapping of machine id to the machine object returned by the API.

        Raises:
            ConfigEntryAuthFailed: The API rejected the request as unauthorized.
            UpdateFailed: The API could not be reached.
        """
        try:
            # TimeoutError and aiohttp.ClientError need no handler here: the
            # data update coordinator already takes care of them.
            async with asyncio.timeout(REQUEST_TIMEOUT):
                machines = await self.laundrify_apilaundrify_api.get_machines()
                devices_by_id: dict[str, LaundrifyDevice] = {}
                for machine in machines:
                    devices_by_id[machine.id] = machine
                return devices_by_id
        except UnauthorizedException as err:
            # ConfigEntryAuthFailed cancels future updates and starts a
            # config flow with SOURCE_REAUTH (async_step_reauth).
            raise ConfigEntryAuthFailed from err
        except ApiConnectionException as err:
            raise UpdateFailed(f"Error communicating with API: {err}") from err
None __init__(self, HomeAssistant hass, LaundrifyAPI laundrify_api, int poll_interval)
Definition: coordinator.py:24