Home Assistant Unofficial Reference 2024.12.1
coordinator.py
Go to the documentation of this file.
1 """Define a PurpleAir DataUpdateCoordinator."""
2 
3 from __future__ import annotations
4 
5 from datetime import timedelta
6 
7 from aiopurpleair import API
8 from aiopurpleair.errors import InvalidApiKeyError, PurpleAirError
9 from aiopurpleair.models.sensors import GetSensorsResponse
10 
11 from homeassistant.config_entries import ConfigEntry
12 from homeassistant.const import CONF_API_KEY
13 from homeassistant.core import HomeAssistant, callback
14 from homeassistant.exceptions import ConfigEntryAuthFailed
15 from homeassistant.helpers import aiohttp_client
16 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
17 
18 from .const import CONF_SENSOR_INDICES, LOGGER
19 
20 SENSOR_FIELDS_TO_RETRIEVE = [
21  "0.3_um_count",
22  "0.5_um_count",
23  "1.0_um_count",
24  "10.0_um_count",
25  "2.5_um_count",
26  "5.0_um_count",
27  "altitude",
28  "firmware_version",
29  "hardware",
30  "humidity",
31  "latitude",
32  "location_type",
33  "longitude",
34  "model",
35  "name",
36  "pm1.0",
37  "pm10.0",
38  "pm2.5",
39  "pressure",
40  "rssi",
41  "temperature",
42  "uptime",
43  "voc",
44 ]
45 
46 UPDATE_INTERVAL = timedelta(minutes=2)
47 
48 
50  """Define a PurpleAir-specific coordinator."""
51 
52  def __init__(self, hass: HomeAssistant, entry: ConfigEntry) -> None:
53  """Initialize."""
54  self._entry_entry = entry
55  self._api_api = API(
56  entry.data[CONF_API_KEY],
57  session=aiohttp_client.async_get_clientsession(hass),
58  )
59 
60  super().__init__(
61  hass, LOGGER, name=entry.title, update_interval=UPDATE_INTERVAL
62  )
63 
64  async def _async_update_data(self) -> GetSensorsResponse:
65  """Get the latest sensor information."""
66  try:
67  return await self._api_api.sensors.async_get_sensors(
68  SENSOR_FIELDS_TO_RETRIEVE,
69  sensor_indices=self._entry_entry.options[CONF_SENSOR_INDICES],
70  )
71  except InvalidApiKeyError as err:
72  raise ConfigEntryAuthFailed("Invalid API key") from err
73  except PurpleAirError as err:
74  raise UpdateFailed(f"Error while fetching data: {err}") from err
75 
76  @callback
77  def async_get_map_url(self, sensor_index: int) -> str:
78  """Get the map URL for a sensor index."""
79  return self._api_api.get_map_url(sensor_index)
None __init__(self, HomeAssistant hass, ConfigEntry entry)
Definition: coordinator.py:52