1 """Statistics helper for sensor."""
3 from __future__
import annotations
5 from collections
import defaultdict
6 from collections.abc
import Callable, Iterable
7 from contextlib
import suppress
12 from typing
import Any
14 from sqlalchemy.orm.session
import Session
17 DOMAIN
as RECORDER_DOMAIN,
28 ATTR_UNIT_OF_MEASUREMENT,
29 REVOLUTIONS_PER_MINUTE,
53 _LOGGER = logging.getLogger(__name__)
55 DEFAULT_STATISTICS = {
56 SensorStateClass.MEASUREMENT: {
"mean",
"min",
"max"},
57 SensorStateClass.TOTAL: {
"sum"},
58 SensorStateClass.TOTAL_INCREASING: {
"sum"},
62 "BTU/(h×ft²)": UnitOfIrradiance.BTUS_PER_HOUR_SQUARE_FOOT,
63 "dBa": UnitOfSoundPressure.WEIGHTED_DECIBEL_A,
64 "RPM": REVOLUTIONS_PER_MINUTE,
65 "ft3": UnitOfVolume.CUBIC_FEET,
66 "m3": UnitOfVolume.CUBIC_METERS,
67 "ft³/m": UnitOfVolumeFlowRate.CUBIC_FEET_PER_MINUTE,
72 SEEN_DIP: HassKey[set[str]] =
HassKey(f
"{DOMAIN}_seen_total_increasing_dip")
73 WARN_DIP: HassKey[set[str]] =
HassKey(f
"{DOMAIN}_warn_total_increasing_dip")
75 WARN_NEGATIVE: HassKey[set[str]] =
HassKey(f
"{DOMAIN}_warn_total_increasing_negative")
77 WARN_UNSUPPORTED_UNIT: HassKey[set[str]] =
HassKey(f
"{DOMAIN}_warn_unsupported_unit")
78 WARN_UNSTABLE_UNIT: HassKey[set[str]] =
HassKey(f
"{DOMAIN}_warn_unstable_unit")
80 LINK_DEV_STATISTICS =
"https://my.home-assistant.io/redirect/developer_statistics"
84 """Get the current state of all sensors for which to compile statistics."""
89 entity_filter = instance.entity_filter
92 for state
in hass.states.all(DOMAIN)
93 if (state_class := state.attributes.get(ATTR_STATE_CLASS))
95 type(state_class)
is SensorStateClass
96 or try_parse_enum(SensorStateClass, state_class)
98 and (
not entity_filter
or entity_filter(state.entity_id))
103 fstates: list[tuple[float, State]], start: datetime.datetime, end: datetime.datetime
105 """Calculate a time weighted average.
107 The average is calculated by weighting the states by duration in seconds between
109 Note: there's no interpolation of values between state changes.
111 old_fstate: float |
None =
None
112 old_start_time: datetime.datetime |
None =
None
115 for fstate, state
in fstates:
118 start_time =
max(state.last_updated, start)
119 if old_start_time
is None:
123 duration = start_time - old_start_time
125 assert old_fstate
is not None
126 accumulated += old_fstate * duration.total_seconds()
129 old_start_time = start_time
131 if old_fstate
is not None:
133 assert old_start_time
is not None
134 duration = end - old_start_time
135 accumulated += old_fstate * duration.total_seconds()
137 period_seconds = (end - start).total_seconds()
138 if period_seconds == 0:
146 return accumulated / period_seconds
149 def _get_units(fstates: list[tuple[float, State]]) -> set[str |
None]:
150 """Return a set of all units."""
151 return {item[1].attributes.get(ATTR_UNIT_OF_MEASUREMENT)
for item
in fstates}
155 """Return True if the units are equivalent."""
159 EQUIVALENT_UNITS[unit]
if unit
in EQUIVALENT_UNITS
else unit
162 return len(units) == 1
166 entity_history: Iterable[State],
167 ) -> list[tuple[float, State]]:
168 """Return a list of (float, state) tuples for the given entity."""
169 float_states: list[tuple[float, State]] = []
170 append = float_states.append
171 isfinite = math.isfinite
172 for state
in entity_history:
174 if (float_state :=
float(state.state))
is not None and isfinite(
177 append((float_state, state))
178 except (ValueError, TypeError):
184 """Return if the state is numeric."""
185 with suppress(ValueError, TypeError):
186 if (num_state :=
float(state.state))
is not None and math.isfinite(num_state):
193 old_metadatas: dict[str, tuple[int, StatisticMetaData]],
194 fstates: list[tuple[float, State]],
196 ) -> tuple[str |
None, list[tuple[float, State]]]:
197 """Normalize units."""
198 state_unit: str |
None =
None
199 statistics_unit: str |
None
200 state_unit = fstates[0][1].attributes.get(ATTR_UNIT_OF_MEASUREMENT)
201 old_metadata = old_metadatas[entity_id][1]
if entity_id
in old_metadatas
else None
205 statistics_unit = state_unit
208 statistics_unit = old_metadata[
"unit_of_measurement"]
210 if statistics_unit
not in statistics.STATISTIC_UNIT_TO_UNIT_CONVERTER:
215 if WARN_UNSTABLE_UNIT
not in hass.data:
216 hass.data[WARN_UNSTABLE_UNIT] = set()
217 if entity_id
not in hass.data[WARN_UNSTABLE_UNIT]:
218 hass.data[WARN_UNSTABLE_UNIT].
add(entity_id)
222 " and matches the unit of already compiled statistics "
223 f
"({old_metadata['unit_of_measurement']})"
227 "The unit of %s is changing, got multiple %s, generation of"
228 " long term statistics will be suppressed unless the unit is"
229 " stable%s. Go to %s to fix this"
238 return state_unit, fstates
240 converter = statistics.STATISTIC_UNIT_TO_UNIT_CONVERTER[statistics_unit]
241 valid_fstates: list[tuple[float, State]] = []
242 convert: Callable[[float], float] |
None =
None
243 last_unit: str |
None | UndefinedType = UNDEFINED
244 valid_units = converter.VALID_UNITS
246 for fstate, state
in fstates:
247 state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
249 if state_unit
not in valid_units:
250 if WARN_UNSUPPORTED_UNIT
not in hass.data:
251 hass.data[WARN_UNSUPPORTED_UNIT] = set()
252 if entity_id
not in hass.data[WARN_UNSUPPORTED_UNIT]:
253 hass.data[WARN_UNSUPPORTED_UNIT].
add(entity_id)
256 "The unit of %s (%s) cannot be converted to the unit of"
257 " previously compiled statistics (%s). Generation of long term"
258 " statistics will be suppressed unless the unit changes back to"
259 " %s or a compatible unit. Go to %s to fix this"
269 if state_unit != last_unit:
272 if state_unit == statistics_unit:
275 convert = converter.converter_factory(state_unit, statistics_unit)
276 last_unit = state_unit
278 if convert
is not None:
279 fstate = convert(fstate)
281 valid_fstates.append((fstate, state))
283 return statistics_unit, valid_fstates
287 """Suggest to report an issue."""
291 hass, integration_domain=entity_info[
"domain"]
if entity_info
else None
def warn_dip(
    hass: HomeAssistant, entity_id: str, state: State, previous_fstate: float
) -> None:
    """Log a warning once if a sensor with state_class_total has a decreasing value.

    The log will be suppressed until two dips have been seen to prevent warning due to
    rounding issues with databases storing the state as a single precision float, which
    was fixed in recorder DB version 20.
    """
    seen = hass.data.setdefault(SEEN_DIP, set())
    if entity_id not in seen:
        # First dip: just remember it; only warn on the second one.
        seen.add(entity_id)
        return
    warned = hass.data.setdefault(WARN_DIP, set())
    if entity_id in warned:
        # Warn at most once per entity.
        return
    warned.add(entity_id)
    entity_info = entity_sources(hass).get(entity_id)
    domain = entity_info["domain"] if entity_info else None
    if domain in ["energy", "growatt_server", "solaredge"]:
        # These integrations are known to produce dips; suppress the warning.
        return
    _LOGGER.warning(
        "Entity %s %shas state class total_increasing, but its state is not"
        " strictly increasing. Triggered by state %s (%s) with last_updated set"
        " to %s. Please %s",
        entity_id,
        f"from integration {domain} " if domain else "",
        state.state,
        previous_fstate,
        state.last_updated.isoformat(),
        _suggest_report_issue(hass, entity_id),
    )
def warn_negative(hass: HomeAssistant, entity_id: str, state: State) -> None:
    """Log a warning once if a sensor with state_class_total has a negative value."""
    warned = hass.data.setdefault(WARN_NEGATIVE, set())
    if entity_id in warned:
        # Warn at most once per entity.
        return
    warned.add(entity_id)
    entity_info = entity_sources(hass).get(entity_id)
    domain = entity_info["domain"] if entity_info else None
    _LOGGER.warning(
        "Entity %s %shas state class total_increasing, but its state is "
        "negative. Triggered by state %s with last_updated set to %s. Please %s",
        entity_id,
        f"from integration {domain} " if domain else "",
        state.state,
        state.last_updated.isoformat(),
        _suggest_report_issue(hass, entity_id),
    )
def reset_detected(
    hass: HomeAssistant,
    entity_id: str,
    fstate: float,
    previous_fstate: float | None,
    state: State,
) -> bool:
    """Test if a total_increasing sensor has been reset."""
    if previous_fstate is None:
        # No previous value: nothing to compare against.
        return False

    # A drop of less than 10% is treated as a dip (likely rounding), not a reset.
    dip_threshold = 0.9 * previous_fstate
    if dip_threshold <= fstate < previous_fstate:
        warn_dip(hass, entity_id, state, previous_fstate)

    if fstate < 0:
        # Negative values are invalid for total_increasing sensors.
        warn_negative(hass, entity_id, state)
        raise HomeAssistantError

    return fstate < dip_threshold
375 """Prepare a dict with wanted statistics for entities."""
377 state.entity_id: DEFAULT_STATISTICS[state.attributes[ATTR_STATE_CLASS]]
378 for state
in sensor_states
383 """Parse last_reset and convert it to UTC."""
384 if last_reset_s
is None:
386 if isinstance(last_reset_s, str):
387 last_reset = dt_util.parse_datetime(last_reset_s)
390 if last_reset
is None:
392 "Ignoring invalid last reset '%s' for %s", last_reset_s, entity_id
395 return dt_util.as_utc(last_reset).isoformat()
399 """Convert a timestamp to ISO format or return None."""
400 if timestamp
is None:
402 return dt_util.utc_from_timestamp(timestamp).isoformat()
408 start: datetime.datetime,
409 end: datetime.datetime,
410 ) -> statistics.PlatformCompiledStatistics:
411 """Compile statistics for all entities during start-end."""
412 result: list[StatisticResult] = []
417 entities_full_history = [
418 i.entity_id
for i
in sensor_states
if "sum" in wanted_statistics[i.entity_id]
420 history_list: dict[str, list[State]] = {}
421 if entities_full_history:
422 history_list = history.get_full_significant_states_with_session(
425 start - datetime.timedelta.resolution,
427 entity_ids=entities_full_history,
428 significant_changes_only=
False,
430 entities_significant_history = [
432 for i
in sensor_states
433 if "sum" not in wanted_statistics[i.entity_id]
435 if entities_significant_history:
436 _history_list = history.get_full_significant_states_with_session(
439 start - datetime.timedelta.resolution,
441 entity_ids=entities_significant_history,
443 history_list = {**history_list, **_history_list}
445 entities_with_float_states: dict[str, list[tuple[float, State]]] = {}
446 for _state
in sensor_states:
447 entity_id = _state.entity_id
450 if not (entity_history := history_list.get(entity_id, [_state])):
454 entities_with_float_states[entity_id] = float_states
460 old_metadatas = statistics.get_metadata_with_session(
461 get_instance(hass), session, statistic_ids=set(entities_with_float_states)
463 to_process: list[tuple[str, str |
None, str, list[tuple[float, State]]]] = []
464 to_query: set[str] = set()
465 for _state
in sensor_states:
466 entity_id = _state.entity_id
467 if not (maybe_float_states := entities_with_float_states.get(entity_id)):
475 if not valid_float_states:
477 state_class: str = _state.attributes[ATTR_STATE_CLASS]
478 to_process.append((entity_id, statistics_unit, state_class, valid_float_states))
479 if "sum" in wanted_statistics[entity_id]:
480 to_query.add(entity_id)
482 last_stats = statistics.get_latest_short_term_statistics_with_session(
483 hass, session, to_query, {
"last_reset",
"state",
"sum"}, metadata=old_metadatas
492 if old_metadata := old_metadatas.get(entity_id):
494 {old_metadata[1][
"unit_of_measurement"], statistics_unit}
496 if WARN_UNSTABLE_UNIT
not in hass.data:
497 hass.data[WARN_UNSTABLE_UNIT] = set()
498 if entity_id
not in hass.data[WARN_UNSTABLE_UNIT]:
499 hass.data[WARN_UNSTABLE_UNIT].
add(entity_id)
502 "The unit of %s (%s) cannot be converted to the unit of"
503 " previously compiled statistics (%s). Generation of long"
504 " term statistics will be suppressed unless the unit"
505 " changes back to %s or a compatible unit. Go to %s to fix"
510 old_metadata[1][
"unit_of_measurement"],
511 old_metadata[1][
"unit_of_measurement"],
517 meta: StatisticMetaData = {
518 "has_mean":
"mean" in wanted_statistics[entity_id],
519 "has_sum":
"sum" in wanted_statistics[entity_id],
521 "source": RECORDER_DOMAIN,
522 "statistic_id": entity_id,
523 "unit_of_measurement": statistics_unit,
527 stat: StatisticData = {
"start": start}
528 if "max" in wanted_statistics[entity_id]:
530 *itertools.islice(zip(*valid_float_states, strict=
False), 1)
532 if "min" in wanted_statistics[entity_id]:
534 *itertools.islice(zip(*valid_float_states, strict=
False), 1)
537 if "mean" in wanted_statistics[entity_id]:
540 if "sum" in wanted_statistics[entity_id]:
541 last_reset = old_last_reset =
None
542 new_state = old_state =
None
544 if entity_id
in last_stats:
547 last_stat = last_stats[entity_id][0]
549 old_last_reset = last_reset
553 new_state = old_state = last_stat.get(
"state")
554 _sum = last_stat.get(
"sum")
or 0.0
556 for fstate, state
in valid_float_states:
559 state_class != SensorStateClass.TOTAL_INCREASING
562 state.attributes.get(
"last_reset"), entity_id
566 and last_reset
is not None
568 if old_state
is None:
571 "Compiling initial sum statistics for %s, zero point"
580 "Detected new cycle for %s, last_reset set to %s (old"
588 elif old_state
is None and last_reset
is None:
591 "Compiling initial sum statistics for %s, zero point set to %s",
595 elif state_class == SensorStateClass.TOTAL_INCREASING:
598 hass, entity_id, fstate, new_state, state
603 "Detected new cycle for %s, value dropped from %s"
604 " to %s, triggered by state with last_updated set"
610 state.last_updated.isoformat(),
612 except HomeAssistantError:
617 if old_state
is not None and new_state
is not None:
618 _sum += new_state - old_state
621 old_last_reset = last_reset
623 if old_state
is not None:
626 old_state = new_state
630 if new_state
is None or old_state
is None:
635 _sum += new_state - old_state
636 if last_reset
is not None:
637 stat[
"last_reset"] = dt_util.parse_datetime(last_reset)
639 stat[
"state"] = new_state
641 result.append({
"meta": meta,
"stat": stat})
643 return statistics.PlatformCompiledStatistics(result, old_metadatas)
648 statistic_ids: list[str] | tuple[str] |
None =
None,
649 statistic_type: str |
None =
None,
651 """Return all or filtered statistic_ids and meta data."""
654 result: dict[str, StatisticMetaData] = {}
656 for state
in entities:
657 entity_id = state.entity_id
658 if statistic_ids
is not None and entity_id
not in statistic_ids:
661 attributes = state.attributes
662 state_class = attributes[ATTR_STATE_CLASS]
663 provided_statistics = DEFAULT_STATISTICS[state_class]
664 if statistic_type
is not None and statistic_type
not in provided_statistics:
668 (has_sum :=
"sum" in provided_statistics)
669 and ATTR_LAST_RESET
not in attributes
670 and state_class == SensorStateClass.MEASUREMENT
674 result[entity_id] = {
675 "has_mean":
"mean" in provided_statistics,
678 "source": RECORDER_DOMAIN,
679 "statistic_id": entity_id,
680 "unit_of_measurement": attributes.get(ATTR_UNIT_OF_MEASUREMENT),
688 report_issue: Callable[[str, str, dict[str, Any]],
None],
689 sensor_states: list[State],
690 metadatas: dict[str, tuple[int, StatisticMetaData]],
692 """Update repair issues."""
693 for state
in sensor_states:
694 entity_id = state.entity_id
696 state_class = try_parse_enum(
697 SensorStateClass, state.attributes.get(ATTR_STATE_CLASS)
699 state_unit = state.attributes.get(ATTR_UNIT_OF_MEASUREMENT)
701 if metadata := metadatas.get(entity_id):
702 if numeric
and state_class
is None:
705 "state_class_removed",
707 {
"statistic_id": entity_id},
710 metadata_unit = metadata[1][
"unit_of_measurement"]
711 converter = statistics.STATISTIC_UNIT_TO_UNIT_CONVERTER.get(metadata_unit)
719 "statistic_id": entity_id,
720 "state_unit": state_unit,
721 "metadata_unit": metadata_unit,
722 "supported_unit": metadata_unit,
725 elif numeric
and state_unit
not in converter.VALID_UNITS:
727 valid_units = (unit
or "<None>" for unit
in converter.VALID_UNITS)
728 valid_units_str =
", ".join(sorted(valid_units))
733 "statistic_id": entity_id,
734 "state_unit": state_unit,
735 "metadata_unit": metadata_unit,
736 "supported_unit": valid_units_str,
745 """Validate statistics."""
747 sensor_states = hass.states.all(DOMAIN)
748 metadatas = statistics.get_metadata_with_session(
749 instance, session, statistic_source=RECORDER_DOMAIN
753 def get_sensor_statistics_issues(hass: HomeAssistant) -> set[str]:
754 """Return a list of statistics issues."""
756 issue_registry = ir.async_get(hass)
757 for issue
in issue_registry.issues.values():
759 issue.domain != DOMAIN
760 or not (issue_data := issue.data)
761 or issue_data.get(
"issue_type")
762 not in (
"state_class_removed",
"units_changed")
765 issues.add(issue.issue_id)
768 issues = run_callback_threadsafe(
769 hass.loop, get_sensor_statistics_issues, hass
772 def create_issue_registry_issue(
773 issue_type: str, statistic_id: str, data: dict[str, Any]
775 """Create an issue registry issue."""
776 issue_id = f
"{issue_type}_{statistic_id}"
777 issues.discard(issue_id)
782 data=data | {
"issue_type": issue_type},
784 severity=ir.IssueSeverity.WARNING,
785 translation_key=issue_type,
786 translation_placeholders=data,
790 create_issue_registry_issue,
794 for issue_id
in issues:
795 hass.loop.call_soon_threadsafe(ir.async_delete_issue, hass, DOMAIN, issue_id)
800 ) -> dict[str, list[statistics.ValidationIssue]]:
801 """Validate statistics."""
802 validation_result = defaultdict(list)
804 sensor_states = hass.states.all(DOMAIN)
805 metadatas = statistics.get_metadata(hass, statistic_source=RECORDER_DOMAIN)
806 sensor_entity_ids = {i.entity_id
for i
in sensor_states}
807 sensor_statistic_ids = set(metadatas)
809 entity_filter = instance.entity_filter
811 def create_statistic_validation_issue(
812 issue_type: str, statistic_id: str, data: dict[str, Any]
814 """Create a statistic validation issue."""
815 validation_result[statistic_id].append(
816 statistics.ValidationIssue(issue_type, data)
820 create_statistic_validation_issue,
825 for state
in sensor_states:
826 entity_id = state.entity_id
827 state_class = try_parse_enum(
828 SensorStateClass, state.attributes.get(ATTR_STATE_CLASS)
831 if entity_id
in metadatas:
832 if entity_filter
and not entity_filter(state.entity_id):
834 validation_result[entity_id].append(
835 statistics.ValidationIssue(
836 "entity_no_longer_recorded",
837 {
"statistic_id": entity_id},
840 elif state_class
is not None:
841 if entity_filter
and not entity_filter(state.entity_id):
843 validation_result[entity_id].append(
844 statistics.ValidationIssue(
845 "entity_not_recorded",
846 {
"statistic_id": entity_id},
850 for statistic_id
in sensor_statistic_ids - sensor_entity_ids:
854 validation_result[statistic_id].append(
855 statistics.ValidationIssue(
858 "statistic_id": statistic_id,
863 return validation_result
bool add(self, _T matcher)
web.Response get(self, web.Request request, str config_key)
statistics.PlatformCompiledStatistics compile_statistics(HomeAssistant hass, Session session, datetime.datetime start, datetime.datetime end)
list[State] _get_sensor_states(HomeAssistant hass)
str _suggest_report_issue(HomeAssistant hass, str entity_id)
bool _equivalent_units(set[str|None] units)
float _time_weighted_average(list[tuple[float, State]] fstates, datetime.datetime start, datetime.datetime end)
dict list_statistic_ids(HomeAssistant hass, list[str]|tuple[str]|None statistic_ids=None, str|None statistic_type=None)
None warn_dip(HomeAssistant hass, str entity_id, State state, float previous_fstate)
bool reset_detected(HomeAssistant hass, str entity_id, float fstate, float|None previous_fstate, State state)
dict[str, set[str]] _wanted_statistics(list[State] sensor_states)
None _update_issues(Callable[[str, str, dict[str, Any]], None] report_issue, list[State] sensor_states, dict[str, tuple[int, StatisticMetaData]] metadatas)
None warn_negative(HomeAssistant hass, str entity_id, State state)
dict[str, list[statistics.ValidationIssue]] validate_statistics(HomeAssistant hass)
None update_statistics_issues(HomeAssistant hass, Session session)
set[str|None] _get_units(list[tuple[float, State]] fstates)
bool _is_numeric(State state)
str|None _timestamp_to_isoformat_or_none(float|None timestamp)
str|None _last_reset_as_utc_isoformat(Any last_reset_s, str entity_id)
list[tuple[float, State]] _entity_history_to_float_and_state(Iterable[State] entity_history)
tuple[str|None, list[tuple[float, State]]] _normalize_states(HomeAssistant hass, dict[str, tuple[int, StatisticMetaData]] old_metadatas, list[tuple[float, State]] fstates, str entity_id)
tuple[str, str] split_entity_id(str entity_id)
dict[str, EntityInfo] entity_sources(HomeAssistant hass)
Recorder get_instance(HomeAssistant hass)
str async_suggest_report_issue(HomeAssistant|None hass, *Integration|None integration=None, str|None integration_domain=None, str|None module=None)