Home Assistant Unofficial Reference 2024.12.1
__init__.py
Go to the documentation of this file.
1 """Provide functionality for TTS."""
2 
3 from __future__ import annotations
4 
5 import asyncio
6 from collections.abc import Mapping
7 from datetime import datetime
8 from functools import partial
9 import hashlib
10 from http import HTTPStatus
11 import io
12 import logging
13 import mimetypes
14 import os
15 import re
16 import secrets
17 import subprocess
18 import tempfile
19 from typing import Any, Final, TypedDict, final
20 
21 from aiohttp import web
22 import mutagen
23 from mutagen.id3 import ID3, TextFrame as ID3Text
24 from propcache import cached_property
25 import voluptuous as vol
26 
27 from homeassistant.components import ffmpeg, websocket_api
28 from homeassistant.components.http import HomeAssistantView
30  ATTR_MEDIA_ANNOUNCE,
31  ATTR_MEDIA_CONTENT_ID,
32  ATTR_MEDIA_CONTENT_TYPE,
33  DOMAIN as DOMAIN_MP,
34  SERVICE_PLAY_MEDIA,
35  MediaType,
36 )
37 from homeassistant.config_entries import ConfigEntry
38 from homeassistant.const import (
39  ATTR_ENTITY_ID,
40  PLATFORM_FORMAT,
41  STATE_UNAVAILABLE,
42  STATE_UNKNOWN,
43 )
44 from homeassistant.core import HassJob, HomeAssistant, ServiceCall, callback
45 from homeassistant.exceptions import HomeAssistantError
47 from homeassistant.helpers.entity_component import EntityComponent
48 from homeassistant.helpers.event import async_call_later
49 from homeassistant.helpers.network import get_url
50 from homeassistant.helpers.restore_state import RestoreEntity
51 from homeassistant.helpers.typing import UNDEFINED, ConfigType
52 from homeassistant.util import dt as dt_util, language as language_util
53 
54 from .const import (
55  ATTR_CACHE,
56  ATTR_LANGUAGE,
57  ATTR_MESSAGE,
58  ATTR_OPTIONS,
59  CONF_CACHE,
60  CONF_CACHE_DIR,
61  CONF_TIME_MEMORY,
62  DATA_COMPONENT,
63  DATA_TTS_MANAGER,
64  DEFAULT_CACHE,
65  DEFAULT_CACHE_DIR,
66  DEFAULT_TIME_MEMORY,
67  DOMAIN,
68  TtsAudioType,
69 )
70 from .helper import get_engine_instance
71 from .legacy import PLATFORM_SCHEMA, PLATFORM_SCHEMA_BASE, Provider, async_setup_legacy
72 from .media_source import generate_media_source_id, media_source_id_to_kwargs
73 from .models import Voice
74 
# Public API of the TTS component: names importable via
# ``from homeassistant.components.tts import ...``.
__all__ = [
    "async_default_engine",
    "async_get_media_source_audio",
    "async_support_options",
    "ATTR_AUDIO_OUTPUT",
    "ATTR_PREFERRED_FORMAT",
    "ATTR_PREFERRED_SAMPLE_RATE",
    "ATTR_PREFERRED_SAMPLE_CHANNELS",
    "ATTR_PREFERRED_SAMPLE_BYTES",
    "CONF_LANG",
    "DEFAULT_CACHE_DIR",
    "generate_media_source_id",
    "PLATFORM_SCHEMA_BASE",
    "PLATFORM_SCHEMA",
    "SampleFormat",
    "Provider",
    "TtsAudioType",
    "Voice",
]
94 
_LOGGER = logging.getLogger(__name__)

# Attribute / option names accepted in service calls and request payloads.
ATTR_PLATFORM = "platform"
ATTR_AUDIO_OUTPUT = "audio_output"
ATTR_PREFERRED_FORMAT = "preferred_format"
ATTR_PREFERRED_SAMPLE_RATE = "preferred_sample_rate"
ATTR_PREFERRED_SAMPLE_CHANNELS = "preferred_sample_channels"
ATTR_PREFERRED_SAMPLE_BYTES = "preferred_sample_bytes"
ATTR_MEDIA_PLAYER_ENTITY_ID = "media_player_entity_id"
ATTR_VOICE = "voice"

# Output extension used when no preferred format is requested.
_DEFAULT_FORMAT = "mp3"

# Options that are never rejected as "invalid": they are either forwarded to
# the engine (if it supports them) or applied through ffmpeg afterwards.
_PREFFERED_FORMAT_OPTIONS: Final[set[str]] = {
    ATTR_PREFERRED_FORMAT,
    ATTR_PREFERRED_SAMPLE_RATE,
    ATTR_PREFERRED_SAMPLE_CHANNELS,
    ATTR_PREFERRED_SAMPLE_BYTES,
}

CONF_LANG = "language"

SERVICE_CLEAR_CACHE = "clear_cache"

# Cache file names: <sha1 of message>_<language>_<options hash>_<engine>.<ext>
# The legacy pattern matches a bare engine name, the current one a
# "tts.<object_id>" entity id.
_RE_LEGACY_VOICE_FILE = re.compile(
    r"([a-f0-9]{40})_([^_]+)_([^_]+)_([a-z_]+)\.[a-z0-9]{3,4}"
)
_RE_VOICE_FILE = re.compile(
    r"([a-f0-9]{40})_([^_]+)_([^_]+)_(tts\.[a-z0-9_]+)\.[a-z0-9]{3,4}"
)
KEY_PATTERN = "{0}_{1}_{2}_{3}"

# The clear_cache service takes no arguments.
SCHEMA_SERVICE_CLEAR_CACHE = vol.Schema({})
127 
128 
129 class TTSCache(TypedDict):
130  """Cached TTS file."""
131 
132  filename: str
133  voice: bytes
134  pending: asyncio.Task | None
135 
136 
@callback
def async_default_engine(hass: HomeAssistant) -> str | None:
    """Return the domain or entity id of the default engine.

    A TTS entity on the "cloud" platform wins; otherwise the first TTS
    entity, then the first legacy provider. Returns None if no engines found.
    """
    first_entity_id: str | None = None

    for tts_entity in hass.data[DATA_COMPONENT].entities:
        if tts_entity.platform and tts_entity.platform.platform_name == "cloud":
            return tts_entity.entity_id

        if first_entity_id is None:
            first_entity_id = tts_entity.entity_id

    if first_entity_id:
        return first_entity_id

    # No entity available: fall back to the first registered legacy provider.
    return next(iter(hass.data[DATA_TTS_MANAGER].providers), None)
