Home Assistant Unofficial Reference 2024.12.1
cover.py
Go to the documentation of this file.
1 """Support for LCN covers."""
2 
3 from collections.abc import Iterable
4 from functools import partial
5 from typing import Any
6 
7 import pypck
8 
9 from homeassistant.components.cover import DOMAIN as DOMAIN_COVER, CoverEntity
10 from homeassistant.config_entries import ConfigEntry
11 from homeassistant.const import CONF_DOMAIN, CONF_ENTITIES
12 from homeassistant.core import HomeAssistant
13 from homeassistant.helpers.entity_platform import AddEntitiesCallback
14 from homeassistant.helpers.typing import ConfigType
15 
16 from .const import (
17  ADD_ENTITIES_CALLBACKS,
18  CONF_DOMAIN_DATA,
19  CONF_MOTOR,
20  CONF_REVERSE_TIME,
21  DOMAIN,
22 )
23 from .entity import LcnEntity
24 from .helpers import InputType
25 
# Home Assistant platform setting: 0 means the number of parallel entity
# updates for this platform is not limited.
PARALLEL_UPDATES = 0
27 
28 
def add_lcn_entities(
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
    entity_configs: Iterable[ConfigType],
) -> None:
    """Add entities for this domain.

    Builds one cover entity per entry in ``entity_configs`` and hands the
    complete list to ``async_add_entities`` in a single call.

    Args:
        config_entry: Config entry passed through to every created entity.
        async_add_entities: Callback that receives the list of new entities.
        entity_configs: Entity configurations; each must provide
            ``[CONF_DOMAIN_DATA][CONF_MOTOR]``.
    """
    # Exact comparison on purpose: a membership test against the string
    # "OUTPUTS" would also accept substrings such as "" or "OUT".
    entities: list[LcnOutputsCover | LcnRelayCover] = [
        LcnOutputsCover(entity_config, config_entry)
        if entity_config[CONF_DOMAIN_DATA][CONF_MOTOR] == "OUTPUTS"
        else LcnRelayCover(entity_config, config_entry)  # any other motor value
        for entity_config in entity_configs
    ]

    async_add_entities(entities)
43 
44 
async def async_setup_entry(
    hass: HomeAssistant,
    config_entry: ConfigEntry,
    async_add_entities: AddEntitiesCallback,
) -> None:
    """Set up LCN cover entities from a config entry.

    Args:
        hass: Home Assistant instance whose ``hass.data[DOMAIN]`` holds the
            per-entry data of this integration.
        config_entry: Config entry whose ``CONF_ENTITIES`` list is scanned
            for cover entities.
        async_add_entities: Callback used to register the created entities.
    """
    # Pre-bind the entry and the callback so only the entity configs remain
    # to be supplied when the factory is called.
    add_entities = partial(
        add_lcn_entities,
        config_entry,
        async_add_entities,
    )

    # Store the factory under the cover domain in this entry's callback
    # registry, next to the factories of the other platforms.
    hass.data[DOMAIN][config_entry.entry_id][ADD_ENTITIES_CALLBACKS].update(
        {DOMAIN_COVER: add_entities}
    )

    # Create entities for the cover configurations already stored in the entry.
    add_entities(
        (
            entity_config
            for entity_config in config_entry.data[CONF_ENTITIES]
            if entity_config[CONF_DOMAIN] == DOMAIN_COVER
        ),
    )
