Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Code to handle the Plenticore API."""
2 
3 from __future__ import annotations
4 
5 from collections import defaultdict
6 from collections.abc import Mapping
7 from datetime import datetime, timedelta
8 import logging
9 from typing import cast
10 
11 from aiohttp.client_exceptions import ClientError
12 from pykoplenti import (
13  ApiClient,
14  ApiException,
15  AuthenticationException,
16  ExtendedApiClient,
17 )
18 
19 from homeassistant.const import CONF_HOST, CONF_PASSWORD, EVENT_HOMEASSISTANT_STOP
20 from homeassistant.core import CALLBACK_TYPE, HomeAssistant
21 from homeassistant.exceptions import ConfigEntryNotReady
22 from homeassistant.helpers.aiohttp_client import async_get_clientsession
23 from homeassistant.helpers.device_registry import DeviceInfo
24 from homeassistant.helpers.event import async_call_later
25 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
26 
27 from .const import DOMAIN
28 from .helper import get_hostname_id
29 
30 _LOGGER = logging.getLogger(__name__)
31 
32 
33 class Plenticore:
34  """Manages the Plenticore API."""
35 
36  def __init__(self, hass, config_entry):
37  """Create a new plenticore manager instance."""
38  self.hasshass = hass
39  self.config_entryconfig_entry = config_entry
40 
41  self._client_client = None
42  self._shutdown_remove_listener_shutdown_remove_listener = None
43 
44  self.device_infodevice_info = {}
45 
46  @property
47  def host(self) -> str:
48  """Return the host of the Plenticore inverter."""
49  return self.config_entryconfig_entry.data[CONF_HOST]
50 
51  @property
52  def client(self) -> ApiClient:
53  """Return the Plenticore API client."""
54  return self._client_client
55 
56  async def async_setup(self) -> bool:
57  """Set up Plenticore API client."""
58  self._client_client = ExtendedApiClient(
59  async_get_clientsession(self.hasshass), host=self.hosthost
60  )
61  try:
62  await self._client_client.login(self.config_entryconfig_entry.data[CONF_PASSWORD])
63  except AuthenticationException as err:
64  _LOGGER.error(
65  "Authentication exception connecting to %s: %s", self.hosthost, err
66  )
67  return False
68  except (ClientError, TimeoutError) as err:
69  _LOGGER.error("Error connecting to %s", self.hosthost)
70  raise ConfigEntryNotReady from err
71  else:
72  _LOGGER.debug("Log-in successfully to %s", self.hosthost)
73 
74  self._shutdown_remove_listener_shutdown_remove_listener = self.hasshass.bus.async_listen_once(
75  EVENT_HOMEASSISTANT_STOP, self._async_shutdown_async_shutdown
76  )
77 
78  # get some device meta data
79  hostname_id = await get_hostname_id(self._client_client)
80  settings = await self._client_client.get_setting_values(
81  {
82  "devices:local": [
83  "Properties:SerialNo",
84  "Branding:ProductName1",
85  "Branding:ProductName2",
86  "Properties:VersionIOC",
87  "Properties:VersionMC",
88  ],
89  "scb:network": [hostname_id],
90  }
91  )
92 
93  device_local = settings["devices:local"]
94  prod1 = device_local["Branding:ProductName1"]
95  prod2 = device_local["Branding:ProductName2"]
96 
97  self.device_infodevice_info = DeviceInfo(
98  configuration_url=f"http://{self.host}",
99  identifiers={(DOMAIN, device_local["Properties:SerialNo"])},
100  manufacturer="Kostal",
101  model=f"{prod1} {prod2}",
102  name=settings["scb:network"][hostname_id],
103  sw_version=(
104  f'IOC: {device_local["Properties:VersionIOC"]}'
105  f' MC: {device_local["Properties:VersionMC"]}'
106  ),
107  )
108 
109  return True
110 
111  async def _async_shutdown(self, event):
112  """Call from Homeassistant shutdown event."""
113  # unset remove listener otherwise calling it would raise an exception
114  self._shutdown_remove_listener_shutdown_remove_listener = None
115  await self.async_unloadasync_unload()
116 
117  async def async_unload(self) -> None:
118  """Unload the Plenticore API client."""
119  if self._shutdown_remove_listener_shutdown_remove_listener:
120  self._shutdown_remove_listener_shutdown_remove_listener()
121 
122  await self._client_client.logout()
123  self._client_client = None
124  _LOGGER.debug("Logged out from %s", self.hosthost)
125 
126 
128  """Base implementation for read and write data."""
129 
130  _plenticore: Plenticore
131  name: str
132 
133  async def async_read_data(
134  self, module_id: str, data_id: str
135  ) -> Mapping[str, Mapping[str, str]] | None:
136  """Read data from Plenticore."""
137  if (client := self._plenticore.client) is None:
138  return None
139 
140  try:
141  return await client.get_setting_values(module_id, data_id)
142  except ApiException:
143  return None
144 
145  async def async_write_data(self, module_id: str, value: dict[str, str]) -> bool:
146  """Write settings back to Plenticore."""
147  if (client := self._plenticore.client) is None:
148  return False
149 
150  _LOGGER.debug(
151  "Setting value for %s in module %s to %s", self.name, module_id, value
152  )
153 
154  try:
155  await client.set_setting_values(module_id, value)
156  except ApiException:
157  return False
158 
159  return True
160 
161 
162 class PlenticoreUpdateCoordinator[_DataT](DataUpdateCoordinator[_DataT]):
163  """Base implementation of DataUpdateCoordinator for Plenticore data."""
164 
165  def __init__(
166  self,
167  hass: HomeAssistant,
168  logger: logging.Logger,
169  name: str,
170  update_inverval: timedelta,
171  plenticore: Plenticore,
172  ) -> None:
173  """Create a new update coordinator for plenticore data."""
174  super().__init__(
175  hass=hass,
176  logger=logger,
177  name=name,
178  update_interval=update_inverval,
179  )
180  # data ids to poll
181  self._fetch: dict[str, list[str]] = defaultdict(list)
182  self._plenticore_plenticore = plenticore
183 
184  def start_fetch_data(self, module_id: str, data_id: str) -> CALLBACK_TYPE:
185  """Start fetching the given data (module-id and data-id)."""
186  self._fetch[module_id].append(data_id)
187 
188  # Force an update of all data. Multiple refresh calls
189  # are ignored by the debouncer.
190  async def force_refresh(event_time: datetime) -> None:
191  await self.async_request_refreshasync_request_refresh()
192 
193  return async_call_later(self.hasshass, 2, force_refresh)
194 
195  def stop_fetch_data(self, module_id: str, data_id: str) -> None:
196  """Stop fetching the given data (module-id and data-id)."""
197  self._fetch[module_id].remove(data_id)
198 
199 
201  PlenticoreUpdateCoordinator[Mapping[str, Mapping[str, str]]]
202 ):
203  """Implementation of PlenticoreUpdateCoordinator for process data."""
204 
205  async def _async_update_data(self) -> dict[str, dict[str, str]]:
206  client = self._plenticore_plenticore.client
207 
208  if not self._fetch or client is None:
209  return {}
210 
211  _LOGGER.debug("Fetching %s for %s", self.namename, self._fetch)
212 
213  fetched_data = await client.get_process_data_values(self._fetch)
214  return {
215  module_id: {
216  process_data.id: process_data.value
217  for process_data in fetched_data[module_id].values()
218  }
219  for module_id in fetched_data
220  }
221 
222 
224  PlenticoreUpdateCoordinator[Mapping[str, Mapping[str, str]]],
225  DataUpdateCoordinatorMixin,
226 ):
227  """Implementation of PlenticoreUpdateCoordinator for settings data."""
228 
229  async def _async_update_data(self) -> Mapping[str, Mapping[str, str]]:
230  client = self._plenticore_plenticore.client
231 
232  if not self._fetch or client is None:
233  return {}
234 
235  _LOGGER.debug("Fetching %s for %s", self.namename, self._fetch)
236 
237  return await client.get_setting_values(self._fetch)
238 
239 
240 class PlenticoreSelectUpdateCoordinator[_DataT](DataUpdateCoordinator[_DataT]):
241  """Base implementation of DataUpdateCoordinator for Plenticore data."""
242 
243  def __init__(
244  self,
245  hass: HomeAssistant,
246  logger: logging.Logger,
247  name: str,
248  update_inverval: timedelta,
249  plenticore: Plenticore,
250  ) -> None:
251  """Create a new update coordinator for plenticore data."""
252  super().__init__(
253  hass=hass,
254  logger=logger,
255  name=name,
256  update_interval=update_inverval,
257  )
258  # data ids to poll
259  self._fetch: dict[str, list[str | list[str]]] = defaultdict(list)
260  self._plenticore_plenticore = plenticore
261 
263  self, module_id: str, data_id: str, all_options: list[str]
264  ) -> CALLBACK_TYPE:
265  """Start fetching the given data (module-id and entry-id)."""
266  self._fetch[module_id].append(data_id)
267  self._fetch[module_id].append(all_options)
268 
269  # Force an update of all data. Multiple refresh calls
270  # are ignored by the debouncer.
271  async def force_refresh(event_time: datetime) -> None:
272  await self.async_request_refreshasync_request_refresh()
273 
274  return async_call_later(self.hasshass, 2, force_refresh)
275 
277  self, module_id: str, data_id: str, all_options: list[str]
278  ) -> None:
279  """Stop fetching the given data (module-id and entry-id)."""
280  self._fetch[module_id].remove(all_options)
281  self._fetch[module_id].remove(data_id)
282 
283 
285  PlenticoreSelectUpdateCoordinator[dict[str, dict[str, str]]],
286  DataUpdateCoordinatorMixin,
287 ):
288  """Implementation of PlenticoreUpdateCoordinator for select data."""
289 
290  async def _async_update_data(self) -> dict[str, dict[str, str]]:
291  if self._plenticore_plenticore.client is None:
292  return {}
293 
294  _LOGGER.debug("Fetching select %s for %s", self.namename, self._fetch)
295 
296  return await self._async_get_current_option_async_get_current_option(self._fetch)
297 
299  self,
300  module_id: dict[str, list[str | list[str]]],
301  ) -> dict[str, dict[str, str]]:
302  """Get current option."""
303  for mid, pids in module_id.items():
304  all_options = cast(list[str], pids[1])
305  for all_option in all_options:
306  if all_option == "None" or not (
307  val := await self.async_read_dataasync_read_data(mid, all_option)
308  ):
309  continue
310  for option in val.values():
311  if option[all_option] == "1":
312  return {mid: {cast(str, pids[0]): all_option}}
313 
314  return {mid: {cast(str, pids[0]): "None"}}
315  return {}
Mapping[str, Mapping[str, str]]|None async_read_data(self, str module_id, str data_id)
Definition: coordinator.py:135
CALLBACK_TYPE start_fetch_data(self, str module_id, str data_id, list[str] all_options)
Definition: coordinator.py:264
None __init__(self, HomeAssistant hass, logging.Logger logger, str name, timedelta update_inverval, Plenticore plenticore)
Definition: coordinator.py:250
None stop_fetch_data(self, str module_id, str data_id, list[str] all_options)
Definition: coordinator.py:278
None __init__(self, HomeAssistant hass, logging.Logger logger, str name, timedelta update_inverval, Plenticore plenticore)
Definition: coordinator.py:172
dict[str, dict[str, str]] _async_get_current_option(self, dict[str, list[str|list[str]]] module_id)
Definition: coordinator.py:301
bool remove(self, _T matcher)
Definition: match.py:214
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597