Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """The System Bridge integration."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from dataclasses import asdict
7 import logging
8 from typing import Any
9 
10 from systembridgeconnector.exceptions import (
11  AuthenticationException,
12  ConnectionClosedException,
13  ConnectionErrorException,
14 )
15 from systembridgeconnector.version import Version
16 from systembridgemodels.keyboard_key import KeyboardKey
17 from systembridgemodels.keyboard_text import KeyboardText
18 from systembridgemodels.modules.processes import Process
19 from systembridgemodels.open_path import OpenPath
20 from systembridgemodels.open_url import OpenUrl
21 import voluptuous as vol
22 
23 from homeassistant.config_entries import ConfigEntry
24 from homeassistant.const import (
25  CONF_API_KEY,
26  CONF_COMMAND,
27  CONF_ENTITY_ID,
28  CONF_HOST,
29  CONF_ID,
30  CONF_NAME,
31  CONF_PATH,
32  CONF_PORT,
33  CONF_TOKEN,
34  CONF_URL,
35  Platform,
36 )
37 from homeassistant.core import (
38  HomeAssistant,
39  ServiceCall,
40  ServiceResponse,
41  SupportsResponse,
42 )
43 from homeassistant.exceptions import (
44  ConfigEntryAuthFailed,
45  ConfigEntryNotReady,
46  HomeAssistantError,
47  ServiceValidationError,
48 )
49 from homeassistant.helpers import (
50  config_validation as cv,
51  device_registry as dr,
52  discovery,
53 )
54 from homeassistant.helpers.aiohttp_client import async_get_clientsession
55 from homeassistant.helpers.issue_registry import IssueSeverity, async_create_issue
56 
57 from .config_flow import SystemBridgeConfigFlow
58 from .const import DATA_WAIT_TIMEOUT, DOMAIN, MODULES
59 from .coordinator import SystemBridgeDataUpdateCoordinator
60 
# Module-level logger, also handed to the data update coordinator.
_LOGGER: logging.Logger = logging.getLogger(__name__)

# Entity platforms provided by this integration.
PLATFORMS: list[Platform] = [
    Platform.BINARY_SENSOR,
    Platform.MEDIA_PLAYER,
    Platform.NOTIFY,
    Platform.SENSOR,
    Platform.UPDATE,
]

# Service data keys that are specific to this integration.
CONF_BRIDGE: str = "bridge"
CONF_KEY: str = "key"
CONF_TEXT: str = "text"

# Names of the services registered under the integration domain.
SERVICE_GET_PROCESS_BY_ID: str = "get_process_by_id"
SERVICE_GET_PROCESSES_BY_NAME: str = "get_processes_by_name"
SERVICE_OPEN_PATH: str = "open_path"
SERVICE_POWER_COMMAND: str = "power_command"
SERVICE_OPEN_URL: str = "open_url"
SERVICE_SEND_KEYPRESS: str = "send_keypress"
SERVICE_SEND_TEXT: str = "send_text"
82 
# Maps the user-facing power command to the name of the websocket client
# method that performs it (looked up with ``getattr`` by the power service).
POWER_COMMAND_MAP = {
    command: f"power_{command}"
    for command in (
        "hibernate",
        "lock",
        "logout",
        "restart",
        "shutdown",
        "sleep",
    )
}
91 
92 
async def async_setup_entry(
    hass: HomeAssistant,
    entry: ConfigEntry,
) -> bool:
    """Set up System Bridge from a config entry.

    Checks that the remote System Bridge version is supported, creates the
    data update coordinator, fetches initial data, forwards the entity
    platforms, loads the notify platform through discovery and registers the
    domain services (once, shared by all entries).

    Raises:
        ConfigEntryAuthFailed: The bridge rejected the configured token.
        ConfigEntryNotReady: The bridge could not be reached, timed out, or
            runs an unsupported version.
    """
    # Check version before initialising
    version = Version(
        entry.data[CONF_HOST],
        entry.data[CONF_PORT],
        entry.data[CONF_TOKEN],
        session=async_get_clientsession(hass),
    )
    supported = await _async_call_bridge(entry, version.check_supported())

    # If not supported, create an issue and raise ConfigEntryNotReady
    if not supported:
        async_create_issue(
            hass=hass,
            domain=DOMAIN,
            issue_id=f"system_bridge_{entry.entry_id}_unsupported_version",
            translation_key="unsupported_version",
            translation_placeholders={"host": entry.data[CONF_HOST]},
            severity=IssueSeverity.ERROR,
            is_fixable=False,
        )
        raise ConfigEntryNotReady(
            translation_domain=DOMAIN,
            translation_key="unsupported_version",
            translation_placeholders={
                "title": entry.title,
                "host": entry.data[CONF_HOST],
            },
        )

    coordinator = SystemBridgeDataUpdateCoordinator(
        hass,
        _LOGGER,
        entry=entry,
    )
    await _async_call_bridge(entry, coordinator.async_get_data(MODULES))

    # Fetch initial data so we have data when entities subscribe
    await coordinator.async_config_entry_first_refresh()

    hass.data.setdefault(DOMAIN, {})
    hass.data[DOMAIN][entry.entry_id] = coordinator

    # Set up all platforms except notify
    await hass.config_entries.async_forward_entry_setups(
        entry, [platform for platform in PLATFORMS if platform != Platform.NOTIFY]
    )

    # Set up notify platform
    hass.async_create_task(
        discovery.async_load_platform(
            hass,
            Platform.NOTIFY,
            DOMAIN,
            {
                CONF_NAME: f"{DOMAIN}_{coordinator.data.system.hostname}",
                CONF_ENTITY_ID: entry.entry_id,
            },
            hass.data[DOMAIN][entry.entry_id],
        )
    )

    # Reload entry when it's updated. This must happen for every entry, so it
    # is done before the "services already registered" check below; otherwise
    # only the entry that registered the services would get the listener.
    entry.async_on_unload(entry.add_update_listener(async_reload_entry))

    # Services are shared by all entries: register them only once.
    if not hass.services.has_service(DOMAIN, SERVICE_OPEN_URL):
        _register_services(hass)

    return True


async def _async_call_bridge(entry: ConfigEntry, awaitable: Any) -> Any:
    """Await a System Bridge call and map failures to config entry errors.

    The awaitable is awaited under the ``DATA_WAIT_TIMEOUT`` timeout and its
    result is returned.

    Raises:
        ConfigEntryAuthFailed: On ``AuthenticationException``.
        ConfigEntryNotReady: On a closed/failed connection or a timeout.
    """
    placeholders = {
        "title": entry.title,
        "host": entry.data[CONF_HOST],
    }
    try:
        async with asyncio.timeout(DATA_WAIT_TIMEOUT):
            return await awaitable
    except AuthenticationException as exception:
        _LOGGER.error("Authentication failed for %s: %s", entry.title, exception)
        raise ConfigEntryAuthFailed(
            translation_domain=DOMAIN,
            translation_key="authentication_failed",
            translation_placeholders=placeholders,
        ) from exception
    except (ConnectionClosedException, ConnectionErrorException) as exception:
        raise ConfigEntryNotReady(
            translation_domain=DOMAIN,
            translation_key="connection_failed",
            translation_placeholders=placeholders,
        ) from exception
    except TimeoutError as exception:
        raise ConfigEntryNotReady(
            translation_domain=DOMAIN,
            translation_key="timeout",
            translation_placeholders=placeholders,
        ) from exception


def _register_services(hass: HomeAssistant) -> None:
    """Register the System Bridge services under the integration domain.

    Every service takes a ``bridge`` device id (validated and converted to a
    config entry id by ``valid_device``) plus one service-specific value, and
    only supports being called with a response.
    """

    def valid_device(device: str) -> str:
        """Return the config entry id of this domain linked to the device.

        Raises HomeAssistantError when the device is unknown or is not linked
        to any config entry of this domain.
        """
        device_entry = dr.async_get(hass).async_get(device)
        if device_entry is not None:
            for config_entry in hass.config_entries.async_entries(DOMAIN):
                if config_entry.entry_id in device_entry.config_entries:
                    return config_entry.entry_id
        raise HomeAssistantError(
            translation_domain=DOMAIN,
            translation_key="device_not_found",
            translation_placeholders={"device": device},
        )

    def get_coordinator(
        service_call: ServiceCall,
    ) -> SystemBridgeDataUpdateCoordinator:
        """Return the coordinator of the bridge targeted by the service call."""
        return hass.data[DOMAIN][service_call.data[CONF_BRIDGE]]

    async def handle_get_process_by_id(service_call: ServiceCall) -> ServiceResponse:
        """Handle the get process by id service call."""
        _LOGGER.debug("Get process by id: %s", service_call.data)
        processes: list[Process] = get_coordinator(service_call).data.processes
        process_id = service_call.data[CONF_ID]

        # Find process.id from list, raise ServiceValidationError if not found
        for process in processes:
            if process.id == process_id:
                return asdict(process)
        raise ServiceValidationError(
            translation_domain=DOMAIN,
            translation_key="process_not_found",
            translation_placeholders={"id": process_id},
        )

    async def handle_get_processes_by_name(
        service_call: ServiceCall,
    ) -> ServiceResponse:
        """Handle the get processes by name service call."""
        _LOGGER.debug("Get process by name: %s", service_call.data)
        coordinator = get_coordinator(service_call)
        # Case-insensitive substring match; lower the needle only once.
        needle = service_call.data[CONF_NAME].lower()

        items: list[dict[str, Any]] = [
            asdict(process)
            for process in coordinator.data.processes
            if process.name is not None and needle in process.name.lower()
        ]

        return {
            "count": len(items),
            "processes": items,
        }

    async def handle_open_path(service_call: ServiceCall) -> ServiceResponse:
        """Handle the open path service call."""
        _LOGGER.debug("Open path: %s", service_call.data)
        response = await get_coordinator(service_call).websocket_client.open_path(
            OpenPath(path=service_call.data[CONF_PATH])
        )
        return asdict(response)

    async def handle_power_command(service_call: ServiceCall) -> ServiceResponse:
        """Handle the power command service call."""
        _LOGGER.debug("Power command: %s", service_call.data)
        # The schema restricts the command to the keys of POWER_COMMAND_MAP,
        # whose values name the websocket client method to call.
        power_method = getattr(
            get_coordinator(service_call).websocket_client,
            POWER_COMMAND_MAP[service_call.data[CONF_COMMAND]],
        )
        response = await power_method()
        return asdict(response)

    async def handle_open_url(service_call: ServiceCall) -> ServiceResponse:
        """Handle the open url service call."""
        _LOGGER.debug("Open URL: %s", service_call.data)
        response = await get_coordinator(service_call).websocket_client.open_url(
            OpenUrl(url=service_call.data[CONF_URL])
        )
        return asdict(response)

    async def handle_send_keypress(service_call: ServiceCall) -> ServiceResponse:
        """Handle the send_keypress service call."""
        client = get_coordinator(service_call).websocket_client
        response = await client.keyboard_keypress(
            KeyboardKey(key=service_call.data[CONF_KEY])
        )
        return asdict(response)

    async def handle_send_text(service_call: ServiceCall) -> ServiceResponse:
        """Handle the send_text service call."""
        client = get_coordinator(service_call).websocket_client
        response = await client.keyboard_text(
            KeyboardText(text=service_call.data[CONF_TEXT])
        )
        return asdict(response)

    # (service name, handler, service-specific data key, validator for it)
    services = (
        (
            SERVICE_GET_PROCESS_BY_ID,
            handle_get_process_by_id,
            CONF_ID,
            cv.positive_int,
        ),
        (
            SERVICE_GET_PROCESSES_BY_NAME,
            handle_get_processes_by_name,
            CONF_NAME,
            cv.string,
        ),
        (SERVICE_OPEN_PATH, handle_open_path, CONF_PATH, cv.string),
        (
            SERVICE_POWER_COMMAND,
            handle_power_command,
            CONF_COMMAND,
            vol.In(POWER_COMMAND_MAP),
        ),
        (SERVICE_OPEN_URL, handle_open_url, CONF_URL, cv.string),
        (SERVICE_SEND_KEYPRESS, handle_send_keypress, CONF_KEY, cv.string),
        (SERVICE_SEND_TEXT, handle_send_text, CONF_TEXT, cv.string),
    )
    for service, handler, value_key, validator in services:
        hass.services.async_register(
            DOMAIN,
            service,
            handler,
            schema=vol.Schema(
                {
                    vol.Required(CONF_BRIDGE): valid_device,
                    vol.Required(value_key): validator,
                },
            ),
            supports_response=SupportsResponse.ONLY,
        )
443 
444 
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a config entry.

    Unloads the forwarded platforms (all except notify, which is not forwarded
    from the config entry). On success the coordinator's websocket client is
    closed, its stop subscription is cancelled and the coordinator is dropped
    from ``hass.data``. When no coordinator is left, every service registered
    by this integration is removed.

    Returns whether the platforms were unloaded successfully.
    """
    unload_ok = await hass.config_entries.async_unload_platforms(
        entry, [platform for platform in PLATFORMS if platform != Platform.NOTIFY]
    )
    if unload_ok:
        coordinator: SystemBridgeDataUpdateCoordinator = hass.data[DOMAIN][
            entry.entry_id
        ]

        # Ensure disconnected and cleanup stop sub
        await coordinator.websocket_client.close()
        if coordinator.unsub:
            coordinator.unsub()

        del hass.data[DOMAIN][entry.entry_id]

    if not hass.data[DOMAIN]:
        # Remove every service set up by async_setup_entry, not just a subset,
        # so no service is left behind without a coordinator to serve it.
        for service in (
            SERVICE_GET_PROCESS_BY_ID,
            SERVICE_GET_PROCESSES_BY_NAME,
            SERVICE_OPEN_PATH,
            SERVICE_POWER_COMMAND,
            SERVICE_OPEN_URL,
            SERVICE_SEND_KEYPRESS,
            SERVICE_SEND_TEXT,
        ):
            hass.services.async_remove(DOMAIN, service)

    return unload_ok
