diff --git a/Cargo.toml b/Cargo.toml index 9bd557868..16ebd748d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,10 +10,8 @@ edition = "2021" rust-version = "1.70" [features] -asio = [ - "asio-sys", - "num-traits", -] # Only available on Windows. See README for setup instructions. +asio = ["asio-sys", "num-traits"] # Only available on Windows. See README for setup instructions. +pulseaudio = ["dep:pulseaudio", "dep:futures"] # Only available on some Unix platforms. # Only available on web when atomics are enabled. See README for what it does. web_audio_worklet = [ @@ -65,6 +63,8 @@ alsa = "0.10" libc = "0.2" audio_thread_priority = { version = "0.34", optional = true } jack = { version = "0.13", optional = true } +pulseaudio = { version = "0.3", optional = true } +futures = { version = "0.3", optional = true } [target.'cfg(target_vendor = "apple")'.dependencies] mach2 = "0.5" diff --git a/examples/beep.rs b/examples/beep.rs index e9d74d0c5..a91812b75 100644 --- a/examples/beep.rs +++ b/examples/beep.rs @@ -1,7 +1,7 @@ use clap::Parser; use cpal::{ traits::{DeviceTrait, HostTrait, StreamTrait}, - FromSample, Sample, SizedSample, I24, + FromSample, HostUnavailable, Sample, SizedSample, I24, }; #[derive(Parser, Debug)] @@ -11,58 +11,57 @@ struct Opt { #[arg(short, long)] device: Option, - /// Use the JACK host - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" - ))] - #[arg(short, long)] - #[allow(dead_code)] + /// Use the JACK host. Requires `--features jack`. + #[arg(long, default_value_t = false)] jack: bool, + + /// Use the PulseAudio host. Requires `--features pulseaudio`. + #[arg(long, default_value_t = false)] + pulseaudio: bool, } fn main() -> anyhow::Result<()> { let opt = Opt::parse(); - // Conditionally compile with jack if the feature is specified. - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" + // Jack/PulseAudio support must be enabled at compile time, and is + // only available on some platforms. + #[allow(unused_mut, unused_assignments)] + let mut jack_host_id = Err(HostUnavailable); + #[allow(unused_mut, unused_assignments)] + let mut pulseaudio_host_id = Err(HostUnavailable); + + #[cfg(any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" ))] + { + #[cfg(feature = "jack")] + { + jack_host_id = Ok(cpal::HostId::Jack); + } + + #[cfg(feature = "pulseaudio")] + { + pulseaudio_host_id = Ok(cpal::HostId::PulseAudio); + } + } + // Manually check for flags. Can be passed through cargo with -- e.g. // cargo run --release --example beep --features jack -- --jack let host = if opt.jack { - cpal::host_from_id(cpal::available_hosts() - .into_iter() - .find(|id| *id == cpal::HostId::Jack) - .expect( - "make sure --features jack is specified. only works on OSes where jack is available", - )).expect("jack host unavailable") + jack_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features jack` is specified, and the platform is supported") + } else if opt.pulseaudio { + pulseaudio_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features pulseaudio` is specified, and the platform is supported") } else { cpal::default_host() }; - #[cfg(any( - not(any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - )), - not(feature = "jack") - ))] - let host = cpal::default_host(); - let device = if let Some(device) = opt.device { let id = &device.parse().expect("failed to parse device id"); host.device_by_id(id) diff --git a/examples/feedback.rs b/examples/feedback.rs index 76a0eaf14..753efe094 100644 --- a/examples/feedback.rs +++ b/examples/feedback.rs @@ -7,7 +7,10 @@ //! precisely synchronised. use clap::Parser; -use cpal::traits::{DeviceTrait, HostTrait, StreamTrait}; +use cpal::{ + traits::{DeviceTrait, HostTrait, StreamTrait}, + HostUnavailable, +}; use ringbuf::{ traits::{Consumer, Producer, Split}, HeapRb, @@ -28,58 +31,57 @@ struct Opt { #[arg(short, long, value_name = "DELAY_MS", default_value_t = 150.0)] latency: f32, - /// Use the JACK host - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" - ))] - #[arg(short, long)] - #[allow(dead_code)] + /// Use the JACK host. Requires `--features jack`. + #[arg(long, default_value_t = false)] jack: bool, + + /// Use the PulseAudio host. Requires `--features pulseaudio`. + #[arg(long, default_value_t = false)] + pulseaudio: bool, } fn main() -> anyhow::Result<()> { let opt = Opt::parse(); - // Conditionally compile with jack if the feature is specified. - #[cfg(all( - any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - ), - feature = "jack" + // Jack/PulseAudio support must be enabled at compile time, and is + // only available on some platforms. + #[allow(unused_mut, unused_assignments)] + let mut jack_host_id = Err(HostUnavailable); + #[allow(unused_mut, unused_assignments)] + let mut pulseaudio_host_id = Err(HostUnavailable); + + #[cfg(any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" ))] + { + #[cfg(feature = "jack")] + { + jack_host_id = Ok(cpal::HostId::Jack); + } + + #[cfg(feature = "pulseaudio")] + { + pulseaudio_host_id = Ok(cpal::HostId::PulseAudio); + } + } + // Manually check for flags. Can be passed through cargo with -- e.g. // cargo run --release --example beep --features jack -- --jack let host = if opt.jack { - cpal::host_from_id(cpal::available_hosts() - .into_iter() - .find(|id| *id == cpal::HostId::Jack) - .expect( - "make sure --features jack is specified. only works on OSes where jack is available", - )).expect("jack host unavailable") + jack_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features jack` is specified, and the platform is supported") + } else if opt.pulseaudio { + pulseaudio_host_id + .and_then(cpal::host_from_id) + .expect("make sure `--features pulseaudio` is specified, and the platform is supported") } else { cpal::default_host() }; - #[cfg(any( - not(any( - target_os = "linux", - target_os = "dragonfly", - target_os = "freebsd", - target_os = "netbsd" - )), - not(feature = "jack") - ))] - let host = cpal::default_host(); - // Find devices. let input_device = if let Some(device) = opt.input_device { let id = &device.parse().expect("failed to parse input device id"); @@ -160,9 +162,9 @@ fn main() -> anyhow::Result<()> { input_stream.play()?; output_stream.play()?; - // Run for 3 seconds before closing. - println!("Playing for 3 seconds... "); - std::thread::sleep(std::time::Duration::from_secs(3)); + // Run for 10 seconds before closing. + println!("Playing for 10 seconds... "); + std::thread::sleep(std::time::Duration::from_secs(10)); drop(input_stream); drop(output_stream); println!("Done!"); diff --git a/src/host/alsa/mod.rs b/src/host/alsa/mod.rs index f0c5ec64a..b3734681d 100644 --- a/src/host/alsa/mod.rs +++ b/src/host/alsa/mod.rs @@ -1193,9 +1193,10 @@ impl StreamTrait for Stream { } } -// Overly safe clamp because alsa Frames are i64 (64-bit) or i32 (32-bit) +// Convert ALSA frames to FrameCount, clamping to valid range. +// ALSA Frames are i64 (64-bit) or i32 (32-bit). fn clamp_frame_count(buffer_size: alsa::pcm::Frames) -> FrameCount { - buffer_size.clamp(1, FrameCount::MAX as alsa::pcm::Frames) as FrameCount + buffer_size.max(1).try_into().unwrap_or(FrameCount::MAX) } fn hw_params_buffer_size_min_max(hw_params: &alsa::pcm::HwParams) -> (FrameCount, FrameCount) { diff --git a/src/host/mod.rs b/src/host/mod.rs index a27d7b1e3..a287166ed 100644 --- a/src/host/mod.rs +++ b/src/host/mod.rs @@ -23,6 +23,16 @@ pub(crate) mod emscripten; feature = "jack" ))] pub(crate) mod jack; +#[cfg(all( + any( + target_os = "linux", + target_os = "dragonfly", + target_os = "freebsd", + target_os = "netbsd" + ), + feature = "pulseaudio" +))] +pub(crate) mod pulseaudio; #[cfg(windows)] pub(crate) mod wasapi; #[cfg(all( diff --git a/src/host/pulseaudio/mod.rs b/src/host/pulseaudio/mod.rs new file mode 100644 index 000000000..b7c604f26 --- /dev/null +++ b/src/host/pulseaudio/mod.rs @@ -0,0 +1,402 @@ +use futures::executor::block_on; +use pulseaudio::protocol; + +mod stream; + +pub use stream::Stream; + +use crate::{ + traits::{DeviceTrait, HostTrait}, + BackendSpecificError, BuildStreamError, Data, DefaultStreamConfigError, DeviceDescription, + DeviceDescriptionBuilder, DeviceDirection, DeviceId, DeviceIdError, DeviceNameError, + DevicesError, HostId, HostUnavailable, InputCallbackInfo, OutputCallbackInfo, SampleFormat, + SampleRate, StreamConfig, StreamError, SupportedBufferSize, SupportedStreamConfig, + SupportedStreamConfigRange, SupportedStreamConfigsError, +}; + +const PULSE_FORMATS: &[SampleFormat] = &[ + SampleFormat::U8, + SampleFormat::I16, + SampleFormat::I24, + SampleFormat::I32, + SampleFormat::F32, +]; + +impl TryFrom for SampleFormat { + type Error = (); + + fn try_from(spec: protocol::SampleFormat) -> Result { + match spec { + protocol::SampleFormat::U8 => Ok(SampleFormat::U8), + protocol::SampleFormat::S16Le | protocol::SampleFormat::S16Be => Ok(SampleFormat::I16), + protocol::SampleFormat::S24Le | protocol::SampleFormat::S24Be => Ok(SampleFormat::I24), + protocol::SampleFormat::S32Le | protocol::SampleFormat::S32Be => Ok(SampleFormat::I32), + protocol::SampleFormat::Float32Le | protocol::SampleFormat::Float32Be => { + Ok(SampleFormat::F32) + } + _ => Err(()), + } + } +} + +impl TryFrom for protocol::SampleFormat { + type Error = (); + + fn try_from(format: SampleFormat) -> Result { + match (format, cfg!(target_endian = "little")) { + (SampleFormat::U8, _) => Ok(protocol::SampleFormat::U8), + (SampleFormat::I16, true) => Ok(protocol::SampleFormat::S16Le), + (SampleFormat::I16, false) => Ok(protocol::SampleFormat::S16Be), + (SampleFormat::I24, true) => Ok(protocol::SampleFormat::S24Le), + (SampleFormat::I24, false) => Ok(protocol::SampleFormat::S24Be), + (SampleFormat::I32, true) => Ok(protocol::SampleFormat::S32Le), + (SampleFormat::I32, false) => Ok(protocol::SampleFormat::S32Be), + (SampleFormat::F32, true) => Ok(protocol::SampleFormat::Float32Le), + (SampleFormat::F32, false) => Ok(protocol::SampleFormat::Float32Be), + _ => Err(()), + } + } +} + +impl From for BackendSpecificError { + fn from(err: pulseaudio::ClientError) -> Self { + BackendSpecificError { + description: err.to_string(), + } + } +} + +/// A Host for connecting to the popular PulseAudio and PipeWire (via +/// pipewire-pulse) audio servers on linux. +pub struct Host { + client: pulseaudio::Client, +} + +impl Host { + pub fn new() -> Result { + let client = + pulseaudio::Client::from_env(c"cpal-pulseaudio").map_err(|_| HostUnavailable)?; + + Ok(Self { client }) + } +} + +impl HostTrait for Host { + type Devices = std::vec::IntoIter; + type Device = Device; + + fn is_available() -> bool { + pulseaudio::socket_path_from_env().is_some() + } + + fn devices(&self) -> Result { + let sinks = block_on(self.client.list_sinks()).map_err(|err| BackendSpecificError { + description: format!("Failed to list sinks: {err}"), + })?; + + let sources = block_on(self.client.list_sources()).map_err(|err| BackendSpecificError { + description: format!("Failed to list sources: {err}"), + })?; + + Ok(sinks + .into_iter() + .map(|sink_info| Device::Sink { + client: self.client.clone(), + info: sink_info, + }) + .chain(sources.into_iter().map(|source_info| Device::Source { + client: self.client.clone(), + info: source_info, + })) + .collect::>() + .into_iter()) + } + + fn default_input_device(&self) -> Option { + let source_info = block_on( + self.client + .source_info_by_name(protocol::DEFAULT_SOURCE.to_owned()), + ) + .ok()?; + + Some(Device::Source { + client: self.client.clone(), + info: source_info, + }) + } + + fn default_output_device(&self) -> Option { + let sink_info = block_on( + self.client + .sink_info_by_name(protocol::DEFAULT_SINK.to_owned()), + ) + .ok()?; + + Some(Device::Sink { + client: self.client.clone(), + info: sink_info, + }) + } +} + +/// A PulseAudio sink or source. +#[derive(Debug, Clone)] +pub enum Device { + Sink { + client: pulseaudio::Client, + info: protocol::SinkInfo, + }, + Source { + client: pulseaudio::Client, + info: protocol::SourceInfo, + }, +} + +impl DeviceTrait for Device { + type SupportedInputConfigs = std::vec::IntoIter; + type SupportedOutputConfigs = std::vec::IntoIter; + type Stream = Stream; + + fn name(&self) -> Result { + let name = match self { + Device::Sink { info, .. } => &info.name, + Device::Source { info, .. } => &info.name, + }; + + Ok(String::from_utf8_lossy(name.as_bytes()).into_owned()) + } + + fn supported_input_configs( + &self, + ) -> Result { + let Device::Source { .. } = self else { + return Ok(vec![].into_iter()); + }; + + let mut ranges = vec![]; + for format in PULSE_FORMATS { + for channel_count in 1..protocol::sample_spec::MAX_CHANNELS { + ranges.push(SupportedStreamConfigRange { + channels: channel_count as _, + min_sample_rate: SampleRate(1), + max_sample_rate: SampleRate(protocol::sample_spec::MAX_RATE), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: *format, + }) + } + } + + Ok(ranges.into_iter()) + } + + fn supported_output_configs( + &self, + ) -> Result { + let Device::Sink { .. } = self else { + return Ok(vec![].into_iter()); + }; + + let mut ranges = vec![]; + for format in PULSE_FORMATS { + for channel_count in 1..protocol::sample_spec::MAX_CHANNELS { + ranges.push(SupportedStreamConfigRange { + channels: channel_count as _, + min_sample_rate: SampleRate(1), + max_sample_rate: SampleRate(protocol::sample_spec::MAX_RATE), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: *format, + }) + } + } + + Ok(ranges.into_iter()) + } + + fn default_input_config(&self) -> Result { + let Device::Source { info, .. } = self else { + return Err(DefaultStreamConfigError::StreamTypeNotSupported); + }; + + Ok(SupportedStreamConfig { + channels: info.channel_map.num_channels() as _, + sample_rate: SampleRate(info.sample_spec.sample_rate), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: info + .sample_spec + .format + .try_into() + .unwrap_or(SampleFormat::F32), + }) + } + + fn default_output_config(&self) -> Result { + let Device::Sink { info, .. } = self else { + return Err(DefaultStreamConfigError::StreamTypeNotSupported); + }; + + Ok(SupportedStreamConfig { + channels: info.channel_map.num_channels() as _, + sample_rate: SampleRate(info.sample_spec.sample_rate), + buffer_size: SupportedBufferSize::Range { + min: 0, + max: protocol::MAX_MEMBLOCKQ_LENGTH as _, + }, + sample_format: info + .sample_spec + .format + .try_into() + .unwrap_or(SampleFormat::F32), + }) + } + + fn build_input_stream_raw( + &self, + config: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let Device::Source { client, info } = self else { + return Err(BuildStreamError::StreamConfigNotSupported); + }; + + let format: protocol::SampleFormat = sample_format + .try_into() + .map_err(|_| BuildStreamError::StreamConfigNotSupported)?; + + let sample_spec = make_sample_spec(config, format); + let channel_map = make_channel_map(config); + let buffer_attr = make_buffer_attr(config, format); + + let params = protocol::RecordStreamParams { + sample_spec, + channel_map, + source_index: Some(info.index), + buffer_attr, + flags: protocol::stream::StreamFlags { + // Start the stream suspended. + start_corked: true, + ..Default::default() + }, + ..Default::default() + }; + + stream::Stream::new_record(client.clone(), params, data_callback, error_callback) + } + + fn build_output_stream_raw( + &self, + config: &StreamConfig, + sample_format: SampleFormat, + data_callback: D, + error_callback: E, + _timeout: Option, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let Device::Sink { client, info } = self else { + return Err(BuildStreamError::StreamConfigNotSupported); + }; + + let format: protocol::SampleFormat = sample_format + .try_into() + .map_err(|_| BuildStreamError::StreamConfigNotSupported)?; + + let sample_spec = make_sample_spec(config, format); + let channel_map = make_channel_map(config); + let buffer_attr = make_buffer_attr(config, format); + + let params = protocol::PlaybackStreamParams { + sink_index: Some(info.index), + sample_spec, + channel_map, + buffer_attr, + flags: protocol::stream::StreamFlags { + // Start the stream suspended. + start_corked: true, + ..Default::default() + }, + ..Default::default() + }; + + stream::Stream::new_playback(client.clone(), params, data_callback, error_callback) + } + + fn description(&self) -> Result { + let (name, description, direction) = match self { + Device::Sink { info, .. } => (&info.name, &info.description, DeviceDirection::Output), + Device::Source { info, .. } => (&info.name, &info.description, DeviceDirection::Input), + }; + + let mut builder = DeviceDescriptionBuilder::new(String::from_utf8_lossy(name.as_bytes())) + .direction(direction); + if let Some(desc) = description { + builder = builder.add_extended_line(String::from_utf8_lossy(desc.as_bytes())); + } + + Ok(builder.build()) + } + + fn id(&self) -> Result { + let id = match self { + Device::Sink { info, .. } => info.index, + Device::Source { info, .. } => info.index, + }; + + Ok(DeviceId(HostId::PulseAudio, id.to_string())) + } +} + +fn make_sample_spec(config: &StreamConfig, format: protocol::SampleFormat) -> protocol::SampleSpec { + protocol::SampleSpec { + format, + sample_rate: config.sample_rate.0, + channels: config.channels as _, + } +} + +fn make_channel_map(config: &StreamConfig) -> protocol::ChannelMap { + if config.channels == 2 { + return protocol::ChannelMap::stereo(); + } + + let mut map = protocol::ChannelMap::empty(); + for _ in 0..config.channels { + map.push(protocol::ChannelPosition::Mono); + } + + map +} + +fn make_buffer_attr( + config: &StreamConfig, + format: protocol::SampleFormat, +) -> protocol::stream::BufferAttr { + match config.buffer_size { + crate::BufferSize::Default => Default::default(), + crate::BufferSize::Fixed(frame_count) => { + let len = frame_count * config.channels as u32 * format.bytes_per_sample() as u32; + protocol::stream::BufferAttr { + max_length: len, + target_length: len, + ..Default::default() + } + } + } +} diff --git a/src/host/pulseaudio/stream.rs b/src/host/pulseaudio/stream.rs new file mode 100644 index 000000000..f946f0d42 --- /dev/null +++ b/src/host/pulseaudio/stream.rs @@ -0,0 +1,248 @@ +use std::{ + sync::{ + atomic::{self, AtomicU64}, + Arc, + }, + time::{self, SystemTime}, +}; + +use futures::executor::block_on; +use pulseaudio::{protocol, AsPlaybackSource}; + +use crate::{ + traits::StreamTrait, BackendSpecificError, BuildStreamError, Data, InputCallbackInfo, + InputStreamTimestamp, OutputCallbackInfo, OutputStreamTimestamp, PlayStreamError, SampleFormat, + StreamError, StreamInstant, +}; + +pub enum Stream { + Playback(pulseaudio::PlaybackStream), + Record(pulseaudio::RecordStream), +} + +impl StreamTrait for Stream { + fn play(&self) -> Result<(), PlayStreamError> { + match self { + Stream::Playback(stream) => { + block_on(stream.uncork()).map_err(Into::::into)?; + } + Stream::Record(stream) => { + block_on(stream.uncork()).map_err(Into::::into)?; + block_on(stream.started()).map_err(Into::::into)?; + } + }; + + Ok(()) + } + + fn pause(&self) -> Result<(), crate::PauseStreamError> { + let res = match self { + Stream::Playback(stream) => block_on(stream.cork()), + Stream::Record(stream) => block_on(stream.cork()), + }; + + res.map_err(Into::::into)?; + Ok(()) + } +} + +impl Stream { + pub fn new_playback( + client: pulseaudio::Client, + params: protocol::PlaybackStreamParams, + mut data_callback: D, + mut error_callback: E, + ) -> Result + where + D: FnMut(&mut Data, &OutputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let epoch = std::time::SystemTime::now(); + + let current_latency_micros = Arc::new(AtomicU64::new(0)); + let latency_clone = current_latency_micros.clone(); + let sample_spec = params.sample_spec.clone(); + + let format: SampleFormat = sample_spec + .format + .try_into() + .map_err(|_| BuildStreamError::StreamConfigNotSupported)?; + + // Wrap the write callback to match the pulseaudio signature. + let callback = move |buf: &mut [u8]| { + let now = SystemTime::now().duration_since(epoch).unwrap_or_default(); + let latency = latency_clone.load(atomic::Ordering::Relaxed); + let playback_time = now + time::Duration::from_micros(latency as u64); + + let timestamp = OutputStreamTimestamp { + callback: StreamInstant { + secs: now.as_secs() as i64, + nanos: now.subsec_nanos(), + }, + playback: StreamInstant { + secs: playback_time.as_secs() as i64, + nanos: playback_time.subsec_nanos(), + }, + }; + + // Preemptively zero the buffer. + for b in buf.iter_mut() { + *b = 0; + } + + let bps = sample_spec.format.bytes_per_sample(); + let n_samples = buf.len() / bps; + + // SAFETY: we calculated the number of samples based on + // `sample_spec.format`, and `format` is directly derived from (and + // equivalent to) `sample_spec.format`. + let mut data = unsafe { Data::from_parts(buf.as_mut_ptr().cast(), n_samples, format) }; + + data_callback(&mut data, &OutputCallbackInfo { timestamp }); + + // We always consider the full buffer filled, because cpal's + // user-facing api doesn't allow for short writes. + n_samples * bps + }; + + let stream = block_on(client.create_playback_stream(params, callback.as_playback_source())) + .map_err(Into::::into)?; + + // Spawn a thread to drive the stream future. It will exit automatically + // when the stream is stopped by the user. + let stream_clone = stream.clone(); + let _worker_thread = std::thread::spawn(move || block_on(stream_clone.play_all())); + + // Spawn a thread to monitor the stream's latency in a loop. It will + // exit automatically when the stream ends. + let stream_clone = stream.clone(); + let latency_clone = current_latency_micros.clone(); + std::thread::spawn(move || loop { + let timing_info = match block_on(stream_clone.timing_info()) { + Ok(timing_info) => timing_info, + Err(e) => { + error_callback(StreamError::from(BackendSpecificError { + description: e.to_string(), + })); + break; + } + }; + + store_latency( + &latency_clone, + sample_spec, + timing_info.sink_usec, + timing_info.write_offset, + timing_info.read_offset, + ); + + std::thread::sleep(time::Duration::from_millis(100)); + }); + + Ok(Self::Playback(stream)) + } + + pub fn new_record( + client: pulseaudio::Client, + params: protocol::RecordStreamParams, + mut data_callback: D, + mut error_callback: E, + ) -> Result + where + D: FnMut(&Data, &InputCallbackInfo) + Send + 'static, + E: FnMut(StreamError) + Send + 'static, + { + let epoch = std::time::SystemTime::now(); + + let current_latency_micros = Arc::new(AtomicU64::new(0)); + let latency_clone = current_latency_micros.clone(); + let sample_spec = params.sample_spec.clone(); + + let format: SampleFormat = sample_spec + .format + .try_into() + .map_err(|_| BuildStreamError::StreamConfigNotSupported)?; + + let callback = move |buf: &[u8]| { + let now = SystemTime::now().duration_since(epoch).unwrap_or_default(); + let latency = latency_clone.load(atomic::Ordering::Relaxed); + let capture_time = now + .checked_sub(time::Duration::from_micros(latency as u64)) + .unwrap_or_default(); + + let timestamp = InputStreamTimestamp { + callback: StreamInstant { + secs: now.as_secs() as i64, + nanos: now.subsec_nanos(), + }, + capture: StreamInstant { + secs: capture_time.as_secs() as i64, + nanos: capture_time.subsec_nanos(), + }, + }; + + let bps = sample_spec.format.bytes_per_sample(); + let n_samples = buf.len() / bps; + + // SAFETY: we calculated the number of samples based on + // `sample_spec.format`, and `format` is directly derived from (and + // equivalent to) `sample_spec.format`. + let data = unsafe { Data::from_parts(buf.as_ptr() as *mut _, n_samples, format) }; + + data_callback(&data, &InputCallbackInfo { timestamp }); + }; + + let stream = block_on(client.create_record_stream(params, callback)) + .map_err(Into::::into)?; + + // Spawn a thread to monitor the stream's latency in a loop. It will + // exit automatically when the stream ends. + let stream_clone = stream.clone(); + let latency_clone = current_latency_micros.clone(); + std::thread::spawn(move || loop { + let timing_info = match block_on(stream_clone.timing_info()) { + Ok(timing_info) => timing_info, + Err(e) => { + error_callback(StreamError::from(BackendSpecificError { + description: e.to_string(), + })); + break; + } + }; + + store_latency( + &latency_clone, + sample_spec, + timing_info.sink_usec, + timing_info.write_offset, + timing_info.read_offset, + ); + + std::thread::sleep(time::Duration::from_millis(100)); + }); + + Ok(Self::Record(stream)) + } +} + +fn store_latency( + latency_micros: &AtomicU64, + sample_spec: protocol::SampleSpec, + device_latency_usec: u64, + write_offset: i64, + read_offset: i64, +) -> time::Duration { + let offset = (write_offset as u64) + .checked_sub(read_offset as u64) + .unwrap_or(0); + + let latency = time::Duration::from_micros(device_latency_usec) + + sample_spec.bytes_to_duration(offset as usize); + + latency_micros.store( + latency.as_micros().try_into().unwrap_or(u64::MAX), + atomic::Ordering::Relaxed, + ); + + latency +} diff --git a/src/platform/mod.rs b/src/platform/mod.rs index f3962497c..cbf7809ed 100644 --- a/src/platform/mod.rs +++ b/src/platform/mod.rs @@ -672,7 +672,6 @@ macro_rules! impl_platform_host { }; } -// TODO: Add pulseaudio and jack here eventually. #[cfg(any( target_os = "linux", target_os = "dragonfly", @@ -683,8 +682,11 @@ mod platform_impl { pub use crate::host::alsa::Host as AlsaHost; #[cfg(feature = "jack")] pub use crate::host::jack::Host as JackHost; + #[cfg(feature = "pulseaudio")] + pub use crate::host::pulseaudio::Host as PulseAudioHost; impl_platform_host!( + #[cfg(feature = "pulseaudio")] PulseAudio => PulseAudioHost, #[cfg(feature = "jack")] Jack => JackHost, Alsa => AlsaHost, #[cfg(feature = "custom")] Custom => super::CustomHost