Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP use crossbeam_channel everywhere #391

Draft
wants to merge 5 commits into
base: next31
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 23 additions & 15 deletions src/alsadevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ use alsa::pcm::{Access, Format, Frames, HwParams};
use alsa::poll::Descriptors;
use alsa::{Direction, ValueOr, PCM};
use alsa_sys;
use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time,
};
use crossbeam_channel;
use nix::errno::Errno;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use rubato::VecResampler;
use std::ffi::CString;
use std::fmt::Debug;
use std::sync::{mpsc, Arc, Barrier};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;

use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time,
};

use crate::alsadevice_buffermanager::{
CaptureBufferManager, DeviceBufferManager, PlaybackBufferManager,
};
Expand Down Expand Up @@ -73,13 +73,13 @@ pub struct AlsaCaptureDevice {
}

struct CaptureChannels {
audio: mpsc::SyncSender<AudioMessage>,
audio: crossbeam_channel::Sender<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
command: mpsc::Receiver<CommandMessage>,
command: crossbeam_channel::Receiver<CommandMessage>,
}

struct PlaybackChannels {
audio: mpsc::Receiver<AudioMessage>,
audio: crossbeam_channel::Receiver<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
}

Expand Down Expand Up @@ -527,6 +527,12 @@ fn playback_loop_bytes(
} else {
None
};
let waiting_chunks_in_channel = channels.audio.len();
trace!(
"Avail frames in buffer: {:?}, waiting chunks in channel: {}",
avail_at_chunk_recvd,
waiting_chunks_in_channel
);
//trace!("PB: Avail at chunk rcvd: {:?}", avail_at_chunk_recvd);

