listener: implement gRPC for outgoing transcriptions and clips

This commit is contained in:
2026-05-25 01:53:30 +00:00
parent 8a190496ca
commit 97e96739ac
5 changed files with 1020 additions and 36 deletions
+936 -1
View File
File diff suppressed because it is too large Load Diff
+7
View File
@@ -7,6 +7,13 @@ edition = "2024"
chrono = "0.4.44" chrono = "0.4.44"
ctrlc = "3.5.2" ctrlc = "3.5.2"
hound = "3.5.1" hound = "3.5.1"
prost = "0.14.3"
serde = { version = "1.0.228", features = ["derive"] } serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
toml = "1.1.2" toml = "1.1.2"
tonic = "0.14.6"
tonic-prost = "0.14.6"
whisper-rs = "0.16.0" whisper-rs = "0.16.0"
[build-dependencies]
tonic-prost-build = "0.14.6"
+6
View File
@@ -0,0 +1,6 @@
/// Build script entry point.
///
/// Compiles `message.proto` from the server's proto directory with
/// `tonic_prost_build`, generating client code only (server generation is
/// switched off). Any error from the proto compiler is returned to Cargo.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Proto sources to compile and the include roots used to resolve them.
    let proto_files = ["../server/proto/message.proto"];
    let include_dirs = ["../server/proto"];

    let builder = tonic_prost_build::configure().build_server(false);
    builder.compile_protos(&proto_files, &include_dirs)?;

    Ok(())
}
+7 -4
View File
@@ -7,11 +7,14 @@ model = "models/ggml-base.en.bin"
# audio = "pulse:default" # audio = "pulse:default"
audio = "pulse:default" audio = "pulse:default"
# Where to write the transcript # gRPC server address
output = "transcription.txt" server_addr = "http://localhost:3001"
# Directory to store per-segment WAV clips # Channel UUID to post transcriptions to (required by the server)
clip_dir = "./audio_clips" channel_id = "00000000-0000-0000-0000-000000000000"
# Directory for temporary WAV files (deleted after upload); defaults to /tmp
# temp_dir = "/tmp"
# File-mode only: how many seconds of audio to transcribe per chunk # File-mode only: how many seconds of audio to transcribe per chunk
chunk_secs = 30 chunk_secs = 30
+64 -31
View File
@@ -1,5 +1,4 @@
use std::fs::{self, OpenOptions}; use std::fs;
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc}; use std::sync::{mpsc, Arc};
use chrono::{DateTime, Local}; use chrono::{DateTime, Local};
@@ -8,14 +7,19 @@ use whisper_rs::{FullParams, SamplingStrategy, WhisperContext, WhisperContextPar
mod audio; mod audio;
// gRPC bindings for the `scannerbot` protobuf package.
// `tonic::include_proto!` pulls in the Rust code generated for that package;
// the `MessageServiceClient` and `SendMessageRequest` items imported right
// after this module are defined by that generated code.
// NOTE(review): presumably produced by this crate's build script from
// `message.proto` (client only) — confirm the package name matches the proto.
pub mod scannerbot {
tonic::include_proto!("scannerbot");
}
use scannerbot::message_service_client::MessageServiceClient;
use scannerbot::SendMessageRequest;
#[derive(Deserialize)] #[derive(Deserialize)]
struct Config { struct Config {
model: String, model: String,
audio: String, audio: String,
#[serde(default = "default_output")] server_addr: String,
output: String, channel_id: String,
#[serde(default = "default_clip_dir")]
clip_dir: String,
#[serde(default = "default_chunk_secs")] #[serde(default = "default_chunk_secs")]
chunk_secs: u32, chunk_secs: u32,
#[serde(default = "default_silence_threshold")] #[serde(default = "default_silence_threshold")]
@@ -26,15 +30,16 @@ struct Config {
hangover_secs: f32, hangover_secs: f32,
#[serde(default = "default_max_segment_secs")] #[serde(default = "default_max_segment_secs")]
max_segment_secs: f32, max_segment_secs: f32,
#[serde(default = "default_temp_dir")]
temp_dir: String,
} }
fn default_output() -> String { "transcription.txt".into() }
fn default_clip_dir() -> String { "./audio_clips".into() }
fn default_chunk_secs() -> u32 { 30 } fn default_chunk_secs() -> u32 { 30 }
fn default_silence_threshold() -> f32 { 0.02 } fn default_silence_threshold() -> f32 { 0.02 }
fn default_pre_buffer_secs() -> f32 { 1.0 } fn default_pre_buffer_secs() -> f32 { 1.0 }
fn default_hangover_secs() -> f32 { 2.0 } fn default_hangover_secs() -> f32 { 2.0 }
fn default_max_segment_secs() -> f32 { 120.0 } fn default_max_segment_secs() -> f32 { 120.0 }
fn default_temp_dir() -> String { "/tmp".into() }
fn save_clip(samples: &[f32], path: &str) -> Result<(), Box<dyn std::error::Error>> { fn save_clip(samples: &[f32], path: &str) -> Result<(), Box<dyn std::error::Error>> {
let spec = hound::WavSpec { let spec = hound::WavSpec {
@@ -54,29 +59,54 @@ fn save_clip(samples: &[f32], path: &str) -> Result<(), Box<dyn std::error::Erro
fn transcribe_chunk( fn transcribe_chunk(
state: &mut whisper_rs::WhisperState, state: &mut whisper_rs::WhisperState,
chunk: &[f32], chunk: &[f32],
out: &mut impl Write,
counter: &mut u32, counter: &mut u32,
clip_dir: &str, channel_id: &str,
server_addr: &str,
temp_dir: &str,
rt: &tokio::runtime::Runtime,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let time: DateTime<Local> = Local::now(); let time: DateTime<Local> = Local::now();
let id = format!("{}_{:04}", time.format("%Y%m%d_%H%M%S"), counter); let id = format!("{}_{:04}", time.format("%Y%m%d_%H%M%S"), counter);
*counter += 1; *counter += 1;
let clip_path = format!("{}/{}.wav", clip_dir, id); let clip_path = std::path::Path::new(temp_dir).join(format!("scannerbot_{}.wav", id));
save_clip(chunk, &clip_path)?; save_clip(chunk, clip_path.to_str().unwrap())?;
let params = FullParams::new(SamplingStrategy::BeamSearch { let params = FullParams::new(SamplingStrategy::BeamSearch {
beam_size: 5, beam_size: 5,
patience: -1.0, patience: -1.0,
}); });
out.write_all(format!("[{}] [{}]: ", time, id).as_bytes())?;
state.full(params, chunk)?; state.full(params, chunk)?;
let mut content = String::new();
for segment in state.as_iter() { for segment in state.as_iter() {
let line = format!("{}\n", segment); let line = format!("{}\n", segment);
print!("{}", line); print!("{}", line);
out.write_all(line.as_bytes())?; content.push_str(&line);
} }
out.flush()?;
let audio_bytes = fs::read(&clip_path)?;
let audio_filename = format!("scannerbot_{}.wav", id);
let _ = fs::remove_file(&clip_path);
let channel_id = channel_id.to_string();
let server_addr = server_addr.to_string();
let result = rt.block_on(async move {
let mut client = MessageServiceClient::connect(server_addr).await?;
let request = tonic::Request::new(SendMessageRequest {
channel_id,
content,
audio_filename,
audio: audio_bytes,
});
let response = client.send_message(request).await?;
eprintln!(" [message sent: {}]", response.into_inner().id);
Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
});
if let Err(e) = result {
return Err(e.to_string().into());
}
Ok(()) Ok(())
} }
@@ -89,8 +119,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let model_path = cfg.model.clone(); let model_path = cfg.model.clone();
let audio_arg = cfg.audio.clone(); let audio_arg = cfg.audio.clone();
let output_path = cfg.output.clone(); let channel_id = cfg.channel_id.clone();
let clip_dir = cfg.clip_dir.clone(); let server_addr = cfg.server_addr.clone();
let temp_dir = cfg.temp_dir.clone();
let chunk_samples: usize = 16000 * cfg.chunk_secs as usize; let chunk_samples: usize = 16000 * cfg.chunk_secs as usize;
let vad_cfg = audio::VadConfig { let vad_cfg = audio::VadConfig {
@@ -100,42 +131,42 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
max_segment_samples: (cfg.max_segment_secs * 16000.0) as usize, max_segment_samples: (cfg.max_segment_secs * 16000.0) as usize,
}; };
fs::create_dir_all(&clip_dir)?;
let running = Arc::new(AtomicBool::new(true)); let running = Arc::new(AtomicBool::new(true));
let r = running.clone(); let r = running.clone();
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst); r.store(false, Ordering::SeqCst);
})?; })?;
eprintln!("Transcribing {}{} (clips → {}) (Ctrl+C to stop)", audio_arg, output_path, clip_dir); eprintln!("Transcribing {}{} (Ctrl+C to stop)", audio_arg, server_addr);
if let Some(source) = audio_arg.strip_prefix("pulse:") { if let Some(source) = audio_arg.strip_prefix("pulse:") {
eprintln!("Listening for speech (silence threshold: {:.3} RMS) …", cfg.silence_threshold); eprintln!("Listening for speech (silence threshold: {:.3} RMS) …", cfg.silence_threshold);
let (tx, rx) = mpsc::channel::<Vec<f32>>(); let (tx, rx) = mpsc::channel::<Vec<f32>>();
// transcription run goes to a background thread so capture is never blocked.
let model_path_t = model_path.clone(); let model_path_t = model_path.clone();
let output_path_t = output_path.clone(); let channel_id_t = channel_id.clone();
let clip_dir_t = clip_dir.clone(); let server_addr_t = server_addr.clone();
let temp_dir_t = temp_dir.clone();
let transcription_thread = std::thread::spawn(move || { let transcription_thread = std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build tokio runtime");
let ctx = WhisperContext::new_with_params(&model_path_t, WhisperContextParameters::default()) let ctx = WhisperContext::new_with_params(&model_path_t, WhisperContextParameters::default())
.expect("failed to load model"); .expect("failed to load model");
let mut state = ctx.create_state().expect("failed to create state"); let mut state = ctx.create_state().expect("failed to create state");
let mut out = OpenOptions::new().create(true).append(true).open(&output_path_t)
.expect("failed to open output file");
let mut counter: u32 = 0; let mut counter: u32 = 0;
for segment in rx { for segment in rx {
let secs = segment.len() as f32 / 16000.0; let secs = segment.len() as f32 / 16000.0;
eprintln!(" [transcribing {:.1}s segment…]", secs); eprintln!(" [transcribing {:.1}s segment…]", secs);
if let Err(e) = transcribe_chunk(&mut state, &segment, &mut out, &mut counter, &clip_dir_t) { if let Err(e) = transcribe_chunk(&mut state, &segment, &mut counter, &channel_id_t, &server_addr_t, &temp_dir_t, &rt) {
eprintln!("Transcription error: {e}"); eprintln!("Transcription error: {e}");
} }
} }
}); });
// capture loop.. never pauses for transcription. // capture loop never pauses for transcription
let mut stream = audio::VadStream::open(source, running.clone(), vad_cfg)?; let mut stream = audio::VadStream::open(source, running.clone(), vad_cfg)?;
loop { loop {
match stream.next_segment()? { match stream.next_segment()? {
@@ -143,20 +174,22 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let secs = segment.len() as f32 / 16000.0; let secs = segment.len() as f32 / 16000.0;
eprintln!(" [captured {:.1}s, queued for transcription]", secs); eprintln!(" [captured {:.1}s, queued for transcription]", secs);
if tx.send(segment).is_err() { if tx.send(segment).is_err() {
break; // transcription thread died break;
} }
} }
None => break, None => break,
} }
} }
drop(tx); // closing the channel signals the transcription thread to finish drop(tx);
transcription_thread.join().expect("transcription thread panicked"); transcription_thread.join().expect("transcription thread panicked");
} else { } else {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()?;
let ctx = WhisperContext::new_with_params(&model_path, WhisperContextParameters::default()) let ctx = WhisperContext::new_with_params(&model_path, WhisperContextParameters::default())
.expect("failed to load model"); .expect("failed to load model");
let mut state = ctx.create_state().expect("failed to create state"); let mut state = ctx.create_state().expect("failed to create state");
let mut out = OpenOptions::new().create(true).append(true).open(&output_path)?;
let mut counter: u32 = 0; let mut counter: u32 = 0;
while running.load(Ordering::SeqCst) { while running.load(Ordering::SeqCst) {
@@ -165,7 +198,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
if !running.load(Ordering::SeqCst) { if !running.load(Ordering::SeqCst) {
break; break;
} }
transcribe_chunk(&mut state, chunk, &mut out, &mut counter, &clip_dir)?; transcribe_chunk(&mut state, chunk, &mut counter, &channel_id, &server_addr, &temp_dir, &rt)?;
} }
} }
} }