Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Helpers to help coordinate updates."""
2 
3 from pypalazzetti.client import PalazzettiClient
4 from pypalazzetti.exceptions import CommunicationError, ValidationError
5 
6 from homeassistant.config_entries import ConfigEntry
7 from homeassistant.const import CONF_HOST
8 from homeassistant.core import HomeAssistant
9 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
10 
11 from .const import DOMAIN, LOGGER, SCAN_INTERVAL
12 
13 type PalazzettiConfigEntry = ConfigEntry[PalazzettiDataUpdateCoordinator]
14 
15 
class PalazzettiDataUpdateCoordinator(DataUpdateCoordinator[None]):
    """Class to manage fetching Palazzetti data from a Palazzetti hub.

    The coordinator owns a ``PalazzettiClient`` and asks it to refresh its
    state on every update. The update itself returns nothing, so the fetched
    state lives on ``self.client`` rather than in the coordinator's data.
    """

    config_entry: PalazzettiConfigEntry
    client: PalazzettiClient

    def __init__(
        self,
        hass: HomeAssistant,
    ) -> None:
        """Initialize global Palazzetti data updater.

        Args:
            hass: The Home Assistant instance this coordinator runs in.
        """
        super().__init__(
            hass,
            LOGGER,
            name=DOMAIN,
            update_interval=SCAN_INTERVAL,
        )
        # No config entry is passed in; this relies on the base initializer
        # having populated ``self.config_entry``.
        # NOTE(review): confirm against the DataUpdateCoordinator in use.
        self.client = PalazzettiClient(self.config_entry.data[CONF_HOST])

    async def _async_setup(self) -> None:
        """Connect to the hub and load its initial state.

        Presumably invoked once by the base coordinator before the first
        refresh -- verify against the base class.

        Raises:
            UpdateFailed: If the client reports a communication or
                validation error while connecting or updating.
        """
        try:
            await self.client.connect()
            await self.client.update_state()
        except (CommunicationError, ValidationError) as err:
            raise UpdateFailed(f"Error communicating with the API: {err}") from err

    async def _async_update_data(self) -> None:
        """Fetch data from Palazzetti.

        Refreshes the client's state in place; nothing is returned.

        Raises:
            UpdateFailed: If the client reports a communication or
                validation error during the update.
        """
        try:
            await self.client.update_state()
        except (CommunicationError, ValidationError) as err:
            raise UpdateFailed(f"Error communicating with the API: {err}") from err