# Home Assistant voice assistant pipeline ("pipeline.py"),
# transcribed from the unofficial 2024.12.1 source reference.
1 """Classes for voice assistant pipelines."""
2 
3 from __future__ import annotations
4 
5 import array
6 import asyncio
7 from collections import defaultdict, deque
8 from collections.abc import AsyncGenerator, AsyncIterable, Callable
9 from dataclasses import asdict, dataclass, field
10 from enum import StrEnum
11 import logging
12 from pathlib import Path
13 from queue import Empty, Queue
14 from threading import Thread
15 import time
16 from typing import Any, Literal, cast
17 import wave
18 
19 import voluptuous as vol
20 
21 from homeassistant.components import (
22  conversation,
23  media_source,
24  stt,
25  tts,
26  wake_word,
27  websocket_api,
28 )
29 from homeassistant.components.tts import (
30  generate_media_source_id as tts_generate_media_source_id,
31 )
32 from homeassistant.core import Context, HomeAssistant, callback
33 from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import intent
from homeassistant.helpers.collection import (
    CHANGE_UPDATED,
    CollectionError,
    ItemNotFound,
    SerializedStorageCollection,
    StorageCollection,
    StorageCollectionWebsocket,
)
43 from homeassistant.helpers.singleton import singleton
44 from homeassistant.helpers.storage import Store
45 from homeassistant.helpers.typing import UNDEFINED, UndefinedType, VolDictType
46 from homeassistant.util import (
47  dt as dt_util,
48  language as language_util,
49  ulid as ulid_util,
50 )
51 from homeassistant.util.limited_size_dict import LimitedSizeDict
52 
53 from .audio_enhancer import AudioEnhancer, EnhancedAudioChunk, MicroVadSpeexEnhancer
54 from .const import (
55  BYTES_PER_CHUNK,
56  CONF_DEBUG_RECORDING_DIR,
57  DATA_CONFIG,
58  DATA_LAST_WAKE_UP,
59  DATA_MIGRATIONS,
60  DOMAIN,
61  MS_PER_CHUNK,
62  SAMPLE_CHANNELS,
63  SAMPLE_RATE,
64  SAMPLE_WIDTH,
65  SAMPLES_PER_CHUNK,
66  WAKE_WORD_COOLDOWN,
67 )
68 from .error import (
69  DuplicateWakeUpDetectedError,
70  IntentRecognitionError,
71  PipelineError,
72  PipelineNotFound,
73  SpeechToTextError,
74  TextToSpeechError,
75  WakeWordDetectionAborted,
76  WakeWordDetectionError,
77  WakeWordTimeoutError,
78 )
79 from .vad import AudioBuffer, VoiceActivityTimeout, VoiceCommandSegmenter, chunk_samples
80 
81 _LOGGER = logging.getLogger(__name__)
82 
83 STORAGE_KEY = f"{DOMAIN}.pipelines"
84 STORAGE_VERSION = 1
85 STORAGE_VERSION_MINOR = 2
86 
87 ENGINE_LANGUAGE_PAIRS = (
88  ("stt_engine", "stt_language"),
89  ("tts_engine", "tts_language"),
90 )
91 
92 
def validate_language(data: dict[str, Any]) -> Any:
    """Validate language settings.

    Every configured engine must come with a matching language; raise
    vol.Invalid for the first engine that has one set without the other.
    """
    for engine, language in ENGINE_LANGUAGE_PAIRS:
        # An unset engine needs no language.
        if data[engine] is None:
            continue
        if data[language] is not None:
            continue
        raise vol.Invalid(f"Need language {language} for {engine} {data[engine]}")
    return data
99 
100 
101 PIPELINE_FIELDS: VolDictType = {
102  vol.Required("conversation_engine"): str,
103  vol.Required("conversation_language"): str,
104  vol.Required("language"): str,
105  vol.Required("name"): str,
106  vol.Required("stt_engine"): vol.Any(str, None),
107  vol.Required("stt_language"): vol.Any(str, None),
108  vol.Required("tts_engine"): vol.Any(str, None),
109  vol.Required("tts_language"): vol.Any(str, None),
110  vol.Required("tts_voice"): vol.Any(str, None),
111  vol.Required("wake_word_entity"): vol.Any(str, None),
112  vol.Required("wake_word_id"): vol.Any(str, None),
113  vol.Optional("prefer_local_intents"): bool,
114 }
115 
116 STORED_PIPELINE_RUNS = 10
117 
118 SAVE_DELAY = 10
119 
120 
@callback
def _async_resolve_default_pipeline_settings(
    hass: HomeAssistant,
    *,
    conversation_engine_id: str | None = None,
    stt_engine_id: str | None = None,
    tts_engine_id: str | None = None,
    pipeline_name: str,
) -> dict[str, str | None]:
    """Resolve settings for a default pipeline.

    The default pipeline will use the homeassistant conversation agent and the
    default stt / tts engines if none are specified.

    Returns a dict in the shape of PIPELINE_FIELDS plus "language"; engine ids
    that cannot be resolved (or do not support the pipeline language) come back
    as None.
    """
    conversation_language = "en"
    pipeline_language = "en"
    stt_engine = None
    stt_language = None
    tts_engine = None
    tts_language = None
    tts_voice = None
    wake_word_entity = None
    wake_word_id = None

    if conversation_engine_id is None:
        conversation_engine_id = conversation.HOME_ASSISTANT_AGENT

    # Find a matching language supported by the Home Assistant conversation agent
    conversation_languages = language_util.matches(
        hass.config.language,
        conversation.async_get_conversation_languages(hass, conversation_engine_id),
        country=hass.config.country,
    )
    if conversation_languages:
        pipeline_language = hass.config.language
        conversation_language = conversation_languages[0]

    if stt_engine_id is None:
        stt_engine_id = stt.async_default_engine(hass)

    if stt_engine_id is not None:
        stt_engine = stt.async_get_speech_to_text_engine(hass, stt_engine_id)
        if stt_engine is None:
            # Engine id does not resolve to a provider; drop it.
            stt_engine_id = None

    if stt_engine:
        stt_languages = language_util.matches(
            pipeline_language,
            stt_engine.supported_languages,
            country=hass.config.country,
        )
        if stt_languages:
            stt_language = stt_languages[0]
        else:
            _LOGGER.debug(
                "Speech-to-text engine '%s' does not support language '%s'",
                stt_engine_id,
                pipeline_language,
            )
            stt_engine_id = None

    if tts_engine_id is None:
        tts_engine_id = tts.async_default_engine(hass)

    if tts_engine_id is not None:
        tts_engine = tts.get_engine_instance(hass, tts_engine_id)
        if tts_engine is None:
            # Engine id does not resolve to a provider; drop it.
            tts_engine_id = None

    if tts_engine:
        tts_languages = language_util.matches(
            pipeline_language,
            tts_engine.supported_languages,
            country=hass.config.country,
        )
        if tts_languages:
            tts_language = tts_languages[0]
            tts_voices = tts_engine.async_get_supported_voices(tts_language)
            if tts_voices:
                tts_voice = tts_voices[0].voice_id
        else:
            _LOGGER.debug(
                "Text-to-speech engine '%s' does not support language '%s'",
                tts_engine_id,
                pipeline_language,
            )
            tts_engine_id = None

    return {
        "conversation_engine": conversation_engine_id,
        "conversation_language": conversation_language,
        "language": hass.config.language,
        "name": pipeline_name,
        "stt_engine": stt_engine_id,
        "stt_language": stt_language,
        "tts_engine": tts_engine_id,
        "tts_language": tts_language,
        "tts_voice": tts_voice,
        "wake_word_entity": wake_word_entity,
        "wake_word_id": wake_word_id,
    }
222 
223 
async def _async_create_default_pipeline(
    hass: HomeAssistant, pipeline_store: PipelineStorageCollection
) -> Pipeline:
    """Create a default pipeline.

    The default pipeline will use the homeassistant conversation agent and the
    default stt / tts engines.
    """
    pipeline_settings = _async_resolve_default_pipeline_settings(
        hass, pipeline_name="Home Assistant"
    )
    return await pipeline_store.async_create_item(pipeline_settings)
236 
237 
async def async_create_default_pipeline(
    hass: HomeAssistant,
    stt_engine_id: str,
    tts_engine_id: str,
    pipeline_name: str,
) -> Pipeline | None:
    """Create a pipeline with default settings.

    The default pipeline will use the homeassistant conversation agent and the
    specified stt / tts engines.

    Returns None when the requested engines could not be resolved (or do not
    support the pipeline language).
    """
    pipeline_data: PipelineData = hass.data[DOMAIN]
    pipeline_store = pipeline_data.pipeline_store
    pipeline_settings = _async_resolve_default_pipeline_settings(
        hass,
        stt_engine_id=stt_engine_id,
        tts_engine_id=tts_engine_id,
        pipeline_name=pipeline_name,
    )
    if (
        pipeline_settings["stt_engine"] != stt_engine_id
        or pipeline_settings["tts_engine"] != tts_engine_id
    ):
        # Resolution dropped or replaced a requested engine; refuse to create.
        return None
    return await pipeline_store.async_create_item(pipeline_settings)
