From 1b84fd312895f8dad9912c13e2606b91f6e55aec Mon Sep 17 00:00:00 2001 From: William P Date: Mon, 25 May 2026 22:20:22 +0000 Subject: [PATCH] listener: add live HTTP audio stream feature --- listener/Cargo.lock | 52 ++++++++++++++++++++++++++ listener/Cargo.toml | 3 ++ listener/config.toml.example | 3 ++ listener/src/audio.rs | 22 +++++++---- listener/src/http.rs | 72 ++++++++++++++++++++++++++++++++++++ listener/src/main.rs | 17 ++++++++- 6 files changed, 161 insertions(+), 8 deletions(-) create mode 100644 listener/src/http.rs diff --git a/listener/Cargo.lock b/listener/Cargo.lock index cf3b3b5..f8da7d2 100644 --- a/listener/Cargo.lock +++ b/listener/Cargo.lock @@ -57,10 +57,13 @@ checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" dependencies = [ "axum-core", "bytes", + "form_urlencoded", "futures-util", "http", "http-body", "http-body-util", + "hyper", + "hyper-util", "itoa", "matchit", "memchr", @@ -68,10 +71,15 @@ dependencies = [ "percent-encoding", "pin-project-lite", "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -90,6 +98,7 @@ dependencies = [ "sync_wrapper", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -290,6 +299,15 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "form_urlencoded" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf" +dependencies = [ + "percent-encoding", +] + [[package]] name = "fs_extra" version = "1.3.0" @@ -941,16 +959,25 @@ version = "1.0.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d" +[[package]] +name = "ryu" +version = "1.0.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f" + [[package]] name = "scannerbot-listener" version = "0.1.0" dependencies = [ + "axum", + "bytes", "chrono", "ctrlc", "hound", "prost", "serde", "tokio", + "tokio-stream", "toml", "tonic", "tonic-prost", @@ -1013,6 +1040,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_spanned" version = "1.1.1" @@ -1022,6 +1060,18 @@ dependencies = [ "serde_core", ] +[[package]] +name = "serde_urlencoded" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" +dependencies = [ + "form_urlencoded", + "itoa", + "ryu", + "serde", +] + [[package]] name = "shlex" version = "1.3.0" @@ -1127,6 +1177,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] @@ -1286,6 +1337,7 @@ version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/listener/Cargo.toml b/listener/Cargo.toml index 9b2a5d5..0f1dbdf 100644 --- a/listener/Cargo.toml +++ b/listener/Cargo.toml @@ -4,12 +4,15 @@ version = "0.1.0" edition = "2024" [dependencies] +axum = "0.8" +bytes = "1" chrono = "0.4.44" ctrlc = "3.5.2" hound = "3.5.1" prost = "0.14.3" serde = { version = "1.0.228", features = ["derive"] } tokio = { version = "1", features = ["full"] } +tokio-stream = { version = "0.1", features = ["sync"] } toml = "1.1.2" tonic = "0.14.6" tonic-prost = "0.14.6" diff --git a/listener/config.toml.example b/listener/config.toml.example index 3e45735..fbb2e94 100644 --- a/listener/config.toml.example +++ b/listener/config.toml.example @@ -24,3 +24,6 @@ silence_threshold = 0.02 # RMS energy cutoff; raise for noisy environments pre_buffer_secs = 1.0 # seconds of audio kept before speech onset hangover_secs = 2.0 # trailing silence before a segment is closed max_segment_secs = 120.0 # hard cap per segment + +# HTTP audio stream port; GET /audio streams a live WAV +# http_port = 8080 diff --git a/listener/src/audio.rs b/listener/src/audio.rs index 7008221..1b6fe5c 100644 --- a/listener/src/audio.rs +++ b/listener/src/audio.rs @@ -3,6 +3,8 @@ use std::io::Read; use std::process::{Child, ChildStdout, Command, Stdio}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use bytes::Bytes; +use tokio::sync::broadcast; const FRAME_SAMPLES: usize = 1600; // 100 ms at 16 kHz @@ -30,10 +32,11 @@ pub fn decode(input: &str) -> Result, Box> { pub struct LiveStream { child: Child, stdout: ChildStdout, + broadcast_tx: Option>, } impl LiveStream { - pub fn open(source: &str) -> Result> { + pub fn open(source: &str, broadcast_tx: Option>) -> Result> { let mut child = Command::new("ffmpeg") .args(["-f", "pulse", "-i", source, "-ar", "16000", "-ac", "1", "-f", "f32le", "pipe:1"]) .env("PULSE_PROP_application.name", "scannerbot-listener") @@ -42,15 +45,20 @@ impl LiveStream { .spawn()?; let stdout = child.stdout.take().unwrap(); - Ok(LiveStream { child, stdout }) + Ok(LiveStream { child, stdout, broadcast_tx }) } fn next_frame(&mut self) -> Result>, Box> { let mut buf = vec![0u8; FRAME_SAMPLES * 4]; match self.stdout.read_exact(&mut buf) { - Ok(()) => Ok(Some(buf.chunks_exact(4) - .map(|b| f32::from_le_bytes(b.try_into().unwrap())) - .collect())), + Ok(()) => { + if let Some(tx) = &self.broadcast_tx { + let _ = tx.send(Bytes::copy_from_slice(&buf)); + } + Ok(Some(buf.chunks_exact(4) + .map(|b| f32::from_le_bytes(b.try_into().unwrap())) + .collect())) + } Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None), Err(e) => Err(e.into()), } @@ -75,9 +83,9 @@ pub struct VadStream { } impl VadStream { - pub fn open(source: &str, running: Arc, cfg: VadConfig) -> Result> { + pub fn open(source: &str, running: Arc, cfg: VadConfig, broadcast_tx: Option>) -> Result> { Ok(VadStream { - inner: LiveStream::open(source)?, + inner: LiveStream::open(source, broadcast_tx)?, pre_buffer: VecDeque::with_capacity(cfg.pre_buffer_frames + 1), running, cfg, diff --git a/listener/src/http.rs b/listener/src/http.rs new file mode 100644 index 0000000..80495d6 --- /dev/null +++ b/listener/src/http.rs @@ -0,0 +1,72 @@ +use axum::{Router, extract::State, response::Response, routing::get}; +use axum::body::Body; +use bytes::Bytes; +use std::convert::Infallible; +use tokio::sync::broadcast; +use tokio_stream::wrappers::{BroadcastStream, ReceiverStream}; +use tokio_stream::StreamExt; + +#[derive(Clone)] +struct AppState { + audio_tx: broadcast::Sender, +} + +fn wav_header() -> Vec { + let mut h = Vec::with_capacity(44); + h.extend_from_slice(b"RIFF"); + h.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes()); // unknown size + h.extend_from_slice(b"WAVE"); + h.extend_from_slice(b"fmt "); + h.extend_from_slice(&16u32.to_le_bytes()); + h.extend_from_slice(&3u16.to_le_bytes()); // IEEE_FLOAT + h.extend_from_slice(&1u16.to_le_bytes()); // mono + h.extend_from_slice(&16000u32.to_le_bytes()); // sample rate + h.extend_from_slice(&64000u32.to_le_bytes()); // byte rate = 16000 * 4 + h.extend_from_slice(&4u16.to_le_bytes()); // block align + h.extend_from_slice(&32u16.to_le_bytes()); // bits per sample + h.extend_from_slice(b"data"); + h.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes()); // unknown size + h +} + +async fn audio_stream(State(state): State) -> Response { + let rx = state.audio_tx.subscribe(); + let (body_tx, body_rx) = tokio::sync::mpsc::channel::>(64); + + tokio::spawn(async move { + if body_tx.send(Ok(Bytes::from(wav_header()))).await.is_err() { + return; + } + let mut stream = BroadcastStream::new(rx); + loop { + match stream.next().await { + Some(Ok(frame)) => { + if body_tx.send(Ok(frame)).await.is_err() { + break; + } + } + Some(Err(_)) => {} // lagged receiver, skip + None => break, + } + } + }); + + Response::builder() + .header("Content-Type", "audio/wav") + .header("Cache-Control", "no-cache") + .body(Body::from_stream(ReceiverStream::new(body_rx))) + .unwrap() +} + +pub async fn run(port: u16, audio_tx: broadcast::Sender) { + let state = AppState { audio_tx }; + let app = Router::new() + .route("/audio", get(audio_stream)) + .with_state(state); + + let addr = format!("0.0.0.0:{port}"); + let listener = tokio::net::TcpListener::bind(&addr).await + .unwrap_or_else(|e| panic!("failed to bind HTTP server to {addr}: {e}")); + eprintln!("HTTP audio stream: http://{addr}/audio"); + axum::serve(listener, app).await.unwrap(); +} diff --git a/listener/src/main.rs b/listener/src/main.rs index bc0e2f6..75dc519 100644 --- a/listener/src/main.rs +++ b/listener/src/main.rs @@ -6,6 +6,7 @@ use serde::Deserialize; use whisper_rs::{FullParams, SamplingStrategy, WhisperContext, WhisperContextParameters}; mod audio; +mod http; pub mod scannerbot { tonic::include_proto!("scannerbot"); @@ -32,6 +33,8 @@ struct Config { max_segment_secs: f32, #[serde(default = "default_temp_dir")] temp_dir: String, + #[serde(default = "default_http_port")] + http_port: u16, } fn default_chunk_secs() -> u32 { 30 } @@ -40,6 +43,7 @@ fn default_pre_buffer_secs() -> f32 { 1.0 } fn default_hangover_secs() -> f32 { 2.0 } fn default_max_segment_secs() -> f32 { 120.0 } fn default_temp_dir() -> String { "/tmp".into() } +fn default_http_port() -> u16 { 8080 } fn save_clip(samples: &[f32], path: &str) -> Result<(), Box> { let spec = hound::WavSpec { @@ -139,6 +143,17 @@ fn main() -> Result<(), Box> { eprintln!("Transcribing {} → {} (Ctrl+C to stop)", audio_arg, server_addr); + let (audio_tx, _) = tokio::sync::broadcast::channel::(128); + let http_audio_tx = audio_tx.clone(); + let http_port = cfg.http_port; + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("failed to build HTTP runtime"); + rt.block_on(http::run(http_port, http_audio_tx)); + }); + if let Some(source) = audio_arg.strip_prefix("pulse:") { eprintln!("Listening for speech (silence threshold: {:.3} RMS) …", cfg.silence_threshold); @@ -167,7 +182,7 @@ fn main() -> Result<(), Box> { }); // capture loop never pauses for transcription - let mut stream = audio::VadStream::open(source, running.clone(), vad_cfg)?; + let mut stream = audio::VadStream::open(source, running.clone(), vad_cfg, Some(audio_tx))?; loop { match stream.next_segment()? { Some(segment) => {