conversion_result =
Expand Down Expand Up @@ -604,7 +610,9 @@ fn playback_loop_bytes(
}
if let Some(avail) = avail_at_chunk_recvd {
let delay = buf_manager.current_delay(avail);
buffer_avg.add_value(delay as f64);
buffer_avg.add_value(
delay as f64 + (params.chunksize * waiting_chunks_in_channel) as f64,
);
}
if timer.larger_than_millis((1000.0 * params.adjust_period) as u64) {
if let Some(avg_delay) = buffer_avg.average() {
Expand Down Expand Up @@ -693,7 +701,7 @@ fn playback_loop_bytes(
}
}

fn drain_check_eos(audio: &mpsc::Receiver<AudioMessage>) -> Option<AudioMessage> {
fn drain_check_eos(audio: &crossbeam_channel::Receiver<AudioMessage>) -> Option<AudioMessage> {
let mut eos: Option<AudioMessage> = None;
while let Some(msg) = audio.try_iter().next() {
if let AudioMessage::EndOfStream = msg {
Expand Down Expand Up @@ -873,8 +881,8 @@ fn capture_loop_bytes(
}
}
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down Expand Up @@ -1101,7 +1109,7 @@ fn nbr_capture_bytes_and_frames(
impl PlaybackDevice for AlsaPlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand Down Expand Up @@ -1175,10 +1183,10 @@ impl PlaybackDevice for AlsaPlaybackDevice {
impl CaptureDevice for AlsaCaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down
7 changes: 3 additions & 4 deletions src/audiodevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use rubato::{
};
use std::error;
use std::fmt;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
Expand Down Expand Up @@ -222,7 +221,7 @@ pub fn rms_and_peak(data: &[PrcFmt]) -> (PrcFmt, PrcFmt) {
pub trait PlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand All @@ -233,10 +232,10 @@ pub trait PlaybackDevice {
pub trait CaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>>;
Expand Down
11 changes: 5 additions & 6 deletions src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use std::env;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -124,15 +123,15 @@ fn run(
return Ok(ExitState::Exit);
}
};
let (tx_pb, rx_pb) = mpsc::sync_channel(active_config.devices.queuelimit());
let (tx_cap, rx_cap) = mpsc::sync_channel(active_config.devices.queuelimit());
let (tx_pb, rx_pb) = crossbeam_channel::bounded(active_config.devices.queuelimit());
let (tx_cap, rx_cap) = crossbeam_channel::bounded(active_config.devices.queuelimit());

let (tx_status, rx_status) = crossbeam_channel::unbounded();
let tx_status_pb = tx_status.clone();
let tx_status_cap = tx_status;

let (tx_command_cap, rx_command_cap) = mpsc::channel();
let (tx_pipeconf, rx_pipeconf) = mpsc::channel();
let (tx_command_cap, rx_command_cap) = crossbeam_channel::unbounded();
let (tx_pipeconf, rx_pipeconf) = crossbeam_channel::unbounded();

let barrier = Arc::new(Barrier::new(4));
let barrier_pb = barrier.clone();
Expand Down Expand Up @@ -1060,7 +1059,7 @@ fn main_process() -> i32 {

#[cfg(feature = "websocket")]
{
let (tx_state, rx_state) = mpsc::sync_channel(1);
let (tx_state, rx_state) = crossbeam_channel::bounded(1);

let processing_params_clone = processing_params.clone();
let active_config_path_clone = active_config_path.clone();
Expand Down
6 changes: 3 additions & 3 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1825,12 +1825,12 @@ pub fn validate_config(conf: &mut Configuration, filename: Option<&str>) -> Res<
}
#[cfg(target_os = "linux")]
let target_level_limit = if matches!(conf.devices.playback, PlaybackDevice::Alsa { .. }) {
4 * conf.devices.chunksize
(4 + conf.devices.queuelimit()) * conf.devices.chunksize
} else {
2 * conf.devices.chunksize
(2 + conf.devices.queuelimit()) * conf.devices.chunksize
};
#[cfg(not(target_os = "linux"))]
let target_level_limit = 2 * conf.devices.chunksize;
let target_level_limit = (2 + conf.devices.queuelimit()) * conf.devices.chunksize;

if conf.devices.target_level() > target_level_limit {
let msg = format!("target_level cannot be larger than {}", target_level_limit);
Expand Down
19 changes: 9 additions & 10 deletions src/cpaldevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use rubato::VecResampler;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time;
Expand Down Expand Up @@ -201,7 +200,7 @@ where
impl PlaybackDevice for CpalPlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand Down Expand Up @@ -235,7 +234,7 @@ impl PlaybackDevice for CpalPlaybackDevice {
}
let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1);

let (tx_dev, rx_dev) = mpsc::sync_channel(1);
let (tx_dev, rx_dev) = crossbeam_channel::bounded(1);
let buffer_fill = Arc::new(AtomicUsize::new(0));
let buffer_fill_clone = buffer_fill.clone();
let mut buffer_avg = countertimer::Averager::new();
Expand Down Expand Up @@ -382,7 +381,7 @@ impl PlaybackDevice for CpalPlaybackDevice {
playback_status.signal_peak.add_record(chunk_stats.peak_linear());
}
buffer_avg.add_value(
(buffer_fill.load(Ordering::Relaxed) / channels_clone)
(buffer_fill.load(Ordering::Relaxed) / channels_clone + channel.len() * chunksize_clone)
as f64,
);
if adjust
Expand Down Expand Up @@ -468,10 +467,10 @@ where
impl CaptureDevice for CpalCaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
_processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down Expand Up @@ -506,8 +505,8 @@ impl CaptureDevice for CpalCaptureDevice {
Err(_err) => {}
}
let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1);
let (tx_dev_i, rx_dev_i) = mpsc::sync_channel(1);
let (tx_dev_f, rx_dev_f) = mpsc::sync_channel(1);
let (tx_dev_i, rx_dev_i) = crossbeam_channel::bounded(1);
let (tx_dev_f, rx_dev_f) = crossbeam_channel::bounded(1);
let stream = match sample_format {
SampleFormat::S16LE => {
trace!("Build i16 input stream");
Expand Down Expand Up @@ -593,8 +592,8 @@ impl CaptureDevice for CpalCaptureDevice {
}
}
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down
17 changes: 8 additions & 9 deletions src/filedevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::fs::OpenOptions;
use std::io::{stdin, stdout, Write};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -74,9 +73,9 @@ pub struct FileCaptureDevice {
}

struct CaptureChannels {
audio: mpsc::SyncSender<AudioMessage>,
audio: crossbeam_channel::Sender<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
command: mpsc::Receiver<CommandMessage>,
command: crossbeam_channel::Receiver<CommandMessage>,
}

struct CaptureParams {
Expand Down Expand Up @@ -111,7 +110,7 @@ pub trait Reader {
impl PlaybackDevice for FilePlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand Down Expand Up @@ -321,8 +320,8 @@ fn capture_loop(
}
}
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down Expand Up @@ -541,10 +540,10 @@ fn capture_loop(
impl CaptureDevice for FileCaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
_processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down Expand Up @@ -685,7 +684,7 @@ fn send_silence(
samples: usize,
channels: usize,
chunksize: usize,
audio_channel: &mpsc::SyncSender<AudioMessage>,
audio_channel: &crossbeam_channel::Sender<AudioMessage>,
resampler: &mut Option<Box<dyn VecResampler<PrcFmt>>>,
) {
let mut samples_left = samples;
Expand Down
13 changes: 6 additions & 7 deletions src/generatordevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::audiodevice::*;
use crate::config;

use std::f64::consts::PI;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;

Expand Down Expand Up @@ -101,9 +100,9 @@ pub struct GeneratorDevice {
}

struct CaptureChannels {
audio: mpsc::SyncSender<AudioMessage>,
audio: crossbeam_channel::Sender<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
command: mpsc::Receiver<CommandMessage>,
command: crossbeam_channel::Receiver<CommandMessage>,
}

struct GeneratorParams {
Expand Down Expand Up @@ -158,8 +157,8 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) {
Ok(CommandMessage::SetSpeed { .. }) => {
warn!("Signal generator does not support rate adjust. Ignoring request.");
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down Expand Up @@ -199,10 +198,10 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) {
impl CaptureDevice for GeneratorDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
_processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down
7 changes: 3 additions & 4 deletions src/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ use crate::ProcessingParameters;
use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time,
};
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;

pub fn run_processing(
conf_proc: config::Configuration,
barrier_proc: Arc<Barrier>,
tx_pb: mpsc::SyncSender<AudioMessage>,
rx_cap: mpsc::Receiver<AudioMessage>,
rx_pipeconf: mpsc::Receiver<(config::ConfigChange, config::Configuration)>,
tx_pb: crossbeam_channel::Sender<AudioMessage>,
rx_cap: crossbeam_channel::Receiver<AudioMessage>,
rx_pipeconf: crossbeam_channel::Receiver<(config::ConfigChange, config::Configuration)>,
processing_params: Arc<ProcessingParameters>,
) -> thread::JoinHandle<()> {
thread::spawn(move || {
Expand Down
Loading
Loading