Skip to content

Commit

Permalink
WIP use crossbeam_channel everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
HEnquist committed Jan 18, 2025
1 parent baf6a44 commit ec9cafd
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 57 deletions.
38 changes: 23 additions & 15 deletions src/alsadevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,19 @@ use alsa::pcm::{Access, Format, Frames, HwParams};
use alsa::poll::Descriptors;
use alsa::{Direction, ValueOr, PCM};
use alsa_sys;
use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time,
};
use crossbeam_channel;
use nix::errno::Errno;
use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use rubato::VecResampler;
use std::ffi::CString;
use std::fmt::Debug;
use std::sync::{mpsc, Arc, Barrier};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;

use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time,
};

use crate::alsadevice_buffermanager::{
CaptureBufferManager, DeviceBufferManager, PlaybackBufferManager,
};
Expand Down Expand Up @@ -73,13 +73,13 @@ pub struct AlsaCaptureDevice {
}

struct CaptureChannels {
audio: mpsc::SyncSender<AudioMessage>,
audio: crossbeam_channel::Sender<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
command: mpsc::Receiver<CommandMessage>,
command: crossbeam_channel::Receiver<CommandMessage>,
}

struct PlaybackChannels {
audio: mpsc::Receiver<AudioMessage>,
audio: crossbeam_channel::Receiver<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
}

Expand Down Expand Up @@ -527,6 +527,12 @@ fn playback_loop_bytes(
} else {
None
};
let waiting_chunks_in_channel = channels.audio.len();
trace!(
"Avail frames in buffer: {:?}, waiting chunks in channel: {}",
avail_at_chunk_recvd,
waiting_chunks_in_channel
);
//trace!("PB: Avail at chunk rcvd: {:?}", avail_at_chunk_recvd);

