diff --git a/bigtools/src/bbi/bbiwrite.rs b/bigtools/src/bbi/bbiwrite.rs index 69e4ba05..7ac1f4cb 100644 --- a/bigtools/src/bbi/bbiwrite.rs +++ b/bigtools/src/bbi/bbiwrite.rs @@ -91,6 +91,7 @@ pub struct BBIWriteOptions { pub input_sort_type: InputSortType, pub channel_size: usize, pub inmemory: bool, + pub clip: bool } impl Default for BBIWriteOptions { @@ -105,6 +106,7 @@ impl Default for BBIWriteOptions { input_sort_type: InputSortType::ALL, channel_size: 100, inmemory: false, + clip: false } } } @@ -572,7 +574,7 @@ pub trait BBIDataSource: Sized { fn process_to_bbi< P: BBIDataProcessor + Send + 'static, - StartProcessing: FnMut(String) -> Result, + StartProcessing: FnMut(String) -> Result, ProcessDataError>, Advance: FnMut(P), >( &mut self, @@ -820,16 +822,22 @@ pub(crate) fn write_vals< (zooms_channels, ftx) } - let mut do_read = |chrom: String| -> Result<_, ProcessDataError> { + + let mut do_read = |chrom: String| -> Result, ProcessDataError> { let length = match chrom_sizes.get(&chrom) { Some(length) => *length, None => { + if options.clip { + return Ok(None) + } return Err(ProcessDataError::InvalidChromosome(format!( "Input bedGraph contains chromosome that isn't in the input chrom sizes: {}", chrom - ))); + ) + )); } }; + // Make a new id for the chromosome let chrom_id = chrom_ids.get_id(&chrom); @@ -842,9 +850,10 @@ pub(crate) fn write_vals< options.clone(), runtime.handle().clone(), chrom, - length, + length ); - Ok(P::create(internal_data)) + + Ok(Some(P::create(internal_data))) }; let mut advance = |p: P| { @@ -963,10 +972,14 @@ pub(crate) fn write_vals_no_zoom< let length = match chrom_sizes.get(&chrom) { Some(length) => *length, None => { + if options.clip { + return Ok(None) + } return Err(ProcessDataError::InvalidChromosome(format!( "Input bedGraph contains chromosome that isn't in the input chrom sizes: {}", chrom - ))); + ) + )); } }; // Make a new id for the chromosome @@ -982,7 +995,7 @@ pub(crate) fn write_vals_no_zoom< chrom, length, ); - Ok(P::create(internal_data)) + Ok(Some(P::create(internal_data))) }; let mut advance = |p: P| { @@ -1117,7 +1130,7 @@ pub(crate) fn write_zoom_vals< let mut max_uncompressed_buf_size = 0; - let mut do_read = |chrom: String| -> Result { + let mut do_read = |chrom: String| -> Result, ProcessDataError> { // Make a new id for the chromosome let chrom_id = *chrom_ids .get(&chrom) @@ -1149,7 +1162,8 @@ pub(crate) fn write_zoom_vals< options.clone(), runtime.handle().clone(), ); - Ok(P::create(internal_data)) + + Ok(Some(P::create(internal_data))) }; let mut advance = |p: P| { diff --git a/bigtools/src/bbi/beddata.rs b/bigtools/src/bbi/beddata.rs index 53c2ba8b..0ef76912 100644 --- a/bigtools/src/bbi/beddata.rs +++ b/bigtools/src/bbi/beddata.rs @@ -91,7 +91,7 @@ impl BBIDataSource for BedParserStreamingIterator { fn process_to_bbi< P: BBIDataProcessor, - StartProcessing: FnMut(String) -> Result, + StartProcessing: FnMut(String) -> Result, ProcessDataError>, Advance: FnMut(P), >( &mut self, @@ -109,7 +109,20 @@ impl BBIDataSource for BedParserStreamingIterator { // The next value is the first Some(Ok((chrom, val))) => { let chrom = chrom.to_string(); - let mut p = start_processing(chrom.clone())?; + + let p = match start_processing(chrom.clone()) { + Ok(Some(processor)) => processor, + Ok(None) => { + // skip this chromosome + let next_val = self.bed_data.next(); + return match next_val { + Some(Err(e)) => Err(BBIProcessError::SourceError(e)), + Some(Ok(_)) | None => Ok(()), + }; + } + Err(e) => return Err(e.into()), + }; + let next_val = self.bed_data.next(); let next_val = match next_val { Some(Err(e)) => return Err(BBIProcessError::SourceError(e)), @@ -120,11 +133,15 @@ impl BBIDataSource for BedParserStreamingIterator { Some(v) if v.0 == chrom => Some(&v.1), _ => None, }; + p.do_process(val, next_value).await?; ((chrom, p), next_val) } }; loop { + + // todo: how to implement the skip logic here? + next_val = match (&mut curr_state, next_val) { // There are no more values ((_, _), None) => { @@ -213,7 +230,7 @@ impl BBIDataSource for BedParserParallelStreamingIterator fn process_to_bbi< P: BBIDataProcessor + Send + 'static, - StartProcessing: FnMut(String) -> Result, + StartProcessing: FnMut(String) -> Result, ProcessDataError>, Advance: FnMut(P), >( &mut self, @@ -251,12 +268,30 @@ impl BBIDataSource for BedParserParallelStreamingIterator parse: self.parse_fn, }; - let mut p = start_processing(curr.1.clone())?; + let mut p = match start_processing(curr.1.clone()) { + Ok(processor) => processor, + Ok(None) => { + // Skip and fetch the next chromosome? + continue; + } + Err(e) => return Err(e.into()), + }; + let curr_chrom = curr.1.clone(); let data: tokio::task::JoinHandle>> = runtime.spawn(async move { let mut next_val: Option> = None; + + if p.is_none() { + // Skip processing this chromosome since the processor is `None` + println!("Skipping chromosome: {}", curr_chrom); + return Ok(p) + } + + let mut p = p.unwrap(); // Unwrap the processor since we know it exists + + loop { let curr_value = match next_val.take() { Some(v) => Some(v), @@ -346,7 +381,7 @@ mod tests { Ok(()) } } - let mut start_processing = |_: String| Ok(TestBBIDataProcessor::create(())); + let mut start_processing = |_: String| Ok(Some(TestBBIDataProcessor::create(()))); let mut advance = |p: TestBBIDataProcessor| { counts.push(p.count); let _ = p.destroy(); diff --git a/bigtools/src/bbi/bigbedwrite.rs b/bigtools/src/bbi/bigbedwrite.rs index 68df5603..1d40b8c1 100644 --- a/bigtools/src/bbi/bigbedwrite.rs +++ b/bigtools/src/bbi/bigbedwrite.rs @@ -260,7 +260,7 @@ async fn process_val( current_val.start, current_val.end ))); } - if current_val.start >= chrom_length { + if current_val.end >= chrom_length { return Err(ProcessDataError::InvalidInput(format!( "Invalid bed: `{}` is greater than the chromosome ({}) length ({})", current_val.start, chrom, chrom_length diff --git a/bigtools/src/utils/cli.rs b/bigtools/src/utils/cli.rs index b7d6573c..b0ccdeea 100644 --- a/bigtools/src/utils/cli.rs +++ b/bigtools/src/utils/cli.rs @@ -63,6 +63,12 @@ pub struct BBIWriteArgs { #[arg(long)] #[arg(default_value_t = false)] pub inmemory: bool, + + /// If set, just issue warning messages rather than dying if bedgraph + /// file contains chromosomes that are not in the chrom.sizes file. + #[arg(long)] + #[arg(default_value_t = false)] + pub clip: bool, } macro_rules! compat_replace_mut { diff --git a/bigtools/src/utils/cli/bedgraphtobigwig.rs b/bigtools/src/utils/cli/bedgraphtobigwig.rs index 84206a43..a1d41d43 100644 --- a/bigtools/src/utils/cli/bedgraphtobigwig.rs +++ b/bigtools/src/utils/cli/bedgraphtobigwig.rs @@ -35,7 +35,7 @@ pub struct BedGraphToBigWigArgs { /// If set, indicates that only a single pass should be done on the input file. This is most useful /// on large files in order to reduce total time. This automatically happens when the input is `stdin`. - #[arg(long)] + #[arg(short = 'c', long)] #[arg(default_value_t = false)] pub single_pass: bool, @@ -87,6 +87,7 @@ pub fn bedgraphtobigwig(args: BedGraphToBigWigArgs) -> Result<(), Box outb.options.input_sort_type = input_sort_type; outb.options.block_size = args.write_args.block_size; outb.options.inmemory = args.write_args.inmemory; + outb.options.clip = args.write_args.clip; let runtime = if nthreads == 1 { outb.options.channel_size = 0; diff --git a/bigtools/src/utils/cli/bigwigmerge.rs b/bigtools/src/utils/cli/bigwigmerge.rs index da446a42..15765892 100644 --- a/bigtools/src/utils/cli/bigwigmerge.rs +++ b/bigtools/src/utils/cli/bigwigmerge.rs @@ -351,7 +351,7 @@ impl BBIDataSource for ChromGroupReadImpl { fn process_to_bbi< P: BBIDataProcessor, - StartProcessing: FnMut(String) -> Result, + StartProcessing: FnMut(String) -> Result, ProcessDataError>, Advance: FnMut(P), >( &mut self, @@ -364,7 +364,10 @@ impl BBIDataSource for ChromGroupReadImpl { self.iter.next(); match next { Some(Ok((chrom, _, mut group))) => { - let mut p = start_processing(chrom)?; + let mut p = match start_processing(chrom) { + Ok(processor) => processor, + Err(e) => return Err(e.into()), + }; loop { let current_val = match group.iter.next() {