Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Coordinator for the Arve integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 
7 from asyncarve import (
8  Arve,
9  ArveConnectionError,
10  ArveDeviceInfo,
11  ArveDevices,
12  ArveError,
13  ArveSensProData,
14 )
15 
16 from homeassistant.config_entries import ConfigEntry
17 from homeassistant.const import CONF_ACCESS_TOKEN, CONF_CLIENT_SECRET
18 from homeassistant.core import HomeAssistant
19 from homeassistant.helpers.aiohttp_client import async_get_clientsession
20 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
21 
22 from .const import DOMAIN, LOGGER
23 
24 type ArveConfigEntry = ConfigEntry[ArveCoordinator]
25 
26 
class ArveCoordinator(DataUpdateCoordinator[ArveSensProData]):
    """Arve coordinator.

    Polls the Arve API every 60 seconds and exposes, per device serial
    number, the combined sensor data and sensor info for that device.
    """

    # NOTE(review): the base class is parameterized with ArveSensProData while
    # _async_update_data is annotated as returning dict[str, ArveDeviceInfo].
    # Confirm which of the two is intended and align them.

    config_entry: ArveConfigEntry
    # Device listing returned by the most recent successful get_devices() call.
    devices: ArveDevices

    def __init__(self, hass: HomeAssistant) -> None:
        """Initialize Arve coordinator.

        Args:
            hass: The Home Assistant instance; also used to obtain the shared
                aiohttp client session handed to the Arve client.
        """
        super().__init__(
            hass,
            LOGGER,
            name=DOMAIN,
            update_interval=timedelta(seconds=60),
        )

        # self.config_entry is not assigned anywhere in this class; presumably
        # the base class populates it during super().__init__() - TODO confirm.
        self.arve = Arve(
            self.config_entry.data[CONF_ACCESS_TOKEN],
            self.config_entry.data[CONF_CLIENT_SECRET],
            session=async_get_clientsession(hass),
        )

    async def _async_update_data(self) -> dict[str, ArveDeviceInfo]:
        """Fetch data from API endpoint.

        Refreshes ``self.devices`` and then queries every serial number in
        ``self.devices.sn`` for its sensor data and sensor info.

        Returns:
            A mapping of device serial number to an ArveDeviceInfo built from
            that device's sensor data and sensor info.

        Raises:
            UpdateFailed: If the Arve client raises ArveConnectionError or any
                other ArveError while fetching.
        """
        try:
            self.devices = await self.arve.get_devices()

            # The two requests per device are awaited one after the other,
            # device by device, in the order the serial numbers are listed.
            response_data = {
                sn: ArveDeviceInfo(
                    await self.arve.device_sensor_data(sn),
                    await self.arve.get_sensor_info(sn),
                )
                for sn in self.devices.sn
            }
        except ArveConnectionError as err:
            # Listed before ArveError so the connection-specific message is
            # used when the connection error type applies.
            raise UpdateFailed("Unable to connect to the Arve device") from err
        except ArveError as err:
            raise UpdateFailed("Unknown error occurred") from err

        return response_data
dict[str, ArveDeviceInfo] _async_update_data(self)
Definition: coordinator.py:48
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)