1 """Data update coordinator for the Nextcloud integration."""
6 from nextcloudmonitor import NextcloudMonitor, NextcloudMonitorError
13 from .const import DEFAULT_SCAN_INTERVAL
15 _LOGGER = logging.getLogger(__name__)
19 """Nextcloud data update coordinator."""
22 self, hass: HomeAssistant, ncm: NextcloudMonitor, entry: ConfigEntry
24 """Initialize the Nextcloud coordinator."""
26 self.url = entry.data[CONF_URL]
32 update_interval=DEFAULT_SCAN_INTERVAL,
37 self, api_data: dict, key_path: str = "", leaf: bool = False
39 """Use recursion to discover data-points and values.
41 Get dictionary of data-points by recursing through dict returned by api until
42 the dictionary value does not contain another dictionary and use the
43 resulting path of dictionary keys and resulting value as the name/value
46 returns: dictionary of data-point/values
49 for key, value in api_data.items():
50 if isinstance(value, dict):
57 elif key == "cpuload" and isinstance(value, list):
58 result[f"{key_path}{key}_1"] = value[0]
59 result[f"{key_path}{key}_5"] = value[1]
60 result[f"{key_path}{key}_15"] = value[2]
63 result[f"{key_path}{key}"] = value
68 """Fetch all Nextcloud data."""
73 except NextcloudMonitorError as ex:
74 raise UpdateFailed from ex
76 await self.hass.async_add_executor_job(_update_data)
None __init__(self, HomeAssistant hass, NextcloudMonitor ncm, ConfigEntry entry)
dict[str, Any] _get_data_points(self, dict api_data, str key_path="", bool leaf=False)
dict[str, Any] _async_update_data(self)
IssData update(pyiss.ISS iss)
_ItemT _update_data(self, _ItemT item, dict update_data)