Home Assistant Unofficial Reference 2024.12.1
entity.py
Go to the documentation of this file.
"""Assist satellite entity."""

from abc import abstractmethod
import asyncio
from collections.abc import AsyncIterable
import contextlib
from dataclasses import dataclass
from enum import StrEnum
import logging
import time
from typing import Any, Final, Literal, final

from homeassistant.components import media_source, stt, tts
from homeassistant.components.assist_pipeline import (
    OPTION_PREFERRED,
    AudioSettings,
    PipelineEvent,
    PipelineEventType,
    PipelineStage,
    async_get_pipeline,
    async_get_pipelines,
    async_pipeline_from_audio_stream,
    vad,
)
from homeassistant.components.media_player import async_process_play_media_url
from homeassistant.components.tts import (
    generate_media_source_id as tts_generate_media_source_id,
)
from homeassistant.core import Context, callback
from homeassistant.helpers import entity
from homeassistant.helpers.entity import EntityDescription

from .const import AssistSatelliteEntityFeature
from .errors import AssistSatelliteError, SatelliteBusyError
35 
# How long a conversation stays alive after the last interaction before
# its conversation id is discarded and a fresh one is started.
_CONVERSATION_TIMEOUT_SEC: Final = 5 * 60  # 5 minutes

_LOGGER = logging.getLogger(__name__)
39 
40 
41 class AssistSatelliteState(StrEnum):
42  """Valid states of an Assist satellite entity."""
43 
44  IDLE = "idle"
45  """Device is waiting for user input, such as a wake word or a button press."""
46 
47  LISTENING = "listening"
48  """Device is streaming audio with the voice command to Home Assistant."""
49 
50  PROCESSING = "processing"
51  """Home Assistant is processing the voice command."""
52 
53  RESPONDING = "responding"
54  """Device is speaking the response."""
55 
56 
# NOTE(review): the class header was dropped by the doc extractor (original
# line 57); reconstructed from the upstream source. `frozen_or_thawed=True`
# is the Home Assistant EntityDescription mechanism for frozen dataclasses —
# confirm against the installed homeassistant.helpers.entity version.
class AssistSatelliteEntityDescription(EntityDescription, frozen_or_thawed=True):
    """A class that describes Assist satellite entities."""
60 
@dataclass(frozen=True)
class AssistSatelliteWakeWord:
    """Available wake word model."""

    id: str
    """Unique id for wake word model."""

    wake_word: str
    """Wake word phrase."""

    trained_languages: list[str]
    """List of languages that the wake word was trained on."""
73 
74 
@dataclass
class AssistSatelliteConfiguration:
    """Satellite configuration."""

    available_wake_words: list[AssistSatelliteWakeWord]
    """List of available wake word models."""

    active_wake_words: list[str]
    """List of active wake word ids."""

    max_active_wake_words: int
    """Maximum number of simultaneous wake words allowed (0 for no limit)."""
87 
88 
@dataclass
class AssistSatelliteAnnouncement:
    """Announcement to be made."""

    message: str
    """Message to be spoken."""

    media_id: str
    """Media ID to be played."""

    media_id_source: Literal["url", "media_id", "tts"]
    """How media_id was produced: raw URL, resolved media source, or TTS."""
