From 60a31eb1d76eaf295b19c7e7034619ea1d262290 Mon Sep 17 00:00:00 2001 From: Lesky47 Date: Wed, 11 Feb 2026 15:51:13 -0600 Subject: [PATCH 1/3] Fix data.table subsetting compatability bug with ASCAT v3.2 ASCAT expects data frames; convert SNPpos to data.frame. Deep copy of SNPpos to avoid modifying the original object by reference. --- .gitignore | 4 ++++ R/run_ASCAT.m.R | 7 ++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 62f64ca..4f32b8e 100755 --- a/.gitignore +++ b/.gitignore @@ -46,3 +46,7 @@ CAMDAC_manual/ *.log.lintr data/ bin/ + +# macOS +.DS_Store +**/.DS_Store diff --git a/R/run_ASCAT.m.R b/R/run_ASCAT.m.R index 25ec440..01f5bdf 100755 --- a/R/run_ASCAT.m.R +++ b/R/run_ASCAT.m.R @@ -286,7 +286,7 @@ run_ASCAT.m <- function (patient_id,sample_id,sex, Germline_BAF=data.frame(rBAF_n=dt_sample_SNPs$rBAF_n), Tumor_LogR_segmented=NULL, Tumor_BAF_segmented=NULL, Tumor_counts=NULL, Germline_counts=NULL, - SNPpos=SNPpos[,c("chrom","POS")], chr=chr, + SNPpos = as.data.frame(SNPpos[, .(chrom, POS)]), chr=chr, samples=paste(patient_id, sample_id, sep="."), chrs=chr_names, ch=ch, gender=sex, sexchromosomes=c("X", "Y"), @@ -307,7 +307,7 @@ run_ASCAT.m <- function (patient_id,sample_id,sex, ascat.frag <- ASCAT::ascat.aspcf(ascat.bc, ascat.gg=gg, penalty=200) # penalty = 200 recommended for sequencing data - # fix issue with ascat.ascpcf renaming samples + # fix issue with ascat.aspcf renaming samples ascat.frag$samples <- paste(patient_id, sample_id, sep=".") rm(ascat.bc) @@ -410,7 +410,8 @@ run_ASCAT.m <- function (patient_id,sample_id,sex, split_genome_RRBS = function(SNPpos) { # look for gaps of more than 1Mb and chromosome borders - SNPposnum <- SNPpos + # hard copy for data table to avoid modifying SNPpos + SNPposnum <- data.table::copy(SNPpos) SNPposnum[, chrom := ifelse(chrom == "X", 23, chrom)] SNPposnum <- SNPposnum[, lapply(.SD, as.numeric)] From 2d2d6664bee2a927fa6f2affede64b836e89bb72 Mon Sep 17 00:00:00 2001 From: slai4 Date: Tue, 24 Feb 2026 14:49:12 -0600 Subject: [PATCH 2/3] Optimize memory management and fix parallel deadlocks for HPC - Replaced DT sorting with setorder. Replaced 'foreach' + '.combine="rbind"' with 'lapply' + 'rbindlist' to avoid quadratic copying. Peak RAM dropped from >400GB to ~20GB with 16 cores in this step. Runtime dropped from 1.5 hrs to seconds. - Updated parallelization in allele counting with 'callr' package. Previous 'doParallel::stopImplicitCluster' creates OpenMP deadlocks for the second bam at allele counting step. Now using callr::r() to isolate per-sample processing. Reclaims memory and clears locks between BAM files. - Bugfix: Corrected invalid 'on=' argument to 'by=' in SNP data.table merge --- R/ascat.R | 15 +++++++++++---- R/cmain.R | 21 +++++++++++---------- R/pipeline.R | 34 +++++++++++++++++++++++++--------- 3 files changed, 47 insertions(+), 23 deletions(-) diff --git a/R/ascat.R b/R/ascat.R index ded42bc..9073910 100755 --- a/R/ascat.R +++ b/R/ascat.R @@ -3,14 +3,18 @@ # Sort genomic loci #' @title sort_genomic_dt #' @keywords internal -sort_genomic_dt <- function(dt, with_chr = F) { +# In-place sorting to avoid R creating duplicate copy of objects +sort_genomic_dt <- function(dt, with_chr = FALSE) { if (with_chr) { fact_levels <- paste0("chr", c(1:22, "X", "Y")) } else { fact_levels <- c(1:22, "X", "Y") } + dt[, chrom := factor(chrom, levels = fact_levels)] - return(dt[order(chrom, POS)]) + data.table::setorder(dt, chrom, POS) + + return(dt) } @@ -67,7 +71,7 @@ annotate_normal <- function(tsnps, nsnps, min_cov) { setnames(nsnps, old = to_suffix, new = paste0(to_suffix, "_n")) setkey(nsnps, chrom, POS) - tsnps <- merge(tsnps, nsnps, on = c("chrom", "POS")) + tsnps <- merge(tsnps, nsnps, by = c("chrom", "POS")) tsnps <- tsnps[total_counts_n >= min_cov] @@ -651,7 +655,10 @@ select_heterozygous_snps <- function(tsnps) { # However, this breaks ASCAT, so should be used with caution. # Note that ASCAT.m will select at 0.1 <> 0.9 as germline hom stretches required # This must therefore be higher. Does not influence battenberg.m - tsnps <- tsnps[BAF_n >= 0.08 & BAF_n <= 0.92] + + # Temporarily changed to 0.2 and 0.8 for high coverage WGBS data. + # TODO: fix this later !!! + tsnps <- tsnps[BAF_n >= 0.2 & BAF_n <= 0.8] return(tsnps) } diff --git a/R/cmain.R b/R/cmain.R index 53acbce..882c249 100755 --- a/R/cmain.R +++ b/R/cmain.R @@ -51,15 +51,16 @@ cmain_count_alleles <- function(sample, config) { min_mapq <- config$min_mapq min_cov <- config$min_cov - # Initialise parallel workers. - doParallel::registerDoParallel(cores = config$n_cores) - logging::loginfo("Counting alleles for %s", paste0(sample$patient_id, ":", sample$id), logger="CAMDAC") # For each segment, load the appropriate SNP/CpG loci file segment and call allele counter in parallel # Set warn=2 to ensure foreach fails if any of the parallel workers are terminated or raise a warning. # without this option, foreach simply returns a warning and the pipeline continues. Essential for memory warning terminations. options(warn = 2) tmpfiles <- foreach(seg = segments, .combine = "c") %dopar% { + # Force fst and data.table to use a single thread inside the workers + fst::threads_fst(1) + data.table::setDTthreads(1) + loci_dt <- load_loci_for_segment(seg, loci_files) ac_file <- cwrap_get_allele_counts(bam_file, seg, loci_dt, paired_end, drop_ccgg, min_mapq = min_mapq, min_cov = min_cov) tmp <- tempfile(tmpdir = tempdir, fileext = ".fst") @@ -69,18 +70,18 @@ cmain_count_alleles <- function(sample, config) { options(warn = 0) # Combine temporary files with allele counts results into a single data table - result <- foreach(i = tmpfiles, .combine = "rbind") %dopar% { - fst::read_fst(i, as.data.table = T) - } + # Sequential merging for memory safety + result_list <- lapply(tmpfiles, function(f) { + fst::read_fst(f, as.data.table = TRUE) + }) + result <- data.table::rbindlist(result_list, use.names = TRUE, fill = TRUE) + rm(result_list) + gc() # Write to output file format_and_write_output(result, output_filename) # 2 lines # Delete temporary files fs::dir_delete(tempdir) - - # Stop parallel workers. When running the pipeline multiple times in an R session, - # R re-uses workers but does not clear memory. Hence large objects in foreach loops will remain. - doParallel::stopImplicitCluster() return(output_filename) } diff --git a/R/pipeline.R b/R/pipeline.R index 5127bb9..4757304 100644 --- a/R/pipeline.R +++ b/R/pipeline.R @@ -55,6 +55,7 @@ pipeline_wgbs <- function(tumor, germline = NULL, infiltrates = NULL, origin = N #' @param config. CamConfig object. #' @export #' @keywords internal + preprocess_wgbs <- function(sample_list, config) { for (s in sample_list) { # Go to next part of loop if its null @@ -62,15 +63,30 @@ preprocess_wgbs <- function(sample_list, config) { next } - # Count SNP and CpG alleles if a BAM file is provided - cmain_count_alleles(s, config) - - # Prepare SNP data for CNA calling if allele counts are present - cmain_make_snps(s, config) - - # Format methylation rates for deconvolution - cmain_make_methylation_profile(s, config) - } + logging::loginfo("Spawning isolated process for sample: %s", s$id, logger="CAMDAC") + # callr::r() creates a fresh background R session for this specific block of code + callr::r( + func = function(sample, config) { + library(CAMDAC) + + # 1. Setup the cluster inside the isolated process + cl <- parallel::makeCluster(config$n_cores, type = "FORK") + doParallel::registerDoParallel(cl) + + # 2. Run pipeline functions + CAMDAC:::cmain_count_alleles(sample, config) + CAMDAC:::cmain_make_snps(sample, config) + CAMDAC:::cmain_make_methylation_profile(sample, config) + + # 3. Safely stop the cluster + parallel::stopCluster(cl) + }, + args = list(sample = s, config = config), + show = TRUE + ) + + logging::loginfo("Finished sample %s. OS has reclaimed all memory.", s$id, logger="CAMDAC") +} } From cf4d4d62b35169af2f83ad60fdfda29205d1f160 Mon Sep 17 00:00:00 2001 From: slai4 Date: Tue, 24 Feb 2026 16:32:26 -0600 Subject: [PATCH 3/3] Updated log info when certain files already exist. --- R/pipeline.R | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/R/pipeline.R b/R/pipeline.R index 4757304..4d77f11 100644 --- a/R/pipeline.R +++ b/R/pipeline.R @@ -30,10 +30,14 @@ pipeline_wgbs <- function(tumor, germline = NULL, infiltrates = NULL, origin = N logging::loginfo("Pipeline start for %s", tumor$patient_id, logger="CAMDAC") # Preprocess CpG, SNP and methylation data for all samples - preprocess_wgbs( - list(tumor, germline, infiltrates, origin), - config + sample_list <- list( + tumor = tumor, + germline = germline, + infiltrates = if (identical(infiltrates, germline)) NULL else infiltrates, + origin = if (identical(origin, germline)) NULL else origin ) + + preprocess_wgbs(sample_list, config) # Combine tumor-germline SNPs and call CNAs cmain_bind_snps(tumor, germline, config) @@ -158,7 +162,7 @@ pipeline_rrbs <- function(tumor, germline, infiltrates, origin, config){ ) } else { - logging::loginfo("Preprocess RRBS tumour: %s.", ac_file, logger="CAMDAC") + logging::loginfo("RRBS tumour allele count file already exists: %s.", ac_file, logger="CAMDAC") } # Create SNP files and run ASCAT (tumor) @@ -238,9 +242,8 @@ preprocess_rrbs_normal <- function(patient_id, sample_id, bam_file, min_tumor, } # Merge allele counts - is_normal <- ifelse(sample_id == normal_id, TRUE, FALSE) format_output( - patient_id, sample_id, sex, is_normal, path, pipeline_files, build + patient_id, sample_id, sex, is_normal = TRUE, path, pipeline_files, build ) loginfo("Allele counting finished.")