1 """Assist satellite entity."""
from abc import abstractmethod
import asyncio
from collections.abc import AsyncIterable
import contextlib
from dataclasses import dataclass
from enum import StrEnum
import logging
import time
from typing import Any, Final, Literal, final

# NOTE(review): the opening lines of these two multi-line imports were lost in
# this copy; the module paths are reconstructed from the imported names —
# confirm against upstream.
from homeassistant.components.assist_pipeline import (
    async_pipeline_from_audio_stream,
)
from homeassistant.components.tts import (
    generate_media_source_id as tts_generate_media_source_id,
)

from .const import AssistSatelliteEntityFeature
from .errors import AssistSatelliteError, SatelliteBusyError
36 _CONVERSATION_TIMEOUT_SEC: Final = 5 * 60
38 _LOGGER = logging.getLogger(__name__)
42 """Valid states of an Assist satellite entity."""
45 """Device is waiting for user input, such as a wake word or a button press."""
47 LISTENING =
"listening"
48 """Device is streaming audio with the voice command to Home Assistant."""
50 PROCESSING =
"processing"
51 """Home Assistant is processing the voice command."""
53 RESPONDING =
"responding"
54 """Device is speaking the response."""
58 """A class that describes Assist satellite entities."""
@dataclass(frozen=True)
class AssistSatelliteWakeWord:
    """Available wake word model."""

    # NOTE(review): the `id` and `wake_word` field lines were missing from this
    # copy; names are reconstructed from their docstrings and from the module's
    # API summary — confirm against upstream.
    id: str
    """Unique id for wake word model."""

    wake_word: str
    """Wake word phrase."""

    trained_languages: list[str]
    """List of languages that the wake word was trained on."""
77 """Satellite configuration."""
79 available_wake_words: list[AssistSatelliteWakeWord]
80 """List of available available wake word models."""
82 active_wake_words: list[str]
83 """List of active wake word ids."""
85 max_active_wake_words: int
86 """Maximum number of simultaneous wake words allowed (0 for no limit)."""
91 """Announcement to be made."""
94 """Message to be spoken."""
97 """Media ID to be played."""
99 media_id_source: Literal[
"url",
"media_id",
"tts"]
103 """Entity encapsulating the state and functionality of an Assist satellite."""
105 entity_description: AssistSatelliteEntityDescription
106 _attr_should_poll =
False
108 _attr_pipeline_entity_id: str |
None =
None
109 _attr_vad_sensitivity_entity_id: str |
None =
None
111 _conversation_id: str |
None =
None
112 _conversation_id_time: float |
None =
None
114 _run_has_tts: bool =
False
115 _is_announcing =
False
116 _wake_word_intercept_future: asyncio.Future[str |
None] |
None =
None
117 _attr_tts_options: dict[str, Any] |
None =
None
118 _pipeline_task: asyncio.Task |
None =
None
120 __assist_satellite_state = AssistSatelliteState.IDLE
125 """Return state of the entity."""
130 """Entity ID of the pipeline to use for the next conversation."""
131 return self._attr_pipeline_entity_id
135 """Entity ID of the VAD sensitivity to use for the next conversation."""
136 return self._attr_vad_sensitivity_entity_id
140 """Options passed for text-to-speech."""
141 return self._attr_tts_options
146 """Get the current satellite configuration."""
# NOTE(review): the `async def` and decorator lines were missing from this
# copy; reconstructed from the visible parameter list and the module's API
# summary (`None async_set_configuration(self, config)`) — confirm upstream.
@abstractmethod
async def async_set_configuration(self, config: AssistSatelliteConfiguration) -> None:
    """Set the current satellite configuration."""
