Home Assistant Unofficial Reference 2024.12.1
vad.py
Go to the documentation of this file.
1 """Voice activity detection."""
2 
3 from __future__ import annotations
4 
5 from collections.abc import Callable, Iterable
6 from dataclasses import dataclass
7 from enum import StrEnum
8 import logging
9 
10 from .const import SAMPLE_CHANNELS, SAMPLE_RATE, SAMPLE_WIDTH
11 
12 _LOGGER = logging.getLogger(__name__)
13 
14 
15 class VadSensitivity(StrEnum):
16  """How quickly the end of a voice command is detected."""
17 
18  DEFAULT = "default"
19  RELAXED = "relaxed"
20  AGGRESSIVE = "aggressive"
21 
22  @staticmethod
23  def to_seconds(sensitivity: VadSensitivity | str) -> float:
24  """Return seconds of silence for sensitivity level."""
25  sensitivity = VadSensitivity(sensitivity)
26  if sensitivity == VadSensitivity.RELAXED:
27  return 1.25
28 
29  if sensitivity == VadSensitivity.AGGRESSIVE:
30  return 0.25
31 
32  return 0.7
33 
34 
class AudioBuffer:
    """Fixed-sized audio buffer with variable internal length.

    The backing storage is allocated once with ``maxlen`` bytes; only the
    first ``length`` bytes are considered written.
    """

    def __init__(self, maxlen: int) -> None:
        """Initialize buffer with a fixed capacity of ``maxlen`` bytes."""
        self._buffer = bytearray(maxlen)
        self._length = 0

    @property
    def length(self) -> int:
        """Get number of bytes currently in the buffer."""
        return self._length

    def clear(self) -> None:
        """Clear the buffer."""
        # Only the write position is rewound; old bytes are overwritten later.
        self._length = 0

    def append(self, data: bytes) -> None:
        """Append bytes to the buffer, increasing the internal length.

        Raises ValueError if the data would not fit in the remaining capacity.
        """
        data_len = len(data)
        if (self._length + data_len) > len(self._buffer):
            raise ValueError("Length cannot be greater than buffer size")

        self._buffer[self._length : self._length + data_len] = data
        self._length += data_len

    def bytes(self) -> bytes:
        """Convert written portion of buffer to bytes."""
        return bytes(self._buffer[: self._length])

    def __len__(self) -> int:
        """Get the number of bytes currently in the buffer."""
        return self._length

    def __bool__(self) -> bool:
        """Return True if there are bytes in the buffer."""
        return self._length > 0
