Home Assistant Unofficial Reference 2024.12.1
valve.py
Go to the documentation of this file.
1 """Support for MQTT valve devices."""
2 
3 from __future__ import annotations
4 
5 from contextlib import suppress
6 import logging
7 from typing import Any
8 
9 import voluptuous as vol
10 
11 from homeassistant.components import valve
13  DEVICE_CLASSES_SCHEMA,
14  ValveEntity,
15  ValveEntityFeature,
16  ValveState,
17 )
18 from homeassistant.config_entries import ConfigEntry
19 from homeassistant.const import (
20  CONF_DEVICE_CLASS,
21  CONF_NAME,
22  CONF_OPTIMISTIC,
23  CONF_VALUE_TEMPLATE,
24 )
25 from homeassistant.core import HomeAssistant, callback
27 from homeassistant.helpers.entity_platform import AddEntitiesCallback
28 from homeassistant.helpers.typing import ConfigType, VolSchemaType
29 from homeassistant.util.json import JSON_DECODE_EXCEPTIONS, json_loads
31  percentage_to_ranged_value,
32  ranged_value_to_percentage,
33 )
34 
35 from . import subscription
36 from .config import MQTT_BASE_SCHEMA
37 from .const import (
38  CONF_COMMAND_TEMPLATE,
39  CONF_COMMAND_TOPIC,
40  CONF_PAYLOAD_CLOSE,
41  CONF_PAYLOAD_OPEN,
42  CONF_PAYLOAD_STOP,
43  CONF_POSITION_CLOSED,
44  CONF_POSITION_OPEN,
45  CONF_RETAIN,
46  CONF_STATE_CLOSED,
47  CONF_STATE_CLOSING,
48  CONF_STATE_OPEN,
49  CONF_STATE_OPENING,
50  CONF_STATE_TOPIC,
51  DEFAULT_OPTIMISTIC,
52  DEFAULT_PAYLOAD_CLOSE,
53  DEFAULT_PAYLOAD_OPEN,
54  DEFAULT_POSITION_CLOSED,
55  DEFAULT_POSITION_OPEN,
56  DEFAULT_RETAIN,
57  PAYLOAD_NONE,
58 )
59 from .entity import MqttEntity, async_setup_entity_entry_helper
60 from .models import MqttCommandTemplate, MqttValueTemplate, ReceiveMessage
61 from .schemas import MQTT_ENTITY_COMMON_SCHEMA
62 from .util import valid_publish_topic, valid_subscribe_topic
63 
64 _LOGGER = logging.getLogger(__name__)
65 
66 PARALLEL_UPDATES = 0
67 
68 CONF_REPORTS_POSITION = "reports_position"
69 
70 DEFAULT_NAME = "MQTT Valve"
71 
72 MQTT_VALVE_ATTRIBUTES_BLOCKED = frozenset(
73  {
74  valve.ATTR_CURRENT_POSITION,
75  }
76 )
77 
78 NO_POSITION_KEYS = (
79  CONF_PAYLOAD_CLOSE,
80  CONF_PAYLOAD_OPEN,
81  CONF_STATE_CLOSED,
82  CONF_STATE_OPEN,
83 )
84 
85 DEFAULTS = {
86  CONF_PAYLOAD_CLOSE: DEFAULT_PAYLOAD_CLOSE,
87  CONF_PAYLOAD_OPEN: DEFAULT_PAYLOAD_OPEN,
88  CONF_STATE_OPEN: ValveState.OPEN,
89  CONF_STATE_CLOSED: ValveState.CLOSED,
90 }
91 
92 RESET_CLOSING_OPENING = "reset_opening_closing"
93 
94 
95 def _validate_and_add_defaults(config: ConfigType) -> ConfigType:
96  """Validate config options and set defaults."""
97  if config[CONF_REPORTS_POSITION] and any(key in config for key in NO_POSITION_KEYS):
98  raise vol.Invalid(
99  "Options `payload_open`, `payload_close`, `state_open` and "
100  "`state_closed` are not allowed if the valve reports a position."
101  )
102  return {**DEFAULTS, **config}
103 
104 
105 _PLATFORM_SCHEMA_BASE = MQTT_BASE_SCHEMA.extend(
106  {
107  vol.Optional(CONF_COMMAND_TOPIC): valid_publish_topic,
108  vol.Optional(CONF_COMMAND_TEMPLATE): cv.template,
109  vol.Optional(CONF_DEVICE_CLASS): vol.Any(DEVICE_CLASSES_SCHEMA, None),
110  vol.Optional(CONF_NAME): vol.Any(cv.string, None),
111  vol.Optional(CONF_OPTIMISTIC, default=DEFAULT_OPTIMISTIC): cv.boolean,
112  vol.Optional(CONF_PAYLOAD_CLOSE): vol.Any(cv.string, None),
113  vol.Optional(CONF_PAYLOAD_OPEN): vol.Any(cv.string, None),
114  vol.Optional(CONF_PAYLOAD_STOP): vol.Any(cv.string, None),
115  vol.Optional(CONF_POSITION_CLOSED, default=DEFAULT_POSITION_CLOSED): int,
116  vol.Optional(CONF_POSITION_OPEN, default=DEFAULT_POSITION_OPEN): int,
117  vol.Optional(CONF_REPORTS_POSITION, default=False): cv.boolean,
118  vol.Optional(CONF_RETAIN, default=DEFAULT_RETAIN): cv.boolean,
119  vol.Optional(CONF_STATE_CLOSED): cv.string,
120  vol.Optional(CONF_STATE_CLOSING, default=ValveState.CLOSING): cv.string,
121  vol.Optional(CONF_STATE_OPEN): cv.string,
122  vol.Optional(CONF_STATE_OPENING, default=ValveState.OPENING): cv.string,
123  vol.Optional(CONF_STATE_TOPIC): valid_subscribe_topic,
124  vol.Optional(CONF_VALUE_TEMPLATE): cv.template,
125  }
126 ).extend(MQTT_ENTITY_COMMON_SCHEMA.schema)
127 
128 PLATFORM_SCHEMA_MODERN = vol.All(_PLATFORM_SCHEMA_BASE, _validate_and_add_defaults)
129 
130 DISCOVERY_SCHEMA = vol.All(
131  _PLATFORM_SCHEMA_BASE.extend({}, extra=vol.REMOVE_EXTRA),
132  _validate_and_add_defaults,
133 )
134 
135 
137  hass: HomeAssistant,
138  config_entry: ConfigEntry,
139  async_add_entities: AddEntitiesCallback,
140 ) -> None:
141  """Set up MQTT valve through YAML and through MQTT discovery."""
143  hass,
144  config_entry,
145  MqttValve,
146  valve.DOMAIN,
147  async_add_entities,
148  DISCOVERY_SCHEMA,
149  PLATFORM_SCHEMA_MODERN,
150  )
151 
152 
154  """Representation of a valve that can be controlled using MQTT."""
155 
156  _attr_is_closed: bool | None = None
157  _attributes_extra_blocked: frozenset[str] = MQTT_VALVE_ATTRIBUTES_BLOCKED
158  _default_name = DEFAULT_NAME
159  _entity_id_format: str = valve.ENTITY_ID_FORMAT
160  _optimistic: bool
161  _range: tuple[int, int]
162  _tilt_optimistic: bool
163 
164  @staticmethod
165  def config_schema() -> VolSchemaType:
166  """Return the config schema."""
167  return DISCOVERY_SCHEMA
168 
169  def _setup_from_config(self, config: ConfigType) -> None:
170  """Set up valve from config."""
171  self._attr_reports_position_attr_reports_position = config[CONF_REPORTS_POSITION]
172  self._range_range = (
173  self._config_config[CONF_POSITION_CLOSED] + 1,
174  self._config_config[CONF_POSITION_OPEN],
175  )
176  no_state_topic = config.get(CONF_STATE_TOPIC) is None
177  self._optimistic_optimistic = config[CONF_OPTIMISTIC] or no_state_topic
178  self._attr_assumed_state_attr_assumed_state = self._optimistic_optimistic
179 
180  template_config_attributes = {
181  "position_open": config[CONF_POSITION_OPEN],
182  "position_closed": config[CONF_POSITION_CLOSED],
183  }
184 
185  self._value_template_value_template = MqttValueTemplate(
186  config.get(CONF_VALUE_TEMPLATE), entity=self
187  ).async_render_with_possible_json_value
188 
189  self._command_template_command_template = MqttCommandTemplate(
190  config.get(CONF_COMMAND_TEMPLATE), entity=self
191  ).async_render
192 
193  self._value_template_value_template = MqttValueTemplate(
194  config.get(CONF_VALUE_TEMPLATE),
195  entity=self,
196  config_attributes=template_config_attributes,
197  ).async_render_with_possible_json_value
198 
199  self._attr_device_class_attr_device_class = config.get(CONF_DEVICE_CLASS)
200 
201  supported_features = ValveEntityFeature(0)
202  if CONF_COMMAND_TOPIC in config:
203  if config[CONF_PAYLOAD_OPEN] is not None:
204  supported_features |= ValveEntityFeature.OPEN
205  if config[CONF_PAYLOAD_CLOSE] is not None:
206  supported_features |= ValveEntityFeature.CLOSE
207 
208  if config[CONF_REPORTS_POSITION]:
209  supported_features |= ValveEntityFeature.SET_POSITION
210  if config.get(CONF_PAYLOAD_STOP) is not None:
211  supported_features |= ValveEntityFeature.STOP
212 
213  self._attr_supported_features_attr_supported_features = supported_features
214 
215  @callback
216  def _update_state(self, state: str | None) -> None:
217  """Update the valve state properties."""
218  self._attr_is_opening_attr_is_opening = state == ValveState.OPENING
219  self._attr_is_closing_attr_is_closing = state == ValveState.CLOSING
220  if self.reports_positionreports_position:
221  return
222  if state is None:
223  self._attr_is_closed_attr_is_closed = None
224  else:
225  self._attr_is_closed_attr_is_closed = state == ValveState.CLOSED
226 
227  @callback
229  self, msg: ReceiveMessage, state_payload: str
230  ) -> None:
231  """Process an update for a valve that does not report the position."""
232  state: str | None = None
233  if state_payload == self._config_config[CONF_STATE_OPENING]:
234  state = ValveState.OPENING
235  elif state_payload == self._config_config[CONF_STATE_CLOSING]:
236  state = ValveState.CLOSING
237  elif state_payload == self._config_config[CONF_STATE_OPEN]:
238  state = ValveState.OPEN
239  elif state_payload == self._config_config[CONF_STATE_CLOSED]:
240  state = ValveState.CLOSED
241  elif state_payload == PAYLOAD_NONE:
242  state = None
243  else:
244  _LOGGER.warning(
245  "Payload received on topic '%s' is not one of "
246  "[open, closed, opening, closing], got: %s",
247  msg.topic,
248  state_payload,
249  )
250  return
251  self._update_state_update_state(state)
252 
253  @callback
255  self, msg: ReceiveMessage, position_payload: str, state_payload: str
256  ) -> None:
257  """Process an update for a valve that reports the position."""
258  state: str | None = None
259  position_set: bool = False
260  if state_payload == self._config_config[CONF_STATE_OPENING]:
261  state = ValveState.OPENING
262  elif state_payload == self._config_config[CONF_STATE_CLOSING]:
263  state = ValveState.CLOSING
264  elif state_payload == PAYLOAD_NONE:
265  self._attr_current_valve_position_attr_current_valve_position = None
266  return
267  if state is None or position_payload != state_payload:
268  try:
269  percentage_payload = ranged_value_to_percentage(
270  self._range_range, float(position_payload)
271  )
272  except ValueError:
273  _LOGGER.warning(
274  "Ignoring non numeric payload '%s' received on topic '%s'",
275  position_payload,
276  msg.topic,
277  )
278  else:
279  percentage_payload = min(max(percentage_payload, 0), 100)
280  self._attr_current_valve_position_attr_current_valve_position = percentage_payload
281  # Reset closing and opening if the valve is fully opened or fully closed
282  if state is None and percentage_payload in (0, 100):
283  state = RESET_CLOSING_OPENING
284  position_set = True
285  if state_payload and state is None and not position_set:
286  _LOGGER.warning(
287  "Payload received on topic '%s' is not one of "
288  "[opening, closing], got: %s",
289  msg.topic,
290  state_payload,
291  )
292  return
293  if state is None:
294  return
295  self._update_state_update_state(state)
296 
297  @callback
298  def _state_message_received(self, msg: ReceiveMessage) -> None:
299  """Handle new MQTT state messages."""
300  payload = self._value_template_value_template(msg.payload)
301  payload_dict: Any = None
302  position_payload: Any = payload
303  state_payload: Any = payload
304 
305  if not payload:
306  _LOGGER.debug("Ignoring empty state message from '%s'", msg.topic)
307  return
308 
309  with suppress(*JSON_DECODE_EXCEPTIONS):
310  payload_dict = json_loads(payload)
311  if isinstance(payload_dict, dict):
312  if self.reports_positionreports_position and "position" not in payload_dict:
313  _LOGGER.warning(
314  "Missing required `position` attribute in json payload "
315  "on topic '%s', got: %s",
316  msg.topic,
317  payload,
318  )
319  return
320  if not self.reports_positionreports_position and "state" not in payload_dict:
321  _LOGGER.warning(
322  "Missing required `state` attribute in json payload "
323  " on topic '%s', got: %s",
324  msg.topic,
325  payload,
326  )
327  return
328  position_payload = payload_dict.get("position")
329  state_payload = payload_dict.get("state")
330 
331  if self._config_config[CONF_REPORTS_POSITION]:
332  self._process_position_valve_update_process_position_valve_update(msg, position_payload, state_payload)
333  else:
334  self._process_binary_valve_update_process_binary_valve_update(msg, state_payload)
335 
336  @callback
337  def _prepare_subscribe_topics(self) -> None:
338  """(Re)Subscribe to topics."""
339  self.add_subscriptionadd_subscription(
340  CONF_STATE_TOPIC,
341  self._state_message_received_state_message_received,
342  {
343  "_attr_current_valve_position",
344  "_attr_is_closed",
345  "_attr_is_closing",
346  "_attr_is_opening",
347  },
348  )
349 
350  async def _subscribe_topics(self) -> None:
351  """(Re)Subscribe to topics."""
352  subscription.async_subscribe_topics_internal(self.hasshasshass, self._sub_state_sub_state)
353 
354  async def async_open_valve(self) -> None:
355  """Move the valve up.
356 
357  This method is a coroutine.
358  """
359  payload = self._command_template_command_template(
360  self._config_config.get(CONF_PAYLOAD_OPEN, DEFAULT_PAYLOAD_OPEN)
361  )
362  await self.async_publish_with_configasync_publish_with_config(self._config_config[CONF_COMMAND_TOPIC], payload)
363  if self._optimistic_optimistic:
364  # Optimistically assume that valve has changed state.
365  self._update_state_update_state(ValveState.OPEN)
366  self.async_write_ha_stateasync_write_ha_state()
367 
368  async def async_close_valve(self) -> None:
369  """Move the valve down.
370 
371  This method is a coroutine.
372  """
373  payload = self._command_template_command_template(
374  self._config_config.get(CONF_PAYLOAD_CLOSE, DEFAULT_PAYLOAD_CLOSE)
375  )
376  await self.async_publish_with_configasync_publish_with_config(self._config_config[CONF_COMMAND_TOPIC], payload)
377  if self._optimistic_optimistic:
378  # Optimistically assume that valve has changed state.
379  self._update_state_update_state(ValveState.CLOSED)
380  self.async_write_ha_stateasync_write_ha_state()
381 
382  async def async_stop_valve(self) -> None:
383  """Stop valve positioning.
384 
385  This method is a coroutine.
386  """
387  payload = self._command_template_command_template(self._config_config[CONF_PAYLOAD_STOP])
388  await self.async_publish_with_configasync_publish_with_config(self._config_config[CONF_COMMAND_TOPIC], payload)
389 
390  async def async_set_valve_position(self, position: int) -> None:
391  """Move the valve to a specific position."""
392  percentage_position = position
393  scaled_position = round(
394  percentage_to_ranged_value(self._range_range, percentage_position)
395  )
396  variables = {
397  "position": percentage_position,
398  "position_open": self._config_config[CONF_POSITION_OPEN],
399  "position_closed": self._config_config[CONF_POSITION_CLOSED],
400  }
401  rendered_position = self._command_template_command_template(scaled_position, variables=variables)
402  await self.async_publish_with_configasync_publish_with_config(
403  self._config_config[CONF_COMMAND_TOPIC], rendered_position
404  )
405  if self._optimistic_optimistic:
406  self._update_state_update_state(
407  ValveState.CLOSED
408  if percentage_position == self._config_config[CONF_POSITION_CLOSED]
409  else ValveState.OPEN
410  )
411  self._attr_current_valve_position_attr_current_valve_position = percentage_position
412  self.async_write_ha_stateasync_write_ha_state()
None async_publish_with_config(self, str topic, PublishPayloadType payload)
Definition: entity.py:1377
bool add_subscription(self, str state_topic_config_key, Callable[[ReceiveMessage], None] msg_callback, set[str]|None tracked_attributes, bool disable_encoding=False)
Definition: entity.py:1484
None _process_position_valve_update(self, ReceiveMessage msg, str position_payload, str state_payload)
Definition: valve.py:256
None _state_message_received(self, ReceiveMessage msg)
Definition: valve.py:298
None _process_binary_valve_update(self, ReceiveMessage msg, str state_payload)
Definition: valve.py:230
None _update_state(self, str|None state)
Definition: valve.py:216
None _setup_from_config(self, ConfigType config)
Definition: valve.py:169
None async_set_valve_position(self, int position)
Definition: valve.py:390
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_setup_entity_entry_helper(HomeAssistant hass, ConfigEntry entry, type[MqttEntity]|None entity_class, str domain, AddEntitiesCallback async_add_entities, VolSchemaType discovery_schema, VolSchemaType platform_schema_modern, dict[str, type[MqttEntity]]|None schema_class_mapping=None)
Definition: entity.py:245
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
Definition: valve.py:140
ConfigType _validate_and_add_defaults(ConfigType config)
Definition: valve.py:95
float percentage_to_ranged_value(tuple[float, float] low_high_range, float percentage)
Definition: percentage.py:81
int ranged_value_to_percentage(tuple[float, float] low_high_range, float value)
Definition: percentage.py:64