listener: add live HTTP audio stream feature

This commit is contained in:
2026-05-25 22:20:22 +00:00
parent 97e96739ac
commit 1b84fd3128
6 changed files with 161 additions and 8 deletions
+52
View File
@@ -57,10 +57,13 @@ checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90"
dependencies = [
"axum-core",
"bytes",
"form_urlencoded",
"futures-util",
"http",
"http-body",
"http-body-util",
"hyper",
"hyper-util",
"itoa",
"matchit",
"memchr",
@@ -68,10 +71,15 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"serde_core",
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -90,6 +98,7 @@ dependencies = [
"sync_wrapper",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
@@ -290,6 +299,15 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
[[package]]
name = "form_urlencoded"
version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf"
dependencies = [
"percent-encoding",
]
[[package]]
name = "fs_extra"
version = "1.3.0"
@@ -941,16 +959,25 @@ version = "1.0.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b39cdef0fa800fc44525c84ccb54a029961a8215f9619753635a9c0d2538d46d"
[[package]]
name = "ryu"
version = "1.0.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9774ba4a74de5f7b1c1451ed6cd5285a32eddb5cccb8cc655a4e50009e06477f"
[[package]]
name = "scannerbot-listener"
version = "0.1.0"
dependencies = [
"axum",
"bytes",
"chrono",
"ctrlc",
"hound",
"prost",
"serde",
"tokio",
"tokio-stream",
"toml",
"tonic",
"tonic-prost",
@@ -1013,6 +1040,17 @@ dependencies = [
"zmij",
]
[[package]]
name = "serde_path_to_error"
version = "0.1.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457"
dependencies = [
"itoa",
"serde",
"serde_core",
]
[[package]]
name = "serde_spanned"
version = "1.1.1"
@@ -1022,6 +1060,18 @@ dependencies = [
"serde_core",
]
[[package]]
name = "serde_urlencoded"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd"
dependencies = [
"form_urlencoded",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "shlex"
version = "1.3.0"
@@ -1127,6 +1177,7 @@ dependencies = [
"futures-core",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
@@ -1286,6 +1337,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
+3
View File
@@ -4,12 +4,15 @@ version = "0.1.0"
edition = "2024"
[dependencies]
axum = "0.8"
bytes = "1"
chrono = "0.4.44"
ctrlc = "3.5.2"
hound = "3.5.1"
prost = "0.14.3"
serde = { version = "1.0.228", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["sync"] }
toml = "1.1.2"
tonic = "0.14.6"
tonic-prost = "0.14.6"
+3
View File
@@ -24,3 +24,6 @@ silence_threshold = 0.02 # RMS energy cutoff; raise for noisy environments
pre_buffer_secs = 1.0 # seconds of audio kept before speech onset
hangover_secs = 2.0 # trailing silence before a segment is closed
max_segment_secs = 120.0 # hard cap per segment
# HTTP audio stream port; GET /audio streams a live WAV
# http_port = 8080
+14 -6
View File
@@ -3,6 +3,8 @@ use std::io::Read;
use std::process::{Child, ChildStdout, Command, Stdio};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use bytes::Bytes;
use tokio::sync::broadcast;
const FRAME_SAMPLES: usize = 1600; // 100 ms at 16 kHz
@@ -30,10 +32,11 @@ pub fn decode(input: &str) -> Result<Vec<f32>, Box<dyn std::error::Error>> {
pub struct LiveStream {
child: Child,
stdout: ChildStdout,
broadcast_tx: Option<broadcast::Sender<Bytes>>,
}
impl LiveStream {
pub fn open(source: &str) -> Result<Self, Box<dyn std::error::Error>> {
pub fn open(source: &str, broadcast_tx: Option<broadcast::Sender<Bytes>>) -> Result<Self, Box<dyn std::error::Error>> {
let mut child = Command::new("ffmpeg")
.args(["-f", "pulse", "-i", source, "-ar", "16000", "-ac", "1", "-f", "f32le", "pipe:1"])
.env("PULSE_PROP_application.name", "scannerbot-listener")
@@ -42,15 +45,20 @@ impl LiveStream {
.spawn()?;
let stdout = child.stdout.take().unwrap();
Ok(LiveStream { child, stdout })
Ok(LiveStream { child, stdout, broadcast_tx })
}
fn next_frame(&mut self) -> Result<Option<Vec<f32>>, Box<dyn std::error::Error>> {
let mut buf = vec![0u8; FRAME_SAMPLES * 4];
match self.stdout.read_exact(&mut buf) {
Ok(()) => Ok(Some(buf.chunks_exact(4)
Ok(()) => {
if let Some(tx) = &self.broadcast_tx {
let _ = tx.send(Bytes::copy_from_slice(&buf));
}
Ok(Some(buf.chunks_exact(4)
.map(|b| f32::from_le_bytes(b.try_into().unwrap()))
.collect())),
.collect()))
}
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(None),
Err(e) => Err(e.into()),
}
@@ -75,9 +83,9 @@ pub struct VadStream {
}
impl VadStream {
pub fn open(source: &str, running: Arc<AtomicBool>, cfg: VadConfig) -> Result<Self, Box<dyn std::error::Error>> {
pub fn open(source: &str, running: Arc<AtomicBool>, cfg: VadConfig, broadcast_tx: Option<broadcast::Sender<Bytes>>) -> Result<Self, Box<dyn std::error::Error>> {
Ok(VadStream {
inner: LiveStream::open(source)?,
inner: LiveStream::open(source, broadcast_tx)?,
pre_buffer: VecDeque::with_capacity(cfg.pre_buffer_frames + 1),
running,
cfg,
+72
View File
@@ -0,0 +1,72 @@
use axum::{Router, extract::State, response::Response, routing::get};
use axum::body::Body;
use bytes::Bytes;
use std::convert::Infallible;
use tokio::sync::broadcast;
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream};
use tokio_stream::StreamExt;
#[derive(Clone)]
struct AppState {
audio_tx: broadcast::Sender<Bytes>,
}
fn wav_header() -> Vec<u8> {
let mut h = Vec::with_capacity(44);
h.extend_from_slice(b"RIFF");
h.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes()); // unknown size
h.extend_from_slice(b"WAVE");
h.extend_from_slice(b"fmt ");
h.extend_from_slice(&16u32.to_le_bytes());
h.extend_from_slice(&3u16.to_le_bytes()); // IEEE_FLOAT
h.extend_from_slice(&1u16.to_le_bytes()); // mono
h.extend_from_slice(&16000u32.to_le_bytes()); // sample rate
h.extend_from_slice(&64000u32.to_le_bytes()); // byte rate = 16000 * 4
h.extend_from_slice(&4u16.to_le_bytes()); // block align
h.extend_from_slice(&32u16.to_le_bytes()); // bits per sample
h.extend_from_slice(b"data");
h.extend_from_slice(&0xFFFF_FFFFu32.to_le_bytes()); // unknown size
h
}
async fn audio_stream(State(state): State<AppState>) -> Response {
let rx = state.audio_tx.subscribe();
let (body_tx, body_rx) = tokio::sync::mpsc::channel::<Result<Bytes, Infallible>>(64);
tokio::spawn(async move {
if body_tx.send(Ok(Bytes::from(wav_header()))).await.is_err() {
return;
}
let mut stream = BroadcastStream::new(rx);
loop {
match stream.next().await {
Some(Ok(frame)) => {
if body_tx.send(Ok(frame)).await.is_err() {
break;
}
}
Some(Err(_)) => {} // lagged receiver, skip
None => break,
}
}
});
Response::builder()
.header("Content-Type", "audio/wav")
.header("Cache-Control", "no-cache")
.body(Body::from_stream(ReceiverStream::new(body_rx)))
.unwrap()
}
pub async fn run(port: u16, audio_tx: broadcast::Sender<Bytes>) {
let state = AppState { audio_tx };
let app = Router::new()
.route("/audio", get(audio_stream))
.with_state(state);
let addr = format!("0.0.0.0:{port}");
let listener = tokio::net::TcpListener::bind(&addr).await
.unwrap_or_else(|e| panic!("failed to bind HTTP server to {addr}: {e}"));
eprintln!("HTTP audio stream: http://{addr}/audio");
axum::serve(listener, app).await.unwrap();
}
+16 -1
View File
@@ -6,6 +6,7 @@ use serde::Deserialize;
use whisper_rs::{FullParams, SamplingStrategy, WhisperContext, WhisperContextParameters};
mod audio;
mod http;
pub mod scannerbot {
tonic::include_proto!("scannerbot");
@@ -32,6 +33,8 @@ struct Config {
max_segment_secs: f32,
#[serde(default = "default_temp_dir")]
temp_dir: String,
#[serde(default = "default_http_port")]
http_port: u16,
}
fn default_chunk_secs() -> u32 { 30 }
@@ -40,6 +43,7 @@ fn default_pre_buffer_secs() -> f32 { 1.0 }
fn default_hangover_secs() -> f32 { 2.0 }
fn default_max_segment_secs() -> f32 { 120.0 }
fn default_temp_dir() -> String { "/tmp".into() }
fn default_http_port() -> u16 { 8080 }
fn save_clip(samples: &[f32], path: &str) -> Result<(), Box<dyn std::error::Error>> {
let spec = hound::WavSpec {
@@ -139,6 +143,17 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
eprintln!("Transcribing {}{} (Ctrl+C to stop)", audio_arg, server_addr);
let (audio_tx, _) = tokio::sync::broadcast::channel::<bytes::Bytes>(128);
let http_audio_tx = audio_tx.clone();
let http_port = cfg.http_port;
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("failed to build HTTP runtime");
rt.block_on(http::run(http_port, http_audio_tx));
});
if let Some(source) = audio_arg.strip_prefix("pulse:") {
eprintln!("Listening for speech (silence threshold: {:.3} RMS) …", cfg.silence_threshold);
@@ -167,7 +182,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
});
// capture loop never pauses for transcription
let mut stream = audio::VadStream::open(source, running.clone(), vad_cfg)?;
let mut stream = audio::VadStream::open(source, running.clone(), vad_cfg, Some(audio_tx))?;
loop {
match stream.next_segment()? {
Some(segment) => {