1 """Reproduce an Vacuum state."""
3 from __future__
import annotations
6 from collections.abc
import Iterable
25 SERVICE_RETURN_TO_BASE,
26 SERVICE_SET_FAN_SPEED,
34 _LOGGER = logging.getLogger(__name__)
36 VALID_STATES_TOGGLE = {STATE_ON, STATE_OFF}
37 VALID_STATES_STATE = {
50 context: Context |
None =
None,
51 reproduce_options: dict[str, Any] |
None =
None,
53 """Reproduce a single state."""
54 if (cur_state := hass.states.get(state.entity_id))
is None:
55 _LOGGER.warning(
"Unable to find entity %s", state.entity_id)
58 if not (state.state
in VALID_STATES_TOGGLE
or state.state
in VALID_STATES_STATE):
60 "Invalid state specified for %s: %s", state.entity_id, state.state
65 if cur_state.state == state.state
and cur_state.attributes.get(
67 ) == state.attributes.get(ATTR_FAN_SPEED):
70 service_data = {ATTR_ENTITY_ID: state.entity_id}
72 if cur_state.state != state.state:
74 if state.state == STATE_ON:
75 service = SERVICE_TURN_ON
76 elif state.state == STATE_OFF:
77 service = SERVICE_TURN_OFF
78 elif state.state == STATE_CLEANING:
79 service = SERVICE_START
80 elif state.state
in [STATE_DOCKED, STATE_RETURNING]:
81 service = SERVICE_RETURN_TO_BASE
82 elif state.state == STATE_IDLE:
83 service = SERVICE_STOP
84 elif state.state == STATE_PAUSED:
85 service = SERVICE_PAUSE
87 await hass.services.async_call(
88 DOMAIN, service, service_data, context=context, blocking=
True
91 if cur_state.attributes.get(ATTR_FAN_SPEED) != state.attributes.get(ATTR_FAN_SPEED):
93 service_data[
"fan_speed"] = state.attributes[ATTR_FAN_SPEED]
94 await hass.services.async_call(
95 DOMAIN, SERVICE_SET_FAN_SPEED, service_data, context=context, blocking=
True
101 states: Iterable[State],
103 context: Context |
None =
None,
104 reproduce_options: dict[str, Any] |
None =
None,
106 """Reproduce Vacuum states."""
108 await asyncio.gather(
111 hass, state, context=context, reproduce_options=reproduce_options
None async_reproduce_states(HomeAssistant hass, Iterable[State] states, *Context|None context=None, dict[str, Any]|None reproduce_options=None)
None _async_reproduce_state(HomeAssistant hass, State state, *Context|None context=None, dict[str, Any]|None reproduce_options=None)