1 """The Nibe Heat Pump coordinator."""
3 from __future__
import annotations
6 from collections
import defaultdict
7 from collections.abc
import Callable, Iterable
8 from datetime
import date, timedelta
11 from nibe.coil
import Coil, CoilData
12 from nibe.connection
import Connection
13 from nibe.exceptions
import CoilNotFoundException, ReadException
14 from nibe.heatpump
import HeatPump, Series
15 from propcache
import cached_property
22 from .const
import DOMAIN, LOGGER
26 """Update coordinator with context adjustments."""
30 """Return a dict of all callbacks registered for a given context."""
31 callbacks: dict[_ContextTypeT, list[CALLBACK_TYPE]] = defaultdict(list)
32 for update_callback, context
in list(self._listeners.values()):
33 assert isinstance(context, set)
34 for address
in context:
35 callbacks[address].append(update_callback)
40 """Update all listeners given a set of contexts."""
41 update_callbacks: set[CALLBACK_TYPE] = set()
42 for context
in contexts:
45 for update_callback
in update_callbacks:
50 self, update_callback: CALLBACK_TYPE, context: Any =
None
51 ) -> Callable[[],
None]:
52 """Wrap standard function to prune cached callback database."""
53 assert isinstance(context, set)
56 self.__dict__.pop(
"context_callbacks",
None)
61 self.__dict__.pop(
"context_callbacks",
None)
67 """Update coordinator for nibe heat pumps."""
69 config_entry: ConfigEntry
75 connection: Connection,
77 """Initialize coordinator."""
79 hass, LOGGER, name=
"Nibe Heat Pump", update_interval=
timedelta(seconds=60)
83 self.seed: dict[int, CoilData] = {}
86 self.
tasktask: asyncio.Task |
None =
None
88 heatpump.subscribe(heatpump.COIL_UPDATE_EVENT, self.
_on_coil_update_on_coil_update)
91 """Handle callback on coil updates."""
93 self.
datadatadata[coil.address] = data
94 self.seed[coil.address] = data
99 """Return which series of pump we are connected to."""
104 """Return the full coil database."""
105 return self.
heatpumpheatpump.get_coils()
109 """Return unique id for this coordinator."""
114 """Return device information for the main device."""
118 """Return a coil with data and check for validity."""
119 if coil_with_data := self.
datadatadata.
get(coil.address):
120 return coil_with_data.value
124 """Return a coil with float and check for validity."""
130 """Write coil and update state."""
131 data = CoilData(coil, value)
132 await self.
connectionconnection.write_coil(data)
134 self.
datadatadata[coil.address] = data
139 """Read coil and update state using callbacks."""
140 return await self.
connectionconnection.read_coil(coil)
143 self.
tasktask = asyncio.current_task()
150 result: dict[int, CoilData] = {}
152 def _get_coils() -> Iterable[Coil]:
154 if seed := self.seed.pop(address,
None):
155 self.
loggerlogger.debug(
"Skipping seeded coil: %d", address)
156 result[address] = seed
160 coil = self.
heatpumpheatpump.get_coil_by_address(address)
161 except CoilNotFoundException
as exception:
162 self.
loggerlogger.debug(
"Skipping missing coil: %s", exception)
167 async
for data
in self.
connectionconnection.read_coils(_get_coils()):
168 result[data.coil.address] = data
169 self.seed.pop(data.coil.address,
None)
170 except ReadException
as exception:
172 raise UpdateFailed(f
"Failed to update: {exception}")
from exception
174 "Some coils failed to update, and may be unsupported: %s", exception
180 """Make sure a coordinator is shut down as well as its connection."""
183 self.
tasktask.cancel()
184 await asyncio.wait((self.
tasktask,))
int|str|float|date|None get_coil_value(self, Coil coil)
None __init__(self, HomeAssistant hass, HeatPump heatpump, Connection connection)
None async_write_coil(self, Coil coil, float|str value)
CoilData async_read_coil(self, Coil coil)
DeviceInfo device_info(self)
def _on_coil_update(self, CoilData data)
float|None get_coil_float(self, Coil coil)
dict[int, CoilData] _async_update_data_internal(self)
dict[int, CoilData] _async_update_data(self)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
None async_update_context_listeners(self, Iterable[_ContextTypeT] contexts)
dict[_ContextTypeT, list[CALLBACK_TYPE]] context_callbacks(self)
web.Response get(self, web.Request request, str config_key)