1 """Voice activity detection."""
3 from __future__
import annotations
5 from collections.abc
import Callable, Iterable
6 from dataclasses
import dataclass
7 from enum
import StrEnum
10 from .const
import SAMPLE_CHANNELS, SAMPLE_RATE, SAMPLE_WIDTH
12 _LOGGER = logging.getLogger(__name__)
16 """How quickly the end of a voice command is detected."""
20 AGGRESSIVE =
"aggressive"
23 def to_seconds(sensitivity: VadSensitivity | str) -> float:
24 """Return seconds of silence for sensitivity level."""
26 if sensitivity == VadSensitivity.RELAXED:
29 if sensitivity == VadSensitivity.AGGRESSIVE:
36 """Fixed-sized audio buffer with variable internal length."""
39 """Initialize buffer."""
45 """Get number of bytes currently in the buffer."""
49 """Clear the buffer."""
52 def append(self, data: bytes) ->
None:
53 """Append bytes to the buffer, increasing the internal length."""
56 raise ValueError(
"Length cannot be greater than buffer size")
62 """Convert written portion of buffer to bytes."""
66 """Get the number of bytes currently in the buffer."""
70 """Return True if there are bytes in the buffer."""
76 """Segments an audio stream into voice commands."""
78 speech_seconds: float = 0.1
79 """Seconds of speech before voice command has started."""
81 command_seconds: float = 1.0
82 """Minimum number of seconds for a voice command."""
84 silence_seconds: float = 0.7
85 """Seconds of silence after voice command has ended."""
87 timeout_seconds: float = 15.0
88 """Maximum number of seconds before stopping with timed_out=True."""
90 reset_seconds: float = 1.0
91 """Seconds before resetting start/stop time counters."""
93 in_command: bool =
False
94 """True if inside voice command."""
96 timed_out: bool =
False
97 """True if a timeout occurred during voice command."""
99 before_command_speech_threshold: float = 0.2
100 """Probability threshold for speech before voice command."""
102 in_command_speech_threshold: float = 0.5
103 """Probability threshold for speech during voice command."""
105 _speech_seconds_left: float = 0.0
106 """Seconds left before considering voice command as started."""
108 _command_seconds_left: float = 0.0
109 """Seconds left before voice command could stop."""
111 _silence_seconds_left: float = 0.0
112 """Seconds left before considering voice command as stopped."""
114 _timeout_seconds_left: float = 0.0
115 """Seconds left before considering voice command timed out."""
117 _reset_seconds_left: float = 0.0
118 """Seconds left before resetting start/stop time counters."""
121 """Reset after initialization."""
125 """Reset all counters and state."""
133 def process(self, chunk_seconds: float, speech_probability: float |
None) -> bool:
134 """Process samples using external VAD.
136 Returns False when command is done.
144 "VAD end of speech detection timed out after %s seconds",
145 self.timeout_seconds,
151 if speech_probability
is None:
152 speech_probability = 0.0
156 is_speech = speech_probability > self.before_command_speech_threshold
164 self.command_seconds - self.speech_seconds
167 _LOGGER.debug(
"Voice command started")
176 is_speech = speech_probability > self.in_command_speech_threshold
187 _LOGGER.debug(
"Voice command finished")
203 vad_samples_per_chunk: int |
None,
204 vad_is_speech: Callable[[bytes], bool],
205 leftover_chunk_buffer: AudioBuffer |
None,
207 """Process an audio chunk using an external VAD.
209 A buffer is required if the VAD requires fixed-sized audio chunks (usually the case).
211 Returns False when voice command is finished.
213 if vad_samples_per_chunk
is None:
216 len(chunk) // (SAMPLE_WIDTH * SAMPLE_CHANNELS)
218 is_speech = vad_is_speech(chunk)
219 return self.process(chunk_seconds, is_speech)
221 if leftover_chunk_buffer
is None:
222 raise ValueError(
"leftover_chunk_buffer is required when vad uses chunking")
225 seconds_per_chunk = vad_samples_per_chunk / SAMPLE_RATE
226 bytes_per_chunk = vad_samples_per_chunk * (SAMPLE_WIDTH * SAMPLE_CHANNELS)
227 for vad_chunk
in chunk_samples(chunk, bytes_per_chunk, leftover_chunk_buffer):
228 is_speech = vad_is_speech(vad_chunk)
229 if not self.process(seconds_per_chunk, is_speech):
237 """Detects silence in audio until a timeout is reached."""
239 silence_seconds: float
240 """Seconds of silence before timeout."""
242 reset_seconds: float = 0.5
243 """Seconds of speech before resetting timeout."""
245 speech_threshold: float = 0.5
246 """Threshold for speech."""
248 _silence_seconds_left: float = 0.0
249 """Seconds of silence left before the timeout is reached."""
251 _reset_seconds_left: float = 0.0
252 """Seconds of speech left before resetting the silence timeout."""
255 """Reset after initialization."""
259 """Reset all counters and state."""
263 def process(self, chunk_seconds: float, speech_probability: float |
None) -> bool:
264 """Process samples using external VAD.
266 Returns False when timeout is reached.
268 if speech_probability
is None:
269 speech_probability = 0.0
271 if speech_probability > self.speech_threshold:
295 bytes_per_chunk: int,
296 leftover_chunk_buffer: AudioBuffer,
297 ) -> Iterable[bytes]:
298 """Yield fixed-sized chunks from samples, keeping leftover bytes from previous call(s)."""
300 if (len(leftover_chunk_buffer) + len(samples)) < bytes_per_chunk:
302 leftover_chunk_buffer.append(samples)
307 if leftover_chunk_buffer:
309 bytes_to_copy = bytes_per_chunk - len(leftover_chunk_buffer)
310 leftover_chunk_buffer.append(samples[:bytes_to_copy])
311 next_chunk_idx = bytes_to_copy
314 yield leftover_chunk_buffer.bytes()
315 leftover_chunk_buffer.clear()
317 while next_chunk_idx < len(samples) - bytes_per_chunk + 1:
319 yield samples[next_chunk_idx : next_chunk_idx + bytes_per_chunk]
320 next_chunk_idx += bytes_per_chunk
323 if rest_samples := samples[next_chunk_idx:]:
324 leftover_chunk_buffer.append(rest_samples)
None append(self, bytes data)
None __init__(self, int maxlen)
float to_seconds(VadSensitivity|str sensitivity)
bool process(self, float chunk_seconds, float|None speech_probability)
bool process(self, float chunk_seconds, float|None speech_probability)
bool process_with_vad(self, bytes chunk, int|None vad_samples_per_chunk, Callable[[bytes], bool] vad_is_speech, AudioBuffer|None leftover_chunk_buffer)
Iterable[bytes] chunk_samples(bytes samples, int bytes_per_chunk, AudioBuffer leftover_chunk_buffer)