100 
101 
class AssistSatelliteEntity(entity.Entity):
    """Entity encapsulating the state and functionality of an Assist satellite."""

    entity_description: AssistSatelliteEntityDescription
    _attr_should_poll = False
    _attr_supported_features = AssistSatelliteEntityFeature(0)
    _attr_pipeline_entity_id: str | None = None
    _attr_vad_sensitivity_entity_id: str | None = None

    _conversation_id: str | None = None
    _conversation_id_time: float | None = None

    _run_has_tts: bool = False
    _is_announcing = False
    _wake_word_intercept_future: asyncio.Future[str | None] | None = None
    _attr_tts_options: dict[str, Any] | None = None
    _pipeline_task: asyncio.Task | None = None

    # Name-mangled so subclasses cannot accidentally overwrite the state;
    # exposed read-only through the `state` property below.
    __assist_satellite_state = AssistSatelliteState.IDLE

    @final
    @property
    def state(self) -> str | None:
        """Return state of the entity."""
        return self.__assist_satellite_state

    @property
    def pipeline_entity_id(self) -> str | None:
        """Entity ID of the pipeline to use for the next conversation."""
        return self._attr_pipeline_entity_id

    @property
    def vad_sensitivity_entity_id(self) -> str | None:
        """Entity ID of the VAD sensitivity to use for the next conversation."""
        return self._attr_vad_sensitivity_entity_id

    @property
    def tts_options(self) -> dict[str, Any] | None:
        """Options passed for text-to-speech."""
        return self._attr_tts_options

    @callback
    @abstractmethod
    def async_get_configuration(self) -> AssistSatelliteConfiguration:
        """Get the current satellite configuration."""

    @abstractmethod
    async def async_set_configuration(
        self, config: AssistSatelliteConfiguration
    ) -> None:
        """Set the current satellite configuration."""

    async def async_intercept_wake_word(self) -> str | None:
        """Intercept the next wake word from the satellite.

        Returns the detected wake word phrase or None.

        Raises SatelliteBusyError if an interception is already in progress.
        """
        if self._wake_word_intercept_future is not None:
            raise SatelliteBusyError("Wake word interception already in progress")

        # Will cause next wake word to be intercepted in
        # async_accept_pipeline_from_satellite
        self._wake_word_intercept_future = asyncio.Future()

        _LOGGER.debug("Next wake word will be intercepted: %s", self.entity_id)

        try:
            return await self._wake_word_intercept_future
        finally:
            self._wake_word_intercept_future = None

    async def async_internal_announce(
        self,
        message: str | None = None,
        media_id: str | None = None,
    ) -> None:
        """Play and show an announcement on the satellite.

        If media_id is not provided, message is synthesized to
        audio with the selected pipeline.

        If media_id is provided, it is played directly. It is possible
        to omit the message and the satellite will not show any text.

        Calls async_announce with message and media id.

        Raises SatelliteBusyError if an announcement is already in progress.
        """
        await self._cancel_running_pipeline()

        media_id_source: Literal["url", "media_id", "tts"] | None = None

        if message is None:
            message = ""

        if not media_id:
            media_id_source = "tts"
            # Synthesize audio and get URL
            pipeline_id = self._resolve_pipeline()
            pipeline = async_get_pipeline(self.hass, pipeline_id)

            tts_options: dict[str, Any] = {}
            if pipeline.tts_voice is not None:
                tts_options[tts.ATTR_VOICE] = pipeline.tts_voice

            if self.tts_options is not None:
                tts_options.update(self.tts_options)

            media_id = tts_generate_media_source_id(
                self.hass,
                message,
                engine=pipeline.tts_engine,
                language=pipeline.tts_language,
                options=tts_options,
            )

        if media_source.is_media_source_id(media_id):
            if not media_id_source:
                media_id_source = "media_id"
            media = await media_source.async_resolve_media(
                self.hass,
                media_id,
                None,
            )
            media_id = media.url

        if not media_id_source:
            media_id_source = "url"

        # Resolve to full URL
        media_id = async_process_play_media_url(self.hass, media_id)

        if self._is_announcing:
            raise SatelliteBusyError

        self._is_announcing = True
        self._set_state(AssistSatelliteState.RESPONDING)

        try:
            # Block until announcement is finished
            await self.async_announce(
                AssistSatelliteAnnouncement(message, media_id, media_id_source)
            )
        finally:
            self._is_announcing = False
            self._set_state(AssistSatelliteState.IDLE)

    async def async_announce(self, announcement: AssistSatelliteAnnouncement) -> None:
        """Announce media on the satellite.

        Should block until the announcement is done playing.
        """
        raise NotImplementedError

    async def async_accept_pipeline_from_satellite(
        self,
        audio_stream: AsyncIterable[bytes],
        start_stage: PipelineStage = PipelineStage.STT,
        end_stage: PipelineStage = PipelineStage.TTS,
        wake_word_phrase: str | None = None,
    ) -> None:
        """Triggers an Assist pipeline in Home Assistant from a satellite."""
        await self._cancel_running_pipeline()

        if self._wake_word_intercept_future and start_stage in (
            PipelineStage.WAKE_WORD,
            PipelineStage.STT,
        ):
            if start_stage == PipelineStage.WAKE_WORD:
                self._wake_word_intercept_future.set_exception(
                    AssistSatelliteError(
                        "Only on-device wake words currently supported"
                    )
                )
                return

            # Intercepting wake word and immediately end pipeline
            _LOGGER.debug(
                "Intercepted wake word: %s (entity_id=%s)",
                wake_word_phrase,
                self.entity_id,
            )

            if wake_word_phrase is None:
                self._wake_word_intercept_future.set_exception(
                    AssistSatelliteError("No wake word phrase provided")
                )
            else:
                self._wake_word_intercept_future.set_result(wake_word_phrase)
            self._internal_on_pipeline_event(PipelineEvent(PipelineEventType.RUN_END))
            return

        device_id = self.registry_entry.device_id if self.registry_entry else None

        # Refresh context if necessary
        if (
            (self._context is None)
            or (self._context_set is None)
            or ((time.time() - self._context_set) > entity.CONTEXT_RECENT_TIME_SECONDS)
        ):
            self.async_set_context(Context())

        assert self._context is not None

        # Reset conversation id if necessary
        if self._conversation_id_time and (
            (time.monotonic() - self._conversation_id_time) > _CONVERSATION_TIMEOUT_SEC
        ):
            self._conversation_id = None
            self._conversation_id_time = None

        # Set entity state based on pipeline events
        self._run_has_tts = False

        assert self.platform.config_entry is not None
        self._pipeline_task = self.platform.config_entry.async_create_background_task(
            self.hass,
            async_pipeline_from_audio_stream(
                self.hass,
                context=self._context,
                event_callback=self._internal_on_pipeline_event,
                stt_metadata=stt.SpeechMetadata(
                    language="",  # set in async_pipeline_from_audio_stream
                    format=stt.AudioFormats.WAV,
                    codec=stt.AudioCodecs.PCM,
                    bit_rate=stt.AudioBitRates.BITRATE_16,
                    sample_rate=stt.AudioSampleRates.SAMPLERATE_16000,
                    channel=stt.AudioChannels.CHANNEL_MONO,
                ),
                stt_stream=audio_stream,
                pipeline_id=self._resolve_pipeline(),
                conversation_id=self._conversation_id,
                device_id=device_id,
                tts_audio_output=self.tts_options,
                wake_word_phrase=wake_word_phrase,
                audio_settings=AudioSettings(
                    silence_seconds=self._resolve_vad_sensitivity()
                ),
                start_stage=start_stage,
                end_stage=end_stage,
            ),
            f"{self.entity_id}_pipeline",
        )

        try:
            await self._pipeline_task
        finally:
            self._pipeline_task = None

    async def _cancel_running_pipeline(self) -> None:
        """Cancel the current pipeline if it's running."""
        if self._pipeline_task is not None:
            self._pipeline_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._pipeline_task

            self._pipeline_task = None

    @abstractmethod
    def on_pipeline_event(self, event: PipelineEvent) -> None:
        """Handle pipeline events."""

    @callback
    def _internal_on_pipeline_event(self, event: PipelineEvent) -> None:
        """Set state based on pipeline stage."""
        if event.type is PipelineEventType.WAKE_WORD_START:
            self._set_state(AssistSatelliteState.IDLE)
        elif event.type is PipelineEventType.STT_START:
            self._set_state(AssistSatelliteState.LISTENING)
        elif event.type is PipelineEventType.INTENT_START:
            self._set_state(AssistSatelliteState.PROCESSING)
        elif event.type is PipelineEventType.INTENT_END:
            assert event.data is not None
            # Update timeout
            self._conversation_id_time = time.monotonic()
            self._conversation_id = event.data["intent_output"]["conversation_id"]
        elif event.type is PipelineEventType.TTS_START:
            # Wait until tts_response_finished is called to return to waiting state
            self._run_has_tts = True
            self._set_state(AssistSatelliteState.RESPONDING)
        elif event.type is PipelineEventType.RUN_END:
            if not self._run_has_tts:
                self._set_state(AssistSatelliteState.IDLE)

        self.on_pipeline_event(event)

    @callback
    def _set_state(self, state: AssistSatelliteState) -> None:
        """Set the entity's state."""
        self.__assist_satellite_state = state
        self.async_write_ha_state()

    @callback
    def tts_response_finished(self) -> None:
        """Tell entity that the text-to-speech response has finished playing."""
        self._set_state(AssistSatelliteState.IDLE)

    @callback
    def _resolve_pipeline(self) -> str | None:
        """Resolve pipeline from select entity to id.

        Return None to make async_get_pipeline look up the preferred pipeline.
        """
        if not (pipeline_entity_id := self.pipeline_entity_id):
            return None

        if (pipeline_entity_state := self.hass.states.get(pipeline_entity_id)) is None:
            raise RuntimeError("Pipeline entity not found")

        if pipeline_entity_state.state != OPTION_PREFERRED:
            # Resolve pipeline by name
            for pipeline in async_get_pipelines(self.hass):
                if pipeline.name == pipeline_entity_state.state:
                    return pipeline.id

        return None

    @callback
    def _resolve_vad_sensitivity(self) -> float:
        """Resolve VAD sensitivity from select entity to enum."""
        vad_sensitivity = vad.VadSensitivity.DEFAULT

        if vad_sensitivity_entity_id := self.vad_sensitivity_entity_id:
            if (
                vad_sensitivity_state := self.hass.states.get(vad_sensitivity_entity_id)
            ) is None:
                raise RuntimeError("VAD sensitivity entity not found")

            vad_sensitivity = vad.VadSensitivity(vad_sensitivity_state.state)

        return vad.VadSensitivity.to_seconds(vad_sensitivity)
