Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved the current SDA workflow to reach the North American runs with 6400 sites. #3340

Open
wants to merge 64 commits into
base: develop
Choose a base branch
from

Conversation

DongchenZ
Copy link
Contributor

@DongchenZ DongchenZ commented Jul 22, 2024

Description

Motivation and Context

  1. The foreach package seems to be better compared to the furrr package concerning memory allocation. Thus, in this PR, I replaced every furrr with foreach during the general SDA workflow.
  2. The computational power and memory are limited when executing certain SDA procedures locally (e.g., splitting meteorology files, writing configuration files, reading SDA outputs, running Bayesian MCMC analysis part, and removing files (e.g., removing NC files after the first SDA run)). Therefore, in the PR, I developed features of qsub submissions (specified by the batch.settings section in the XML file, which is also documented in the pecan book) during the SDA workflow.
  3. To avoid the complex if-else usage within the current sda.enkf_Multisite function for the above batch job submissions, I developed a new sda.enkf_NorthAmerica function (although most of them are just copy-and-paste from the sda.enkf_Multisite function), which is cleaner and will only be used if the batch.settings is not empty.

Review Time Estimate

  • Immediately
  • Within one week
  • When possible

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)

Checklist:

  • My change requires a change to the documentation.
  • My name is in the list of CITATION.cff
  • I have updated the CHANGELOG.md.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Comment on lines 697 to 700
PEcAn.logger::logger.info("Checking results.")
while (length(list.files(outdir, pattern="results.Rdata", recursive = T)) < folder.num) {
Sys.sleep(60)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Flagging that this has high potential to become an infinite loop if any job fails / is rejected by the queue / etc

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More generally, I'm cautious of submitting anything to qsub and waiting for it to complete before returning from the submitting function -- Part of the point of qsub is that it does the waiting for you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if I resolve this correctly. Here, instead, I detect the job completion by checking if the job still exists on the server. This will ensure a finite while loop even if, for some reason, some jobs never get finished and are killed by the server.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

detect the job completion by checking if the job still exists on the server

👍 That seems like a solid improvement. -- still has the potential to be waiting a long time if the queue is stalled / moving slowly, but will only be waiting for outputs the server still plans to provide.

load(file.path(folder.path, "block.Rdata"))
# initialize parallel.
cores <- parallel::detectCores()
if (cores > 28) cores <- 28
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a BU-specific number and should be configurable

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Comment on lines 751 to 753
results <- foreach::foreach(l = blocks, .packages=c("Kendall", "purrr"), .options.snow=opts) %dopar% {
MCMC_block_function(l)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Naive question: How closely equivalent is this to parallel::parLapply(cl, blocks, MCMC_block_function)? Would it be worth considering that approach since it only uses existing dependencies?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, I haven't used this parallel function before. The reason that I used foreach is that it has great management of memory usage with large data inputs. But I can try this function out to see if it performs similarly to the foreach package.

dir.create(folder.path)
# save corresponding block list to the folder.
blocks <- block.list[head.num:tail.num]
save(blocks, file = file.path(folder.path, "block.Rdata"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find it hard to tell what the structure of blocks actually is, but is this something that could be saved as a csv dataframe or something similar to avoid Rdata? At the minimum I'd recommend switching to RDS, which is more transparent about what objects are being reloaded.

blocks <- block.list[head.num:tail.num]
save(blocks, file = file.path(folder.path, "block.Rdata"))
# create job file.
jobsh <- readLines(con = system.file("analysis_qsub.job", package = "PEcAn.ModelName"), n = -1, warn=FALSE)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the generic template model being hard-coded here? Do we really want to create a package dependency of the SDA module on the generic template model?

##' @title qsub_analysis_submission
##' @param block.list list: MCMC configuration lists for the block SDA analysis.
##' @param outdir character: SDA output path.
##' @param job.per.folder numeric: number of jobs per folder.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic seems backwards to me. Generally with a HPC or cloud queueing system you want to specify the number of cores and/or nodes that you have, and then divide jobs across them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

based on folder numbers.

for (i in 1:folder.num) {
# create folder for each set of job runs.
# calculate start and end index for the current folder.
head.num <- (i-1)*job.per.folder + 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all jobs expected to take the same amount of time? If not (e.g. could one block have 1 site while another block has 1000 sites?), can we estimate which are expected to be longer or shorter so that we can load balance a bit more intelligently than doing so uniformly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be balanced by number of sites within each block.

jobsh <- gsub("@FOLDER_PATH@", folder.path, jobsh)
writeLines(jobsh, con = file.path(folder.path, "job.sh"))
# qsub command.
qsub <- "qsub -l h_rt=48:00:00 -l buyin -pe omp 28 -V -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qsub settings are system and user specific, and thus need to be read from the settings object, not hard-coded. Also, I'd STRONGLY recommend setting up a single array-style qsub over submitting multiple jobs in a loop.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See remote execution. Also a lot of this code seems to be in the PEcAn::remote package for example start_qsub

@@ -0,0 +1,5 @@
#!/bin/bash -l
module load R/4.1.2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very server specific, in the pecan.xml we have a section for qsub that allows you to specify the modules that need to be loaded. See https://pecanproject.github.io/pecan-documentation/master/xml-core-config.html#xml-host for an example.

# loop over sub-folders.
folder.paths <- job.ids <- c()
PEcAn.logger::logger.info(paste("Submitting", folder.num, "jobs."))
for (i in 1:folder.num) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some systems will penalize you if you do many submissions in parallel, or only run two and then wait for those to be done, and more jobs you submit, the lower your priority in the queue. To overcome some of this I added the modellauncher this will create a text file with as first line the command to execute, and will have a list of folders it needs to run the command in.

jobsh <- gsub("@FOLDER_PATH@", folder.path, jobsh)
writeLines(jobsh, con = file.path(folder.path, "job.sh"))
# qsub command.
qsub <- "qsub -l h_rt=48:00:00 -l buyin -pe omp 28 -V -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See remote execution. Also a lot of this code seems to be in the PEcAn::remote package for example start_qsub

@@ -6,9 +6,6 @@
##' @param var.names vector names of state variable names.
##' @param X a matrix of state variables. In this matrix rows represent ensembles, while columns show the variables for different sites.
##' @param localization.FUN This is the function that performs the localization of the Pf matrix and it returns a localized matrix with the same dimensions.
##' @param t not used
##' @param blocked.dis passed to `localization.FUN`
##' @param ... passed to `localization.FUN`
Copy link
Member

@infotroph infotroph Sep 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is undoing a fix I made in #3346 (and apparently forgot to update Rcheck_reference.log, sorry! That's why the checks didn't complain about this being undone.)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...But if you have better descriptions for the parameters, please do improve my wording!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants