Skip to content

Commit

Permalink
Fix block macro to support multiple outputs, and port Tee to that
Browse files Browse the repository at this point in the history
  • Loading branch information
ThomasHabets committed Feb 16, 2025
1 parent cb4775f commit 369d35f
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 33 deletions.
16 changes: 13 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ cpal = { version = "0.15.3", features = ["jack"], optional=true }
errno = "0.3.9"
rustradio_macros = { version = "0.10.1", path = "rustradio_macros" }
rayon = "1.10.0"
itertools = "0.14.0"
# System fftw has been many times faster for me than not. Maybe because the C
# code is not compiled with the right options, like -march=native?
[dependencies.fftw]
Expand Down
29 changes: 19 additions & 10 deletions rustradio_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ pub fn derive_block(input: TokenStream) -> TokenStream {
//
// Elements are of the form:
// * out_names: dst
// * out_names_samp: dst_sample
// * out_types_types: WriteStream<Complex>
// * outval_types: Complex
let (out_names, _out_types, outval_types) = unzip_n![
let (out_names, out_names_samp, _out_types, outval_types) = unzip_n![
fields_named
.named
.iter()
Expand All @@ -245,11 +246,18 @@ pub fn derive_block(input: TokenStream) -> TokenStream {
let inner = inner_type(&field.ty);
let ty = field.ty.clone();
let field_name = field.ident.clone().unwrap();
(field_name.clone(), quote! { #ty }, quote! { #inner })
let samp_name: syn::Ident = syn::parse_str(&format!("{field_name}_sample")).unwrap();
(
field_name.clone(),
samp_name,
quote! { #ty },
quote! { #inner },
)
}),
a,
b,
c
c,
d
];

// Ensure no field is marked both input and output.
Expand Down Expand Up @@ -323,7 +331,8 @@ pub fn derive_block(input: TokenStream) -> TokenStream {
extra.push(quote! {
impl #impl_generics #struct_name #ty_generics #where_clause {
fn process_sync_tags<'a>(&mut self, #(#inval_name_types, #intag_name_types,)*) -> (#(#outval_types,)* std::borrow::Cow<'a, [#path::stream::Tag]>) {
(self.process_sync(#(#in_names,)*), std::borrow::Cow::Borrowed(#first_tags))
let (#(#out_names),*) = self.process_sync(#(#in_names,)*);
(#(#out_names,)*std::borrow::Cow::Borrowed(#first_tags))
}
}
});
Expand Down Expand Up @@ -352,26 +361,26 @@ pub fn derive_block(input: TokenStream) -> TokenStream {
// There may be opportunity to deduplicate some of
// the next couple of lines with the !empty_tags
// case.
let (s, ts) = self.process_sync_tags(#(*#in_names, &[]),*);
let (#(#out_names,)* ts) = self.process_sync_tags(#(*#in_names, &[]),*);
for tag in ts.iter() {
otags.push(#path::stream::Tag::new(pos, tag.key().into(), tag.val().clone()));
}
s
(#(#out_names),*)
} else {
// TODO: This tag filtering is quite expensive.
#(let #in_tag_names: Vec<_> = #in_tag_names.iter()
.filter(|t| t.pos() == pos)
.map(|t| #path::stream::Tag::new(0, t.key().to_string(), t.val().clone()))
.collect();)*
let (s, ts) = self.process_sync_tags(#(*#in_names, &#in_tag_names),*);
let (#(#out_names,)* ts) = self.process_sync_tags(#(*#in_names, &#in_tag_names),*);
for tag in ts.iter() {
otags.push(#path::stream::Tag::new(pos, tag.key().into(), tag.val().clone()));
}
s
(#(#out_names),*)
}
});
for (samp, w) in it.zip(#(#out_names.slice().iter_mut())*) {
*w = samp;
for ((#(#out_names_samp),*), #(#out_names,)*) in itertools::izip!(it, #(#out_names.slice().iter_mut()),*) {
(#(*#out_names),*) = (#(#out_names_samp),*);
}
#(#in_names.consume(n);)*
#(#out_names.produce(n, &otags);)*
Expand Down
24 changes: 4 additions & 20 deletions src/tee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@
use anyhow::Result;

use crate::block::{Block, BlockRet};
use crate::stream::{ReadStream, WriteStream};
use crate::Error;

/// Tee
// TODO: make sync
#[derive(rustradio_macros::Block)]
#[rustradio(crate, new)]
#[rustradio(crate, new, sync)]
pub struct Tee<T: Copy> {
#[rustradio(in)]
src: ReadStream<T>,
Expand All @@ -18,22 +16,8 @@ pub struct Tee<T: Copy> {
#[rustradio(out)]
dst2: WriteStream<T>,
}

impl<T: Copy> Block for Tee<T> {
fn work(&mut self) -> Result<BlockRet, Error> {
let (i, tags) = self.src.read_buf()?;
let mut o1 = self.dst1.write_buf()?;
let mut o2 = self.dst2.write_buf()?;
if i.is_empty() {
return Ok(BlockRet::Noop);
}
let n = std::cmp::min(i.len(), o1.len());
let n = std::cmp::min(n, o2.len());
o1.fill_from_slice(&i.slice()[..n]);
o2.fill_from_slice(&i.slice()[..n]);
o1.produce(n, &tags);
o2.produce(n, &tags);
i.consume(n);
Ok(BlockRet::Ok)
impl<T: Copy> Tee<T> {
fn process_sync(&self, s: T) -> (T, T) {
(s, s)
}
}

0 comments on commit 369d35f

Please sign in to comment.