Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Component to control TOLO Sauna/Steam Bath."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 from typing import NamedTuple
8 
9 from tololib import ToloClient, ToloSettings, ToloStatus
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.const import CONF_HOST
13 from homeassistant.core import HomeAssistant
14 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
15 
16 from .const import DEFAULT_RETRY_COUNT, DEFAULT_RETRY_TIMEOUT
17 
18 _LOGGER = logging.getLogger(__name__)
19 
20 
21 class ToloSaunaData(NamedTuple):
22  """Compound class for reflecting full state (status and info) of a TOLO Sauna."""
23 
24  status: ToloStatus
25  settings: ToloSettings
26 
27 
29  """DataUpdateCoordinator for TOLO Sauna."""
30 
31  def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
32  """Initialize ToloSaunaUpdateCoordinator."""
33  self.clientclient = ToloClient(
34  address=entry.data[CONF_HOST],
35  retry_timeout=DEFAULT_RETRY_TIMEOUT,
36  retry_count=DEFAULT_RETRY_COUNT,
37  )
38  super().__init__(
39  hass=hass,
40  logger=_LOGGER,
41  name=f"{entry.title} ({entry.data[CONF_HOST]}) Data Update Coordinator",
42  update_interval=timedelta(seconds=5),
43  )
44 
45  async def _async_update_data(self) -> ToloSaunaData:
46  return await self.hasshass.async_add_executor_job(self._get_tolo_sauna_data_get_tolo_sauna_data)
47 
48  def _get_tolo_sauna_data(self) -> ToloSaunaData:
49  try:
50  status = self.clientclient.get_status()
51  settings = self.clientclient.get_settings()
52  except TimeoutError as error:
53  raise UpdateFailed("communication timeout") from error
54  return ToloSaunaData(status, settings)
None __init__(self, HomeAssistant hass, ConfigEntry entry)
Definition: coordinator.py:31
def get_status(hass, host, port)
Definition: panel.py:387