263 
264 
@callback
def _async_get_pipeline_from_conversation_entity(
    hass: HomeAssistant, entity_id: str
) -> Pipeline:
    """Get a pipeline by conversation entity ID.

    Builds an ephemeral pipeline around the conversation entity using the
    default stt / tts engines; the entity id doubles as the pipeline id.
    """
    entity = hass.states.get(entity_id)
    settings = _async_resolve_default_pipeline_settings(
        hass,
        pipeline_name=entity.name if entity else entity_id,
        conversation_engine_id=entity_id,
    )
    settings["id"] = entity_id

    return Pipeline.from_json(settings)
279 
280 
@callback
def async_get_pipeline(hass: HomeAssistant, pipeline_id: str | None = None) -> Pipeline:
    """Get a pipeline by id or the preferred pipeline.

    Raises PipelineNotFound when the id does not match a stored pipeline.
    """
    pipeline_data: PipelineData = hass.data[DOMAIN]
    store = pipeline_data.pipeline_store

    # Fall back to the preferred pipeline when no id was given
    if pipeline_id is None:
        pipeline_id = store.async_get_preferred_item()

    # Conversation entity ids act as dynamic, entity-backed pipelines
    if pipeline_id.startswith("conversation."):
        return _async_get_pipeline_from_conversation_entity(hass, pipeline_id)

    # If invalid pipeline ID was specified
    if (pipeline := store.data.get(pipeline_id)) is None:
        raise PipelineNotFound(
            "pipeline_not_found", f"Pipeline {pipeline_id} not found"
        )

    return pipeline
302 
303 
@callback
def async_get_pipelines(hass: HomeAssistant) -> list[Pipeline]:
    """Get all pipelines as a list snapshot of the store."""
    pipeline_data: PipelineData = hass.data[DOMAIN]
    return [*pipeline_data.pipeline_store.data.values()]
310 
311 
async def async_update_pipeline(
    hass: HomeAssistant,
    pipeline: Pipeline,
    *,
    conversation_engine: str | UndefinedType = UNDEFINED,
    conversation_language: str | UndefinedType = UNDEFINED,
    language: str | UndefinedType = UNDEFINED,
    name: str | UndefinedType = UNDEFINED,
    stt_engine: str | None | UndefinedType = UNDEFINED,
    stt_language: str | None | UndefinedType = UNDEFINED,
    tts_engine: str | None | UndefinedType = UNDEFINED,
    tts_language: str | None | UndefinedType = UNDEFINED,
    tts_voice: str | None | UndefinedType = UNDEFINED,
    wake_word_entity: str | None | UndefinedType = UNDEFINED,
    wake_word_id: str | None | UndefinedType = UNDEFINED,
    prefer_local_intents: bool | UndefinedType = UNDEFINED,
) -> None:
    """Update a pipeline.

    Fields left as UNDEFINED keep their current value; everything else is
    written through to the pipeline store.
    """
    pipeline_data: PipelineData = hass.data[DOMAIN]

    updates: dict[str, Any] = pipeline.to_json()
    updates.pop("id")
    # Refactor this once we bump to Python 3.12
    # and have https://peps.python.org/pep-0692/
    updates.update(
        {
            key: val
            for key, val in (
                ("conversation_engine", conversation_engine),
                ("conversation_language", conversation_language),
                ("language", language),
                ("name", name),
                ("stt_engine", stt_engine),
                ("stt_language", stt_language),
                ("tts_engine", tts_engine),
                ("tts_language", tts_language),
                ("tts_voice", tts_voice),
                ("wake_word_entity", wake_word_entity),
                ("wake_word_id", wake_word_id),
                ("prefer_local_intents", prefer_local_intents),
            )
            if val is not UNDEFINED
        }
    )

    await pipeline_data.pipeline_store.async_update_item(pipeline.id, updates)
358 
359 
360 class PipelineEventType(StrEnum):
361  """Event types emitted during a pipeline run."""
362 
363  RUN_START = "run-start"
364  RUN_END = "run-end"
365  WAKE_WORD_START = "wake_word-start"
366  WAKE_WORD_END = "wake_word-end"
367  STT_START = "stt-start"
368  STT_VAD_START = "stt-vad-start"
369  STT_VAD_END = "stt-vad-end"
370  STT_END = "stt-end"
371  INTENT_START = "intent-start"
372  INTENT_END = "intent-end"
373  TTS_START = "tts-start"
374  TTS_END = "tts-end"
375  ERROR = "error"
376 
377 
@dataclass(frozen=True)
class PipelineEvent:
    """Events emitted during a pipeline run."""

    # Event type (see PipelineEventType).
    type: PipelineEventType
    # Optional JSON-serializable payload for the event.
    data: dict[str, Any] | None = None
    # ISO-formatted UTC time the event was created.
    timestamp: str = field(default_factory=lambda: dt_util.utcnow().isoformat())
385 
386 
387 type PipelineEventCallback = Callable[[PipelineEvent], None]
388 
389 
@dataclass(frozen=True)
class Pipeline:
    """A voice assistant pipeline."""

    conversation_engine: str
    conversation_language: str
    language: str
    name: str
    stt_engine: str | None
    stt_language: str | None
    tts_engine: str | None
    tts_language: str | None
    tts_voice: str | None
    wake_word_entity: str | None
    wake_word_id: str | None
    prefer_local_intents: bool = False

    id: str = field(default_factory=ulid_util.ulid_now)

    @classmethod
    def from_json(cls, data: dict[str, Any]) -> Pipeline:
        """Create an instance from a JSON serialization.

        This function was added in HA Core 2023.10, previous versions will raise
        if there are unexpected items in the serialized data.
        """
        kwargs: dict[str, Any] = {
            key: data[key]
            for key in (
                "conversation_engine",
                "conversation_language",
                "id",
                "language",
                "name",
                "stt_engine",
                "stt_language",
                "tts_engine",
                "tts_language",
                "tts_voice",
                "wake_word_entity",
                "wake_word_id",
            )
        }
        # Field added later; default for data stored by older versions.
        kwargs["prefer_local_intents"] = data.get("prefer_local_intents", False)
        return cls(**kwargs)

    def to_json(self) -> dict[str, Any]:
        """Return a JSON serializable representation for storage."""
        return {
            key: getattr(self, key)
            for key in (
                "conversation_engine",
                "conversation_language",
                "id",
                "language",
                "name",
                "stt_engine",
                "stt_language",
                "tts_engine",
                "tts_language",
                "tts_voice",
                "wake_word_entity",
                "wake_word_id",
                "prefer_local_intents",
            )
        }
449 
450 
451 class PipelineStage(StrEnum):
452  """Stages of a pipeline."""
453 
454  WAKE_WORD = "wake_word"
455  STT = "stt"
456  INTENT = "intent"
457  TTS = "tts"
458  END = "end"
459 
460 
461 PIPELINE_STAGE_ORDER = [
462  PipelineStage.WAKE_WORD,
463  PipelineStage.STT,
464  PipelineStage.INTENT,
465  PipelineStage.TTS,
466 ]
467 
468 
class PipelineRunValidationError(Exception):
    """Raised when a pipeline run is not valid."""
471 
472 
class InvalidPipelineStagesError(PipelineRunValidationError):
    """Error when given an invalid combination of start/end stages."""

    def __init__(
        self,
        start_stage: PipelineStage,
        end_stage: PipelineStage,
    ) -> None:
        """Set error message."""
        message = f"Invalid stage combination: start={start_stage}, end={end_stage}"
        super().__init__(message)
485 
486 
@dataclass(frozen=True)
class WakeWordSettings:
    """Settings for wake word detection."""

    timeout: float | None = None
    """Seconds of silence before detection times out."""

    audio_seconds_to_buffer: float = 0
    """Seconds of audio to buffer before detection and forward to STT."""
496 
497 
@dataclass(frozen=True)
class AudioSettings:
    """Settings for pipeline audio processing."""

    noise_suppression_level: int = 0
    """Level of noise suppression (0 = disabled, 4 = max)"""

    auto_gain_dbfs: int = 0
    """Amount of automatic gain in dbFS (0 = disabled, 31 = max)"""

    volume_multiplier: float = 1.0
    """Multiplier used directly on PCM samples (1.0 = no change, 2.0 = twice as loud)"""

    is_vad_enabled: bool = True
    """True if VAD is used to determine the end of the voice command."""

    silence_seconds: float = 0.7
    """Seconds of silence after voice command has ended."""

    def __post_init__(self) -> None:
        """Verify settings post-initialization.

        Raises ValueError when a value is outside its supported range.
        """
        if (self.noise_suppression_level < 0) or (self.noise_suppression_level > 4):
            raise ValueError("noise_suppression_level must be in [0, 4]")

        if (self.auto_gain_dbfs < 0) or (self.auto_gain_dbfs > 31):
            raise ValueError("auto_gain_dbfs must be in [0, 31]")

    @property
    def needs_processor(self) -> bool:
        """True if an audio processor is needed."""
        return (
            self.is_vad_enabled
            or (self.noise_suppression_level > 0)
            or (self.auto_gain_dbfs > 0)
        )
