1 """Support for hydrological data from the Fed. Office for the Environment."""
3 from __future__
import annotations
5 from datetime
import timedelta
8 from swisshydrodata
import SwissHydroData
9 import voluptuous
as vol
12 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
22 _LOGGER = logging.getLogger(__name__)
24 ATTR_MAX_24H =
"max-24h"
25 ATTR_MEAN_24H =
"mean-24h"
26 ATTR_MIN_24H =
"min-24h"
27 ATTR_STATION =
"station"
28 ATTR_STATION_UPDATE =
"station_update"
29 ATTR_WATER_BODY =
"water_body"
30 ATTR_WATER_BODY_TYPE =
"water_body_type"
32 CONF_STATION =
"station"
36 SENSOR_DISCHARGE =
"discharge"
37 SENSOR_LEVEL =
"level"
38 SENSOR_TEMPERATURE =
"temperature"
41 SENSOR_DISCHARGE:
"mdi:waves",
42 SENSOR_LEVEL:
"mdi:zodiac-aquarius",
43 SENSOR_TEMPERATURE:
"mdi:oil-temperature",
52 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
54 vol.Required(CONF_STATION): vol.Coerce(int),
55 vol.Optional(CONF_MONITORED_CONDITIONS, default=[SENSOR_TEMPERATURE]): vol.All(
56 cv.ensure_list, [vol.In(CONDITIONS)]
65 add_entities: AddEntitiesCallback,
66 discovery_info: DiscoveryInfoType |
None =
None,
68 """Set up the Swiss hydrological sensor."""
69 station = config[CONF_STATION]
70 monitored_conditions = config[CONF_MONITORED_CONDITIONS]
75 if hydro_data.data
is None:
76 _LOGGER.error(
"The station doesn't exists: %s", station)
82 for condition
in monitored_conditions
89 """Implementation of a Swiss hydrological sensor."""
92 "Data provided by the Swiss Federal Office for the Environment FOEN"
95 def __init__(self, hydro_data, station, condition):
96 """Initialize the Swiss hydrological sensor."""
100 self.
_icon_icon = CONDITIONS[condition]
105 """Return the name of the sensor."""
106 return f
"{self._data['water-body-name']} {self._condition}"
110 """Return a unique, friendly identifier for this entity."""
111 return f
"{self._station}_{self._condition}"
115 """Return the unit of measurement of this entity, if any."""
116 if self.
_state_state
is not None:
122 """Return the state of the sensor."""
123 if isinstance(self.
_state_state, (int, float)):
124 return round(self.
_state_state, 2)
129 """Return the device state attributes."""
132 if not self.
_data_data:
135 attrs[ATTR_WATER_BODY_TYPE] = self.
_data_data[
"water-body-type"]
136 attrs[ATTR_STATION] = self.
_data_data[
"name"]
137 attrs[ATTR_STATION_UPDATE] = self.
_data_data[
"parameters"][self.
_condition_condition][
141 for entry
in CONDITION_DETAILS:
142 attrs[entry.replace(
"-",
"_")] = self.
_data_data[
"parameters"][self.
_condition_condition][
150 """Icon to use in the frontend."""
151 return self.
_icon_icon
154 """Get the latest data and update the state."""
158 if self.
_data_data
is None:
165 """The Class for handling the data retrieval."""
168 """Initialize the data object."""
172 @Throttle(MIN_TIME_BETWEEN_UPDATES)
174 """Get the latest data."""
176 shd = SwissHydroData()
177 self.
datadata = shd.get_station(self.
stationstation)
def __init__(self, station)
def __init__(self, hydro_data, station, condition)
def extra_state_attributes(self)
def native_unit_of_measurement(self)
def add_entities(account, async_add_entities, tracked)
None setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback add_entities, DiscoveryInfoType|None discovery_info=None)