Home Assistant Unofficial Reference 2024.12.1
entity.py
Go to the documentation of this file.
1 """Base class for iRobot devices."""
2 
from __future__ import annotations

import asyncio
import logging

from homeassistant.components.vacuum import (
    ATTR_STATUS,
    STATE_CLEANING,
    STATE_DOCKED,
    STATE_ERROR,
    STATE_RETURNING,
    StateVacuumEntity,
    VacuumEntityFeature,
)
from homeassistant.const import ATTR_CONNECTIONS, STATE_IDLE, STATE_PAUSED
from homeassistant.helpers import device_registry as dr
from homeassistant.helpers.device_registry import DeviceInfo
from homeassistant.helpers.entity import Entity
import homeassistant.util.dt as dt_util
from homeassistant.util.unit_system import METRIC_SYSTEM

from . import roomba_reported_state
from .const import DOMAIN
26 
_LOGGER = logging.getLogger(__name__)

# Keys of the extra state attributes reported by the vacuum entity.
ATTR_CLEANING_TIME = "cleaning_time"
ATTR_CLEANED_AREA = "cleaned_area"
ATTR_ERROR = "error"
ATTR_ERROR_CODE = "error_code"
ATTR_POSITION = "position"
ATTR_SOFTWARE_VERSION = "software_version"

# Commonly supported features
SUPPORT_IROBOT = (
    VacuumEntityFeature.BATTERY
    | VacuumEntityFeature.PAUSE
    | VacuumEntityFeature.RETURN_HOME
    | VacuumEntityFeature.SEND_COMMAND
    | VacuumEntityFeature.START
    | VacuumEntityFeature.STATE
    | VacuumEntityFeature.STOP
    | VacuumEntityFeature.LOCATE
)

# Translates the "phase" value of the robot's "cleanMissionStatus" report
# into a vacuum state. A phase that is not listed here is reported as
# STATE_ERROR by the `_robot_state` property.
STATE_MAP = {
    "": STATE_IDLE,
    "charge": STATE_DOCKED,
    "evac": STATE_RETURNING,  # Emptying at cleanbase
    "hmMidMsn": STATE_CLEANING,  # Recharging at the middle of a cycle
    "hmPostMsn": STATE_RETURNING,  # Cycle finished
    "hmUsrDock": STATE_RETURNING,
    "pause": STATE_PAUSED,
    "run": STATE_CLEANING,
    "stop": STATE_IDLE,
    "stuck": STATE_ERROR,
}
60 
61 
class IRobotEntity(Entity):
    """Base class for iRobot Entities.

    Holds the robot client object and its reported state, and exposes the
    values shared by all iRobot entities: unique id, battery level, the
    statistics sub-dictionaries and the mapped vacuum state. The entity is
    not polled; it registers a message callback on the client and schedules
    a state update whenever a relevant message arrives.
    """

    _attr_should_poll = False
    _attr_has_entity_name = True

    def __init__(self, roomba, blid):
        """Initialize the iRobot handler.

        Args:
            roomba: Robot client object. It is handed to
                ``roomba_reported_state`` to obtain the reported state and
                is later used to register the message callback.
            blid: Robot identifier; used to build ``robot_unique_id``.
        """
        self.vacuum = roomba
        self._blid = blid
        self.vacuum_state = roomba_reported_state(roomba)

        self._attr_device_info = DeviceInfo(
            identifiers={(DOMAIN, self.robot_unique_id)},
            serial_number=self.vacuum_state.get("hwPartsRev", {}).get("navSerialNo"),
            manufacturer="iRobot",
            model=self.vacuum_state.get("sku"),
            name=str(self.vacuum_state.get("name")),
            sw_version=self.vacuum_state.get("softwareVer"),
            hw_version=self.vacuum_state.get("hardwareRev"),
        )

        # Prefer the address under "hwPartsRev"/"wlan0HwAddr" and fall back
        # to the top-level "mac" key; only add a connection when one exists.
        if mac_address := self.vacuum_state.get("hwPartsRev", {}).get(
            "wlan0HwAddr", self.vacuum_state.get("mac")
        ):
            self._attr_device_info[ATTR_CONNECTIONS] = {
                (dr.CONNECTION_NETWORK_MAC, mac_address)
            }

    @property
    def robot_unique_id(self):
        """Return the uniqueid of the vacuum cleaner."""
        return f"roomba_{self._blid}"

    @property
    def unique_id(self):
        """Return the uniqueid of the vacuum cleaner."""
        return self.robot_unique_id

    @property
    def battery_level(self):
        """Return the battery level of the vacuum cleaner (``batPct``)."""
        return self.vacuum_state.get("batPct")

    @property
    def run_stats(self):
        """Return the run stats (``bbrun``), or an empty dict if absent."""
        return self.vacuum_state.get("bbrun", {})

    @property
    def mission_stats(self):
        """Return the mission stats (``bbmssn``), or an empty dict if absent."""
        return self.vacuum_state.get("bbmssn", {})

    @property
    def battery_stats(self):
        """Return the battery stats (``bbchg3``), or an empty dict if absent."""
        return self.vacuum_state.get("bbchg3", {})

    @property
    def last_mission(self):
        """Return last mission start time.

        Returns:
            The ``mssnStrtTm`` timestamp converted with
            ``dt_util.utc_from_timestamp``, or ``None`` when the value is
            missing or zero.
        """
        if (
            ts := self.vacuum_state.get("cleanMissionStatus", {}).get("mssnStrtTm")
        ) is None or ts == 0:
            return None
        return dt_util.utc_from_timestamp(ts)

    @property
    def _robot_state(self):
        """Return the state of the vacuum cleaner.

        The reported ``phase`` is translated through ``STATE_MAP``; an
        unknown phase yields ``STATE_ERROR``. An idle or docked robot whose
        ``cycle`` is anything other than ``"none"`` is reported as paused.
        """
        clean_mission_status = self.vacuum_state.get("cleanMissionStatus", {})
        cycle = clean_mission_status.get("cycle")
        phase = clean_mission_status.get("phase")
        try:
            state = STATE_MAP[phase]
        except KeyError:
            return STATE_ERROR
        if cycle != "none" and state in (STATE_IDLE, STATE_DOCKED):
            state = STATE_PAUSED
        return state

    async def async_added_to_hass(self):
        """Register callback function."""
        self.vacuum.register_on_message_callback(self.on_message)

    def new_state_filter(self, new_state):
        """Filter out wifi state messages.

        Returns:
            ``False`` only when ``new_state`` has at most one entry and
            contains the ``"signal"`` key; ``True`` otherwise.
        """
        return len(new_state) > 1 or "signal" not in new_state

    def on_message(self, json_data):
        """Update state on message change.

        Args:
            json_data: Message dict; the payload is read from
                ``json_data["state"]["reported"]`` when present.
        """
        state = json_data.get("state", {}).get("reported", {})
        if self.new_state_filter(state):
            self.schedule_update_ha_state()