533 
534 
@dataclass
class PipelineRun:
    """Running context for a pipeline."""

    hass: HomeAssistant
    context: Context
    pipeline: Pipeline
    start_stage: PipelineStage
    end_stage: PipelineStage
    event_callback: PipelineEventCallback
    # Resolved in __post_init__ from pipeline/hass config; never actually None
    # after construction, hence the ignore.
    language: str = None  # type: ignore[assignment]
    runner_data: Any | None = None
    intent_agent: str | None = None
    tts_audio_output: str | dict[str, Any] | None = None
    wake_word_settings: WakeWordSettings | None = None
    audio_settings: AudioSettings = field(default_factory=AudioSettings)

    id: str = field(default_factory=ulid_util.ulid_now)
    stt_provider: stt.SpeechToTextEntity | stt.Provider = field(init=False, repr=False)
    tts_engine: str = field(init=False, repr=False)
    tts_options: dict | None = field(init=False, default=None)
    wake_word_entity_id: str | None = field(init=False, default=None, repr=False)
    wake_word_entity: wake_word.WakeWordDetectionEntity = field(init=False, repr=False)

    abort_wake_word_detection: bool = field(init=False, default=False)

    debug_recording_thread: Thread | None = None
    """Thread that records audio to debug_recording_dir"""

    debug_recording_queue: Queue[str | bytes | None] | None = None
    """Queue to communicate with debug recording thread"""

    audio_enhancer: AudioEnhancer | None = None
    """VAD/noise suppression/auto gain"""

    audio_chunking_buffer: AudioBuffer = field(
        default_factory=lambda: AudioBuffer(BYTES_PER_CHUNK)
    )
    """Buffer used when splitting audio into chunks for audio processing"""

    _device_id: str | None = None
    """Optional device id set during run start."""
