Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 23 additions & 9 deletions bigtools/src/bbi/bbiwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub struct BBIWriteOptions {
pub input_sort_type: InputSortType,
pub channel_size: usize,
pub inmemory: bool,
pub clip: bool
}

impl Default for BBIWriteOptions {
Expand All @@ -105,6 +106,7 @@ impl Default for BBIWriteOptions {
input_sort_type: InputSortType::ALL,
channel_size: 100,
inmemory: false,
clip: false
}
}
}
Expand Down Expand Up @@ -572,7 +574,7 @@ pub trait BBIDataSource: Sized {

fn process_to_bbi<
P: BBIDataProcessor<Value = Self::Value> + Send + 'static,
StartProcessing: FnMut(String) -> Result<P, ProcessDataError>,
StartProcessing: FnMut(String) -> Result<Option<P>, ProcessDataError>,
Advance: FnMut(P),
>(
&mut self,
Expand Down Expand Up @@ -820,16 +822,22 @@ pub(crate) fn write_vals<

(zooms_channels, ftx)
}
let mut do_read = |chrom: String| -> Result<_, ProcessDataError> {

let mut do_read = |chrom: String| -> Result<Option<_>, ProcessDataError> {
let length = match chrom_sizes.get(&chrom) {
Some(length) => *length,
None => {
if options.clip {
return Ok(None)
}
return Err(ProcessDataError::InvalidChromosome(format!(
"Input bedGraph contains chromosome that isn't in the input chrom sizes: {}",
chrom
)));
)
));
}
};

// Make a new id for the chromosome
let chrom_id = chrom_ids.get_id(&chrom);

Expand All @@ -842,9 +850,10 @@ pub(crate) fn write_vals<
options.clone(),
runtime.handle().clone(),
chrom,
length,
length
);
Ok(P::create(internal_data))

Ok(Some(P::create(internal_data)))
};

let mut advance = |p: P| {
Expand Down Expand Up @@ -963,10 +972,14 @@ pub(crate) fn write_vals_no_zoom<
let length = match chrom_sizes.get(&chrom) {
Some(length) => *length,
None => {
if options.clip {
return Ok(None)
}
return Err(ProcessDataError::InvalidChromosome(format!(
"Input bedGraph contains chromosome that isn't in the input chrom sizes: {}",
chrom
)));
)
));
}
};
// Make a new id for the chromosome
Expand All @@ -982,7 +995,7 @@ pub(crate) fn write_vals_no_zoom<
chrom,
length,
);
Ok(P::create(internal_data))
Ok(Some(P::create(internal_data)))
};

