Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data update coordinator for the Nextcloud integration."""
2 
3 import logging
4 from typing import Any
5 
6 from nextcloudmonitor import NextcloudMonitor, NextcloudMonitorError
7 
8 from homeassistant.config_entries import ConfigEntry
9 from homeassistant.const import CONF_URL
10 from homeassistant.core import HomeAssistant
11 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
12 
13 from .const import DEFAULT_SCAN_INTERVAL
14 
15 _LOGGER = logging.getLogger(__name__)
16 
17 
19  """Nextcloud data update coordinator."""
20 
21  def __init__(
22  self, hass: HomeAssistant, ncm: NextcloudMonitor, entry: ConfigEntry
23  ) -> None:
24  """Initialize the Nextcloud coordinator."""
25  self.ncmncm = ncm
26  self.urlurl = entry.data[CONF_URL]
27 
28  super().__init__(
29  hass,
30  _LOGGER,
31  name=self.urlurl,
32  update_interval=DEFAULT_SCAN_INTERVAL,
33  )
34 
35  # Use recursion to create list of sensors & values based on nextcloud api data
37  self, api_data: dict, key_path: str = "", leaf: bool = False
38  ) -> dict[str, Any]:
39  """Use Recursion to discover data-points and values.
40 
41  Get dictionary of data-points by recursing through dict returned by api until
42  the dictionary value does not contain another dictionary and use the
43  resulting path of dictionary keys and resulting value as the name/value
44  for the data-point.
45 
46  returns: dictionary of data-point/values
47  """
48  result = {}
49  for key, value in api_data.items():
50  if isinstance(value, dict):
51  if leaf:
52  key_path = f"{key}_"
53  if not leaf:
54  key_path += f"{key}_"
55  leaf = True
56  result.update(self._get_data_points_get_data_points(value, key_path, leaf))
57  elif key == "cpuload" and isinstance(value, list):
58  result[f"{key_path}{key}_1"] = value[0]
59  result[f"{key_path}{key}_5"] = value[1]
60  result[f"{key_path}{key}_15"] = value[2]
61  leaf = False
62  else:
63  result[f"{key_path}{key}"] = value
64  leaf = False
65  return result
66 
67  async def _async_update_data(self) -> dict[str, Any]:
68  """Fetch all Nextcloud data."""
69 
70  def _update_data() -> None:
71  try:
72  self.ncmncm.update()
73  except NextcloudMonitorError as ex:
74  raise UpdateFailed from ex
75 
76  await self.hasshass.async_add_executor_job(_update_data)
77  return self._get_data_points_get_data_points(self.ncmncm.data)
None __init__(self, HomeAssistant hass, NextcloudMonitor ncm, ConfigEntry entry)
Definition: coordinator.py:23
dict[str, Any] _get_data_points(self, dict api_data, str key_path="", bool leaf=False)
Definition: coordinator.py:38
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
_ItemT _update_data(self, _ItemT item, dict update_data)
Definition: collection.py:296