153 
154 
@callback
def async_resolve_engine(hass: HomeAssistant, engine: str | None) -> str | None:
    """Resolve engine.

    With no engine given, the default engine is returned. A given engine is
    returned unchanged when it is a known TTS entity or legacy provider.
    Returns None if no engines found or invalid engine passed in.
    """
    if engine is None:
        return async_default_engine(hass)

    if hass.data[DATA_COMPONENT].get_entity(engine):
        return engine

    if engine in hass.data[DATA_TTS_MANAGER].providers:
        return engine

    return None
170 
171 
async def async_support_options(
    hass: HomeAssistant,
    engine: str,
    language: str | None = None,
    options: dict | None = None,
) -> bool:
    """Return if an engine supports options.

    Raises HomeAssistantError when the engine cannot be found. Returns False
    when the speech manager rejects the language/options combination.
    """
    if (engine_instance := get_engine_instance(hass, engine)) is None:
        raise HomeAssistantError(f"Provider {engine} not found")

    try:
        hass.data[DATA_TTS_MANAGER].process_options(engine_instance, language, options)
    except HomeAssistantError:
        # Validation failure means "not supported", not an error for the caller.
        return False

    return True
188 
189 
async def async_get_media_source_audio(
    hass: HomeAssistant,
    media_source_id: str,
) -> tuple[str, bytes]:
    """Get TTS audio as extension, data.

    The media source id is decoded into keyword arguments that are passed
    to the speech manager's ``async_get_tts_audio``.
    """
    return await hass.data[DATA_TTS_MANAGER].async_get_tts_audio(
        **media_source_id_to_kwargs(media_source_id),
    )
198 
199 
@callback
def async_get_text_to_speech_languages(hass: HomeAssistant) -> set[str]:
    """Return a set with the union of languages supported by tts engines."""
    languages: set[str] = set()

    # Entity based engines.
    for entity in hass.data[DATA_COMPONENT].entities:
        languages.update(entity.supported_languages)

    # Legacy provider based engines.
    for tts_engine in hass.data[DATA_TTS_MANAGER].providers.values():
        languages.update(tts_engine.supported_languages)

    return languages
214 
215 
async def async_convert_audio(
    hass: HomeAssistant,
    from_extension: str,
    audio_bytes: bytes,
    to_extension: str,
    to_sample_rate: int | None = None,
    to_sample_channels: int | None = None,
    to_sample_bytes: int | None = None,
) -> bytes:
    """Convert audio to a preferred format using ffmpeg.

    The blocking conversion is run in the executor.
    """
    ffmpeg_manager = ffmpeg.get_ffmpeg_manager(hass)
    return await hass.async_add_executor_job(
        lambda: _convert_audio(
            ffmpeg_manager.binary,
            from_extension,
            audio_bytes,
            to_extension,
            to_sample_rate=to_sample_rate,
            to_sample_channels=to_sample_channels,
            to_sample_bytes=to_sample_bytes,
        )
    )
238 
239 
def _convert_audio(
    ffmpeg_binary: str,
    from_extension: str,
    audio_bytes: bytes,
    to_extension: str,
    to_sample_rate: int | None = None,
    to_sample_channels: int | None = None,
    to_sample_bytes: int | None = None,
) -> bytes:
    """Convert audio to a preferred format using ffmpeg.

    The input is piped to ffmpeg on stdin and the result is read back from a
    temporary file. Raises RuntimeError when ffmpeg exits with a non-zero
    return code; ffmpeg's stderr is logged in that case.
    """

    # We have to use a temporary file here because some formats like WAV store
    # the length of the file in the header, and therefore cannot be written in a
    # streaming fashion.
    with tempfile.NamedTemporaryFile(
        mode="wb+", suffix=f".{to_extension}"
    ) as output_file:
        # input
        command = [
            ffmpeg_binary,
            "-y",  # overwrite temp file
            "-f",
            from_extension,
            "-i",
            "pipe:",  # input from stdin
        ]

        # output
        command.extend(["-f", to_extension])

        if to_sample_rate is not None:
            command.extend(["-ar", str(to_sample_rate)])

        if to_sample_channels is not None:
            command.extend(["-ac", str(to_sample_channels)])

        if to_extension == "mp3":
            # Max quality for MP3
            command.extend(["-q:a", "0"])

        if to_sample_bytes == 2:
            # 16-bit samples
            command.extend(["-sample_fmt", "s16"])

        command.append(output_file.name)

        with subprocess.Popen(
            command, stdin=subprocess.PIPE, stderr=subprocess.PIPE
        ) as proc:
            _stdout, stderr = proc.communicate(input=audio_bytes)
            if proc.returncode != 0:
                # Never let undecodable ffmpeg output mask the real failure.
                _LOGGER.error(stderr.decode(errors="replace"))
                raise RuntimeError(
                    f"Unexpected error while running ffmpeg with arguments: {command}. "
                    "See log for details."
                )

        output_file.seek(0)
        return output_file.read()
