1 """Support for openSenseMap Air Quality data."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from opensensemap_api
import OpenSenseMap
9 from opensensemap_api.exceptions
import OpenSenseMapError
10 import voluptuous
as vol
13 PLATFORM_SCHEMA
as AIR_QUALITY_PLATFORM_SCHEMA,
25 _LOGGER = logging.getLogger(__name__)
28 CONF_STATION_ID =
"station_id"
32 PLATFORM_SCHEMA = AIR_QUALITY_PLATFORM_SCHEMA.extend(
33 {vol.Required(CONF_STATION_ID): cv.string, vol.Optional(CONF_NAME): cv.string}
40 async_add_entities: AddEntitiesCallback,
41 discovery_info: DiscoveryInfoType |
None =
None,
43 """Set up the openSenseMap air quality platform."""
45 name = config.get(CONF_NAME)
46 station_id = config[CONF_STATION_ID]
51 await osm_api.async_update()
53 if "name" not in osm_api.api.data:
54 _LOGGER.error(
"Station %s is not available", station_id)
55 raise PlatformNotReady
57 station_name = osm_api.api.data[
"name"]
if name
is None else name
63 """Implementation of an openSenseMap air quality entity."""
65 _attr_attribution =
"Data provided by openSenseMap"
68 """Initialize the air quality entity."""
74 """Return the name of the air quality entity."""
75 return self.
_name_name
79 """Return the particulate matter 2.5 level."""
80 return self.
_osm_osm.api.pm2_5
84 """Return the particulate matter 10 level."""
85 return self.
_osm_osm.api.pm10
88 """Get the latest data from the openSenseMap API."""
93 """Get the latest data and update the states."""
96 """Initialize the data object."""
99 @Throttle(SCAN_INTERVAL)
101 """Get the latest data from the openSenseMap API."""
104 await self.
apiapi.get_data()
105 except OpenSenseMapError
as err:
106 _LOGGER.error(
"Unable to fetch data: %s", err)
def __init__(self, name, osm)
def particulate_matter_10(self)
def particulate_matter_2_5(self)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)