Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
experimental = ["setup-scripts"]

[scripts.setup.decompress-test-data]
command = "tests/setup.sh"

# match all tests in the project
[[profile.default.overrides]]
filter = "test(/.*/)"
threads-required = 4

[[profile.default.scripts]]
filter = "test(/.*/)"
setup = "decompress-test-data"
3 changes: 3 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ jobs:
key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }}
restore-keys: |
${{ runner.os }}-cargo-test-

- name: "Run tests/setup.sh"
run: tests/setup.sh

- name: Test
uses: houseabsolute/actions-rust-cross@v0
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

# testing files
/tests/samples/local
/tests/data/scmixology2_sample.fastq

Cargo.lock
**/*.rs.bk
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ serde = { version = "1.0.219", features = ["serde_derive"] }
serde_json = "1.0.140"
humansize = "2.1.3"
libc = "0.2.172"
indexed_deflate = "0.1.0"

[[bin]]
name = "nailpolish"
Expand Down
16 changes: 10 additions & 6 deletions src/io/index/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use regex::Regex;
use thiserror::Error;

use super::{storage::FileIndexPath, Index, ReadLocation, RecordIdentifier};
use crate::io::reads::QualityCompute;
use crate::io::reads::{QualityCompute, SequentialIndexedReader};

/// Constructs an index file for a FASTQ file, extracting barcodes and UMIs
/// from read headers using either a regex pattern or a cluster file
Expand All @@ -43,8 +43,7 @@ pub fn construct_index(cli: &crate::cli::IndexArgs) -> Result<()> {

let total_bytes = metadata.len();

let mut reader = BufReader::new(f);

let mut reader = SequentialIndexedReader::from_path(path.fastq())?;
let mut index = Index::new(path);

let size_formatter = FormatSizeOptions::from(humansize::BINARY)
Expand Down Expand Up @@ -116,7 +115,12 @@ pub fn construct_index(cli: &crate::cli::IndexArgs) -> Result<()> {
format_size(total_bytes, size_formatter)
);

index.mark_indexation_complete(reader.stream_position()? as f64 / (1024.0 * 1024.0))?;
let final_position = reader.stream_position()? as f64 / (1024.0 * 1024.0);

// finalise the gzip index, if the source file is gzip-compressed (.gz)
reader.finish_gzip_index()?;

index.mark_indexation_complete(final_position)?;
index.metadata().report_read_counts();

index.write()?;
Expand All @@ -126,7 +130,7 @@ pub fn construct_index(cli: &crate::cli::IndexArgs) -> Result<()> {

/// Process FASTQ reads using a regex to extract identifiers from headers
fn iter_lines_with_regex<F>(
reader: &mut BufReader<File>,
reader: &mut SequentialIndexedReader,
re: &regex::Regex,
mut callback: F,
) -> Result<()>
Expand Down Expand Up @@ -169,7 +173,7 @@ where
}

fn iter_lines_with_cluster_file<F>(
reader: &mut BufReader<File>,
reader: &mut SequentialIndexedReader,
cluster_file: &Path,
mut callback: F,
) -> Result<()>
Expand Down
12 changes: 7 additions & 5 deletions src/io/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use rkyv::{
use smallvec::{smallvec, SmallVec};

use crate::io::index::record_identifier::ArchivedRecordIdentifier;
use crate::io::reads::gzipped::GzippedFileReader;
use crate::io::reads::uncompressed::UncompressedFileReader;
use crate::io::reads::GroupedReadsAccessor;
use crate::utils::deserialize_standard;
Expand Down Expand Up @@ -203,12 +204,13 @@ impl ArchivedIndex {
&self,
index_path: &FileIndexPath,
) -> Result<Box<dyn GroupedReadsAccessor>> {
// for now, all reads are uncompressed
let is_zlib_compressed = false;
if is_zlib_compressed {
todo!()
let fastq_path = index_path.fastq();

if crate::utils::is_gzip_file(fastq_path) {
let reader = GzippedFileReader::new(fastq_path)
.with_context(|| format!("Error reading gzipped read file {}", fastq_path.display()))?;
Ok(Box::new(reader))
} else {
let fastq_path = index_path.fastq();
let reader = UncompressedFileReader::new(fastq_path)
.with_context(|| format!("Error reading read file {}", fastq_path.display()))?;
Ok(Box::new(reader))
Expand Down
6 changes: 6 additions & 0 deletions src/io/index/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ impl FileIndexPath {
&self.index
}

/// Returns the path for the gzip index file (.gzi2)
/// e.g., /path/to/data.fastq.gz -> /path/to/data.fastq.gzi2
/// NOTE(review): `Path::with_extension` replaces only the final extension
/// (".gz"), so the ".fastq" component is preserved — the original example
/// ("data.gzi2") did not match the actual result. If "data.gzi2" is the
/// intended name, the implementation (not this doc) needs changing.
pub fn gzip_index(&self) -> PathBuf {
    self.path.with_extension("gzi2")
}

pub fn check_indexed(&self) -> Result<()> {
if self.index().exists() {
Ok(())
Expand Down
174 changes: 174 additions & 0 deletions src/io/reads/gzipped.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
// Copyright 2025 Oliver Cheng <[email protected]> and the Davidson Lab.
// This program is distributed under the MIT License.
// We also ask that you cite this software in publications
// where you made use of it for any part of the data analysis.

use std::{
fs::File,
io::{Read, Seek, SeekFrom},
path::Path,
};

use anyhow::{Context, Result};
use indexed_deflate::GzDecoder;
use thiserror::Error;

use super::GroupedReadsAccessor;
use crate::io::index::{FileIndexPath, ReadLocation};

/// Errors specific to reading from gzip-compressed read files.
#[derive(Error, Debug)]
enum GzipReadError {
    /// A group fetch was requested with an empty slice of read locations.
    #[error("No reads were provided")]
    EmptyReadsProvided,

    /// The .gzi2 gzip index is missing; random access into the compressed
    /// stream requires it to have been built by the indexing step.
    #[error("Gzip index file not found at {path}. Please run the indexing step first.")]
    IndexNotFound { path: String },
}

/// A reader that performs both sequential and random access reads from a gzip-compressed file
/// using indexed_deflate for efficient random access
pub struct GzippedFileReader {
    /// Decoder used for in-order access (the first read of each group).
    seq: SequentialGzReader,
    /// Decoder used for out-of-order access (the remaining reads of a group).
    rnd: RandomAccessGzReader,
}

impl GzippedFileReader {
    /// Fetches every read in `reads` through the random-access decoder,
    /// stopping at the first failure.
    fn fetch_reads_random(&mut self, reads: &[ReadLocation]) -> Result<Vec<Vec<u8>>> {
        let mut fetched = Vec::with_capacity(reads.len());
        for location in reads {
            fetched.push(self.rnd.fetch(location)?);
        }
        Ok(fetched)
    }
}

impl GroupedReadsAccessor for GzippedFileReader {
    /// Opens a gzipped read file, preparing both a sequential and a
    /// random-access decoder (each backed by the same .gzi2 index).
    fn new(file: &Path) -> Result<Self> {
        Ok(Self {
            seq: SequentialGzReader::new(file)?,
            rnd: RandomAccessGzReader::new(file)?,
        })
    }

    /// Fetches a single read via the sequential decoder.
    fn fetch_sequential_read(&mut self, read: &ReadLocation) -> Result<Vec<u8>> {
        self.seq.fetch(read)
    }

    /// Fetches a group of reads: the first (`head`) through the sequential
    /// reader, while the tail is not sequential and goes through the random
    /// reader.
    ///
    /// # Errors
    /// Returns `GzipReadError::EmptyReadsProvided` when `reads` is empty.
    fn fetch_reads(&mut self, reads: &[ReadLocation]) -> Result<Vec<Vec<u8>>> {
        // split_first returns None on an empty slice, so an empty group
        // becomes a proper error instead of a panic (the previous
        // `split_at(1)` panicked with an out-of-range index on []).
        let (head, tail) = reads
            .split_first()
            .ok_or(GzipReadError::EmptyReadsProvided)?;

        let mut out = Vec::with_capacity(reads.len());
        out.push(self.seq.fetch(head)?);
        out.extend(self.fetch_reads_random(tail)?);

        Ok(out)
    }
}

/// Common interface for readers that fetch one read at a time from a
/// gzip-compressed file.
pub trait SingleGzReadAccessor: Sized {
    /// Opens `file` (and its gzip index) for reading.
    fn new(file: &Path) -> Result<Self>;

    /// Reads exactly `len` bytes starting at uncompressed offset `pos`.
    fn _fetch(&mut self, pos: u64, len: usize) -> Result<Vec<u8>>;

    /// Fetches the raw bytes of the read described by `read`.
    fn fetch(&mut self, read: &ReadLocation) -> Result<Vec<u8>> {
        self._fetch(read.pos(), read.byte_len() as usize)
    }
}

/// A reader that performs random access reads from a gzip-compressed file
pub struct RandomAccessGzReader {
    // indexed_deflate decoder over (data file, index file); seekable to any
    // uncompressed offset.
    decoder: GzDecoder<File, File>,
}

impl SingleGzReadAccessor for RandomAccessGzReader {
    /// Opens `file` alongside its pre-built gzip index (.gzi2).
    ///
    /// # Errors
    /// Returns `GzipReadError::IndexNotFound` when the .gzi2 file does not
    /// exist (i.e. the indexing step has not been run yet).
    fn new(file: &Path) -> Result<Self> {
        let file_index_path = FileIndexPath::new(file);
        let index_path = file_index_path.gzip_index();

        if !index_path.exists() {
            return Err(GzipReadError::IndexNotFound {
                path: index_path.display().to_string(),
            }
            .into());
        }

        let data_file = File::open(file)?;
        let index_file = File::open(&index_path)?;

        let decoder =
            GzDecoder::new(data_file, index_file).context("Failed to create GzDecoder")?;

        Ok(Self { decoder })
    }

    /// Seeks to the uncompressed offset `pos` and reads exactly `num_bytes`.
    fn _fetch(&mut self, pos: u64, num_bytes: usize) -> Result<Vec<u8>> {
        let mut buffer = vec![0; num_bytes];

        self.decoder
            .seek(SeekFrom::Start(pos))
            .with_context(|| format!("Unable to seek gzip file at position {}", pos))?;

        self.decoder
            .read_exact(&mut buffer)
            .with_context(|| format!("Could not read {num_bytes} bytes at position {pos}"))?;

        // from_utf8_lossy: logging must never panic — the previous
        // `from_utf8(..).unwrap()` would abort on any non-UTF-8 bytes.
        debug!(
            "RandomAccessGzReader: read contents ({pos}, {num_bytes}):\n\n{}\n\n",
            String::from_utf8_lossy(&buffer)
        );

        Ok(buffer)
    }
}

/// A reader that performs sequential reads from a gzip-compressed file
pub struct SequentialGzReader {
    // indexed_deflate decoder over (data file, index file).
    decoder: GzDecoder<File, File>,
    // Uncompressed offset the decoder is expected to be at; used to skip
    // the seek when the next read follows the previous one contiguously.
    position: u64,
}

impl SingleGzReadAccessor for SequentialGzReader {
    /// Opens `file` alongside its pre-built gzip index (.gzi2), starting at
    /// uncompressed offset 0.
    ///
    /// # Errors
    /// Returns `GzipReadError::IndexNotFound` when the .gzi2 file does not
    /// exist (i.e. the indexing step has not been run yet).
    fn new(file: &Path) -> Result<Self> {
        let file_index_path = FileIndexPath::new(file);
        let index_path = file_index_path.gzip_index();

        if !index_path.exists() {
            return Err(GzipReadError::IndexNotFound {
                path: index_path.display().to_string(),
            }
            .into());
        }

        let data_file = File::open(file)?;
        let index_file = File::open(&index_path)?;

        let decoder =
            GzDecoder::new(data_file, index_file).context("Failed to create GzDecoder")?;

        Ok(Self {
            decoder,
            position: 0,
        })
    }

    /// Reads `num_bytes` at uncompressed offset `pos`, seeking only when the
    /// request is not contiguous with the previous read.
    fn _fetch(&mut self, pos: u64, num_bytes: usize) -> Result<Vec<u8>> {
        let mut buffer = vec![0; num_bytes];

        let offset = (pos as i64) - (self.position as i64);
        if offset != 0 {
            debug!(
                "Gzip offset is NOT zero:\n {} -> {pos}, {num_bytes} bytes to read, offset: {offset}",
                self.position
            );
            self.decoder.seek(SeekFrom::Start(pos))?;
        }

        self.decoder
            .read_exact(&mut buffer)
            .with_context(|| format!("Could not read {num_bytes} bytes at position {pos}"))?;

        // Only advance the cached position after a successful read; updating
        // it beforehand (as the original did) would leave it claiming bytes
        // that were never consumed when read_exact fails.
        self.position = pos + num_bytes as u64;

        // from_utf8_lossy: logging must never panic — the previous
        // `from_utf8(..).unwrap()` would abort on any non-UTF-8 bytes.
        debug!(
            "SequentialGzReader: read contents ({pos}, {num_bytes}):\n\n{}\n\n",
            String::from_utf8_lossy(&buffer)
        );

        Ok(buffer)
    }
}
3 changes: 3 additions & 0 deletions src/io/reads/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
// We also ask that you cite this software in publications
// where you made use of it for any part of the data analysis.

pub mod gzipped;
pub mod record;
pub mod sequential_indexed_reader;
pub mod uncompressed;

use anyhow::Result;

use crate::io::index::{DuplicateGroupLocation, ReadLocation};

pub use record::QualityCompute;
pub use sequential_indexed_reader::SequentialIndexedReader;

pub trait GroupedReadsAccessor {
fn new(file: &std::path::Path) -> Result<Self>
Expand Down
Loading