Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the Moehlenhoff Alpha2."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 import aiohttp
9 from moehlenhoff_alpha2 import Alpha2Base
10 
11 from homeassistant.core import HomeAssistant
12 from homeassistant.exceptions import HomeAssistantError
13 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
14 
15 _LOGGER = logging.getLogger(__name__)
16 
17 UPDATE_INTERVAL = timedelta(seconds=60)
18 
19 
21  """Keep the base instance in one place and centralize the update."""
22 
23  def __init__(self, hass: HomeAssistant, base: Alpha2Base) -> None:
24  """Initialize Alpha2Base data updater."""
25  self.basebase = base
26  super().__init__(
27  hass=hass,
28  logger=_LOGGER,
29  name="alpha2_base",
30  update_interval=UPDATE_INTERVAL,
31  )
32 
33  async def _async_update_data(self) -> dict[str, dict[str, dict]]:
34  """Fetch the latest data from the source."""
35  await self.basebase.update_data()
36  return {
37  "heat_areas": {ha["ID"]: ha for ha in self.basebase.heat_areas if ha.get("ID")},
38  "heat_controls": {
39  hc["ID"]: hc for hc in self.basebase.heat_controls if hc.get("ID")
40  },
41  "io_devices": {io["ID"]: io for io in self.basebase.io_devices if io.get("ID")},
42  }
43 
44  def get_cooling(self) -> bool:
45  """Return if cooling mode is enabled."""
46  return self.basebase.cooling
47 
48  async def async_set_cooling(self, enabled: bool) -> None:
49  """Enable or disable cooling mode."""
50  await self.basebase.set_cooling(enabled)
51  self.async_update_listenersasync_update_listeners()
52 
54  self, heat_area_id: str, target_temperature: float
55  ) -> None:
56  """Set the target temperature of the given heat area."""
57  _LOGGER.debug(
58  "Setting target temperature of heat area %s to %0.1f",
59  heat_area_id,
60  target_temperature,
61  )
62 
63  update_data = {"T_TARGET": target_temperature}
64  is_cooling = self.get_coolingget_cooling()
65  heat_area_mode = self.datadata["heat_areas"][heat_area_id]["HEATAREA_MODE"]
66  if heat_area_mode == 1:
67  if is_cooling:
68  update_data["T_COOL_DAY"] = target_temperature
69  else:
70  update_data["T_HEAT_DAY"] = target_temperature
71  elif heat_area_mode == 2:
72  if is_cooling:
73  update_data["T_COOL_NIGHT"] = target_temperature
74  else:
75  update_data["T_HEAT_NIGHT"] = target_temperature
76 
77  try:
78  await self.basebase.update_heat_area(heat_area_id, update_data)
79  except aiohttp.ClientError as http_err:
80  raise HomeAssistantError(
81  "Failed to set target temperature, communication error with alpha2 base"
82  ) from http_err
83  self.datadata["heat_areas"][heat_area_id].update(update_data)
84  self.async_update_listenersasync_update_listeners()
85 
87  self, heat_area_id: str, heat_area_mode: int
88  ) -> None:
89  """Set the mode of the given heat area."""
90  # HEATAREA_MODE: 0=Auto, 1=Tag, 2=Nacht
91  if heat_area_mode not in (0, 1, 2):
92  raise ValueError(f"Invalid heat area mode: {heat_area_mode}")
93  _LOGGER.debug(
94  "Setting mode of heat area %s to %d",
95  heat_area_id,
96  heat_area_mode,
97  )
98  try:
99  await self.basebase.update_heat_area(
100  heat_area_id, {"HEATAREA_MODE": heat_area_mode}
101  )
102  except aiohttp.ClientError as http_err:
103  raise HomeAssistantError(
104  "Failed to set heat area mode, communication error with alpha2 base"
105  ) from http_err
106 
107  self.datadata["heat_areas"][heat_area_id]["HEATAREA_MODE"] = heat_area_mode
108  is_cooling = self.get_coolingget_cooling()
109  if heat_area_mode == 1:
110  if is_cooling:
111  self.datadata["heat_areas"][heat_area_id]["T_TARGET"] = self.datadata[
112  "heat_areas"
113  ][heat_area_id]["T_COOL_DAY"]
114  else:
115  self.datadata["heat_areas"][heat_area_id]["T_TARGET"] = self.datadata[
116  "heat_areas"
117  ][heat_area_id]["T_HEAT_DAY"]
118  elif heat_area_mode == 2:
119  if is_cooling:
120  self.datadata["heat_areas"][heat_area_id]["T_TARGET"] = self.datadata[
121  "heat_areas"
122  ][heat_area_id]["T_COOL_NIGHT"]
123  else:
124  self.datadata["heat_areas"][heat_area_id]["T_TARGET"] = self.datadata[
125  "heat_areas"
126  ][heat_area_id]["T_HEAT_NIGHT"]
127 
128  self.async_update_listenersasync_update_listeners()
None async_set_target_temperature(self, str heat_area_id, float target_temperature)
Definition: coordinator.py:55
None async_set_heat_area_mode(self, str heat_area_id, int heat_area_mode)
Definition: coordinator.py:88
IssData update(pyiss.ISS iss)
Definition: __init__.py:33