Home Assistant Unofficial Reference 2024.12.1
websocket_api.py
Go to the documentation of this file.
1 """Assist satellite Websocket API."""
2 
3 import asyncio
4 from dataclasses import asdict, replace
5 from typing import Any
6 
7 import voluptuous as vol
8 
9 from homeassistant.components import websocket_api
10 from homeassistant.core import HomeAssistant, callback
11 from homeassistant.exceptions import HomeAssistantError
12 from homeassistant.helpers import config_validation as cv
13 from homeassistant.helpers.entity_component import EntityComponent
14 from homeassistant.util import uuid as uuid_util
15 
16 from .connection_test import CONNECTION_TEST_URL_BASE
17 from .const import (
18  CONNECTION_TEST_DATA,
19  DATA_COMPONENT,
20  DOMAIN,
21  AssistSatelliteEntityFeature,
22 )
23 from .entity import AssistSatelliteEntity
24 
25 CONNECTION_TEST_TIMEOUT = 30  # passed to asyncio.timeout() in websocket_test_connection, i.e. seconds to wait for the test event
26 
27 
@callback
def async_register_websocket_api(hass: HomeAssistant) -> None:
    """Hook every Assist satellite command handler into the websocket API."""
    # Tuple order mirrors the order in which the commands are registered.
    handlers = (
        websocket_intercept_wake_word,
        websocket_get_configuration,
        websocket_set_wake_words,
        websocket_test_connection,
    )
    for handler in handlers:
        websocket_api.async_register_command(hass, handler)
