Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for WS66i."""
2 
3 from __future__ import annotations
4 
5 import logging
6 
7 from pyws66i import WS66i, ZoneStatus
8 
9 from homeassistant.core import HomeAssistant
10 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
11 
12 from .const import POLL_INTERVAL
13 
14 _LOGGER = logging.getLogger(__name__)
15 
16 
18  """DataUpdateCoordinator to gather data for WS66i Zones."""
19 
20  def __init__(
21  self,
22  hass: HomeAssistant,
23  my_api: WS66i,
24  zones: list[int],
25  ) -> None:
26  """Initialize DataUpdateCoordinator to gather data for specific zones."""
27  super().__init__(
28  hass,
29  _LOGGER,
30  name="WS66i",
31  update_interval=POLL_INTERVAL,
32  )
33  self._ws66i_ws66i = my_api
34  self._zones_zones = zones
35 
36  def _update_all_zones(self) -> list[ZoneStatus]:
37  """Fetch data for each of the zones."""
38  data = []
39  for zone_id in self._zones_zones:
40  data_zone = self._ws66i_ws66i.zone_status(zone_id)
41  if data_zone is None:
42  raise UpdateFailed(f"Failed to update zone {zone_id}")
43 
44  data.append(data_zone)
45 
46  return data
47 
48  async def _async_update_data(self) -> list[ZoneStatus]:
49  """Fetch data for each of the zones."""
50  # The data that is returned here can be accessed through coordinator.data.
51  return await self.hasshass.async_add_executor_job(self._update_all_zones_update_all_zones)
None __init__(self, HomeAssistant hass, WS66i my_api, list[int] zones)
Definition: coordinator.py:25