1 """DataUpdateCoordinator for the Airly integration."""
3 from asyncio
import timeout
4 from datetime
import timedelta
8 from aiohttp
import ClientSession
9 from aiohttp.client_exceptions
import ClientConnectorError
10 from airly
import Airly
11 from airly.exceptions
import AirlyError
20 ATTR_API_CAQI_DESCRIPTION,
28 _LOGGER = logging.getLogger(__name__)
32 """Return data update interval.
34 The number of requests is reset at midnight UTC so we calculate the update
35 interval based on number of minutes until midnight, the number of Airly instances
36 and the number of remaining requests.
38 now = dt_util.utcnow()
39 midnight = dt_util.find_next_time_expression_time(
40 now, seconds=[0], minutes=[0], hours=[0]
42 minutes_to_midnight = (midnight - now).total_seconds() / 60
46 ceil(minutes_to_midnight / requests_remaining * instances_count),
53 _LOGGER.debug(
"Data will be update every %s", interval)
59 """Define an object to hold Airly data."""
64 session: ClientSession,
68 update_interval: timedelta,
75 language =
"pl" if hass.config.language ==
"pl" else "en"
76 self.
airlyairly = Airly(api_key, session, language=language)
79 super().
__init__(hass, _LOGGER, name=DOMAIN, update_interval=update_interval)
82 """Update data via library."""
83 data: dict[str, str | float | int] = {}
85 measurements = self.
airlyairly.create_measurements_session_nearest(
89 measurements = self.
airlyairly.create_measurements_session_point(
92 async
with timeout(20):
94 await measurements.update()
95 except (AirlyError, ClientConnectorError)
as error:
99 "Requests remaining: %s/%s",
100 self.
airlyairly.requests_remaining,
101 self.
airlyairly.requests_per_day,
106 if self.
airlyairly.requests_remaining:
108 len(self.
hasshass.config_entries.async_entries(DOMAIN)),
109 self.
airlyairly.requests_remaining,
112 values = measurements.current[
"values"]
113 index = measurements.current[
"indexes"][0]
114 standards = measurements.current[
"standards"]
116 if index[
"description"] == NO_AIRLY_SENSORS:
117 raise UpdateFailed(
"Can't retrieve data: no Airly sensors in this area")
119 data[value[
"name"]] = value[
"value"]
120 for standard
in standards:
121 data[f
"{standard['pollutant']}_LIMIT"] = standard[
"limit"]
122 data[f
"{standard['pollutant']}_PERCENT"] = standard[
"percent"]
123 data[ATTR_API_CAQI] = index[
"value"]
124 data[ATTR_API_CAQI_LEVEL] = index[
"level"].lower().replace(
"_",
" ")
125 data[ATTR_API_CAQI_DESCRIPTION] = index[
"description"]
126 data[ATTR_API_ADVICE] = index[
"advice"]
None __init__(self, HomeAssistant hass, ClientSession session, str api_key, float latitude, float longitude, timedelta update_interval, bool use_nearest)
dict[str, str|float|int] _async_update_data(self)
None update_interval(self, timedelta|None value)
timedelta|None update_interval(self)
timedelta set_update_interval(int instances_count, int requests_remaining)