299 
300 
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
    """Set up TTS.

    Registers the websocket commands, HTTP views, the ``speak`` entity
    service and the ``clear_cache`` service, and starts legacy platform
    setup. Returns False when the cache cannot be initialised.
    """
    for ws_command in (
        websocket_list_engines,
        websocket_get_engine,
        websocket_list_engine_voices,
    ):
        websocket_api.async_register_command(hass, ws_command)

    # Legacy config options
    conf = config[DOMAIN][0] if config.get(DOMAIN) else {}
    use_cache: bool = conf.get(CONF_CACHE, DEFAULT_CACHE)
    cache_dir: str = conf.get(CONF_CACHE_DIR, DEFAULT_CACHE_DIR)
    time_memory: int = conf.get(CONF_TIME_MEMORY, DEFAULT_TIME_MEMORY)

    tts = SpeechManager(hass, use_cache, cache_dir, time_memory)

    try:
        await tts.async_init_cache()
    except (HomeAssistantError, KeyError):
        _LOGGER.exception("Error on cache init")
        return False

    hass.data[DATA_TTS_MANAGER] = tts
    component = EntityComponent[TextToSpeechEntity](_LOGGER, DOMAIN, hass)
    hass.data[DATA_COMPONENT] = component

    component.register_shutdown()

    hass.http.register_view(TextToSpeechView(tts))
    hass.http.register_view(TextToSpeechUrlView(tts))

    platform_setups = await async_setup_legacy(hass, config)

    speak_schema = {
        vol.Required(ATTR_MEDIA_PLAYER_ENTITY_ID): cv.comp_entity_ids,
        vol.Required(ATTR_MESSAGE): cv.string,
        vol.Optional(ATTR_CACHE, default=DEFAULT_CACHE): cv.boolean,
        vol.Optional(ATTR_LANGUAGE): cv.string,
        vol.Optional(ATTR_OPTIONS): dict,
    }
    component.async_register_entity_service("speak", speak_schema, "async_speak")

    async def async_clear_cache_handle(service: ServiceCall) -> None:
        """Handle clear cache service call."""
        await tts.async_clear_cache()

    hass.services.async_register(
        DOMAIN,
        SERVICE_CLEAR_CACHE,
        async_clear_cache_handle,
        schema=SCHEMA_SERVICE_CLEAR_CACHE,
    )

    # Tasks are created as tracked tasks to ensure startup
    # waits for them to finish, but we explicitly do not
    # want to wait for them to finish here because we want
    # any config entries that use tts as a base platform
    # to be able to start with out having to wait for the
    # legacy platforms to finish setting up.
    for setup in platform_setups:
        hass.async_create_task(setup, eager_start=True)

    return True
366 
367 
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up a config entry by delegating to the TTS entity component."""
    component = hass.data[DATA_COMPONENT]
    return await component.async_setup_entry(entry)
371 
372 
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload a config entry by delegating to the TTS entity component."""
    component = hass.data[DATA_COMPONENT]
    return await component.async_unload_entry(entry)
376 
377 
# Property names passed as ``cached_properties`` when declaring
# TextToSpeechEntity; each has a matching ``_attr_<name>`` attribute.
CACHED_PROPERTIES_WITH_ATTR_ = {
    "default_language",
    "default_options",
    "supported_languages",
    "supported_options",
}
384 
385 
class TextToSpeechEntity(RestoreEntity, cached_properties=CACHED_PROPERTIES_WITH_ATTR_):
    """Represent a single TTS engine.

    Subclasses must provide ``default_language`` and ``supported_languages``
    (via the ``_attr_*`` attributes or by overriding the properties) and
    implement ``get_tts_audio`` or ``async_get_tts_audio``.
    """

    _attr_should_poll = False
    # ISO timestamp of the last audio request; exposed as the entity state.
    __last_tts_loaded: str | None = None

    _attr_default_language: str
    _attr_default_options: Mapping[str, Any] | None = None
    _attr_supported_languages: list[str]
    _attr_supported_options: list[str] | None = None

    @property
    @final
    def state(self) -> str | None:
        """Return the state of the entity."""
        if self.__last_tts_loaded is None:
            return None
        return self.__last_tts_loaded

    @cached_property
    def supported_languages(self) -> list[str]:
        """Return a list of supported languages."""
        return self._attr_supported_languages

    @cached_property
    def default_language(self) -> str:
        """Return the default language."""
        return self._attr_default_language

    @cached_property
    def supported_options(self) -> list[str] | None:
        """Return a list of supported options like voice, emotions."""
        return self._attr_supported_options

    @cached_property
    def default_options(self) -> Mapping[str, Any] | None:
        """Return a mapping with the default options."""
        return self._attr_default_options

    @callback
    def async_get_supported_voices(self, language: str) -> list[Voice] | None:
        """Return a list of supported voices for a language."""
        return None

    async def async_internal_added_to_hass(self) -> None:
        """Call when the entity is added to hass.

        Verifies the required language attributes are available and restores
        the previous state, if there is a usable one.
        """
        await super().async_internal_added_to_hass()
        try:
            _ = self.default_language
        except AttributeError as err:
            raise AttributeError(
                "TTS entities must either set the '_attr_default_language' attribute or override the 'default_language' property"
            ) from err
        try:
            _ = self.supported_languages
        except AttributeError as err:
            raise AttributeError(
                "TTS entities must either set the '_attr_supported_languages' attribute or override the 'supported_languages' property"
            ) from err
        state = await self.async_get_last_state()
        if (
            state is not None
            and state.state is not None
            and state.state not in (STATE_UNAVAILABLE, STATE_UNKNOWN)
        ):
            self.__last_tts_loaded = state.state

    async def async_speak(
        self,
        media_player_entity_id: list[str],
        message: str,
        cache: bool,
        language: str | None = None,
        options: dict | None = None,
    ) -> None:
        """Speak via a Media Player.

        Calls the media player ``play_media`` service with a TTS media
        source id generated for this entity, and waits for it to finish.
        """
        await self.hass.services.async_call(
            DOMAIN_MP,
            SERVICE_PLAY_MEDIA,
            {
                ATTR_ENTITY_ID: media_player_entity_id,
                ATTR_MEDIA_CONTENT_ID: generate_media_source_id(
                    self.hass,
                    message=message,
                    engine=self.entity_id,
                    language=language,
                    options=options,
                    cache=cache,
                ),
                ATTR_MEDIA_CONTENT_TYPE: MediaType.MUSIC,
                ATTR_MEDIA_ANNOUNCE: True,
            },
            blocking=True,
            context=self._context,
        )

    @final
    async def internal_async_get_tts_audio(
        self, message: str, language: str, options: dict[str, Any]
    ) -> TtsAudioType:
        """Process an audio stream to TTS service.

        Only streaming content is allowed!
        """
        # Record the request time as state before asking the engine for audio.
        self.__last_tts_loaded = dt_util.utcnow().isoformat()
        self.async_write_ha_state()
        return await self.async_get_tts_audio(
            message=message, language=language, options=options
        )

    def get_tts_audio(
        self, message: str, language: str, options: dict[str, Any]
    ) -> TtsAudioType:
        """Load tts audio file from the engine."""
        raise NotImplementedError

    async def async_get_tts_audio(
        self, message: str, language: str, options: dict[str, Any]
    ) -> TtsAudioType:
        """Load tts audio file from the engine.

        Runs ``get_tts_audio`` in the executor.
        Return a tuple of file extension and data as bytes.
        """
        return await self.hass.async_add_executor_job(
            partial(self.get_tts_audio, message, language, options=options)
        )
