1 """Numeric integration of data coming from a source sensor over time."""
3 from __future__
import annotations
5 from abc
import ABC, abstractmethod
6 from dataclasses
import dataclass
7 from datetime
import UTC, datetime, timedelta
8 from decimal
import Decimal, InvalidOperation
11 from typing
import TYPE_CHECKING, Any, Final, Self
13 import voluptuous
as vol
17 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
20 SensorExtraStoredData,
26 ATTR_UNIT_OF_MEASUREMENT,
36 EventStateChangedData,
37 EventStateReportedData,
48 async_track_state_change_event,
49 async_track_state_report_event,
54 CONF_MAX_SUB_INTERVAL,
57 CONF_UNIT_OF_MEASUREMENT,
66 _LOGGER = logging.getLogger(__name__)
68 ATTR_SOURCE_ID: Final =
"source"
71 UNIT_PREFIXES = {
None: 1,
"k": 10**3,
"M": 10**6,
"G": 10**9,
"T": 10**12}
75 UnitOfTime.SECONDS: 1,
76 UnitOfTime.MINUTES: 60,
77 UnitOfTime.HOURS: 60 * 60,
78 UnitOfTime.DAYS: 24 * 60 * 60,
82 SensorDeviceClass.POWER: SensorDeviceClass.ENERGY,
87 PLATFORM_SCHEMA = vol.All(
88 cv.removed(CONF_UNIT_OF_MEASUREMENT),
89 SENSOR_PLATFORM_SCHEMA.extend(
91 vol.Optional(CONF_NAME): cv.string,
92 vol.Optional(CONF_UNIQUE_ID): cv.string,
93 vol.Required(CONF_SOURCE_SENSOR): cv.entity_id,
94 vol.Optional(CONF_ROUND_DIGITS, default=DEFAULT_ROUND): vol.Any(
97 vol.Optional(CONF_UNIT_PREFIX): vol.In(UNIT_PREFIXES),
98 vol.Optional(CONF_UNIT_TIME, default=UnitOfTime.HOURS): vol.In(UNIT_TIME),
99 vol.Remove(CONF_UNIT_OF_MEASUREMENT): cv.string,
100 vol.Optional(CONF_MAX_SUB_INTERVAL): cv.positive_time_period,
101 vol.Optional(CONF_METHOD, default=METHOD_TRAPEZOIDAL): vol.In(
112 return _NAME_TO_INTEGRATION_METHOD[method_name]()
116 """Check state requirements for integration."""
120 self, elapsed_time: Decimal, left: Decimal, right: Decimal
122 """Calculate area given two states."""
125 self, elapsed_time: Decimal, constant_state: Decimal
127 return constant_state * elapsed_time
132 self, elapsed_time: Decimal, left: Decimal, right: Decimal
134 return elapsed_time * (left + right) / 2
141 return (left_dec, right_dec)
146 self, elapsed_time: Decimal, left: Decimal, right: Decimal
153 return (left_dec, left_dec)
158 self, elapsed_time: Decimal, left: Decimal, right: Decimal
165 return (right_dec, right_dec)
170 return Decimal(state)
171 except (InvalidOperation, TypeError):
175 _NAME_TO_INTEGRATION_METHOD: dict[str, type[_IntegrationMethod]] = {
177 METHOD_RIGHT: _Right,
178 METHOD_TRAPEZOIDAL: _Trapezoidal,
183 StateEvent =
"state_event"
184 TimeElapsed =
"time_elapsed"
189 """Object to hold extra stored data."""
191 source_entity: str |
None
192 last_valid_state: Decimal |
None
195 """Return a dict representation of the utility sensor data."""
197 data[
"source_entity"] = self.source_entity
198 data[
"last_valid_state"] = (
199 str(self.last_valid_state)
if self.last_valid_state
else None
204 def from_dict(cls, restored: dict[str, Any]) -> Self |
None:
205 """Initialize a stored sensor state from a dict."""
206 extra = SensorExtraStoredData.from_dict(restored)
210 source_entity = restored.get(ATTR_SOURCE_ID)
214 Decimal(
str(restored.get(
"last_valid_state")))
215 if restored.get(
"last_valid_state")
218 except InvalidOperation:
220 _LOGGER.error(
"Could not use last_valid_state")
223 if last_valid_state
is None:
228 extra.native_unit_of_measurement,
236 config_entry: ConfigEntry,
237 async_add_entities: AddEntitiesCallback,
239 """Initialize Integration - Riemann sum integral config entry."""
240 registry = er.async_get(hass)
242 source_entity_id = er.async_validate_entity_id(
243 registry, config_entry.options[CONF_SOURCE_SENSOR]
251 if (unit_prefix := config_entry.options.get(CONF_UNIT_PREFIX)) ==
"none":
255 if max_sub_interval_dict := config_entry.options.get(CONF_MAX_SUB_INTERVAL,
None):
256 max_sub_interval = cv.time_period(max_sub_interval_dict)
258 max_sub_interval =
None
260 round_digits = config_entry.options.get(CONF_ROUND_DIGITS)
262 round_digits =
int(round_digits)
265 integration_method=config_entry.options[CONF_METHOD],
266 name=config_entry.title,
267 round_digits=round_digits,
268 source_entity=source_entity_id,
269 unique_id=config_entry.entry_id,
270 unit_prefix=unit_prefix,
271 unit_time=config_entry.options[CONF_UNIT_TIME],
272 device_info=device_info,
273 max_sub_interval=max_sub_interval,
282 async_add_entities: AddEntitiesCallback,
283 discovery_info: DiscoveryInfoType |
None =
None,
285 """Set up the integration sensor."""
287 integration_method=config[CONF_METHOD],
288 name=config.get(CONF_NAME),
289 round_digits=config.get(CONF_ROUND_DIGITS),
290 source_entity=config[CONF_SOURCE_SENSOR],
291 unique_id=config.get(CONF_UNIQUE_ID),
292 unit_prefix=config.get(CONF_UNIT_PREFIX),
293 unit_time=config[CONF_UNIT_TIME],
294 max_sub_interval=config.get(CONF_MAX_SUB_INTERVAL),
301 """Representation of an integration sensor."""
303 _attr_state_class = SensorStateClass.TOTAL
304 _attr_should_poll =
False
309 integration_method: str,
311 round_digits: int |
None,
313 unique_id: str |
None,
314 unit_prefix: str |
None,
315 unit_time: UnitOfTime,
316 max_sub_interval: timedelta |
None,
317 device_info: DeviceInfo |
None =
None,
319 """Initialize the integration sensor."""
323 self.
_state_state: Decimal |
None =
None
324 self.
_method_method = _IntegrationMethod.from_name(integration_method)
326 self.
_attr_name_attr_name = name
if name
is not None else f
"{source_entity} integral"
333 self._source_entity: str = source_entity
336 self._max_sub_interval: timedelta |
None = (
338 if max_sub_interval
is None or max_sub_interval.total_seconds() == 0
339 else max_sub_interval
347 """Multiply source_unit with time unit of the integral.
349 Possibly cancelling out a time unit in the denominator of the source_unit.
350 Note that this is a heuristic string manipulation method and might not
351 transform all source units in a sensible way.
354 - Speed to distance: 'km/h' and 'h' will be transformed to 'km'
355 - Power to energy: 'W' and 'h' will be transformed to 'Wh'
359 if source_unit.endswith(f
"/{unit_time}"):
360 integral_unit = source_unit[0 : (-(1 + len(unit_time)))]
362 integral_unit = f
"{source_unit}{unit_time}"
364 return f
"{self._unit_prefix_string}{integral_unit}"
368 source_device_class: SensorDeviceClass |
None,
369 unit_of_measurement: str |
None,
370 ) -> SensorDeviceClass |
None:
371 """Deduce device class if possible from source device class and target unit."""
372 if source_device_class
is None:
375 if (device_class := DEVICE_CLASS_MAP.get(source_device_class))
is None:
378 if unit_of_measurement
not in DEVICE_CLASS_UNITS.get(device_class, set()):
383 source_unit = source_state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
384 if source_unit
is not None:
396 self.
_attr_icon_attr_icon =
"mdi:chart-histogram"
400 if isinstance(self.
_state_state, Decimal):
401 self.
_state_state += area_scaled
405 "area = %s, area_scaled = %s new state = %s", area, area_scaled, self.
_state_state
410 """Handle entity which will be added."""
416 if last_sensor_data.native_value
417 else last_sensor_data.last_valid_state
424 "Restored state %s and last_valid_state %s",
429 if self._max_sub_interval
is not None:
440 state := self.
hasshass.states.get(self._source_entity)
441 )
and state.state != STATE_UNAVAILABLE:
461 self, event: Event[EventStateChangedData]
463 """Handle sensor state update when sub interval is configured."""
465 None, event.data[
"old_state"], event.data[
"new_state"]
470 self, event: Event[EventStateReportedData]
472 """Handle sensor state report when sub interval is configured."""
474 event.data[
"old_last_reported"],
None, event.data[
"new_state"]
480 old_last_reported: datetime |
None,
481 old_state: State |
None,
482 new_state: State |
None,
484 """Integrate based on state change and time.
486 Next to doing the integration based on state change this method cancels and
487 reschedules time based integration.
501 self, event: Event[EventStateChangedData]
503 """Handle sensor state change."""
505 None, event.data[
"old_state"], event.data[
"new_state"]
510 self, event: Event[EventStateReportedData]
512 """Handle sensor state report."""
514 event.data[
"old_last_reported"],
None, event.data[
"new_state"]
519 old_last_reported: datetime |
None,
520 old_state: State |
None,
521 new_state: State |
None,
523 if new_state
is None:
526 if new_state.state == STATE_UNAVAILABLE:
533 old_state_state = old_state.state
534 old_last_reported = old_state.last_reported
537 old_state_state = new_state.state
542 if old_last_reported
is None and old_state
is None:
547 states := self.
_method_method.validate_states(old_state_state, new_state.state)
553 assert old_last_reported
is not None
555 (new_state.last_reported - old_last_reported).total_seconds()
560 area = self.
_method_method.calculate_area_with_two_states(elapsed_seconds, *states)
566 self, source_state: State |
None
568 """Schedule possible integration using the source state and max_sub_interval.
570 The callback reference is stored for possible cancellation if the source state
571 reports a change before max_sub_interval has passed.
573 If the callback is executed, meaning there was no state change reported, the
574 source_state is assumed constant and integration is done using its value.
577 self._max_sub_interval
is not None
578 and source_state
is not None
583 def _integrate_on_max_sub_interval_exceeded_callback(now: datetime) ->
None:
584 """Integrate based on time and reschedule."""
589 area = self.
_method_method.calculate_area_with_one_state(
590 elapsed_seconds, source_state_dec
604 self._max_sub_interval,
605 _integrate_on_max_sub_interval_exceeded_callback,
613 """Return the state of the sensor."""
620 """Return the unit the value is expressed in."""
625 """Return the state attributes of the sensor."""
627 ATTR_SOURCE_ID: self._source_entity,
632 """Return sensor specific state data to be restored."""
642 ) -> IntegrationSensorExtraStoredData | None:
643 """Restore Utility Meter Sensor Extra Stored Data."""
644 if (restored_last_extra_data := await self.async_get_last_extra_data())
is None:
647 return IntegrationSensorExtraStoredData.from_dict(
648 restored_last_extra_data.as_dict()
None _integrate_on_state_change_with_max_sub_interval(self, Event[EventStateChangedData] event)
None _integrate_on_state_report_with_max_sub_interval(self, Event[EventStateReportedData] event)
SensorDeviceClass|None _calculate_device_class(self, SensorDeviceClass|None source_device_class, str|None unit_of_measurement)
None _integrate_on_state_report_callback(self, Event[EventStateReportedData] event)
_last_integration_trigger
Decimal|None native_value(self)
None _integrate_on_state_change(self, datetime|None old_last_reported, State|None old_state, State|None new_state)
None _derive_and_set_attributes_from_state(self, State source_state)
None _integrate_on_state_change_callback(self, Event[EventStateChangedData] event)
dict[str, str]|None extra_state_attributes(self)
IntegrationSensorExtraStoredData extra_restore_state_data(self)
_attr_suggested_display_precision
None _integrate_on_state_update_with_max_sub_interval(self, datetime|None old_last_reported, State|None old_state, State|None new_state)
_max_sub_interval_exceeded_callback
None _cancel_max_sub_interval_exceeded_callback(self)
None __init__(self, *str integration_method, str|None name, int|None round_digits, str source_entity, str|None unique_id, str|None unit_prefix, UnitOfTime unit_time, timedelta|None max_sub_interval, DeviceInfo|None device_info=None)
None _schedule_max_sub_interval_exceeded_if_state_is_numeric(self, State|None source_state)
IntegrationSensorExtraStoredData|None async_get_last_sensor_data(self)
None _update_integral(self, Decimal area)
None async_added_to_hass(self)
str|None native_unit_of_measurement(self)
str _calculate_unit(self, str source_unit)
_IntegrationMethod from_name(str method_name)
Decimal calculate_area_with_two_states(self, Decimal elapsed_time, Decimal left, Decimal right)
tuple[Decimal, Decimal]|None validate_states(self, str left, str right)
Decimal calculate_area_with_one_state(self, Decimal elapsed_time, Decimal constant_state)
Decimal calculate_area_with_two_states(self, Decimal elapsed_time, Decimal left, Decimal right)
tuple[Decimal, Decimal]|None validate_states(self, str left, str right)
tuple[Decimal, Decimal]|None validate_states(self, str left, str right)
Decimal calculate_area_with_two_states(self, Decimal elapsed_time, Decimal left, Decimal right)
tuple[Decimal, Decimal]|None validate_states(self, str left, str right)
Decimal calculate_area_with_two_states(self, Decimal elapsed_time, Decimal left, Decimal right)
SensorExtraStoredData|None async_get_last_sensor_data(self)
StateType|date|datetime|Decimal native_value(self)
str|None native_unit_of_measurement(self)
str|None unit_of_measurement(self)
None async_write_ha_state(self)
None async_on_remove(self, CALLBACK_TYPE func)
str|None unit_of_measurement(self)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
Decimal|None _decimal_state(str state)
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
dr.DeviceInfo|None async_device_info_to_link_from_entity(HomeAssistant hass, str entity_id_or_uuid)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
CALLBACK_TYPE async_track_state_report_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateReportedData]], Any] action, HassJobType|None job_type=None)
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)