68 
69 
class LcnOutputsCover(LcnEntity, CoverEntity):
    """Representation of a LCN cover connected to output ports."""

    # State is tracked from the commands sent and the output status inputs
    # received by this entity, and is flagged as assumed.
    _attr_is_closed = False
    _attr_is_closing = False
    _attr_is_opening = False
    _attr_assumed_state = True

    def __init__(self, config: ConfigType, config_entry: ConfigEntry) -> None:
        """Initialize the LCN cover.

        Args:
            config: Entity configuration; ``config[CONF_DOMAIN_DATA]`` may
                contain ``CONF_REVERSE_TIME``.
            config_entry: Config entry this entity belongs to.
        """
        super().__init__(config, config_entry)

        # Index 0 is the "up" output, index 1 the "down" output;
        # input_received() relies on this order.
        self.output_ids = [
            pypck.lcn_defs.OutputPort["OUTPUTUP"].value,
            pypck.lcn_defs.OutputPort["OUTPUTDOWN"].value,
        ]
        if CONF_REVERSE_TIME in config[CONF_DOMAIN_DATA]:
            self.reverse_time = pypck.lcn_defs.MotorReverseTime[
                config[CONF_DOMAIN_DATA][CONF_REVERSE_TIME]
            ]
        else:
            # No reverse time configured; None is passed on to pypck as is.
            self.reverse_time = None

    async def async_added_to_hass(self) -> None:
        """Run when entity about to be added to hass."""
        await super().async_added_to_hass()
        # Status request handlers are only activated for non-group connections.
        if not self.device_connection.is_group:
            await self.device_connection.activate_status_request_handler(
                pypck.lcn_defs.OutputPort["OUTPUTUP"]
            )
            await self.device_connection.activate_status_request_handler(
                pypck.lcn_defs.OutputPort["OUTPUTDOWN"]
            )

    async def async_will_remove_from_hass(self) -> None:
        """Run when entity will be removed from hass."""
        await super().async_will_remove_from_hass()
        # Mirror of async_added_to_hass(): cancel what was activated there.
        if not self.device_connection.is_group:
            await self.device_connection.cancel_status_request_handler(
                pypck.lcn_defs.OutputPort["OUTPUTUP"]
            )
            await self.device_connection.cancel_status_request_handler(
                pypck.lcn_defs.OutputPort["OUTPUTDOWN"]
            )

    async def async_close_cover(self, **kwargs: Any) -> None:
        """Close the cover.

        The state is left untouched when the motor command returns a falsy
        result.
        """
        state = pypck.lcn_defs.MotorStateModifier.DOWN
        if not await self.device_connection.control_motors_outputs(
            state, self.reverse_time
        ):
            return
        self._attr_is_opening = False
        self._attr_is_closing = True
        self.async_write_ha_state()

    async def async_open_cover(self, **kwargs: Any) -> None:
        """Open the cover.

        The state is left untouched when the motor command returns a falsy
        result.
        """
        state = pypck.lcn_defs.MotorStateModifier.UP
        if not await self.device_connection.control_motors_outputs(
            state, self.reverse_time
        ):
            return
        self._attr_is_closed = False
        self._attr_is_opening = True
        self._attr_is_closing = False
        self.async_write_ha_state()

    async def async_stop_cover(self, **kwargs: Any) -> None:
        """Stop the cover.

        Only the moving flags are cleared; ``_attr_is_closed`` keeps its
        current value.
        """
        state = pypck.lcn_defs.MotorStateModifier.STOP
        if not await self.device_connection.control_motors_outputs(state):
            return
        self._attr_is_closing = False
        self._attr_is_opening = False
        self.async_write_ha_state()

    def input_received(self, input_obj: InputType) -> None:
        """Set cover states when LCN input object (command) is received.

        Inputs that are not a ``ModStatusOutput`` for one of the two tracked
        output ports are ignored.
        """
        if (
            not isinstance(input_obj, pypck.inputs.ModStatusOutput)
            or input_obj.get_output_id() not in self.output_ids
        ):
            return

        if input_obj.get_percent() > 0:  # motor is on
            if input_obj.get_output_id() == self.output_ids[0]:
                self._attr_is_opening = True
                self._attr_is_closing = False
            else:  # self.output_ids[1]
                self._attr_is_opening = False
                self._attr_is_closing = True
            # While the motor runs, "closed" simply follows the closing flag.
            self._attr_is_closed = self._attr_is_closing
        else:  # motor is off
            # cover is assumed to be closed if we were in closing state before
            self._attr_is_closed = self._attr_is_closing
            self._attr_is_closing = False
            self._attr_is_opening = False

        self.async_write_ha_state()