512 
513 
514 def _hash_options(options: dict) -> str:
515  """Hashes an options dictionary."""
516  opts_hash = hashlib.blake2s(digest_size=5)
517  for key, value in sorted(options.items()):
518  opts_hash.update(str(key).encode())
519  opts_hash.update(str(value).encode())
520 
521  return opts_hash.hexdigest()
522 
523 
class SpeechManager:
    """Representation of a speech store."""

    def __init__(
        self,
        hass: HomeAssistant,
        use_cache: bool,
        cache_dir: str,
        time_memory: int,
    ) -> None:
        """Initialize a speech store.

        ``use_cache`` is the default used when a request does not say whether
        to use the file cache, ``cache_dir`` is the cache folder and
        ``time_memory`` is the delay before an entry is dropped from memory.
        """
        self.hass = hass
        # Legacy providers keyed by engine name.
        self.providers: dict[str, Provider] = {}

        self.use_cache = use_cache
        self.cache_dir = cache_dir
        self.time_memory = time_memory
        # cache key -> file name on disk
        self.file_cache: dict[str, str] = {}
        # cache key -> in-memory audio record
        self.mem_cache: dict[str, TTSCache] = {}

        # filename <-> token
        self.filename_to_token: dict[str, str] = {}
        self.token_to_filename: dict[str, str] = {}
