# Home Assistant Unofficial Reference 2024.12.1
# sensor.py
# Go to the documentation of this file.
"""Helper sensor for calculating utility costs."""

from __future__ import annotations

import asyncio
from collections.abc import Callable, Mapping
import copy
from dataclasses import dataclass
import logging
from typing import Any, Final, Literal, cast

from homeassistant.components.sensor import (
    ATTR_LAST_RESET,
    ATTR_STATE_CLASS,
    SensorDeviceClass,
    SensorEntity,
    SensorStateClass,
)
from homeassistant.components.sensor.recorder import reset_detected
from homeassistant.const import ATTR_UNIT_OF_MEASUREMENT, UnitOfEnergy, UnitOfVolume
from homeassistant.core import (
    HomeAssistant,
    State,
    callback,
    split_entity_id,
    valid_entity_id,
)
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import unit_conversion
import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import METRIC_SYSTEM

from .const import DOMAIN
from .data import EnergyManager, async_get_manager

# State classes a source sensor may report for its cost to be calculated.
SUPPORTED_STATE_CLASSES = {
    SensorStateClass.MEASUREMENT,
    SensorStateClass.TOTAL,
    SensorStateClass.TOTAL_INCREASING,
}
# Units accepted for "grid" sources.
VALID_ENERGY_UNITS: set[str] = {
    UnitOfEnergy.GIGA_JOULE,
    UnitOfEnergy.KILO_WATT_HOUR,
    UnitOfEnergy.MEGA_JOULE,
    UnitOfEnergy.MEGA_WATT_HOUR,
    UnitOfEnergy.WATT_HOUR,
}
# Units accepted for "gas" sources: three volume units plus every energy unit.
VALID_ENERGY_UNITS_GAS: set[str] = {
    UnitOfVolume.CENTUM_CUBIC_FEET,
    UnitOfVolume.CUBIC_FEET,
    UnitOfVolume.CUBIC_METERS,
    *VALID_ENERGY_UNITS,
}
# Units accepted for "water" sources.
VALID_VOLUME_UNITS_WATER: set[str] = {
    UnitOfVolume.CENTUM_CUBIC_FEET,
    UnitOfVolume.CUBIC_FEET,
    UnitOfVolume.CUBIC_METERS,
    UnitOfVolume.GALLONS,
    UnitOfVolume.LITERS,
}
_LOGGER = logging.getLogger(__name__)


async def async_setup_platform(
    hass: HomeAssistant,
    config: ConfigType,
    async_add_entities: AddEntitiesCallback,
    discovery_info: DiscoveryInfoType | None = None,
) -> None:
    """Set up the energy sensors.

    Builds a SensorManager around the energy manager obtained for ``hass`` and
    starts it. ``config`` and ``discovery_info`` are accepted but not read here.
    """
    sensor_manager = SensorManager(await async_get_manager(hass), async_add_entities)
    await sensor_manager.async_start()


78 @dataclass(slots=True)
80  """Adapter to allow sources and their flows to be used as sensors."""
81 
82  source_type: Literal["grid", "gas", "water"]
83  flow_type: Literal["flow_from", "flow_to", None]
84  stat_energy_key: Literal["stat_energy_from", "stat_energy_to"]
85  total_money_key: Literal["stat_cost", "stat_compensation"]
86  name_suffix: str
87  entity_id_suffix: str
88 
89 
# One adapter per (source type, flow direction) for which a cost or
# compensation sensor can be generated.
SOURCE_ADAPTERS: Final = (
    SourceAdapter(
        "grid",
        "flow_from",
        "stat_energy_from",
        "stat_cost",
        "Cost",
        "cost",
    ),
    SourceAdapter(
        "grid",
        "flow_to",
        "stat_energy_to",
        "stat_compensation",
        "Compensation",
        "compensation",
    ),
    SourceAdapter(
        "gas",
        None,
        "stat_energy_from",
        "stat_cost",
        "Cost",
        "cost",
    ),
    SourceAdapter(
        "water",
        None,
        "stat_energy_from",
        "stat_cost",
        "Cost",
        "cost",
    ),
)