72 
73 
# NOTE(review): the class header line was missing from the extracted listing;
# the name below was restored and should be confirmed against upstream.
@dataclass
class VoiceCommandSegmenter:
    """Segments an audio stream into voice commands."""

    speech_seconds: float = 0.1
    """Seconds of speech before voice command has started."""

    command_seconds: float = 1.0
    """Minimum number of seconds for a voice command."""

    silence_seconds: float = 0.7
    """Seconds of silence after voice command has ended."""

    timeout_seconds: float = 15.0
    """Maximum number of seconds before stopping with timeout=True."""

    reset_seconds: float = 1.0
    """Seconds before reset start/stop time counters."""

    in_command: bool = False
    """True if inside voice command."""

    timed_out: bool = False
    """True if a timeout occurred during voice command."""

    before_command_speech_threshold: float = 0.2
    """Probability threshold for speech before voice command."""

    in_command_speech_threshold: float = 0.5
    """Probability threshold for speech during voice command."""

    _speech_seconds_left: float = 0.0
    """Seconds left before considering voice command as started."""

    _command_seconds_left: float = 0.0
    """Seconds left before voice command could stop."""

    _silence_seconds_left: float = 0.0
    """Seconds left before considering voice command as stopped."""

    _timeout_seconds_left: float = 0.0
    """Seconds left before considering voice command timed out."""

    _reset_seconds_left: float = 0.0
    """Seconds left before resetting start/stop time counters."""

    def __post_init__(self) -> None:
        """Reset after initialization."""
        self.reset()

    def reset(self) -> None:
        """Reset all counters and state.

        ``timed_out`` is intentionally left alone: ``process`` sets it right
        after calling this method and clears it on the next call.
        """
        self._speech_seconds_left = self.speech_seconds
        self._command_seconds_left = self.command_seconds - self.speech_seconds
        self._silence_seconds_left = self.silence_seconds
        self._timeout_seconds_left = self.timeout_seconds
        self._reset_seconds_left = self.reset_seconds
        self.in_command = False

    def process(self, chunk_seconds: float, speech_probability: float | None) -> bool:
        """Process samples using external VAD.

        ``chunk_seconds`` is the duration of audio the probability covers.
        A ``speech_probability`` of None is treated as 0.0 (silence).

        Returns False when command is done, either because it finished or
        because the timeout elapsed (in which case ``timed_out`` is True).
        """
        if self.timed_out:
            # Clear the flag left over from a previous timed-out command.
            self.timed_out = False

        self._timeout_seconds_left -= chunk_seconds
        if self._timeout_seconds_left <= 0:
            _LOGGER.warning(
                "VAD end of speech detection timed out after %s seconds",
                self.timeout_seconds,
            )
            self.reset()
            self.timed_out = True
            return False

        if speech_probability is None:
            speech_probability = 0.0

        if not self.in_command:
            # Before command
            is_speech = speech_probability > self.before_command_speech_threshold
            if is_speech:
                self._reset_seconds_left = self.reset_seconds
                self._speech_seconds_left -= chunk_seconds
                if self._speech_seconds_left <= 0:
                    # Inside voice command
                    self.in_command = True
                    self._command_seconds_left = (
                        self.command_seconds - self.speech_seconds
                    )
                    self._silence_seconds_left = self.silence_seconds
                    _LOGGER.debug("Voice command started")
            else:
                # Reset if enough silence
                self._reset_seconds_left -= chunk_seconds
                if self._reset_seconds_left <= 0:
                    self._speech_seconds_left = self.speech_seconds
                    self._reset_seconds_left = self.reset_seconds
        else:
            # In command
            is_speech = speech_probability > self.in_command_speech_threshold
            if not is_speech:
                # Silence in command
                self._reset_seconds_left = self.reset_seconds
                self._silence_seconds_left -= chunk_seconds
                self._command_seconds_left -= chunk_seconds
                if (self._silence_seconds_left <= 0) and (
                    self._command_seconds_left <= 0
                ):
                    # Command finished successfully
                    self.reset()
                    _LOGGER.debug("Voice command finished")
                    return False
            else:
                # Speech in command.
                # Reset silence counter if enough speech.
                self._reset_seconds_left -= chunk_seconds
                self._command_seconds_left -= chunk_seconds
                if self._reset_seconds_left <= 0:
                    self._silence_seconds_left = self.silence_seconds
                    self._reset_seconds_left = self.reset_seconds

        return True

    def process_with_vad(
        self,
        chunk: bytes,
        vad_samples_per_chunk: int | None,
        vad_is_speech: Callable[[bytes], bool],
        leftover_chunk_buffer: AudioBuffer | None,
    ) -> bool:
        """Process an audio chunk using an external VAD.

        A buffer is required if the VAD requires fixed-sized audio chunks (usually the case).

        Returns False when voice command is finished.

        Raises ValueError if ``vad_samples_per_chunk`` is set but no
        ``leftover_chunk_buffer`` was provided.
        """
        if vad_samples_per_chunk is None:
            # No chunking
            chunk_seconds = (
                len(chunk) // (SAMPLE_WIDTH * SAMPLE_CHANNELS)
            ) / SAMPLE_RATE
            is_speech = vad_is_speech(chunk)
            return self.process(chunk_seconds, is_speech)

        if leftover_chunk_buffer is None:
            raise ValueError("leftover_chunk_buffer is required when vad uses chunking")

        # With chunking
        seconds_per_chunk = vad_samples_per_chunk / SAMPLE_RATE
        bytes_per_chunk = vad_samples_per_chunk * (SAMPLE_WIDTH * SAMPLE_CHANNELS)
        for vad_chunk in chunk_samples(chunk, bytes_per_chunk, leftover_chunk_buffer):
            is_speech = vad_is_speech(vad_chunk)
            if not self.process(seconds_per_chunk, is_speech):
                # Stop early; remaining VAD chunks in this call are not processed.
                return False

        return True
