1 """Data for all Kaiterra devices."""
4 from logging
import getLogger
6 from aiohttp.client_exceptions
import ClientConnectorError, ClientResponseError
7 from kaiterra_async_client
import AQIStandard, KaiterraAPIClient, Units
_LOGGER = getLogger(__name__)

# Maps a sensor key found in a device reading to the pollutant label that is
# reported as the main AQI pollutant (looked up via POLLUTANTS.get(sensor_name)).
POLLUTANTS = {
    "rpm25c": "PM2.5",
    "rpm10c": "PM10",
    "rtvoc": "TVOC",
    "rco2": "CO2",
}
26 """Get data from Kaiterra API."""
29 """Initialize the API data object."""
31 api_key = config[CONF_API_KEY]
32 aqi_standard = config[CONF_AQI_STANDARD]
33 devices = config[CONF_DEVICES]
34 units = config[CONF_PREFERRED_UNITS]
37 self.
_api_api = KaiterraAPIClient(
40 aqi_standard=AQIStandard.from_str(aqi_standard),
41 preferred_units=[Units.from_str(unit)
for unit
in units],
43 self.
_devices_ids_devices_ids = [device[CONF_DEVICE_ID]
for device
in devices]
45 f
"/{device[CONF_TYPE]}s/{device[CONF_DEVICE_ID]}" for device
in devices
47 self.
_scale_scale = AQI_SCALE[aqi_standard]
48 self.
_level_level = AQI_LEVEL[aqi_standard]
53 """Get the data from Kaiterra API."""
56 async
with asyncio.timeout(10):
57 data = await self.
_api_api.get_latest_sensor_readings(self.
_devices_devices)
58 except (ClientResponseError, ClientConnectorError, TimeoutError)
as err:
59 _LOGGER.debug(
"Couldn't fetch data from Kaiterra API: %s", err)
64 _LOGGER.debug(
"New data retrieved: %s", data)
68 for i, device
in enumerate(data):
73 aqi, main_pollutant =
None,
None
74 for sensor_name, sensor
in device.items():
75 if not (points := sensor.get(
"points")):
79 sensor[
"value"] = point.get(
"value")
81 if "aqi" not in point:
84 sensor[
"aqi"] = point[
"aqi"]
85 if not aqi
or aqi < point[
"aqi"]:
87 main_pollutant = POLLUTANTS.get(sensor_name)
91 for j
in range(1, len(self.
_scale_scale)):
92 if aqi <= self.
_scale_scale[j]:
93 level = self.
_level_level[j - 1]
96 device[
"aqi"] = {
"value": aqi}
97 device[
"aqi_level"] = {
"value": level}
98 device[
"aqi_pollutant"] = {
"value": main_pollutant}
101 except IndexError
as err:
102 _LOGGER.error(
"Parsing error %s", err)
103 except TypeError
as err:
104 _LOGGER.error(
"Type error %s", err)
def __init__(self, hass, config, session)
None async_dispatcher_send(HomeAssistant hass, str signal, *Any args)