Home Assistant Unofficial Reference 2024.12.1
assist_satellite.py
Go to the documentation of this file.
1 """Support for assist satellites in ESPHome."""
2 
from __future__ import annotations

import asyncio
from collections.abc import AsyncIterable
from functools import partial
import io
from itertools import chain
import logging
import socket
from typing import Any, cast
import wave

from aioesphomeapi import (
    MediaPlayerFormatPurpose,
    MediaPlayerSupportedFormat,
    VoiceAssistantAnnounceFinished,
    VoiceAssistantAudioSettings,
    VoiceAssistantCommandFlag,
    VoiceAssistantEventType,
    VoiceAssistantFeature,
    VoiceAssistantTimerEventType,
)

from homeassistant.components import assist_satellite, tts
from homeassistant.components.assist_pipeline import (
    PipelineEvent,
    PipelineEventType,
    PipelineStage,
)
from homeassistant.components.intent import (
    TimerEventType,
    TimerInfo,
    async_register_timer_handler,
)
from homeassistant.components.media_player import async_process_play_media_url
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback

from .const import DOMAIN
from .entity import EsphomeAssistEntity
from .entry_data import ESPHomeConfigEntry, RuntimeEntryData
from .enum_mapper import EsphomeEnumMapper
from .ffmpeg_proxy import async_create_proxy_url
49 
50 _LOGGER = logging.getLogger(__name__)
51 
52 _VOICE_ASSISTANT_EVENT_TYPES: EsphomeEnumMapper[
53  VoiceAssistantEventType, PipelineEventType
55  {
56  VoiceAssistantEventType.VOICE_ASSISTANT_ERROR: PipelineEventType.ERROR,
57  VoiceAssistantEventType.VOICE_ASSISTANT_RUN_START: PipelineEventType.RUN_START,
58  VoiceAssistantEventType.VOICE_ASSISTANT_RUN_END: PipelineEventType.RUN_END,
59  VoiceAssistantEventType.VOICE_ASSISTANT_STT_START: PipelineEventType.STT_START,
60  VoiceAssistantEventType.VOICE_ASSISTANT_STT_END: PipelineEventType.STT_END,
61  VoiceAssistantEventType.VOICE_ASSISTANT_INTENT_START: PipelineEventType.INTENT_START,
62  VoiceAssistantEventType.VOICE_ASSISTANT_INTENT_END: PipelineEventType.INTENT_END,
63  VoiceAssistantEventType.VOICE_ASSISTANT_TTS_START: PipelineEventType.TTS_START,
64  VoiceAssistantEventType.VOICE_ASSISTANT_TTS_END: PipelineEventType.TTS_END,
65  VoiceAssistantEventType.VOICE_ASSISTANT_WAKE_WORD_START: PipelineEventType.WAKE_WORD_START,
66  VoiceAssistantEventType.VOICE_ASSISTANT_WAKE_WORD_END: PipelineEventType.WAKE_WORD_END,
67  VoiceAssistantEventType.VOICE_ASSISTANT_STT_VAD_START: PipelineEventType.STT_VAD_START,
68  VoiceAssistantEventType.VOICE_ASSISTANT_STT_VAD_END: PipelineEventType.STT_VAD_END,
69  }
70 )
71 
72 _TIMER_EVENT_TYPES: EsphomeEnumMapper[VoiceAssistantTimerEventType, TimerEventType] = (
74  {
75  VoiceAssistantTimerEventType.VOICE_ASSISTANT_TIMER_STARTED: TimerEventType.STARTED,
76  VoiceAssistantTimerEventType.VOICE_ASSISTANT_TIMER_UPDATED: TimerEventType.UPDATED,
77  VoiceAssistantTimerEventType.VOICE_ASSISTANT_TIMER_CANCELLED: TimerEventType.CANCELLED,
78  VoiceAssistantTimerEventType.VOICE_ASSISTANT_TIMER_FINISHED: TimerEventType.FINISHED,
79  }
80  )
81 )
82 
83 _ANNOUNCEMENT_TIMEOUT_SEC = 5 * 60 # 5 minutes
84 _CONFIG_TIMEOUT_SEC = 5
85 
86 
88  hass: HomeAssistant,
89  entry: ESPHomeConfigEntry,
90  async_add_entities: AddEntitiesCallback,
91 ) -> None:
92  """Set up Assist satellite entity."""
93  entry_data = entry.runtime_data
94  assert entry_data.device_info is not None
95  if entry_data.device_info.voice_assistant_feature_flags_compat(
96  entry_data.api_version
97  ):
98  async_add_entities([EsphomeAssistSatellite(entry, entry_data)])
99 
100 
102  EsphomeAssistEntity, assist_satellite.AssistSatelliteEntity
103 ):
104  """Satellite running ESPHome."""
105 
107  key="assist_satellite", translation_key="assist_satellite"
108  )
109 
110  def __init__(
111  self,
112  config_entry: ConfigEntry,
113  entry_data: RuntimeEntryData,
114  ) -> None:
115  """Initialize satellite."""
116  super().__init__(entry_data)
117 
118  self.config_entryconfig_entry = config_entry
119  self.entry_dataentry_data = entry_data
120  self.clicli = self.entry_dataentry_data.client
121 
122  self._is_running_is_running: bool = True
123  self._pipeline_task_pipeline_task_pipeline_task: asyncio.Task | None = None
124  self._audio_queue: asyncio.Queue[bytes | None] = asyncio.Queue()
125  self._tts_streaming_task_tts_streaming_task: asyncio.Task | None = None
126  self._udp_server_udp_server: VoiceAssistantUDPServer | None = None
127 
128  # Empty config. Updated when added to HA.
130  available_wake_words=[], active_wake_words=[], max_active_wake_words=1
131  )
132 
133  @property
134  def pipeline_entity_id(self) -> str | None:
135  """Return the entity ID of the pipeline to use for the next conversation."""
136  assert self.entry_dataentry_data.device_info is not None
137  ent_reg = er.async_get(self.hasshass)
138  return ent_reg.async_get_entity_id(
139  Platform.SELECT,
140  DOMAIN,
141  f"{self.entry_data.device_info.mac_address}-pipeline",
142  )
143 
144  @property
145  def vad_sensitivity_entity_id(self) -> str | None:
146  """Return the entity ID of the VAD sensitivity to use for the next conversation."""
147  assert self.entry_dataentry_data.device_info is not None
148  ent_reg = er.async_get(self.hasshass)
149  return ent_reg.async_get_entity_id(
150  Platform.SELECT,
151  DOMAIN,
152  f"{self.entry_data.device_info.mac_address}-vad_sensitivity",
153  )
154 
155  @callback
157  self,
158  ) -> assist_satellite.AssistSatelliteConfiguration:
159  """Get the current satellite configuration."""
160  return self._satellite_config_satellite_config
161 
163  self, config: assist_satellite.AssistSatelliteConfiguration
164  ) -> None:
165  """Set the current satellite configuration."""
166  await self.clicli.set_voice_assistant_configuration(
167  active_wake_words=config.active_wake_words
168  )
169  _LOGGER.debug("Set active wake words: %s", config.active_wake_words)
170 
171  # Ensure configuration is updated
172  await self._update_satellite_config_update_satellite_config()
173 
174  async def _update_satellite_config(self) -> None:
175  """Get the latest satellite configuration from the device."""
176  try:
177  config = await self.clicli.get_voice_assistant_configuration(
178  _CONFIG_TIMEOUT_SEC
179  )
180  except TimeoutError:
181  # Placeholder config will be used
182  return
183 
184  # Update available/active wake words
185  self._satellite_config_satellite_config.available_wake_words = [
187  id=model.id,
188  wake_word=model.wake_word,
189  trained_languages=list(model.trained_languages),
190  )
191  for model in config.available_wake_words
192  ]
193  self._satellite_config_satellite_config.active_wake_words = list(config.active_wake_words)
194  self._satellite_config_satellite_config.max_active_wake_words = config.max_active_wake_words
195  _LOGGER.debug("Received satellite configuration: %s", self._satellite_config_satellite_config)
196 
197  # Inform listeners that config has been updated
198  self.entry_dataentry_data.async_assist_satellite_config_updated(self._satellite_config_satellite_config)
199 
200  async def async_added_to_hass(self) -> None:
201  """Run when entity about to be added to hass."""
202  await super().async_added_to_hass()
203 
204  assert self.entry_dataentry_data.device_info is not None
205  feature_flags = (
206  self.entry_dataentry_data.device_info.voice_assistant_feature_flags_compat(
207  self.entry_dataentry_data.api_version
208  )
209  )
210  if feature_flags & VoiceAssistantFeature.API_AUDIO:
211  # TCP audio
212  self.async_on_removeasync_on_remove(
213  self.clicli.subscribe_voice_assistant(
214  handle_start=self.handle_pipeline_starthandle_pipeline_start,
215  handle_stop=self.handle_pipeline_stophandle_pipeline_stop,
216  handle_audio=self.handle_audiohandle_audio,
217  handle_announcement_finished=self.handle_announcement_finishedhandle_announcement_finished,
218  )
219  )
220  else:
221  # UDP audio
222  self.async_on_removeasync_on_remove(
223  self.clicli.subscribe_voice_assistant(
224  handle_start=self.handle_pipeline_starthandle_pipeline_start,
225  handle_stop=self.handle_pipeline_stophandle_pipeline_stop,
226  handle_announcement_finished=self.handle_announcement_finishedhandle_announcement_finished,
227  )
228  )
229 
230  if feature_flags & VoiceAssistantFeature.TIMERS:
231  # Device supports timers
232  assert (self.registry_entryregistry_entry is not None) and (
233  self.registry_entryregistry_entry.device_id is not None
234  )
235  self.async_on_removeasync_on_remove(
237  self.hasshass, self.registry_entryregistry_entry.device_id, self.handle_timer_eventhandle_timer_event
238  )
239  )
240 
241  if feature_flags & VoiceAssistantFeature.ANNOUNCE:
242  # Device supports announcements
243  self._attr_supported_features_attr_supported_features |= (
244  assist_satellite.AssistSatelliteEntityFeature.ANNOUNCE
245  )
246 
247  # Block until config is retrieved.
248  # If the device supports announcements, it will return a config.
249  _LOGGER.debug("Waiting for satellite configuration")
250  await self._update_satellite_config_update_satellite_config()
251 
252  if not (feature_flags & VoiceAssistantFeature.SPEAKER):
253  # Will use media player for TTS/announcements
254  self._update_tts_format_update_tts_format()
255 
256  # Update wake word select when config is updated
257  self.async_on_removeasync_on_remove(
258  self.entry_dataentry_data.async_register_assist_satellite_set_wake_word_callback(
259  self.async_set_wake_wordasync_set_wake_word
260  )
261  )
262 
263  async def async_will_remove_from_hass(self) -> None:
264  """Run when entity will be removed from hass."""
265  await super().async_will_remove_from_hass()
266 
267  self._is_running_is_running = False
268  self._stop_pipeline_stop_pipeline()
269 
270  def on_pipeline_event(self, event: PipelineEvent) -> None:
271  """Handle pipeline events."""
272  try:
273  event_type = _VOICE_ASSISTANT_EVENT_TYPES.from_hass(event.type)
274  except KeyError:
275  _LOGGER.debug("Received unknown pipeline event type: %s", event.type)
276  return
277 
278  data_to_send: dict[str, Any] = {}
279  if event_type == VoiceAssistantEventType.VOICE_ASSISTANT_STT_START:
280  self.entry_dataentry_data.async_set_assist_pipeline_state(True)
281  elif event_type == VoiceAssistantEventType.VOICE_ASSISTANT_STT_END:
282  assert event.data is not None
283  data_to_send = {"text": event.data["stt_output"]["text"]}
284  elif event_type == VoiceAssistantEventType.VOICE_ASSISTANT_INTENT_END:
285  assert event.data is not None
286  data_to_send = {
287  "conversation_id": event.data["intent_output"]["conversation_id"] or "",
288  }
289  elif event_type == VoiceAssistantEventType.VOICE_ASSISTANT_TTS_START:
290  assert event.data is not None
291  data_to_send = {"text": event.data["tts_input"]}
292  elif event_type == VoiceAssistantEventType.VOICE_ASSISTANT_TTS_END:
293  assert event.data is not None
294  if tts_output := event.data["tts_output"]:
295  path = tts_output["url"]
296  url = async_process_play_media_url(self.hasshass, path)
297  data_to_send = {"url": url}
298 
299  assert self.entry_dataentry_data.device_info is not None
300  feature_flags = (
301  self.entry_dataentry_data.device_info.voice_assistant_feature_flags_compat(
302  self.entry_dataentry_data.api_version
303  )
304  )
305  if feature_flags & VoiceAssistantFeature.SPEAKER:
306  media_id = tts_output["media_id"]
307  self._tts_streaming_task_tts_streaming_task = (
308  self.config_entryconfig_entry.async_create_background_task(
309  self.hasshass,
310  self._stream_tts_audio_stream_tts_audio(media_id),
311  "esphome_voice_assistant_tts",
312  )
313  )
314  elif event_type == VoiceAssistantEventType.VOICE_ASSISTANT_WAKE_WORD_END:
315  assert event.data is not None
316  if not event.data["wake_word_output"]:
317  event_type = VoiceAssistantEventType.VOICE_ASSISTANT_ERROR
318  data_to_send = {
319  "code": "no_wake_word",
320  "message": "No wake word detected",
321  }
322  elif event_type == VoiceAssistantEventType.VOICE_ASSISTANT_ERROR:
323  assert event.data is not None
324  data_to_send = {
325  "code": event.data["code"],
326  "message": event.data["message"],
327  }
328  elif event_type == VoiceAssistantEventType.VOICE_ASSISTANT_RUN_END:
329  if self._tts_streaming_task_tts_streaming_task is None:
330  # No TTS
331  self.entry_dataentry_data.async_set_assist_pipeline_state(False)
332 
333  self.clicli.send_voice_assistant_event(event_type, data_to_send)
334 
335  async def async_announce(
336  self, announcement: assist_satellite.AssistSatelliteAnnouncement
337  ) -> None:
338  """Announce media on the satellite.
339 
340  Should block until the announcement is done playing.
341  """
342  _LOGGER.debug(
343  "Waiting for announcement to finished (message=%s, media_id=%s)",
344  announcement.message,
345  announcement.media_id,
346  )
347  media_id = announcement.media_id
348  if announcement.media_id_source != "tts":
349  # Route non-TTS media through the proxy
350  format_to_use: MediaPlayerSupportedFormat | None = None
351  for supported_format in chain(
352  *self.entry_dataentry_data.media_player_formats.values()
353  ):
354  if supported_format.purpose == MediaPlayerFormatPurpose.ANNOUNCEMENT:
355  format_to_use = supported_format
356  break
357 
358  if format_to_use is not None:
359  assert (self.registry_entryregistry_entry is not None) and (
360  self.registry_entryregistry_entry.device_id is not None
361  )
362  proxy_url = async_create_proxy_url(
363  self.hasshass,
364  self.registry_entryregistry_entry.device_id,
365  media_id,
366  media_format=format_to_use.format,
367  rate=format_to_use.sample_rate or None,
368  channels=format_to_use.num_channels or None,
369  width=format_to_use.sample_bytes or None,
370  )
371  media_id = async_process_play_media_url(self.hasshass, proxy_url)
372 
373  await self.clicli.send_voice_assistant_announcement_await_response(
374  media_id, _ANNOUNCEMENT_TIMEOUT_SEC, announcement.message
375  )
376 
378  self,
379  conversation_id: str,
380  flags: int,
381  audio_settings: VoiceAssistantAudioSettings,
382  wake_word_phrase: str | None,
383  ) -> int | None:
384  """Handle pipeline run request."""
385  # Clear audio queue
386  while not self._audio_queue.empty():
387  await self._audio_queue.get()
388 
389  if self._tts_streaming_task_tts_streaming_task is not None:
390  # Cancel current TTS response
391  self._tts_streaming_task_tts_streaming_task.cancel()
392  self._tts_streaming_task_tts_streaming_task = None
393 
394  # API or UDP output audio
395  port: int = 0
396  assert self.entry_dataentry_data.device_info is not None
397  feature_flags = (
398  self.entry_dataentry_data.device_info.voice_assistant_feature_flags_compat(
399  self.entry_dataentry_data.api_version
400  )
401  )
402  if (feature_flags & VoiceAssistantFeature.SPEAKER) and not (
403  feature_flags & VoiceAssistantFeature.API_AUDIO
404  ):
405  port = await self._start_udp_server_start_udp_server()
406  _LOGGER.debug("Started UDP server on port %s", port)
407 
408  # Device triggered pipeline (wake word, etc.)
409  if flags & VoiceAssistantCommandFlag.USE_WAKE_WORD:
410  start_stage = PipelineStage.WAKE_WORD
411  else:
412  start_stage = PipelineStage.STT
413 
414  end_stage = PipelineStage.TTS
415 
416  if feature_flags & VoiceAssistantFeature.SPEAKER:
417  # Stream WAV audio
418  self._attr_tts_options_attr_tts_options = {
419  tts.ATTR_PREFERRED_FORMAT: "wav",
420  tts.ATTR_PREFERRED_SAMPLE_RATE: 16000,
421  tts.ATTR_PREFERRED_SAMPLE_CHANNELS: 1,
422  tts.ATTR_PREFERRED_SAMPLE_BYTES: 2,
423  }
424  else:
425  # ANNOUNCEMENT format from media player
426  self._update_tts_format_update_tts_format()
427 
428  # Run the pipeline
429  _LOGGER.debug("Running pipeline from %s to %s", start_stage, end_stage)
430  self._pipeline_task_pipeline_task_pipeline_task = self.config_entryconfig_entry.async_create_background_task(
431  self.hasshass,
432  self.async_accept_pipeline_from_satelliteasync_accept_pipeline_from_satellite(
433  audio_stream=self._wrap_audio_stream_wrap_audio_stream(),
434  start_stage=start_stage,
435  end_stage=end_stage,
436  wake_word_phrase=wake_word_phrase,
437  ),
438  "esphome_assist_satellite_pipeline",
439  )
440  self._pipeline_task_pipeline_task_pipeline_task.add_done_callback(
441  lambda _future: self.handle_pipeline_finishedhandle_pipeline_finished()
442  )
443 
444  return port
445 
446  async def handle_audio(self, data: bytes) -> None:
447  """Handle incoming audio chunk from API."""
448  self._audio_queue.put_nowait(data)
449 
450  async def handle_pipeline_stop(self, abort: bool) -> None:
451  """Handle request for pipeline to stop."""
452  if abort:
453  self._abort_pipeline_abort_pipeline()
454  else:
455  self._stop_pipeline_stop_pipeline()
456 
457  def handle_pipeline_finished(self) -> None:
458  """Handle when pipeline has finished running."""
459  self._stop_udp_server_stop_udp_server()
460  _LOGGER.debug("Pipeline finished")
461 
463  self, event_type: TimerEventType, timer_info: TimerInfo
464  ) -> None:
465  """Handle timer events."""
466  try:
467  native_event_type = _TIMER_EVENT_TYPES.from_hass(event_type)
468  except KeyError:
469  _LOGGER.debug("Received unknown timer event type: %s", event_type)
470  return
471 
472  self.clicli.send_voice_assistant_timer_event(
473  native_event_type,
474  timer_info.id,
475  timer_info.name,
476  timer_info.created_seconds,
477  timer_info.seconds_left,
478  timer_info.is_active,
479  )
480 
482  self, announce_finished: VoiceAssistantAnnounceFinished
483  ) -> None:
484  """Handle announcement finished message (also sent for TTS)."""
485  self.tts_response_finishedtts_response_finished()
486 
487  @callback
488  def async_set_wake_word(self, wake_word_id: str) -> None:
489  """Set active wake word and update config on satellite."""
490  self._satellite_config_satellite_config.active_wake_words = [wake_word_id]
491  self.config_entryconfig_entry.async_create_background_task(
492  self.hasshass,
493  self.async_set_configurationasync_set_configurationasync_set_configuration(self._satellite_config_satellite_config),
494  "esphome_voice_assistant_set_config",
495  )
496  _LOGGER.debug("Setting active wake word: %s", wake_word_id)
497 
498  def _update_tts_format(self) -> None:
499  """Update the TTS format from the first media player."""
500  for supported_format in chain(*self.entry_dataentry_data.media_player_formats.values()):
501  # Find first announcement format
502  if supported_format.purpose == MediaPlayerFormatPurpose.ANNOUNCEMENT:
503  self._attr_tts_options_attr_tts_options = {
504  tts.ATTR_PREFERRED_FORMAT: supported_format.format,
505  }
506 
507  if supported_format.sample_rate > 0:
508  self._attr_tts_options_attr_tts_options[tts.ATTR_PREFERRED_SAMPLE_RATE] = (
509  supported_format.sample_rate
510  )
511 
512  if supported_format.sample_rate > 0:
513  self._attr_tts_options_attr_tts_options[tts.ATTR_PREFERRED_SAMPLE_CHANNELS] = (
514  supported_format.num_channels
515  )
516 
517  if supported_format.sample_rate > 0:
518  self._attr_tts_options_attr_tts_options[tts.ATTR_PREFERRED_SAMPLE_BYTES] = (
519  supported_format.sample_bytes
520  )
521 
522  break
523 
524  async def _stream_tts_audio(
525  self,
526  media_id: str,
527  sample_rate: int = 16000,
528  sample_width: int = 2,
529  sample_channels: int = 1,
530  samples_per_chunk: int = 512,
531  ) -> None:
532  """Stream TTS audio chunks to device via API or UDP."""
533  self.clicli.send_voice_assistant_event(
534  VoiceAssistantEventType.VOICE_ASSISTANT_TTS_STREAM_START, {}
535  )
536 
537  try:
538  if not self._is_running_is_running:
539  return
540 
541  extension, data = await tts.async_get_media_source_audio(
542  self.hasshass,
543  media_id,
544  )
545 
546  if extension != "wav":
547  _LOGGER.error("Only WAV audio can be streamed, got %s", extension)
548  return
549 
550  with io.BytesIO(data) as wav_io, wave.open(wav_io, "rb") as wav_file:
551  if (
552  (wav_file.getframerate() != sample_rate)
553  or (wav_file.getsampwidth() != sample_width)
554  or (wav_file.getnchannels() != sample_channels)
555  ):
556  _LOGGER.error("Can only stream 16Khz 16-bit mono WAV")
557  return
558 
559  _LOGGER.debug("Streaming %s audio samples", wav_file.getnframes())
560 
561  while self._is_running_is_running:
562  chunk = wav_file.readframes(samples_per_chunk)
563  if not chunk:
564  break
565 
566  if self._udp_server_udp_server is not None:
567  self._udp_server_udp_server.send_audio_bytes(chunk)
568  else:
569  self.clicli.send_voice_assistant_audio(chunk)
570 
571  # Wait for 90% of the duration of the audio that was
572  # sent for it to be played. This will overrun the
573  # device's buffer for very long audio, so using a media
574  # player is preferred.
575  samples_in_chunk = len(chunk) // (sample_width * sample_channels)
576  seconds_in_chunk = samples_in_chunk / sample_rate
577  await asyncio.sleep(seconds_in_chunk * 0.9)
578  except asyncio.CancelledError:
579  return # Don't trigger state change
580  finally:
581  self.clicli.send_voice_assistant_event(
582  VoiceAssistantEventType.VOICE_ASSISTANT_TTS_STREAM_END, {}
583  )
584 
585  # State change
586  self.tts_response_finishedtts_response_finished()
587  self.entry_dataentry_data.async_set_assist_pipeline_state(False)
588 
589  async def _wrap_audio_stream(self) -> AsyncIterable[bytes]:
590  """Yield audio chunks from the queue until None."""
591  while True:
592  chunk = await self._audio_queue.get()
593  if not chunk:
594  break
595 
596  yield chunk
597 
598  def _stop_pipeline(self) -> None:
599  """Request pipeline to be stopped by ending the audio stream and continue processing."""
600  self._audio_queue.put_nowait(None)
601  _LOGGER.debug("Requested pipeline stop")
602 
603  def _abort_pipeline(self) -> None:
604  """Request pipeline to be aborted (no further processing)."""
605  _LOGGER.debug("Requested pipeline abort")
606  self._audio_queue.put_nowait(None)
607  if self._pipeline_task_pipeline_task_pipeline_task is not None:
608  self._pipeline_task_pipeline_task_pipeline_task.cancel()
609 
610  async def _start_udp_server(self) -> int:
611  """Start a UDP server on a random free port."""
612  sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
613  sock.setblocking(False)
614  sock.bind(("", 0)) # random free port
615 
616  (
617  _transport,
618  protocol,
619  ) = await asyncio.get_running_loop().create_datagram_endpoint(
620  partial(VoiceAssistantUDPServer, self._audio_queue), sock=sock
621  )
622 
623  assert isinstance(protocol, VoiceAssistantUDPServer)
624  self._udp_server_udp_server = protocol
625 
626  # Return port
627  return cast(int, sock.getsockname()[1])
628 
629  def _stop_udp_server(self) -> None:
630  """Stop the UDP server if it's running."""
631  if self._udp_server_udp_server is None:
632  return
633 
634  try:
635  self._udp_server_udp_server.close()
636  finally:
637  self._udp_server_udp_server = None
638 
639  _LOGGER.debug("Stopped UDP server")
640 
641 
642 class VoiceAssistantUDPServer(asyncio.DatagramProtocol):
643  """Receive UDP packets and forward them to the audio queue."""
644 
645  transport: asyncio.DatagramTransport | None = None
646  remote_addr: tuple[str, int] | None = None
647 
648  def __init__(
649  self, audio_queue: asyncio.Queue[bytes | None], *args: Any, **kwargs: Any
650  ) -> None:
651  """Initialize protocol."""
652  super().__init__(*args, **kwargs)
653  self._audio_queue_audio_queue = audio_queue
654 
655  def connection_made(self, transport: asyncio.BaseTransport) -> None:
656  """Store transport for later use."""
657  self.transporttransport = cast(asyncio.DatagramTransport, transport)
658 
659  def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
660  """Handle incoming UDP packet."""
661  if self.remote_addrremote_addr is None:
662  self.remote_addrremote_addr = addr
663 
664  self._audio_queue_audio_queue.put_nowait(data)
665 
666  def error_received(self, exc: Exception) -> None:
667  """Handle when a send or receive operation raises an OSError.
668 
669  (Other than BlockingIOError or InterruptedError.)
670  """
671  _LOGGER.error("ESPHome Voice Assistant UDP server error received: %s", exc)
672 
673  # Stop pipeline
674  self._audio_queue_audio_queue.put_nowait(None)
675 
676  def close(self) -> None:
677  """Close the receiver."""
678  if self.transporttransport is not None:
679  self.transporttransport.close()
680 
681  self.remote_addrremote_addr = None
682 
683  def send_audio_bytes(self, data: bytes) -> None:
684  """Send bytes to the device via UDP."""
685  if self.transporttransport is None:
686  _LOGGER.error("No transport to send audio to")
687  return
688 
689  if self.remote_addrremote_addr is None:
690  _LOGGER.error("No address to send audio to")
691  return
692 
693  self.transporttransport.sendto(data, self.remote_addrremote_addr)
None async_set_configuration(self, AssistSatelliteConfiguration config)
Definition: entity.py:151
None async_accept_pipeline_from_satellite(self, AsyncIterable[bytes] audio_stream, PipelineStage start_stage=PipelineStage.STT, PipelineStage end_stage=PipelineStage.TTS, str|None wake_word_phrase=None)
Definition: entity.py:260
None handle_announcement_finished(self, VoiceAssistantAnnounceFinished announce_finished)
assist_satellite.AssistSatelliteConfiguration async_get_configuration(self)
None __init__(self, ConfigEntry config_entry, RuntimeEntryData entry_data)
None async_announce(self, assist_satellite.AssistSatelliteAnnouncement announcement)
None async_set_configuration(self, assist_satellite.AssistSatelliteConfiguration config)
int|None handle_pipeline_start(self, str conversation_id, int flags, VoiceAssistantAudioSettings audio_settings, str|None wake_word_phrase)
None _stream_tts_audio(self, str media_id, int sample_rate=16000, int sample_width=2, int sample_channels=1, int samples_per_chunk=512)
None handle_timer_event(self, TimerEventType event_type, TimerInfo timer_info)
None __init__(self, asyncio.Queue[bytes|None] audio_queue, *Any args, **Any kwargs)
None async_on_remove(self, CALLBACK_TYPE func)
Definition: entity.py:1331
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
None async_setup_entry(HomeAssistant hass, ESPHomeConfigEntry entry, AddEntitiesCallback async_add_entities)
str async_create_proxy_url(HomeAssistant hass, str device_id, str media_url, str media_format, int|None rate=None, int|None channels=None, int|None width=None)
Definition: ffmpeg_proxy.py:33
Callable[[], None] async_register_timer_handler(HomeAssistant hass, str device_id, TimerHandler handler)
Definition: timers.py:489
str async_process_play_media_url(HomeAssistant hass, str media_content_id, *bool allow_relative_url=False, bool for_supervisor_network=False)
Definition: browse_media.py:36