Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Climate device for CCM15 coordinator."""
2 
3 import datetime
4 import logging
5 
6 from ccm15 import CCM15Device, CCM15DeviceState, CCM15SlaveDevice
7 import httpx
8 
9 from homeassistant.components.climate import HVACMode
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import (
14  CONST_FAN_CMD_MAP,
15  CONST_STATE_CMD_MAP,
16  DEFAULT_INTERVAL,
17  DEFAULT_TIMEOUT,
18 )
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 
23 class CCM15Coordinator(DataUpdateCoordinator[CCM15DeviceState]):
24  """Class to coordinate multiple CCM15Climate devices."""
25 
26  def __init__(self, hass: HomeAssistant, host: str, port: int) -> None:
27  """Initialize the coordinator."""
28  super().__init__(
29  hass,
30  _LOGGER,
31  name=host,
32  update_interval=datetime.timedelta(seconds=DEFAULT_INTERVAL),
33  )
34  self._ccm15_ccm15 = CCM15Device(host, port, DEFAULT_TIMEOUT)
35  self._host_host = host
36 
37  def get_host(self) -> str:
38  """Get the host."""
39  return self._host_host
40 
41  async def _async_update_data(self) -> CCM15DeviceState:
42  """Fetch data from Rain Bird device."""
43  try:
44  return await self._fetch_data_fetch_data()
45  except httpx.RequestError as err: # pragma: no cover
46  raise UpdateFailed("Error communicating with Device") from err
47 
48  async def _fetch_data(self) -> CCM15DeviceState:
49  """Get the current status of all AC devices."""
50  return await self._ccm15_ccm15.get_status_async()
51 
52  async def async_set_state(self, ac_index: int, state: str, value: int) -> None:
53  """Set new target states."""
54  if await self._ccm15_ccm15.async_set_state(ac_index, state, value):
55  await self.async_request_refreshasync_request_refresh()
56 
57  def get_ac_data(self, ac_index: int) -> CCM15SlaveDevice | None:
58  """Get ac data from the ac_index."""
59  if ac_index < 0 or ac_index >= len(self.datadata.devices):
60  # Network latency may return an empty or incomplete array
61  return None
62  return self.datadata.devices[ac_index]
63 
64  async def async_set_hvac_mode(self, ac_index, hvac_mode: HVACMode) -> None:
65  """Set the hvac mode."""
66  _LOGGER.debug("Set Hvac[%s]='%s'", ac_index, str(hvac_mode))
67  await self.async_set_stateasync_set_state(ac_index, "mode", CONST_STATE_CMD_MAP[hvac_mode])
68 
69  async def async_set_fan_mode(self, ac_index, fan_mode: str) -> None:
70  """Set the fan mode."""
71  _LOGGER.debug("Set Fan[%s]='%s'", ac_index, fan_mode)
72  await self.async_set_stateasync_set_state(ac_index, "fan", CONST_FAN_CMD_MAP[fan_mode])
73 
74  async def async_set_temperature(self, ac_index, temp) -> None:
75  """Set the target temperature mode."""
76  _LOGGER.debug("Set Temp[%s]='%s'", ac_index, temp)
77  await self.async_set_stateasync_set_state(ac_index, "temp", temp)
None __init__(self, HomeAssistant hass, str host, int port)
Definition: coordinator.py:26
None async_set_fan_mode(self, ac_index, str fan_mode)
Definition: coordinator.py:69
None async_set_hvac_mode(self, ac_index, HVACMode hvac_mode)
Definition: coordinator.py:64
CCM15SlaveDevice|None get_ac_data(self, int ac_index)
Definition: coordinator.py:57
None async_set_state(self, int ac_index, str state, int value)
Definition: coordinator.py:52