1 """Classes for voice assistant pipelines."""
3 from __future__
import annotations
7 from collections
import defaultdict, deque
8 from collections.abc
import AsyncGenerator, AsyncIterable, Callable
9 from dataclasses
import asdict, dataclass, field
10 from enum
import StrEnum
12 from pathlib
import Path
13 from queue
import Empty, Queue
14 from threading
import Thread
16 from typing
import Any, Literal, cast
19 import voluptuous
as vol
30 generate_media_source_id
as tts_generate_media_source_id,
39 SerializedStorageCollection,
41 StorageCollectionWebsocket,
48 language
as language_util,
53 from .audio_enhancer
import AudioEnhancer, EnhancedAudioChunk, MicroVadSpeexEnhancer
56 CONF_DEBUG_RECORDING_DIR,
69 DuplicateWakeUpDetectedError,
70 IntentRecognitionError,
75 WakeWordDetectionAborted,
76 WakeWordDetectionError,
79 from .vad
import AudioBuffer, VoiceActivityTimeout, VoiceCommandSegmenter, chunk_samples
81 _LOGGER = logging.getLogger(__name__)
83 STORAGE_KEY = f
"{DOMAIN}.pipelines"
85 STORAGE_VERSION_MINOR = 2
87 ENGINE_LANGUAGE_PAIRS = (
88 (
"stt_engine",
"stt_language"),
89 (
"tts_engine",
"tts_language"),
94 """Validate language settings."""
95 for engine, language
in ENGINE_LANGUAGE_PAIRS:
96 if data[engine]
is not None and data[language]
is None:
97 raise vol.Invalid(f
"Need language {language} for {engine} {data[engine]}")
101 PIPELINE_FIELDS: VolDictType = {
102 vol.Required(
"conversation_engine"): str,
103 vol.Required(
"conversation_language"): str,
104 vol.Required(
"language"): str,
105 vol.Required(
"name"): str,
106 vol.Required(
"stt_engine"): vol.Any(str,
None),
107 vol.Required(
"stt_language"): vol.Any(str,
None),
108 vol.Required(
"tts_engine"): vol.Any(str,
None),
109 vol.Required(
"tts_language"): vol.Any(str,
None),
110 vol.Required(
"tts_voice"): vol.Any(str,
None),
111 vol.Required(
"wake_word_entity"): vol.Any(str,
None),
112 vol.Required(
"wake_word_id"): vol.Any(str,
None),
113 vol.Optional(
"prefer_local_intents"): bool,
116 STORED_PIPELINE_RUNS = 10
125 conversation_engine_id: str |
None =
None,
126 stt_engine_id: str |
None =
None,
127 tts_engine_id: str |
None =
None,
129 ) -> dict[str, str |
None]:
130 """Resolve settings for a default pipeline.
132 The default pipeline will use the homeassistant conversation agent and the
133 default stt / tts engines if none are specified.
135 conversation_language =
"en"
136 pipeline_language =
"en"
142 wake_word_entity =
None
145 if conversation_engine_id
is None:
146 conversation_engine_id = conversation.HOME_ASSISTANT_AGENT
149 conversation_languages = language_util.matches(
150 hass.config.language,
151 conversation.async_get_conversation_languages(hass, conversation_engine_id),
152 country=hass.config.country,
154 if conversation_languages:
155 pipeline_language = hass.config.language
156 conversation_language = conversation_languages[0]
158 if stt_engine_id
is None:
159 stt_engine_id = stt.async_default_engine(hass)
161 if stt_engine_id
is not None:
162 stt_engine = stt.async_get_speech_to_text_engine(hass, stt_engine_id)
163 if stt_engine
is None:
167 stt_languages = language_util.matches(
169 stt_engine.supported_languages,
170 country=hass.config.country,
173 stt_language = stt_languages[0]
176 "Speech-to-text engine '%s' does not support language '%s'",
182 if tts_engine_id
is None:
183 tts_engine_id = tts.async_default_engine(hass)
185 if tts_engine_id
is not None:
186 tts_engine = tts.get_engine_instance(hass, tts_engine_id)
187 if tts_engine
is None:
191 tts_languages = language_util.matches(
193 tts_engine.supported_languages,
194 country=hass.config.country,
197 tts_language = tts_languages[0]
198 tts_voices = tts_engine.async_get_supported_voices(tts_language)
200 tts_voice = tts_voices[0].voice_id
203 "Text-to-speech engine '%s' does not support language '%s'",
210 "conversation_engine": conversation_engine_id,
211 "conversation_language": conversation_language,
212 "language": hass.config.language,
213 "name": pipeline_name,
214 "stt_engine": stt_engine_id,
215 "stt_language": stt_language,
216 "tts_engine": tts_engine_id,
217 "tts_language": tts_language,
218 "tts_voice": tts_voice,
219 "wake_word_entity": wake_word_entity,
220 "wake_word_id": wake_word_id,
225 hass: HomeAssistant, pipeline_store: PipelineStorageCollection
227 """Create a default pipeline.
229 The default pipeline will use the homeassistant conversation agent and the
230 default stt / tts engines.
233 hass, pipeline_name=
"Home Assistant"
235 return await pipeline_store.async_create_item(pipeline_settings)
243 ) -> Pipeline |
None:
244 """Create a pipeline with default settings.
246 The default pipeline will use the homeassistant conversation agent and the
247 specified stt / tts engines.
249 pipeline_data: PipelineData = hass.data[DOMAIN]
250 pipeline_store = pipeline_data.pipeline_store
253 stt_engine_id=stt_engine_id,
254 tts_engine_id=tts_engine_id,
255 pipeline_name=pipeline_name,
258 pipeline_settings[
"stt_engine"] != stt_engine_id
259 or pipeline_settings[
"tts_engine"] != tts_engine_id
262 return await pipeline_store.async_create_item(pipeline_settings)
267 hass: HomeAssistant, entity_id: str
269 """Get a pipeline by conversation entity ID."""
270 entity = hass.states.get(entity_id)
273 pipeline_name=entity.name
if entity
else entity_id,
274 conversation_engine_id=entity_id,
276 settings[
"id"] = entity_id
278 return Pipeline.from_json(settings)
283 """Get a pipeline by id or the preferred pipeline."""
284 pipeline_data: PipelineData = hass.data[DOMAIN]
286 if pipeline_id
is None:
288 pipeline_id = pipeline_data.pipeline_store.async_get_preferred_item()
290 if pipeline_id.startswith(
"conversation."):
293 pipeline = pipeline_data.pipeline_store.data.get(pipeline_id)
298 "pipeline_not_found", f
"Pipeline {pipeline_id} not found"
306 """Get all pipelines."""
307 pipeline_data: PipelineData = hass.data[DOMAIN]
309 return list(pipeline_data.pipeline_store.data.values())
316 conversation_engine: str | UndefinedType = UNDEFINED,
317 conversation_language: str | UndefinedType = UNDEFINED,
318 language: str | UndefinedType = UNDEFINED,
319 name: str | UndefinedType = UNDEFINED,
320 stt_engine: str |
None | UndefinedType = UNDEFINED,
321 stt_language: str |
None | UndefinedType = UNDEFINED,
322 tts_engine: str |
None | UndefinedType = UNDEFINED,
323 tts_language: str |
None | UndefinedType = UNDEFINED,
324 tts_voice: str |
None | UndefinedType = UNDEFINED,
325 wake_word_entity: str |
None | UndefinedType = UNDEFINED,
326 wake_word_id: str |
None | UndefinedType = UNDEFINED,
327 prefer_local_intents: bool | UndefinedType = UNDEFINED,
329 """Update a pipeline."""
330 pipeline_data: PipelineData = hass.data[DOMAIN]
332 updates: dict[str, Any] = pipeline.to_json()
340 (
"conversation_engine", conversation_engine),
341 (
"conversation_language", conversation_language),
342 (
"language", language),
344 (
"stt_engine", stt_engine),
345 (
"stt_language", stt_language),
346 (
"tts_engine", tts_engine),
347 (
"tts_language", tts_language),
348 (
"tts_voice", tts_voice),
349 (
"wake_word_entity", wake_word_entity),
350 (
"wake_word_id", wake_word_id),
351 (
"prefer_local_intents", prefer_local_intents),
353 if val
is not UNDEFINED
357 await pipeline_data.pipeline_store.async_update_item(pipeline.id, updates)
361 """Event types emitted during a pipeline run."""
363 RUN_START =
"run-start"
365 WAKE_WORD_START =
"wake_word-start"
366 WAKE_WORD_END =
"wake_word-end"
367 STT_START =
"stt-start"
368 STT_VAD_START =
"stt-vad-start"
369 STT_VAD_END =
"stt-vad-end"
371 INTENT_START =
"intent-start"
372 INTENT_END =
"intent-end"
373 TTS_START =
"tts-start"
378 @dataclass(frozen=True)
380 """Events emitted during a pipeline run."""
382 type: PipelineEventType
383 data: dict[str, Any] |
None =
None
384 timestamp: str = field(default_factory=
lambda: dt_util.utcnow().isoformat())
387 type PipelineEventCallback = Callable[[PipelineEvent],
None]
390 @dataclass(frozen=True)
392 """A voice assistant pipeline."""
394 conversation_engine: str
395 conversation_language: str
398 stt_engine: str |
None
399 stt_language: str |
None
400 tts_engine: str |
None
401 tts_language: str |
None
402 tts_voice: str |
None
403 wake_word_entity: str |
None
404 wake_word_id: str |
None
405 prefer_local_intents: bool =
False
407 id: str = field(default_factory=ulid_util.ulid_now)
411 """Create an instance from a JSON serialization.
413 This function was added in HA Core 2023.10, previous versions will raise
414 if there are unexpected items in the serialized data.
417 conversation_engine=data[
"conversation_engine"],
418 conversation_language=data[
"conversation_language"],
420 language=data[
"language"],
422 stt_engine=data[
"stt_engine"],
423 stt_language=data[
"stt_language"],
424 tts_engine=data[
"tts_engine"],
425 tts_language=data[
"tts_language"],
426 tts_voice=data[
"tts_voice"],
427 wake_word_entity=data[
"wake_word_entity"],
428 wake_word_id=data[
"wake_word_id"],
429 prefer_local_intents=data.get(
"prefer_local_intents",
False),
433 """Return a JSON serializable representation for storage."""
435 "conversation_engine": self.conversation_engine,
436 "conversation_language": self.conversation_language,
438 "language": self.language,
440 "stt_engine": self.stt_engine,
441 "stt_language": self.stt_language,
442 "tts_engine": self.tts_engine,
443 "tts_language": self.tts_language,
444 "tts_voice": self.tts_voice,
445 "wake_word_entity": self.wake_word_entity,
446 "wake_word_id": self.wake_word_id,
447 "prefer_local_intents": self.prefer_local_intents,
452 """Stages of a pipeline."""
454 WAKE_WORD =
"wake_word"
461 PIPELINE_STAGE_ORDER = [
462 PipelineStage.WAKE_WORD,
464 PipelineStage.INTENT,
470 """Error when a pipeline run is not valid."""
473 class InvalidPipelineStagesError(PipelineRunValidationError):
474 """Error when given an invalid combination of start/end stages."""
478 start_stage: PipelineStage,
479 end_stage: PipelineStage,
481 """Set error message."""
483 f
"Invalid stage combination: start={start_stage}, end={end_stage}"
487 @dataclass(frozen=True)
489 """Settings for wake word detection."""
491 timeout: float |
None =
None
492 """Seconds of silence before detection times out."""
494 audio_seconds_to_buffer: float = 0
495 """Seconds of audio to buffer before detection and forward to STT."""
498 @dataclass(frozen=True)
500 """Settings for pipeline audio processing."""
502 noise_suppression_level: int = 0
503 """Level of noise suppression (0 = disabled, 4 = max)"""
505 auto_gain_dbfs: int = 0
506 """Amount of automatic gain in dbFS (0 = disabled, 31 = max)"""
508 volume_multiplier: float = 1.0
509 """Multiplier used directly on PCM samples (1.0 = no change, 2.0 = twice as loud)"""
511 is_vad_enabled: bool =
True
512 """True if VAD is used to determine the end of the voice command."""
514 silence_seconds: float = 0.7
515 """Seconds of silence after voice command has ended."""
518 """Verify settings post-initialization."""
519 if (self.noise_suppression_level < 0)
or (self.noise_suppression_level > 4):
520 raise ValueError(
"noise_suppression_level must be in [0, 4]")
522 if (self.auto_gain_dbfs < 0)
or (self.auto_gain_dbfs > 31):
523 raise ValueError(
"auto_gain_dbfs must be in [0, 31]")
527 """True if an audio processor is needed."""
530 or (self.noise_suppression_level > 0)
531 or (self.auto_gain_dbfs > 0)
537 """Running context for a pipeline."""
542 start_stage: PipelineStage
543 end_stage: PipelineStage
544 event_callback: PipelineEventCallback
546 runner_data: Any |
None =
None
547 intent_agent: str |
None =
None
548 tts_audio_output: str | dict[str, Any] |
None =
None
549 wake_word_settings: WakeWordSettings |
None =
None
550 audio_settings: AudioSettings = field(default_factory=AudioSettings)
552 id: str = field(default_factory=ulid_util.ulid_now)
554 tts_engine: str = field(init=
False, repr=
False)
555 tts_options: dict |
None = field(init=
False, default=
None)
556 wake_word_entity_id: str |
None = field(init=
False, default=
None, repr=
False)
559 abort_wake_word_detection: bool = field(init=
False, default=
False)
561 debug_recording_thread: Thread |
None =
None
562 """Thread that records audio to debug_recording_dir"""
564 debug_recording_queue: Queue[str | bytes |
None] |
None =
None
565 """Queue to communicate with debug recording thread"""
567 audio_enhancer: AudioEnhancer |
None =
None
568 """VAD/noise suppression/auto gain"""
570 audio_chunking_buffer: AudioBuffer = field(
571 default_factory=
lambda:
AudioBuffer(BYTES_PER_CHUNK)
573 """Buffer used when splitting audio into chunks for audio processing"""
575 _device_id: str |
None =
None
576 """Optional device id set during run start."""
579 """Set language for pipeline."""
580 self.
languagelanguage = self.pipeline.language
or self.hass.config.language
583 if PIPELINE_STAGE_ORDER.index(self.end_stage) < PIPELINE_STAGE_ORDER.index(
588 pipeline_data: PipelineData = self.hass.data[DOMAIN]
589 if self.pipeline.id
not in pipeline_data.pipeline_debug:
591 size_limit=STORED_PIPELINE_RUNS
594 pipeline_data.pipeline_runs.add_run(self)
597 if self.audio_settings.needs_processor
and (self.
audio_enhanceraudio_enhancer
is None):
600 self.audio_settings.auto_gain_dbfs,
601 self.audio_settings.noise_suppression_level,
602 self.audio_settings.is_vad_enabled,
606 """Compare pipeline runs by id."""
607 if isinstance(other, PipelineRun):
608 return self.
idid == other.id
614 """Log an event and call listener."""
615 self.event_callback(event)
616 pipeline_data: PipelineData = self.hass.data[DOMAIN]
617 if self.
idid
not in pipeline_data.pipeline_debug[self.pipeline.id]:
620 pipeline_data.pipeline_debug[self.pipeline.id][self.
idid].events.append(event)
622 def start(self, device_id: str |
None) ->
None:
623 """Emit run start event."""
628 "pipeline": self.pipeline.id,
631 if self.runner_data
is not None:
632 data[
"runner_data"] = self.runner_data
636 async
def end(self) -> None:
637 """Emit run end event."""
647 PipelineEventType.RUN_END,
651 pipeline_data: PipelineData = self.hass.data[DOMAIN]
652 pipeline_data.pipeline_runs.remove_run(self)
655 """Prepare wake-word-detection."""
656 entity_id = self.pipeline.wake_word_entity
or wake_word.async_default_entity(
659 if entity_id
is None:
661 code=
"wake-engine-missing",
662 message=
"No wake word engine",
665 wake_word_entity = wake_word.async_get_wake_word_detection_entity(
668 if wake_word_entity
is None:
670 code=
"wake-provider-missing",
671 message=f
"No wake-word-detection provider for: {entity_id}",
679 stream: AsyncIterable[EnhancedAudioChunk],
680 audio_chunks_for_stt: list[EnhancedAudioChunk],
682 """Run wake-word-detection portion of pipeline. Returns detection result."""
683 metadata_dict = asdict(
686 format=stt.AudioFormats.WAV,
687 codec=stt.AudioCodecs.PCM,
688 bit_rate=stt.AudioBitRates.BITRATE_16,
689 sample_rate=stt.AudioSampleRates.SAMPLERATE_16000,
690 channel=stt.AudioChannels.CHANNEL_MONO,
697 metadata_dict.pop(
"language",
None)
701 PipelineEventType.WAKE_WORD_START,
704 "metadata": metadata_dict,
705 "timeout": wake_word_settings.timeout
or 0,
713 wake_word_vad: VoiceActivityTimeout |
None =
None
714 if (wake_word_settings.timeout
is not None)
and (
715 wake_word_settings.timeout > 0
722 num_audio_chunks_to_buffer =
int(
723 (wake_word_settings.audio_seconds_to_buffer * SAMPLE_RATE)
727 stt_audio_buffer: deque[EnhancedAudioChunk] |
None =
None
728 if num_audio_chunks_to_buffer > 0:
729 stt_audio_buffer = deque(maxlen=num_audio_chunks_to_buffer)
733 result = await self.
wake_word_entitywake_word_entity.async_process_audio_stream(
736 stt_audio_buffer=stt_audio_buffer,
737 wake_word_vad=wake_word_vad,
739 self.pipeline.wake_word_id,
742 if stt_audio_buffer
is not None:
745 audio_chunks_for_stt.extend(stt_audio_buffer)
746 except WakeWordDetectionAborted:
748 except WakeWordTimeoutError:
749 _LOGGER.debug(
"Timeout during wake word detection")
751 except Exception
as src_error:
752 _LOGGER.exception(
"Unexpected error during wake-word-detection")
754 code=
"wake-stream-failed",
755 message=
"Unexpected error during wake-word-detection",
758 _LOGGER.debug(
"wake-word-detection result %s", result)
761 wake_word_output: dict[str, Any] = {}
764 last_wake_up = self.hass.data[DATA_LAST_WAKE_UP].
get(
765 result.wake_word_phrase
767 if last_wake_up
is not None:
768 sec_since_last_wake_up = time.monotonic() - last_wake_up
769 if sec_since_last_wake_up < WAKE_WORD_COOLDOWN:
771 "Duplicate wake word detection occurred for %s",
772 result.wake_word_phrase,
777 self.hass.data[DATA_LAST_WAKE_UP][result.wake_word_phrase] = (
781 if result.queued_audio:
788 audio_chunks_for_stt.extend(
791 timestamp_ms=chunk_ts[1],
792 speech_probability=
None,
794 for chunk_ts
in result.queued_audio
797 wake_word_output = asdict(result)
800 wake_word_output.pop(
"queued_audio",
None)
804 PipelineEventType.WAKE_WORD_END,
805 {
"wake_word_output": wake_word_output},
813 audio_stream: AsyncIterable[EnhancedAudioChunk],
814 stt_audio_buffer: deque[EnhancedAudioChunk] |
None,
815 wake_word_vad: VoiceActivityTimeout |
None,
816 sample_rate: int = SAMPLE_RATE,
817 sample_width: int = SAMPLE_WIDTH,
818 ) -> AsyncIterable[tuple[bytes, int]]:
819 """Yield audio chunks with timestamps (milliseconds since start of stream).
821 Adds audio to a ring buffer that will be forwarded to speech-to-text after
822 detection. Times out if VAD detects enough silence.
824 async
for chunk
in audio_stream:
825 if self.abort_wake_word_detection:
826 raise WakeWordDetectionAborted
829 yield chunk.audio, chunk.timestamp_ms
834 if stt_audio_buffer
is not None:
835 stt_audio_buffer.append(chunk)
837 if wake_word_vad
is not None:
838 chunk_seconds = (len(chunk.audio) // sample_width) / sample_rate
839 if not wake_word_vad.process(chunk_seconds, chunk.speech_probability):
841 code=
"wake-word-timeout", message=
"Wake word was not detected"
845 """Prepare speech-to-text."""
847 stt_provider = stt.async_get_speech_to_text_engine(
849 self.pipeline.stt_engine,
852 if stt_provider
is None:
853 engine = self.pipeline.stt_engine
855 code=
"stt-provider-missing",
856 message=f
"No speech-to-text provider for: {engine}",
859 metadata.language = self.pipeline.stt_language
or self.
languagelanguage
861 if not stt_provider.check_metadata(metadata):
863 code=
"stt-provider-unsupported-metadata",
865 f
"Provider {stt_provider.name} does not support input speech "
866 f
"to text metadata {metadata}"
874 metadata: stt.SpeechMetadata,
875 stream: AsyncIterable[EnhancedAudioChunk],
877 """Run speech-to-text portion of pipeline. Returns the spoken text."""
879 if self.end_stage >= PipelineStage.INTENT:
880 self.hass.async_create_background_task(
881 conversation.async_prepare_agent(
884 f
"prepare conversation agent {self.intent_agent}",
894 PipelineEventType.STT_START,
897 "metadata": asdict(metadata),
908 stt_vad: VoiceCommandSegmenter |
None =
None
909 if self.audio_settings.is_vad_enabled:
911 silence_seconds=self.audio_settings.silence_seconds
914 result = await self.
stt_providerstt_provider.async_process_audio_stream(
918 except (asyncio.CancelledError, TimeoutError):
920 except Exception
as src_error:
921 _LOGGER.exception(
"Unexpected error during speech-to-text")
923 code=
"stt-stream-failed",
924 message=
"Unexpected error during speech-to-text",
927 _LOGGER.debug(
"speech-to-text result %s", result)
929 if result.result != stt.SpeechResultState.SUCCESS:
931 code=
"stt-stream-failed",
932 message=
"speech-to-text failed",
937 code=
"stt-no-text-recognized", message=
"No text recognized"
942 PipelineEventType.STT_END,
955 audio_stream: AsyncIterable[EnhancedAudioChunk],
956 stt_vad: VoiceCommandSegmenter |
None,
957 sample_rate: int = SAMPLE_RATE,
958 sample_width: int = SAMPLE_WIDTH,
959 ) -> AsyncGenerator[bytes]:
960 """Yield audio chunks until VAD detects silence or speech-to-text completes."""
961 sent_vad_start =
False
962 async
for chunk
in audio_stream:
965 if stt_vad
is not None:
966 chunk_seconds = (len(chunk.audio) // sample_width) / sample_rate
967 if not stt_vad.process(chunk_seconds, chunk.speech_probability):
971 PipelineEventType.STT_VAD_END,
972 {
"timestamp": chunk.timestamp_ms},
977 if stt_vad.in_command
and (
not sent_vad_start):
981 PipelineEventType.STT_VAD_START,
982 {
"timestamp": chunk.timestamp_ms},
985 sent_vad_start =
True
990 """Prepare recognizing an intent."""
991 agent_info = conversation.async_get_agent_info(
993 self.pipeline.conversation_engine
or conversation.HOME_ASSISTANT_AGENT,
996 if agent_info
is None:
997 engine = self.pipeline.conversation_engine
or "default"
999 code=
"intent-not-supported",
1000 message=f
"Intent recognition engine {engine} is not found",
1006 self, intent_input: str, conversation_id: str |
None, device_id: str |
None
1008 """Run intent recognition portion of pipeline. Returns text to speak."""
1010 raise RuntimeError(
"Recognize intent was not prepared")
1014 PipelineEventType.INTENT_START,
1017 "language": self.pipeline.conversation_language,
1018 "intent_input": intent_input,
1019 "conversation_id": conversation_id,
1020 "device_id": device_id,
1021 "prefer_local_intents": self.pipeline.prefer_local_intents,
1029 context=self.context,
1030 conversation_id=conversation_id,
1031 device_id=device_id,
1032 language=self.pipeline.language,
1035 processed_locally = self.
intent_agentintent_agent == conversation.HOME_ASSISTANT_AGENT
1038 if user_input.agent_id != conversation.HOME_ASSISTANT_AGENT:
1041 trigger_response_text
1042 := await conversation.async_handle_sentence_triggers(
1043 self.hass, user_input
1047 trigger_response = intent.IntentResponse(
1048 self.pipeline.conversation_language
1050 trigger_response.async_set_speech(trigger_response_text)
1052 response=trigger_response,
1053 conversation_id=user_input.conversation_id,
1056 elif self.pipeline.prefer_local_intents
and (
1057 intent_response := await conversation.async_handle_intents(
1058 self.hass, user_input
1063 response=intent_response,
1064 conversation_id=user_input.conversation_id,
1066 processed_locally =
True
1068 if conversation_result
is None:
1070 conversation_result = await conversation.async_converse(
1072 text=user_input.text,
1073 conversation_id=user_input.conversation_id,
1074 device_id=user_input.device_id,
1075 context=user_input.context,
1076 language=user_input.language,
1077 agent_id=user_input.agent_id,
1079 except Exception
as src_error:
1080 _LOGGER.exception(
"Unexpected error during intent recognition")
1082 code=
"intent-failed",
1083 message=
"Unexpected error during intent recognition",
1086 _LOGGER.debug(
"conversation result %s", conversation_result)
1090 PipelineEventType.INTENT_END,
1092 "processed_locally": processed_locally,
1093 "intent_output": conversation_result.as_dict(),
1098 speech: str = conversation_result.response.speech.get(
"plain", {}).
get(
1105 """Prepare text-to-speech."""
1107 engine = cast(str, self.pipeline.tts_engine)
1109 tts_options: dict[str, Any] = {}
1110 if self.pipeline.tts_voice
is not None:
1111 tts_options[tts.ATTR_VOICE] = self.pipeline.tts_voice
1116 tts_options[tts.ATTR_PREFERRED_FORMAT] = self.
tts_audio_outputtts_audio_output
1119 tts_options[tts.ATTR_PREFERRED_SAMPLE_RATE] = SAMPLE_RATE
1120 tts_options[tts.ATTR_PREFERRED_SAMPLE_CHANNELS] = SAMPLE_CHANNELS
1121 tts_options[tts.ATTR_PREFERRED_SAMPLE_BYTES] = SAMPLE_WIDTH
1124 options_supported = await tts.async_support_options(
1127 self.pipeline.tts_language,
1130 except HomeAssistantError
as err:
1132 code=
"tts-not-supported",
1133 message=f
"Text-to-speech engine '{engine}' not found",
1135 if not options_supported:
1137 code=
"tts-not-supported",
1139 f
"Text-to-speech engine {engine} "
1140 f
"does not support language {self.pipeline.tts_language} or options {tts_options}"
1148 """Run text-to-speech portion of pipeline."""
1151 PipelineEventType.TTS_START,
1154 "language": self.pipeline.tts_language,
1155 "voice": self.pipeline.tts_voice,
1156 "tts_input": tts_input,
1163 tts_media_id = tts_generate_media_source_id(
1167 language=self.pipeline.tts_language,
1170 tts_media = await media_source.async_resolve_media(
1175 except Exception
as src_error:
1176 _LOGGER.exception(
"Unexpected error during text-to-speech")
1179 message=
"Unexpected error during text-to-speech",
1182 _LOGGER.debug(
"TTS result %s", tts_media)
1184 "media_id": tts_media_id,
1185 **asdict(tts_media),
1189 PipelineEvent(PipelineEventType.TTS_END, {
"tts_output": tts_output})
1193 """Forward audio chunk to various capturing mechanisms."""
1202 pipeline_data: PipelineData = self.hass.data[DOMAIN]
1203 audio_queue = pipeline_data.device_audio_queues.get(self.
_device_id_device_id)
1204 if audio_queue
is None:
1208 audio_queue.queue.put_nowait(audio_bytes)
1209 except asyncio.QueueFull:
1210 audio_queue.overflow =
True
1211 _LOGGER.warning(
"Audio queue full for device %s", self.
_device_id_device_id)
1214 """Start thread to record wake/stt audio if debug_recording_dir is set."""
1221 if debug_recording_dir := self.hass.data[DATA_CONFIG].
get(
1222 CONF_DEBUG_RECORDING_DIR
1226 run_recording_dir = (
1227 Path(debug_recording_dir)
1228 / self.pipeline.name
1229 /
str(time.monotonic_ns())
1233 run_recording_dir = (
1234 Path(debug_recording_dir)
1236 / self.pipeline.name
1237 /
str(time.monotonic_ns())
1242 target=_pipeline_debug_recording_thread_proc,
1249 """Stop recording thread."""
1266 self, audio_stream: AsyncIterable[bytes]
1267 ) -> AsyncGenerator[EnhancedAudioChunk]:
1268 """Apply volume transformation only (no VAD/audio enhancements) with optional chunking."""
1270 async
for chunk
in audio_stream:
1271 if self.audio_settings.volume_multiplier != 1.0:
1275 chunk, BYTES_PER_CHUNK, self.audio_chunking_buffer
1279 timestamp_ms=timestamp_ms,
1280 speech_probability=
None,
1282 timestamp_ms += MS_PER_CHUNK
1285 self, audio_stream: AsyncIterable[bytes]
1286 ) -> AsyncGenerator[EnhancedAudioChunk]:
1287 """Split audio into chunks and apply VAD/noise suppression/auto gain/volume transformation."""
1291 async
for dirty_samples
in audio_stream:
1292 if self.audio_settings.volume_multiplier != 1.0:
1295 dirty_samples, self.audio_settings.volume_multiplier
1300 dirty_samples, BYTES_PER_CHUNK, self.audio_chunking_buffer
1302 yield self.
audio_enhanceraudio_enhancer.enhance_chunk(dirty_chunk, timestamp_ms)
1303 timestamp_ms += MS_PER_CHUNK
1307 """Multiplies 16-bit PCM samples by a constant."""
1309 def _clamp(val: float) -> float:
1310 """Clamp to signed 16-bit."""
1311 return max(-32768,
min(32767, val))
1315 (
int(
_clamp(value * volume_multiplier))
for value
in array.array(
"h", chunk)),
1320 run_recording_dir: Path,
1321 queue: Queue[str | bytes |
None],
1322 message_timeout: float = 5,
1324 wav_writer: wave.Wave_write |
None =
None
1327 _LOGGER.debug(
"Saving wake/stt audio to %s", run_recording_dir)
1328 run_recording_dir.mkdir(parents=
True, exist_ok=
True)
1331 message = queue.get(timeout=message_timeout)
1336 if isinstance(message, str):
1338 if wav_writer
is not None:
1341 wav_path = run_recording_dir / f
"{message}.wav"
1342 wav_writer = wave.open(
str(wav_path),
"wb")
1343 wav_writer.setframerate(SAMPLE_RATE)
1344 wav_writer.setsampwidth(SAMPLE_WIDTH)
1345 wav_writer.setnchannels(SAMPLE_CHANNELS)
1346 elif isinstance(message, bytes):
1348 if wav_writer
is not None:
1349 wav_writer.writeframes(message)
1353 _LOGGER.exception(
"Unexpected error in debug recording thread")
1355 if wav_writer
is not None:
1361 """Input to a pipeline run."""
1366 """Metadata of stt input audio. Required when start_stage = stt."""
1368 stt_stream: AsyncIterable[bytes] |
None =
None
1369 """Input audio for stt. Required when start_stage = stt."""
1371 wake_word_phrase: str |
None =
None
1372 """Optional key used to de-duplicate wake-ups for local wake word detection."""
1374 intent_input: str |
None =
None
1375 """Input for conversation agent. Required when start_stage = intent."""
1377 tts_input: str |
None =
None
1378 """Input for text-to-speech. Required when start_stage = tts."""
1380 conversation_id: str |
None =
None
1382 device_id: str |
None =
None
1386 self.run.start(device_id=self.device_id)
1387 current_stage: PipelineStage |
None = self.run.start_stage
1388 stt_audio_buffer: list[EnhancedAudioChunk] = []
1389 stt_processed_stream: AsyncIterable[EnhancedAudioChunk] |
None =
None
1391 if self.stt_stream
is not None:
1392 if self.run.audio_settings.needs_processor:
1394 stt_processed_stream = self.run.process_enhance_audio(self.stt_stream)
1397 stt_processed_stream = self.run.process_volume_only(self.stt_stream)
1400 if current_stage == PipelineStage.WAKE_WORD:
1402 assert stt_processed_stream
is not None
1403 detect_result = await self.run.wake_word_detection(
1404 stt_processed_stream, stt_audio_buffer
1406 if detect_result
is None:
1410 current_stage = PipelineStage.STT
1413 intent_input = self.intent_input
1414 if current_stage == PipelineStage.STT:
1415 assert self.stt_metadata
is not None
1416 assert stt_processed_stream
is not None
1418 if self.wake_word_phrase
is not None:
1420 last_wake_up = self.run.hass.data[DATA_LAST_WAKE_UP].
get(
1421 self.wake_word_phrase
1423 if last_wake_up
is not None:
1424 sec_since_last_wake_up = time.monotonic() - last_wake_up
1425 if sec_since_last_wake_up < WAKE_WORD_COOLDOWN:
1427 "Speech-to-text cancelled to avoid duplicate wake-up for %s",
1428 self.wake_word_phrase,
1433 self.run.hass.data[DATA_LAST_WAKE_UP][self.wake_word_phrase] = (
1437 stt_input_stream = stt_processed_stream
1439 if stt_audio_buffer:
1442 async
def buffer_then_audio_stream() -> (
1443 AsyncGenerator[EnhancedAudioChunk]
1446 for chunk
in stt_audio_buffer:
1450 assert stt_processed_stream
is not None
1451 async
for chunk
in stt_processed_stream:
1454 stt_input_stream = buffer_then_audio_stream()
1456 intent_input = await self.run.speech_to_text(
1460 current_stage = PipelineStage.INTENT
1462 if self.run.end_stage != PipelineStage.STT:
1463 tts_input = self.tts_input
1465 if current_stage == PipelineStage.INTENT:
1467 assert intent_input
is not None
1468 tts_input = await self.run.recognize_intent(
1470 self.conversation_id,
1473 if tts_input.strip():
1474 current_stage = PipelineStage.TTS
1477 current_stage = PipelineStage.END
1479 if self.run.end_stage != PipelineStage.INTENT:
1481 if current_stage == PipelineStage.TTS:
1482 assert tts_input
is not None
1483 await self.run.text_to_speech(tts_input)
1485 except PipelineError
as err:
1486 self.run.process_event(
1488 PipelineEventType.ERROR,
1489 {
"code": err.code,
"message": err.message},
1495 await self.run.end()
1498 """Validate pipeline input against start stage."""
1499 if self.run.start_stage
in (PipelineStage.WAKE_WORD, PipelineStage.STT):
1500 if self.run.pipeline.stt_engine
is None:
1502 "the pipeline does not support speech-to-text"
1504 if self.stt_metadata
is None:
1506 "stt_metadata is required for speech-to-text"
1508 if self.stt_stream
is None:
1510 "stt_stream is required for speech-to-text"
1512 elif self.run.start_stage == PipelineStage.INTENT:
1513 if self.intent_input
is None:
1515 "intent_input is required for intent recognition"
1517 elif self.run.start_stage == PipelineStage.TTS:
1518 if self.tts_input
is None:
1520 "tts_input is required for text-to-speech"
1522 if self.run.end_stage == PipelineStage.TTS:
1523 if self.run.pipeline.tts_engine
is None:
1525 "the pipeline does not support text-to-speech"
1528 start_stage_index = PIPELINE_STAGE_ORDER.index(self.run.start_stage)
1529 end_stage_index = PIPELINE_STAGE_ORDER.index(self.run.end_stage)
1535 <= PIPELINE_STAGE_ORDER.index(PipelineStage.WAKE_WORD)
1538 prepare_tasks.append(self.run.prepare_wake_word_detection())
1542 <= PIPELINE_STAGE_ORDER.index(PipelineStage.STT)
1546 prepare_tasks.append(self.run.prepare_speech_to_text(self.stt_metadata))
1550 <= PIPELINE_STAGE_ORDER.index(PipelineStage.INTENT)
1553 prepare_tasks.append(self.run.prepare_recognize_intent())
1557 <= PIPELINE_STAGE_ORDER.index(PipelineStage.TTS)
1560 prepare_tasks.append(self.run.prepare_text_to_speech())
1563 await asyncio.gather(*prepare_tasks)
1567 """Raised when attempting to delete the preferred pipelen."""
1570 """Initialize pipeline preferred error."""
1571 super().
__init__(f
"Item {item_id} preferred.")
1576 """Serialized pipeline storage collection."""
1582 StorageCollection[Pipeline, SerializedPipelineStorageCollection]
1584 """Pipeline storage collection."""
1586 _preferred_item: str
1589 """Load the data."""
1600 """Validate the config is valid."""
1602 return validated_data
1606 """Suggest an ID based on the config."""
1607 return ulid_util.ulid_now()
1609 async
def _update_data(self, item: Pipeline, update_data: dict) -> Pipeline:
1610 """Return a new updated item."""
1612 return Pipeline(id=item.id, **update_data)
1615 """Create an item from validated config."""
1616 return Pipeline(id=item_id, **data)
1619 """Create an item from its serialized representation."""
1620 return Pipeline.from_json(data)
1623 """Return the serialized representation of an item for storing."""
1624 return item.to_json()
1634 """Get the id of the preferred item."""
1639 """Set the preferred pipeline."""
1640 if item_id
not in self.data:
1643 self._async_schedule_save()
1647 """Return JSON-compatible date for storing to file."""
1650 "items": base_data[
"items"],
1656 StorageCollectionWebsocket[PipelineStorageCollection]
1658 """Class to expose storage collection management over websocket."""
1662 """Set up the websocket commands."""
1665 websocket_api.async_register_command(
1667 f
"{self.api_prefix}/get",
1669 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
1671 vol.Required(
"type"): f
"{self.api_prefix}/get",
1672 vol.Optional(self.item_id_key): str,
1677 websocket_api.async_register_command(
1679 f
"{self.api_prefix}/set_preferred",
1680 websocket_api.require_admin(
1683 websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
1685 vol.Required(
"type"): f
"{self.api_prefix}/set_preferred",
1686 vol.Required(self.item_id_key): str,
1694 """Delete an item."""
1697 except PipelinePreferred
as exc:
1698 connection.send_error(msg[
"id"], websocket_api.ERR_NOT_ALLOWED,
str(exc))
1705 item_id = msg.get(self.item_id_key)
1709 if item_id.startswith(
"conversation.")
and hass.states.get(item_id):
1710 connection.send_result(
1715 if item_id
not in self.storage_collection.data:
1716 connection.send_error(
1718 websocket_api.ERR_NOT_FOUND,
1719 f
"Unable to find {self.item_id_key} {item_id}",
1723 connection.send_result(msg[
"id"], self.storage_collection.data[item_id])
1730 connection.send_result(
1740 hass: HomeAssistant,
1742 msg: dict[str, Any],
1744 """Set the preferred item."""
1747 except ItemNotFound:
1748 connection.send_error(
1749 msg[
"id"], websocket_api.ERR_NOT_FOUND,
"unknown item"
1752 connection.send_result(msg[
"id"])
1756 """Class managing pipeline runs."""
1758 def __init__(self, pipeline_store: PipelineStorageCollection) ->
None:
1760 self._pipeline_runs: dict[str, dict[str, PipelineRun]] = defaultdict(dict)
def add_run(self, pipeline_run: PipelineRun) -> None:
    """Register a pipeline run, keyed by its pipeline id and run id."""
    # _pipeline_runs is a defaultdict(dict), so the inner mapping is
    # created on first access for a given pipeline id.
    runs_for_pipeline = self._pipeline_runs[pipeline_run.pipeline.id]
    runs_for_pipeline[pipeline_run.id] = pipeline_run
1770 """Remove pipeline run."""
1771 pipeline_id = pipeline_run.pipeline.id
1772 self._pipeline_runs[pipeline_id].pop(pipeline_run.id)
1775 self, change_type: str, item_id: str, change: dict
1777 """Handle pipeline store changes."""
1778 if change_type != CHANGE_UPDATED:
1780 if pipeline_runs := self._pipeline_runs.
get(item_id):
1782 for pipeline_run
in list(pipeline_runs.values()):
1783 pipeline_run.abort_wake_word_detection =
True
1786 @dataclass(slots=True)
1788 """Audio capture queue for a satellite device."""
1790 queue: asyncio.Queue[bytes |
None]
1791 """Queue of audio chunks (None = stop signal)"""
1793 id: str = field(default_factory=ulid_util.ulid_now)
1794 """Unique id to ensure the correct audio queue is cleaned up in websocket API."""
1796 overflow: bool =
False
1797 """Flag to be set if audio samples were dropped because the queue was full."""
1800 @dataclass(slots=True)
1802 """Assist device."""
1805 unique_id_prefix: str
1809 """Store and debug data stored in hass.data."""
1811 def __init__(self, pipeline_store: PipelineStorageCollection) ->
None:
1814 self.pipeline_debug: dict[str, LimitedSizeDict[str, PipelineRunDebug]] = {}
1815 self.pipeline_devices: dict[str, AssistDevice] = {}
1817 self.device_audio_queues: dict[str, DeviceAudioQueue] = {}
1820 @dataclass(slots=True)
1822 """Debug data for a pipeline run."""
1824 events: list[PipelineEvent] = field(default_factory=list, init=
False)
1825 timestamp: str = field(
1826 default_factory=
lambda: dt_util.utcnow().isoformat(),
1832 """Store entity registry data."""
1836 old_major_version: int,
1837 old_minor_version: int,
1838 old_data: SerializedPipelineStorageCollection,
1839 ) -> SerializedPipelineStorageCollection:
1840 """Migrate to the new version."""
1841 if old_major_version == 1
and old_minor_version < 2:
1843 for pipeline
in old_data[
"items"]:
1845 pipeline.setdefault(
"wake_word_entity",
None)
1846 pipeline.setdefault(
"wake_word_id",
None)
1848 if old_major_version > 1:
1849 raise NotImplementedError
1855 """Set up the pipeline storage collection."""
1858 hass, STORAGE_VERSION, STORAGE_KEY, minor_version=STORAGE_VERSION_MINOR
1861 await pipeline_store.async_load()
1864 f
"{DOMAIN}/pipeline",
1874 hass: HomeAssistant,
1875 engine_type: Literal[
"conversation",
"stt",
"tts",
"wake_word"],
1879 """Register a migration of an engine used in pipelines."""
1880 hass.data.setdefault(DATA_MIGRATIONS, {})[engine_type] = (old_value, new_value)
1883 if DATA_CONFIG
in hass.data:
1884 hass.async_create_background_task(
1890 """Run pipeline migrations."""
1891 if not (migrations := hass.data.get(DATA_MIGRATIONS)):
1895 "conversation":
"conversation_engine",
1896 "stt":
"stt_engine",
1897 "tts":
"tts_engine",
1898 "wake_word":
"wake_word_entity",
1905 for engine_type, (old_value, new_value)
in migrations.items():
1906 if getattr(pipeline, engine_attr[engine_type]) == old_value:
1907 attr_updates[engine_attr[engine_type]] = new_value
1910 updates.append((pipeline, attr_updates))
1912 for pipeline, attr_updates
in updates:
bool needs_processor(self)
None __init__(self, PipelineStage start_stage, PipelineStage end_stage)
None __init__(self, PipelineStorageCollection pipeline_store)
None __init__(self, str item_id)
None process_event(self, PipelineEvent event)
AsyncGenerator[EnhancedAudioChunk] process_volume_only(self, AsyncIterable[bytes] audio_stream)
str speech_to_text(self, stt.SpeechMetadata metadata, AsyncIterable[EnhancedAudioChunk] stream)
str recognize_intent(self, str intent_input, str|None conversation_id, str|None device_id)
AsyncGenerator[EnhancedAudioChunk] process_enhance_audio(self, AsyncIterable[bytes] audio_stream)
AsyncIterable[tuple[bytes, int]] _wake_word_audio_stream(self, AsyncIterable[EnhancedAudioChunk] audio_stream, deque[EnhancedAudioChunk]|None stt_audio_buffer, VoiceActivityTimeout|None wake_word_vad, int sample_rate=SAMPLE_RATE, int sample_width=SAMPLE_WIDTH)
None prepare_speech_to_text(self, stt.SpeechMetadata metadata)
None text_to_speech(self, str tts_input)
None prepare_wake_word_detection(self)
None prepare_text_to_speech(self)
None start(self, str|None device_id)
None _stop_debug_recording_thread(self)
None _start_debug_recording_thread(self)
wake_word.DetectionResult|None wake_word_detection(self, AsyncIterable[EnhancedAudioChunk] stream, list[EnhancedAudioChunk] audio_chunks_for_stt)
AsyncGenerator[bytes] _speech_to_text_stream(self, AsyncIterable[EnhancedAudioChunk] audio_stream, VoiceCommandSegmenter|None stt_vad, int sample_rate=SAMPLE_RATE, int sample_width=SAMPLE_WIDTH)
None _capture_chunk(self, bytes|None audio_bytes)
bool __eq__(self, object other)
None prepare_recognize_intent(self)
None __init__(self, PipelineStorageCollection pipeline_store)
None remove_run(self, PipelineRun pipeline_run)
None _change_listener(self, str change_type, str item_id, dict change)
None add_run(self, PipelineRun pipeline_run)
None ws_delete_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_list_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None ws_get_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
None async_setup(self, HomeAssistant hass)
None ws_set_preferred_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
str async_get_preferred_item(self)
SerializedPipelineStorageCollection|None _async_load_data(self)
dict _process_create_data(self, dict data)
dict _serialize_item(self, str item_id, Pipeline item)
Pipeline _update_data(self, Pipeline item, dict update_data)
Pipeline _deserialize_item(self, dict data)
None async_set_preferred_item(self, str item_id)
SerializedPipelineStorageCollection _data_to_save(self)
None async_delete_item(self, str item_id)
str _get_suggested_id(self, dict info)
Pipeline _create_item(self, str item_id, dict data)
SerializedPipelineStorageCollection _async_migrate_func(self, int old_major_version, int old_minor_version, SerializedPipelineStorageCollection old_data)
dict[str, Any] to_json(self)
Pipeline from_json(cls, dict[str, Any] data)
Any validate_language(dict[str, Any] data)
bytes _multiply_volume(bytes chunk, float volume_multiplier)
dict[str, str|None] _async_resolve_default_pipeline_settings(HomeAssistant hass, *str|None conversation_engine_id=None, str|None stt_engine_id=None, str|None tts_engine_id=None, str pipeline_name)
Pipeline _async_create_default_pipeline(HomeAssistant hass, PipelineStorageCollection pipeline_store)
Pipeline|None async_create_default_pipeline(HomeAssistant hass, str stt_engine_id, str tts_engine_id, str pipeline_name)
None _pipeline_debug_recording_thread_proc(Path run_recording_dir, Queue[str|bytes|None] queue, float message_timeout=5)
None async_migrate_engine(HomeAssistant hass, Literal["conversation", "stt", "tts", "wake_word"] engine_type, str old_value, str new_value)
Pipeline async_get_pipeline(HomeAssistant hass, str|None pipeline_id=None)
None async_update_pipeline(HomeAssistant hass, Pipeline pipeline, *str|UndefinedType conversation_engine=UNDEFINED, str|UndefinedType conversation_language=UNDEFINED, str|UndefinedType language=UNDEFINED, str|UndefinedType name=UNDEFINED, str|None|UndefinedType stt_engine=UNDEFINED, str|None|UndefinedType stt_language=UNDEFINED, str|None|UndefinedType tts_engine=UNDEFINED, str|None|UndefinedType tts_language=UNDEFINED, str|None|UndefinedType tts_voice=UNDEFINED, str|None|UndefinedType wake_word_entity=UNDEFINED, str|None|UndefinedType wake_word_id=UNDEFINED, bool|UndefinedType prefer_local_intents=UNDEFINED)
PipelineData async_setup_pipeline_store(HomeAssistant hass)
Pipeline _async_get_pipeline_from_conversation_entity(HomeAssistant hass, str entity_id)
None async_run_migrations(HomeAssistant hass)
list[Pipeline] async_get_pipelines(HomeAssistant hass)
Iterable[bytes] chunk_samples(bytes samples, int bytes_per_chunk, AudioBuffer leftover_chunk_buffer)
bool async_setup(HomeAssistant hass, ConfigType config)
web.Response get(self, web.Request request, str config_key)
dict[str, Any] validate(SchemaCommonFlowHandler handler, dict[str, Any] user_input)
SerializedStorageCollection _base_data_to_save(self)
float _clamp(float color_component, float minimum=0, float maximum=255)