1 """Data update coordination for Rainforest RAVEn devices."""
3 from __future__
import annotations
6 from dataclasses
import asdict
7 from datetime
import timedelta
11 from aioraven.data
import DeviceInfo
as RAVEnDeviceInfo
12 from aioraven.device
import RAVEnConnectionError
13 from aioraven.serial
import RAVEnSerialDevice
21 from .const
import DOMAIN
23 type RAVEnConfigEntry = ConfigEntry[RAVEnDataCoordinator]
25 _LOGGER = logging.getLogger(__name__)
29 device: RAVEnSerialDevice, meter: bytes
30 ) -> dict[str, dict[str, Any]]:
33 sum_info = await device.get_current_summation_delivered(meter=meter)
34 demand_info = await device.get_instantaneous_demand(meter=meter)
35 price_info = await device.get_current_price(meter=meter)
37 if sum_info
and sum_info.meter_mac_id == meter:
38 data[
"CurrentSummationDelivered"] = asdict(sum_info)
40 if demand_info
and demand_info.meter_mac_id == meter:
41 data[
"InstantaneousDemand"] = asdict(demand_info)
43 if price_info
and price_info.meter_mac_id == meter:
44 data[
"PriceCluster"] = asdict(price_info)
50 device: RAVEnSerialDevice, meter_macs: list[str]
51 ) -> dict[str, dict[str, Any]]:
52 data: dict[str, dict[str, Any]] = {
"Meters": {}}
54 for meter_mac
in meter_macs:
56 device, bytes.fromhex(meter_mac)
59 network_info = await device.get_network_info()
61 if network_info
and network_info.link_strength:
62 data[
"NetworkInfo"] = asdict(network_info)
68 """Communication coordinator for a Rainforest RAVEn device."""
70 _raven_device: RAVEnSerialDevice |
None =
None
71 _device_info: RAVEnDeviceInfo |
None =
None
72 config_entry: RAVEnConfigEntry
74 def __init__(self, hass: HomeAssistant, entry: RAVEnConfigEntry) ->
None:
75 """Initialize the data object."""
86 """Return the MAC address of the device."""
93 """Return device info."""
98 identifiers={(DOMAIN, mac_address)},
99 manufacturer=device_info.manufacturer,
100 model=device_info.model_id,
101 model_id=device_info.model_id,
103 sw_version=device_info.fw_version,
104 hw_version=device_info.hw_version,
109 """Shutdown the coordinator."""
116 async
with asyncio.timeout(5):
118 except RAVEnConnectionError
as err:
120 raise UpdateFailed(f
"RAVEnConnectionError: {err}")
from err
127 if device
is not None:
134 device = RAVEnSerialDevice(self.
config_entry.data[CONF_DEVICE])
137 async
with asyncio.timeout(5):
139 await device.synchronize()
None async_shutdown(self)
DeviceInfo|None device_info(self)
str|None device_mac_address(self)
None __init__(self, HomeAssistant hass, RAVEnConfigEntry entry)
None _cleanup_device(self)
RAVEnSerialDevice _get_device(self)
dict[str, Any] _async_update_data(self)
dict[str, dict[str, Any]] _get_all_data(RAVEnSerialDevice device, list[str] meter_macs)
dict[str, dict[str, Any]] _get_meter_data(RAVEnSerialDevice device, bytes meter)