1 """DataUpdateCoordinators for awair integration."""
3 from __future__
import annotations
5 from asyncio
import gather, timeout
6 from dataclasses
import dataclass
7 from datetime
import timedelta
9 from aiohttp
import ClientSession
10 from python_awair
import Awair, AwairLocal
11 from python_awair.air_data
import AirData
12 from python_awair.devices
import AwairBaseDevice, AwairLocalDevice
13 from python_awair.exceptions
import AuthError, AwairError
25 UPDATE_INTERVAL_CLOUD,
26 UPDATE_INTERVAL_LOCAL,
29 type AwairConfigEntry = ConfigEntry[AwairDataUpdateCoordinator]
34 """Wrapper class to hold an awair device and set of air data."""
36 device: AwairBaseDevice
41 """Define a wrapper class to update Awair data."""
46 config_entry: ConfigEntry,
47 update_interval: timedelta |
None,
49 """Set up the AwairDataUpdateCoordinator class."""
51 self.
titletitle = config_entry.title
53 super().
__init__(hass, LOGGER, name=DOMAIN, update_interval=update_interval)
56 """Fetch latest air quality data."""
57 LOGGER.debug(
"Fetching data for %s", device.uuid)
58 air_data = await device.air_data_latest()
59 LOGGER.debug(air_data)
60 return AwairResult(device=device, air_data=air_data)
64 """Define a wrapper class to update Awair data from Cloud API."""
67 self, hass: HomeAssistant, config_entry: ConfigEntry, session: ClientSession
69 """Set up the AwairCloudDataUpdateCoordinator class."""
70 access_token = config_entry.data[CONF_ACCESS_TOKEN]
71 self.
_awair_awair = Awair(access_token=access_token, session=session)
73 super().
__init__(hass, config_entry, UPDATE_INTERVAL_CLOUD)
76 """Update data via Awair client library."""
77 async
with timeout(API_TIMEOUT):
79 LOGGER.debug(
"Fetching users and devices")
80 user = await self.
_awair_awair.user()
81 devices = await user.devices()
82 results = await gather(
85 return {result.device.uuid: result
for result
in results}
86 except AuthError
as err:
87 raise ConfigEntryAuthFailed
from err
88 except Exception
as err:
93 """Define a wrapper class to update Awair data from the local API."""
95 _device: AwairLocalDevice |
None =
None
98 self, hass: HomeAssistant, config_entry: ConfigEntry, session: ClientSession
100 """Set up the AwairLocalDataUpdateCoordinator class."""
102 session=session, device_addrs=[config_entry.data[CONF_HOST]]
105 super().
__init__(hass, config_entry, UPDATE_INTERVAL_LOCAL)
108 """Update data via Awair client library."""
109 async
with timeout(API_TIMEOUT):
111 if self.
_device_device
is None:
112 LOGGER.debug(
"Fetching devices")
116 except AwairError
as err:
117 LOGGER.error(
"Unexpected API error: %s", err)
119 return {result.device.uuid: result}
dict[str, AwairResult] _async_update_data(self)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, ClientSession session)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, timedelta|None update_interval)
AwairResult _fetch_air_data(self, AwairBaseDevice device)
None __init__(self, HomeAssistant hass, ConfigEntry config_entry, ClientSession session)
dict[str, AwairResult] _async_update_data(self)
dict[str, dict[str, Any]] devices(HomeAssistant hass)