diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index dca454d..b8b8555 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -22,7 +22,7 @@ jobs: - name: Install ALSA and Jack dependencies run: | - sudo apt-get update && sudo apt-get install -y libasound2-dev libjack-jackd2-dev + sudo apt-get update && sudo apt-get install -y libasound2-dev libjack-jackd2-dev libdbus-1-dev - name: Check out repository uses: actions/checkout@v4 diff --git a/Cargo.toml b/Cargo.toml index bb75774..85ed3ee 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ version = "0.21.0" crate-type = ["cdylib"] [dependencies] +audio_thread_priority = "0.32.0" crossbeam-channel = "0.5.12" napi = { version="2.15", features=["napi9", "tokio_rt"] } napi-derive = { version="2.15" } diff --git a/examples/audio-worklet.mjs b/examples/audio-worklet.mjs index 552e490..324091b 100644 --- a/examples/audio-worklet.mjs +++ b/examples/audio-worklet.mjs @@ -42,13 +42,16 @@ const whiteNoise = new AudioWorkletNode(audioContext, 'white-noise'); whiteNoise.connect(audioContext.destination); if (TEST_ONLINE) { + var maxPeakLoad = 0.; audioContext.renderCapacity.addEventListener('update', e => { const { timestamp, averageLoad, peakLoad, underrunRatio } = e; console.log('AudioRenderCapacityEvent:', { timestamp, averageLoad, peakLoad, underrunRatio }); + maxPeakLoad = Math.max(maxPeakLoad, peakLoad); }); audioContext.renderCapacity.start({ updateInterval: 1. }); await sleep(8); + console.log('maxPeakLoad', maxPeakLoad); await audioContext.close(); } else { const buffer = await audioContext.startRendering(); diff --git a/src/audio_worklet_node.rs b/src/audio_worklet_node.rs index 8caa400..6b9c216 100644 --- a/src/audio_worklet_node.rs +++ b/src/audio_worklet_node.rs @@ -16,18 +16,22 @@ use std::cell::Cell; use std::collections::HashMap; use std::option::Option; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; -use std::sync::{Arc, Mutex, OnceLock, RwLock}; +use std::sync::{Arc, Condvar, Mutex, OnceLock, RwLock}; + +use std::time::Instant; /// Unique ID generator for AudioWorkletProcessors static INCREMENTING_ID: AtomicU32 = AtomicU32::new(0); /// Command issued from render thread to the Worker +#[derive(Debug)] enum WorkletCommand { Drop(u32), Process(ProcessorArguments), } /// Render thread to Worker processor arguments +#[derive(Debug)] struct ProcessorArguments { // processor unique ID id: u32, @@ -47,10 +51,30 @@ struct ProcessorArguments { /// Message channel from render thread to Worker struct ProcessCallChannel { - send: Sender, - recv: Receiver, + // queue of worklet commands + command_buffer: Mutex>, + // Condition Variable to wait/notify on new worklet commands + cond_var: Condvar, // mark that the worklet has been exited to prevent any further `process` call - exited: Arc, + exited: AtomicBool, +} + +impl ProcessCallChannel { + fn push(&self, command: WorkletCommand) { + let mut buffer = self.command_buffer.lock().unwrap(); + buffer.push(command); + self.cond_var.notify_one(); + } + + fn try_pop(&self) -> Option { + let mut buffer = self.command_buffer.lock().unwrap(); + + if buffer.is_empty() { + return None; + } + + Some(buffer.remove(0)) + } } /// Global map of ID -> ProcessCallChannel @@ -58,18 +82,20 @@ struct ProcessCallChannel { /// Every (Offline)AudioContext is assigned a new channel + ID. The ID is passed to the /// AudioWorklet Worker and to every AudioNode in the context so they can grab the channel and use /// message passing. -static GLOBAL_PROCESS_CALL_CHANNEL_MAP: RwLock> = RwLock::new(vec![]); +static GLOBAL_PROCESS_CALL_CHANNEL_MAP: RwLock>> = RwLock::new(vec![]); /// Request a new channel + ID for a newly created (Offline)AudioContext pub(crate) fn allocate_process_call_channel() -> usize { // Only one process message can be sent at same time from a given context, // but Drop messages could be send too, so let's take some room - let (send, recv) = crossbeam_channel::bounded(32); + let command_buffer = Mutex::new(Vec::with_capacity(32)); + let channel = ProcessCallChannel { - send, - recv, - exited: Arc::new(AtomicBool::new(false)), + command_buffer, + cond_var: Condvar::new(), + exited: AtomicBool::new(false), }; + let channel = Arc::new(channel); // We need a write-lock to initialize the channel let mut write_lock = GLOBAL_PROCESS_CALL_CHANNEL_MAP.write().unwrap(); @@ -80,27 +106,9 @@ pub(crate) fn allocate_process_call_channel() -> usize { } /// Obtain the WorkletCommand sender for this context ID -fn process_call_sender(id: usize) -> Sender { +fn process_call_channel(id: usize) -> Arc { // optimistically assume the channel exists and we can use a shared read-lock - GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id] - .send - .clone() -} - -/// Obtain the WorkletCommand receiver for this context ID -fn process_call_receiver(id: usize) -> Receiver { - // optimistically assume the channel exists and we can use a shared read-lock - GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id] - .recv - .clone() -} - -/// Obtain the WorkletCommand exited flag for this context ID -fn process_call_exited(id: usize) -> Arc { - // optimistically assume the channel exists and we can use a shared read-lock - GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id] - .exited - .clone() + Arc::clone(&GLOBAL_PROCESS_CALL_CHANNEL_MAP.read().unwrap()[id]) } /// Message channel inside the control thread to pass param descriptors of a given AudioWorkletNode @@ -377,13 +385,27 @@ fn process_audio_worklet(env: &Env, processors: &JsObject, args: ProcessorArgume Ok(()) } +static PREV_START: RwLock> = RwLock::new(None); + /// The entry point into Rust from the Worker #[js_function(2)] pub(crate) fn run_audio_worklet_global_scope(ctx: CallContext) -> Result { + let enter_start = Instant::now(); + let mut lock = PREV_START.write().unwrap(); + if let Some(prev) = *lock { + let micros = enter_start.duration_since(prev).as_micros(); + if micros > 200 { + println!("return to Rust after {} micros", micros); + } + } + // Set thread priority to highest, if not done already if !HAS_THREAD_PRIO.replace(true) { - // allowed to fail - let _ = thread_priority::set_current_thread_priority(thread_priority::ThreadPriority::Max); + let result = audio_thread_priority::promote_current_thread_to_real_time( + 128, 44100, // TODO get sample rate + ); + dbg!(&result); + result.ok(); // allowed to fail } // Obtain the unique worker ID @@ -394,8 +416,15 @@ pub(crate) fn run_audio_worklet_global_scope(ctx: CallContext) -> Result 3000 { + println!("got command after {} micros", micros); + } + + match cmd { WorkletCommand::Drop(id) => { let mut processors = ctx.get::(1)?; processors.delete_named_property(&id.to_string()).unwrap(); @@ -404,8 +433,17 @@ pub(crate) fn run_audio_worklet_global_scope(ctx: CallContext) -> Result 200 { + println!("handled command after {} micros", micros); + } + + prev = now; } + *lock = Some(Instant::now()); ctx.env.get_undefined() } @@ -414,9 +452,11 @@ pub(crate) fn exit_audio_worklet_global_scope(ctx: CallContext) -> Result(0)?.get_uint32()? as usize; // Flag message channel as exited to prevent any other render call - process_call_exited(worklet_id).store(true, Ordering::SeqCst); + process_call_channel(worklet_id) + .exited + .store(true, Ordering::SeqCst); // Handle any pending message from audio thread - if let Ok(WorkletCommand::Process(args)) = process_call_receiver(worklet_id).try_recv() { + if let Some(WorkletCommand::Process(args)) = process_call_channel(worklet_id).try_pop() { let _ = args.tail_time_sender.send(false); } @@ -614,8 +654,7 @@ fn constructor(ctx: CallContext) -> Result { let id = INCREMENTING_ID.fetch_add(1, Ordering::Relaxed); let processor_options = NapiAudioWorkletProcessor { id, - send: process_call_sender(worklet_id), - exited: process_call_exited(worklet_id), + command_channel: process_call_channel(worklet_id), tail_time_channel: crossbeam_channel::bounded(1), param_values: Vec::with_capacity(32), }; @@ -706,10 +745,8 @@ audio_node_impl!(NapiAudioWorkletNode); struct NapiAudioWorkletProcessor { /// Unique id to pair Napi Worklet and JS processor id: u32, - /// Sender to the JS Worklet - send: Sender, - /// Flag that marks the JS worklet as exited - exited: Arc, + /// Command channel to the JS Worklet + command_channel: Arc, /// tail_time result channel tail_time_channel: (Sender, Receiver), /// Reusable Vec for AudioParam values @@ -739,7 +776,7 @@ impl AudioWorkletProcessor for NapiAudioWorkletProcessor { scope: &'b AudioWorkletGlobalScope, ) -> bool { // Early return if audio thread is still closing while worklet has been exited - if self.exited.load(Ordering::SeqCst) { + if self.command_channel.exited.load(Ordering::SeqCst) { return false; } @@ -773,7 +810,7 @@ impl AudioWorkletProcessor for NapiAudioWorkletProcessor { }; // send command to Worker - self.send.send(WorkletCommand::Process(item)).unwrap(); + self.command_channel.push(WorkletCommand::Process(item)); // await result self.tail_time_channel.1.recv().unwrap() } @@ -781,8 +818,8 @@ impl AudioWorkletProcessor for NapiAudioWorkletProcessor { impl Drop for NapiAudioWorkletProcessor { fn drop(&mut self) { - if !self.exited.load(Ordering::SeqCst) { - self.send.send(WorkletCommand::Drop(self.id)).unwrap(); + if !self.command_channel.exited.load(Ordering::SeqCst) { + self.command_channel.push(WorkletCommand::Drop(self.id)); } } }