Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """The air-Q integration."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 import logging
7 
8 from aioairq import AirQ
9 
10 from homeassistant.config_entries import ConfigEntry
11 from homeassistant.const import CONF_IP_ADDRESS, CONF_PASSWORD
12 from homeassistant.core import HomeAssistant
13 from homeassistant.helpers.aiohttp_client import async_get_clientsession
14 from homeassistant.helpers.device_registry import DeviceInfo
15 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
16 
17 from .const import DOMAIN, MANUFACTURER, UPDATE_INTERVAL
18 
19 _LOGGER = logging.getLogger(__name__)
20 
21 
23  """Coordinator is responsible for querying the device at a specified route."""
24 
def __init__(
    self,
    hass: HomeAssistant,
    entry: ConfigEntry,
    clip_negative: bool = True,
    return_average: bool = True,
) -> None:
    """Initialise a custom coordinator.

    Args:
        hass: Passed to the parent initialiser and used to obtain the
            client session handed to the ``AirQ`` client.
        entry: Config entry supplying the device IP address, the password
            and the unique ID used as the device identifier.
        clip_negative: Stored on the coordinator; forwarded as
            ``clip_negative_values`` when the latest data is requested.
        return_average: Stored on the coordinator; forwarded as
            ``return_average`` when the latest data is requested.

    Raises:
        ValueError: If ``entry.unique_id`` is ``None``.
    """
    super().__init__(
        hass,
        _LOGGER,
        name=DOMAIN,
        update_interval=timedelta(seconds=UPDATE_INTERVAL),
    )
    session = async_get_clientsession(hass)
    # NOTE(review): the doubled attribute names (``airqairq`` etc.) look like
    # an extraction artifact; they are kept as-is because the update method
    # reads exactly these names.
    self.airqairq = AirQ(
        entry.data[CONF_IP_ADDRESS], entry.data[CONF_PASSWORD], session
    )
    self.device_iddevice_id = entry.unique_id
    # Explicit check instead of ``assert``: asserts are removed under
    # ``python -O``, which would let ``None`` end up in the identifiers.
    if self.device_iddevice_id is None:
        raise ValueError("air-Q config entry has no unique ID")
    self.device_infodevice_info = DeviceInfo(
        manufacturer=MANUFACTURER,
        identifiers={(DOMAIN, self.device_iddevice_id)},
    )
    self.clip_negativeclip_negative = clip_negative
    self.return_averagereturn_average = return_average
51 
async def _async_update_data(self) -> dict:
    """Fetch the data from the device.

    When the stored device info has no ``"name"`` key yet, the device
    metadata (name, model, software and hardware version) is fetched first
    and merged into the stored device info. The latest data is then
    requested with the averaging and clipping options kept on the
    coordinator, and returned.
    """
    device_info = self.device_infodevice_info
    if "name" not in device_info:
        fetched = await self.airqairq.fetch_device_info()
        metadata = DeviceInfo(
            name=fetched["name"],
            model=fetched["model"],
            sw_version=fetched["sw_version"],
            hw_version=fetched["hw_version"],
        )
        device_info.update(metadata)

    average = self.return_averagereturn_average
    clip = self.clip_negativeclip_negative
    return await self.airqairq.get_latest_data(  # type: ignore[no-any-return]
        return_average=average,
        clip_negative_values=clip,
    )
None __init__(self, HomeAssistant hass, ConfigEntry entry, bool clip_negative=True, bool return_average=True)
Definition: coordinator.py:31
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)