1 change: 1 addition & 0 deletions .gitignore
@@ -10,6 +10,7 @@
# testing files
/tests/samples/local
/tests/data/scmixology2_sample.fastq
/tests/data/scmixology2_sample.fastq.nailpolish.idx

Cargo.lock
**/*.rs.bk
4 changes: 4 additions & 0 deletions src/cli/mod.rs
@@ -178,6 +178,10 @@ pub struct ConsensusArgs {
/// on runtime.
#[arg(long, default_value_t = 250)]
pub max_group_size: usize,

/// sort groups by the specified capture group tag (e.g., 'CB' for cell barcode)
#[arg(long)]
pub sort_by: Option<String>,
}

/// Extract reads belonging to specific group queries from a .fastq file, unmodified.
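How the new flag behaves at parse time: a minimal, self-contained sketch (hypothetical Args struct, not the crate's real ConsensusArgs), assuming the same clap derive setup as the code above.

use clap::Parser;

// Hypothetical stand-in for the relevant slice of ConsensusArgs.
#[derive(Parser)]
struct Args {
    /// sort groups by the specified capture group tag
    #[arg(long)]
    sort_by: Option<String>,
}

fn main() {
    // `--sort-by CB` parses to Some("CB") ...
    let with_tag = Args::parse_from(["consensus", "--sort-by", "CB"]);
    assert_eq!(with_tag.sort_by.as_deref(), Some("CB"));

    // ... and omitting the flag leaves None, which selects the
    // default-order branch in consensus() below.
    let without_tag = Args::parse_from(["consensus"]);
    assert_eq!(without_tag.sort_by, None);
}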
13 changes: 12 additions & 1 deletion src/consensus/mod.rs
@@ -89,7 +89,18 @@ pub fn consensus(cli: &crate::cli::ConsensusArgs) -> Result<()> {
let captures = index.captures();

let buffer_chunk_size = 1024 * cli.threads; // number of groups to multithread at one time
-for chunk in &index.groups().chunks(buffer_chunk_size) {
+
+let indices = match &cli.sort_by {
+    Some(tag) => {
+        info!("Determining sort order");
+        let indices = index.indices_by_sorted_tag(tag)?;
+        info!("Starting consensus calling");
+        indices
+    }
+    None => index.indices_by_default_order(),
+};
+
+for chunk in &index.groups_by_index(&indices).chunks(buffer_chunk_size) {
let chunk_groups = chunk
.into_iter()
.map(|group_loc| -> Result<Vec<_>> {
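The loop keeps the existing batching scheme. Below is a standalone sketch (hypothetical process_in_chunks helper, simplified signature) of the pattern: itertools' chunks slices the index iterator into batches of buffer_chunk_size, and each batch is fanned out across the rayon pool before the next is read.

use itertools::Itertools;
use rayon::prelude::*;

// Process group indices in fixed-size chunks: only one chunk of work is
// materialised at a time, bounding memory use.
fn process_in_chunks(indices: Vec<usize>, chunk_size: usize) {
    for chunk in &indices.into_iter().chunks(chunk_size) {
        let batch: Vec<usize> = chunk.collect();
        // fan the chunk out across the rayon thread pool
        batch.par_iter().for_each(|group_index| {
            // per-group consensus calling would happen here
            let _ = group_index;
        });
    }
}

fn main() {
    process_in_chunks((0..10_000).collect(), 1024 * 3);
}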
4 changes: 2 additions & 2 deletions src/extract.rs
@@ -41,7 +41,7 @@ pub fn extract(args: &crate::cli::ExtractArgs) -> anyhow::Result<()> {
let re = regex::Regex::new(key)?;

index
-    .groups()
+    .groups_by_index(&index.indices_by_default_order())
.filter_map(|group| {
if re.is_match(&group.key.0) {
return Some(group.id);
@@ -51,7 +51,7 @@
.collect()
} else if let Some(group_size) = &args.group_size {
index
-    .groups()
+    .groups_by_index(&index.indices_by_default_order())
.filter_map(|group| {
if group.reads.len() == *group_size {
Some(group.id)
73 changes: 58 additions & 15 deletions src/io/index/mod.rs
@@ -13,7 +13,9 @@ use std::time::SystemTimeError;

use anyhow::{Context, Result};
use indexmap::IndexMap;
use itertools::Itertools;
use needletail::parser::SequenceRecord;
use rayon::slice::ParallelSliceMut;
use rkyv::{
option::ArchivedOption, string::ArchivedString, vec::ArchivedVec, Archive, Deserialize,
Serialize,
@@ -178,18 +180,6 @@ impl Index {
}

impl ArchivedIndex {
-/// Returns an iterator over the duplicate groups in the index.
-pub fn groups(&self) -> impl Iterator<Item = DuplicateGroupLocation> + use<'_> {
-    self.groups
-        .iter()
-        .enumerate()
-        .map(|(index, (key, value))| DuplicateGroupLocation {
-            key: deserialize_standard(key),
-            reads: deserialize_standard(value),
-            id: index,
-        })
-}
-
pub fn get_hashmap(
&self,
) -> &rkyv::collections::swiss_table::ArchivedIndexMap<
@@ -205,10 +195,11 @@
index_path: &FileIndexPath,
) -> Result<Box<dyn GroupedReadsAccessor>> {
let fastq_path = index_path.fastq();

if crate::utils::is_gzip_file(fastq_path) {
-let reader = GzippedFileReader::new(fastq_path)
-    .with_context(|| format!("Error reading gzipped read file {}", fastq_path.display()))?;
+let reader = GzippedFileReader::new(fastq_path).with_context(|| {
+    format!("Error reading gzipped read file {}", fastq_path.display())
+})?;
Ok(Box::new(reader))
} else {
let reader = UncompressedFileReader::new(fastq_path)
@@ -236,4 +227,56 @@
pub fn captures(&self) -> &ArchivedVec<ArchivedOption<ArchivedString>> {
&self.captures
}

/// Find the position of a capture group in the identifier string
pub fn capture_index(&self, tag: &str) -> Option<usize> {
self.captures().iter().position(|name| {
if let Some(archived_name) = name.as_ref() {
archived_name.as_str() == tag
} else {
false
}
})
}

pub fn groups_by_index<'a>(
&'a self,
indices: &'a [usize],
) -> impl Iterator<Item = DuplicateGroupLocation> + 'a {
indices.iter().map(|index| {
let index = *index;
let (key, value) = self.groups.get_index(index).unwrap();
DuplicateGroupLocation {
key: deserialize_standard(key),
reads: deserialize_standard(value),
id: index,
}
})
}

pub fn indices_by_default_order(&self) -> Vec<usize> {
(0..self.groups.len()).collect()
}

pub fn indices_by_sorted_tag(&self, tag: &str) -> Result<Vec<usize>> {
let tag_idx = self.capture_index(tag).with_context(|| {
let captures: Vec<_> = self.captures().iter().flatten().collect();
format!("Unknown sort tag {tag}. Available tags: {captures:?}")
})?;

// "The key-value pairs are indexed in a compact range without holes in the range 0..self.len()"
// - https://docs.rs/indexmap/latest/indexmap/map/struct.IndexMap.html
// As a result, we can get the index and key using enumerate.
// v.0 is index, v.1 is sort component
let mut keys: Vec<_> = self
.groups
.keys()
.enumerate()
.map(|(i, key)| key.component(tag_idx).map(|c| (i, c)))
.try_collect()?;

keys.par_sort_by_key(|v| v.1);

Ok(keys.into_iter().map(|v| v.0).collect())
}
}
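indices_by_sorted_tag in miniature: a self-contained sketch (simplified types, made-up data) of the enumerate, par_sort_by_key, then strip-components trick used above.

use rayon::slice::ParallelSliceMut;

// Enumerate the keys, sort the (index, component) pairs by component in
// parallel, then keep only the original indices as the new visit order.
fn sorted_indices(components: Vec<&str>) -> Vec<usize> {
    let mut keyed: Vec<(usize, &str)> = components.into_iter().enumerate().collect();
    keyed.par_sort_by_key(|v| v.1);
    keyed.into_iter().map(|v| v.0).collect()
}

fn main() {
    // e.g. three groups whose 'CB' component is CCC, AAA, BBB
    let order = sorted_indices(vec!["CCC", "AAA", "BBB"]);
    assert_eq!(order, vec![1, 2, 0]); // visit AAA first, then BBB, then CCC
}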
19 changes: 19 additions & 0 deletions src/io/index/record_identifier.rs
@@ -5,6 +5,7 @@

use std::fmt;

use anyhow::{Context, Result};
use rkyv::{Archive, Deserialize, Serialize};

/// A RecordIdentifier is a store of the BC/UMI identifier of a read.
@@ -23,6 +24,24 @@ impl RecordIdentifier {
pub fn components(&self) -> std::str::Split<'_, char> {
self.0.split('\0')
}

/// Return the N-th component, if present
pub fn component(&self, idx: usize) -> Option<&str> {
self.components().nth(idx)
}
}

impl ArchivedRecordIdentifier {
pub fn components(&self) -> std::str::Split<'_, char> {
self.0.split('\0')
}

/// Return the N-th component, or an error if it does not exist
pub fn component(&self, idx: usize) -> Result<&str> {
self.components()
.nth(idx)
.context("Component does not exist")
}
}

impl fmt::Display for RecordIdentifier {
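To make the identifier layout concrete, here is a small sketch (hypothetical barcode/UMI values) of the '\0'-separated storage that component() indexes into.

fn main() {
    // a RecordIdentifier stores its capture groups joined by '\0',
    // e.g. a cell barcode followed by a UMI (values are made up)
    let id = "ACGTACGTACGT\0TTTGGGCCC";

    let mut components = id.split('\0');
    assert_eq!(components.next(), Some("ACGTACGTACGT")); // component(0)
    assert_eq!(components.next(), Some("TTTGGGCCC")); // component(1)
    assert_eq!(components.next(), None); // component(2) does not exist
}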
2 changes: 1 addition & 1 deletion src/summary/count.rs
@@ -45,7 +45,7 @@ pub fn summarize_index(index: &ArchivedIndex) -> IndexStatistics {
debug!("Creating count index...");
let mut map = BTreeMap::new();

-for group in index.groups() {
+for group in index.groups_by_index(&index.indices_by_default_order()) {
let count = group.reads.len();

let entry = map.entry(count).or_insert(RowData::new());
31 changes: 31 additions & 0 deletions tests/integration_test.rs
@@ -146,6 +146,37 @@ fn consensus_3t_with_clustering() {
let _ = Command::new("diff").args([&output, CORRECT_FILE]).unwrap();
}

#[test]
fn consensus_sorted() {
let (_temp_dir, input, output) = make_temp_dir(SAMPLE_FASTQ, "input.fastq", true);

let _ = nailpolish_bin()
.args([
"consensus",
&input,
"-o",
&output,
"--threads",
"3",
"--sort-by",
"CB",
"--report-original-header",
"--report-original-reads",
])
.assert()
.success();

const CORRECT_FILE: &str = "tests/correct/consensus_with_cluster.fastq";

// we just look at the headers to see if they match up
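// (`NR % 4 == 1` keeps every fourth line starting at line 1, i.e. the FASTQ
// headers; sorting both sides makes the comparison order-independent, since
// --sort-by changes the order in which consensus records are written)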
let cmp_cmd = format!(
"diff \
<(awk 'NR % 4 == 1' {CORRECT_FILE} | sort) \
<(awk 'NR % 4 == 1' {output} | sort)"
);
let _ = Command::new("bash").args(["-c", &cmp_cmd]).unwrap();
}

#[test]
fn consensus_3t_gz_with_clustering() {
let (_temp_dir, input, output) = make_temp_dir(SAMPLE_FASTQ_GZ, "input.fastq.gz", true);