1 """Support for HDMI CEC."""
3 from __future__
import annotations
5 from functools
import reduce
10 from pycec.cec
import CecAdapter
11 from pycec.commands
import CecCommand, KeyPressCommand, KeyReleaseCommand
12 from pycec.const
import (
22 from pycec.network
import HDMINetwork, PhysicalAddress
23 from pycec.tcp
import TcpAdapter
24 import voluptuous
as vol
32 EVENT_HOMEASSISTANT_START,
33 EVENT_HOMEASSISTANT_STOP,
40 from .const
import DOMAIN, EVENT_HDMI_CEC_UNAVAILABLE
42 _LOGGER = logging.getLogger(__name__)
44 DEFAULT_DISPLAY_NAME =
"HA"
51 CMD_MUTE_TOGGLE =
"toggle mute"
53 CMD_RELEASE =
"release"
55 EVENT_CEC_COMMAND_RECEIVED =
"cec_command_received"
56 EVENT_CEC_KEYPRESS_RECEIVED =
"cec_keypress_received"
58 ATTR_DEVICE =
"device"
71 ATTR_TOGGLE =
"toggle"
# Validator used for the numeric service fields below: vol.Any is given two
# alternatives, int coercion first and base-16 string parsing second.
# NOTE(review): a digit-only string such as "10" presumably satisfies the int
# coercion and is therefore read as decimal, not hex - confirm this is intended.
_VOL_HEX = vol.Any(vol.Coerce(int), lambda x: int(x, 16))
75 SERVICE_SEND_COMMAND =
"send_command"
76 SERVICE_SEND_COMMAND_SCHEMA = vol.Schema(
78 vol.Optional(ATTR_CMD): _VOL_HEX,
79 vol.Optional(ATTR_SRC): _VOL_HEX,
80 vol.Optional(ATTR_DST): _VOL_HEX,
81 vol.Optional(ATTR_ATT): _VOL_HEX,
82 vol.Optional(ATTR_RAW): vol.Coerce(str),
84 extra=vol.PREVENT_EXTRA,
87 SERVICE_VOLUME =
"volume"
88 SERVICE_VOLUME_SCHEMA = vol.Schema(
90 vol.Optional(CMD_UP): vol.Any(CMD_PRESS, CMD_RELEASE, vol.Coerce(int)),
91 vol.Optional(CMD_DOWN): vol.Any(CMD_PRESS, CMD_RELEASE, vol.Coerce(int)),
92 vol.Optional(CMD_MUTE): vol.Any(ATTR_ON, ATTR_OFF, ATTR_TOGGLE),
94 extra=vol.PREVENT_EXTRA,
97 SERVICE_UPDATE_DEVICES =
"update"
98 SERVICE_UPDATE_DEVICES_SCHEMA = vol.Schema(
99 {DOMAIN: vol.Schema({})}, extra=vol.PREVENT_EXTRA
102 SERVICE_SELECT_DEVICE =
"select_device"
104 SERVICE_POWER_ON =
"power_on"
105 SERVICE_STANDBY =
"standby"
107 DEVICE_SCHEMA: vol.Schema = vol.Schema(
109 vol.All(cv.positive_int): vol.Any(
117 CONF_DISPLAY_NAME =
"osd_name"
119 CONFIG_SCHEMA = vol.Schema(
123 vol.Optional(CONF_DEVICES): vol.Any(
124 DEVICE_SCHEMA, vol.Schema({vol.All(cv.string): vol.Any(cv.string)})
126 vol.Optional(CONF_PLATFORM): vol.Any(SWITCH, MEDIA_PLAYER),
127 vol.Optional(CONF_HOST): cv.string,
128 vol.Optional(CONF_DISPLAY_NAME): cv.string,
129 vol.Optional(CONF_TYPES, default={}): vol.Schema(
130 {cv.entity_id: vol.Any(MEDIA_PLAYER, SWITCH)}
135 extra=vol.ALLOW_EXTRA,
138 WATCHDOG_INTERVAL = 120
def pad_physical_address(addr):
    """Right-pad a physical address with zeros.

    Appends ``0`` entries to ``addr`` until it holds four elements and
    returns the result as a new list. An address that already has four or
    more elements comes back with the same contents.
    """
    padding = [0] * (4 - len(addr))
    return addr + padding
147 """Parse configuration device mapping."""
150 for addr, val
in mapping.items():
151 if isinstance(addr, (str,))
and isinstance(val, (str,)):
152 yield (addr, PhysicalAddress(val))
154 cur = [*parents, addr]
155 if isinstance(val, dict):
157 elif isinstance(val, str):
161 def setup(hass: HomeAssistant, base_config: ConfigType) -> bool:
162 """Set up the CEC capability."""
164 hass.data[DOMAIN] = {}
169 devices = base_config[DOMAIN].
get(CONF_DEVICES, {})
170 _LOGGER.debug(
"Parsing config %s", devices)
172 _LOGGER.debug(
"Parsed devices: %s", device_aliases)
174 platform = base_config[DOMAIN].
get(CONF_PLATFORM, SWITCH)
178 hass.loop
if multiprocessing.cpu_count() < 2
else None
180 host = base_config[DOMAIN].
get(CONF_HOST)
181 display_name = base_config[DOMAIN].
get(CONF_DISPLAY_NAME, DEFAULT_DISPLAY_NAME)
183 adapter = TcpAdapter(host, name=display_name, activate_source=
False)
185 adapter = CecAdapter(name=display_name[:12], activate_source=
False)
186 hdmi_network = HDMINetwork(adapter, loop=loop)
188 def _adapter_watchdog(now=None):
189 _LOGGER.debug(
"Reached _adapter_watchdog")
190 event.call_later(hass, WATCHDOG_INTERVAL, _adapter_watchdog_job)
191 if not adapter.initialized:
192 _LOGGER.warning(
"Adapter not initialized; Trying to restart")
193 hass.bus.fire(EVENT_HDMI_CEC_UNAVAILABLE)
196 _adapter_watchdog_job =
HassJob(_adapter_watchdog, cancel_on_shutdown=
True)
def _async_initialized_callback(*_: Any):
    """Schedule the adapter watchdog once initialization is reported.

    Any positional arguments are accepted and ignored. Returns whatever
    ``event.async_call_later`` returns for the scheduled watchdog job.
    """
    scheduled = event.async_call_later(
        hass, WATCHDOG_INTERVAL, _adapter_watchdog_job
    )
    return scheduled
203 hdmi_network.set_initialized_callback(_async_initialized_callback)
205 def _volume(call: ServiceCall) ->
None:
206 """Increase/decrease volume and mute/unmute system."""
208 ATTR_TOGGLE: KEY_MUTE_TOGGLE,
209 ATTR_ON: KEY_MUTE_ON,
210 ATTR_OFF: KEY_MUTE_OFF,
212 for cmd, att
in call.data.items():
214 _process_volume(KEY_VOLUME_UP, att)
215 elif cmd == CMD_DOWN:
216 _process_volume(KEY_VOLUME_DOWN, att)
217 elif cmd == CMD_MUTE:
218 hdmi_network.send_command(
219 KeyPressCommand(mute_key_mapping[att], dst=ADDR_AUDIOSYSTEM)
221 hdmi_network.send_command(KeyReleaseCommand(dst=ADDR_AUDIOSYSTEM))
222 _LOGGER.debug(
"Audio muted")
224 _LOGGER.warning(
"Unknown command %s", cmd)
226 def _process_volume(cmd, att):
227 if isinstance(att, (str,)):
230 hdmi_network.send_command(KeyPressCommand(cmd, dst=ADDR_AUDIOSYSTEM))
231 elif att == CMD_RELEASE:
232 hdmi_network.send_command(KeyReleaseCommand(dst=ADDR_AUDIOSYSTEM))
234 att = 1
if att ==
"" else int(att)
236 hdmi_network.send_command(KeyPressCommand(cmd, dst=ADDR_AUDIOSYSTEM))
237 hdmi_network.send_command(KeyReleaseCommand(dst=ADDR_AUDIOSYSTEM))
239 def _tx(call: ServiceCall) ->
None:
240 """Send CEC command."""
243 command = CecCommand(data[ATTR_RAW])
245 src = data.get(ATTR_SRC, ADDR_UNREGISTERED)
246 dst = data.get(ATTR_DST, ADDR_BROADCAST)
250 _LOGGER.error(
"Attribute 'cmd' is missing")
253 if isinstance(data[ATTR_ATT], (list,)):
256 att = reduce(
lambda x, y: f
"{x}:{y:x}", data[ATTR_ATT])
259 command = CecCommand(cmd, dst, src, att)
260 hdmi_network.send_command(command)
def _standby(call: ServiceCall) -> None:
    """Handle the standby service by calling ``hdmi_network.standby()``.

    The service call payload is not used.
    """
    hdmi_network.standby()
def _power_on(call: ServiceCall) -> None:
    """Handle the power-on service by calling ``hdmi_network.power_on()``.

    The service call payload is not used.
    """
    hdmi_network.power_on()
268 def _select_device(call: ServiceCall) ->
None:
269 """Select the active device."""
270 if not (addr := call.data[ATTR_DEVICE]):
271 _LOGGER.error(
"Device not found: %s", call.data[ATTR_DEVICE])
273 if addr
in device_aliases:
274 addr = device_aliases[addr]
276 entity = hass.states.get(addr)
277 _LOGGER.debug(
"Selecting entity %s", entity)
278 if entity
is not None:
279 addr = entity.attributes[
"physical_address"]
280 _LOGGER.debug(
"Address acquired: %s", addr)
283 "Device %s has not physical address", call.data[ATTR_DEVICE]
286 if not isinstance(addr, (PhysicalAddress,)):
287 addr = PhysicalAddress(addr)
288 hdmi_network.active_source(addr)
289 _LOGGER.debug(
"Selected %s (%s)", call.data[ATTR_DEVICE], addr)
291 def _update(call: ServiceCall) ->
None:
292 """Update if device update is needed.
294 Called by service, requests CEC network to update data.
298 def _new_device(device):
299 """Handle new devices which are detected by HDMI network."""
300 key = f
"{DOMAIN}.{device.name}"
301 hass.data[DOMAIN][key] = device
302 ent_platform = base_config[DOMAIN][CONF_TYPES].
get(key, platform)
303 discovery.load_platform(
307 discovered={ATTR_NEW: [key]},
308 hass_config=base_config,
314 def _start_cec(callback_event):
315 """Register services and start HDMI network to watch for devices."""
316 hass.services.register(
317 DOMAIN, SERVICE_SEND_COMMAND, _tx, SERVICE_SEND_COMMAND_SCHEMA
319 hass.services.register(
320 DOMAIN, SERVICE_VOLUME, _volume, schema=SERVICE_VOLUME_SCHEMA
322 hass.services.register(
324 SERVICE_UPDATE_DEVICES,
326 schema=SERVICE_UPDATE_DEVICES_SCHEMA,
328 hass.services.register(DOMAIN, SERVICE_POWER_ON, _power_on)
329 hass.services.register(DOMAIN, SERVICE_STANDBY, _standby)
330 hass.services.register(DOMAIN, SERVICE_SELECT_DEVICE, _select_device)
332 hdmi_network.set_new_device_callback(_new_device)
335 hass.bus.listen_once(EVENT_HOMEASSISTANT_START, _start_cec)
336 hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown)
web.Response get(self, web.Request request, str config_key)
def parse_mapping(mapping, parents=None)
bool setup(HomeAssistant hass, ConfigType base_config)
def pad_physical_address(addr)