1 """Base class for iRobot devices."""
3 from __future__
import annotations
24 from .
import roomba_reported_state
25 from .const
import DOMAIN
27 _LOGGER = logging.getLogger(__name__)
29 ATTR_CLEANING_TIME =
"cleaning_time"
30 ATTR_CLEANED_AREA =
"cleaned_area"
32 ATTR_ERROR_CODE =
"error_code"
33 ATTR_POSITION =
"position"
34 ATTR_SOFTWARE_VERSION =
"software_version"
38 VacuumEntityFeature.BATTERY
39 | VacuumEntityFeature.PAUSE
40 | VacuumEntityFeature.RETURN_HOME
41 | VacuumEntityFeature.SEND_COMMAND
42 | VacuumEntityFeature.START
43 | VacuumEntityFeature.STATE
44 | VacuumEntityFeature.STOP
45 | VacuumEntityFeature.LOCATE
50 "charge": STATE_DOCKED,
51 "evac": STATE_RETURNING,
52 "hmMidMsn": STATE_CLEANING,
53 "hmPostMsn": STATE_RETURNING,
54 "hmUsrDock": STATE_RETURNING,
55 "pause": STATE_PAUSED,
56 "run": STATE_CLEANING,
63 """Base class for iRobot Entities."""
65 _attr_should_poll =
False
66 _attr_has_entity_name =
True
69 """Initialize the iRobot handler."""
77 manufacturer=
"iRobot",
88 (dr.CONNECTION_NETWORK_MAC, mac_address)
93 """Return the uniqueid of the vacuum cleaner."""
94 return f
"roomba_{self._blid}"
98 """Return the uniqueid of the vacuum cleaner."""
103 """Return the battery level of the vacuum cleaner."""
108 """Return the run stats."""
113 """Return the mission stats."""
118 """Return the battery stats."""
123 """Return last mission start time."""
126 )
is None or ts == 0:
128 return dt_util.utc_from_timestamp(ts)
132 """Return the state of the vacuum cleaner."""
133 clean_mission_status = self.
vacuum_statevacuum_state.
get(
"cleanMissionStatus", {})
134 cycle = clean_mission_status.get(
"cycle")
135 phase = clean_mission_status.get(
"phase")
137 state = STATE_MAP[phase]
140 if cycle !=
"none" and state
in (STATE_IDLE, STATE_DOCKED):
145 """Register callback function."""
149 """Filter out wifi state messages."""
150 return len(new_state) > 1
or "signal" not in new_state
153 """Update state on message change."""
154 state = json_data.get(
"state", {}).
get(
"reported", {})
160 """Base class for iRobot robots."""
163 _attr_supported_features = SUPPORT_IROBOT
164 _attr_available =
True
167 """Initialize the iRobot handler."""
173 """Return the state of the vacuum cleaner."""
178 """Return the state attributes of the device."""
182 software_version = state.get(
"softwareVer")
185 state_attrs = {ATTR_SOFTWARE_VERSION: software_version}
188 state_attrs[ATTR_STATUS] = self.
vacuumvacuum.current_state
195 state_attrs[ATTR_CLEANING_TIME],
196 state_attrs[ATTR_CLEANED_AREA],
200 if self.
vacuumvacuum.error_code != 0:
201 state_attrs[ATTR_ERROR] = self.
vacuumvacuum.error_message
202 state_attrs[ATTR_ERROR_CODE] = self.
vacuumvacuum.error_code
207 pos_state = state.get(
"pose", {})
209 pos_x = pos_state.get(
"point", {}).
get(
"x")
210 pos_y = pos_state.get(
"point", {}).
get(
"y")
211 theta = pos_state.get(
"theta")
212 if all(item
is not None for item
in (pos_x, pos_y, theta)):
213 position = f
"({pos_x}, {pos_y}, {theta})"
214 state_attrs[ATTR_POSITION] = position
219 """Return the cleaning time and cleaned area from the device."""
220 if not (mission_state := state.get(
"cleanMissionStatus")):
223 if cleaning_time := mission_state.get(
"mssnM", 0):
225 elif start_time := mission_state.get(
"mssnStrtTm"):
226 now = dt_util.as_timestamp(dt_util.utcnow())
228 cleaning_time = (now - start_time) // 60
230 if cleaned_area := mission_state.get(
"sqft", 0):
232 if self.
hasshass.config.units
is METRIC_SYSTEM:
233 cleaned_area = round(cleaned_area * 0.0929)
235 return (cleaning_time, cleaned_area)
238 """Update state on message change."""
239 state = json_data.get(
"state", {}).
get(
"reported", {})
241 _LOGGER.debug(
"Got new state from the vacuum: %s", json_data)
245 """Start or resume the cleaning task."""
247 await self.
hasshass.async_add_executor_job(self.
vacuumvacuum.send_command,
"resume")
249 await self.
hasshass.async_add_executor_job(self.
vacuumvacuum.send_command,
"start")
252 """Stop the vacuum cleaner."""
253 await self.
hasshass.async_add_executor_job(self.
vacuumvacuum.send_command,
"stop")
256 """Pause the cleaning cycle."""
257 await self.
hasshass.async_add_executor_job(self.
vacuumvacuum.send_command,
"pause")
260 """Set the vacuum cleaner to return to the dock."""
266 await asyncio.sleep(1)
267 await self.
hasshass.async_add_executor_job(self.
vacuumvacuum.send_command,
"dock")
270 """Located vacuum."""
271 await self.
hasshass.async_add_executor_job(self.
vacuumvacuum.send_command,
"find")
274 """Send raw command."""
275 _LOGGER.debug(
"async_send_command %s (%s), %s", command, params, kwargs)
276 await self.
hasshass.async_add_executor_job(
277 self.
vacuumvacuum.send_command, command, params
def robot_unique_id(self)
def new_state_filter(self, new_state)
def async_added_to_hass(self)
def on_message(self, json_data)
def __init__(self, roomba, blid)
def async_send_command(self, command, params=None, **kwargs)
tuple[int, int] get_cleaning_status(self, state)
def __init__(self, roomba, blid)
def extra_state_attributes(self)
def async_locate(self, **kwargs)
def on_message(self, json_data)
def async_return_to_base(self, **kwargs)
def async_stop(self, **kwargs)
None schedule_update_ha_state(self, bool force_refresh=False)
web.Response get(self, web.Request request, str config_key)
dict[str, Any] roomba_reported_state(Roomba roomba)