None async_announce(self, AssistSatelliteAnnouncement announcement)
Definition: entity.py:247
None async_internal_announce(self, str|None message=None, str|None media_id=None)
Definition: entity.py:177
None async_set_configuration(self, AssistSatelliteConfiguration config)
Definition: entity.py:151
None async_accept_pipeline_from_satellite(self, AsyncIterable[bytes] audio_stream, PipelineStage start_stage=PipelineStage.STT, PipelineStage end_stage=PipelineStage.TTS, str|None wake_word_phrase=None)
Definition: entity.py:260
Pipeline async_get_pipeline(HomeAssistant hass, str|None pipeline_id=None)
Definition: pipeline.py:282
list[Pipeline] async_get_pipelines(HomeAssistant hass)
Definition: pipeline.py:305
None async_pipeline_from_audio_stream(HomeAssistant hass, *Context context, PipelineEventCallback event_callback, stt.SpeechMetadata stt_metadata, AsyncIterable[bytes] stt_stream, str|None wake_word_phrase=None, str|None pipeline_id=None, str|None conversation_id=None, str|dict[str, Any]|None tts_audio_output=None, WakeWordSettings|None wake_word_settings=None, AudioSettings|None audio_settings=None, str|None device_id=None, PipelineStage start_stage=PipelineStage.STT, PipelineStage end_stage=PipelineStage.TTS)
Definition: __init__.py:111
str async_process_play_media_url(HomeAssistant hass, str media_content_id, *bool allow_relative_url=False, bool for_supervisor_network=False)
Definition: browse_media.py:36