Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Data coordinator for the qnap integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 from typing import Any
8 
9 from qnapstats import QNAPStats
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.const import (
13  CONF_HOST,
14  CONF_PASSWORD,
15  CONF_PORT,
16  CONF_SSL,
17  CONF_TIMEOUT,
18  CONF_USERNAME,
19  CONF_VERIFY_SSL,
20 )
21 from homeassistant.core import HomeAssistant
22 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
23 
24 from .const import DOMAIN
25 
26 UPDATE_INTERVAL = timedelta(minutes=1)
27 
28 _LOGGER = logging.getLogger(__name__)
29 
30 
31 class QnapCoordinator(DataUpdateCoordinator[dict[str, dict[str, Any]]]):
32  """Custom coordinator for the qnap integration."""
33 
34  def __init__(self, hass: HomeAssistant, config_entry: ConfigEntry) -> None:
35  """Initialize the qnap coordinator."""
36  super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=UPDATE_INTERVAL)
37 
38  protocol = "https" if config_entry.data[CONF_SSL] else "http"
39  self._api_api = QNAPStats(
40  f"{protocol}://{config_entry.data.get(CONF_HOST)}",
41  config_entry.data.get(CONF_PORT),
42  config_entry.data.get(CONF_USERNAME),
43  config_entry.data.get(CONF_PASSWORD),
44  verify_ssl=config_entry.data.get(CONF_VERIFY_SSL),
45  timeout=config_entry.data.get(CONF_TIMEOUT),
46  )
47 
48  def _sync_update(self) -> dict[str, dict[str, Any]]:
49  """Get the latest data from the Qnap API."""
50  return {
51  "system_stats": self._api_api.get_system_stats(),
52  "system_health": self._api_api.get_system_health(),
53  "smart_drive_health": self._api_api.get_smart_disk_health(),
54  "volumes": self._api_api.get_volumes(),
55  "bandwidth": self._api_api.get_bandwidth(),
56  }
57 
58  async def _async_update_data(self) -> dict[str, dict[str, Any]]:
59  """Get the latest data from the Qnap API."""
60  return await self.hasshass.async_add_executor_job(self._sync_update_sync_update)
dict[str, dict[str, Any]] _async_update_data(self)
Definition: coordinator.py:58
None __init__(self, HomeAssistant hass, ConfigEntry config_entry)
Definition: coordinator.py:34