170 
171 
class LcnRelayCover(LcnEntity, CoverEntity):
    """Representation of a LCN cover connected to relays."""

    # State is tracked from the commands sent and the relay status inputs
    # received by this entity, and is flagged as assumed.
    _attr_is_closed = False
    _attr_is_closing = False
    _attr_is_opening = False
    _attr_assumed_state = True

    def __init__(self, config: ConfigType, config_entry: ConfigEntry) -> None:
        """Initialize the LCN cover.

        Args:
            config: Entity configuration; ``config[CONF_DOMAIN_DATA][CONF_MOTOR]``
                must name a member of ``pypck.lcn_defs.MotorPort``.
            config_entry: Config entry this entity belongs to.
        """
        super().__init__(config, config_entry)

        self.motor = pypck.lcn_defs.MotorPort[config[CONF_DOMAIN_DATA][CONF_MOTOR]]
        # Each motor uses two consecutive entries of the relay status list:
        # the first is read as on/off, the second as direction
        # (see input_received()).
        self.motor_port_onoff = self.motor.value * 2
        self.motor_port_updown = self.motor_port_onoff + 1

        # NOTE(review): nothing in this class reads these three attributes;
        # state is kept in the _attr_* fields. They are retained so that the
        # instance layout is unchanged.
        self._is_closed = False
        self._is_closing = False
        self._is_opening = False

    async def async_added_to_hass(self) -> None:
        """Run when entity about to be added to hass."""
        await super().async_added_to_hass()
        # The status request handler is only activated for non-group connections.
        if not self.device_connection.is_group:
            await self.device_connection.activate_status_request_handler(self.motor)

    async def async_will_remove_from_hass(self) -> None:
        """Run when entity will be removed from hass."""
        await super().async_will_remove_from_hass()
        # Mirror of async_added_to_hass(): cancel what was activated there.
        if not self.device_connection.is_group:
            await self.device_connection.cancel_status_request_handler(self.motor)

    async def async_close_cover(self, **kwargs: Any) -> None:
        """Close the cover.

        The state is left untouched when the motor command returns a falsy
        result.
        """
        # One modifier per motor; only this entity's motor is changed.
        states = [pypck.lcn_defs.MotorStateModifier.NOCHANGE] * 4
        states[self.motor.value] = pypck.lcn_defs.MotorStateModifier.DOWN
        if not await self.device_connection.control_motors_relays(states):
            return
        self._attr_is_opening = False
        self._attr_is_closing = True
        self.async_write_ha_state()

    async def async_open_cover(self, **kwargs: Any) -> None:
        """Open the cover.

        The state is left untouched when the motor command returns a falsy
        result.
        """
        # One modifier per motor; only this entity's motor is changed.
        states = [pypck.lcn_defs.MotorStateModifier.NOCHANGE] * 4
        states[self.motor.value] = pypck.lcn_defs.MotorStateModifier.UP
        if not await self.device_connection.control_motors_relays(states):
            return
        self._attr_is_closed = False
        self._attr_is_opening = True
        self._attr_is_closing = False
        self.async_write_ha_state()

    async def async_stop_cover(self, **kwargs: Any) -> None:
        """Stop the cover.

        Only the moving flags are cleared; ``_attr_is_closed`` keeps its
        current value.
        """
        # One modifier per motor; only this entity's motor is changed.
        states = [pypck.lcn_defs.MotorStateModifier.NOCHANGE] * 4
        states[self.motor.value] = pypck.lcn_defs.MotorStateModifier.STOP
        if not await self.device_connection.control_motors_relays(states):
            return
        self._attr_is_closing = False
        self._attr_is_opening = False
        self.async_write_ha_state()

    def input_received(self, input_obj: InputType) -> None:
        """Set cover states when LCN input object (command) is received.

        Inputs that are not a ``ModStatusRelays`` are ignored.
        """
        if not isinstance(input_obj, pypck.inputs.ModStatusRelays):
            return

        states = input_obj.states  # list of boolean values (relay on/off)
        if states[self.motor_port_onoff]:  # motor is on
            self._attr_is_opening = not states[self.motor_port_updown]  # set direction
            self._attr_is_closing = states[self.motor_port_updown]  # set direction
        else:  # motor is off
            self._attr_is_opening = False
            self._attr_is_closing = False
            # With the motor stopped, the direction relay decides "closed".
            self._attr_is_closed = states[self.motor_port_updown]

        self.async_write_ha_state()
None async_open_cover(self, **Any kwargs)
Definition: cover.py:126
None __init__(self, ConfigType config, ConfigEntry config_entry)
Definition: cover.py:78
None input_received(self, InputType input_obj)
Definition: cover.py:147
None async_stop_cover(self, **Any kwargs)
Definition: cover.py:138
None async_close_cover(self, **Any kwargs)
Definition: cover.py:115
None async_close_cover(self, **Any kwargs)
Definition: cover.py:204
None __init__(self, ConfigType config, ConfigEntry config_entry)
Definition: cover.py:180
None async_open_cover(self, **Any kwargs)
Definition: cover.py:214
None input_received(self, InputType input_obj)
Definition: cover.py:235
None async_stop_cover(self, **Any kwargs)
Definition: cover.py:225
None add_entities(AsusWrtRouter router, AddEntitiesCallback async_add_entities, set[str] tracked)
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
None add_lcn_entities(ConfigEntry config_entry, AddEntitiesCallback async_add_entities, Iterable[ConfigType] entity_configs)
Definition: cover.py:33
None async_setup_entry(HomeAssistant hass, ConfigEntry config_entry, AddEntitiesCallback async_add_entities)
Definition: cover.py:49