Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the venstar component."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from datetime import timedelta
7 
8 from requests import RequestException
9 from venstarcolortouch import VenstarColorTouch
10 
11 from homeassistant.core import HomeAssistant
12 from homeassistant.helpers import update_coordinator
13 
14 from .const import _LOGGER, DOMAIN, VENSTAR_SLEEP
15 
16 
17 class VenstarDataUpdateCoordinator(update_coordinator.DataUpdateCoordinator[None]):
18  """Class to manage fetching Venstar data."""
19 
20  def __init__(
21  self,
22  hass: HomeAssistant,
23  *,
24  venstar_connection: VenstarColorTouch,
25  ) -> None:
26  """Initialize global Venstar data updater."""
27  super().__init__(
28  hass,
29  _LOGGER,
30  name=DOMAIN,
31  update_interval=timedelta(seconds=60),
32  )
33  self.clientclient = venstar_connection
34  self.runtimesruntimes: list[dict[str, int]] = []
35 
36  async def _async_update_data(self) -> None:
37  """Update the state."""
38  try:
39  await self.hass.async_add_executor_job(self.clientclient.update_info)
40  except (OSError, RequestException) as ex:
41  raise update_coordinator.UpdateFailed(
42  f"Exception during Venstar info update: {ex}"
43  ) from ex
44 
45  # older venstars sometimes cannot handle rapid sequential connections
46  await asyncio.sleep(VENSTAR_SLEEP)
47 
48  try:
49  await self.hass.async_add_executor_job(self.clientclient.update_sensors)
50  except (OSError, RequestException) as ex:
51  raise update_coordinator.UpdateFailed(
52  f"Exception during Venstar sensor update: {ex}"
53  ) from ex
54 
55  # older venstars sometimes cannot handle rapid sequential connections
56  await asyncio.sleep(VENSTAR_SLEEP)
57 
58  try:
59  await self.hass.async_add_executor_job(self.clientclient.update_alerts)
60  except (OSError, RequestException) as ex:
61  raise update_coordinator.UpdateFailed(
62  f"Exception during Venstar alert update: {ex}"
63  ) from ex
64 
65  # older venstars sometimes cannot handle rapid sequential connections
66  await asyncio.sleep(VENSTAR_SLEEP)
67 
68  try:
69  self.runtimesruntimes = await self.hass.async_add_executor_job(
70  self.clientclient.get_runtimes
71  )
72  except (OSError, RequestException) as ex:
73  raise update_coordinator.UpdateFailed(
74  f"Exception during Venstar runtime update: {ex}"
75  ) from ex
None __init__(self, HomeAssistant hass, *VenstarColorTouch venstar_connection)
Definition: coordinator.py:25