1 """Allows the creation of a sensor that filters state property."""
3 from __future__
import annotations
5 from collections
import Counter, deque
7 from dataclasses
import dataclass
8 from datetime
import datetime, timedelta
9 from functools
import partial
11 from numbers
import Number
13 from typing
import Any, cast
15 import voluptuous
as vol
22 DOMAIN
as SENSOR_DOMAIN,
23 PLATFORM_SCHEMA
as SENSOR_PLATFORM_SCHEMA,
31 ATTR_UNIT_OF_MEASUREMENT,
40 EventStateChangedData,
54 from .
import DOMAIN, PLATFORMS
56 _LOGGER = logging.getLogger(__name__)
58 FILTER_NAME_RANGE =
"range"
59 FILTER_NAME_LOWPASS =
"lowpass"
60 FILTER_NAME_OUTLIER =
"outlier"
61 FILTER_NAME_THROTTLE =
"throttle"
62 FILTER_NAME_TIME_THROTTLE =
"time_throttle"
63 FILTER_NAME_TIME_SMA =
"time_simple_moving_average"
64 FILTERS: Registry[str, type[Filter]] =
Registry()
66 CONF_FILTERS =
"filters"
67 CONF_FILTER_NAME =
"filter"
68 CONF_FILTER_WINDOW_SIZE =
"window_size"
69 CONF_FILTER_PRECISION =
"precision"
70 CONF_FILTER_RADIUS =
"radius"
71 CONF_FILTER_TIME_CONSTANT =
"time_constant"
72 CONF_FILTER_LOWER_BOUND =
"lower_bound"
73 CONF_FILTER_UPPER_BOUND =
"upper_bound"
74 CONF_TIME_SMA_TYPE =
"type"
76 TIME_SMA_LAST =
"last"
78 WINDOW_SIZE_UNIT_NUMBER_EVENTS = 1
79 WINDOW_SIZE_UNIT_TIME = 2
81 DEFAULT_WINDOW_SIZE = 1
83 DEFAULT_FILTER_RADIUS = 2.0
84 DEFAULT_FILTER_TIME_CONSTANT = 10
86 NAME_TEMPLATE =
"{} filter"
87 ICON =
"mdi:chart-line-variant"
89 FILTER_SCHEMA = vol.Schema({vol.Optional(CONF_FILTER_PRECISION): vol.Coerce(int)})
91 FILTER_OUTLIER_SCHEMA = FILTER_SCHEMA.extend(
93 vol.Required(CONF_FILTER_NAME): FILTER_NAME_OUTLIER,
94 vol.Optional(CONF_FILTER_WINDOW_SIZE, default=DEFAULT_WINDOW_SIZE): vol.Coerce(
97 vol.Optional(CONF_FILTER_RADIUS, default=DEFAULT_FILTER_RADIUS): vol.Coerce(
103 FILTER_LOWPASS_SCHEMA = FILTER_SCHEMA.extend(
105 vol.Required(CONF_FILTER_NAME): FILTER_NAME_LOWPASS,
106 vol.Optional(CONF_FILTER_WINDOW_SIZE, default=DEFAULT_WINDOW_SIZE): vol.Coerce(
110 CONF_FILTER_TIME_CONSTANT, default=DEFAULT_FILTER_TIME_CONSTANT
115 FILTER_RANGE_SCHEMA = FILTER_SCHEMA.extend(
117 vol.Required(CONF_FILTER_NAME): FILTER_NAME_RANGE,
118 vol.Optional(CONF_FILTER_LOWER_BOUND): vol.Coerce(float),
119 vol.Optional(CONF_FILTER_UPPER_BOUND): vol.Coerce(float),
123 FILTER_TIME_SMA_SCHEMA = FILTER_SCHEMA.extend(
125 vol.Required(CONF_FILTER_NAME): FILTER_NAME_TIME_SMA,
126 vol.Optional(CONF_TIME_SMA_TYPE, default=TIME_SMA_LAST): vol.In(
129 vol.Required(CONF_FILTER_WINDOW_SIZE): vol.All(
130 cv.time_period, cv.positive_timedelta
135 FILTER_THROTTLE_SCHEMA = FILTER_SCHEMA.extend(
137 vol.Required(CONF_FILTER_NAME): FILTER_NAME_THROTTLE,
138 vol.Optional(CONF_FILTER_WINDOW_SIZE, default=DEFAULT_WINDOW_SIZE): vol.Coerce(
144 FILTER_TIME_THROTTLE_SCHEMA = FILTER_SCHEMA.extend(
146 vol.Required(CONF_FILTER_NAME): FILTER_NAME_TIME_THROTTLE,
147 vol.Required(CONF_FILTER_WINDOW_SIZE): vol.All(
148 cv.time_period, cv.positive_timedelta
153 PLATFORM_SCHEMA = SENSOR_PLATFORM_SCHEMA.extend(
155 vol.Required(CONF_ENTITY_ID): vol.Any(
156 cv.entity_domain(SENSOR_DOMAIN),
157 cv.entity_domain(BINARY_SENSOR_DOMAIN),
158 cv.entity_domain(INPUT_NUMBER_DOMAIN),
160 vol.Optional(CONF_NAME): cv.string,
161 vol.Optional(CONF_UNIQUE_ID): cv.string,
162 vol.Required(CONF_FILTERS): vol.All(
166 FILTER_OUTLIER_SCHEMA,
167 FILTER_LOWPASS_SCHEMA,
168 FILTER_TIME_SMA_SCHEMA,
169 FILTER_THROTTLE_SCHEMA,
170 FILTER_TIME_THROTTLE_SCHEMA,
182 async_add_entities: AddEntitiesCallback,
183 discovery_info: DiscoveryInfoType |
None =
None,
185 """Set up the template sensors."""
189 name: str |
None = config.get(CONF_NAME)
190 unique_id: str |
None = config.get(CONF_UNIQUE_ID)
191 entity_id: str = config[CONF_ENTITY_ID]
193 filter_configs: list[dict[str, Any]] = config[CONF_FILTERS]
195 FILTERS[_filter.pop(CONF_FILTER_NAME)](entity=entity_id, **_filter)
196 for _filter
in filter_configs
203 """Representation of a Filter Sensor."""
205 _attr_should_poll =
False
210 unique_id: str |
None,
212 filters: list[Filter],
214 """Initialize the sensor."""
219 self.
_state_state: StateType =
None
228 self, event: Event[EventStateChangedData]
230 """Handle device state changes."""
231 _LOGGER.debug(
"Update filter on event: %s", event)
236 self, new_state: State |
None, update_ha: bool =
True
238 """Process device state changes."""
239 if new_state
is None:
241 "While updating filter %s, the new_state is None", self.
namename
247 if new_state.state == STATE_UNKNOWN:
252 if new_state.state == STATE_UNAVAILABLE:
259 temp_state =
_State(new_state.last_updated, new_state.state)
263 filtered_state = filt.filter_state(copy(temp_state))
269 "skip" if filt.skip_processing
else filtered_state.state,
271 if filt.skip_processing:
273 temp_state = filtered_state
276 "Could not convert state: %s (%s) to number",
278 type(new_state.state),
282 self.
_state_state = temp_state.state
284 self.
_attr_icon_attr_icon = new_state.attributes.get(ATTR_ICON, ICON)
285 self.
_attr_device_class_attr_device_class = new_state.attributes.get(ATTR_DEVICE_CLASS)
286 self.
_attr_state_class_attr_state_class = new_state.attributes.get(ATTR_STATE_CLASS)
289 ATTR_UNIT_OF_MEASUREMENT
294 ATTR_UNIT_OF_MEASUREMENT
301 """Register callbacks."""
303 if "recorder" in self.
hasshass.config.components:
305 largest_window_items = 0
311 filt.window_unit == WINDOW_SIZE_UNIT_NUMBER_EVENTS
312 and largest_window_items < (size := cast(int, filt.window_size))
314 largest_window_items = size
316 filt.window_unit == WINDOW_SIZE_UNIT_TIME
317 and largest_window_time < (val := cast(timedelta, filt.window_size))
319 largest_window_time = val
322 if largest_window_items > 0:
325 history.get_last_state_changes,
327 largest_window_items,
331 if self.
_entity_entity
in filter_history:
332 history_list.extend(filter_history[self.
_entity_entity])
333 if largest_window_time >
timedelta(seconds=0):
334 start = dt_util.utcnow() - largest_window_time
337 history.state_changes_during_period,
343 if self.
_entity_entity
in filter_history:
347 for state
in filter_history[self.
_entity_entity]
348 if state
not in history_list
353 history_list = sorted(history_list, key=
lambda s: s.last_updated)
355 "Loading from history: %s",
356 [(s.state, s.last_updated)
for s
in history_list],
360 for state
in history_list:
361 if state.state
not in [STATE_UNKNOWN, STATE_UNAVAILABLE,
None]:
365 def _async_hass_started(hass: HomeAssistant) ->
None:
366 """Delay source entity tracking."""
377 """Return the state of the sensor."""
379 return datetime.fromisoformat(
str(self.
_state_state))
385 """State abstraction for filter usage."""
387 state: str | float | int
390 """Initialize with HA State object."""
395 self.
statestate = state.state
398 """Set precision of Number based states."""
399 if precision
is not None and isinstance(self.
statestate, Number):
400 value = round(
float(self.
statestate), precision)
401 self.
statestate =
int(value)
if precision == 0
else value
404 """Return state as the string representation of FilterState."""
408 """Return timestamp and state as the representation of FilterState."""
409 return f
"{self.timestamp} : {self.state}"
414 """Simplified State class.
416 The standard State class only accepts string in `state`,
417 and we are only interested in two properties.
420 last_updated: datetime
421 state: str | float | int
425 """Filter skeleton."""
430 window_size: int | timedelta,
432 precision: int |
None,
434 """Initialize common attributes.
436 :param window_size: size of the sliding window that holds previous values
437 :param precision: round filtered value to precision value
438 :param entity: used for debugging only
440 if isinstance(window_size, int):
441 self.
statesstates: deque[FilterState] = deque(maxlen=window_size)
445 self.
window_unitwindow_unit = WINDOW_SIZE_UNIT_TIME
456 """Return window size."""
461 """Return filter name."""
462 return self.
_name_name
466 """Return whether the current filter_state should be skipped."""
474 """Implement filter."""
475 raise NotImplementedError
478 """Implement a common interface for filters."""
480 if self.
_only_numbers_only_numbers
and not isinstance(fstate.state, Number):
481 raise ValueError(f
"State <{fstate.state}> is not a Number")
489 self.
statesstates.append(copy(filtered))
490 new_state.state = filtered.state
494 @FILTERS.register(FILTER_NAME_RANGE)
498 Determines if new state is in the range of upper_bound and lower_bound.
499 If not inside, lower or upper bound is returned instead.
506 precision: int |
None =
None,
507 lower_bound: float |
None =
None,
508 upper_bound: float |
None =
None,
510 """Initialize Filter.
512 :param upper_bound: band upper bound
513 :param lower_bound: band lower bound
516 FILTER_NAME_RANGE, DEFAULT_WINDOW_SIZE, precision=precision, entity=entity
520 self._stats_internal: Counter = Counter()
523 """Implement the range filter."""
526 new_state_value = cast(float, new_state.state)
529 self._stats_internal[
"erasures_up"] += 1
532 "Upper outlier nr. %s in %s: %s",
533 self._stats_internal[
"erasures_up"],
540 self._stats_internal[
"erasures_low"] += 1
543 "Lower outlier nr. %s in %s: %s",
544 self._stats_internal[
"erasures_low"],
553 @FILTERS.register(FILTER_NAME_OUTLIER)
555 """BASIC outlier filter.
557 Determines if new state is in a band around the median.
566 precision: int |
None =
None,
568 """Initialize Filter.
570 :param radius: band radius
573 FILTER_NAME_OUTLIER, window_size, precision=precision, entity=entity
576 self._stats_internal: Counter = Counter()
580 """Implement the outlier filter."""
583 previous_state_values = [cast(float, s.state)
for s
in self.
statesstates]
584 new_state_value = cast(float, new_state.state)
586 median = statistics.median(previous_state_values)
if self.
statesstates
else 0
589 and abs(new_state_value - median) > self.
_radius_radius
591 self._stats_internal[
"erasures"] += 1
594 "Outlier nr. %s in %s: %s",
595 self._stats_internal[
"erasures"],
599 new_state.state = median
603 @FILTERS.register(FILTER_NAME_LOWPASS)
605 """BASIC Low Pass Filter."""
613 precision: int = DEFAULT_PRECISION,
615 """Initialize Filter."""
617 FILTER_NAME_LOWPASS, window_size, precision=precision, entity=entity
622 """Implement the low pass filter."""
628 prev_weight = 1.0 - new_weight
630 prev_state_value = cast(float, self.
statesstates[-1].state)
631 new_state_value = cast(float, new_state.state)
632 new_state.state = prev_weight * prev_state_value + new_weight * new_state_value
637 @FILTERS.register(FILTER_NAME_TIME_SMA)
639 """Simple Moving Average (SMA) Filter.
641 The window_size is determined by time, and SMA is time weighted.
647 window_size: timedelta,
650 precision: int = DEFAULT_PRECISION,
652 """Initialize Filter.
654 :param type: type of algorithm used to connect discrete values
657 FILTER_NAME_TIME_SMA, window_size, precision=precision, entity=entity
660 self.
last_leaklast_leak: FilterState |
None =
None
661 self.
queuequeue = deque[FilterState]()
663 def _leak(self, left_boundary: datetime) ->
None:
664 """Remove timeouted elements."""
665 while self.
queuequeue:
666 if self.
queuequeue[0].timestamp + self.
_time_window_time_window <= left_boundary:
672 """Implement the Simple Moving Average filter."""
674 self.
_leak_leak(new_state.timestamp)
675 self.
queuequeue.append(copy(new_state))
677 moving_sum: float = 0
678 start = new_state.timestamp - self.
_time_window_time_window
680 for state
in self.
queuequeue:
682 prev_state_value = cast(float, prev_state.state)
683 moving_sum += (state.timestamp - start).total_seconds() * prev_state_value
684 start = state.timestamp
687 new_state.state = moving_sum / self.
_time_window_time_window.total_seconds()
692 @FILTERS.register(FILTER_NAME_THROTTLE)
696 One sample per window.
700 self, *, window_size: int, entity: str, precision:
None =
None
702 """Initialize Filter."""
704 FILTER_NAME_THROTTLE, window_size, precision=precision, entity=entity
709 """Implement the throttle filter."""
719 @FILTERS.register(FILTER_NAME_TIME_THROTTLE)
721 """Time Throttle Filter.
723 One sample per time period.
727 self, *, window_size: timedelta, entity: str, precision: int |
None =
None
729 """Initialize Filter."""
731 FILTER_NAME_TIME_THROTTLE, window_size, precision=precision, entity=entity
738 """Implement the filter."""
739 window_start = new_state.timestamp - self.
_time_window_time_window
None __init__(self, _State state)
None set_precision(self, int|None precision)
FilterState _filter_state(self, FilterState new_state)
bool skip_processing(self)
_State filter_state(self, _State new_state)
int|timedelta window_size(self)
None __init__(self, str name, int|timedelta window_size, str entity, int|None precision)
FilterState _filter_state(self, FilterState new_state)
None __init__(self, *int window_size, str entity, int time_constant, int precision=DEFAULT_PRECISION)
None __init__(self, *int window_size, str entity, float radius, int|None precision=None)
FilterState _filter_state(self, FilterState new_state)
None __init__(self, *str entity, int|None precision=None, float|None lower_bound=None, float|None upper_bound=None)
FilterState _filter_state(self, FilterState new_state)
None _update_filter_sensor_state_event(self, Event[EventStateChangedData] event)
datetime|StateType native_value(self)
_attr_extra_state_attributes
None __init__(self, str|None name, str|None unique_id, str entity_id, list[Filter] filters)
None _update_filter_sensor_state(self, State|None new_state, bool update_ha=True)
_attr_native_unit_of_measurement
None async_added_to_hass(self)
None __init__(self, *int window_size, str entity, None precision=None)
FilterState _filter_state(self, FilterState new_state)
None _leak(self, datetime left_boundary)
None __init__(self, *timedelta window_size, str entity, str type, int precision=DEFAULT_PRECISION)
FilterState _filter_state(self, FilterState new_state)
FilterState _filter_state(self, FilterState new_state)
None __init__(self, *timedelta window_size, str entity, int|None precision=None)
SensorDeviceClass|None device_class(self)
str|None device_class(self)
None async_write_ha_state(self)
None async_on_remove(self, CALLBACK_TYPE func)
str|UndefinedType|None name(self)
None async_setup_platform(HomeAssistant hass, ConfigType config, AddEntitiesCallback async_add_entities, DiscoveryInfoType|None discovery_info=None)
CALLBACK_TYPE async_track_state_change_event(HomeAssistant hass, str|Iterable[str] entity_ids, Callable[[Event[EventStateChangedData]], Any] action, HassJobType|None job_type=None)
Recorder get_instance(HomeAssistant hass)
None async_setup_reload_service(HomeAssistant hass, str domain, Iterable[str] platforms)
CALLBACK_TYPE async_at_started(HomeAssistant hass, Callable[[HomeAssistant], Coroutine[Any, Any, None]|None] at_start_cb)