Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The Nibe Heat Pump coordinator."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections import defaultdict
7 from collections.abc import Callable, Iterable
8 from datetime import date, timedelta
9 from typing import Any
10 
11 from nibe.coil import Coil, CoilData
12 from nibe.connection import Connection
13 from nibe.exceptions import CoilNotFoundException, ReadException
14 from nibe.heatpump import HeatPump, Series
15 from propcache import cached_property
16 
17 from homeassistant.config_entries import ConfigEntry
18 from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
19 from homeassistant.helpers.device_registry import DeviceInfo
20 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
21 
22 from .const import DOMAIN, LOGGER
23 
24 
25 class ContextCoordinator[_DataTypeT, _ContextTypeT](DataUpdateCoordinator[_DataTypeT]):
26  """Update coordinator with context adjustments."""
27 
28  @cached_property
29  def context_callbacks(self) -> dict[_ContextTypeT, list[CALLBACK_TYPE]]:
30  """Return a dict of all callbacks registered for a given context."""
31  callbacks: dict[_ContextTypeT, list[CALLBACK_TYPE]] = defaultdict(list)
32  for update_callback, context in list(self._listeners.values()):
33  assert isinstance(context, set)
34  for address in context:
35  callbacks[address].append(update_callback)
36  return callbacks
37 
38  @callback
39  def async_update_context_listeners(self, contexts: Iterable[_ContextTypeT]) -> None:
40  """Update all listeners given a set of contexts."""
41  update_callbacks: set[CALLBACK_TYPE] = set()
42  for context in contexts:
43  update_callbacks.update(self.context_callbackscontext_callbacks.get(context, []))
44 
45  for update_callback in update_callbacks:
46  update_callback()
47 
48  @callback
50  self, update_callback: CALLBACK_TYPE, context: Any = None
51  ) -> Callable[[], None]:
52  """Wrap standard function to prune cached callback database."""
53  assert isinstance(context, set)
54  context -= {None}
55  release = super().async_add_listener(update_callback, context)
56  self.__dict__.pop("context_callbacks", None)
57 
58  @callback
59  def release_update():
60  release()
61  self.__dict__.pop("context_callbacks", None)
62 
63  return release_update
64 
65 
class CoilCoordinator(ContextCoordinator[dict[int, CoilData], int]):
    """Update coordinator for nibe heat pumps."""

    config_entry: ConfigEntry

    def __init__(
        self,
        hass: HomeAssistant,
        heatpump: HeatPump,
        connection: Connection,
    ) -> None:
        """Initialize coordinator."""
        super().__init__(
            hass, LOGGER, name="Nibe Heat Pump", update_interval=timedelta(seconds=60)
        )

        self.data = {}
        # Coil data pushed by the heat pump since the last poll, keyed by address;
        # consumed by the next update instead of re-reading those coils.
        self.seed: dict[int, CoilData] = {}
        self.connection = connection
        self.heatpump = heatpump
        # The task currently running an update, if any (see _async_update_data).
        self.task: asyncio.Task | None = None

        heatpump.subscribe(heatpump.COIL_UPDATE_EVENT, self._on_coil_update)

    def _on_coil_update(self, data: CoilData) -> None:
        """Handle callback on coil updates."""
        coil = data.coil
        self.data[coil.address] = data
        self.seed[coil.address] = data
        self.async_update_context_listeners([coil.address])

    @property
    def series(self) -> Series:
        """Return which series of pump we are connected to."""
        return self.heatpump.series

    @property
    def coils(self) -> list[Coil]:
        """Return the full coil database."""
        return self.heatpump.get_coils()

    @property
    def unique_id(self) -> str:
        """Return unique id for this coordinator."""
        return self.config_entry.unique_id or self.config_entry.entry_id

    @property
    def device_info(self) -> DeviceInfo:
        """Return device information for the main device."""
        return DeviceInfo(identifiers={(DOMAIN, self.unique_id)})

    def get_coil_value(self, coil: Coil) -> int | str | float | date | None:
        """Return the stored value for a coil, or None if no data is held."""
        if coil_with_data := self.data.get(coil.address):
            return coil_with_data.value
        return None

    def get_coil_float(self, coil: Coil) -> float | None:
        """Return the stored coil value converted to float.

        Returns None when no value is held (or the value is an empty string).
        A reading of zero is a valid value and is returned as ``0.0``.
        """
        value = self.get_coil_value(coil)
        # Test for absence explicitly: a plain truthiness check would report a
        # legitimate reading of 0 as "no value".
        if value is None or value == "":
            return None
        return float(value)  # type: ignore[arg-type]

    async def async_write_coil(self, coil: Coil, value: float | str) -> None:
        """Write coil and update state."""
        data = CoilData(coil, value)
        await self.connection.write_coil(data)

        # Only reached when the write did not raise; store the written value
        # and notify the listeners registered for this address.
        self.data[coil.address] = data

        self.async_update_context_listeners([coil.address])

    async def async_read_coil(self, coil: Coil) -> CoilData:
        """Read a single coil through the connection and return its data."""
        return await self.connection.read_coil(coil)

    async def _async_update_data(self) -> dict[int, CoilData]:
        """Run one update while recording the running task in ``self.task``."""
        self.task = asyncio.current_task()
        try:
            return await self._async_update_data_internal()
        finally:
            self.task = None

    async def _async_update_data_internal(self) -> dict[int, CoilData]:
        """Collect data for every coil address that has a registered listener.

        Raises UpdateFailed if reading fails before any data was collected.
        """
        result: dict[int, CoilData] = {}

        def _get_coils() -> Iterable[Coil]:
            """Yield the coils that need to be read from the connection."""
            for address in sorted(self.context_callbacks.keys()):
                # A value pushed since the last poll is used as-is, no read needed.
                if seed := self.seed.pop(address, None):
                    self.logger.debug("Skipping seeded coil: %d", address)
                    result[address] = seed
                    continue

                try:
                    coil = self.heatpump.get_coil_by_address(address)
                except CoilNotFoundException as exception:
                    self.logger.debug("Skipping missing coil: %s", exception)
                    continue
                yield coil

        try:
            async for data in self.connection.read_coils(_get_coils()):
                result[data.coil.address] = data
                # Drop any seed recorded for this address; fresh data was just read.
                self.seed.pop(data.coil.address, None)
        except ReadException as exception:
            # Only a complete failure is reported; partial results are kept.
            if not result:
                raise UpdateFailed(f"Failed to update: {exception}") from exception
            self.logger.debug(
                "Some coils failed to update, and may be unsupported: %s", exception
            )

        return result

    async def async_shutdown(self) -> None:
        """Make sure a coordinator is shut down as well as its connection."""
        await super().async_shutdown()
        if task := self.task:
            # Cancel an in-flight update and wait for it to finish before
            # stopping the connection.
            task.cancel()
            await asyncio.wait((task,))
        await self.connection.stop()
int|str|float|date|None get_coil_value(self, Coil coil)
Definition: coordinator.py:117
None __init__(self, HomeAssistant hass, HeatPump heatpump, Connection connection)
Definition: coordinator.py:76
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
Definition: coordinator.py:51
None async_update_context_listeners(self, Iterable[_ContextTypeT] contexts)
Definition: coordinator.py:39
dict[_ContextTypeT, list[CALLBACK_TYPE]] context_callbacks(self)
Definition: coordinator.py:29
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88