Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Support for HDMI CEC."""
2 
3 from __future__ import annotations
4 
5 from functools import reduce
6 import logging
7 import multiprocessing
8 from typing import Any
9 
10 from pycec.cec import CecAdapter
11 from pycec.commands import CecCommand, KeyPressCommand, KeyReleaseCommand
12 from pycec.const import (
13  ADDR_AUDIOSYSTEM,
14  ADDR_BROADCAST,
15  ADDR_UNREGISTERED,
16  KEY_MUTE_OFF,
17  KEY_MUTE_ON,
18  KEY_MUTE_TOGGLE,
19  KEY_VOLUME_DOWN,
20  KEY_VOLUME_UP,
21 )
22 from pycec.network import HDMINetwork, PhysicalAddress
23 from pycec.tcp import TcpAdapter
24 import voluptuous as vol
25 
26 from homeassistant.components.media_player import DOMAIN as MEDIA_PLAYER
27 from homeassistant.components.switch import DOMAIN as SWITCH
28 from homeassistant.const import (
29  CONF_DEVICES,
30  CONF_HOST,
31  CONF_PLATFORM,
32  EVENT_HOMEASSISTANT_START,
33  EVENT_HOMEASSISTANT_STOP,
34 )
35 from homeassistant.core import HassJob, HomeAssistant, ServiceCall, callback
36 from homeassistant.helpers import discovery, event
38 from homeassistant.helpers.typing import ConfigType
39 
40 from .const import DOMAIN, EVENT_HDMI_CEC_UNAVAILABLE
41 
42 _LOGGER = logging.getLogger(__name__)
43 
44 DEFAULT_DISPLAY_NAME = "HA"
45 CONF_TYPES = "types"
46 
47 CMD_UP = "up"
48 CMD_DOWN = "down"
49 CMD_MUTE = "mute"
50 CMD_UNMUTE = "unmute"
51 CMD_MUTE_TOGGLE = "toggle mute"
52 CMD_PRESS = "press"
53 CMD_RELEASE = "release"
54 
55 EVENT_CEC_COMMAND_RECEIVED = "cec_command_received"
56 EVENT_CEC_KEYPRESS_RECEIVED = "cec_keypress_received"
57 
58 ATTR_DEVICE = "device"
59 ATTR_KEY = "key"
60 ATTR_DUR = "dur"
61 ATTR_SRC = "src"
62 ATTR_DST = "dst"
63 ATTR_CMD = "cmd"
64 ATTR_ATT = "att"
65 ATTR_RAW = "raw"
66 ATTR_DIR = "dir"
67 ATTR_ABT = "abt"
68 ATTR_NEW = "new"
69 ATTR_ON = "on"
70 ATTR_OFF = "off"
71 ATTR_TOGGLE = "toggle"
72 
73 _VOL_HEX = vol.Any(vol.Coerce(int), lambda x: int(x, 16))
74 
75 SERVICE_SEND_COMMAND = "send_command"
76 SERVICE_SEND_COMMAND_SCHEMA = vol.Schema(
77  {
78  vol.Optional(ATTR_CMD): _VOL_HEX,
79  vol.Optional(ATTR_SRC): _VOL_HEX,
80  vol.Optional(ATTR_DST): _VOL_HEX,
81  vol.Optional(ATTR_ATT): _VOL_HEX,
82  vol.Optional(ATTR_RAW): vol.Coerce(str),
83  },
84  extra=vol.PREVENT_EXTRA,
85 )
86 
87 SERVICE_VOLUME = "volume"
88 SERVICE_VOLUME_SCHEMA = vol.Schema(
89  {
90  vol.Optional(CMD_UP): vol.Any(CMD_PRESS, CMD_RELEASE, vol.Coerce(int)),
91  vol.Optional(CMD_DOWN): vol.Any(CMD_PRESS, CMD_RELEASE, vol.Coerce(int)),
92  vol.Optional(CMD_MUTE): vol.Any(ATTR_ON, ATTR_OFF, ATTR_TOGGLE),
93  },
94  extra=vol.PREVENT_EXTRA,
95 )
96 
97 SERVICE_UPDATE_DEVICES = "update"
98 SERVICE_UPDATE_DEVICES_SCHEMA = vol.Schema(
99  {DOMAIN: vol.Schema({})}, extra=vol.PREVENT_EXTRA
100 )
101 
102 SERVICE_SELECT_DEVICE = "select_device"
103 
104 SERVICE_POWER_ON = "power_on"
105 SERVICE_STANDBY = "standby"
106 
107 DEVICE_SCHEMA: vol.Schema = vol.Schema(
108  {
109  vol.All(cv.positive_int): vol.Any(
110  # pylint: disable-next=unnecessary-lambda
111  lambda devices: DEVICE_SCHEMA(devices),
112  cv.string,
113  )
114  }
115 )
116 
117 CONF_DISPLAY_NAME = "osd_name"
118 
119 CONFIG_SCHEMA = vol.Schema(
120  {
121  DOMAIN: vol.Schema(
122  {
123  vol.Optional(CONF_DEVICES): vol.Any(
124  DEVICE_SCHEMA, vol.Schema({vol.All(cv.string): vol.Any(cv.string)})
125  ),
126  vol.Optional(CONF_PLATFORM): vol.Any(SWITCH, MEDIA_PLAYER),
127  vol.Optional(CONF_HOST): cv.string,
128  vol.Optional(CONF_DISPLAY_NAME): cv.string,
129  vol.Optional(CONF_TYPES, default={}): vol.Schema(
130  {cv.entity_id: vol.Any(MEDIA_PLAYER, SWITCH)}
131  ),
132  }
133  )
134  },
135  extra=vol.ALLOW_EXTRA,
136 )
137 
138 WATCHDOG_INTERVAL = 120
139 
140 
142  """Right-pad a physical address."""
143  return addr + [0] * (4 - len(addr))
144 
145 
146 def parse_mapping(mapping, parents=None):
147  """Parse configuration device mapping."""
148  if parents is None:
149  parents = []
150  for addr, val in mapping.items():
151  if isinstance(addr, (str,)) and isinstance(val, (str,)):
152  yield (addr, PhysicalAddress(val))
153  else:
154  cur = [*parents, addr]
155  if isinstance(val, dict):
156  yield from parse_mapping(val, cur)
157  elif isinstance(val, str):
158  yield (val, pad_physical_address(cur))
159 
160 
161 def setup(hass: HomeAssistant, base_config: ConfigType) -> bool: # noqa: C901
162  """Set up the CEC capability."""
163 
164  hass.data[DOMAIN] = {}
165 
166  # Parse configuration into a dict of device name to physical address
167  # represented as a list of four elements.
168  device_aliases = {}
169  devices = base_config[DOMAIN].get(CONF_DEVICES, {})
170  _LOGGER.debug("Parsing config %s", devices)
171  device_aliases.update(parse_mapping(devices))
172  _LOGGER.debug("Parsed devices: %s", device_aliases)
173 
174  platform = base_config[DOMAIN].get(CONF_PLATFORM, SWITCH)
175 
176  loop = (
177  # Create own thread if more than 1 CPU
178  hass.loop if multiprocessing.cpu_count() < 2 else None
179  )
180  host = base_config[DOMAIN].get(CONF_HOST)
181  display_name = base_config[DOMAIN].get(CONF_DISPLAY_NAME, DEFAULT_DISPLAY_NAME)
182  if host:
183  adapter = TcpAdapter(host, name=display_name, activate_source=False)
184  else:
185  adapter = CecAdapter(name=display_name[:12], activate_source=False)
186  hdmi_network = HDMINetwork(adapter, loop=loop)
187 
188  def _adapter_watchdog(now=None):
189  _LOGGER.debug("Reached _adapter_watchdog")
190  event.call_later(hass, WATCHDOG_INTERVAL, _adapter_watchdog_job)
191  if not adapter.initialized:
192  _LOGGER.warning("Adapter not initialized; Trying to restart")
193  hass.bus.fire(EVENT_HDMI_CEC_UNAVAILABLE)
194  adapter.init()
195 
196  _adapter_watchdog_job = HassJob(_adapter_watchdog, cancel_on_shutdown=True)
197 
198  @callback
199  def _async_initialized_callback(*_: Any):
200  """Add watchdog on initialization."""
201  return event.async_call_later(hass, WATCHDOG_INTERVAL, _adapter_watchdog_job)
202 
203  hdmi_network.set_initialized_callback(_async_initialized_callback)
204 
205  def _volume(call: ServiceCall) -> None:
206  """Increase/decrease volume and mute/unmute system."""
207  mute_key_mapping = {
208  ATTR_TOGGLE: KEY_MUTE_TOGGLE,
209  ATTR_ON: KEY_MUTE_ON,
210  ATTR_OFF: KEY_MUTE_OFF,
211  }
212  for cmd, att in call.data.items():
213  if cmd == CMD_UP:
214  _process_volume(KEY_VOLUME_UP, att)
215  elif cmd == CMD_DOWN:
216  _process_volume(KEY_VOLUME_DOWN, att)
217  elif cmd == CMD_MUTE:
218  hdmi_network.send_command(
219  KeyPressCommand(mute_key_mapping[att], dst=ADDR_AUDIOSYSTEM)
220  )
221  hdmi_network.send_command(KeyReleaseCommand(dst=ADDR_AUDIOSYSTEM))
222  _LOGGER.debug("Audio muted")
223  else:
224  _LOGGER.warning("Unknown command %s", cmd)
225 
226  def _process_volume(cmd, att):
227  if isinstance(att, (str,)):
228  att = att.strip()
229  if att == CMD_PRESS:
230  hdmi_network.send_command(KeyPressCommand(cmd, dst=ADDR_AUDIOSYSTEM))
231  elif att == CMD_RELEASE:
232  hdmi_network.send_command(KeyReleaseCommand(dst=ADDR_AUDIOSYSTEM))
233  else:
234  att = 1 if att == "" else int(att)
235  for _ in range(att):
236  hdmi_network.send_command(KeyPressCommand(cmd, dst=ADDR_AUDIOSYSTEM))
237  hdmi_network.send_command(KeyReleaseCommand(dst=ADDR_AUDIOSYSTEM))
238 
239  def _tx(call: ServiceCall) -> None:
240  """Send CEC command."""
241  data = call.data
242  if ATTR_RAW in data:
243  command = CecCommand(data[ATTR_RAW])
244  else:
245  src = data.get(ATTR_SRC, ADDR_UNREGISTERED)
246  dst = data.get(ATTR_DST, ADDR_BROADCAST)
247  if ATTR_CMD in data:
248  cmd = data[ATTR_CMD]
249  else:
250  _LOGGER.error("Attribute 'cmd' is missing")
251  return
252  if ATTR_ATT in data:
253  if isinstance(data[ATTR_ATT], (list,)):
254  att = data[ATTR_ATT]
255  else:
256  att = reduce(lambda x, y: f"{x}:{y:x}", data[ATTR_ATT])
257  else:
258  att = ""
259  command = CecCommand(cmd, dst, src, att)
260  hdmi_network.send_command(command)
261 
262  def _standby(call: ServiceCall) -> None:
263  hdmi_network.standby()
264 
265  def _power_on(call: ServiceCall) -> None:
266  hdmi_network.power_on()
267 
268  def _select_device(call: ServiceCall) -> None:
269  """Select the active device."""
270  if not (addr := call.data[ATTR_DEVICE]):
271  _LOGGER.error("Device not found: %s", call.data[ATTR_DEVICE])
272  return
273  if addr in device_aliases:
274  addr = device_aliases[addr]
275  else:
276  entity = hass.states.get(addr)
277  _LOGGER.debug("Selecting entity %s", entity)
278  if entity is not None:
279  addr = entity.attributes["physical_address"]
280  _LOGGER.debug("Address acquired: %s", addr)
281  if addr is None:
282  _LOGGER.error(
283  "Device %s has not physical address", call.data[ATTR_DEVICE]
284  )
285  return
286  if not isinstance(addr, (PhysicalAddress,)):
287  addr = PhysicalAddress(addr)
288  hdmi_network.active_source(addr)
289  _LOGGER.debug("Selected %s (%s)", call.data[ATTR_DEVICE], addr)
290 
291  def _update(call: ServiceCall) -> None:
292  """Update if device update is needed.
293 
294  Called by service, requests CEC network to update data.
295  """
296  hdmi_network.scan()
297 
298  def _new_device(device):
299  """Handle new devices which are detected by HDMI network."""
300  key = f"{DOMAIN}.{device.name}"
301  hass.data[DOMAIN][key] = device
302  ent_platform = base_config[DOMAIN][CONF_TYPES].get(key, platform)
303  discovery.load_platform(
304  hass,
305  ent_platform,
306  DOMAIN,
307  discovered={ATTR_NEW: [key]},
308  hass_config=base_config,
309  )
310 
311  def _shutdown(call):
312  hdmi_network.stop()
313 
314  def _start_cec(callback_event):
315  """Register services and start HDMI network to watch for devices."""
316  hass.services.register(
317  DOMAIN, SERVICE_SEND_COMMAND, _tx, SERVICE_SEND_COMMAND_SCHEMA
318  )
319  hass.services.register(
320  DOMAIN, SERVICE_VOLUME, _volume, schema=SERVICE_VOLUME_SCHEMA
321  )
322  hass.services.register(
323  DOMAIN,
324  SERVICE_UPDATE_DEVICES,
325  _update,
326  schema=SERVICE_UPDATE_DEVICES_SCHEMA,
327  )
328  hass.services.register(DOMAIN, SERVICE_POWER_ON, _power_on)
329  hass.services.register(DOMAIN, SERVICE_STANDBY, _standby)
330  hass.services.register(DOMAIN, SERVICE_SELECT_DEVICE, _select_device)
331 
332  hdmi_network.set_new_device_callback(_new_device)
333  hdmi_network.start()
334 
335  hass.bus.listen_once(EVENT_HOMEASSISTANT_START, _start_cec)
336  hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _shutdown)
337  return True
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
def parse_mapping(mapping, parents=None)
Definition: __init__.py:146
bool setup(HomeAssistant hass, ConfigType base_config)
Definition: __init__.py:161