Home Assistant Unofficial Reference 2024.12.1
vacuum.py
Go to the documentation of this file.
1 """Support for the Xiaomi vacuum cleaner robot."""
2 
3 from __future__ import annotations
4 
5 from functools import partial
6 import logging
7 from typing import Any
8 
9 from miio import DeviceException
10 import voluptuous as vol
11 
13  STATE_CLEANING,
14  STATE_DOCKED,
15  STATE_ERROR,
16  STATE_IDLE,
17  STATE_PAUSED,
18  STATE_RETURNING,
19  StateVacuumEntity,
20  VacuumEntityFeature,
21 )
22 from homeassistant.config_entries import ConfigEntry
23 from homeassistant.const import CONF_DEVICE
24 from homeassistant.core import HomeAssistant, callback
25 from homeassistant.helpers import config_validation as cv, entity_platform
26 from homeassistant.helpers.entity_platform import AddEntitiesCallback
27 from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
28 from homeassistant.util.dt import as_utc
29 
30 from . import VacuumCoordinatorData
31 from .const import (
32  CONF_FLOW_TYPE,
33  DOMAIN,
34  KEY_COORDINATOR,
35  KEY_DEVICE,
36  SERVICE_CLEAN_SEGMENT,
37  SERVICE_CLEAN_ZONE,
38  SERVICE_GOTO,
39  SERVICE_MOVE_REMOTE_CONTROL,
40  SERVICE_MOVE_REMOTE_CONTROL_STEP,
41  SERVICE_START_REMOTE_CONTROL,
42  SERVICE_STOP_REMOTE_CONTROL,
43 )
44 from .entity import XiaomiCoordinatedMiioEntity
45 
46 _LOGGER = logging.getLogger(__name__)
47 
48 ATTR_ERROR = "error"
49 ATTR_RC_DURATION = "duration"
50 ATTR_RC_ROTATION = "rotation"
51 ATTR_RC_VELOCITY = "velocity"
52 ATTR_STATUS = "status"
53 ATTR_ZONE_ARRAY = "zone"
54 ATTR_ZONE_REPEATER = "repeats"
55 ATTR_TIMERS = "timers"
56 
57 STATE_CODE_TO_STATE = {
58  1: STATE_IDLE, # "Starting"
59  2: STATE_IDLE, # "Charger disconnected"
60  3: STATE_IDLE, # "Idle"
61  4: STATE_CLEANING, # "Remote control active"
62  5: STATE_CLEANING, # "Cleaning"
63  6: STATE_RETURNING, # "Returning home"
64  7: STATE_CLEANING, # "Manual mode"
65  8: STATE_DOCKED, # "Charging"
66  9: STATE_ERROR, # "Charging problem"
67  10: STATE_PAUSED, # "Paused"
68  11: STATE_CLEANING, # "Spot cleaning"
69  12: STATE_ERROR, # "Error"
70  13: STATE_IDLE, # "Shutting down"
71  14: STATE_DOCKED, # "Updating"
72  15: STATE_RETURNING, # "Docking"
73  16: STATE_CLEANING, # "Going to target"
74  17: STATE_CLEANING, # "Zoned cleaning"
75  18: STATE_CLEANING, # "Segment cleaning"
76  22: STATE_DOCKED, # "Emptying the bin" on s7+
77  23: STATE_DOCKED, # "Washing the mop" on s7maxV
78  26: STATE_RETURNING, # "Going to wash the mop" on s7maxV
79  100: STATE_DOCKED, # "Charging complete"
80  101: STATE_ERROR, # "Device offline"
81 }
82 
83 
85  hass: HomeAssistant,
86  config_entry: ConfigEntry,
87  async_add_entities: AddEntitiesCallback,
88 ) -> None:
89  """Set up the Xiaomi vacuum cleaner robot from a config entry."""
90  entities = []
91 
92  if config_entry.data[CONF_FLOW_TYPE] == CONF_DEVICE:
93  unique_id = config_entry.unique_id
94 
95  mirobo = MiroboVacuum(
96  hass.data[DOMAIN][config_entry.entry_id][KEY_DEVICE],
97  config_entry,
98  unique_id,
99  hass.data[DOMAIN][config_entry.entry_id][KEY_COORDINATOR],
100  )
101  entities.append(mirobo)
102 
103  platform = entity_platform.async_get_current_platform()
104 
105  platform.async_register_entity_service(
106  SERVICE_START_REMOTE_CONTROL,
107  None,
108  MiroboVacuum.async_remote_control_start.__name__,
109  )
110 
111  platform.async_register_entity_service(
112  SERVICE_STOP_REMOTE_CONTROL,
113  None,
114  MiroboVacuum.async_remote_control_stop.__name__,
115  )
116 
117  platform.async_register_entity_service(
118  SERVICE_MOVE_REMOTE_CONTROL,
119  {
120  vol.Optional(ATTR_RC_VELOCITY): vol.All(
121  vol.Coerce(float), vol.Clamp(min=-0.29, max=0.29)
122  ),
123  vol.Optional(ATTR_RC_ROTATION): vol.All(
124  vol.Coerce(int), vol.Clamp(min=-179, max=179)
125  ),
126  vol.Optional(ATTR_RC_DURATION): cv.positive_int,
127  },
128  MiroboVacuum.async_remote_control_move.__name__,
129  )
130 
131  platform.async_register_entity_service(
132  SERVICE_MOVE_REMOTE_CONTROL_STEP,
133  {
134  vol.Optional(ATTR_RC_VELOCITY): vol.All(
135  vol.Coerce(float), vol.Clamp(min=-0.29, max=0.29)
136  ),
137  vol.Optional(ATTR_RC_ROTATION): vol.All(
138  vol.Coerce(int), vol.Clamp(min=-179, max=179)
139  ),
140  vol.Optional(ATTR_RC_DURATION): cv.positive_int,
141  },
142  MiroboVacuum.async_remote_control_move_step.__name__,
143  )
144 
145  platform.async_register_entity_service(
146  SERVICE_CLEAN_ZONE,
147  {
148  vol.Required(ATTR_ZONE_ARRAY): vol.All(
149  list,
150  [
151  vol.ExactSequence(
152  [
153  vol.Coerce(int),
154  vol.Coerce(int),
155  vol.Coerce(int),
156  vol.Coerce(int),
157  ]
158  )
159  ],
160  ),
161  vol.Required(ATTR_ZONE_REPEATER): vol.All(
162  vol.Coerce(int), vol.Clamp(min=1, max=3)
163  ),
164  },
165  MiroboVacuum.async_clean_zone.__name__,
166  )
167 
168  platform.async_register_entity_service(
169  SERVICE_GOTO,
170  {
171  vol.Required("x_coord"): vol.Coerce(int),
172  vol.Required("y_coord"): vol.Coerce(int),
173  },
174  MiroboVacuum.async_goto.__name__,
175  )
176  platform.async_register_entity_service(
177  SERVICE_CLEAN_SEGMENT,
178  {vol.Required("segments"): vol.Any(vol.Coerce(int), [vol.Coerce(int)])},
179  MiroboVacuum.async_clean_segment.__name__,
180  )
181 
182  async_add_entities(entities, update_before_add=True)
183 
184 
186  XiaomiCoordinatedMiioEntity[DataUpdateCoordinator[VacuumCoordinatorData]],
187  StateVacuumEntity,
188 ):
189  """Representation of a Xiaomi Vacuum cleaner robot."""
190 
191  _attr_name = None
192  _attr_supported_features = (
193  VacuumEntityFeature.STATE
194  | VacuumEntityFeature.PAUSE
195  | VacuumEntityFeature.STOP
196  | VacuumEntityFeature.RETURN_HOME
197  | VacuumEntityFeature.FAN_SPEED
198  | VacuumEntityFeature.SEND_COMMAND
199  | VacuumEntityFeature.LOCATE
200  | VacuumEntityFeature.BATTERY
201  | VacuumEntityFeature.CLEAN_SPOT
202  | VacuumEntityFeature.START
203  )
204 
205  def __init__(
206  self,
207  device,
208  entry,
209  unique_id,
210  coordinator: DataUpdateCoordinator[VacuumCoordinatorData],
211  ) -> None:
212  """Initialize the Xiaomi vacuum cleaner robot handler."""
213  super().__init__(device, entry, unique_id, coordinator)
214  self._state_state: str | None = None
215 
216  async def async_added_to_hass(self) -> None:
217  """Run when entity is about to be added to hass."""
218  await super().async_added_to_hass()
219  self._handle_coordinator_update_handle_coordinator_update()
220 
221  @property
222  def state(self) -> str | None:
223  """Return the status of the vacuum cleaner."""
224  # The vacuum reverts back to an idle state after erroring out.
225  # We want to keep returning an error until it has been cleared.
226  if self.coordinator.data.status.got_error:
227  return STATE_ERROR
228 
229  return self._state_state
230 
231  @property
232  def battery_level(self) -> int:
233  """Return the battery level of the vacuum cleaner."""
234  return self.coordinator.data.status.battery
235 
236  @property
237  def fan_speed(self) -> str:
238  """Return the fan speed of the vacuum cleaner."""
239  speed = self.coordinator.data.status.fanspeed
240  if speed in self.coordinator.data.fan_speeds_reverse:
241  return self.coordinator.data.fan_speeds_reverse[speed]
242 
243  _LOGGER.debug("Unable to find reverse for %s", speed)
244 
245  return str(speed)
246 
247  @property
248  def fan_speed_list(self) -> list[str]:
249  """Get the list of available fan speed steps of the vacuum cleaner."""
250  if speed_list := self.coordinator.data.fan_speeds:
251  return list(speed_list)
252  return []
253 
254  @property
255  def timers(self) -> list[dict[str, Any]]:
256  """Get the list of added timers of the vacuum cleaner."""
257  return [
258  {
259  "enabled": timer.enabled,
260  "cron": timer.cron,
261  "next_schedule": as_utc(timer.next_schedule),
262  }
263  for timer in self.coordinator.data.timers
264  ]
265 
266  @property
267  def extra_state_attributes(self) -> dict[str, Any]:
268  """Return the specific state attributes of this vacuum cleaner."""
269  attrs: dict[str, Any] = {}
270  attrs[ATTR_STATUS] = str(self.coordinator.data.status.state)
271 
272  if self.coordinator.data.status.got_error:
273  attrs[ATTR_ERROR] = self.coordinator.data.status.error
274 
275  if self.timerstimers:
276  attrs[ATTR_TIMERS] = self.timerstimers
277  return attrs
278 
279  async def _try_command(self, mask_error, func, *args, **kwargs):
280  """Call a vacuum command handling error messages."""
281  try:
282  await self.hasshasshass.async_add_executor_job(partial(func, *args, **kwargs))
283  await self.coordinator.async_refresh()
284  except DeviceException as exc:
285  _LOGGER.error(mask_error, exc)
286  return False
287  return True
288 
289  async def async_start(self) -> None:
290  """Start or resume the cleaning task."""
291  await self._try_command_try_command(
292  "Unable to start the vacuum: %s", self._device.resume_or_start
293  )
294 
295  async def async_pause(self) -> None:
296  """Pause the cleaning task."""
297  await self._try_command_try_command("Unable to set start/pause: %s", self._device.pause)
298 
299  async def async_stop(self, **kwargs: Any) -> None:
300  """Stop the vacuum cleaner."""
301  await self._try_command_try_command("Unable to stop: %s", self._device.stop)
302 
303  async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None:
304  """Set fan speed."""
305  if fan_speed in self.coordinator.data.fan_speeds:
306  fan_speed_int = self.coordinator.data.fan_speeds[fan_speed]
307  else:
308  try:
309  fan_speed_int = int(fan_speed)
310  except ValueError as exc:
311  _LOGGER.error(
312  "Fan speed step not recognized (%s). Valid speeds are: %s",
313  exc,
314  self.fan_speed_listfan_speed_listfan_speed_list,
315  )
316  return
317  await self._try_command_try_command(
318  "Unable to set fan speed: %s", self._device.set_fan_speed, fan_speed_int
319  )
320 
321  async def async_return_to_base(self, **kwargs: Any) -> None:
322  """Set the vacuum cleaner to return to the dock."""
323  await self._try_command_try_command("Unable to return home: %s", self._device.home)
324 
325  async def async_clean_spot(self, **kwargs: Any) -> None:
326  """Perform a spot clean-up."""
327  await self._try_command_try_command(
328  "Unable to start the vacuum for a spot clean-up: %s", self._device.spot
329  )
330 
331  async def async_locate(self, **kwargs: Any) -> None:
332  """Locate the vacuum cleaner."""
333  await self._try_command_try_command("Unable to locate the botvac: %s", self._device.find)
334 
336  self,
337  command: str,
338  params: dict[str, Any] | list[Any] | None = None,
339  **kwargs: Any,
340  ) -> None:
341  """Send raw command."""
342  await self._try_command_try_command(
343  "Unable to send command to the vacuum: %s",
344  self._device.raw_command,
345  command,
346  params,
347  )
348 
349  async def async_remote_control_start(self) -> None:
350  """Start remote control mode."""
351  await self._try_command_try_command(
352  "Unable to start remote control the vacuum: %s", self._device.manual_start
353  )
354 
355  async def async_remote_control_stop(self) -> None:
356  """Stop remote control mode."""
357  await self._try_command_try_command(
358  "Unable to stop remote control the vacuum: %s", self._device.manual_stop
359  )
360 
362  self, rotation: int = 0, velocity: float = 0.3, duration: int = 1500
363  ) -> None:
364  """Move vacuum with remote control mode."""
365  await self._try_command_try_command(
366  "Unable to move with remote control the vacuum: %s",
367  self._device.manual_control,
368  velocity=velocity,
369  rotation=rotation,
370  duration=duration,
371  )
372 
374  self, rotation: int = 0, velocity: float = 0.2, duration: int = 1500
375  ) -> None:
376  """Move vacuum one step with remote control mode."""
377  await self._try_command_try_command(
378  "Unable to remote control the vacuum: %s",
379  self._device.manual_control_once,
380  velocity=velocity,
381  rotation=rotation,
382  duration=duration,
383  )
384 
385  async def async_goto(self, x_coord: int, y_coord: int) -> None:
386  """Goto the specified coordinates."""
387  await self._try_command_try_command(
388  "Unable to send the vacuum cleaner to the specified coordinates: %s",
389  self._device.goto,
390  x_coord=x_coord,
391  y_coord=y_coord,
392  )
393 
394  async def async_clean_segment(self, segments) -> None:
395  """Clean the specified segments(s)."""
396  if isinstance(segments, int):
397  segments = [segments]
398 
399  await self._try_command_try_command(
400  "Unable to start cleaning of the specified segments: %s",
401  self._device.segment_clean,
402  segments=segments,
403  )
404 
405  async def async_clean_zone(self, zone: list[Any], repeats: int = 1) -> None:
406  """Clean selected area for the number of repeats indicated."""
407  for _zone in zone:
408  _zone.append(repeats)
409  _LOGGER.debug("Zone with repeats: %s", zone)
410  try:
411  await self.hasshasshass.async_add_executor_job(self._device.zoned_clean, zone)
412  await self.coordinator.async_refresh()
413  except (OSError, DeviceException) as exc:
414  _LOGGER.error("Unable to send zoned_clean command to the vacuum: %s", exc)
415 
416  @callback
417  def _handle_coordinator_update(self) -> None:
418  state_code = int(self.coordinator.data.status.state_code)
419  if state_code not in STATE_CODE_TO_STATE:
420  _LOGGER.error(
421  "STATE not supported: %s, state_code: %s",
422  self.coordinator.data.status.state,
423  self.coordinator.data.status.state_code,
424  )
425  self._state_state = None
426  else:
427  self._state_state = STATE_CODE_TO_STATE[state_code]
428 
None async_send_command(self, str command, dict[str, Any]|list[Any]|None params=None, **Any kwargs)
Definition: vacuum.py:340
None async_remote_control_move(self, int rotation=0, float velocity=0.3, int duration=1500)
Definition: vacuum.py:363
None async_remote_control_move_step(self, int rotation=0, float velocity=0.2, int duration=1500)
Definition: vacuum.py:375
None async_goto(self, int x_coord, int y_coord)
Definition: vacuum.py:385
None async_set_fan_speed(self, str fan_speed, **Any kwargs)
Definition: vacuum.py:303
def _try_command(self, mask_error, func, *args, **kwargs)
Definition: vacuum.py:279
None __init__(self, device, entry, unique_id, DataUpdateCoordinator[VacuumCoordinatorData] coordinator)
Definition: vacuum.py:211
None async_clean_zone(self, list[Any] zone, int repeats=1)
Definition: vacuum.py:405
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
Definition: vacuum.py:88
dt.datetime as_utc(dt.datetime dattim)
Definition: dt.py:132