547 
548  def _init_cache(self) -> dict[str, str]:
549  """Init cache folder and fetch files."""
550  try:
551  self.cache_dircache_dir = _init_tts_cache_dir(self.hasshass, self.cache_dircache_dir)
552  except OSError as err:
553  raise HomeAssistantError(f"Can't init cache dir {err}") from err
554 
555  try:
556  return _get_cache_files(self.cache_dircache_dir)
557  except OSError as err:
558  raise HomeAssistantError(f"Can't read cache dir {err}") from err
559 
560  async def async_init_cache(self) -> None:
561  """Init config folder and load file cache."""
562  self.file_cachefile_cache.update(await self.hasshass.async_add_executor_job(self._init_cache_init_cache))
563 
564  async def async_clear_cache(self) -> None:
565  """Read file cache and delete files."""
566  self.mem_cachemem_cache = {}
567 
568  def remove_files() -> None:
569  """Remove files from filesystem."""
570  for filename in self.file_cachefile_cache.values():
571  try:
572  os.remove(os.path.join(self.cache_dircache_dir, filename))
573  except OSError as err:
574  _LOGGER.warning("Can't remove cache file '%s': %s", filename, err)
575 
576  await self.hasshass.async_add_executor_job(remove_files)
577  self.file_cachefile_cache = {}
578 
579  @callback
581  self, engine: str, provider: Provider, config: ConfigType
582  ) -> None:
583  """Register a legacy TTS engine."""
584  provider.hass = self.hasshass
585  if provider.name is None:
586  provider.name = engine
587  self.providers[engine] = provider
588 
589  self.hasshass.config.components.add(
590  PLATFORM_FORMAT.format(domain=DOMAIN, platform=engine)
591  )
592 
593  @callback
595  self,
596  engine_instance: TextToSpeechEntity | Provider,
597  language: str | None,
598  options: dict | None,
599  ) -> tuple[str, dict[str, Any]]:
600  """Validate and process options."""
601  # Languages
602  language = language or engine_instance.default_language
603  if (
604  language is None
605  or engine_instance.supported_languages is None
606  or language not in engine_instance.supported_languages
607  ):
608  raise HomeAssistantError(f"Language '{language}' not supported")
609 
610  options = options or {}
611  supported_options = engine_instance.supported_options or []
612 
613  # Update default options with provided options
614  invalid_opts: list[str] = []
615  merged_options = dict(engine_instance.default_options or {})
616  for option_name, option_value in options.items():
617  # Only count an option as invalid if it's not a "preferred format"
618  # option. These are used as hints to the TTS system if supported,
619  # and otherwise as parameters to ffmpeg conversion.
620  if (option_name in supported_options) or (
621  option_name in _PREFFERED_FORMAT_OPTIONS
622  ):
623  merged_options[option_name] = option_value
624  else:
625  invalid_opts.append(option_name)
626 
627  if invalid_opts:
628  raise HomeAssistantError(f"Invalid options found: {invalid_opts}")
629 
630  return language, merged_options
631 
633  self,
634  engine: str,
635  message: str,
636  cache: bool | None = None,
637  language: str | None = None,
638  options: dict | None = None,
639  ) -> str:
640  """Get URL for play message.
641 
642  This method is a coroutine.
643  """
644  if (engine_instance := get_engine_instance(self.hasshass, engine)) is None:
645  raise HomeAssistantError(f"Provider {engine} not found")
646 
647  language, options = self.process_optionsprocess_options(engine_instance, language, options)
648  cache_key = self._generate_cache_key_generate_cache_key(message, language, options, engine)
649  use_cache = cache if cache is not None else self.use_cacheuse_cache
650 
651  # Is speech already in memory
652  if cache_key in self.mem_cachemem_cache:
653  filename = self.mem_cachemem_cache[cache_key]["filename"]
654  # Is file store in file cache
655  elif use_cache and cache_key in self.file_cachefile_cache:
656  filename = self.file_cachefile_cache[cache_key]
657  self.hasshass.async_create_task(self._async_file_to_mem_async_file_to_mem(cache_key))
658  # Load speech from engine into memory
659  else:
660  filename = await self._async_get_tts_audio_async_get_tts_audio(
661  engine_instance, cache_key, message, use_cache, language, options
662  )
663 
664  # Use a randomly generated token instead of exposing the filename
665  token = self.filename_to_token.get(filename)
666  if not token:
667  # Keep extension (.mp3, etc.)
668  token = secrets.token_urlsafe(16) + os.path.splitext(filename)[1]
669 
670  # Map token <-> filename
671  self.filename_to_token[filename] = token
672  self.token_to_filename[token] = filename
673 
674  return f"/api/tts_proxy/{token}"
675 
677  self,
678  engine: str,
679  message: str,
680  cache: bool | None = None,
681  language: str | None = None,
682  options: dict | None = None,
683  ) -> tuple[str, bytes]:
684  """Fetch TTS audio."""
685  if (engine_instance := get_engine_instance(self.hasshass, engine)) is None:
686  raise HomeAssistantError(f"Provider {engine} not found")
687 
688  language, options = self.process_optionsprocess_options(engine_instance, language, options)
689  cache_key = self._generate_cache_key_generate_cache_key(message, language, options, engine)
690  use_cache = cache if cache is not None else self.use_cacheuse_cache
691 
692  # If we have the file, load it into memory if necessary
693  if cache_key not in self.mem_cachemem_cache:
694  if use_cache and cache_key in self.file_cachefile_cache:
695  await self._async_file_to_mem_async_file_to_mem(cache_key)
696  else:
697  await self._async_get_tts_audio_async_get_tts_audio(
698  engine_instance, cache_key, message, use_cache, language, options
699  )
700 
701  extension = os.path.splitext(self.mem_cachemem_cache[cache_key]["filename"])[1][1:]
702  cached = self.mem_cachemem_cache[cache_key]
703  if pending := cached.get("pending"):
704  await pending
705  cached = self.mem_cachemem_cache[cache_key]
706  return extension, cached["voice"]
707 
708  @callback
710  self,
711  message: str,
712  language: str,
713  options: dict | None,
714  engine: str,
715  ) -> str:
716  """Generate a cache key for a message."""
717  options_key = _hash_options(options) if options else "-"
718  msg_hash = hashlib.sha1(bytes(message, "utf-8")).hexdigest()
719  return KEY_PATTERN.format(
720  msg_hash, language.replace("_", "-"), options_key, engine
721  ).lower()
722 
724  self,
725  engine_instance: TextToSpeechEntity | Provider,
726  cache_key: str,
727  message: str,
728  cache: bool,
729  language: str,
730  options: dict[str, Any],
731  ) -> str:
732  """Receive TTS, store for view in cache and return filename.
733 
734  This method is a coroutine.
735  """
736  options = dict(options or {})
737  supported_options = engine_instance.supported_options or []
738 
739  # Extract preferred format options.
740  #
741  # These options are used by Assist pipelines, etc. to get a format that
742  # the voice satellite will support.
743  #
744  # The TTS system ideally supports options directly so we won't have
745  # to convert with ffmpeg later. If not, we pop the options here and
746  # perform the conversation after receiving the audio.
747  if ATTR_PREFERRED_FORMAT in supported_options:
748  final_extension = options.get(ATTR_PREFERRED_FORMAT, _DEFAULT_FORMAT)
749  else:
750  final_extension = options.pop(ATTR_PREFERRED_FORMAT, _DEFAULT_FORMAT)
751 
752  if ATTR_PREFERRED_SAMPLE_RATE in supported_options:
753  sample_rate = options.get(ATTR_PREFERRED_SAMPLE_RATE)
754  else:
755  sample_rate = options.pop(ATTR_PREFERRED_SAMPLE_RATE, None)
756 
757  if sample_rate is not None:
758  sample_rate = int(sample_rate)
759 
760  if ATTR_PREFERRED_SAMPLE_CHANNELS in supported_options:
761  sample_channels = options.get(ATTR_PREFERRED_SAMPLE_CHANNELS)
762  else:
763  sample_channels = options.pop(ATTR_PREFERRED_SAMPLE_CHANNELS, None)
764 
765  if sample_channels is not None:
766  sample_channels = int(sample_channels)
767 
768  if ATTR_PREFERRED_SAMPLE_BYTES in supported_options:
769  sample_bytes = options.get(ATTR_PREFERRED_SAMPLE_BYTES)
770  else:
771  sample_bytes = options.pop(ATTR_PREFERRED_SAMPLE_BYTES, None)
772 
773  if sample_bytes is not None:
774  sample_bytes = int(sample_bytes)
775 
776  async def get_tts_data() -> str:
777  """Handle data available."""
778  if engine_instance.name is None or engine_instance.name is UNDEFINED:
779  raise HomeAssistantError("TTS engine name is not set.")
780 
781  if isinstance(engine_instance, Provider):
782  extension, data = await engine_instance.async_get_tts_audio(
783  message, language, options
784  )
785  else:
786  extension, data = await engine_instance.internal_async_get_tts_audio(
787  message, language, options
788  )
789 
790  if data is None or extension is None:
791  raise HomeAssistantError(
792  f"No TTS from {engine_instance.name} for '{message}'"
793  )
794 
795  # Only convert if we have a preferred format different than the
796  # expected format from the TTS system, or if a specific sample
797  # rate/format/channel count is requested.
798  needs_conversion = (
799  (final_extension != extension)
800  or (sample_rate is not None)
801  or (sample_channels is not None)
802  or (sample_bytes is not None)
803  )
804 
805  if needs_conversion:
806  data = await async_convert_audio(
807  self.hasshass,
808  extension,
809  data,
810  to_extension=final_extension,
811  to_sample_rate=sample_rate,
812  to_sample_channels=sample_channels,
813  to_sample_bytes=sample_bytes,
814  )
815 
816  # Create file infos
817  filename = f"{cache_key}.{final_extension}".lower()
818 
819  # Validate filename
820  if not _RE_VOICE_FILE.match(filename) and not _RE_LEGACY_VOICE_FILE.match(
821  filename
822  ):
823  raise HomeAssistantError(
824  f"TTS filename '{filename}' from {engine_instance.name} is invalid!"
825  )
826 
827  # Save to memory
828  if final_extension == "mp3":
829  data = self.write_tagswrite_tags(
830  filename, data, engine_instance.name, message, language, options
831  )
832 
833  self._async_store_to_memcache_async_store_to_memcache(cache_key, filename, data)
834 
835  if cache:
836  self.hasshass.async_create_task(
837  self._async_save_tts_audio_async_save_tts_audio(cache_key, filename, data)
838  )
839 
840  return filename
841 
842  audio_task = self.hasshass.async_create_task(get_tts_data(), eager_start=False)
843 
844  def handle_error(_future: asyncio.Future) -> None:
845  """Handle error."""
846  if audio_task.exception():
847  self.mem_cachemem_cache.pop(cache_key, None)
848 
849  audio_task.add_done_callback(handle_error)
850 
851  filename = f"{cache_key}.{final_extension}".lower()
852  self.mem_cachemem_cache[cache_key] = {
853  "filename": filename,
854  "voice": b"",
855  "pending": audio_task,
856  }
857  return filename
858 
860  self, cache_key: str, filename: str, data: bytes
861  ) -> None:
862  """Store voice data to file and file_cache.
863 
864  This method is a coroutine.
865  """
866  voice_file = os.path.join(self.cache_dircache_dir, filename)
867 
868  def save_speech() -> None:
869  """Store speech to filesystem."""
870  with open(voice_file, "wb") as speech:
871  speech.write(data)
872 
873  try:
874  await self.hasshass.async_add_executor_job(save_speech)
875  self.file_cachefile_cache[cache_key] = filename
876  except OSError as err:
877  _LOGGER.error("Can't write %s: %s", filename, err)
878 
879  async def _async_file_to_mem(self, cache_key: str) -> None:
880  """Load voice from file cache into memory.
881 
882  This method is a coroutine.
883  """
884  if not (filename := self.file_cachefile_cache.get(cache_key)):
885  raise HomeAssistantError(f"Key {cache_key} not in file cache!")
886 
887  voice_file = os.path.join(self.cache_dircache_dir, filename)
888 
889  def load_speech() -> bytes:
890  """Load a speech from filesystem."""
891  with open(voice_file, "rb") as speech:
892  return speech.read()
893 
894  try:
895  data = await self.hasshass.async_add_executor_job(load_speech)
896  except OSError as err:
897  del self.file_cachefile_cache[cache_key]
898  raise HomeAssistantError(f"Can't read {voice_file}") from err
899 
900  self._async_store_to_memcache_async_store_to_memcache(cache_key, filename, data)
901 
902  @callback
904  self, cache_key: str, filename: str, data: bytes
905  ) -> None:
906  """Store data to memcache and set timer to remove it."""
907  self.mem_cachemem_cache[cache_key] = {
908  "filename": filename,
909  "voice": data,
910  "pending": None,
911  }
912 
913  @callback
914  def async_remove_from_mem(_: datetime) -> None:
915  """Cleanup memcache."""
916  self.mem_cachemem_cache.pop(cache_key, None)
917 
919  self.hasshass,
920  self.time_memorytime_memory,
921  HassJob(
922  async_remove_from_mem,
923  name="tts remove_from_mem",
924  cancel_on_shutdown=True,
925  ),
926  )
927 
928  async def async_read_tts(self, token: str) -> tuple[str | None, bytes]:
929  """Read a voice file and return binary.
930 
931  This method is a coroutine.
932  """
933  filename = self.token_to_filename.get(token)
934  if not filename:
935  raise HomeAssistantError(f"{token} was not recognized!")
936 
937  if not (record := _RE_VOICE_FILE.match(filename.lower())) and not (
938  record := _RE_LEGACY_VOICE_FILE.match(filename.lower())
939  ):
940  raise HomeAssistantError("Wrong tts file format!")
941 
942  cache_key = KEY_PATTERN.format(
943  record.group(1), record.group(2), record.group(3), record.group(4)
944  )
945 
946  if cache_key not in self.mem_cachemem_cache:
947  if cache_key not in self.file_cachefile_cache:
948  raise HomeAssistantError(f"{cache_key} not in cache!")
949  await self._async_file_to_mem_async_file_to_mem(cache_key)
950 
951  cached = self.mem_cachemem_cache[cache_key]
952  if pending := cached.get("pending"):
953  await pending
954  cached = self.mem_cachemem_cache[cache_key]
955 
956  content, _ = mimetypes.guess_type(filename)
957  return content, cached["voice"]
958 
959  @staticmethod
961  filename: str,
962  data: bytes,
963  engine_name: str,
964  message: str,
965  language: str,
966  options: dict | None,
967  ) -> bytes:
968  """Write ID3 tags to file.
969 
970  Async friendly.
971  """
972 
973  data_bytes = io.BytesIO(data)
974  data_bytes.name = filename
975  data_bytes.seek(0)
976 
977  album = engine_name
978  artist = language
979 
980  if options is not None and (voice := options.get("voice")) is not None:
981  artist = voice
982 
983  try:
984  tts_file = mutagen.File(data_bytes)
985  if tts_file is not None:
986  if not tts_file.tags:
987  tts_file.add_tags()
988  if isinstance(tts_file.tags, ID3):
989  tts_file["artist"] = ID3Text(
990  encoding=3,
991  text=artist, # type: ignore[no-untyped-call]
992  )
993  tts_file["album"] = ID3Text(
994  encoding=3,
995  text=album, # type: ignore[no-untyped-call]
996  )
997  tts_file["title"] = ID3Text(
998  encoding=3,
999  text=message, # type: ignore[no-untyped-call]
1000  )
1001  else:
1002  tts_file["artist"] = artist
1003  tts_file["album"] = album
1004  tts_file["title"] = message
1005  tts_file.save(data_bytes)
1006  except mutagen.MutagenError as err:
1007  _LOGGER.error("ID3 tag error: %s", err)
1008 
1009  return data_bytes.getvalue()
1010 
1011 
def _init_tts_cache_dir(hass: HomeAssistant, cache_dir: str) -> str:
    """Make sure the TTS cache folder exists and return its absolute path.

    A relative ``cache_dir`` is resolved with ``hass.config.path``. The
    directory is created when it is missing, including any missing parent
    directories.

    Raises OSError if the directory cannot be created.
    """
    if not os.path.isabs(cache_dir):
        cache_dir = hass.config.path(cache_dir)
    if not os.path.isdir(cache_dir):
        _LOGGER.info("Create cache dir %s", cache_dir)
        # makedirs handles nested cache paths, and exist_ok avoids failing
        # if the directory appears between the check above and this call.
        os.makedirs(cache_dir, exist_ok=True)
    return cache_dir