577 
578  def __post_init__(self) -> None:
579  """Set language for pipeline."""
580  self.languagelanguage = self.pipeline.language or self.hass.config.language
581 
582  # wake -> stt -> intent -> tts
583  if PIPELINE_STAGE_ORDER.index(self.end_stage) < PIPELINE_STAGE_ORDER.index(
584  self.start_stage
585  ):
586  raise InvalidPipelineStagesError(self.start_stage, self.end_stage)
587 
588  pipeline_data: PipelineData = self.hass.data[DOMAIN]
589  if self.pipeline.id not in pipeline_data.pipeline_debug:
590  pipeline_data.pipeline_debug[self.pipeline.id] = LimitedSizeDict(
591  size_limit=STORED_PIPELINE_RUNS
592  )
593  pipeline_data.pipeline_debug[self.pipeline.id][self.idid] = PipelineRunDebug()
594  pipeline_data.pipeline_runs.add_run(self)
595 
596  # Initialize with audio settings
597  if self.audio_settings.needs_processor and (self.audio_enhanceraudio_enhancer is None):
598  # Default audio enhancer
599  self.audio_enhanceraudio_enhancer = MicroVadSpeexEnhancer(
600  self.audio_settings.auto_gain_dbfs,
601  self.audio_settings.noise_suppression_level,
602  self.audio_settings.is_vad_enabled,
603  )
604 
605  def __eq__(self, other: object) -> bool:
606  """Compare pipeline runs by id."""
607  if isinstance(other, PipelineRun):
608  return self.idid == other.id
609 
610  return False
611 
612  @callback
613  def process_event(self, event: PipelineEvent) -> None:
614  """Log an event and call listener."""
615  self.event_callback(event)
616  pipeline_data: PipelineData = self.hass.data[DOMAIN]
617  if self.idid not in pipeline_data.pipeline_debug[self.pipeline.id]:
618  # This run has been evicted from the logged pipeline runs already
619  return
620  pipeline_data.pipeline_debug[self.pipeline.id][self.idid].events.append(event)
621 
622  def start(self, device_id: str | None) -> None:
623  """Emit run start event."""
624  self._device_id_device_id = device_id
625  self._start_debug_recording_thread_start_debug_recording_thread()
626 
627  data = {
628  "pipeline": self.pipeline.id,
629  "language": self.languagelanguage,
630  }
631  if self.runner_data is not None:
632  data["runner_data"] = self.runner_data
633 
634  self.process_eventprocess_event(PipelineEvent(PipelineEventType.RUN_START, data))
635 
636  async def end(self) -> None:
637  """Emit run end event."""
638  # Signal end of stream to listeners
639  self._capture_chunk_capture_chunk(None)
640 
641  # Stop the recording thread before emitting run-end.
642  # This ensures that files are properly closed if the event handler reads them.
643  await self._stop_debug_recording_thread_stop_debug_recording_thread()
644 
645  self.process_eventprocess_event(
647  PipelineEventType.RUN_END,
648  )
649  )
650 
651  pipeline_data: PipelineData = self.hass.data[DOMAIN]
652  pipeline_data.pipeline_runs.remove_run(self)
653 
654  async def prepare_wake_word_detection(self) -> None:
655  """Prepare wake-word-detection."""
656  entity_id = self.pipeline.wake_word_entity or wake_word.async_default_entity(
657  self.hass
658  )
659  if entity_id is None:
661  code="wake-engine-missing",
662  message="No wake word engine",
663  )
664 
665  wake_word_entity = wake_word.async_get_wake_word_detection_entity(
666  self.hass, entity_id
667  )
668  if wake_word_entity is None:
670  code="wake-provider-missing",
671  message=f"No wake-word-detection provider for: {entity_id}",
672  )
673 
674  self.wake_word_entity_idwake_word_entity_id = entity_id
675  self.wake_word_entitywake_word_entity = wake_word_entity
676 
678  self,
679  stream: AsyncIterable[EnhancedAudioChunk],
680  audio_chunks_for_stt: list[EnhancedAudioChunk],
681  ) -> wake_word.DetectionResult | None:
682  """Run wake-word-detection portion of pipeline. Returns detection result."""
683  metadata_dict = asdict(
685  language="",
686  format=stt.AudioFormats.WAV,
687  codec=stt.AudioCodecs.PCM,
688  bit_rate=stt.AudioBitRates.BITRATE_16,
689  sample_rate=stt.AudioSampleRates.SAMPLERATE_16000,
690  channel=stt.AudioChannels.CHANNEL_MONO,
691  )
692  )
693 
694  wake_word_settings = self.wake_word_settings or WakeWordSettings()
695 
696  # Remove language since it doesn't apply to wake words yet
697  metadata_dict.pop("language", None)
698 
699  self.process_eventprocess_event(
701  PipelineEventType.WAKE_WORD_START,
702  {
703  "entity_id": self.wake_word_entity_idwake_word_entity_id,
704  "metadata": metadata_dict,
705  "timeout": wake_word_settings.timeout or 0,
706  },
707  )
708  )
709 
710  if self.debug_recording_queuedebug_recording_queue is not None:
711  self.debug_recording_queuedebug_recording_queue.put_nowait(f"00_wake-{self.wake_word_entity_id}")
712 
713  wake_word_vad: VoiceActivityTimeout | None = None
714  if (wake_word_settings.timeout is not None) and (
715  wake_word_settings.timeout > 0
716  ):
717  # Use VAD to determine timeout
718  wake_word_vad = VoiceActivityTimeout(wake_word_settings.timeout)
719 
720  # Audio chunk buffer. This audio will be forwarded to speech-to-text
721  # after wake-word-detection.
722  num_audio_chunks_to_buffer = int(
723  (wake_word_settings.audio_seconds_to_buffer * SAMPLE_RATE)
724  / SAMPLES_PER_CHUNK
725  )
726 
727  stt_audio_buffer: deque[EnhancedAudioChunk] | None = None
728  if num_audio_chunks_to_buffer > 0:
729  stt_audio_buffer = deque(maxlen=num_audio_chunks_to_buffer)
730 
731  try:
732  # Detect wake word(s)
733  result = await self.wake_word_entitywake_word_entity.async_process_audio_stream(
734  self._wake_word_audio_stream_wake_word_audio_stream(
735  audio_stream=stream,
736  stt_audio_buffer=stt_audio_buffer,
737  wake_word_vad=wake_word_vad,
738  ),
739  self.pipeline.wake_word_id,
740  )
741 
742  if stt_audio_buffer is not None:
743  # All audio kept from right before the wake word was detected as
744  # a single chunk.
745  audio_chunks_for_stt.extend(stt_audio_buffer)
746  except WakeWordDetectionAborted:
747  raise
748  except WakeWordTimeoutError:
749  _LOGGER.debug("Timeout during wake word detection")
750  raise
751  except Exception as src_error:
752  _LOGGER.exception("Unexpected error during wake-word-detection")
754  code="wake-stream-failed",
755  message="Unexpected error during wake-word-detection",
756  ) from src_error
757 
758  _LOGGER.debug("wake-word-detection result %s", result)
759 
760  if result is None:
761  wake_word_output: dict[str, Any] = {}
762  else:
763  # Avoid duplicate detections by checking cooldown
764  last_wake_up = self.hass.data[DATA_LAST_WAKE_UP].get(
765  result.wake_word_phrase
766  )
767  if last_wake_up is not None:
768  sec_since_last_wake_up = time.monotonic() - last_wake_up
769  if sec_since_last_wake_up < WAKE_WORD_COOLDOWN:
770  _LOGGER.debug(
771  "Duplicate wake word detection occurred for %s",
772  result.wake_word_phrase,
773  )
774  raise DuplicateWakeUpDetectedError(result.wake_word_phrase)
775 
776  # Record last wake up time to block duplicate detections
777  self.hass.data[DATA_LAST_WAKE_UP][result.wake_word_phrase] = (
778  time.monotonic()
779  )
780 
781  if result.queued_audio:
782  # Add audio that was pending at detection.
783  #
784  # Because detection occurs *after* the wake word was actually
785  # spoken, we need to make sure pending audio is forwarded to
786  # speech-to-text so the user does not have to pause before
787  # speaking the voice command.
788  audio_chunks_for_stt.extend(
790  audio=chunk_ts[0],
791  timestamp_ms=chunk_ts[1],
792  speech_probability=None,
793  )
794  for chunk_ts in result.queued_audio
795  )
796 
797  wake_word_output = asdict(result)
798 
799  # Remove non-JSON fields
800  wake_word_output.pop("queued_audio", None)
801 
802  self.process_eventprocess_event(
804  PipelineEventType.WAKE_WORD_END,
805  {"wake_word_output": wake_word_output},
806  )
807  )
808 
809  return result
810 
812  self,
813  audio_stream: AsyncIterable[EnhancedAudioChunk],
814  stt_audio_buffer: deque[EnhancedAudioChunk] | None,
815  wake_word_vad: VoiceActivityTimeout | None,
816  sample_rate: int = SAMPLE_RATE,
817  sample_width: int = SAMPLE_WIDTH,
818  ) -> AsyncIterable[tuple[bytes, int]]:
819  """Yield audio chunks with timestamps (milliseconds since start of stream).
820 
821  Adds audio to a ring buffer that will be forwarded to speech-to-text after
822  detection. Times out if VAD detects enough silence.
823  """
824  async for chunk in audio_stream:
825  if self.abort_wake_word_detection:
826  raise WakeWordDetectionAborted
827 
828  self._capture_chunk_capture_chunk(chunk.audio)
829  yield chunk.audio, chunk.timestamp_ms
830 
831  # Wake-word-detection occurs *after* the wake word was actually
832  # spoken. Keeping audio right before detection allows the voice
833  # command to be spoken immediately after the wake word.
834  if stt_audio_buffer is not None:
835  stt_audio_buffer.append(chunk)
836 
837  if wake_word_vad is not None:
838  chunk_seconds = (len(chunk.audio) // sample_width) / sample_rate
839  if not wake_word_vad.process(chunk_seconds, chunk.speech_probability):
840  raise WakeWordTimeoutError(
841  code="wake-word-timeout", message="Wake word was not detected"
842  )
843 
844  async def prepare_speech_to_text(self, metadata: stt.SpeechMetadata) -> None:
845  """Prepare speech-to-text."""
846  # pipeline.stt_engine can't be None or this function is not called
847  stt_provider = stt.async_get_speech_to_text_engine(
848  self.hass,
849  self.pipeline.stt_engine, # type: ignore[arg-type]
850  )
851 
852  if stt_provider is None:
853  engine = self.pipeline.stt_engine
854  raise SpeechToTextError(
855  code="stt-provider-missing",
856  message=f"No speech-to-text provider for: {engine}",
857  )
858 
859  metadata.language = self.pipeline.stt_language or self.languagelanguage
860 
861  if not stt_provider.check_metadata(metadata):
862  raise SpeechToTextError(
863  code="stt-provider-unsupported-metadata",
864  message=(
865  f"Provider {stt_provider.name} does not support input speech "
866  f"to text metadata {metadata}"
867  ),
868  )
869 
870  self.stt_providerstt_provider = stt_provider
871 
872  async def speech_to_text(
873  self,
874  metadata: stt.SpeechMetadata,
875  stream: AsyncIterable[EnhancedAudioChunk],
876  ) -> str:
877  """Run speech-to-text portion of pipeline. Returns the spoken text."""
878  # Create a background task to prepare the conversation agent
879  if self.end_stage >= PipelineStage.INTENT:
880  self.hass.async_create_background_task(
881  conversation.async_prepare_agent(
882  self.hass, self.intent_agentintent_agent, self.languagelanguage
883  ),
884  f"prepare conversation agent {self.intent_agent}",
885  )
886 
887  if isinstance(self.stt_providerstt_provider, stt.Provider):
888  engine = self.stt_providerstt_provider.name
889  else:
890  engine = self.stt_providerstt_provider.entity_id
891 
892  self.process_eventprocess_event(
894  PipelineEventType.STT_START,
895  {
896  "engine": engine,
897  "metadata": asdict(metadata),
898  },
899  )
900  )
901 
902  if self.debug_recording_queuedebug_recording_queue is not None:
903  # New recording
904  self.debug_recording_queuedebug_recording_queue.put_nowait(f"01_stt-{engine}")
905 
906  try:
907  # Transcribe audio stream
908  stt_vad: VoiceCommandSegmenter | None = None
909  if self.audio_settings.is_vad_enabled:
910  stt_vad = VoiceCommandSegmenter(
911  silence_seconds=self.audio_settings.silence_seconds
912  )
913 
914  result = await self.stt_providerstt_provider.async_process_audio_stream(
915  metadata,
916  self._speech_to_text_stream_speech_to_text_stream(audio_stream=stream, stt_vad=stt_vad),
917  )
918  except (asyncio.CancelledError, TimeoutError):
919  raise # expected
920  except Exception as src_error:
921  _LOGGER.exception("Unexpected error during speech-to-text")
922  raise SpeechToTextError(
923  code="stt-stream-failed",
924  message="Unexpected error during speech-to-text",
925  ) from src_error
926 
927  _LOGGER.debug("speech-to-text result %s", result)
928 
929  if result.result != stt.SpeechResultState.SUCCESS:
930  raise SpeechToTextError(
931  code="stt-stream-failed",
932  message="speech-to-text failed",
933  )
934 
935  if not result.text:
936  raise SpeechToTextError(
937  code="stt-no-text-recognized", message="No text recognized"
938  )
939 
940  self.process_eventprocess_event(
942  PipelineEventType.STT_END,
943  {
944  "stt_output": {
945  "text": result.text,
946  }
947  },
948  )
949  )
950 
951  return result.text
952 
954  self,
955  audio_stream: AsyncIterable[EnhancedAudioChunk],
956  stt_vad: VoiceCommandSegmenter | None,
957  sample_rate: int = SAMPLE_RATE,
958  sample_width: int = SAMPLE_WIDTH,
959  ) -> AsyncGenerator[bytes]:
960  """Yield audio chunks until VAD detects silence or speech-to-text completes."""
961  sent_vad_start = False
962  async for chunk in audio_stream:
963  self._capture_chunk_capture_chunk(chunk.audio)
964 
965  if stt_vad is not None:
966  chunk_seconds = (len(chunk.audio) // sample_width) / sample_rate
967  if not stt_vad.process(chunk_seconds, chunk.speech_probability):
968  # Silence detected at the end of voice command
969  self.process_eventprocess_event(
971  PipelineEventType.STT_VAD_END,
972  {"timestamp": chunk.timestamp_ms},
973  )
974  )
975  break
976 
977  if stt_vad.in_command and (not sent_vad_start):
978  # Speech detected at start of voice command
979  self.process_eventprocess_event(
981  PipelineEventType.STT_VAD_START,
982  {"timestamp": chunk.timestamp_ms},
983  )
984  )
985  sent_vad_start = True
986 
987  yield chunk.audio
988 
989  async def prepare_recognize_intent(self) -> None:
990  """Prepare recognizing an intent."""
991  agent_info = conversation.async_get_agent_info(
992  self.hass,
993  self.pipeline.conversation_engine or conversation.HOME_ASSISTANT_AGENT,
994  )
995 
996  if agent_info is None:
997  engine = self.pipeline.conversation_engine or "default"
999  code="intent-not-supported",
1000  message=f"Intent recognition engine {engine} is not found",
1001  )
1002 
1003  self.intent_agentintent_agent = agent_info.id
1004 
1005  async def recognize_intent(
1006  self, intent_input: str, conversation_id: str | None, device_id: str | None
1007  ) -> str:
1008  """Run intent recognition portion of pipeline. Returns text to speak."""
1009  if self.intent_agentintent_agent is None:
1010  raise RuntimeError("Recognize intent was not prepared")
1011 
1012  self.process_eventprocess_event(
1013  PipelineEvent(
1014  PipelineEventType.INTENT_START,
1015  {
1016  "engine": self.intent_agentintent_agent,
1017  "language": self.pipeline.conversation_language,
1018  "intent_input": intent_input,
1019  "conversation_id": conversation_id,
1020  "device_id": device_id,
1021  "prefer_local_intents": self.pipeline.prefer_local_intents,
1022  },
1023  )
1024  )
1025 
1026  try:
1027  user_input = conversation.ConversationInput(
1028  text=intent_input,
1029  context=self.context,
1030  conversation_id=conversation_id,
1031  device_id=device_id,
1032  language=self.pipeline.language,
1033  agent_id=self.intent_agentintent_agent,
1034  )
1035  processed_locally = self.intent_agentintent_agent == conversation.HOME_ASSISTANT_AGENT
1036 
1037  conversation_result: conversation.ConversationResult | None = None
1038  if user_input.agent_id != conversation.HOME_ASSISTANT_AGENT:
1039  # Sentence triggers override conversation agent
1040  if (
1041  trigger_response_text
1042  := await conversation.async_handle_sentence_triggers(
1043  self.hass, user_input
1044  )
1045  ) is not None:
1046  # Sentence trigger matched
1047  trigger_response = intent.IntentResponse(
1048  self.pipeline.conversation_language
1049  )
1050  trigger_response.async_set_speech(trigger_response_text)
1051  conversation_result = conversation.ConversationResult(
1052  response=trigger_response,
1053  conversation_id=user_input.conversation_id,
1054  )
1055  # Try local intents first, if preferred.
1056  elif self.pipeline.prefer_local_intents and (
1057  intent_response := await conversation.async_handle_intents(
1058  self.hass, user_input
1059  )
1060  ):
1061  # Local intent matched
1062  conversation_result = conversation.ConversationResult(
1063  response=intent_response,
1064  conversation_id=user_input.conversation_id,
1065  )
1066  processed_locally = True
1067 
1068  if conversation_result is None:
1069  # Fall back to pipeline conversation agent
1070  conversation_result = await conversation.async_converse(
1071  hass=self.hass,
1072  text=user_input.text,
1073  conversation_id=user_input.conversation_id,
1074  device_id=user_input.device_id,
1075  context=user_input.context,
1076  language=user_input.language,
1077  agent_id=user_input.agent_id,
1078  )
1079  except Exception as src_error:
1080  _LOGGER.exception("Unexpected error during intent recognition")
1081  raise IntentRecognitionError(
1082  code="intent-failed",
1083  message="Unexpected error during intent recognition",
1084  ) from src_error
1085 
1086  _LOGGER.debug("conversation result %s", conversation_result)
1087 
1088  self.process_eventprocess_event(
1089  PipelineEvent(
1090  PipelineEventType.INTENT_END,
1091  {
1092  "processed_locally": processed_locally,
1093  "intent_output": conversation_result.as_dict(),
1094  },
1095  )
1096  )
1097 
1098  speech: str = conversation_result.response.speech.get("plain", {}).get(
1099  "speech", ""
1100  )
1101 
1102  return speech
1103 
1104  async def prepare_text_to_speech(self) -> None:
1105  """Prepare text-to-speech."""
1106  # pipeline.tts_engine can't be None or this function is not called
1107  engine = cast(str, self.pipeline.tts_engine)
1108 
1109  tts_options: dict[str, Any] = {}
1110  if self.pipeline.tts_voice is not None:
1111  tts_options[tts.ATTR_VOICE] = self.pipeline.tts_voice
1112 
1113  if isinstance(self.tts_audio_outputtts_audio_output, dict):
1114  tts_options.update(self.tts_audio_outputtts_audio_output)
1115  elif isinstance(self.tts_audio_outputtts_audio_output, str):
1116  tts_options[tts.ATTR_PREFERRED_FORMAT] = self.tts_audio_outputtts_audio_output
1117  if self.tts_audio_outputtts_audio_output == "wav":
1118  # 16 Khz, 16-bit mono
1119  tts_options[tts.ATTR_PREFERRED_SAMPLE_RATE] = SAMPLE_RATE
1120  tts_options[tts.ATTR_PREFERRED_SAMPLE_CHANNELS] = SAMPLE_CHANNELS
1121  tts_options[tts.ATTR_PREFERRED_SAMPLE_BYTES] = SAMPLE_WIDTH
1122 
1123  try:
1124  options_supported = await tts.async_support_options(
1125  self.hass,
1126  engine,
1127  self.pipeline.tts_language,
1128  tts_options,
1129  )
1130  except HomeAssistantError as err:
1131  raise TextToSpeechError(
1132  code="tts-not-supported",
1133  message=f"Text-to-speech engine '{engine}' not found",
1134  ) from err
1135  if not options_supported:
1136  raise TextToSpeechError(
1137  code="tts-not-supported",
1138  message=(
1139  f"Text-to-speech engine {engine} "
1140  f"does not support language {self.pipeline.tts_language} or options {tts_options}"
1141  ),
1142  )
1143 
1144  self.tts_enginetts_engine = engine
1145  self.tts_optionstts_options = tts_options
1146 
1147  async def text_to_speech(self, tts_input: str) -> None:
1148  """Run text-to-speech portion of pipeline."""
1149  self.process_eventprocess_event(
1150  PipelineEvent(
1151  PipelineEventType.TTS_START,
1152  {
1153  "engine": self.tts_enginetts_engine,
1154  "language": self.pipeline.tts_language,
1155  "voice": self.pipeline.tts_voice,
1156  "tts_input": tts_input,
1157  },
1158  )
1159  )
1160 
1161  try:
1162  # Synthesize audio and get URL
1163  tts_media_id = tts_generate_media_source_id(
1164  self.hass,
1165  tts_input,
1166  engine=self.tts_enginetts_engine,
1167  language=self.pipeline.tts_language,
1168  options=self.tts_optionstts_options,
1169  )
1170  tts_media = await media_source.async_resolve_media(
1171  self.hass,
1172  tts_media_id,
1173  None,
1174  )
1175  except Exception as src_error:
1176  _LOGGER.exception("Unexpected error during text-to-speech")
1177  raise TextToSpeechError(
1178  code="tts-failed",
1179  message="Unexpected error during text-to-speech",
1180  ) from src_error
1181 
1182  _LOGGER.debug("TTS result %s", tts_media)
1183  tts_output = {
1184  "media_id": tts_media_id,
1185  **asdict(tts_media),
1186  }
1187 
1188  self.process_eventprocess_event(
1189  PipelineEvent(PipelineEventType.TTS_END, {"tts_output": tts_output})
1190  )
1191 
1192  def _capture_chunk(self, audio_bytes: bytes | None) -> None:
1193  """Forward audio chunk to various capturing mechanisms."""
1194  if self.debug_recording_queuedebug_recording_queue is not None:
1195  # Forward to debug WAV file recording
1196  self.debug_recording_queuedebug_recording_queue.put_nowait(audio_bytes)
1197 
1198  if self._device_id_device_id is None:
1199  return
1200 
1201  # Forward to device audio capture
1202  pipeline_data: PipelineData = self.hass.data[DOMAIN]
1203  audio_queue = pipeline_data.device_audio_queues.get(self._device_id_device_id)
1204  if audio_queue is None:
1205  return
1206 
1207  try:
1208  audio_queue.queue.put_nowait(audio_bytes)
1209  except asyncio.QueueFull:
1210  audio_queue.overflow = True
1211  _LOGGER.warning("Audio queue full for device %s", self._device_id_device_id)
1212 
1214  """Start thread to record wake/stt audio if debug_recording_dir is set."""
1215  if self.debug_recording_threaddebug_recording_thread is not None:
1216  # Already started
1217  return
1218 
1219  # Directory to save audio for each pipeline run.
1220  # Configured in YAML for assist_pipeline.
1221  if debug_recording_dir := self.hass.data[DATA_CONFIG].get(
1222  CONF_DEBUG_RECORDING_DIR
1223  ):
1224  if self._device_id_device_id is None:
1225  # <debug_recording_dir>/<pipeline.name>/<run.id>
1226  run_recording_dir = (
1227  Path(debug_recording_dir)
1228  / self.pipeline.name
1229  / str(time.monotonic_ns())
1230  )
1231  else:
1232  # <debug_recording_dir>/<device_id>/<pipeline.name>/<run.id>
1233  run_recording_dir = (
1234  Path(debug_recording_dir)
1235  / self._device_id_device_id
1236  / self.pipeline.name
1237  / str(time.monotonic_ns())
1238  )
1239 
1240  self.debug_recording_queuedebug_recording_queue = Queue()
1241  self.debug_recording_threaddebug_recording_thread = Thread(
1242  target=_pipeline_debug_recording_thread_proc,
1243  args=(run_recording_dir, self.debug_recording_queuedebug_recording_queue),
1244  daemon=True,
1245  )
1246  self.debug_recording_threaddebug_recording_thread.start()
1247 
1248  async def _stop_debug_recording_thread(self) -> None:
1249  """Stop recording thread."""
1250  if (self.debug_recording_threaddebug_recording_thread is None) or (
1251  self.debug_recording_queuedebug_recording_queue is None
1252  ):
1253  # Not running
1254  return
1255 
1256  # NOTE: Expecting a None to have been put in self.debug_recording_queue
1257  # in self.end() to signal the thread to stop.
1258 
1259  # Wait until the thread has finished to ensure that files are fully written
1260  await self.hass.async_add_executor_job(self.debug_recording_threaddebug_recording_thread.join)
1261 
1262  self.debug_recording_queuedebug_recording_queue = None
1263  self.debug_recording_threaddebug_recording_thread = None
1264 
1266  self, audio_stream: AsyncIterable[bytes]
1267  ) -> AsyncGenerator[EnhancedAudioChunk]:
1268  """Apply volume transformation only (no VAD/audio enhancements) with optional chunking."""
1269  timestamp_ms = 0
1270  async for chunk in audio_stream:
1271  if self.audio_settings.volume_multiplier != 1.0:
1272  chunk = _multiply_volume(chunk, self.audio_settings.volume_multiplier)
1273 
1274  for sub_chunk in chunk_samples(
1275  chunk, BYTES_PER_CHUNK, self.audio_chunking_buffer
1276  ):
1277  yield EnhancedAudioChunk(
1278  audio=sub_chunk,
1279  timestamp_ms=timestamp_ms,
1280  speech_probability=None, # no VAD
1281  )
1282  timestamp_ms += MS_PER_CHUNK
1283 
1285  self, audio_stream: AsyncIterable[bytes]
1286  ) -> AsyncGenerator[EnhancedAudioChunk]:
1287  """Split audio into chunks and apply VAD/noise suppression/auto gain/volume transformation."""
1288  assert self.audio_enhanceraudio_enhancer is not None
1289 
1290  timestamp_ms = 0
1291  async for dirty_samples in audio_stream:
1292  if self.audio_settings.volume_multiplier != 1.0:
1293  # Static gain
1294  dirty_samples = _multiply_volume(
1295  dirty_samples, self.audio_settings.volume_multiplier
1296  )
1297 
1298  # Split into chunks for audio enhancements/VAD
1299  for dirty_chunk in chunk_samples(
1300  dirty_samples, BYTES_PER_CHUNK, self.audio_chunking_buffer
1301  ):
1302  yield self.audio_enhanceraudio_enhancer.enhance_chunk(dirty_chunk, timestamp_ms)
1303  timestamp_ms += MS_PER_CHUNK
1304 
1305 
1306 def _multiply_volume(chunk: bytes, volume_multiplier: float) -> bytes:
1307  """Multiplies 16-bit PCM samples by a constant."""
1308 
1309  def _clamp(val: float) -> float:
1310  """Clamp to signed 16-bit."""
1311  return max(-32768, min(32767, val))
1312 
1313  return array.array(
1314  "h",
1315  (int(_clamp(value * volume_multiplier)) for value in array.array("h", chunk)),
1316  ).tobytes()
1317 
1318 
def _pipeline_debug_recording_thread_proc(
    run_recording_dir: Path,
    queue: Queue[str | bytes | None],
    message_timeout: float = 5,
) -> None:
    """Write debug audio to WAV files in a background thread.

    Messages on the queue:
    - str: start a new WAV file with that name (closing any previous one)
    - bytes: chunk of 16-bit mono audio at 16Khz appended to the current file
    - None: stop signal
    """
    wav_writer: wave.Wave_write | None = None

    try:
        _LOGGER.debug("Saving wake/stt audio to %s", run_recording_dir)
        run_recording_dir.mkdir(parents=True, exist_ok=True)

        while True:
            message = queue.get(timeout=message_timeout)
            if message is None:
                # Stop signal
                break

            if isinstance(message, str):
                # New WAV file name
                if wav_writer is not None:
                    wav_writer.close()

                wav_path = run_recording_dir / f"{message}.wav"
                wav_writer = wave.open(str(wav_path), "wb")
                wav_writer.setframerate(SAMPLE_RATE)
                wav_writer.setsampwidth(SAMPLE_WIDTH)
                wav_writer.setnchannels(SAMPLE_CHANNELS)
            elif isinstance(message, bytes):
                # Chunk of 16-bit mono audio at 16Khz
                if wav_writer is not None:
                    wav_writer.writeframes(message)
    except Empty:
        pass  # occurs when pipeline has unexpected error
    except Exception:
        _LOGGER.exception("Unexpected error in debug recording thread")
    finally:
        # Always close the last file so headers are finalized
        if wav_writer is not None:
            wav_writer.close()
