1 """Support for gauges from flood monitoring API."""
13 DataUpdateCoordinator,
16 from .const import DOMAIN
19 "http://qudt.org/1.1/vocab/unit#Meter": UnitOfLength.METERS,
25 config_entry: ConfigEntry,
26 async_add_entities: AddEntitiesCallback,
28 """Set up UK Flood Monitoring Sensors."""
29 coordinator: DataUpdateCoordinator = hass.data[DOMAIN][config_entry.entry_id]
30 created_entities: set[str] = set()
33 def _async_create_new_entities():
34 """Create new entities."""
35 if not coordinator.last_update_success:
37 measures: dict[str, dict[str, Any]] = coordinator.data["measures"]
38 entities: list[Measurement] = []
40 for key, data in measures.items():
41 if key in created_entities:
44 if "latestReading" not in data:
49 created_entities.add(key)
53 _async_create_new_entities()
57 config_entry.async_on_unload(
58 coordinator.async_add_listener(_async_create_new_entities)
63 """A gauge at a flood monitoring station."""
66 "This uses Environment Agency flood and river level data "
67 "from the real-time data API"
69 _attr_state_class = SensorStateClass.MEASUREMENT
70 _attr_has_entity_name = True
74 """Initialise the gauge with a data instance and station."""
81 """Return the station name for the measure."""
82 return self.coordinator.data["label"]
86 """Return the station id for the measure."""
87 return self.coordinator.data["measures"][self.key]["stationReference"]
91 """Return the qualifier for the station."""
92 return self.coordinator.data["measures"][self.key]["qualifier"]
96 """Return the parameter name for the station."""
97 return self.coordinator.data["measures"][self.key]["parameterName"]
101 """Return the device info."""
103 entry_type=DeviceEntryType.SERVICE,
104 identifiers={(DOMAIN, "measure-id", self.station_id)},
105 manufacturer="https://environment.data.gov.uk/",
107 name=f"{self.station_name} {self.parameter_name} {self.qualifier}",
112 """Return True if entity is available."""
113 if not self.coordinator.last_update_success:
117 if "latestReading" not in self.coordinator.data["measures"][self.key]:
123 self.coordinator.data["measures"][self.key]["latestReading"], dict
131 """Return units for the sensor."""
132 measure = self.coordinator.data["measures"][self.key]
133 if "unit" not in measure:
135 return UNIT_MAPPING.get(measure["unit"], measure["unitName"])
139 """Return the current sensor value."""
140 return self.coordinator.data["measures"][self.key]["latestReading"]["value"]
def __init__(self, coordinator, key)
def native_unit_of_measurement(self)
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)