1020 
1021 
def _get_cache_files(cache_dir: str) -> dict[str, str]:
    """Index the voice files found in ``cache_dir``.

    Every directory entry whose name matches ``_RE_VOICE_FILE`` or, failing
    that, ``_RE_LEGACY_VOICE_FILE`` is included. The result maps the
    lowercased cache key (``KEY_PATTERN`` filled with the first four regex
    groups) to the lowercased file name. Other entries are ignored.
    """
    found: dict[str, str] = {}

    for entry in os.listdir(cache_dir):
        match = _RE_VOICE_FILE.match(entry) or _RE_LEGACY_VOICE_FILE.match(entry)
        if match is None:
            continue
        cache_key = KEY_PATTERN.format(*match.group(1, 2, 3, 4))
        found[cache_key.lower()] = entry.lower()

    return found
1036 
1037 
class TextToSpeechUrlView(HomeAssistantView):
    """TTS view to get a url to a generated speech file."""

    requires_auth = True
    url = "/api/tts_get_url"
    name = "api:tts:geturl"

    def __init__(self, tts: SpeechManager) -> None:
        """Initialize a tts view.

        ``tts`` is the speech manager used to generate the audio URL path.
        """
        self.tts = tts

    async def post(self, request: web.Request) -> web.Response:
        """Generate speech and provide url.

        The JSON body must be an object with a non-empty message and either
        an ``engine_id`` or a platform; cache, language and options are
        optional and passed through to the speech manager.

        Returns a JSON object with the absolute ``url`` and the relative
        ``path`` on success. Responds with 400 Bad Request when the body is
        not a JSON object, when required fields are missing, or when the
        speech manager raises HomeAssistantError.
        """
        try:
            data = await request.json()
        except ValueError:
            return self.json_message("Invalid JSON specified", HTTPStatus.BAD_REQUEST)

        # Valid JSON can still be a list, string or number; those have no
        # .get() and would otherwise surface as an unhandled error.
        if not isinstance(data, dict):
            return self.json_message("Invalid JSON specified", HTTPStatus.BAD_REQUEST)

        # engine_id takes precedence over the platform field.
        engine = data.get("engine_id") or data.get(ATTR_PLATFORM)
        message = data.get(ATTR_MESSAGE)
        if not engine or not message:
            return self.json_message(
                "Must specify platform and message", HTTPStatus.BAD_REQUEST
            )

        cache = data.get(ATTR_CACHE)
        language = data.get(ATTR_LANGUAGE)
        options = data.get(ATTR_OPTIONS)

        try:
            path = await self.tts.async_get_url_path(
                engine, message, cache=cache, language=language, options=options
            )
        except HomeAssistantError as err:
            _LOGGER.error("Error on init tts: %s", err)
            # Send the message text: an exception instance is not a JSON value.
            return self.json({"error": str(err)}, HTTPStatus.BAD_REQUEST)

        base = get_url(self.tts.hass)
        url = base + path

        return self.json({"url": url, "path": path})
