1 """Support for Rflink devices."""
3 from __future__
import annotations
8 from rflink.protocol
import ProtocolBase
18 DATA_ENTITY_GROUP_LOOKUP,
20 DEFAULT_SIGNAL_REPETITIONS,
26 from .utils
import brightness_to_rflink, identify_event_type
28 _LOGGER = logging.getLogger(__name__)
30 EVENT_BUTTON_PRESSED =
"button_pressed"
34 """Representation of a Rflink device.
36 Contains the common logic for Rflink entities.
39 _state: bool |
None =
None
41 _attr_should_poll =
False
53 signal_repetitions=DEFAULT_SIGNAL_REPETITIONS,
55 """Initialize the device."""
63 self.
_name_name = device_id
74 """Handle incoming event for device type."""
83 self.
hasshass.bus.async_fire(
85 {ATTR_ENTITY_ID: self.
entity_identity_id, ATTR_STATE: event[EVENT_KEY_COMMAND]},
88 "Fired bus event for %s: %s", self.
entity_identity_id, event[EVENT_KEY_COMMAND]
92 """Platform specific event handler."""
93 raise NotImplementedError
97 """Return a name for the device."""
98 return self.
_name_name
102 """Return true if device is on."""
109 """Assume device state until first device event sets state."""
110 return self._state
is None
114 """Return True if entity is available."""
119 """Update availability state."""
124 """Register update callback."""
127 tmp_entity = TMP_ENTITY.format(self.
_device_id_device_id)
130 in self.
hasshass.data[DATA_ENTITY_LOOKUP][EVENT_KEY_COMMAND][self.
_device_id_device_id]
132 self.
hasshass.data[DATA_ENTITY_LOOKUP][EVENT_KEY_COMMAND][
137 self.
hasshass.data[DATA_ENTITY_LOOKUP][EVENT_KEY_COMMAND][self.
_device_id_device_id].append(
141 self.
hasshass.data[DATA_ENTITY_GROUP_LOOKUP][EVENT_KEY_COMMAND][
147 self.
hasshass.data[DATA_ENTITY_LOOKUP][EVENT_KEY_COMMAND][_id].append(
150 self.
hasshass.data[DATA_ENTITY_GROUP_LOOKUP][EVENT_KEY_COMMAND][_id].append(
156 self.
hasshass.data[DATA_ENTITY_GROUP_LOOKUP][EVENT_KEY_COMMAND][_id].append(
162 self.
hasshass.data[DATA_ENTITY_LOOKUP][EVENT_KEY_COMMAND][_id].append(
173 SIGNAL_HANDLE_EVENT.format(self.
entity_identity_id),
184 """Singleton class to make Rflink command interface available to entities.
186 This class is to be inherited by every Entity class that is actionable
187 (switches/lights). It exposes the Rflink command interface for these
190 The Rflink interface is managed as a class level and set during setup (and
196 _repetition_task: asyncio.Task[
None] |
None =
None
198 _protocol: ProtocolBase |
None =
None
200 _wait_ack: bool |
None =
None
204 cls, protocol: ProtocolBase |
None, wait_ack: bool |
None =
None
206 """Set the Rflink asyncio protocol as a class variable."""
208 if wait_ack
is not None:
213 """Return connection status."""
218 """Send device command to Rflink and wait for acknowledgement."""
219 return await cls.
_protocol_protocol.send_command_ack(device_id, action)
222 """Do bookkeeping for command, send it to rflink and update state."""
225 if command ==
"turn_on":
229 elif command ==
"turn_off":
233 elif command ==
"dim":
238 elif command ==
"toggle":
245 elif command ==
"close_cover":
249 elif command ==
"open_cover":
253 elif command ==
"stop_cover":
266 """Cancel queued signal repetition commands.
268 For example when user changed state while repetitions are still
269 queued for broadcast. Or when an incoming Rflink command (remote
270 switch) changes the state.
277 """Send a command for device to Rflink gateway."""
278 _LOGGER.debug(
"Sending command: %s to Rflink device: %s", cmd, self.
_device_id_device_id)
301 """Rflink entity which can switch on/off (eg: light, switch)."""
304 """Restore RFLink device state (ON/OFF)."""
310 """Adjust state if Rflink picks up a remote command for this device."""
313 command = event[
"command"]
314 if command
in [
"on",
"allon"]:
316 elif command
in [
"off",
"alloff"]:
320 """Turn the device on."""
324 """Turn the device off."""
def _async_send_command(self, cmd, repetitions)
def send_command(cls, device_id, action)
def _async_handle_command(self, command, *args)
None set_rflink_protocol(cls, ProtocolBase|None protocol, bool|None wait_ack=None)
def cancel_queued_send_commands(self)
def async_added_to_hass(self)
def handle_event_callback(self, event)
def _availability_callback(self, availability)
def __init__(self, device_id, initial_event=None, name=None, aliases=None, group=True, group_aliases=None, nogroup_aliases=None, fire_event=False, signal_repetitions=DEFAULT_SIGNAL_REPETITIONS)
def _handle_event(self, event)
def async_turn_off(self, **kwargs)
def async_added_to_hass(self)
def async_turn_on(self, **kwargs)
def _handle_event(self, event)
None async_write_ha_state(self)
None async_on_remove(self, CALLBACK_TYPE func)
State|None async_get_last_state(self)
bool remove(self, _T matcher)
int brightness_to_rflink(int brightness)
def identify_event_type(event)
Callable[[], None] async_dispatcher_connect(HomeAssistant hass, str signal, Callable[..., Any] target)