1 """Support for the Xiaomi vacuum cleaner robot."""
3 from __future__
import annotations
5 from functools
import partial
9 from miio
import DeviceException
10 import voluptuous
as vol
30 from .
import VacuumCoordinatorData
36 SERVICE_CLEAN_SEGMENT,
39 SERVICE_MOVE_REMOTE_CONTROL,
40 SERVICE_MOVE_REMOTE_CONTROL_STEP,
41 SERVICE_START_REMOTE_CONTROL,
42 SERVICE_STOP_REMOTE_CONTROL,
44 from .entity
import XiaomiCoordinatedMiioEntity
46 _LOGGER = logging.getLogger(__name__)
49 ATTR_RC_DURATION =
"duration"
50 ATTR_RC_ROTATION =
"rotation"
51 ATTR_RC_VELOCITY =
"velocity"
52 ATTR_STATUS =
"status"
53 ATTR_ZONE_ARRAY =
"zone"
54 ATTR_ZONE_REPEATER =
"repeats"
55 ATTR_TIMERS =
"timers"
57 STATE_CODE_TO_STATE = {
86 config_entry: ConfigEntry,
87 async_add_entities: AddEntitiesCallback,
89 """Set up the Xiaomi vacuum cleaner robot from a config entry."""
92 if config_entry.data[CONF_FLOW_TYPE] == CONF_DEVICE:
93 unique_id = config_entry.unique_id
96 hass.data[DOMAIN][config_entry.entry_id][KEY_DEVICE],
99 hass.data[DOMAIN][config_entry.entry_id][KEY_COORDINATOR],
101 entities.append(mirobo)
103 platform = entity_platform.async_get_current_platform()
105 platform.async_register_entity_service(
106 SERVICE_START_REMOTE_CONTROL,
108 MiroboVacuum.async_remote_control_start.__name__,
111 platform.async_register_entity_service(
112 SERVICE_STOP_REMOTE_CONTROL,
114 MiroboVacuum.async_remote_control_stop.__name__,
117 platform.async_register_entity_service(
118 SERVICE_MOVE_REMOTE_CONTROL,
120 vol.Optional(ATTR_RC_VELOCITY): vol.All(
121 vol.Coerce(float), vol.Clamp(min=-0.29, max=0.29)
123 vol.Optional(ATTR_RC_ROTATION): vol.All(
124 vol.Coerce(int), vol.Clamp(min=-179, max=179)
126 vol.Optional(ATTR_RC_DURATION): cv.positive_int,
128 MiroboVacuum.async_remote_control_move.__name__,
131 platform.async_register_entity_service(
132 SERVICE_MOVE_REMOTE_CONTROL_STEP,
134 vol.Optional(ATTR_RC_VELOCITY): vol.All(
135 vol.Coerce(float), vol.Clamp(min=-0.29, max=0.29)
137 vol.Optional(ATTR_RC_ROTATION): vol.All(
138 vol.Coerce(int), vol.Clamp(min=-179, max=179)
140 vol.Optional(ATTR_RC_DURATION): cv.positive_int,
142 MiroboVacuum.async_remote_control_move_step.__name__,
145 platform.async_register_entity_service(
148 vol.Required(ATTR_ZONE_ARRAY): vol.All(
161 vol.Required(ATTR_ZONE_REPEATER): vol.All(
162 vol.Coerce(int), vol.Clamp(min=1, max=3)
165 MiroboVacuum.async_clean_zone.__name__,
168 platform.async_register_entity_service(
171 vol.Required(
"x_coord"): vol.Coerce(int),
172 vol.Required(
"y_coord"): vol.Coerce(int),
174 MiroboVacuum.async_goto.__name__,
176 platform.async_register_entity_service(
177 SERVICE_CLEAN_SEGMENT,
178 {vol.Required(
"segments"): vol.Any(vol.Coerce(int), [vol.Coerce(int)])},
179 MiroboVacuum.async_clean_segment.__name__,
186 XiaomiCoordinatedMiioEntity[DataUpdateCoordinator[VacuumCoordinatorData]],
189 """Representation of a Xiaomi Vacuum cleaner robot."""
192 _attr_supported_features = (
193 VacuumEntityFeature.STATE
194 | VacuumEntityFeature.PAUSE
195 | VacuumEntityFeature.STOP
196 | VacuumEntityFeature.RETURN_HOME
197 | VacuumEntityFeature.FAN_SPEED
198 | VacuumEntityFeature.SEND_COMMAND
199 | VacuumEntityFeature.LOCATE
200 | VacuumEntityFeature.BATTERY
201 | VacuumEntityFeature.CLEAN_SPOT
202 | VacuumEntityFeature.START
210 coordinator: DataUpdateCoordinator[VacuumCoordinatorData],
212 """Initialize the Xiaomi vacuum cleaner robot handler."""
213 super().
__init__(device, entry, unique_id, coordinator)
214 self.
_state_state: str |
None =
None
217 """Run when entity is about to be added to hass."""
223 """Return the status of the vacuum cleaner."""
226 if self.coordinator.data.status.got_error:
233 """Return the battery level of the vacuum cleaner."""
234 return self.coordinator.data.status.battery
238 """Return the fan speed of the vacuum cleaner."""
239 speed = self.coordinator.data.status.fanspeed
240 if speed
in self.coordinator.data.fan_speeds_reverse:
241 return self.coordinator.data.fan_speeds_reverse[speed]
243 _LOGGER.debug(
"Unable to find reverse for %s", speed)
249 """Get the list of available fan speed steps of the vacuum cleaner."""
250 if speed_list := self.coordinator.data.fan_speeds:
251 return list(speed_list)
255 def timers(self) -> list[dict[str, Any]]:
256 """Get the list of added timers of the vacuum cleaner."""
259 "enabled": timer.enabled,
261 "next_schedule":
as_utc(timer.next_schedule),
263 for timer
in self.coordinator.data.timers
268 """Return the specific state attributes of this vacuum cleaner."""
269 attrs: dict[str, Any] = {}
270 attrs[ATTR_STATUS] =
str(self.coordinator.data.status.state)
272 if self.coordinator.data.status.got_error:
273 attrs[ATTR_ERROR] = self.coordinator.data.status.error
276 attrs[ATTR_TIMERS] = self.
timerstimers
280 """Call a vacuum command handling error messages."""
282 await self.
hasshasshass.async_add_executor_job(partial(func, *args, **kwargs))
284 except DeviceException
as exc:
285 _LOGGER.error(mask_error, exc)
290 """Start or resume the cleaning task."""
292 "Unable to start the vacuum: %s", self._device.resume_or_start
296 """Pause the cleaning task."""
297 await self.
_try_command_try_command(
"Unable to set start/pause: %s", self._device.pause)
300 """Stop the vacuum cleaner."""
301 await self.
_try_command_try_command(
"Unable to stop: %s", self._device.stop)
305 if fan_speed
in self.coordinator.data.fan_speeds:
306 fan_speed_int = self.coordinator.data.fan_speeds[fan_speed]
309 fan_speed_int =
int(fan_speed)
310 except ValueError
as exc:
312 "Fan speed step not recognized (%s). Valid speeds are: %s",
318 "Unable to set fan speed: %s", self._device.set_fan_speed, fan_speed_int
322 """Set the vacuum cleaner to return to the dock."""
323 await self.
_try_command_try_command(
"Unable to return home: %s", self._device.home)
326 """Perform a spot clean-up."""
328 "Unable to start the vacuum for a spot clean-up: %s", self._device.spot
332 """Locate the vacuum cleaner."""
333 await self.
_try_command_try_command(
"Unable to locate the botvac: %s", self._device.find)
338 params: dict[str, Any] | list[Any] |
None =
None,
341 """Send raw command."""
343 "Unable to send command to the vacuum: %s",
344 self._device.raw_command,
350 """Start remote control mode."""
352 "Unable to start remote control the vacuum: %s", self._device.manual_start
356 """Stop remote control mode."""
358 "Unable to stop remote control the vacuum: %s", self._device.manual_stop
362 self, rotation: int = 0, velocity: float = 0.3, duration: int = 1500
364 """Move vacuum with remote control mode."""
366 "Unable to move with remote control the vacuum: %s",
367 self._device.manual_control,
374 self, rotation: int = 0, velocity: float = 0.2, duration: int = 1500
376 """Move vacuum one step with remote control mode."""
378 "Unable to remote control the vacuum: %s",
379 self._device.manual_control_once,
385 async
def async_goto(self, x_coord: int, y_coord: int) ->
None:
386 """Goto the specified coordinates."""
388 "Unable to send the vacuum cleaner to the specified coordinates: %s",
395 """Clean the specified segments(s)."""
396 if isinstance(segments, int):
397 segments = [segments]
400 "Unable to start cleaning of the specified segments: %s",
401 self._device.segment_clean,
406 """Clean selected area for the number of repeats indicated."""
408 _zone.append(repeats)
409 _LOGGER.debug(
"Zone with repeats: %s", zone)
411 await self.
hasshasshass.async_add_executor_job(self._device.zoned_clean, zone)
413 except (OSError, DeviceException)
as exc:
414 _LOGGER.error(
"Unable to send zoned_clean command to the vacuum: %s", exc)
418 state_code =
int(self.coordinator.data.status.state_code)
419 if state_code
not in STATE_CODE_TO_STATE:
421 "STATE not supported: %s, state_code: %s",
422 self.coordinator.data.status.state,
423 self.coordinator.data.status.state_code,
427 self.
_state_state = STATE_CODE_TO_STATE[state_code]
list[str] fan_speed_list(self)
None async_send_command(self, str command, dict[str, Any]|list[Any]|None params=None, **Any kwargs)
None async_remote_control_move(self, int rotation=0, float velocity=0.3, int duration=1500)
list[dict[str, Any]] timers(self)
None async_remote_control_start(self)
None async_added_to_hass(self)
None async_remote_control_move_step(self, int rotation=0, float velocity=0.2, int duration=1500)
None async_goto(self, int x_coord, int y_coord)
None async_remote_control_stop(self)
dict[str, Any] extra_state_attributes(self)
None async_set_fan_speed(self, str fan_speed, **Any kwargs)
None async_return_to_base(self, **Any kwargs)
None async_stop(self, **Any kwargs)
None async_clean_segment(self, segments)
None async_clean_spot(self, **Any kwargs)
None async_locate(self, **Any kwargs)
def _try_command(self, mask_error, func, *args, **kwargs)
None _handle_coordinator_update(self)
list[str] fan_speed_list(self)
None __init__(self, device, entry, unique_id, DataUpdateCoordinator[VacuumCoordinatorData] coordinator)
None async_clean_zone(self, list[Any] zone, int repeats=1)
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
dt.datetime as_utc(dt.datetime dattim)