1082 
1083 
class TextToSpeechView(HomeAssistantView):
    """TTS view to serve a speech audio."""

    # NOTE(review): served without authentication; presumably so media
    # players can fetch the audio directly - confirm before changing.
    requires_auth = False
    url = "/api/tts_proxy/{filename}"
    name = "api:tts_speech"

    def __init__(self, tts: SpeechManager) -> None:
        """Initialize a tts view.

        ``tts`` is the speech manager the audio is read from.
        """
        self.tts = tts

    async def get(self, request: web.Request, filename: str) -> web.Response:
        """Start a get request.

        Returns the audio bytes with the content type reported by the
        speech manager, or 404 Not Found when the speech manager raises
        HomeAssistantError.
        """
        try:
            # filename is actually token, but we keep its name for compatibility
            content, data = await self.tts.async_read_tts(filename)
        except HomeAssistantError as err:
            _LOGGER.error("Error on load tts: %s", err)
            return web.Response(status=HTTPStatus.NOT_FOUND)

        return web.Response(body=data, content_type=content)
1105 
1106 
@websocket_api.websocket_command(
    {
        "type": "tts/engine/list",
        vol.Optional("country"): str,
        vol.Optional("language"): str,
    }
)
@callback
def websocket_list_engines(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
    """List text to speech engines and, optionally, if they support a given language.

    TTS entities are listed first, followed by the providers registered on
    the TTS manager. When a language is given, each engine's
    ``supported_languages`` is replaced by the result of
    ``language_util.matches`` for that language and the optional country.
    A provider whose engine id equals the platform name of one of the
    listed entities is marked ``deprecated``.
    """
    wanted_country = msg.get("country")
    wanted_language = msg.get("language")

    def _languages(engine: Any) -> Any:
        """Return the engine's languages, narrowed to the request if asked."""
        if wanted_language:
            return language_util.matches(
                wanted_language, engine.supported_languages, wanted_country
            )
        return engine.supported_languages

    engines: list[dict[str, Any]] = []
    platforms_with_entities: set[str] = set()

    for entity in hass.data[DATA_COMPONENT].entities:
        engines.append(
            {
                "engine_id": entity.entity_id,
                "supported_languages": _languages(entity),
            }
        )
        if entity.platform:
            platforms_with_entities.add(entity.platform.platform_name)

    for engine_id, provider in hass.data[DATA_TTS_MANAGER].providers.items():
        info: dict[str, Any] = {
            "engine_id": engine_id,
            "name": provider.name,
            "supported_languages": _languages(provider),
        }
        if engine_id in platforms_with_entities:
            info["deprecated"] = True
        engines.append(info)

    connection.send_message(
        websocket_api.result_message(msg["id"], {"providers": engines})
    )
1151 
1152 
@websocket_api.websocket_command(
    {
        "type": "tts/engine/get",
        vol.Required("engine_id"): str,
    }
)
@callback
def websocket_get_engine(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
    """Get text to speech engine info.

    The engine id is looked up among the TTS entities first and then among
    the providers registered on the TTS manager. Sends an ``ERR_NOT_FOUND``
    error when neither has it. The ``name`` field is only included for
    ``Provider`` instances.
    """
    engine_id = msg["engine_id"]

    engine: TextToSpeechEntity | Provider | None = None
    for entity in hass.data[DATA_COMPONENT].entities:
        if entity.entity_id == engine_id:
            engine = entity
            break

    if not engine:
        engine = hass.data[DATA_TTS_MANAGER].providers.get(engine_id)

    if not engine:
        connection.send_error(
            msg["id"],
            websocket_api.ERR_NOT_FOUND,
            f"tts engine {engine_id} not found",
        )
        return

    info: dict[str, Any] = {
        "engine_id": engine_id,
        "supported_languages": engine.supported_languages,
    }
    if isinstance(engine, Provider):
        info["name"] = engine.name

    connection.send_message(
        websocket_api.result_message(msg["id"], {"provider": info})
    )
1193 
1194 
@websocket_api.websocket_command(
    {
        "type": "tts/engine/voices",
        vol.Required("engine_id"): str,
        vol.Required("language"): str,
    }
)
@callback
def websocket_list_engine_voices(
    hass: HomeAssistant, connection: websocket_api.ActiveConnection, msg: dict
) -> None:
    """List voices for a given language.

    Resolves the engine with ``get_engine_instance`` and replies with
    ``{"voices": ...}`` holding whatever the engine's
    ``async_get_supported_voices`` returns for the language. Sends an
    ``ERR_NOT_FOUND`` error when the engine cannot be resolved.
    """
    engine_id = msg["engine_id"]
    language = msg["language"]

    engine = get_engine_instance(hass, engine_id)

    if engine:
        payload = {"voices": engine.async_get_supported_voices(language)}
        connection.send_message(websocket_api.result_message(msg["id"], payload))
        return

    connection.send_error(
        msg["id"],
        websocket_api.ERR_NOT_FOUND,
        f"tts engine {engine_id} not found",
    )
1220 
None async_register_legacy_engine(self, str engine, Provider provider, ConfigType config)
Definition: __init__.py:582
None __init__(self, HomeAssistant hass, bool use_cache, str cache_dir, int time_memory)
Definition: __init__.py:533
str _async_get_tts_audio(self, TextToSpeechEntity|Provider engine_instance, str cache_key, str message, bool cache, str language, dict[str, Any] options)
Definition: __init__.py:731
str async_get_url_path(self, str engine, str message, bool|None cache=None, str|None language=None, dict|None options=None)
Definition: __init__.py:639
str _generate_cache_key(self, str message, str language, dict|None options, str engine)
Definition: __init__.py:715
tuple[str, bytes] async_get_tts_audio(self, str engine, str message, bool|None cache=None, str|None language=None, dict|None options=None)
Definition: __init__.py:683
None _async_file_to_mem(self, str cache_key)
Definition: __init__.py:879
None _async_store_to_memcache(self, str cache_key, str filename, bytes data)
Definition: __init__.py:905
bytes write_tags(str filename, bytes data, str engine_name, str message, str language, dict|None options)
Definition: __init__.py:967
None _async_save_tts_audio(self, str cache_key, str filename, bytes data)
Definition: __init__.py:861
tuple[str|None, bytes] async_read_tts(self, str token)
Definition: __init__.py:928
tuple[str, dict[str, Any]] process_options(self, TextToSpeechEntity|Provider engine_instance, str|None language, dict|None options)
Definition: __init__.py:599
Mapping[str, Any]|None default_options(self)
Definition: __init__.py:421
list[Voice]|None async_get_supported_voices(self, str language)
Definition: __init__.py:426
TtsAudioType internal_async_get_tts_audio(self, str message, str language, dict[str, Any] options)
Definition: __init__.py:485
TtsAudioType get_tts_audio(self, str message, str language, dict[str, Any] options)
Definition: __init__.py:498
TtsAudioType async_get_tts_audio(self, str message, str language, dict[str, Any] options)
Definition: __init__.py:504
None async_speak(self, list[str] media_player_entity_id, str message, bool cache, str|None language=None, dict|None options=None)
Definition: __init__.py:460
web.Response post(self, web.Request request)
Definition: __init__.py:1049
None __init__(self, SpeechManager tts)
Definition: __init__.py:1045
None __init__(self, SpeechManager tts)
Definition: __init__.py:1091
web.Response get(self, web.Request request, str filename)
Definition: __init__.py:1095
CalendarEntity get_entity(HomeAssistant hass, str entity_id)
Definition: trigger.py:96
web.Response get(self, web.Request request, str config_key)
Definition: view.py:88
IssData update(pyiss.ISS iss)
Definition: __init__.py:33
str generate_media_source_id(str domain, str identifier)
Definition: __init__.py:71
list[Coroutine[Any, Any, None]] async_setup_legacy(HomeAssistant hass, ConfigType config)
Definition: legacy.py:70
None open(self, **Any kwargs)
Definition: lock.py:86
Callable[[Command|list[Command]], Coroutine[Any, Any, None]] handle_error(Callable[[Command|list[Command]], Any] func)
Definition: entity.py:24
TextToSpeechEntity|Provider|None get_engine_instance(HomeAssistant hass, str engine)
Definition: helper.py:18
MediaSourceOptions media_source_id_to_kwargs(str media_source_id)
Definition: media_source.py:80
tuple[str, bytes] async_get_media_source_audio(HomeAssistant hass, str media_source_id)
Definition: __init__.py:193
dict[str, str] _get_cache_files(str cache_dir)
Definition: __init__.py:1022
set[str] async_get_text_to_speech_languages(HomeAssistant hass)
Definition: __init__.py:201
bool async_support_options(HomeAssistant hass, str engine, str|None language=None, dict|None options=None)
Definition: __init__.py:177
bytes _convert_audio(str ffmpeg_binary, str from_extension, bytes audio_bytes, str to_extension, int|None to_sample_rate=None, int|None to_sample_channels=None, int|None to_sample_bytes=None)
Definition: __init__.py:248
None websocket_list_engine_voices(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: __init__.py:1211
bool async_setup(HomeAssistant hass, ConfigType config)
Definition: __init__.py:301
bool async_setup_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:368
str _init_tts_cache_dir(HomeAssistant hass, str cache_dir)
Definition: __init__.py:1012
bool async_unload_entry(HomeAssistant hass, ConfigEntry entry)
Definition: __init__.py:373
str|None async_resolve_engine(HomeAssistant hass, str|None engine)
Definition: __init__.py:156
None websocket_get_engine(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: __init__.py:1165
str _hash_options(dict options)
Definition: __init__.py:514
None websocket_list_engines(HomeAssistant hass, websocket_api.ActiveConnection connection, dict msg)
Definition: __init__.py:1117
str|None async_default_engine(HomeAssistant hass)
Definition: __init__.py:138
bytes async_convert_audio(HomeAssistant hass, str from_extension, bytes audio_bytes, str to_extension, int|None to_sample_rate=None, int|None to_sample_channels=None, int|None to_sample_bytes=None)
Definition: __init__.py:224
CALLBACK_TYPE async_call_later(HomeAssistant hass, float|timedelta delay, HassJob[[datetime], Coroutine[Any, Any, None]|None]|Callable[[datetime], Coroutine[Any, Any, None]|None] action)
Definition: event.py:1597
str get_url(HomeAssistant hass, *bool require_current_request=False, bool require_ssl=False, bool require_standard_port=False, bool require_cloud=False, bool allow_internal=True, bool allow_external=True, bool allow_cloud=True, bool|None allow_ip=None, bool|None prefer_external=None, bool prefer_cloud=False)
Definition: network.py:131