let mut advance = |p: P| {
Expand Down Expand Up @@ -1117,7 +1130,7 @@ pub(crate) fn write_zoom_vals<

let mut max_uncompressed_buf_size = 0;

let mut do_read = |chrom: String| -> Result<P, ProcessDataError> {
let mut do_read = |chrom: String| -> Result<Option<P>, ProcessDataError> {
// Make a new id for the chromosome
let chrom_id = *chrom_ids
.get(&chrom)
Expand Down Expand Up @@ -1149,7 +1162,8 @@ pub(crate) fn write_zoom_vals<
options.clone(),
runtime.handle().clone(),
);
Ok(P::create(internal_data))

Ok(Some(P::create(internal_data)))
};

let mut advance = |p: P| {
Expand Down
45 changes: 40 additions & 5 deletions bigtools/src/bbi/beddata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl<S: StreamingBedValues> BBIDataSource for BedParserStreamingIterator<S> {

fn process_to_bbi<
P: BBIDataProcessor<Value = Self::Value>,
StartProcessing: FnMut(String) -> Result<P, ProcessDataError>,
StartProcessing: FnMut(String) -> Result<Option<P>, ProcessDataError>,
Advance: FnMut(P),
>(
&mut self,
Expand All @@ -109,7 +109,20 @@ impl<S: StreamingBedValues> BBIDataSource for BedParserStreamingIterator<S> {
// The next value is the first
Some(Ok((chrom, val))) => {
let chrom = chrom.to_string();
let mut p = start_processing(chrom.clone())?;

let p = match start_processing(chrom.clone()) {
Ok(Some(processor)) => processor,
Ok(None) => {
// skip this chromosome
let next_val = self.bed_data.next();
return match next_val {
Some(Err(e)) => Err(BBIProcessError::SourceError(e)),
Some(Ok(_)) | None => Ok(()),
};
}
Err(e) => return Err(e.into()),
};

let next_val = self.bed_data.next();
let next_val = match next_val {
Some(Err(e)) => return Err(BBIProcessError::SourceError(e)),
Expand All @@ -120,11 +133,15 @@ impl<S: StreamingBedValues> BBIDataSource for BedParserStreamingIterator<S> {
Some(v) if v.0 == chrom => Some(&v.1),
_ => None,
};

p.do_process(val, next_value).await?;
((chrom, p), next_val)
}
};
loop {

// TODO: decide how chromosomes for which `start_processing` returns `Ok(None)` should be skipped inside this loop.

next_val = match (&mut curr_state, next_val) {
// There are no more values
((_, _), None) => {
Expand Down Expand Up @@ -213,7 +230,7 @@ impl<V: Send + 'static> BBIDataSource for BedParserParallelStreamingIterator<V>

fn process_to_bbi<
P: BBIDataProcessor<Value = Self::Value> + Send + 'static,
StartProcessing: FnMut(String) -> Result<P, ProcessDataError>,
StartProcessing: FnMut(String) -> Result<Option<P>, ProcessDataError>,
Advance: FnMut(P),
>(
&mut self,
Expand Down Expand Up @@ -251,12 +268,30 @@ impl<V: Send + 'static> BBIDataSource for BedParserParallelStreamingIterator<V>
parse: self.parse_fn,
};

let mut p = start_processing(curr.1.clone())?;
let mut p = match start_processing(curr.1.clone()) {
Ok(processor) => processor,
Ok(None) => {
// Skip and fetch the next chromosome?
continue;
}
Err(e) => return Err(e.into()),
};

let curr_chrom = curr.1.clone();
let data: tokio::task::JoinHandle<Result<P, BBIProcessError<BedValueError>>> =
runtime.spawn(async move {
let mut next_val: Option<Result<(&str, V), BedValueError>> = None;


if p.is_none() {
// Skip processing this chromosome since the processor is `None`
println!("Skipping chromosome: {}", curr_chrom);
return Ok(p)
}

let mut p = p.unwrap(); // Unwrap the processor since we know it exists


loop {
let curr_value = match next_val.take() {
Some(v) => Some(v),
Expand Down Expand Up @@ -346,7 +381,7 @@ mod tests {
Ok(())
}
}
let mut start_processing = |_: String| Ok(TestBBIDataProcessor::create(()));
let mut start_processing = |_: String| Ok(Some(TestBBIDataProcessor::create(())));
let mut advance = |p: TestBBIDataProcessor| {
counts.push(p.count);
let _ = p.destroy();
Expand Down
2 changes: 1 addition & 1 deletion bigtools/src/bbi/bigbedwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ async fn process_val(
current_val.start, current_val.end
)));
}
if current_val.start >= chrom_length {
if current_val.end >= chrom_length {
return Err(ProcessDataError::InvalidInput(format!(
"Invalid bed: `{}` is greater than the chromosome ({}) length ({})",
current_val.start, chrom, chrom_length
Expand Down
6 changes: 6 additions & 0 deletions bigtools/src/utils/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ pub struct BBIWriteArgs {
#[arg(long)]
#[arg(default_value_t = false)]
pub inmemory: bool,

/// If set, skip chromosomes in the bedGraph input that are not present in
/// the chrom.sizes file instead of failing with an error.
#[arg(long)]
#[arg(default_value_t = false)]
pub clip: bool,
}

macro_rules! compat_replace_mut {
Expand Down
3 changes: 2 additions & 1 deletion bigtools/src/utils/cli/bedgraphtobigwig.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub struct BedGraphToBigWigArgs {

/// If set, indicates that only a single pass should be done on the input file. This is most useful
/// on large files in order to reduce total time. This automatically happens when the input is `stdin`.
#[arg(long)]
#[arg(short = 'c', long)]
#[arg(default_value_t = false)]
pub single_pass: bool,

Expand Down Expand Up @@ -87,6 +87,7 @@ pub fn bedgraphtobigwig(args: BedGraphToBigWigArgs) -> Result<(), Box<dyn Error>
outb.options.input_sort_type = input_sort_type;
outb.options.block_size = args.write_args.block_size;
outb.options.inmemory = args.write_args.inmemory;
outb.options.clip = args.write_args.clip;

let runtime = if nthreads == 1 {
outb.options.channel_size = 0;
Expand Down
7 changes: 5 additions & 2 deletions bigtools/src/utils/cli/bigwigmerge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ impl BBIDataSource for ChromGroupReadImpl {

fn process_to_bbi<
P: BBIDataProcessor<Value = Self::Value>,
StartProcessing: FnMut(String) -> Result<P, ProcessDataError>,
StartProcessing: FnMut(String) -> Result<Option<P>, ProcessDataError>,
Advance: FnMut(P),
>(
&mut self,
Expand All @@ -364,7 +364,10 @@ impl BBIDataSource for ChromGroupReadImpl {
self.iter.next();
match next {
Some(Ok((chrom, _, mut group))) => {
let mut p = start_processing(chrom)?;
let mut p = match start_processing(chrom) {
Ok(processor) => processor,
Err(e) => return Err(e.into()),
};

loop {
let current_val = match group.iter.next() {
Expand Down
Loading