1 """Define a PurpleAir DataUpdateCoordinator."""
3 from __future__ import annotations
5 from datetime import timedelta
7 from aiopurpleair import API
8 from aiopurpleair.errors import InvalidApiKeyError, PurpleAirError
9 from aiopurpleair.models.sensors import GetSensorsResponse
18 from .const import CONF_SENSOR_INDICES, LOGGER
20 SENSOR_FIELDS_TO_RETRIEVE = [
50 """Define a PurpleAir-specific coordinator."""
52 def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
56 entry.data[CONF_API_KEY],
57 session=aiohttp_client.async_get_clientsession(hass),
61 hass, LOGGER, name=entry.title, update_interval=UPDATE_INTERVAL
65 """Get the latest sensor information."""
67 return await self._api.sensors.async_get_sensors(
68 SENSOR_FIELDS_TO_RETRIEVE,
69 sensor_indices=self._entry.options[CONF_SENSOR_INDICES],
71 except InvalidApiKeyError as err:
73 except PurpleAirError as err:
74 raise UpdateFailed(f"Error while fetching data: {err}") from err
78 """Get the map URL for a sensor index."""
79 return self._api.get_map_url(sensor_index)
None __init__(self, HomeAssistant hass, ConfigEntry entry)
str async_get_map_url(self, int sensor_index)
GetSensorsResponse _async_update_data(self)