158 
class IRobotVacuum(IRobotEntity, StateVacuumEntity):  # pylint: disable=hass-enforce-class-module
    """Base class for iRobot robots.

    Adds the vacuum state, the extra state attributes and the command
    methods on top of ``IRobotEntity``. Every command is forwarded to the
    client's ``send_command`` through ``hass.async_add_executor_job``.
    """

    _attr_name = None
    _attr_supported_features = SUPPORT_IROBOT
    _attr_available = True  # Always available, otherwise setup will fail

    def __init__(self, roomba, blid):
        """Initialize the iRobot handler.

        Args:
            roomba: Robot client object, forwarded to the base class.
            blid: Robot identifier, forwarded to the base class.
        """
        super().__init__(roomba, blid)
        # True only when the reported capabilities contain "pose" == 1.
        self._cap_position = self.vacuum_state.get("cap", {}).get("pose") == 1

    @property
    def state(self):
        """Return the state of the vacuum cleaner."""
        return self._robot_state

    @property
    def extra_state_attributes(self):
        """Return the state attributes of the device.

        Always contains the software version and the legacy status. Cleaning
        time and cleaned area are added only while cleaning, error details
        only when the client's error code is non-zero, and the position only
        when the robot reports the pose capability.
        """
        state = self.vacuum_state

        # Roomba software version
        software_version = state.get("softwareVer")

        # Set properties that are to appear in the GUI
        state_attrs = {ATTR_SOFTWARE_VERSION: software_version}

        # Set legacy status to avoid break changes
        state_attrs[ATTR_STATUS] = self.vacuum.current_state

        # Only add cleaning time and cleaned area attrs when the vacuum is
        # currently on
        if self.state == STATE_CLEANING:
            # Get clean mission status
            (
                state_attrs[ATTR_CLEANING_TIME],
                state_attrs[ATTR_CLEANED_AREA],
            ) = self.get_cleaning_status(state)

        # Error
        if self.vacuum.error_code != 0:
            state_attrs[ATTR_ERROR] = self.vacuum.error_message
            state_attrs[ATTR_ERROR_CODE] = self.vacuum.error_code

        # Not all Roombas expose position data
        # https://github.com/koalazak/dorita980/issues/48
        if self._cap_position:
            pos_state = state.get("pose", {})
            position = None
            pos_x = pos_state.get("point", {}).get("x")
            pos_y = pos_state.get("point", {}).get("y")
            theta = pos_state.get("theta")
            # The attribute stays None unless all three components exist.
            if all(item is not None for item in (pos_x, pos_y, theta)):
                position = f"({pos_x}, {pos_y}, {theta})"
            state_attrs[ATTR_POSITION] = position

        return state_attrs

    def get_cleaning_status(self, state) -> tuple[int, int]:
        """Return the cleaning time and cleaned area from the device.

        Args:
            state: Reported state dict; ``cleanMissionStatus`` is read
                from it.

        Returns:
            ``(cleaning_time, cleaned_area)``; ``(0, 0)`` when there is no
            mission status. The time is ``mssnM`` when that is truthy,
            otherwise the whole minutes elapsed since ``mssnStrtTm``. The
            area is ``sqft``, converted to square metres and rounded when
            the configured unit system is metric.
        """
        if not (mission_state := state.get("cleanMissionStatus")):
            return (0, 0)

        if cleaning_time := mission_state.get("mssnM", 0):
            pass
        elif start_time := mission_state.get("mssnStrtTm"):
            # No minute counter reported: derive it from the start time.
            now = dt_util.as_timestamp(dt_util.utcnow())
            if now > start_time:
                cleaning_time = (now - start_time) // 60

        if cleaned_area := mission_state.get("sqft", 0):  # Imperial
            # Convert to m2 if the unit_system is set to metric
            if self.hass.config.units is METRIC_SYSTEM:
                cleaned_area = round(cleaned_area * 0.0929)

        return (cleaning_time, cleaned_area)

    def on_message(self, json_data):
        """Update state on message change.

        Same filtering as the base class, plus a debug log of the message.
        """
        state = json_data.get("state", {}).get("reported", {})
        if self.new_state_filter(state):
            _LOGGER.debug("Got new state from the vacuum: %s", json_data)
            self.schedule_update_ha_state()

    async def async_start(self):
        """Start or resume the cleaning task."""
        if self.state == STATE_PAUSED:
            await self.hass.async_add_executor_job(self.vacuum.send_command, "resume")
        else:
            await self.hass.async_add_executor_job(self.vacuum.send_command, "start")

    async def async_stop(self, **kwargs):
        """Stop the vacuum cleaner."""
        await self.hass.async_add_executor_job(self.vacuum.send_command, "stop")

    async def async_pause(self):
        """Pause the cleaning cycle."""
        await self.hass.async_add_executor_job(self.vacuum.send_command, "pause")

    async def async_return_to_base(self, **kwargs):
        """Set the vacuum cleaner to return to the dock.

        A cleaning robot is paused first; the method then checks up to ten
        times, one second apart, for the paused state before sending
        ``dock``. The command is sent even if the pause was never observed.
        """
        if self.state == STATE_CLEANING:
            await self.async_pause()
            for _ in range(10):
                if self.state == STATE_PAUSED:
                    break
                await asyncio.sleep(1)
        await self.hass.async_add_executor_job(self.vacuum.send_command, "dock")

    async def async_locate(self, **kwargs):
        """Locate the vacuum by sending the ``find`` command."""
        await self.hass.async_add_executor_job(self.vacuum.send_command, "find")

    async def async_send_command(self, command, params=None, **kwargs):
        """Send raw command.

        Args:
            command: Command name passed to the client's ``send_command``.
            params: Optional parameters passed along with the command.
            **kwargs: Only logged; not forwarded to the client.
        """
        _LOGGER.debug("async_send_command %s (%s), %s", command, params, kwargs)
        await self.hass.async_add_executor_job(
            self.vacuum.send_command, command, params
        )
def async_send_command(self, command, params=None, **kwargs)
Definition: entity.py:273
tuple[int, int] get_cleaning_status(self, state)
Definition: entity.py:218
None schedule_update_ha_state(self, bool force_refresh=False)
Definition: entity.py:1244
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
dict[str, Any] roomba_reported_state(Roomba roomba)
Definition: __init__.py:129