class SensorManager:
    """Class to handle creation/removal of sensor data."""

    def __init__(
        self, manager: EnergyManager, async_add_entities: AddEntitiesCallback
    ) -> None:
        """Initialize sensor manager."""
        self.manager = manager
        self.async_add_entities = async_add_entities
        # Keyed by (source type, flow type, energy sensor entity id).
        self.current_entities: dict[tuple[str, str | None, str], EnergyCostSensor] = {}

    async def async_start(self) -> None:
        """Subscribe to manager updates and process any data already present."""
        self.manager.async_listen_updates(self._process_manager_data)

        if self.manager.data:
            await self._process_manager_data()

    async def _process_manager_data(self) -> None:
        """Reconcile the cost sensors with the manager's current data.

        Entities that are still configured are kept (and get the new config),
        missing ones are created, and everything left over is removed.
        """
        to_add: list[EnergyCostSensor] = []
        # Start by assuming every known entity is stale; entries are popped
        # from this dict as they are found in the current configuration.
        to_remove = dict(self.current_entities)

        async def finish() -> None:
            if to_add:
                self.async_add_entities(to_add)
                # Wait until each new entity was either added or aborted.
                await asyncio.wait(ent.add_finished for ent in to_add)

            for key, entity in to_remove.items():
                self.current_entities.pop(key)
                await entity.async_remove()

        if not self.manager.data:
            await finish()
            return

        for energy_source in self.manager.data["energy_sources"]:
            for adapter in SOURCE_ADAPTERS:
                if adapter.source_type != energy_source["type"]:
                    continue

                if adapter.flow_type is None:
                    # The source itself carries the sensor configuration.
                    self._process_sensor_data(
                        adapter,
                        energy_source,
                        to_add,
                        to_remove,
                    )
                    continue

                for flow in energy_source[adapter.flow_type]:  # type: ignore[typeddict-item]
                    self._process_sensor_data(
                        adapter,
                        flow,
                        to_add,
                        to_remove,
                    )

        await finish()

    @callback
    def _process_sensor_data(
        self,
        adapter: SourceAdapter,
        config: Mapping[str, Any],
        to_add: list[EnergyCostSensor],
        to_remove: dict[tuple[str, str | None, str], EnergyCostSensor],
    ) -> None:
        """Process sensor data.

        Keeps, creates, or leaves-for-removal the cost sensor that belongs to
        ``config``; ``to_add`` and ``to_remove`` are mutated in place.
        """
        # No need to create an entity if we already have a cost stat
        if config.get(adapter.total_money_key) is not None:
            return

        key = (adapter.source_type, adapter.flow_type, config[adapter.stat_energy_key])

        # Make sure the right data is there
        # If the entity existed, we don't pop it from to_remove so it's removed
        if not valid_entity_id(config[adapter.stat_energy_key]) or (
            config.get("entity_energy_price") is None
            and config.get("number_energy_price") is None
        ):
            return

        if current_entity := to_remove.pop(key, None):
            current_entity.update_config(config)
            return

        self.current_entities[key] = EnergyCostSensor(
            adapter,
            config,
        )
        to_add.append(self.current_entities[key])


def _set_result_unless_done(future: asyncio.Future[None]) -> None:
    """Set the result of a future unless it is done.

    A future that already has a result, an exception, or was cancelled is left
    untouched, so calling this more than once is safe.
    """
    if not future.done():
        future.set_result(None)