469 
470 
async def async_reload_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
    """Reload the given config entry; used as its update listener."""
    entry_id = entry.entry_id
    await hass.config_entries.async_reload(entry_id)
474 
475 
async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry) -> bool:
    """Migrate an old config entry to the current data layout.

    Returns False when the entry's major version is newer than the config
    flow's version, True otherwise. Entries with a minor version below 2 get
    a CONF_TOKEN value (taken from CONF_API_KEY unless one is already set)
    and are bumped to minor version 2.
    """
    _LOGGER.debug(
        "Migrating from version %s.%s",
        config_entry.version,
        config_entry.minor_version,
    )

    if config_entry.version > SystemBridgeConfigFlow.VERSION:
        return False

    needs_token_migration = config_entry.minor_version < 2
    if needs_token_migration:
        # Migrate to CONF_TOKEN, which was added in 1.2
        migrated = {**config_entry.data}
        if CONF_TOKEN not in migrated:
            migrated[CONF_TOKEN] = config_entry.data.get(CONF_API_KEY)

        hass.config_entries.async_update_entry(
            config_entry,
            data=migrated,
            minor_version=2,
        )

    _LOGGER.debug(
        "Migration to version %s.%s successful",
        config_entry.version,
        config_entry.minor_version,
    )

    return True
None async_create_issue(HomeAssistant hass, str entry_id)
Definition: repairs.py:69
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:96
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:445
None async_reload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:471
bool async_migrate_entry(HomeAssistant hass, ConfigEntry config_entry)
Definition: __init__.py:476
aiohttp.ClientSession async_get_clientsession(HomeAssistant hass, bool verify_ssl=True, socket.AddressFamily family=socket.AF_UNSPEC, ssl_util.SSLCipherList ssl_cipher=ssl_util.SSLCipherList.PYTHON_DEFAULT)