1357 
1358 
@dataclass
class PipelineInput:
    """Input to a pipeline run."""

    run: PipelineRun

    stt_metadata: stt.SpeechMetadata | None = None
    """Metadata of stt input audio. Required when start_stage = stt."""

    stt_stream: AsyncIterable[bytes] | None = None
    """Input audio for stt. Required when start_stage = stt."""

    wake_word_phrase: str | None = None
    """Optional key used to de-duplicate wake-ups for local wake word detection."""

    intent_input: str | None = None
    """Input for conversation agent. Required when start_stage = intent."""

    tts_input: str | None = None
    """Input for text-to-speech. Required when start_stage = tts."""

    conversation_id: str | None = None

    device_id: str | None = None

    async def execute(self) -> None:
        """Run pipeline from start_stage through end_stage.

        Pipeline errors are reported as ERROR events; the run is always ended
        in the finally block so resources are released.
        """
        self.run.start(device_id=self.device_id)
        current_stage: PipelineStage | None = self.run.start_stage
        stt_audio_buffer: list[EnhancedAudioChunk] = []
        stt_processed_stream: AsyncIterable[EnhancedAudioChunk] | None = None

        if self.stt_stream is not None:
            if self.run.audio_settings.needs_processor:
                # VAD/noise suppression/auto gain/volume
                stt_processed_stream = self.run.process_enhance_audio(self.stt_stream)
            else:
                # Volume multiplier only
                stt_processed_stream = self.run.process_volume_only(self.stt_stream)

        try:
            if current_stage == PipelineStage.WAKE_WORD:
                # wake-word-detection
                assert stt_processed_stream is not None
                detect_result = await self.run.wake_word_detection(
                    stt_processed_stream, stt_audio_buffer
                )
                if detect_result is None:
                    # No wake word. Abort the rest of the pipeline.
                    return

                current_stage = PipelineStage.STT

            # speech-to-text
            intent_input = self.intent_input
            if current_stage == PipelineStage.STT:
                assert self.stt_metadata is not None
                assert stt_processed_stream is not None

                if self.wake_word_phrase is not None:
                    # Avoid duplicate wake-ups by checking cooldown
                    last_wake_up = self.run.hass.data[DATA_LAST_WAKE_UP].get(
                        self.wake_word_phrase
                    )
                    if last_wake_up is not None:
                        sec_since_last_wake_up = time.monotonic() - last_wake_up
                        if sec_since_last_wake_up < WAKE_WORD_COOLDOWN:
                            _LOGGER.debug(
                                "Speech-to-text cancelled to avoid duplicate wake-up for %s",
                                self.wake_word_phrase,
                            )
                            raise DuplicateWakeUpDetectedError(self.wake_word_phrase)

                    # Record last wake up time to block duplicate detections
                    self.run.hass.data[DATA_LAST_WAKE_UP][self.wake_word_phrase] = (
                        time.monotonic()
                    )

                stt_input_stream = stt_processed_stream

                if stt_audio_buffer:
                    # Send audio in the buffer first to speech-to-text, then move on to stt_stream.
                    # This is basically an async itertools.chain.
                    async def buffer_then_audio_stream() -> (
                        AsyncGenerator[EnhancedAudioChunk]
                    ):
                        # Buffered audio
                        for chunk in stt_audio_buffer:
                            yield chunk

                        # Streamed audio
                        assert stt_processed_stream is not None
                        async for chunk in stt_processed_stream:
                            yield chunk

                    stt_input_stream = buffer_then_audio_stream()

                intent_input = await self.run.speech_to_text(
                    self.stt_metadata,
                    stt_input_stream,
                )
                current_stage = PipelineStage.INTENT

            if self.run.end_stage != PipelineStage.STT:
                tts_input = self.tts_input

                if current_stage == PipelineStage.INTENT:
                    # intent-recognition
                    assert intent_input is not None
                    tts_input = await self.run.recognize_intent(
                        intent_input,
                        self.conversation_id,
                        self.device_id,
                    )
                    if tts_input.strip():
                        current_stage = PipelineStage.TTS
                    else:
                        # Skip TTS
                        current_stage = PipelineStage.END

                if self.run.end_stage != PipelineStage.INTENT:
                    # text-to-speech
                    if current_stage == PipelineStage.TTS:
                        assert tts_input is not None
                        await self.run.text_to_speech(tts_input)

        except PipelineError as err:
            self.run.process_event(
                PipelineEvent(
                    PipelineEventType.ERROR,
                    {"code": err.code, "message": err.message},
                )
            )
        finally:
            # Always end the run since it needs to shut down the debug recording
            # thread, etc.
            await self.run.end()

    async def validate(self) -> None:
        """Validate pipeline input against start stage and prepare all stages."""
        if self.run.start_stage in (PipelineStage.WAKE_WORD, PipelineStage.STT):
            if self.run.pipeline.stt_engine is None:
                raise PipelineRunValidationError(
                    "the pipeline does not support speech-to-text"
                )
            if self.stt_metadata is None:
                raise PipelineRunValidationError(
                    "stt_metadata is required for speech-to-text"
                )
            if self.stt_stream is None:
                raise PipelineRunValidationError(
                    "stt_stream is required for speech-to-text"
                )
        elif self.run.start_stage == PipelineStage.INTENT:
            if self.intent_input is None:
                raise PipelineRunValidationError(
                    "intent_input is required for intent recognition"
                )
        elif self.run.start_stage == PipelineStage.TTS:
            if self.tts_input is None:
                raise PipelineRunValidationError(
                    "tts_input is required for text-to-speech"
                )
        if self.run.end_stage == PipelineStage.TTS:
            if self.run.pipeline.tts_engine is None:
                raise PipelineRunValidationError(
                    "the pipeline does not support text-to-speech"
                )

        start_stage_index = PIPELINE_STAGE_ORDER.index(self.run.start_stage)
        end_stage_index = PIPELINE_STAGE_ORDER.index(self.run.end_stage)

        # Prepare each stage that falls inside [start_stage, end_stage]
        prepare_tasks = []

        if (
            start_stage_index
            <= PIPELINE_STAGE_ORDER.index(PipelineStage.WAKE_WORD)
            <= end_stage_index
        ):
            prepare_tasks.append(self.run.prepare_wake_word_detection())

        if (
            start_stage_index
            <= PIPELINE_STAGE_ORDER.index(PipelineStage.STT)
            <= end_stage_index
        ):
            # self.stt_metadata can't be None or we'd raise above
            prepare_tasks.append(self.run.prepare_speech_to_text(self.stt_metadata))  # type: ignore[arg-type]

        if (
            start_stage_index
            <= PIPELINE_STAGE_ORDER.index(PipelineStage.INTENT)
            <= end_stage_index
        ):
            prepare_tasks.append(self.run.prepare_recognize_intent())

        if (
            start_stage_index
            <= PIPELINE_STAGE_ORDER.index(PipelineStage.TTS)
            <= end_stage_index
        ):
            prepare_tasks.append(self.run.prepare_text_to_speech())

        if prepare_tasks:
            await asyncio.gather(*prepare_tasks)
