From ec9cafd70176f3f54598944cdc89299549b2eaf0 Mon Sep 17 00:00:00 2001 From: Henrik Enquist Date: Sat, 18 Jan 2025 21:40:44 +0100 Subject: [PATCH 1/6] WIP use crossbeam_channel everywhere --- src/alsadevice.rs | 38 +++++++++++++++++++++++--------------- src/audiodevice.rs | 7 +++---- src/bin.rs | 11 +++++------ src/config.rs | 2 +- src/cpaldevice.rs | 17 ++++++++--------- src/filedevice.rs | 17 ++++++++--------- src/generatordevice.rs | 13 ++++++------- src/processing.rs | 7 +++---- src/socketserver.rs | 3 +-- 9 files changed, 58 insertions(+), 57 deletions(-) diff --git a/src/alsadevice.rs b/src/alsadevice.rs index bf30dd1..b85784c 100644 --- a/src/alsadevice.rs +++ b/src/alsadevice.rs @@ -10,19 +10,19 @@ use alsa::pcm::{Access, Format, Frames, HwParams}; use alsa::poll::Descriptors; use alsa::{Direction, ValueOr, PCM}; use alsa_sys; +use audio_thread_priority::{ + demote_current_thread_from_real_time, promote_current_thread_to_real_time, +}; +use crossbeam_channel; use nix::errno::Errno; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use rubato::VecResampler; use std::ffi::CString; use std::fmt::Debug; -use std::sync::{mpsc, Arc, Barrier}; +use std::sync::{Arc, Barrier}; use std::thread; use std::time::Instant; -use audio_thread_priority::{ - demote_current_thread_from_real_time, promote_current_thread_to_real_time, -}; - use crate::alsadevice_buffermanager::{ CaptureBufferManager, DeviceBufferManager, PlaybackBufferManager, }; @@ -73,13 +73,13 @@ pub struct AlsaCaptureDevice { } struct CaptureChannels { - audio: mpsc::SyncSender, + audio: crossbeam_channel::Sender, status: crossbeam_channel::Sender, - command: mpsc::Receiver, + command: crossbeam_channel::Receiver, } struct PlaybackChannels { - audio: mpsc::Receiver, + audio: crossbeam_channel::Receiver, status: crossbeam_channel::Sender, } @@ -527,6 +527,12 @@ fn playback_loop_bytes( } else { None }; + let waiting_chunks_in_channel = channels.audio.len(); + trace!( + "Avail frames in buffer: {:?}, waiting chunks in channel: {}", + avail_at_chunk_recvd, + waiting_chunks_in_channel + ); //trace!("PB: Avail at chunk rcvd: {:?}", avail_at_chunk_recvd); conversion_result = @@ -604,7 +610,9 @@ fn playback_loop_bytes( } if let Some(avail) = avail_at_chunk_recvd { let delay = buf_manager.current_delay(avail); - buffer_avg.add_value(delay as f64); + buffer_avg.add_value( + delay as f64 + (params.chunksize * waiting_chunks_in_channel) as f64, + ); } if timer.larger_than_millis((1000.0 * params.adjust_period) as u64) { if let Some(avg_delay) = buffer_avg.average() { @@ -693,7 +701,7 @@ fn playback_loop_bytes( } } -fn drain_check_eos(audio: &mpsc::Receiver) -> Option { +fn drain_check_eos(audio: &crossbeam_channel::Receiver) -> Option { let mut eos: Option = None; while let Some(msg) = audio.try_iter().next() { if let AudioMessage::EndOfStream = msg { @@ -873,8 +881,8 @@ fn capture_loop_bytes( } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } @@ -1101,7 +1109,7 @@ fn nbr_capture_bytes_and_frames( impl PlaybackDevice for AlsaPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -1175,10 +1183,10 @@ impl PlaybackDevice for AlsaPlaybackDevice { impl CaptureDevice for AlsaCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, processing_params: Arc, ) -> Res>> { diff --git a/src/audiodevice.rs b/src/audiodevice.rs index da104ca..0e4e075 100644 --- a/src/audiodevice.rs +++ b/src/audiodevice.rs @@ -28,7 +28,6 @@ use rubato::{ }; use std::error; use std::fmt; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::Instant; @@ -222,7 +221,7 @@ pub fn rms_and_peak(data: &[PrcFmt]) -> (PrcFmt, PrcFmt) { pub trait PlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -233,10 +232,10 @@ pub trait PlaybackDevice { pub trait CaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, processing_params: Arc, ) -> Res>>; diff --git a/src/bin.rs b/src/bin.rs index 976d9dc..f38a853 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -29,7 +29,6 @@ use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use std::env; use std::path::PathBuf; use std::sync::atomic::AtomicBool; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::Duration; @@ -124,15 +123,15 @@ fn run( return Ok(ExitState::Exit); } }; - let (tx_pb, rx_pb) = mpsc::sync_channel(active_config.devices.queuelimit()); - let (tx_cap, rx_cap) = mpsc::sync_channel(active_config.devices.queuelimit()); + let (tx_pb, rx_pb) = crossbeam_channel::bounded(active_config.devices.queuelimit()); + let (tx_cap, rx_cap) = crossbeam_channel::bounded(active_config.devices.queuelimit()); let (tx_status, rx_status) = crossbeam_channel::unbounded(); let tx_status_pb = tx_status.clone(); let tx_status_cap = tx_status; - let (tx_command_cap, rx_command_cap) = mpsc::channel(); - let (tx_pipeconf, rx_pipeconf) = mpsc::channel(); + let (tx_command_cap, rx_command_cap) = crossbeam_channel::unbounded(); + let (tx_pipeconf, rx_pipeconf) = crossbeam_channel::unbounded(); let barrier = Arc::new(Barrier::new(4)); let barrier_pb = barrier.clone(); @@ -1060,7 +1059,7 @@ fn main_process() -> i32 { #[cfg(feature = "websocket")] { - let (tx_state, rx_state) = mpsc::sync_channel(1); + let (tx_state, rx_state) = crossbeam_channel::bounded(1); let processing_params_clone = processing_params.clone(); let active_config_path_clone = active_config_path.clone(); diff --git a/src/config.rs b/src/config.rs index ad33403..17d999c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1825,7 +1825,7 @@ pub fn validate_config(conf: &mut Configuration, filename: Option<&str>) -> Res< } #[cfg(target_os = "linux")] let target_level_limit = if matches!(conf.devices.playback, PlaybackDevice::Alsa { .. }) { - 4 * conf.devices.chunksize + (4 + conf.devices.queuelimit()) * conf.devices.chunksize } else { 2 * conf.devices.chunksize }; diff --git a/src/cpaldevice.rs b/src/cpaldevice.rs index b86e933..c5b6971 100644 --- a/src/cpaldevice.rs +++ b/src/cpaldevice.rs @@ -14,7 +14,6 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use rubato::VecResampler; use std::collections::VecDeque; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time; @@ -201,7 +200,7 @@ where impl PlaybackDevice for CpalPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -235,7 +234,7 @@ impl PlaybackDevice for CpalPlaybackDevice { } let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1); - let (tx_dev, rx_dev) = mpsc::sync_channel(1); + let (tx_dev, rx_dev) = crossbeam_channel::bounded(1); let buffer_fill = Arc::new(AtomicUsize::new(0)); let buffer_fill_clone = buffer_fill.clone(); let mut buffer_avg = countertimer::Averager::new(); @@ -468,10 +467,10 @@ where impl CaptureDevice for CpalCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -506,8 +505,8 @@ impl CaptureDevice for CpalCaptureDevice { Err(_err) => {} } let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1); - let (tx_dev_i, rx_dev_i) = mpsc::sync_channel(1); - let (tx_dev_f, rx_dev_f) = mpsc::sync_channel(1); + let (tx_dev_i, rx_dev_i) = crossbeam_channel::bounded(1); + let (tx_dev_f, rx_dev_f) = crossbeam_channel::bounded(1); let stream = match sample_format { SampleFormat::S16LE => { trace!("Build i16 input stream"); @@ -593,8 +592,8 @@ impl CaptureDevice for CpalCaptureDevice { } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } diff --git a/src/filedevice.rs b/src/filedevice.rs index fffa0eb..b58bd7b 100644 --- a/src/filedevice.rs +++ b/src/filedevice.rs @@ -11,7 +11,6 @@ use std::fs::OpenOptions; use std::io::{stdin, stdout, Write}; #[cfg(target_os = "linux")] use std::os::unix::fs::OpenOptionsExt; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::Duration; @@ -74,9 +73,9 @@ pub struct FileCaptureDevice { } struct CaptureChannels { - audio: mpsc::SyncSender, + audio: crossbeam_channel::Sender, status: crossbeam_channel::Sender, - command: mpsc::Receiver, + command: crossbeam_channel::Receiver, } struct CaptureParams { @@ -111,7 +110,7 @@ pub trait Reader { impl PlaybackDevice for FilePlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -321,8 +320,8 @@ fn capture_loop( } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } @@ -541,10 +540,10 @@ fn capture_loop( impl CaptureDevice for FileCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -685,7 +684,7 @@ fn send_silence( samples: usize, channels: usize, chunksize: usize, - audio_channel: &mpsc::SyncSender, + audio_channel: &crossbeam_channel::Sender, resampler: &mut Option>>, ) { let mut samples_left = samples; diff --git a/src/generatordevice.rs b/src/generatordevice.rs index a2b3cd9..542a2b4 100644 --- a/src/generatordevice.rs +++ b/src/generatordevice.rs @@ -2,7 +2,6 @@ use crate::audiodevice::*; use crate::config; use std::f64::consts::PI; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; @@ -101,9 +100,9 @@ pub struct GeneratorDevice { } struct CaptureChannels { - audio: mpsc::SyncSender, + audio: crossbeam_channel::Sender, status: crossbeam_channel::Sender, - command: mpsc::Receiver, + command: crossbeam_channel::Receiver, } struct GeneratorParams { @@ -158,8 +157,8 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) { Ok(CommandMessage::SetSpeed { .. }) => { warn!("Signal generator does not support rate adjust. Ignoring request."); } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } @@ -199,10 +198,10 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) { impl CaptureDevice for GeneratorDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { diff --git a/src/processing.rs b/src/processing.rs index 9d5a1e9..99e56fb 100644 --- a/src/processing.rs +++ b/src/processing.rs @@ -5,16 +5,15 @@ use crate::ProcessingParameters; use audio_thread_priority::{ demote_current_thread_from_real_time, promote_current_thread_to_real_time, }; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; pub fn run_processing( conf_proc: config::Configuration, barrier_proc: Arc, - tx_pb: mpsc::SyncSender, - rx_cap: mpsc::Receiver, - rx_pipeconf: mpsc::Receiver<(config::ConfigChange, config::Configuration)>, + tx_pb: crossbeam_channel::Sender, + rx_cap: crossbeam_channel::Receiver, + rx_pipeconf: crossbeam_channel::Receiver<(config::ConfigChange, config::Configuration)>, processing_params: Arc, ) -> thread::JoinHandle<()> { thread::spawn(move || { diff --git a/src/socketserver.rs b/src/socketserver.rs index 0db3419..2c47cb5 100644 --- a/src/socketserver.rs +++ b/src/socketserver.rs @@ -10,7 +10,6 @@ use std::fs::File; use std::io::Read; use std::net::{TcpListener, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; @@ -37,7 +36,7 @@ pub struct SharedData { pub playback_status: Arc>, pub processing_params: Arc, pub processing_status: Arc>, - pub state_change_notify: mpsc::SyncSender<()>, + pub state_change_notify: crossbeam_channel::Sender<()>, pub state_file_path: Option, pub unsaved_state_change: Arc, } From 57d53afc429cd09249b0c3b8f941c71710c20fcc Mon Sep 17 00:00:00 2001 From: Henrik Enquist Date: Sat, 18 Jan 2025 22:15:11 +0100 Subject: [PATCH 2/6] Replace mpsc channel in Pulse device --- src/pulsedevice.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/pulsedevice.rs b/src/pulsedevice.rs index ab06f80..6f0dc5c 100644 --- a/src/pulsedevice.rs +++ b/src/pulsedevice.rs @@ -10,7 +10,6 @@ use crate::conversions::{buffer_to_chunk_rawbytes, chunk_to_buffer_rawbytes}; use crate::countertimer; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use rubato::VecResampler; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::{Duration, Instant}; @@ -132,7 +131,7 @@ fn open_pulse( impl PlaybackDevice for PulsePlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -257,10 +256,10 @@ fn nbr_capture_bytes( impl CaptureDevice for PulseCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -341,8 +340,8 @@ impl CaptureDevice for PulseCaptureDevice { } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } From b4a7e4d68fd9a9c480d1cd32556980d7bf123bd9 Mon Sep 17 00:00:00 2001 From: Henrik Enquist Date: Sat, 18 Jan 2025 22:26:13 +0100 Subject: [PATCH 3/6] Add waiting frames to cpal backend buffer calc --- src/cpaldevice.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cpaldevice.rs b/src/cpaldevice.rs index c5b6971..bbb06a3 100644 --- a/src/cpaldevice.rs +++ b/src/cpaldevice.rs @@ -381,7 +381,7 @@ impl PlaybackDevice for CpalPlaybackDevice { playback_status.signal_peak.add_record(chunk_stats.peak_linear()); } buffer_avg.add_value( - (buffer_fill.load(Ordering::Relaxed) / channels_clone) + (buffer_fill.load(Ordering::Relaxed) / channels_clone + channel.len() * chunksize_clone) as f64, ); if adjust From 0ac962fd97a22c613106ed73b7ba430d9a95eb20 Mon Sep 17 00:00:00 2001 From: HEnquist Date: Mon, 20 Jan 2025 21:40:15 +0100 Subject: [PATCH 4/6] Crossbeam in wasapi backend --- src/wasapidevice.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/wasapidevice.rs b/src/wasapidevice.rs index e27387a..e89ef75 100644 --- a/src/wasapidevice.rs +++ b/src/wasapidevice.rs @@ -10,7 +10,6 @@ use rubato::VecResampler; use std::collections::VecDeque; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc; use std::sync::{Arc, Barrier, Mutex}; use std::thread; use std::time::Duration; @@ -619,7 +618,7 @@ fn capture_loop( impl PlaybackDevice for WasapiPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -773,7 +772,8 @@ impl PlaybackDevice for WasapiPlaybackDevice { 0u8; channels * chunk.frames * sample_format.bytes_per_sample() ]; - buffer_avg.add_value(buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default()); + let buffer_fill = buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default(); + buffer_avg.add_value(buffer_fill + (channel.len() * chunksize) as f64); if adjust && timer.larger_than_millis((1000.0 * adjust_period) as u64) { if let Some(av_delay) = buffer_avg.average() { @@ -930,10 +930,10 @@ fn nbr_capture_frames( impl CaptureDevice for WasapiCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -1083,8 +1083,8 @@ impl CaptureDevice for WasapiCaptureDevice { } } }, - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } From 2c0738ace4e0514e2f652c0730154399b7b8248a Mon Sep 17 00:00:00 2001 From: HEnquist Date: Mon, 20 Jan 2025 22:33:32 +0100 Subject: [PATCH 5/6] Allow larger target level --- src/config.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/config.rs b/src/config.rs index 17d999c..b83acad 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1827,10 +1827,10 @@ pub fn validate_config(conf: &mut Configuration, filename: Option<&str>) -> Res< let target_level_limit = if matches!(conf.devices.playback, PlaybackDevice::Alsa { .. }) { (4 + conf.devices.queuelimit()) * conf.devices.chunksize } else { - 2 * conf.devices.chunksize + (2 + conf.devices.queuelimit()) * conf.devices.chunksize }; #[cfg(not(target_os = "linux"))] - let target_level_limit = 2 * conf.devices.chunksize; + let target_level_limit = (2 + conf.devices.queuelimit()) * conf.devices.chunksize; if conf.devices.target_level() > target_level_limit { let msg = format!("target_level cannot be larger than {}", target_level_limit); From 6f61455bd39361a3284c4d761088c0ab91036bef Mon Sep 17 00:00:00 2001 From: Henrik Date: Wed, 22 Jan 2025 07:52:15 +0100 Subject: [PATCH 6/6] Crossbeam in coreaudio --- src/coreaudiodevice.rs | 13 +++++++------ src/wasapidevice.rs | 4 ++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/coreaudiodevice.rs b/src/coreaudiodevice.rs index d04daf0..5839ba6 100644 --- a/src/coreaudiodevice.rs +++ b/src/coreaudiodevice.rs @@ -370,7 +370,7 @@ enum PlaybackDeviceMessage { impl PlaybackDevice for CoreaudioPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -533,7 +533,8 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { } match channel.recv() { Ok(AudioMessage::Audio(chunk)) => { - buffer_avg.add_value(buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default()); + let estimated_buffer_fill = buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default(); + buffer_avg.add_value(estimated_buffer_fill + (channel.len() * chunksize) as f64); if adjust && timer.larger_than_millis((1000.0 * adjust_period) as u64) { if let Some(av_delay) = buffer_avg.average() { let speed = rate_controller.next(av_delay); @@ -650,10 +651,10 @@ fn nbr_capture_frames( impl CaptureDevice for CoreaudioCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -856,8 +857,8 @@ impl CaptureDevice for CoreaudioCaptureDevice { } } }, - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } diff --git a/src/wasapidevice.rs b/src/wasapidevice.rs index e89ef75..a3849b2 100644 --- a/src/wasapidevice.rs +++ b/src/wasapidevice.rs @@ -772,8 +772,8 @@ impl PlaybackDevice for WasapiPlaybackDevice { 0u8; channels * chunk.frames * sample_format.bytes_per_sample() ]; - let buffer_fill = buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default(); - buffer_avg.add_value(buffer_fill + (channel.len() * chunksize) as f64); + let estimated_buffer_fill = buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default(); + buffer_avg.add_value(estimated_buffer_fill + (channel.len() * chunksize) as f64); if adjust && timer.larger_than_millis((1000.0 * adjust_period) as u64) { if let Some(av_delay) = buffer_avg.average() {