Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """DataUpdateCoordinators for the Fronius integration."""
2 
3 from __future__ import annotations
4 
5 from abc import ABC, abstractmethod
6 from datetime import timedelta
7 from typing import TYPE_CHECKING, Any
8 
9 from pyfronius import BadStatusError, FroniusError
10 
11 from homeassistant.core import callback
12 from homeassistant.helpers.entity_platform import AddEntitiesCallback
13 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
14 
15 from .const import (
16  SOLAR_NET_ID_POWER_FLOW,
17  SOLAR_NET_ID_SYSTEM,
18  FroniusDeviceInfo,
19  SolarNetId,
20 )
21 from .sensor import (
22  INVERTER_ENTITY_DESCRIPTIONS,
23  LOGGER_ENTITY_DESCRIPTIONS,
24  METER_ENTITY_DESCRIPTIONS,
25  OHMPILOT_ENTITY_DESCRIPTIONS,
26  POWER_FLOW_ENTITY_DESCRIPTIONS,
27  STORAGE_ENTITY_DESCRIPTIONS,
28  FroniusSensorEntityDescription,
29 )
30 
31 if TYPE_CHECKING:
32  from . import FroniusSolarNet
33  from .sensor import _FroniusSensorEntity
34 
35 
37  ABC, DataUpdateCoordinator[dict[SolarNetId, dict[str, Any]]]
38 ):
39  """Query Fronius endpoint and keep track of seen conditions."""
40 
41  default_interval: timedelta
42  error_interval: timedelta
43  valid_descriptions: list[FroniusSensorEntityDescription]
44 
45  MAX_FAILED_UPDATES = 3
46 
47  def __init__(self, *args: Any, solar_net: FroniusSolarNet, **kwargs: Any) -> None:
48  """Set up the FroniusCoordinatorBase class."""
49  self._failed_update_count_failed_update_count = 0
50  self.solar_netsolar_net = solar_net
51  # unregistered_descriptors are used to create entities in platform module
52  self.unregistered_descriptors: dict[
53  SolarNetId, list[FroniusSensorEntityDescription]
54  ] = {}
55  super().__init__(*args, update_interval=self.default_interval, **kwargs)
56 
57  @abstractmethod
58  async def _update_method(self) -> dict[SolarNetId, Any]:
59  """Return data per solar net id from pyfronius."""
60 
61  async def _async_update_data(self) -> dict[SolarNetId, Any]:
62  """Fetch the latest data from the source."""
63  async with self.solar_net.coordinator_lock:
64  try:
65  data = await self._update_method()
66  except FroniusError as err:
67  self._failed_update_count += 1
68  if self._failed_update_count == self.MAX_FAILED_UPDATES:
69  self.update_intervalupdate_intervalupdate_intervalupdate_intervalupdate_interval = self.error_interval
70  raise UpdateFailed(err) from err
71 
72  if self._failed_update_count_failed_update_count != 0:
73  self._failed_update_count_failed_update_count = 0
74  self.update_intervalupdate_intervalupdate_intervalupdate_intervalupdate_interval = self.default_interval
75 
76  for solar_net_id in data:
77  if solar_net_id not in self.unregistered_descriptors:
78  # id seen for the first time
79  self.unregistered_descriptors[solar_net_id] = (
80  self.valid_descriptions.copy()
81  )
82  return data
83 
84  @callback
85  def add_entities_for_seen_keys[_FroniusEntityT: _FroniusSensorEntity](
86  self,
87  async_add_entities: AddEntitiesCallback,
88  entity_constructor: type[_FroniusEntityT],
89  ) -> None:
90  """Add entities for received keys and registers listener for future seen keys.
91 
92  Called from a platforms `async_setup_entry`.
93  """
94 
95  @callback
97  """Add entities for keys seen for the first time."""
98  new_entities: list[_FroniusEntityT] = []
99  for solar_net_id, device_data in self.datadata.items():
100  remaining_unregistered_descriptors = []
101  for description in self.unregistered_descriptors[solar_net_id]:
102  key = description.response_key or description.key
103  if key not in device_data:
104  remaining_unregistered_descriptors.append(description)
105  continue
106  if device_data[key]["value"] is None:
107  remaining_unregistered_descriptors.append(description)
108  continue
109  new_entities.append(
110  entity_constructor(
111  coordinator=self,
112  description=description,
113  solar_net_id=solar_net_id,
114  )
115  )
116  self.unregistered_descriptors[solar_net_id] = (
117  remaining_unregistered_descriptors
118  )
119  async_add_entities(new_entities)
120 
122  self.solar_netsolar_net.config_entry.async_on_unload(
123  self.async_add_listenerasync_add_listenerasync_add_listener(_add_entities_for_unregistered_descriptors)
124  )
125 
126 
128  """Query Fronius device inverter endpoint and keep track of seen conditions."""
129 
130  default_interval = timedelta(minutes=1)
131  error_interval = timedelta(minutes=10)
132  valid_descriptions = INVERTER_ENTITY_DESCRIPTIONS
133 
134  SILENT_RETRIES = 3
135 
def __init__(
    self, *args: Any, inverter_info: FroniusDeviceInfo, **kwargs: Any
) -> None:
    """Set up a Fronius inverter device scope coordinator.

    ``inverter_info`` is stored on the instance; every other argument is
    forwarded unchanged to the parent constructor.
    """
    super().__init__(*args, **kwargs)
    self.inverter_info = inverter_info