35 
36 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "assist_satellite/intercept_wake_word",
        vol.Required("entity_id"): cv.entity_domain(DOMAIN),
    }
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_intercept_wake_word(
    hass: HomeAssistant,
    connection: websocket_api.connection.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Intercept the next wake word from a satellite.

    Sends an empty result for the command as soon as the interception task
    has been started. When the satellite later reports a wake word, an event
    message containing ``wake_word_phrase`` is pushed under the same message
    id. If the satellite raises ``HomeAssistantError`` while waiting, an
    error with code ``home_assistant_error`` is sent instead.

    Args:
        hass: The Home Assistant instance.
        connection: Websocket connection the command arrived on.
        msg: Validated command payload; ``id`` and ``entity_id`` are read.
    """
    satellite = hass.data[DATA_COMPONENT].get_entity(msg["entity_id"])
    if satellite is None:
        connection.send_error(
            msg["id"], websocket_api.ERR_NOT_FOUND, "Entity not found"
        )
        return

    async def intercept_wake_word() -> None:
        """Push an intercepted wake word to websocket."""
        try:
            wake_word_phrase = await satellite.async_intercept_wake_word()
            connection.send_message(
                websocket_api.event_message(
                    msg["id"],
                    {"wake_word_phrase": wake_word_phrase},
                )
            )
        except HomeAssistantError as err:
            connection.send_error(msg["id"], "home_assistant_error", str(err))

    task = hass.async_create_task(intercept_wake_word(), "intercept_wake_word")
    # Store the task's cancel callable under the message id so the pending
    # interception can be cancelled through the connection's subscriptions.
    connection.subscriptions[msg["id"]] = task.cancel
    connection.send_message(websocket_api.result_message(msg["id"]))
72 
73 
@callback
@websocket_api.websocket_command(
    {
        vol.Required("type"): "assist_satellite/get_configuration",
        vol.Required("entity_id"): cv.entity_domain(DOMAIN),
    }
)
def websocket_get_configuration(
    hass: HomeAssistant,
    connection: websocket_api.connection.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Get the current satellite configuration.

    The result is the satellite's configuration dataclass converted to a
    dict, extended with the ``pipeline_entity_id`` and ``vad_entity_id``
    keys. Sends a not-found error when the entity does not exist.

    Args:
        hass: The Home Assistant instance.
        connection: Websocket connection the command arrived on.
        msg: Validated command payload; ``id`` and ``entity_id`` are read.
    """
    satellite = hass.data[DATA_COMPONENT].get_entity(msg["entity_id"])
    if satellite is None:
        connection.send_error(
            msg["id"], websocket_api.ERR_NOT_FOUND, "Entity not found"
        )
        return

    config_dict = asdict(satellite.async_get_configuration())
    # These two ids live on the entity rather than in the configuration
    # dataclass, so they are added to the payload by hand.
    config_dict["pipeline_entity_id"] = satellite.pipeline_entity_id
    config_dict["vad_entity_id"] = satellite.vad_sensitivity_entity_id

    connection.send_result(msg["id"], config_dict)
97 
98 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "assist_satellite/set_wake_words",
        vol.Required("entity_id"): cv.entity_domain(DOMAIN),
        vol.Required("wake_word_ids"): [str],
    }
)
@websocket_api.require_admin
@websocket_api.async_response
async def websocket_set_wake_words(
    hass: HomeAssistant,
    connection: websocket_api.connection.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Set the active wake words for the satellite.

    The request is rejected with ``ERR_NOT_SUPPORTED`` when it lists more
    ids than ``max_active_wake_words`` allows, or when any id is not among
    the satellite's ``available_wake_words``. Otherwise the configuration
    is updated with the new ``active_wake_words`` and an empty result is
    sent.

    Args:
        hass: The Home Assistant instance.
        connection: Websocket connection the command arrived on.
        msg: Validated command payload; ``id``, ``entity_id`` and
            ``wake_word_ids`` are read.
    """
    satellite = hass.data[DATA_COMPONENT].get_entity(msg["entity_id"])
    if satellite is None:
        connection.send_error(
            msg["id"], websocket_api.ERR_NOT_FOUND, "Entity not found"
        )
        return

    config = satellite.async_get_configuration()

    # Don't set too many active wake words
    actual_ids = msg["wake_word_ids"]
    if len(actual_ids) > config.max_active_wake_words:
        connection.send_error(
            msg["id"],
            websocket_api.ERR_NOT_SUPPORTED,
            f"Maximum number of active wake words is {config.max_active_wake_words}",
        )
        return

    # Verify all ids are available; the first unknown id is reported.
    available_ids = {ww.id for ww in config.available_wake_words}
    for ww_id in actual_ids:
        if ww_id not in available_ids:
            connection.send_error(
                msg["id"],
                websocket_api.ERR_NOT_SUPPORTED,
                f"Wake word id is not supported: {ww_id}",
            )
            return

    # Build a copy of the configuration instead of mutating the current one.
    await satellite.async_set_configuration(
        replace(config, active_wake_words=actual_ids)
    )
    connection.send_result(msg["id"])
146 
147 
@websocket_api.websocket_command(
    {
        vol.Required("type"): "assist_satellite/test_connection",
        vol.Required("entity_id"): cv.entity_domain(DOMAIN),
    }
)
@websocket_api.async_response
async def websocket_test_connection(
    hass: HomeAssistant,
    connection: websocket_api.connection.ActiveConnection,
    msg: dict[str, Any],
) -> None:
    """Test the connection between the device and Home Assistant.

    Send an announcement to the device with a special media id.

    A fresh ``asyncio.Event`` is stored in ``hass.data[CONNECTION_TEST_DATA]``
    under a random id, and that id is embedded in the announced media URL.
    The handler then waits up to ``CONNECTION_TEST_TIMEOUT`` for the event
    and answers with ``{"status": "success"}`` or ``{"status": "timeout"}``.
    NOTE(review): the event is presumably set by the connection-test HTTP
    view when the device fetches the URL; that code is outside this module.

    Args:
        hass: The Home Assistant instance.
        connection: Websocket connection the command arrived on.
        msg: Validated command payload; ``id`` and ``entity_id`` are read.
    """
    # Use DATA_COMPONENT, the same hass.data key the other handlers in this
    # module use to reach the entity component.
    component: EntityComponent[AssistSatelliteEntity] = hass.data[DATA_COMPONENT]
    satellite = component.get_entity(msg["entity_id"])
    if satellite is None:
        connection.send_error(
            msg["id"], websocket_api.ERR_NOT_FOUND, "Entity not found"
        )
        return
    # supported_features may be None, hence the "or 0" before the bit test.
    if not (satellite.supported_features or 0) & AssistSatelliteEntityFeature.ANNOUNCE:
        connection.send_error(
            msg["id"],
            websocket_api.ERR_NOT_SUPPORTED,
            "Entity does not support announce",
        )
        return

    # Announce and wait for event
    connection_test_data = hass.data[CONNECTION_TEST_DATA]
    connection_id = uuid_util.random_uuid_hex()
    connection_test_event = asyncio.Event()
    connection_test_data[connection_id] = connection_test_event

    # The announcement runs as a background task so this handler can wait
    # on the event instead of on the announcement itself.
    hass.async_create_background_task(
        satellite.async_internal_announce(
            media_id=f"{CONNECTION_TEST_URL_BASE}/{connection_id}"
        ),
        f"assist_satellite_connection_test_{msg['entity_id']}",
    )

    try:
        async with asyncio.timeout(CONNECTION_TEST_TIMEOUT):
            await connection_test_event.wait()
            connection.send_result(msg["id"], {"status": "success"})
    except TimeoutError:
        connection.send_result(msg["id"], {"status": "timeout"})
    finally:
        # Always drop the registration so stale events do not accumulate.
        connection_test_data.pop(connection_id, None)
198 
None websocket_set_wake_words(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict[str, Any] msg)
None websocket_get_configuration(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict[str, Any] msg)
None websocket_intercept_wake_word(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict[str, Any] msg)
None websocket_test_connection(HomeAssistant hass, websocket_api.connection.ActiveConnection connection, dict[str, Any] msg)
CalendarEntity get_entity(HomeAssistant hass, str entity_id)
Definition: trigger.py:96