1564 
1565 
class PipelinePreferred(CollectionError):
    """Raised when attempting to delete the preferred pipeline."""

    def __init__(self, item_id: str) -> None:
        """Initialize pipeline preferred error."""
        super().__init__(f"Item {item_id} preferred.")
        # Keep the offending id so callers can report which item was protected
        self.item_id = item_id
1573 
1574 
class SerializedPipelineStorageCollection(SerializedStorageCollection):
    """Serialized pipeline storage collection."""

    # Id of the pipeline marked as preferred
    preferred_item: str
1579 
1580 
class PipelineStorageCollection(
    StorageCollection[Pipeline, SerializedPipelineStorageCollection]
):
    """Pipeline storage collection."""

    # Id of the pipeline that may not be deleted
    _preferred_item: str

    async def _async_load_data(self) -> SerializedPipelineStorageCollection | None:
        """Load the data."""
        if not (data := await super()._async_load_data()):
            # First run: create a default pipeline and prefer it
            pipeline = await _async_create_default_pipeline(self.hass, self)
            self._preferred_item = pipeline.id
            return data

        self._preferred_item = data["preferred_item"]

        return data

    async def _process_create_data(self, data: dict) -> dict:
        """Validate the config is valid."""
        validated_data: dict = validate_language(data)
        return validated_data

    @callback
    def _get_suggested_id(self, info: dict) -> str:
        """Suggest an ID based on the config."""
        return ulid_util.ulid_now()

    async def _update_data(self, item: Pipeline, update_data: dict) -> Pipeline:
        """Return a new updated item."""
        update_data = validate_language(update_data)
        return Pipeline(id=item.id, **update_data)

    def _create_item(self, item_id: str, data: dict) -> Pipeline:
        """Create an item from validated config."""
        return Pipeline(id=item_id, **data)

    def _deserialize_item(self, data: dict) -> Pipeline:
        """Create an item from its serialized representation."""
        return Pipeline.from_json(data)

    def _serialize_item(self, item_id: str, item: Pipeline) -> dict:
        """Return the serialized representation of an item for storing."""
        return item.to_json()

    async def async_delete_item(self, item_id: str) -> None:
        """Delete item."""
        # The preferred pipeline may not be deleted
        if self._preferred_item == item_id:
            raise PipelinePreferred(item_id)
        await super().async_delete_item(item_id)

    @callback
    def async_get_preferred_item(self) -> str:
        """Get the id of the preferred item."""
        return self._preferred_item

    @callback
    def async_set_preferred_item(self, item_id: str) -> None:
        """Set the preferred pipeline."""
        if item_id not in self.data:
            raise ItemNotFound(item_id)
        self._preferred_item = item_id
        self._async_schedule_save()

    @callback
    def _data_to_save(self) -> SerializedPipelineStorageCollection:
        """Return JSON-compatible data for storing to file."""
        base_data = super()._base_data_to_save()
        return {
            "items": base_data["items"],
            "preferred_item": self._preferred_item,
        }