155 """Intercept the next wake word from the satellite.
157 Returns the detected wake word phrase or None.
166 _LOGGER.debug(
"Next wake word will be intercepted: %s", self.entity_id)
175 message: str |
None =
None,
176 media_id: str |
None =
None,
178 """Play and show an announcement on the satellite.
180 If media_id is not provided, message is synthesized to
181 audio with the selected pipeline.
183 If media_id is provided, it is played directly. It is possible
184 to omit the message and the satellite will not show any text.
186 Calls async_announce with message and media id.
190 media_id_source: Literal[
"url",
"media_id",
"tts"] |
None =
None
196 media_id_source =
"tts"
201 tts_options: dict[str, Any] = {}
202 if pipeline.tts_voice
is not None:
203 tts_options[tts.ATTR_VOICE] = pipeline.tts_voice
208 media_id = tts_generate_media_source_id(
211 engine=pipeline.tts_engine,
212 language=pipeline.tts_language,
216 if media_source.is_media_source_id(media_id):
217 if not media_id_source:
218 media_id_source =
"media_id"
219 media = await media_source.async_resolve_media(
226 if not media_id_source:
227 media_id_source =
"url"
233 raise SatelliteBusyError
236 self.
_set_state_set_state(AssistSatelliteState.RESPONDING)
245 self.
_set_state_set_state(AssistSatelliteState.IDLE)
async def async_announce(self, announcement: AssistSatelliteAnnouncement) -> None:
    """Announce media on the satellite.

    Should block until the announcement is done playing.

    Invoked by async_internal_announce with the prepared announcement;
    subclasses must override this with device-specific playback.
    """
    raise NotImplementedError
256 audio_stream: AsyncIterable[bytes],
257 start_stage: PipelineStage = PipelineStage.STT,
258 end_stage: PipelineStage = PipelineStage.TTS,
259 wake_word_phrase: str |
None =
None,
261 """Triggers an Assist pipeline in Home Assistant from a satellite."""
265 PipelineStage.WAKE_WORD,
268 if start_stage == PipelineStage.WAKE_WORD:
271 "Only on-device wake words currently supported"
278 "Intercepted wake word: %s (entity_id=%s)",
283 if wake_word_phrase
is None:
292 device_id = self.registry_entry.device_id
if self.registry_entry
else None
296 (self._context
is None)
297 or (self._context_set
is None)
298 or ((time.time() - self._context_set) > entity.CONTEXT_RECENT_TIME_SECONDS)
300 self.async_set_context(
Context())
302 assert self._context
is not None
314 assert self.platform.config_entry
is not None
315 self.
_pipeline_task_pipeline_task = self.platform.config_entry.async_create_background_task(
319 context=self._context,
323 format=stt.AudioFormats.WAV,
324 codec=stt.AudioCodecs.PCM,
325 bit_rate=stt.AudioBitRates.BITRATE_16,
326 sample_rate=stt.AudioSampleRates.SAMPLERATE_16000,
327 channel=stt.AudioChannels.CHANNEL_MONO,
329 stt_stream=audio_stream,
334 wake_word_phrase=wake_word_phrase,
338 start_stage=start_stage,
341 f
"{self.entity_id}_pipeline",
350 """Cancel the current pipeline if it's running."""
353 with contextlib.suppress(asyncio.CancelledError):
360 """Handle pipeline events."""
364 """Set state based on pipeline stage."""
365 if event.type
is PipelineEventType.WAKE_WORD_START:
366 self.
_set_state_set_state(AssistSatelliteState.IDLE)
367 elif event.type
is PipelineEventType.STT_START:
368 self.
_set_state_set_state(AssistSatelliteState.LISTENING)
369 elif event.type
is PipelineEventType.INTENT_START:
370 self.
_set_state_set_state(AssistSatelliteState.PROCESSING)
371 elif event.type
is PipelineEventType.INTENT_END:
372 assert event.data
is not None
375 self.
_conversation_id_conversation_id = event.data[
"intent_output"][
"conversation_id"]
376 elif event.type
is PipelineEventType.TTS_START:
379 self.
_set_state_set_state(AssistSatelliteState.RESPONDING)
380 elif event.type
is PipelineEventType.RUN_END:
382 self.
_set_state_set_state(AssistSatelliteState.IDLE)
388 """Set the entity's state."""
390 self.async_write_ha_state()
394 """Tell entity that the text-to-speech response has finished playing."""
395 self.
_set_state_set_state(AssistSatelliteState.IDLE)
399 """Resolve pipeline from select entity to id.
401 Return None to make async_get_pipeline look up the preferred pipeline.
406 if (pipeline_entity_state := self.hass.states.get(pipeline_entity_id))
is None:
407 raise RuntimeError(
"Pipeline entity not found")
409 if pipeline_entity_state.state != OPTION_PREFERRED:
412 if pipeline.name == pipeline_entity_state.state:
419 """Resolve VAD sensitivity from select entity to enum."""
420 vad_sensitivity = vad.VadSensitivity.DEFAULT
424 vad_sensitivity_state := self.hass.states.get(vad_sensitivity_entity_id)
426 raise RuntimeError(
"VAD sensitivity entity not found")
428 vad_sensitivity = vad.VadSensitivity(vad_sensitivity_state.state)
430 return vad.VadSensitivity.to_seconds(vad_sensitivity)
None async_announce(self, AssistSatelliteAnnouncement announcement)
None on_pipeline_event(self, PipelineEvent event)
None _set_state(self, AssistSatelliteState state)
None async_internal_announce(self, str|None message=None, str|None media_id=None)
None _cancel_running_pipeline(self)
None async_set_configuration(self, AssistSatelliteConfiguration config)
float _resolve_vad_sensitivity(self)
None async_accept_pipeline_from_satellite(self, AsyncIterable[bytes] audio_stream, PipelineStage start_stage=PipelineStage.STT, PipelineStage end_stage=PipelineStage.TTS, str|None wake_word_phrase=None)
None _internal_on_pipeline_event(self, PipelineEvent event)
None tts_response_finished(self)
AssistSatelliteConfiguration async_get_configuration(self)
str|None _resolve_pipeline(self)
str|None async_intercept_wake_word(self)
str|None pipeline_entity_id(self)
dict[str, Any]|None tts_options(self)
str|None vad_sensitivity_entity_id(self)
_wake_word_intercept_future
Pipeline async_get_pipeline(HomeAssistant hass, str|None pipeline_id=None)
list[Pipeline] async_get_pipelines(HomeAssistant hass)
None async_pipeline_from_audio_stream(HomeAssistant hass, *Context context, PipelineEventCallback event_callback, stt.SpeechMetadata stt_metadata, AsyncIterable[bytes] stt_stream, str|None wake_word_phrase=None, str|None pipeline_id=None, str|None conversation_id=None, str|dict[str, Any]|None tts_audio_output=None, WakeWordSettings|None wake_word_settings=None, AudioSettings|None audio_settings=None, str|None device_id=None, PipelineStage start_stage=PipelineStage.STT, PipelineStage end_stage=PipelineStage.TTS)