Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,7 @@ CAMDAC_manual/
*.log.lintr
data/
bin/

# macOS
.DS_Store
**/.DS_Store
15 changes: 11 additions & 4 deletions R/ascat.R
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@
# Sort genomic loci
#' @title sort_genomic_dt
#' @keywords internal
sort_genomic_dt <- function(dt, with_chr = F) {
# In-place sorting to avoid R creating duplicate copy of objects
sort_genomic_dt <- function(dt, with_chr = FALSE) {
if (with_chr) {
fact_levels <- paste0("chr", c(1:22, "X", "Y"))
} else {
fact_levels <- c(1:22, "X", "Y")
}

dt[, chrom := factor(chrom, levels = fact_levels)]
return(dt[order(chrom, POS)])
data.table::setorder(dt, chrom, POS)

return(dt)
}


Expand Down Expand Up @@ -67,7 +71,7 @@ annotate_normal <- function(tsnps, nsnps, min_cov) {
setnames(nsnps, old = to_suffix, new = paste0(to_suffix, "_n"))
setkey(nsnps, chrom, POS)

tsnps <- merge(tsnps, nsnps, on = c("chrom", "POS"))
tsnps <- merge(tsnps, nsnps, by = c("chrom", "POS"))

tsnps <- tsnps[total_counts_n >= min_cov]

Expand Down Expand Up @@ -651,7 +655,10 @@ select_heterozygous_snps <- function(tsnps) {
# However, this breaks ASCAT, so should be used with caution.
# Note that ASCAT.m will select at 0.1 <> 0.9 as germline hom stretches required
# This must therefore be higher. Does not influence battenberg.m
tsnps <- tsnps[BAF_n >= 0.08 & BAF_n <= 0.92]

# Temporarily changed to 0.2 and 0.8 for high coverage WGBS data.
# TODO: fix this later !!!
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO comment not required

tsnps <- tsnps[BAF_n >= 0.2 & BAF_n <= 0.8]
return(tsnps)
}

Expand Down
21 changes: 11 additions & 10 deletions R/cmain.R
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ cmain_count_alleles <- function(sample, config) {
min_mapq <- config$min_mapq
min_cov <- config$min_cov

# Initialise parallel workers.
doParallel::registerDoParallel(cores = config$n_cores)

logging::loginfo("Counting alleles for %s", paste0(sample$patient_id, ":", sample$id), logger="CAMDAC")
# For each segment, load the appropriate SNP/CpG loci file segment and call allele counter in parallel
# Set warn=2 to ensure foreach fails if any of the parallel workers are terminated or raise a warning.
# without this option, foreach simply returns a warning and the pipeline continues. Essential for memory warning terminations.
options(warn = 2)
tmpfiles <- foreach(seg = segments, .combine = "c") %dopar% {
# Force fst and data.table to use a single thread inside the workers
fst::threads_fst(1)
data.table::setDTthreads(1)

loci_dt <- load_loci_for_segment(seg, loci_files)
ac_file <- cwrap_get_allele_counts(bam_file, seg, loci_dt, paired_end, drop_ccgg, min_mapq = min_mapq, min_cov = min_cov)
tmp <- tempfile(tmpdir = tempdir, fileext = ".fst")
Expand All @@ -69,18 +70,18 @@ cmain_count_alleles <- function(sample, config) {
options(warn = 0)

# Combine temporary files with allele counts results into a single data table
result <- foreach(i = tmpfiles, .combine = "rbind") %dopar% {
fst::read_fst(i, as.data.table = T)
}
# Sequential merging for memory safety
result_list <- lapply(tmpfiles, function(f) {
fst::read_fst(f, as.data.table = TRUE)
})
result <- data.table::rbindlist(result_list, use.names = TRUE, fill = TRUE)
rm(result_list)
gc()

# Write to output file
format_and_write_output(result, output_filename) # 2 lines
# Delete temporary files
fs::dir_delete(tempdir)

# Stop parallel workers. When running the pipeline multiple times in an R session,
# R re-uses workers but does not clear memory. Hence large objects in foreach loops will remain.
doParallel::stopImplicitCluster()
return(output_filename)
}

Expand Down
49 changes: 34 additions & 15 deletions R/pipeline.R
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,14 @@ pipeline_wgbs <- function(tumor, germline = NULL, infiltrates = NULL, origin = N
logging::loginfo("Pipeline start for %s", tumor$patient_id, logger="CAMDAC")

# Preprocess CpG, SNP and methylation data for all samples
preprocess_wgbs(
list(tumor, germline, infiltrates, origin),
config
sample_list <- list(
tumor = tumor,
germline = germline,
infiltrates = if (identical(infiltrates, germline)) NULL else infiltrates,
origin = if (identical(origin, germline)) NULL else origin
)

preprocess_wgbs(sample_list, config)

# Combine tumor-germline SNPs and call CNAs
cmain_bind_snps(tumor, germline, config)
Expand All @@ -55,22 +59,38 @@ pipeline_wgbs <- function(tumor, germline = NULL, infiltrates = NULL, origin = N
#' @param config. CamConfig object.
#' @export
#' @keywords internal

preprocess_wgbs <- function(sample_list, config) {
for (s in sample_list) {
# Go to next part of loop if its null
if (is.null(s)) {
next
}

# Count SNP and CpG alleles if a BAM file is provided
cmain_count_alleles(s, config)

# Prepare SNP data for CNA calling if allele counts are present
cmain_make_snps(s, config)

# Format methylation rates for deconvolution
cmain_make_methylation_profile(s, config)
}
logging::loginfo("Spawning isolated process for sample: %s", s$id, logger="CAMDAC")
# callr::r() creates a fresh background R session for this specific block of code
callr::r(
func = function(sample, config) {
library(CAMDAC)

# 1. Setup the cluster inside the isolated process
cl <- parallel::makeCluster(config$n_cores, type = "FORK")
doParallel::registerDoParallel(cl)

# 2. Run pipeline functions
CAMDAC:::cmain_count_alleles(sample, config)
CAMDAC:::cmain_make_snps(sample, config)
CAMDAC:::cmain_make_methylation_profile(sample, config)

# 3. Safely stop the cluster
parallel::stopCluster(cl)
},
args = list(sample = s, config = config),
show = TRUE
)

logging::loginfo("Finished sample %s. OS has reclaimed all memory.", s$id, logger="CAMDAC")
}
}


Expand Down Expand Up @@ -142,7 +162,7 @@ pipeline_rrbs <- function(tumor, germline, infiltrates, origin, config){
)

} else {
logging::loginfo("Preprocess RRBS tumour: %s.", ac_file, logger="CAMDAC")
logging::loginfo("RRBS tumour allele count file already exists: %s.", ac_file, logger="CAMDAC")
}

# Create SNP files and run ASCAT (tumor)
Expand Down Expand Up @@ -222,9 +242,8 @@ preprocess_rrbs_normal <- function(patient_id, sample_id, bam_file, min_tumor,
}

# Merge allele counts
is_normal <- ifelse(sample_id == normal_id, TRUE, FALSE)
format_output(
patient_id, sample_id, sex, is_normal, path, pipeline_files, build
patient_id, sample_id, sex, is_normal = TRUE, path, pipeline_files, build
)

loginfo("Allele counting finished.")
Expand Down
7 changes: 4 additions & 3 deletions R/run_ASCAT.m.R
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ run_ASCAT.m <- function (patient_id,sample_id,sex,
Germline_BAF=data.frame(rBAF_n=dt_sample_SNPs$rBAF_n),
Tumor_LogR_segmented=NULL, Tumor_BAF_segmented=NULL,
Tumor_counts=NULL, Germline_counts=NULL,
SNPpos=SNPpos[,c("chrom","POS")], chr=chr,
SNPpos = as.data.frame(SNPpos[, .(chrom, POS)]), chr=chr,
samples=paste(patient_id, sample_id, sep="."),
chrs=chr_names, ch=ch,
gender=sex, sexchromosomes=c("X", "Y"),
Expand All @@ -307,7 +307,7 @@ run_ASCAT.m <- function (patient_id,sample_id,sex,
ascat.frag <- ASCAT::ascat.aspcf(ascat.bc, ascat.gg=gg, penalty=200)
# penalty = 200 recommended for sequencing data

# fix issue with ascat.ascpcf renaming samples
# fix issue with ascat.aspcf renaming samples
ascat.frag$samples <- paste(patient_id, sample_id, sep=".")
rm(ascat.bc)

Expand Down Expand Up @@ -410,7 +410,8 @@ run_ASCAT.m <- function (patient_id,sample_id,sex,

split_genome_RRBS = function(SNPpos) {
# look for gaps of more than 1Mb and chromosome borders
SNPposnum <- SNPpos
# hard copy for data table to avoid modifying SNPpos
SNPposnum <- data.table::copy(SNPpos)
SNPposnum[, chrom := ifelse(chrom == "X", 23, chrom)]
SNPposnum <- SNPposnum[, lapply(.SD, as.numeric)]

Expand Down