1653 
1654 
class PipelineStorageCollectionWebsocket(
    StorageCollectionWebsocket[PipelineStorageCollection]
):
    """Class to expose storage collection management over websocket."""

    @callback
    def async_setup(self, hass: HomeAssistant) -> None:
        """Set up the websocket commands."""
        super().async_setup(hass)

        websocket_api.async_register_command(
            hass,
            f"{self.api_prefix}/get",
            self.ws_get_item,
            websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
                {
                    vol.Required("type"): f"{self.api_prefix}/get",
                    vol.Optional(self.item_id_key): str,
                }
            ),
        )

        websocket_api.async_register_command(
            hass,
            f"{self.api_prefix}/set_preferred",
            websocket_api.require_admin(
                websocket_api.async_response(self.ws_set_preferred_item)
            ),
            websocket_api.BASE_COMMAND_MESSAGE_SCHEMA.extend(
                {
                    vol.Required("type"): f"{self.api_prefix}/set_preferred",
                    vol.Required(self.item_id_key): str,
                }
            ),
        )

    async def ws_delete_item(
        self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
    ) -> None:
        """Delete an item."""
        try:
            await super().ws_delete_item(hass, connection, msg)
        except PipelinePreferred as exc:
            # The preferred pipeline cannot be deleted
            connection.send_error(msg["id"], websocket_api.ERR_NOT_ALLOWED, str(exc))

    @callback
    def ws_get_item(
        self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
    ) -> None:
        """Get an item."""
        item_id = msg.get(self.item_id_key)
        if item_id is None:
            # Default to the preferred pipeline
            item_id = self.storage_collection.async_get_preferred_item()

        if item_id.startswith("conversation.") and hass.states.get(item_id):
            connection.send_result(
                msg["id"], _async_get_pipeline_from_conversation_entity(hass, item_id)
            )
            return

        if item_id not in self.storage_collection.data:
            connection.send_error(
                msg["id"],
                websocket_api.ERR_NOT_FOUND,
                f"Unable to find {self.item_id_key} {item_id}",
            )
            return

        connection.send_result(msg["id"], self.storage_collection.data[item_id])

    @callback
    def ws_list_item(
        self, hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
    ) -> None:
        """List items."""
        connection.send_result(
            msg["id"],
            {
                "pipelines": async_get_pipelines(hass),
                "preferred_pipeline": self.storage_collection.async_get_preferred_item(),
            },
        )

    async def ws_set_preferred_item(
        self,
        hass: HomeAssistant,
        connection: websocket_api.ActiveConnection,
        msg: dict[str, Any],
    ) -> None:
        """Set the preferred item."""
        try:
            self.storage_collection.async_set_preferred_item(msg[self.item_id_key])
        except ItemNotFound:
            connection.send_error(
                msg["id"], websocket_api.ERR_NOT_FOUND, "unknown item"
            )
            return
        connection.send_result(msg["id"])
1753 
1754 
class PipelineRuns:
    """Class managing pipeline runs."""

    def __init__(self, pipeline_store: PipelineStorageCollection) -> None:
        """Initialize."""
        # pipeline id -> run id -> run
        self._pipeline_runs: dict[str, dict[str, PipelineRun]] = defaultdict(dict)
        self._pipeline_store = pipeline_store
        # Abort wake word detection of active runs when their pipeline is updated
        pipeline_store.async_add_listener(self._change_listener)

    def add_run(self, pipeline_run: PipelineRun) -> None:
        """Add pipeline run."""
        pipeline_id = pipeline_run.pipeline.id
        self._pipeline_runs[pipeline_id][pipeline_run.id] = pipeline_run

    def remove_run(self, pipeline_run: PipelineRun) -> None:
        """Remove pipeline run."""
        pipeline_id = pipeline_run.pipeline.id
        self._pipeline_runs[pipeline_id].pop(pipeline_run.id)

    async def _change_listener(
        self, change_type: str, item_id: str, change: dict
    ) -> None:
        """Handle pipeline store changes."""
        if change_type != CHANGE_UPDATED:
            return
        if pipeline_runs := self._pipeline_runs.get(item_id):
            # Create a temporary list in case the list is modified while we iterate
            for pipeline_run in list(pipeline_runs.values()):
                pipeline_run.abort_wake_word_detection = True
1784 
1785 
@dataclass(slots=True)
class DeviceAudioQueue:
    """Audio capture queue for a satellite device."""

    queue: asyncio.Queue[bytes | None]
    """Queue of audio chunks (None = stop signal)"""

    id: str = field(default_factory=ulid_util.ulid_now)
    """Unique id to ensure the correct audio queue is cleaned up in websocket API."""

    overflow: bool = False
    """Flag to be set if audio samples were dropped because the queue was full."""
1798 
1799 
@dataclass(slots=True)
class AssistDevice:
    """Assist device."""

    domain: str
    unique_id_prefix: str
1806 
1807 
class PipelineData:
    """Store and debug data stored in hass.data."""

    def __init__(self, pipeline_store: PipelineStorageCollection) -> None:
        """Initialize."""
        self.pipeline_store = pipeline_store
        # pipeline id -> bounded history of per-run debug data
        self.pipeline_debug: dict[str, LimitedSizeDict[str, PipelineRunDebug]] = {}
        self.pipeline_devices: dict[str, AssistDevice] = {}
        self.pipeline_runs = PipelineRuns(pipeline_store)
        # device id -> audio capture queue
        self.device_audio_queues: dict[str, DeviceAudioQueue] = {}
