Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Update coordinator for HomeWizard."""
2 
3 from __future__ import annotations
4 
5 import logging
6 
7 from homewizard_energy import HomeWizardEnergyV1
8 from homewizard_energy.errors import DisabledError, RequestError, UnsupportedError
9 from homewizard_energy.v1.const import SUPPORTS_IDENTIFY, SUPPORTS_STATE
10 from homewizard_energy.v1.models import Device
11 
12 from homeassistant.config_entries import ConfigEntry
13 from homeassistant.const import CONF_IP_ADDRESS
14 from homeassistant.core import HomeAssistant
15 from homeassistant.helpers.aiohttp_client import async_get_clientsession
16 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
17 
18 from .const import DOMAIN, UPDATE_INTERVAL, DeviceResponseEntry
19 
20 _LOGGER = logging.getLogger(__name__)
21 
22 
24  """Gather data for the energy device."""
25 
26  api: HomeWizardEnergyV1
27  api_disabled: bool = False
28 
29  _unsupported_error: bool = False
30 
31  config_entry: ConfigEntry
32 
def __init__(
    self,
    hass: HomeAssistant,
) -> None:
    """Initialize update coordinator.

    Args:
        hass: Home Assistant instance; forwarded to the base coordinator
            and used to obtain the aiohttp client session for the API.
    """
    super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=UPDATE_INTERVAL)
    # NOTE(review): config_entry is not assigned in this method; it is
    # presumably populated by the base-class initializer above -- confirm.
    self.api = HomeWizardEnergyV1(
        self.config_entry.data[CONF_IP_ADDRESS],
        clientsession=async_get_clientsession(hass),
    )
43 
async def _async_update_data(self) -> DeviceResponseEntry:
    """Fetch all device and sensor data from api.

    Returns:
        A DeviceResponseEntry holding the device and measurement data, plus
        the state (only when the device supports it) and system information
        when the firmware provides them.

    Raises:
        UpdateFailed: When the request fails (RequestError) or the API
            reports that it is disabled (DisabledError).
    """
    try:
        data = DeviceResponseEntry(
            device=await self.api.device(),
            data=await self.api.data(),
        )

        try:
            if self.supports_state(data.device):
                data.state = await self.api.state()

            data.system = await self.api.system()

        except UnsupportedError as ex:
            # Old firmware, ignore. The flag ensures the warning is logged
            # only once instead of on every update.
            if not self._unsupported_error:
                self._unsupported_error = True
                _LOGGER.warning(
                    "%s is running an outdated firmware version (%s). Contact HomeWizard support to update your device",
                    self.config_entry.title,
                    ex,
                )

    except RequestError as ex:
        raise UpdateFailed(
            ex, translation_domain=DOMAIN, translation_key="communication_error"
        ) from ex

    except DisabledError as ex:
        # Only react the first time the API is seen as disabled.
        if not self.api_disabled:
            self.api_disabled = True

            # Do not reload when performing first refresh
            if self.data is not None:
                # Reload config entry to let init flow handle retrying and
                # trigger repair flow
                self.hass.config_entries.async_schedule_reload(
                    self.config_entry.entry_id
                )

        raise UpdateFailed(
            ex, translation_domain=DOMAIN, translation_key="api_disabled"
        ) from ex

    # A successful fetch means the API is reachable and enabled again.
    self.api_disabled = False

    self.data = data
    return data
92 
def supports_state(self, device: Device | None = None) -> bool:
    """Return True if the device supports state.

    Args:
        device: Device to check. When omitted, the device stored in the
            coordinator's current data (``self.data.device``) is used.
    """
    if device is None:
        device = self.data.device

    return device.product_type in SUPPORTS_STATE
100 
def supports_identify(self, device: Device | None = None) -> bool:
    """Return True if the device supports identify.

    Args:
        device: Device to check. When omitted, the device stored in the
            coordinator's current data (``self.data.device``) is used.
    """
    if device is None:
        device = self.data.device

    return device.product_type in SUPPORTS_IDENTIFY
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)
bool state(HomeAssistant hass, str|State|None entity, Any req_state, timedelta|None for_period=None, str|None attribute=None, TemplateVarsType variables=None)
Definition: condition.py:551