Home Assistant Unofficial Reference 2024.12.1
executor.py
Go to the documentation of this file.
1 """Class for helpers and communication with the OverKiz API."""
2 
3 from __future__ import annotations
4 
5 from typing import Any, cast
6 from urllib.parse import urlparse
7 
8 from pyoverkiz.enums import OverkizCommand, Protocol
9 from pyoverkiz.exceptions import OverkizException
10 from pyoverkiz.models import Command, Device, StateDefinition
11 from pyoverkiz.types import StateType as OverkizStateType
12 
13 from homeassistant.exceptions import HomeAssistantError
14 
15 from .coordinator import OverkizDataUpdateCoordinator
16 
# Commands that don't support setting the delay to another value.
# For RTS devices the command-execution code appends an execution duration
# of 0 to every command that is NOT listed here.
COMMANDS_WITHOUT_DELAY = [
    OverkizCommand.IDENTIFY,
    OverkizCommand.OFF,
    OverkizCommand.ON,
    OverkizCommand.ON_WITH_TIMER,
    OverkizCommand.TEST,
]
26 
27 
29  """Representation of an Overkiz device with execution handler."""
30 
def __init__(
    self, device_url: str, coordinator: OverkizDataUpdateCoordinator
) -> None:
    """Initialize the executor.

    :param device_url: URL of the device this executor acts on.
    :param coordinator: Coordinator whose ``data``, ``client`` and
        ``executions`` are used by the other methods.
    """
    self.device_url = device_url
    self.coordinator = coordinator
    # Part of the URL before the first "#"; linked devices are looked up as
    # "<base_device_url>#<index>".
    self.base_device_url = self.device_url.split("#")[0]
38 
@property
def device(self) -> Device:
    """Return Overkiz device linked to this entity.

    The device is read from the coordinator's ``data`` mapping on every
    access, keyed by this executor's ``device_url``.
    """
    return self.coordinator.data[self.device_url]
43 
def linked_device(self, index: int) -> Device | None:
    """Return Overkiz device sharing the same base url.

    :param index: Suffix placed after ``#`` in the looked-up device URL.
    :return: The entry stored under ``"<base_device_url>#<index>"`` in the
        coordinator data, or ``None`` when there is no such entry.
    """
    return self.coordinator.data.get(f"{self.base_device_url}#{index}")
47 
def select_command(self, *commands: str) -> str | None:
    """Select first existing command in a list of commands.

    :param commands: Candidate command names, in order of preference.
    :return: The first candidate contained in the device definition's
        commands, or ``None`` when none of them is.
    """
    existing_commands = self.device.definition.commands
    return next((c for c in commands if c in existing_commands), None)
52 
def has_command(self, *commands: str) -> bool:
    """Return True if a command exists in a list of commands.

    :param commands: Candidate command names.
    :return: ``True`` when ``select_command`` finds at least one of them.
    """
    return self.select_command(*commands) is not None
56 
def select_definition_state(self, *states: str) -> StateDefinition | None:
    """Select first existing definition state in a list of states.

    The device definition's states are walked in their own order, so the
    result is the first *definition* whose ``qualified_name`` is one of
    ``states``, not necessarily the first name passed in.

    :param states: Qualified state names to look for.
    :return: The matching state definition, or ``None`` when none matches.
    """
    for existing_state in self.device.definition.states:
        if existing_state.qualified_name in states:
            return existing_state
    return None
63 
def select_state(self, *states: str) -> OverkizStateType:
    """Select first existing active state in a list of states.

    :param states: State names, in order of preference.
    :return: The ``value`` of the first state found on the device, or
        ``None`` when no name yields a state.
    """
    for state in states:
        # NOTE(review): relies on ``device.states[name]`` yielding a falsy
        # result (rather than raising) for an unknown name - confirm
        # against the pyoverkiz ``States`` container.
        if current_state := self.device.states[state]:
            return current_state.value

    return None
71 
def has_state(self, *states: str) -> bool:
    """Return True if one of the given states has a value on the device.

    A state whose value is ``None`` counts as absent, because the check is
    made on the value returned by ``select_state``.

    :param states: State names to look for.
    """
    return self.select_state(*states) is not None