1818 
1819 
@dataclass(slots=True)
class PipelineRunDebug:
    """Debug data for a pipeline run."""

    events: list[PipelineEvent] = field(default_factory=list, init=False)
    timestamp: str = field(
        default_factory=lambda: dt_util.utcnow().isoformat(),
        init=False,
    )
1829 
1830 
class PipelineStore(Store[SerializedPipelineStorageCollection]):
    """Store entity registry data."""

    async def _async_migrate_func(
        self,
        old_major_version: int,
        old_minor_version: int,
        old_data: SerializedPipelineStorageCollection,
    ) -> SerializedPipelineStorageCollection:
        """Migrate to the new version."""
        if old_major_version == 1 and old_minor_version < 2:
            # Version 1.2 adds wake word configuration
            for pipeline in old_data["items"]:
                # Populate keys which were introduced before version 1.2
                pipeline.setdefault("wake_word_entity", None)
                pipeline.setdefault("wake_word_id", None)

        if old_major_version > 1:
            raise NotImplementedError
        return old_data
1851 
1852 
@singleton(DOMAIN)
async def async_setup_pipeline_store(hass: HomeAssistant) -> PipelineData:
    """Set up the pipeline storage collection.

    Singleton per DOMAIN: loads the storage collection, registers its
    websocket API and returns the shared PipelineData container.
    """
    pipeline_store = PipelineStorageCollection(
        PipelineStore(
            hass, STORAGE_VERSION, STORAGE_KEY, minor_version=STORAGE_VERSION_MINOR
        )
    )
    await pipeline_store.async_load()
    PipelineStorageCollectionWebsocket(
        pipeline_store,
        f"{DOMAIN}/pipeline",
        "pipeline",
        PIPELINE_FIELDS,
        PIPELINE_FIELDS,
    ).async_setup(hass)
    return PipelineData(pipeline_store)
1870 
1871 
@callback
def async_migrate_engine(
    hass: HomeAssistant,
    engine_type: Literal["conversation", "stt", "tts", "wake_word"],
    old_value: str,
    new_value: str,
) -> None:
    """Register a migration of an engine used in pipelines."""
    hass.data.setdefault(DATA_MIGRATIONS, {})[engine_type] = (old_value, new_value)

    # Run migrations when config is already loaded
    if DATA_CONFIG in hass.data:
        hass.async_create_background_task(
            async_run_migrations(hass), "assist_pipeline_migration", eager_start=True
        )
1887 
1888 
async def async_run_migrations(hass: HomeAssistant) -> None:
    """Run pipeline migrations.

    Applies every registered engine migration to every stored pipeline,
    collecting the updates first and applying them afterwards.
    """
    migrations = hass.data.get(DATA_MIGRATIONS)
    if not migrations:
        return

    # Map engine type to the pipeline attribute that holds it
    engine_attr = {
        "conversation": "conversation_engine",
        "stt": "stt_engine",
        "tts": "tts_engine",
        "wake_word": "wake_word_entity",
    }

    # Collect all updates before applying any, so the pipeline list is not
    # mutated while being iterated
    updates = []
    for pipeline in async_get_pipelines(hass):
        attr_updates = {
            engine_attr[engine_type]: new_value
            for engine_type, (old_value, new_value) in migrations.items()
            if getattr(pipeline, engine_attr[engine_type]) == old_value
        }
        if attr_updates:
            updates.append((pipeline, attr_updates))

    for pipeline, attr_updates in updates:
        await async_update_pipeline(hass, pipeline, **attr_updates)
None __init__(self, PipelineStage start_stage, PipelineStage end_stage)
Definition: pipeline.py:480
None __init__(self, PipelineStorageCollection pipeline_store)
Definition: pipeline.py:1811
AsyncGenerator[EnhancedAudioChunk] process_volume_only(self, AsyncIterable[bytes] audio_stream)
Definition: pipeline.py:1267
str speech_to_text(self, stt.SpeechMetadata metadata, AsyncIterable[EnhancedAudioChunk] stream)
Definition: pipeline.py:876
str recognize_intent(self, str intent_input, str|None conversation_id, str|None device_id)
Definition: pipeline.py:1007
AsyncGenerator[EnhancedAudioChunk] process_enhance_audio(self, AsyncIterable[bytes] audio_stream)
Definition: pipeline.py:1286
AsyncIterable[tuple[bytes, int]] _wake_word_audio_stream(self, AsyncIterable[EnhancedAudioChunk] audio_stream, deque[EnhancedAudioChunk]|None stt_audio_buffer, VoiceActivityTimeout|None wake_word_vad, int sample_rate=SAMPLE_RATE, int sample_width=SAMPLE_WIDTH)
Definition: pipeline.py:818
None prepare_speech_to_text(self, stt.SpeechMetadata metadata)
Definition: pipeline.py:844
wake_word.DetectionResult|None wake_word_detection(self, AsyncIterable[EnhancedAudioChunk] stream, list[EnhancedAudioChunk] audio_chunks_for_stt)
Definition: pipeline.py:681
AsyncGenerator[bytes] _speech_to_text_stream(self, AsyncIterable[EnhancedAudioChunk] audio_stream, VoiceCommandSegmenter|None stt_vad, int sample_rate=SAMPLE_RATE, int sample_width=SAMPLE_WIDTH)
Definition: pipeline.py:959
None __init__(self, PipelineStorageCollection pipeline_store)
Definition: pipeline.py:1758
None _change_listener(self, str change_type, str item_id, dict change)
Definition: pipeline.py:1776
None ws_delete_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: pipeline.py:1693
None ws_list_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: pipeline.py:1728
None ws_get_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: pipeline.py:1703
None ws_set_preferred_item(self, HomeAssistant hass, websocket_api.ActiveConnection connection, dict[str, Any] msg)
Definition: pipeline.py:1743
SerializedPipelineStorageCollection|None _async_load_data(self)
Definition: pipeline.py:1588
Pipeline _update_data(self, Pipeline item, dict update_data)
Definition: pipeline.py:1609
SerializedPipelineStorageCollection _async_migrate_func(self, int old_major_version, int old_minor_version, SerializedPipelineStorageCollection old_data)
Definition: pipeline.py:1839
Pipeline from_json(cls, dict[str, Any] data)
Definition: pipeline.py:410
Any validate_language(dict[str, Any] data)
Definition: pipeline.py:93
bytes _multiply_volume(bytes chunk, float volume_multiplier)
Definition: pipeline.py:1306
dict[str, str|None] _async_resolve_default_pipeline_settings(HomeAssistant hass, *str|None conversation_engine_id=None, str|None stt_engine_id=None, str|None tts_engine_id=None, str pipeline_name)
Definition: pipeline.py:129
Pipeline _async_create_default_pipeline(HomeAssistant hass, PipelineStorageCollection pipeline_store)
Definition: pipeline.py:226
Pipeline|None async_create_default_pipeline(HomeAssistant hass, str stt_engine_id, str tts_engine_id, str pipeline_name)
Definition: pipeline.py:243
None _pipeline_debug_recording_thread_proc(Path run_recording_dir, Queue[str|bytes|None] queue, float message_timeout=5)
Definition: pipeline.py:1323
None async_migrate_engine(HomeAssistant hass, Literal["conversation", "stt", "tts", "wake_word"] engine_type, str old_value, str new_value)
Definition: pipeline.py:1878
Pipeline async_get_pipeline(HomeAssistant hass, str|None pipeline_id=None)
Definition: pipeline.py:282
None async_update_pipeline(HomeAssistant hass, Pipeline pipeline, *str|UndefinedType conversation_engine=UNDEFINED, str|UndefinedType conversation_language=UNDEFINED, str|UndefinedType language=UNDEFINED, str|UndefinedType name=UNDEFINED, str|None|UndefinedType stt_engine=UNDEFINED, str|None|UndefinedType stt_language=UNDEFINED, str|None|UndefinedType tts_engine=UNDEFINED, str|None|UndefinedType tts_language=UNDEFINED, str|None|UndefinedType tts_voice=UNDEFINED, str|None|UndefinedType wake_word_entity=UNDEFINED, str|None|UndefinedType wake_word_id=UNDEFINED, bool|UndefinedType prefer_local_intents=UNDEFINED)
Definition: pipeline.py:328
PipelineData async_setup_pipeline_store(HomeAssistant hass)
Definition: pipeline.py:1854
Pipeline _async_get_pipeline_from_conversation_entity(HomeAssistant hass, str entity_id)
Definition: pipeline.py:268
list[Pipeline] async_get_pipelines(HomeAssistant hass)
Definition: pipeline.py:305
Iterable[bytes] chunk_samples(bytes samples, int bytes_per_chunk, AudioBuffer leftover_chunk_buffer)
Definition: vad.py:297
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:81
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
dict[str, Any] validate(SchemaCommonFlowHandler handler, dict[str, Any] user_input)
Definition: config_flow.py:27
SerializedStorageCollection _base_data_to_save(self)
Definition: collection.py:377
float _clamp(float color_component, float minimum=0, float maximum=255)
Definition: color.py:594