142 
async def _update_method(self) -> dict[SolarNetId, Any]:
    """Return data per solar net id from pyfronius.

    Requests the inverter data up to ``SILENT_RETRIES`` times; a
    ``BadStatusError`` is only propagated when the last attempt fails.
    """
    # almost 1% of `current_inverter_data` requests on Symo devices result in
    # `BadStatusError Code: 8 - LNRequestTimeout` due to flaky internal
    # communication between the logger and the inverter.
    for silent_retry in range(self.SILENT_RETRIES):
        try:
            data = await self.solar_net.fronius.current_inverter_data(
                self.inverter_info.solar_net_id
            )
        except BadStatusError:
            if silent_retry == (self.SILENT_RETRIES - 1):
                raise
            continue
        break
    # wrap a single devices data in a dict with solar_net_id key for
    # FroniusCoordinatorBase _async_update_data and add_entities_for_seen_keys
    return {self.inverter_info.solar_net_id: data}
161 
162 
164  """Query Fronius logger info endpoint and keep track of seen conditions."""
165 
166  default_interval = timedelta(hours=1)
167  error_interval = timedelta(hours=1)
168  valid_descriptions = LOGGER_ENTITY_DESCRIPTIONS
169 
async def _update_method(self) -> dict[SolarNetId, Any]:
    """Return data per solar net id from pyfronius.

    The logger info response is wrapped in a dict under the
    ``SOLAR_NET_ID_SYSTEM`` key.
    """
    data = await self.solar_net.fronius.current_logger_info()
    return {SOLAR_NET_ID_SYSTEM: data}
174 
175 
177  """Query Fronius system meter endpoint and keep track of seen conditions."""
178 
179  default_interval = timedelta(minutes=1)
180  error_interval = timedelta(minutes=10)
181  valid_descriptions = METER_ENTITY_DESCRIPTIONS
182 
async def _update_method(self) -> dict[SolarNetId, Any]:
    """Return data per solar net id from pyfronius.

    Returns the ``"meters"`` entry of the system meter response
    (presumably keyed by solar net id, per the return annotation).
    """
    data = await self.solar_net.fronius.current_system_meter_data()
    return data["meters"]  # type: ignore[no-any-return]
187 
188 
190  """Query Fronius Ohmpilots and keep track of seen conditions."""
191 
192  default_interval = timedelta(minutes=1)
193  error_interval = timedelta(minutes=10)
194  valid_descriptions = OHMPILOT_ENTITY_DESCRIPTIONS
195 
async def _update_method(self) -> dict[SolarNetId, Any]:
    """Return data per solar net id from pyfronius.

    Returns the ``"ohmpilots"`` entry of the system Ohmpilot response
    (presumably keyed by solar net id, per the return annotation).
    """
    data = await self.solar_net.fronius.current_system_ohmpilot_data()
    return data["ohmpilots"]  # type: ignore[no-any-return]
200 
201 
203  """Query Fronius power flow endpoint and keep track of seen conditions."""
204 
205  default_interval = timedelta(seconds=10)
206  error_interval = timedelta(minutes=3)
207  valid_descriptions = POWER_FLOW_ENTITY_DESCRIPTIONS
208 
async def _update_method(self) -> dict[SolarNetId, Any]:
    """Return data per solar net id from pyfronius.

    The power flow response is wrapped in a dict under the
    ``SOLAR_NET_ID_POWER_FLOW`` key.
    """
    data = await self.solar_net.fronius.current_power_flow()
    return {SOLAR_NET_ID_POWER_FLOW: data}
213 
214 
216  """Query Fronius system storage endpoint and keep track of seen conditions."""
217 
218  default_interval = timedelta(minutes=1)
219  error_interval = timedelta(minutes=10)
220  valid_descriptions = STORAGE_ENTITY_DESCRIPTIONS
221 
async def _update_method(self) -> dict[SolarNetId, Any]:
    """Return data per solar net id from pyfronius.

    Returns the ``"storages"`` entry of the system storage response
    (presumably keyed by solar net id, per the return annotation).
    """
    data = await self.solar_net.fronius.current_system_storage_data()
    return data["storages"]  # type: ignore[no-any-return]
None __init__(self, *Any args, FroniusSolarNet solar_net, **Any kwargs)
Definition: coordinator.py:47
None __init__(self, *Any args, FroniusDeviceInfo inverter_info, **Any kwargs)
Definition: coordinator.py:138
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)
Callable[[], None] async_add_listener(self, CALLBACK_TYPE update_callback, Any context=None)