75 
def select_attribute(self, *attributes: str) -> OverkizStateType:
    """Select first existing attribute in a list of attributes.

    :param attributes: Attribute names, in order of preference.
    :return: The ``value`` of the first attribute found on the device, or
        ``None`` when no name yields an attribute.
    """
    for attribute in attributes:
        # NOTE(review): relies on ``device.attributes[name]`` yielding a
        # falsy result (rather than raising) for an unknown name - confirm
        # against the pyoverkiz container type.
        if current_attribute := self.device.attributes[attribute]:
            return current_attribute.value

    return None
83 
async def async_execute_command(
    self, command_name: str, *args: Any, refresh_afterwards: bool = True
) -> None:
    """Execute device command in async context.

    :param command_name: Name of the command to send to the device.
    :param args: Command parameters; ``None`` values are dropped.
    :param refresh_afterwards: Whether to refresh the device state after the command is executed.
        If several commands are executed, it will be refreshed only once.
    :raises HomeAssistantError: When the client raises an ``OverkizException``.
    """
    parameters = [arg for arg in args if arg is not None]
    # Set the execution duration to 0 seconds for RTS devices on supported commands
    # Default execution duration is 30 seconds and will block consecutive commands
    if (
        self.device.protocol == Protocol.RTS
        and command_name not in COMMANDS_WITHOUT_DELAY
    ):
        parameters.append(0)

    try:
        exec_id = await self.coordinator.client.execute_command(
            self.device.device_url,
            Command(command_name, parameters),
            "Home Assistant",
        )
    # Catch Overkiz exceptions to support `continue_on_error` functionality
    except OverkizException as exception:
        raise HomeAssistantError(exception) from exception

    # ExecutionRegisteredEvent doesn't contain the device_url, thus we need to register it here
    self.coordinator.executions[exec_id] = {
        "device_url": self.device.device_url,
        "command_name": command_name,
    }
    if refresh_afterwards:
        await self.coordinator.async_refresh()
118 
async def async_cancel_command(
    self, commands_to_cancel: list[OverkizCommand]
) -> bool:
    """Cancel running execution by command.

    Executions registered in the coordinator are searched first; only when
    none matches are the executions reported by the client searched.

    :param commands_to_cancel: Command names whose execution should be cancelled.
    :return: ``True`` when a matching execution was found and a cancel was
        requested for it, ``False`` when nothing matched.
    """

    # Cancel a running execution
    # Retrieve executions initiated via Home Assistant from Data Update Coordinator queue
    exec_id = next(
        (
            exec_id
            # Reverse dictionary to cancel the last added execution
            for exec_id, execution in reversed(self.coordinator.executions.items())
            if execution.get("device_url") == self.device.device_url
            and execution.get("command_name") in commands_to_cancel
        ),
        None,
    )

    if exec_id:
        await self.async_cancel_execution(exec_id)
        return True

    # Retrieve executions initiated outside Home Assistant via API
    executions = cast(Any, await self.coordinator.client.get_current_executions())
    # executions.action_group is typed incorrectly in the upstream library
    # or the below code is incorrect.
    exec_id = next(
        (
            execution.id
            for execution in executions
            # Reverse the actions so the last added one is considered first
            for action in reversed(execution.action_group.get("actions"))
            for command in action.get("commands")
            if action.get("device_url") == self.device.device_url
            and command.get("name") in commands_to_cancel
        ),
        None,
    )

    if exec_id:
        await self.async_cancel_execution(exec_id)
        return True

    return False
163 
async def async_cancel_execution(self, exec_id: str) -> None:
    """Cancel running execution via execution id.

    :param exec_id: Identifier of the execution, passed to the client's
        ``cancel_command``.
    """
    await self.coordinator.client.cancel_command(exec_id)
167 
def get_gateway_id(self) -> str:
    """Retrieve gateway id from device url.

    device URL (<protocol>://<gatewayId>/<deviceAddress>[#<subsystemId>])

    :return: The network-location part of ``device_url`` as parsed by
        ``urllib.parse.urlparse``.
    """
    url = urlparse(self.device_url)
    return url.netloc
None async_execute_command(self, str command_name, *Any args, bool refresh_afterwards=True)
Definition: executor.py:86
bool async_cancel_command(self, list[OverkizCommand] commands_to_cancel)
Definition: executor.py:121
None __init__(self, str device_url, OverkizDataUpdateCoordinator coordinator)
Definition: executor.py:33
OverkizStateType select_state(self, *str states)
Definition: executor.py:64
OverkizStateType select_attribute(self, *str attributes)
Definition: executor.py:76
StateDefinition|None select_definition_state(self, *str states)
Definition: executor.py:57