Home Assistant Unofficial Reference 2024.12.1
switch.py
Go to the documentation of this file.
1 """Platform for Omnilogic switch integration."""
2 
3 import time
4 from typing import Any
5 
6 from omnilogic import OmniLogicException
7 import voluptuous as vol
8 
9 from homeassistant.components.switch import SwitchEntity
10 from homeassistant.config_entries import ConfigEntry
11 from homeassistant.core import HomeAssistant
12 from homeassistant.helpers import config_validation as cv, entity_platform
13 from homeassistant.helpers.entity_platform import AddEntitiesCallback
14 
15 from .common import check_guard
16 from .const import COORDINATOR, DOMAIN, PUMP_TYPES
17 from .coordinator import OmniLogicUpdateCoordinator
18 from .entity import OmniLogicEntity
19 
20 SERVICE_SET_SPEED = "set_pump_speed"
21 OMNILOGIC_SWITCH_OFF = 7
22 
23 
25  hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback
26 ) -> None:
27  """Set up the light platform."""
28 
29  coordinator: OmniLogicUpdateCoordinator = hass.data[DOMAIN][entry.entry_id][
30  COORDINATOR
31  ]
32  entities = []
33 
34  for item_id, item in coordinator.data.items():
35  id_len = len(item_id)
36  item_kind = item_id[-2]
37  entity_settings = SWITCH_TYPES.get((id_len, item_kind))
38 
39  if not entity_settings:
40  continue
41 
42  for entity_setting in entity_settings:
43  entity_classes: dict[str, type] = entity_setting["entity_classes"]
44  for state_key, entity_class in entity_classes.items():
45  if check_guard(state_key, item, entity_setting):
46  continue
47 
48  entity = entity_class(
49  coordinator=coordinator,
50  state_key=state_key,
51  name=entity_setting["name"],
52  kind=entity_setting["kind"],
53  item_id=item_id,
54  icon=entity_setting["icon"],
55  )
56 
57  entities.append(entity)
58 
59  async_add_entities(entities)
60 
61  # register service
62  platform = entity_platform.async_get_current_platform()
63 
64  platform.async_register_entity_service(
65  SERVICE_SET_SPEED,
66  {vol.Required("speed"): cv.positive_int},
67  "async_set_speed",
68  )
69 
70 
72  """Define an Omnilogic Base Switch entity to be extended."""
73 
74  def __init__(
75  self,
76  coordinator: OmniLogicUpdateCoordinator,
77  kind: str,
78  name: str,
79  icon: str,
80  item_id: tuple,
81  state_key: str,
82  ) -> None:
83  """Initialize Entities."""
84  super().__init__(
85  coordinator=coordinator,
86  kind=kind,
87  name=name,
88  item_id=item_id,
89  icon=icon,
90  )
91 
92  self._state_key_state_key = state_key
93  self._state_state = None
94  self._last_action_last_action = 0
95  self._state_delay_state_delay = 30
96 
97  @property
98  def is_on(self):
99  """Return the on/off state of the switch."""
100  state_int = 0
101 
102  # The Omnilogic API has a significant delay in state reporting after
103  # calling for a change. This state delay will ensure that HA keeps an
104  # optimistic value of state during this period to improve the user
105  # experience and avoid confusion.
106  if self._last_action_last_action < (time.time() - self._state_delay_state_delay):
107  state_int = int(self.coordinator.data[self._item_id_item_id][self._state_key_state_key])
108 
109  if self._state_state == OMNILOGIC_SWITCH_OFF:
110  state_int = 0
111 
112  self._state_state = state_int != 0
113 
114  return self._state_state
115 
116 
118  """Define the OmniLogic Relay entity."""
119 
120  async def async_turn_on(self, **kwargs):
121  """Turn on the relay."""
122  self._state_state_state = True
123  self._last_action_last_action_last_action = time.time()
124  self.async_write_ha_stateasync_write_ha_state()
125 
126  await self.coordinator.api.set_relay_valve(
127  int(self._item_id_item_id[1]),
128  int(self._item_id_item_id[3]),
129  int(self._item_id_item_id[-1]),
130  1,
131  )
132 
133  async def async_turn_off(self, **kwargs):
134  """Turn off the relay."""
135  self._state_state_state = False
136  self._last_action_last_action_last_action = time.time()
137  self.async_write_ha_stateasync_write_ha_state()
138 
139  await self.coordinator.api.set_relay_valve(
140  int(self._item_id_item_id[1]),
141  int(self._item_id_item_id[3]),
142  int(self._item_id_item_id[-1]),
143  0,
144  )
145 
146 
148  """Define the OmniLogic Pump Switch Entity."""
149 
150  def __init__(
151  self,
152  coordinator: OmniLogicUpdateCoordinator,
153  kind: str,
154  name: str,
155  icon: str,
156  item_id: tuple,
157  state_key: str,
158  ) -> None:
159  """Initialize entities."""
160  super().__init__(
161  coordinator=coordinator,
162  kind=kind,
163  name=name,
164  icon=icon,
165  item_id=item_id,
166  state_key=state_key,
167  )
168 
169  self._max_speed_max_speed = int(coordinator.data[item_id].get("Max-Pump-Speed", 100))
170  self._min_speed_min_speed = int(coordinator.data[item_id].get("Min-Pump-Speed", 0))
171 
172  if "Filter-Type" in coordinator.data[item_id]:
173  self._pump_type_pump_type = PUMP_TYPES[coordinator.data[item_id]["Filter-Type"]]
174  else:
175  self._pump_type_pump_type = PUMP_TYPES[coordinator.data[item_id]["Type"]]
176 
177  self._last_speed_last_speed = None
178 
179  async def async_turn_on(self, **kwargs):
180  """Turn on the pump."""
181  self._state_state_state = True
182  self._last_action_last_action_last_action = time.time()
183  self.async_write_ha_stateasync_write_ha_state()
184 
185  on_value = 100
186 
187  if self._pump_type_pump_type != "SINGLE" and self._last_speed_last_speed:
188  on_value = self._last_speed_last_speed
189 
190  await self.coordinator.api.set_relay_valve(
191  int(self._item_id_item_id[1]),
192  int(self._item_id_item_id[3]),
193  int(self._item_id_item_id[-1]),
194  on_value,
195  )
196 
197  async def async_turn_off(self, **kwargs):
198  """Turn off the pump."""
199  self._state_state_state = False
200  self._last_action_last_action_last_action = time.time()
201  self.async_write_ha_stateasync_write_ha_state()
202 
203  if self._pump_type_pump_type != "SINGLE":
204  if "filterSpeed" in self.coordinator.data[self._item_id_item_id]:
205  self._last_speed_last_speed = self.coordinator.data[self._item_id_item_id]["filterSpeed"]
206  else:
207  self._last_speed_last_speed = self.coordinator.data[self._item_id_item_id]["pumpSpeed"]
208 
209  await self.coordinator.api.set_relay_valve(
210  int(self._item_id_item_id[1]),
211  int(self._item_id_item_id[3]),
212  int(self._item_id_item_id[-1]),
213  0,
214  )
215 
216  async def async_set_speed(self, speed):
217  """Set the switch speed."""
218 
219  if self._pump_type_pump_type != "SINGLE":
220  if self._min_speed_min_speed <= speed <= self._max_speed_max_speed:
221  success = await self.coordinator.api.set_relay_valve(
222  int(self._item_id_item_id[1]),
223  int(self._item_id_item_id[3]),
224  int(self._item_id_item_id[-1]),
225  speed,
226  )
227 
228  if success:
229  self.async_write_ha_stateasync_write_ha_state()
230 
231  else:
232  raise OmniLogicException(
233  "Cannot set speed. Speed is outside pump range."
234  )
235 
236  else:
237  raise OmniLogicException("Cannot set speed on a non-variable speed pump.")
238 
239 
240 SWITCH_TYPES: dict[tuple[int, str], list[dict[str, Any]]] = {
241  (4, "Relays"): [
242  {
243  "entity_classes": {"switchState": OmniLogicRelayControl},
244  "name": "",
245  "kind": "relay",
246  "icon": None,
247  "guard_condition": [],
248  },
249  ],
250  (6, "Relays"): [
251  {
252  "entity_classes": {"switchState": OmniLogicRelayControl},
253  "name": "",
254  "kind": "relay",
255  "icon": None,
256  "guard_condition": [],
257  }
258  ],
259  (6, "Pumps"): [
260  {
261  "entity_classes": {"pumpState": OmniLogicPumpControl},
262  "name": "",
263  "kind": "pump",
264  "icon": None,
265  "guard_condition": [],
266  }
267  ],
268  (6, "Filter"): [
269  {
270  "entity_classes": {"filterState": OmniLogicPumpControl},
271  "name": "",
272  "kind": "pump",
273  "icon": None,
274  "guard_condition": [],
275  }
276  ],
277 }
None __init__(self, OmniLogicUpdateCoordinator coordinator, str kind, str name, str icon, tuple item_id, str state_key)
Definition: switch.py:158
None __init__(self, OmniLogicUpdateCoordinator coordinator, str kind, str name, str icon, tuple item_id, str state_key)
Definition: switch.py:82
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
def check_guard(state_key, item, entity_setting)
Definition: common.py:4
None async_setup_entry(HomeAssistant hass, ConfigEntry entry, AddEntitiesCallback async_add_entities)
Definition: switch.py:26