class EnergyCostSensor(SensorEntity):
    """Calculate costs incurred by consuming energy.

    This is intended as a fallback for when no specific cost sensor is available for the
    utility.
    """

    _attr_entity_registry_visible_default = False
    _attr_should_poll = False

    # Each warning is logged at most once per sensor instance.
    _wrong_state_class_reported = False
    _wrong_unit_reported = False

    def __init__(
        self,
        adapter: SourceAdapter,
        config: Mapping[str, Any],
    ) -> None:
        """Initialize the sensor."""
        super().__init__()

        self._adapter = adapter
        self.entity_id = f"{config[adapter.stat_energy_key]}_{adapter.entity_id_suffix}"
        self._attr_device_class = SensorDeviceClass.MONETARY
        self._attr_state_class = SensorStateClass.TOTAL
        self._config = config
        # Last energy state that was folded into the running cost.
        self._last_energy_sensor_state: State | None = None
        # add_finished is set when either of async_added_to_hass or add_to_platform_abort
        # is called
        self.add_finished: asyncio.Future[None] = (
            asyncio.get_running_loop().create_future()
        )

    def _reset(self, energy_state: State) -> None:
        """Reset the cost sensor.

        Zeroes the cost, stamps a new last_reset, remembers ``energy_state`` as
        the baseline for the next delta, and writes the state.
        """
        self._attr_native_value = 0.0
        self._attr_last_reset = dt_util.utcnow()
        self._last_energy_sensor_state = energy_state
        self.async_write_ha_state()

    @callback
    def _update_cost(self) -> None:
        """Update incurred costs.

        Adds ``(energy delta) * price`` to the running cost. Returns without
        changing the cost when the energy or price state is missing or not
        numeric, or when the energy sensor has an unsupported state class or
        unit.
        """
        if self._adapter.source_type == "grid":
            valid_units = VALID_ENERGY_UNITS
            default_price_unit: str | None = UnitOfEnergy.KILO_WATT_HOUR

        elif self._adapter.source_type == "gas":
            valid_units = VALID_ENERGY_UNITS_GAS
            # No conversion for gas.
            default_price_unit = None

        elif self._adapter.source_type == "water":
            valid_units = VALID_VOLUME_UNITS_WATER
            if self.hass.config.units is METRIC_SYSTEM:
                default_price_unit = UnitOfVolume.CUBIC_METERS
            else:
                default_price_unit = UnitOfVolume.GALLONS

        energy_entity_id = cast(str, self._config[self._adapter.stat_energy_key])
        energy_state = self.hass.states.get(energy_entity_id)

        if energy_state is None:
            return

        state_class = energy_state.attributes.get(ATTR_STATE_CLASS)
        if state_class not in SUPPORTED_STATE_CLASSES:
            if not self._wrong_state_class_reported:
                self._wrong_state_class_reported = True
                _LOGGER.warning(
                    "Found unexpected state_class %s for %s",
                    state_class,
                    energy_state.entity_id,
                )
            return

        # last_reset must be set if the sensor is SensorStateClass.MEASUREMENT
        if (
            state_class == SensorStateClass.MEASUREMENT
            and ATTR_LAST_RESET not in energy_state.attributes
        ):
            return

        try:
            energy = float(energy_state.state)
        except ValueError:
            return

        # Determine energy price
        # .get() mirrors the lookup SensorManager uses when deciding to create
        # this sensor, so a config without the key falls through to the
        # fixed-number price instead of raising KeyError.
        price_entity_id = self._config.get("entity_energy_price")
        if price_entity_id is not None:
            energy_price_state = self.hass.states.get(price_entity_id)

            if energy_price_state is None:
                return

            try:
                energy_price = float(energy_price_state.state)
            except ValueError:
                if self._last_energy_sensor_state is None:
                    # Initialize as it's the first time all required entities except
                    # price are in place. This means that the cost will update the first
                    # time the energy is updated after the price entity is in place.
                    self._reset(energy_state)
                return

            # The price unit is whatever follows the "/" (e.g. "EUR/kWh" -> "kWh").
            energy_price_unit: str | None = energy_price_state.attributes.get(
                ATTR_UNIT_OF_MEASUREMENT, ""
            ).partition("/")[2]

            # For backwards compatibility we don't validate the unit of the price
            # If it is not valid, we assume it's our default price unit.
            if energy_price_unit not in valid_units:
                energy_price_unit = default_price_unit

        else:
            energy_price = cast(float, self._config["number_energy_price"])
            energy_price_unit = default_price_unit

        if self._last_energy_sensor_state is None:
            # Initialize as it's the first time all required entities are in place.
            self._reset(energy_state)
            return

        energy_unit: str | None = energy_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)

        if energy_unit is None or energy_unit not in valid_units:
            if not self._wrong_unit_reported:
                self._wrong_unit_reported = True
                _LOGGER.warning(
                    "Found unexpected unit %s for %s",
                    energy_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT),
                    energy_state.entity_id,
                )
            return

        # A reset is signalled by a changed last_reset attribute for every
        # state class except TOTAL_INCREASING, where reset_detected() decides.
        # reset_detected() is only evaluated for TOTAL_INCREASING sensors.
        if (
            state_class != SensorStateClass.TOTAL_INCREASING
            and energy_state.attributes.get(ATTR_LAST_RESET)
            != self._last_energy_sensor_state.attributes.get(ATTR_LAST_RESET)
        ) or (
            state_class == SensorStateClass.TOTAL_INCREASING
            and reset_detected(
                self.hass,
                energy_entity_id,
                energy,
                float(self._last_energy_sensor_state.state),
                self._last_energy_sensor_state,
            )
        ):
            # Energy meter was reset, reset cost sensor too
            energy_state_copy = copy.copy(energy_state)
            # Baseline of zero so the full current reading counts as new usage.
            energy_state_copy.state = "0.0"
            self._reset(energy_state_copy)

        # Update with newly incurred cost
        old_energy_value = float(self._last_energy_sensor_state.state)
        cur_value = cast(float, self._attr_native_value)

        if energy_price_unit is None:
            converted_energy_price = energy_price
        else:
            converter: Callable[[float, str, str], float]
            if energy_unit in VALID_ENERGY_UNITS:
                converter = unit_conversion.EnergyConverter.convert
            else:
                converter = unit_conversion.VolumeConverter.convert

            converted_energy_price = converter(
                energy_price,
                energy_unit,
                energy_price_unit,
            )

        self._attr_native_value = (
            cur_value + (energy - old_energy_value) * converted_energy_price
        )

        self._last_energy_sensor_state = energy_state

    async def async_added_to_hass(self) -> None:
        """Register callbacks."""
        energy_state = self.hass.states.get(self._config[self._adapter.stat_energy_key])
        if energy_state:
            name = energy_state.name
        else:
            # NOTE(review): index 0 of split_entity_id() looks like the domain
            # part rather than the object id — confirm this is the intended
            # fallback name before changing it.
            name = split_entity_id(self._config[self._adapter.stat_energy_key])[
                0
            ].replace("_", " ")

        self._attr_name = f"{name} {self._adapter.name_suffix}"

        self._update_cost()

        # Store stat ID in hass.data so frontend can look it up
        self.hass.data[DOMAIN]["cost_sensors"][
            self._config[self._adapter.stat_energy_key]
        ] = self.entity_id

        # Recalculate whenever the tracked energy sensor changes; the
        # subscription is dropped again when this entity is removed.
        self.async_on_remove(
            async_track_state_change_event(
                self.hass,
                cast(str, self._config[self._adapter.stat_energy_key]),
                self._async_state_changed_listener,
            )
        )
        _set_result_unless_done(self.add_finished)

    @callback
    def _async_state_changed_listener(self, *_: Any) -> None:
        """Handle child updates."""
        self._update_cost()
        self.async_write_ha_state()

    @callback
    def add_to_platform_abort(self) -> None:
        """Abort adding an entity to a platform."""
        # Unblock anyone waiting on add_finished even though the add failed.
        _set_result_unless_done(self.add_finished)
        super().add_to_platform_abort()

    async def async_will_remove_from_hass(self) -> None:
        """Handle removing from hass."""
        self.hass.data[DOMAIN]["cost_sensors"].pop(
            self._config[self._adapter.stat_energy_key]
        )
        await super().async_will_remove_from_hass()

    @callback
    def update_config(self, config: Mapping[str, Any]) -> None:
        """Update the config."""
        self._config = config

    @property
    def native_unit_of_measurement(self) -> str | None:
        """Return the units of measurement."""
        return self.hass.config.currency

    @property
    def unique_id(self) -> str | None:
        """Return the unique ID of the sensor."""
        entity_registry = er.async_get(self.hass)
        # Prefer the registry id of the tracked energy sensor as the prefix;
        # fall back to its entity id when it has no registry entry.
        if registry_entry := entity_registry.async_get(
            self._config[self._adapter.stat_energy_key]
        ):
            prefix = registry_entry.id
        else:
            prefix = self._config[self._adapter.stat_energy_key]

        return f"{prefix}_{self._adapter.source_type}_{self._adapter.entity_id_suffix}"
# Cross-reference index from the generated documentation:
# None __init__(self, SourceAdapter adapter, Mapping[str, Any] config)
# Definition: sensor.py:243
# None update_config(self, Mapping[str, Any] config)
# Definition: sensor.py:457
# None _process_sensor_data(self, SourceAdapter adapter, Mapping[str, Any] config, list[EnergyCostSensor] to_add, dict[tuple[str, str|None, str], EnergyCostSensor] to_remove)
# Definition: sensor.py:193
# None __init__(self, EnergyManager manager, AddEntitiesCallback async_add_entities)
# Definition: sensor.py:131
# None async_on_remove(self, CALLBACK_TYPE func)
# Definition: entity.py:1331
# EnergyManager async_get_manager(HomeAssistant hass)
# Definition: data.py:22
# None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
# Definition: sensor.py:72
# None _set_result_unless_done(asyncio.Future[None] future)
# Definition: sensor.py:220
# bool reset_detected(HomeAssistant hass, str entity_id, float fstate, float|None previous_fstate, State state)
# Definition: recorder.py:359
# bool valid_entity_id(str entity_id)
# Definition: core.py:235
# tuple[str, str] split_entity_id(str entity_id)
# Definition: core.py:214
# CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)
# Definition: event.py:314