1 """Support for MQTT sensors."""
3 from __future__
import annotations
5 from collections.abc
import Callable
6 from datetime
import datetime, timedelta
9 import voluptuous
as vol
14 DEVICE_CLASSES_SCHEMA,
19 SensorExtraStoredData,
27 CONF_UNIT_OF_MEASUREMENT,
40 from .
import subscription
41 from .config
import MQTT_RO_SCHEMA
42 from .const
import CONF_OPTIONS, CONF_STATE_TOPIC, PAYLOAD_NONE
43 from .entity
import MqttAvailabilityMixin, MqttEntity, async_setup_entity_entry_helper
44 from .models
import MqttValueTemplate, PayloadSentinel, ReceiveMessage
45 from .schemas
import MQTT_ENTITY_COMMON_SCHEMA
46 from .util
import check_state_too_long
48 _LOGGER = logging.getLogger(__name__)
52 CONF_EXPIRE_AFTER =
"expire_after"
53 CONF_LAST_RESET_VALUE_TEMPLATE =
"last_reset_value_template"
54 CONF_SUGGESTED_DISPLAY_PRECISION =
"suggested_display_precision"
56 MQTT_SENSOR_ATTRIBUTES_BLOCKED = frozenset(
58 sensor.ATTR_LAST_RESET,
59 sensor.ATTR_STATE_CLASS,
63 DEFAULT_NAME =
"MQTT Sensor"
64 DEFAULT_FORCE_UPDATE =
False
66 _PLATFORM_SCHEMA_BASE = MQTT_RO_SCHEMA.extend(
68 vol.Optional(CONF_DEVICE_CLASS): vol.Any(DEVICE_CLASSES_SCHEMA,
None),
69 vol.Optional(CONF_EXPIRE_AFTER): cv.positive_int,
70 vol.Optional(CONF_FORCE_UPDATE, default=DEFAULT_FORCE_UPDATE): cv.boolean,
71 vol.Optional(CONF_LAST_RESET_VALUE_TEMPLATE): cv.template,
72 vol.Optional(CONF_NAME): vol.Any(cv.string,
None),
73 vol.Optional(CONF_OPTIONS): cv.ensure_list,
74 vol.Optional(CONF_SUGGESTED_DISPLAY_PRECISION): cv.positive_int,
75 vol.Optional(CONF_STATE_CLASS): vol.Any(STATE_CLASSES_SCHEMA,
None),
76 vol.Optional(CONF_UNIT_OF_MEASUREMENT): vol.Any(cv.string,
None),
78 ).extend(MQTT_ENTITY_COMMON_SCHEMA.schema)
82 """Validate the sensor options, state and device class config."""
84 CONF_LAST_RESET_VALUE_TEMPLATE
in config
85 and (state_class := config.get(CONF_STATE_CLASS)) != SensorStateClass.TOTAL
88 f
"The option `{CONF_LAST_RESET_VALUE_TEMPLATE}` cannot be used "
89 f
"together with state class `{state_class}`"
94 if (options := config.get(CONF_OPTIONS))
is not None:
96 raise vol.Invalid(
"An empty options list is not allowed")
97 if config.get(CONF_STATE_CLASS)
or config.get(CONF_UNIT_OF_MEASUREMENT):
99 f
"Specifying `{CONF_OPTIONS}` is not allowed together with "
100 f
"the `{CONF_STATE_CLASS}` or `{CONF_UNIT_OF_MEASUREMENT}` option"
103 if (device_class := config.get(CONF_DEVICE_CLASS)) != SensorDeviceClass.ENUM:
105 f
"The option `{CONF_OPTIONS}` must be used "
106 f
"together with device class `{SensorDeviceClass.ENUM}`, "
107 f
"got `{CONF_DEVICE_CLASS}` '{device_class}'"
113 PLATFORM_SCHEMA_MODERN = vol.All(
114 _PLATFORM_SCHEMA_BASE,
115 validate_sensor_state_and_device_class_config,
118 DISCOVERY_SCHEMA = vol.All(
119 _PLATFORM_SCHEMA_BASE.extend({}, extra=vol.REMOVE_EXTRA),
120 validate_sensor_state_and_device_class_config,
126 config_entry: ConfigEntry,
127 async_add_entities: AddEntitiesCallback,
129 """Set up MQTT sensor through YAML and through MQTT discovery."""
137 PLATFORM_SCHEMA_MODERN,
142 """Representation of a sensor that can be updated using MQTT."""
144 _default_name = DEFAULT_NAME
145 _entity_id_format = ENTITY_ID_FORMAT
146 _attr_last_reset: datetime |
None =
None
147 _attributes_extra_blocked = MQTT_SENSOR_ATTRIBUTES_BLOCKED
148 _expiration_trigger: CALLBACK_TYPE |
None =
None
149 _expire_after: int |
None
150 _expired: bool |
None
152 Callable[[ReceivePayloadType, PayloadSentinel], ReceivePayloadType] |
None
154 _last_reset_template: Callable[[ReceivePayloadType], ReceivePayloadType] |
None = (
159 """Restore state for entities with expire_after set."""
160 last_state: State |
None
161 last_sensor_data: SensorExtraStoredData |
None
163 (_expire_after := self.
_expire_after_expire_after)
is not None
164 and _expire_after > 0
165 and (last_state := await self.async_get_last_state())
is not None
166 and last_state.state
not in [STATE_UNKNOWN, STATE_UNAVAILABLE]
173 expiration_at = last_state.last_changed +
timedelta(seconds=_expire_after)
174 remain_seconds = (expiration_at - dt_util.utcnow()).total_seconds()
176 if remain_seconds <= 0:
178 _LOGGER.debug(
"Skip state recovery after reload for %s", self.
entity_identity_id)
184 self.
hasshasshass, remain_seconds, self._value_is_expired
188 "State recovered after reload for %s, remaining time before"
196 """Remove expire triggers."""
198 _LOGGER.debug(
"Clean up expire after trigger for %s", self.
entity_identity_id)
202 await MqttEntity.async_will_remove_from_hass(self)
206 """Return the config schema."""
207 return DISCOVERY_SCHEMA
210 """(Re)Setup the entity."""
214 CONF_SUGGESTED_DISPLAY_PRECISION
226 if value_template := config.get(CONF_VALUE_TEMPLATE):
228 value_template, entity=self
229 ).async_render_with_possible_json_value
230 if last_reset_template := config.get(CONF_LAST_RESET_VALUE_TEMPLATE):
232 last_reset_template, entity=self
233 ).async_render_with_possible_json_value
236 def _update_state(self, msg: ReceiveMessage) ->
None:
253 payload =
template(msg.payload, PayloadSentinel.DEFAULT)
255 payload = msg.payload
256 if payload
is PayloadSentinel.DEFAULT:
258 if not isinstance(payload, str):
260 "Invalid undecoded state message '%s' received from '%s'",
266 if payload == PAYLOAD_NONE:
272 _LOGGER.debug(
"Ignore empty state from '%s'", msg.topic)
279 "Ignoring invalid option received on topic '%s', got '%s', allowed: %s",
282 ", ".join(self.
optionsoptions),
288 SensorDeviceClass.ENUM,
293 if (payload_datetime := dt_util.parse_datetime(payload))
is None:
296 _LOGGER.warning(
"Invalid state message '%s' from '%s'", payload, msg.topic)
305 def _update_last_reset(self, msg: ReceiveMessage) ->
None:
307 payload = msg.payload
if template
is None else template(msg.payload)
309 _LOGGER.debug(
"Ignoring empty last_reset message from '%s'", msg.topic)
312 last_reset = dt_util.parse_datetime(
str(payload))
313 if last_reset
is None:
318 "Invalid last_reset message '%s' from '%s'", msg.payload, msg.topic
322 def _state_message_received(self, msg: ReceiveMessage) ->
None:
323 """Handle new MQTT state messages."""
324 self._update_state(msg)
325 if CONF_LAST_RESET_VALUE_TEMPLATE
in self.
_config_config:
326 self._update_last_reset(msg)
330 """(Re)Subscribe to topics."""
333 self._state_message_received,
334 {
"_attr_native_value",
"_attr_last_reset",
"_expired"},
338 """(Re)Subscribe to topics."""
339 subscription.async_subscribe_topics_internal(self.
hasshasshass, self.
_sub_state_sub_state)
342 def _value_is_expired(self, *_: datetime) ->
None:
343 """Triggered when value is expired."""
350 """Return true if the device is available and value has not expired."""
352 return MqttAvailabilityMixin.available.fget(self)
and (
None _setup_from_config(self, ConfigType config)
None _prepare_subscribe_topics(self)
VolSchemaType config_schema()
bool add_subscription(self, str state_topic_config_key, Callable[[ReceiveMessage], None] msg_callback, set[str]|None tracked_attributes, bool disable_encoding=False)
None _subscribe_topics(self)
None async_will_remove_from_hass(self)
_attr_native_unit_of_measurement
_attr_suggested_display_precision
None mqtt_async_added_to_hass(self)
SensorExtraStoredData|None async_get_last_sensor_data(self)
SensorDeviceClass|None device_class(self)
bool _numeric_state_expected(self)
list[str]|None options(self)
str|None device_class(self)
None async_write_ha_state(self)
None async_setup_entity_entry_helper(HomeAssistant hass, ConfigEntry entry, type[MqttEntity]|None entity_class, str domain, AddEntitiesCallback async_add_entities, VolSchemaType discovery_schema, VolSchemaType platform_schema_modern, dict[str, type[MqttEntity]]|None schema_class_mapping=None)
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
ConfigType validate_sensor_state_and_device_class_config(ConfigType config)
bool check_state_too_long(logging.Logger logger, str proposed_state, str entity_id, ReceiveMessage msg)
bool template(HomeAssistant hass, Template value_template, TemplateVarsType variables=None)
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)