conversion_result =
Expand Down Expand Up @@ -604,7 +610,9 @@ fn playback_loop_bytes(
}
if let Some(avail) = avail_at_chunk_recvd {
let delay = buf_manager.current_delay(avail);
buffer_avg.add_value(delay as f64);
buffer_avg.add_value(
delay as f64 + (params.chunksize * waiting_chunks_in_channel) as f64,
);
}
if timer.larger_than_millis((1000.0 * params.adjust_period) as u64) {
if let Some(avg_delay) = buffer_avg.average() {
Expand Down Expand Up @@ -693,7 +701,7 @@ fn playback_loop_bytes(
}
}

fn drain_check_eos(audio: &mpsc::Receiver<AudioMessage>) -> Option<AudioMessage> {
fn drain_check_eos(audio: &crossbeam_channel::Receiver<AudioMessage>) -> Option<AudioMessage> {
let mut eos: Option<AudioMessage> = None;
while let Some(msg) = audio.try_iter().next() {
if let AudioMessage::EndOfStream = msg {
Expand Down Expand Up @@ -873,8 +881,8 @@ fn capture_loop_bytes(
}
}
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down Expand Up @@ -1101,7 +1109,7 @@ fn nbr_capture_bytes_and_frames(
impl PlaybackDevice for AlsaPlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand Down Expand Up @@ -1175,10 +1183,10 @@ impl PlaybackDevice for AlsaPlaybackDevice {
impl CaptureDevice for AlsaCaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down
7 changes: 3 additions & 4 deletions src/audiodevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use rubato::{
};
use std::error;
use std::fmt;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Instant;
Expand Down Expand Up @@ -222,7 +221,7 @@ pub fn rms_and_peak(data: &[PrcFmt]) -> (PrcFmt, PrcFmt) {
pub trait PlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand All @@ -233,10 +232,10 @@ pub trait PlaybackDevice {
pub trait CaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>>;
Expand Down
11 changes: 5 additions & 6 deletions src/bin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
use std::env;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -124,15 +123,15 @@ fn run(
return Ok(ExitState::Exit);
}
};
let (tx_pb, rx_pb) = mpsc::sync_channel(active_config.devices.queuelimit());
let (tx_cap, rx_cap) = mpsc::sync_channel(active_config.devices.queuelimit());
let (tx_pb, rx_pb) = crossbeam_channel::bounded(active_config.devices.queuelimit());
let (tx_cap, rx_cap) = crossbeam_channel::bounded(active_config.devices.queuelimit());

let (tx_status, rx_status) = crossbeam_channel::unbounded();
let tx_status_pb = tx_status.clone();
let tx_status_cap = tx_status;

let (tx_command_cap, rx_command_cap) = mpsc::channel();
let (tx_pipeconf, rx_pipeconf) = mpsc::channel();
let (tx_command_cap, rx_command_cap) = crossbeam_channel::unbounded();
let (tx_pipeconf, rx_pipeconf) = crossbeam_channel::unbounded();

let barrier = Arc::new(Barrier::new(4));
let barrier_pb = barrier.clone();
Expand Down Expand Up @@ -1060,7 +1059,7 @@ fn main_process() -> i32 {

#[cfg(feature = "websocket")]
{
let (tx_state, rx_state) = mpsc::sync_channel(1);
let (tx_state, rx_state) = crossbeam_channel::bounded(1);

let processing_params_clone = processing_params.clone();
let active_config_path_clone = active_config_path.clone();
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@ pub fn validate_config(conf: &mut Configuration, filename: Option<&str>) -> Res<
}
#[cfg(target_os = "linux")]
let target_level_limit = if matches!(conf.devices.playback, PlaybackDevice::Alsa { .. }) {
4 * conf.devices.chunksize
(4 + conf.devices.queuelimit()) * conf.devices.chunksize
} else {
2 * conf.devices.chunksize
};
Expand Down
17 changes: 8 additions & 9 deletions src/cpaldevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use rubato::VecResampler;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time;
Expand Down Expand Up @@ -201,7 +200,7 @@ where
impl PlaybackDevice for CpalPlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand Down Expand Up @@ -235,7 +234,7 @@ impl PlaybackDevice for CpalPlaybackDevice {
}
let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1);

let (tx_dev, rx_dev) = mpsc::sync_channel(1);
let (tx_dev, rx_dev) = crossbeam_channel::bounded(1);
let buffer_fill = Arc::new(AtomicUsize::new(0));
let buffer_fill_clone = buffer_fill.clone();
let mut buffer_avg = countertimer::Averager::new();
Expand Down Expand Up @@ -468,10 +467,10 @@ where
impl CaptureDevice for CpalCaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
_processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down Expand Up @@ -506,8 +505,8 @@ impl CaptureDevice for CpalCaptureDevice {
Err(_err) => {}
}
let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1);
let (tx_dev_i, rx_dev_i) = mpsc::sync_channel(1);
let (tx_dev_f, rx_dev_f) = mpsc::sync_channel(1);
let (tx_dev_i, rx_dev_i) = crossbeam_channel::bounded(1);
let (tx_dev_f, rx_dev_f) = crossbeam_channel::bounded(1);
let stream = match sample_format {
SampleFormat::S16LE => {
trace!("Build i16 input stream");
Expand Down Expand Up @@ -593,8 +592,8 @@ impl CaptureDevice for CpalCaptureDevice {
}
}
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down
17 changes: 8 additions & 9 deletions src/filedevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use std::fs::OpenOptions;
use std::io::{stdin, stdout, Write};
#[cfg(target_os = "linux")]
use std::os::unix::fs::OpenOptionsExt;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -74,9 +73,9 @@ pub struct FileCaptureDevice {
}

struct CaptureChannels {
audio: mpsc::SyncSender<AudioMessage>,
audio: crossbeam_channel::Sender<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
command: mpsc::Receiver<CommandMessage>,
command: crossbeam_channel::Receiver<CommandMessage>,
}

struct CaptureParams {
Expand Down Expand Up @@ -111,7 +110,7 @@ pub trait Reader {
impl PlaybackDevice for FilePlaybackDevice {
fn start(
&mut self,
channel: mpsc::Receiver<AudioMessage>,
channel: crossbeam_channel::Receiver<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
playback_status: Arc<RwLock<PlaybackStatus>>,
Expand Down Expand Up @@ -321,8 +320,8 @@ fn capture_loop(
}
}
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down Expand Up @@ -541,10 +540,10 @@ fn capture_loop(
impl CaptureDevice for FileCaptureDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
_processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down Expand Up @@ -685,7 +684,7 @@ fn send_silence(
samples: usize,
channels: usize,
chunksize: usize,
audio_channel: &mpsc::SyncSender<AudioMessage>,
audio_channel: &crossbeam_channel::Sender<AudioMessage>,
resampler: &mut Option<Box<dyn VecResampler<PrcFmt>>>,
) {
let mut samples_left = samples;
Expand Down
13 changes: 6 additions & 7 deletions src/generatordevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::audiodevice::*;
use crate::config;

use std::f64::consts::PI;
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;

Expand Down Expand Up @@ -101,9 +100,9 @@ pub struct GeneratorDevice {
}

struct CaptureChannels {
audio: mpsc::SyncSender<AudioMessage>,
audio: crossbeam_channel::Sender<AudioMessage>,
status: crossbeam_channel::Sender<StatusMessage>,
command: mpsc::Receiver<CommandMessage>,
command: crossbeam_channel::Receiver<CommandMessage>,
}

struct GeneratorParams {
Expand Down Expand Up @@ -158,8 +157,8 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) {
Ok(CommandMessage::SetSpeed { .. }) => {
warn!("Signal generator does not support rate adjust. Ignoring request.");
}
Err(mpsc::TryRecvError::Empty) => {}
Err(mpsc::TryRecvError::Disconnected) => {
Err(crossbeam_channel::TryRecvError::Empty) => {}
Err(crossbeam_channel::TryRecvError::Disconnected) => {
error!("Command channel was closed");
break;
}
Expand Down Expand Up @@ -199,10 +198,10 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) {
impl CaptureDevice for GeneratorDevice {
fn start(
&mut self,
channel: mpsc::SyncSender<AudioMessage>,
channel: crossbeam_channel::Sender<AudioMessage>,
barrier: Arc<Barrier>,
status_channel: crossbeam_channel::Sender<StatusMessage>,
command_channel: mpsc::Receiver<CommandMessage>,
command_channel: crossbeam_channel::Receiver<CommandMessage>,
capture_status: Arc<RwLock<CaptureStatus>>,
_processing_params: Arc<ProcessingParameters>,
) -> Res<Box<thread::JoinHandle<()>>> {
Expand Down
7 changes: 3 additions & 4 deletions src/processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ use crate::ProcessingParameters;
use audio_thread_priority::{
demote_current_thread_from_real_time, promote_current_thread_to_real_time,
};
use std::sync::mpsc;
use std::sync::{Arc, Barrier};
use std::thread;

pub fn run_processing(
conf_proc: config::Configuration,
barrier_proc: Arc<Barrier>,
tx_pb: mpsc::SyncSender<AudioMessage>,
rx_cap: mpsc::Receiver<AudioMessage>,
rx_pipeconf: mpsc::Receiver<(config::ConfigChange, config::Configuration)>,
tx_pb: crossbeam_channel::Sender<AudioMessage>,
rx_cap: crossbeam_channel::Receiver<AudioMessage>,
rx_pipeconf: crossbeam_channel::Receiver<(config::ConfigChange, config::Configuration)>,
processing_params: Arc<ProcessingParameters>,
) -> thread::JoinHandle<()> {
thread::spawn(move || {
Expand Down
3 changes: 1 addition & 2 deletions src/socketserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use std::fs::File;
use std::io::Read;
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
Expand All @@ -37,7 +36,7 @@ pub struct SharedData {
pub playback_status: Arc<RwLock<PlaybackStatus>>,
pub processing_params: Arc<ProcessingParameters>,
pub processing_status: Arc<RwLock<ProcessingStatus>>,
pub state_change_notify: mpsc::SyncSender<()>,
pub state_change_notify: crossbeam_channel::Sender<()>,
pub state_file_path: Option<String>,
pub unsaved_state_change: Arc<AtomicBool>,
}
Expand Down

0 comments on commit ec9cafd

Please sign in to comment.