From 369d35f0a1770ab8c966544486b909b1ded1d985 Mon Sep 17 00:00:00 2001 From: Thomas Habets Date: Sun, 16 Feb 2025 21:36:06 +0000 Subject: [PATCH] Fix block macro to support multiple outputs, and port Tee to that --- Cargo.lock | 16 +++++++++++++--- Cargo.toml | 1 + rustradio_macros/src/lib.rs | 29 +++++++++++++++++++---------- src/tee.rs | 24 ++++-------------------- 4 files changed, 37 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b2ae0d..fa41a75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -144,7 +144,7 @@ dependencies = [ "bitflags 2.4.0", "cexpr", "clang-sys", - "itertools", + "itertools 0.13.0", "proc-macro2", "quote", "regex", @@ -727,6 +727,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b192c782037fadd9cfa75548310488aabdbf3d2da73885b31bd0abd03351285" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -1174,7 +1183,7 @@ dependencies = [ "crossterm", "indoc", "instability", - "itertools", + "itertools 0.13.0", "lru", "paste", "strum", @@ -1312,6 +1321,7 @@ dependencies = [ "errno", "fast-math", "fftw", + "itertools 0.14.0", "libc", "log", "num-complex", @@ -1659,7 +1669,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3644627a5af5fa321c95b9b235a72fd24cd29c648c2c379431e6628655627bf" dependencies = [ - "itertools", + "itertools 0.13.0", "unicode-segmentation", "unicode-width 0.1.14", ] diff --git a/Cargo.toml b/Cargo.toml index f82a8b9..a5b0d6d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ cpal = { version = "0.15.3", features = ["jack"], optional=true } errno = "0.3.9" rustradio_macros = { version = "0.10.1", path = "rustradio_macros" } rayon = "1.10.0" +itertools = "0.14.0" # System fftw has been many times faster for me than not. Maybe because the C # code is not compiled with the right options, like -march=native? [dependencies.fftw] diff --git a/rustradio_macros/src/lib.rs b/rustradio_macros/src/lib.rs index d077be6..628b900 100644 --- a/rustradio_macros/src/lib.rs +++ b/rustradio_macros/src/lib.rs @@ -234,9 +234,10 @@ pub fn derive_block(input: TokenStream) -> TokenStream { // // Elements are of the form: // * out_names: dst + // * out_names_samp: dst_sample // * out_types_types: WriteStream // * outval_types: Complex - let (out_names, _out_types, outval_types) = unzip_n![ + let (out_names, out_names_samp, _out_types, outval_types) = unzip_n![ fields_named .named .iter() @@ -245,11 +246,18 @@ pub fn derive_block(input: TokenStream) -> TokenStream { let inner = inner_type(&field.ty); let ty = field.ty.clone(); let field_name = field.ident.clone().unwrap(); - (field_name.clone(), quote! { #ty }, quote! { #inner }) + let samp_name: syn::Ident = syn::parse_str(&format!("{field_name}_sample")).unwrap(); + ( + field_name.clone(), + samp_name, + quote! { #ty }, + quote! { #inner }, + ) }), a, b, - c + c, + d ]; // Ensure no field is marked both input and output. @@ -323,7 +331,8 @@ pub fn derive_block(input: TokenStream) -> TokenStream { extra.push(quote! { impl #impl_generics #struct_name #ty_generics #where_clause { fn process_sync_tags<'a>(&mut self, #(#inval_name_types, #intag_name_types,)*) -> (#(#outval_types,)* std::borrow::Cow<'a, [#path::stream::Tag]>) { - (self.process_sync(#(#in_names,)*), std::borrow::Cow::Borrowed(#first_tags)) + let (#(#out_names),*) = self.process_sync(#(#in_names,)*); + (#(#out_names,)*std::borrow::Cow::Borrowed(#first_tags)) } } }); @@ -352,26 +361,26 @@ pub fn derive_block(input: TokenStream) -> TokenStream { // There may be opportunity to deduplicate some of // the next couple of lines with the !empty_tags // case. - let (s, ts) = self.process_sync_tags(#(*#in_names, &[]),*); + let (#(#out_names,)* ts) = self.process_sync_tags(#(*#in_names, &[]),*); for tag in ts.iter() { otags.push(#path::stream::Tag::new(pos, tag.key().into(), tag.val().clone())); } - s + (#(#out_names),*) } else { // TODO: This tag filtering is quite expensive. #(let #in_tag_names: Vec<_> = #in_tag_names.iter() .filter(|t| t.pos() == pos) .map(|t| #path::stream::Tag::new(0, t.key().to_string(), t.val().clone())) .collect();)* - let (s, ts) = self.process_sync_tags(#(*#in_names, &#in_tag_names),*); + let (#(#out_names,)* ts) = self.process_sync_tags(#(*#in_names, &#in_tag_names),*); for tag in ts.iter() { otags.push(#path::stream::Tag::new(pos, tag.key().into(), tag.val().clone())); } - s + (#(#out_names),*) } }); - for (samp, w) in it.zip(#(#out_names.slice().iter_mut())*) { - *w = samp; + for ((#(#out_names_samp),*), #(#out_names,)*) in itertools::izip!(it, #(#out_names.slice().iter_mut()),*) { + (#(*#out_names),*) = (#(#out_names_samp),*); } #(#in_names.consume(n);)* #(#out_names.produce(n, &otags);)* diff --git a/src/tee.rs b/src/tee.rs index 7bbb47b..20f8239 100644 --- a/src/tee.rs +++ b/src/tee.rs @@ -2,14 +2,12 @@ use anyhow::Result; -use crate::block::{Block, BlockRet}; use crate::stream::{ReadStream, WriteStream}; -use crate::Error; /// Tee // TODO: make sync #[derive(rustradio_macros::Block)] -#[rustradio(crate, new)] +#[rustradio(crate, new, sync)] pub struct Tee { #[rustradio(in)] src: ReadStream, @@ -18,22 +16,8 @@ pub struct Tee { #[rustradio(out)] dst2: WriteStream, } - -impl Block for Tee { - fn work(&mut self) -> Result { - let (i, tags) = self.src.read_buf()?; - let mut o1 = self.dst1.write_buf()?; - let mut o2 = self.dst2.write_buf()?; - if i.is_empty() { - return Ok(BlockRet::Noop); - } - let n = std::cmp::min(i.len(), o1.len()); - let n = std::cmp::min(n, o2.len()); - o1.fill_from_slice(&i.slice()[..n]); - o2.fill_from_slice(&i.slice()[..n]); - o1.produce(n, &tags); - o2.produce(n, &tags); - i.consume(n); - Ok(BlockRet::Ok) +impl Tee { + fn process_sync(&self, s: T) -> (T, T) { + (s, s) } }