233 
234 
# NOTE(review): the class header line was missing from the extracted listing;
# the name below was restored and should be confirmed against upstream.
@dataclass
class VoiceActivityTimeout:
    """Detects silence in audio until a timeout is reached."""

    silence_seconds: float
    """Seconds of silence before timeout."""

    reset_seconds: float = 0.5
    """Seconds of speech before resetting timeout."""

    speech_threshold: float = 0.5
    """Threshold for speech."""

    _silence_seconds_left: float = 0.0
    """Seconds left before considering voice command as stopped."""

    _reset_seconds_left: float = 0.0
    """Seconds left before resetting start/stop time counters."""

    def __post_init__(self) -> None:
        """Reset after initialization."""
        self.reset()

    def reset(self) -> None:
        """Reset all counters and state."""
        self._silence_seconds_left = self.silence_seconds
        self._reset_seconds_left = self.reset_seconds

    def process(self, chunk_seconds: float, speech_probability: float | None) -> bool:
        """Process samples using external VAD.

        ``chunk_seconds`` is the duration of audio the probability covers.
        A ``speech_probability`` of None is treated as 0.0 (silence).

        Returns False when timeout is reached.
        """
        if speech_probability is None:
            speech_probability = 0.0

        if speech_probability > self.speech_threshold:
            # Speech
            self._reset_seconds_left -= chunk_seconds
            if self._reset_seconds_left <= 0:
                # Reset timeout
                self._silence_seconds_left = self.silence_seconds
        else:
            # Silence
            self._silence_seconds_left -= chunk_seconds
            if self._silence_seconds_left <= 0:
                # Timeout reached
                self.reset()
                return False

            # Slowly build reset counter back up
            self._reset_seconds_left = min(
                self.reset_seconds, self._reset_seconds_left + chunk_seconds
            )

        return True
291 
292 
def chunk_samples(
    samples: bytes,
    bytes_per_chunk: int,
    leftover_chunk_buffer: AudioBuffer,
) -> Iterable[bytes]:
    """Yield fixed-sized chunks from samples, keeping leftover bytes from previous call(s).

    Bytes that do not fill a whole chunk are stored in ``leftover_chunk_buffer``
    and prepended to the samples of the next call.

    Raises ValueError (on first iteration) if ``bytes_per_chunk`` is not
    positive, since a zero-sized chunk would never advance through ``samples``.
    """
    if bytes_per_chunk <= 0:
        raise ValueError("bytes_per_chunk must be positive")

    if (len(leftover_chunk_buffer) + len(samples)) < bytes_per_chunk:
        # Extend leftover chunk, but not enough samples to complete it
        leftover_chunk_buffer.append(samples)
        return

    next_chunk_idx = 0

    if leftover_chunk_buffer:
        # Add to leftover chunk from previous call(s).
        bytes_to_copy = bytes_per_chunk - len(leftover_chunk_buffer)
        leftover_chunk_buffer.append(samples[:bytes_to_copy])
        next_chunk_idx = bytes_to_copy

        # Process full chunk in buffer
        yield leftover_chunk_buffer.bytes()
        leftover_chunk_buffer.clear()

    while next_chunk_idx < len(samples) - bytes_per_chunk + 1:
        # Process full chunk
        yield samples[next_chunk_idx : next_chunk_idx + bytes_per_chunk]
        next_chunk_idx += bytes_per_chunk

    # Capture leftover chunks
    if rest_samples := samples[next_chunk_idx:]:
        leftover_chunk_buffer.append(rest_samples)
float to_seconds(VadSensitivity|str sensitivity)
Definition: vad.py:23
bool process(self, float chunk_seconds, float|None speech_probability)
Definition: vad.py:263
bool process(self, float chunk_seconds, float|None speech_probability)
Definition: vad.py:133
bool process_with_vad(self, bytes chunk, int|None vad_samples_per_chunk, Callable[[bytes], bool] vad_is_speech, AudioBuffer|None leftover_chunk_buffer)
Definition: vad.py:206
Iterable[bytes] chunk_samples(bytes samples, int bytes_per_chunk, AudioBuffer leftover_chunk_buffer)
Definition: vad.py:297