From a06ebf722fea8cadfde74671504b616d7fb4141a Mon Sep 17 00:00:00 2001 From: William P Date: Sun, 10 May 2026 19:20:01 -0400 Subject: [PATCH] listener: implement rms (voice activity) detection --- listener/src/audio.rs | 89 +++++++++++++++++++++++++++++++++++++------ listener/src/main.rs | 56 +++++++++++++++++++++------ 2 files changed, 122 insertions(+), 23 deletions(-) diff --git a/listener/src/audio.rs b/listener/src/audio.rs index 4c01deb..cbde5f1 100644 --- a/listener/src/audio.rs +++ b/listener/src/audio.rs @@ -1,5 +1,14 @@ +use std::collections::VecDeque; use std::io::Read; use std::process::{Child, ChildStdout, Command, Stdio}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +const FRAME_SAMPLES: usize = 1600; // 100 ms at 16 kHz +const PRE_BUFFER_FRAMES: usize = 10; // 1.0 s pre-roll captured before speech onset +pub const SILENCE_THRESHOLD: f32 = 0.02; // RMS energy: tune up for noisy environments +const HANGOVER_FRAMES: usize = 20; // 2.0 s trailing silence before segment closes +const MAX_SEGMENT_SAMPLES: usize = 16000 * 120; // 2-minute hard cap per segment pub fn decode(input: &str) -> Result, Box> { let output = Command::new("ffmpeg") @@ -33,18 +42,12 @@ impl LiveStream { Ok(LiveStream { child, stdout }) } - // Reads exactly `secs` seconds of audio. Returns None when the stream ends. - pub fn next_chunk(&mut self, secs: u32) -> Result>, Box> { - let num_bytes = secs as usize * 16000 * 4; - let mut buf = vec![0u8; num_bytes]; - + fn next_frame(&mut self) -> Result>, Box> { + let mut buf = vec![0u8; FRAME_SAMPLES * 4]; match self.stdout.read_exact(&mut buf) { - Ok(()) => { - let samples = buf.chunks_exact(4) - .map(|b| f32::from_le_bytes(b.try_into().unwrap())) - .collect(); - Ok(Some(samples)) - } + Ok(()) => Ok(Some(buf.chunks_exact(4) + .map(|b| f32::from_le_bytes(b.try_into().unwrap())) + .collect())), Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None), Err(e) => Err(e.into()), } @@ -56,3 +59,67 @@ impl Drop for LiveStream { let _ = self.child.kill(); } } + +fn rms(samples: &[f32]) -> f32 { + (samples.iter().map(|&s| s * s).sum::() / samples.len() as f32).sqrt() +} + +pub struct VadStream { + inner: LiveStream, + pre_buffer: VecDeque>, + running: Arc, +} + +impl VadStream { + pub fn open(source: &str, running: Arc) -> Result> { + Ok(VadStream { + inner: LiveStream::open(source)?, + pre_buffer: VecDeque::with_capacity(PRE_BUFFER_FRAMES + 1), + running, + }) + } + + /// Function itself blocks until a complete speech segment is captured, then returns it. + /// returns None when the underlying stream ends or running is set to false. + pub fn next_segment(&mut self) -> Result>, Box> { + let mut speech: Vec = Vec::new(); + let mut speech_active = false; + let mut hangover = 0usize; + + loop { + if !self.running.load(Ordering::SeqCst) { + return if speech.is_empty() { Ok(None) } else { Ok(Some(speech)) }; + } + + let frame = match self.inner.next_frame()? { + Some(f) => f, + None => return if speech.is_empty() { Ok(None) } else { Ok(Some(speech)) }, + }; + + let energy = rms(&frame); + + if energy > SILENCE_THRESHOLD { + if !speech_active { + speech_active = true; + eprintln!(" [recording]"); + for pre in self.pre_buffer.drain(..) { + speech.extend(pre); + } + } + speech.extend(&frame); + hangover = 0; + } else if speech_active { + speech.extend(&frame); + hangover += 1; + if hangover >= HANGOVER_FRAMES || speech.len() >= MAX_SEGMENT_SAMPLES { + return Ok(Some(speech)); + } + } else { + if self.pre_buffer.len() >= PRE_BUFFER_FRAMES { + self.pre_buffer.pop_front(); + } + self.pre_buffer.push_back(frame); + } + } + } +} diff --git a/listener/src/main.rs b/listener/src/main.rs index 4ea4105..0049362 100644 --- a/listener/src/main.rs +++ b/listener/src/main.rs @@ -1,7 +1,7 @@ use std::fs::{self, OpenOptions}; use std::io::Write; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; +use std::sync::{mpsc, Arc}; use chrono::{DateTime, Local}; use whisper_rs::{FullParams, SamplingStrategy, WhisperContext, WhisperContextParameters}; @@ -67,24 +67,56 @@ fn main() -> Result<(), Box> { r.store(false, Ordering::SeqCst); })?; - let ctx = WhisperContext::new_with_params(&model_path, WhisperContextParameters::default()) - .expect("failed to load model"); - let mut state = ctx.create_state().expect("failed to create state"); - - let mut out = OpenOptions::new().create(true).append(true).open(&output_path)?; - let mut counter: u32 = 0; - eprintln!("Transcribing {} → {} (clips → {}) (Ctrl+C to stop)", audio_arg, output_path, CLIP_DIR); if let Some(source) = audio_arg.strip_prefix("pulse:") { - let mut stream = audio::LiveStream::open(source)?; - while running.load(Ordering::SeqCst) { - match stream.next_chunk(CHUNK_SECS)? { - Some(chunk) => transcribe_chunk(&mut state, &chunk, &mut out, &mut counter)?, + eprintln!("Listening for speech (silence threshold: {:.3} RMS) …", audio::SILENCE_THRESHOLD); + + let (tx, rx) = mpsc::channel::>(); + + // transcription run goes to a background thread so capture is never blocked. + let model_path_t = model_path.clone(); + let output_path_t = output_path.clone(); + let transcription_thread = std::thread::spawn(move || { + let ctx = WhisperContext::new_with_params(&model_path_t, WhisperContextParameters::default()) + .expect("failed to load model"); + let mut state = ctx.create_state().expect("failed to create state"); + let mut out = OpenOptions::new().create(true).append(true).open(&output_path_t) + .expect("failed to open output file"); + let mut counter: u32 = 0; + for segment in rx { + let secs = segment.len() as f32 / 16000.0; + eprintln!(" [transcribing {:.1}s segment…]", secs); + if let Err(e) = transcribe_chunk(&mut state, &segment, &mut out, &mut counter) { + eprintln!("Transcription error: {e}"); + } + } + }); + + // capture loop.. never pauses for transcription. + let mut stream = audio::VadStream::open(source, running.clone())?; + loop { + match stream.next_segment()? { + Some(segment) => { + let secs = segment.len() as f32 / 16000.0; + eprintln!(" [captured {:.1}s, queued for transcription]", secs); + if tx.send(segment).is_err() { + break; // transcription thread died + } + } None => break, } } + + drop(tx); // closing the channel signals the transcription thread to finish + transcription_thread.join().expect("transcription thread panicked"); } else { + let ctx = WhisperContext::new_with_params(&model_path, WhisperContextParameters::default()) + .expect("failed to load model"); + let mut state = ctx.create_state().expect("failed to create state"); + let mut out = OpenOptions::new().create(true).append(true).open(&output_path)?; + let mut counter: u32 = 0; + while running.load(Ordering::SeqCst) { let audio = audio::decode(&audio_arg)?; for chunk in audio.chunks(CHUNK_SAMPLES) {