Improved the current SDA workflow to support the North American runs with 6400 sites. #3340

Open
wants to merge 64 commits into develop
Changes from 43 of 64 commits
81d6c05
add the template for the MCMC qsub job.
Jul 22, 2024
39a886b
added namespaces
Jul 22, 2024
cead5d5
added qsub parallel analysis functions.
Jul 22, 2024
6ce1b39
Merge branch 'develop' of https://github.com/DongchenZ/pecan into dev…
Jul 22, 2024
dc06316
include qsub analysis part into the main function.
Jul 23, 2024
7dc12d3
Update namespace.
Jul 23, 2024
a538129
Update dependencies.
Jul 23, 2024
4c172a1
Update dependencies.
Jul 23, 2024
c7d0ca9
Update namespace.
Jul 23, 2024
f6f1c2a
Apply suggestions.
Jul 23, 2024
cc025a4
Update dependency.
Jul 24, 2024
fecb781
Update dependency.
Jul 24, 2024
b407404
Pull from main branch.
Sep 18, 2024
e9edbd6
Removing the job file template.
Sep 18, 2024
9346ed3
automatic add-up.
Sep 18, 2024
08b93dc
Update the qsub option to submit analysis jobs.
Sep 18, 2024
3b1a250
Using foreach to create Y and R.
Sep 18, 2024
0fc424a
Resolve dimension issue.
Sep 18, 2024
b29173f
Add functions for initializing qsub job submissions during the genera…
Sep 18, 2024
bb8b142
Create a new workflow solely for the North American SDA runs.
Sep 18, 2024
75f02eb
Add qsub functions for parallel SDA workflows.
Sep 18, 2024
b7f72b4
Upgrade the weight calculation function and fix some bugs.
Sep 18, 2024
70d6a90
Update the script for matching the current progress with NA SDA runs.
Sep 18, 2024
12fa426
Rd file.
Sep 18, 2024
cd6360e
Add Rd files.
Sep 18, 2024
98d8046
Remove hard-coded path.
Sep 18, 2024
1870d98
Adding SDA workflow for NA SDA runs.
Sep 18, 2024
abdc1f8
Update documentation associated with the SDA batch job submission fea…
Sep 19, 2024
0050826
Update doc.
Sep 19, 2024
5f1216e
Update dependency.
Sep 19, 2024
be9b047
Update dependency.
Sep 19, 2024
b51cc40
Update job completion detection.
Sep 20, 2024
8cc437e
Add the verbose argument to the function of checking qsub job complet…
Sep 20, 2024
ab56e79
Remove the library call from the functions.
Sep 20, 2024
bb71ff8
Add qsub statements to the creation of the settings in case you are u…
Sep 20, 2024
9ab9b08
Relocate qsub functions for the analysis part and optimize the functi…
Sep 20, 2024
4c0131d
Update the roxygen structure.
Sep 20, 2024
296b0d6
Update dependency.
Sep 20, 2024
daf3558
Update and fix namespaces.
Sep 20, 2024
86a3d16
Fix GitHub checks.
Sep 20, 2024
c43cba4
Fix function usage.
Sep 20, 2024
a683263
Replace read function.
Sep 20, 2024
e8ac7bc
Update modules/assim.sequential/R/Analysis_sda_block.R
infotroph Sep 21, 2024
0224572
Update modules/assim.sequential/DESCRIPTION
infotroph Sep 21, 2024
76f2b72
whitespace
infotroph Sep 21, 2024
345530f
Update roxygen text.
Sep 23, 2024
fcc3804
Revert "Update roxygen text."
Sep 23, 2024
9fa90a8
Merge branch 'PecanProject:develop' into develop
DongchenZ Sep 30, 2024
382e38d
Merge branch 'develop' of https://github.com/DongchenZ/pecan into dev…
Sep 30, 2024
2cdfea8
Update roxygen.
Sep 30, 2024
59c348d
Revert the changes.
Sep 30, 2024
9934011
Merge branch 'PecanProject:develop' into develop
DongchenZ Jan 13, 2025
7962ffd
Pass the printing when the job is finished.
Jan 13, 2025
0a25664
Bug fixes for the indexing when the scale is set to zero.
Jan 13, 2025
aa67032
Update the file deletion arguments.
Jan 13, 2025
6b5543e
Add the feature of exporting all analysis and forecast results.
Jan 13, 2025
54bb331
Reduce the job time limit to request nodes more efficiently. Also, up…
Jan 13, 2025
dd5f44b
Remove the usage of the site name and the feature of parallel extract…
Jan 13, 2025
b796bcd
Attempt to fix github check issue.
Jan 13, 2025
51cb8b5
Attempt to fix github check issue.
Jan 13, 2025
f1536d6
Resolve github check issue.
Jan 13, 2025
bf38bcb
Revert changes.
Jan 13, 2025
3e20018
Revert changes.
Jan 13, 2025
85d0b46
Revert changes.
Jan 13, 2025
7 changes: 5 additions & 2 deletions base/remote/R/check_qsub_status.R
@@ -2,11 +2,12 @@
#'
#' @param run run ID, as an integer
#' @param qstat (string) qstat command for checking job status
#' @param verbose Logical: whether to print progress messages. Default is TRUE.
#' @inheritParams remote.execute.cmd
#'
#' @return `TRUE` if run is marked as DONE, otherwise FALSE.
#' @export
qsub_run_finished <- function(run, host, qstat) {
qsub_run_finished <- function(run, host, qstat, verbose = TRUE) {
if (is.na(run)) {
PEcAn.logger::logger.warn("Job", run, "encountered an error during submission.",
"NOTE that the job will be stamped as 'finished' in BETY.")
@@ -25,7 +26,9 @@ qsub_run_finished <- function(run, host, qstat) {
}

if (length(out) > 0 && substring(out, nchar(out) - 3) == "DONE") {
PEcAn.logger::logger.debug("Job", run, "for run", run_id_string, "finished")
if (verbose) {
PEcAn.logger::logger.debug("Job", run, "for run", run_id_string, "finished")
Member:

PEcAn.logger already has built-in verbosity control via logger.setLevel() -- is a function-specific verbose flag needed here, or is it enough for the user to set the logger level to something higher than debug so that this message isn't printed?

Contributor Author:

  1. I am unsure whether I fully understand how logger.setLevel() works inside the PEcAn.logger package.
  2. For a small number of jobs, I think it will still be helpful to have job info printed instead of creating a progress bar.
}
return(TRUE)
} else {
return(FALSE)
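The thread above weighs a per-call `verbose` flag against PEcAn.logger's global `logger.setLevel()`. A minimal base-R sketch of the two control points, using a hypothetical `log_debug`/`report_job_finished` pair as stand-ins (neither function is part of PEcAn; they only illustrate the mechanism):

```r
# Hypothetical stand-in for PEcAn.logger: a global threshold plus a
# per-call `verbose` flag, mirroring the two options discussed above.
log_level <- new.env()
log_level$threshold <- "DEBUG"

log_debug <- function(...) {
  # Global control: nothing below the threshold is emitted.
  if (identical(log_level$threshold, "DEBUG")) {
    message(paste("DEBUG", ...))
  }
}

report_job_finished <- function(run, verbose = TRUE) {
  # Per-call control: the flag added in this PR skips the message entirely.
  if (verbose) log_debug("Job", run, "finished")
  invisible(TRUE)
}

report_job_finished(1)                  # emits: DEBUG Job 1 finished
report_job_finished(1, verbose = FALSE) # emits nothing
log_level$threshold <- "INFO"
report_job_finished(1)                  # emits nothing either
```

Either mechanism silences the per-job line; the flag keeps the choice local to one call site, while the threshold affects every logger call in the session.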
4 changes: 3 additions & 1 deletion base/remote/man/qsub_run_finished.Rd


@@ -473,6 +473,28 @@ Here is an example of what does a multi-settings pecan xml file look like. The d
<start.date>2012-07-15</start.date>
<end.date>2021-07-15</end.date>
</Obs_Prep>
<batch.settings>
<write.config>
<cores>28</cores>
<folder.num>1</folder.num>
</write.config>
<analysis>
<cores>28</cores>
<folder.num>16</folder.num>
</analysis>
<met.split>
<cores>28</cores>
<folder.num>16</folder.num>
</met.split>
<sda.read>
<cores>28</cores>
<folder.num>16</folder.num>
</sda.read>
<general.job>
<cores>28</cores>
<folder.num>16</folder.num>
</general.job>
</batch.settings>
<spin.up>
<start.date>2004/01/01</start.date>
<end.date>2006/12/31</end.date>
23 changes: 23 additions & 0 deletions book_source/03_topical_pages/03_pecan_xml.Rmd
@@ -826,6 +826,28 @@ The following tags can be used for state data assimilation. More detailed inform
<start.date>2012-07-15</start.date>
<end.date>2021-07-15</end.date>
</Obs_Prep>
<batch.settings>
<write.config>
<cores>28</cores>
<folder.num>1</folder.num>
</write.config>
<analysis>
<cores>28</cores>
<folder.num>16</folder.num>
</analysis>
<met.split>
<cores>28</cores>
<folder.num>16</folder.num>
</met.split>
<sda.read>
<cores>28</cores>
<folder.num>16</folder.num>
</sda.read>
<general.job>
<cores>28</cores>
<folder.num>16</folder.num>
</general.job>
</batch.settings>
<spin.up>
<start.date>2004/01/01</start.date>
<end.date>2006/12/31</end.date>
@@ -853,6 +875,7 @@ The following tags can be used for state data assimilation. More detailed inform
* **_NOTE:_** If TRUE, you must also assign a vector of trait names to pick.trait.params within the sda.enkf function.
* **state.variable** : [required] State variable that is to be assimilated (in PEcAn standard format, with pre-specified variable name, unit, and range). Four variables can be assimilated so far: including Aboveground biomass (AbvGrndWood), LAI, SoilMoistFrac, and Soil carbon (TotSoilCarb).
* **Obs_Prep** : [required] This section will be handled through the SDA_Obs_Assembler function, if you want to proceed with this function, this section is required.
* **batch.settings** : [optional] This section configures the computational resources allocated to each SDA procedure (e.g., the number of CPUs and the number of jobs submitted to the cluster). Procedures include splitting meteorology files, writing configuration files, reading SDA outputs, running the Bayesian MCMC analysis, and removing files (e.g., removing NC files after the first SDA run). These settings substantially improve computational efficiency when the number of sites is very large (e.g., North American runs with 6400 sites), while keeping memory usage to a minimum. The `sda.enkf_Multisite` function will be used if this section is not specified; otherwise the `sda.enkf_NorthAmerica` function will be used.
* **spin.up** : [required] start.date and end.date for model spin up.
* **_NOTE:_** start.date and end.date are distinct from values set in the run tag because spin up can be done over a subset of the run.
* **forecast.time.step** : [optional] start.date and end.date for model spin up.
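As a sketch of how the new tag might be consumed downstream, assuming PEcAn's usual XML-to-nested-list settings parsing (the list literal, coercions, and variable names below are illustrative, not the actual workflow code; only the dispatch rule comes from the documentation above):

```r
# Hand-built stand-in for a parsed pecan.xml settings list; XML parsing
# yields character values, hence the as.numeric() coercion below.
settings <- list(state.data.assimilation = list(
  batch.settings = list(
    analysis  = list(cores = "28", folder.num = "16"),
    met.split = list(cores = "28", folder.num = "16")
  )
))

batch <- settings$state.data.assimilation$batch.settings
if (is.null(batch)) {
  workflow <- "sda.enkf_Multisite"     # default when the tag is absent
} else {
  workflow <- "sda.enkf_NorthAmerica"  # batched qsub workflow
  analysis.cores <- as.numeric(batch$analysis$cores)       # CPUs per job
  analysis.jobs  <- as.numeric(batch$analysis$folder.num)  # jobs submitted
}
workflow
```

Deleting the `batch.settings` element (or omitting it from the XML) flips the dispatch back to the single-machine multisite workflow.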
4 changes: 3 additions & 1 deletion docker/depends/pecan_package_dependencies.csv
@@ -41,6 +41,7 @@
"doParallel","*","modules/data.atmosphere","Suggests",FALSE
"doParallel","*","modules/data.remote","Imports",FALSE
"doSNOW","*","base/remote","Suggests",FALSE
"doSNOW","*","modules/assim.sequential","Suggests",FALSE
"dplR","*","modules/data.land","Imports",FALSE
"dplyr","*","base/qaqc","Imports",FALSE
"dplyr","*","base/remote","Imports",FALSE
@@ -62,6 +63,7 @@
"emdbook","*","modules/assim.sequential","Suggests",FALSE
"exactextractr","*","modules/assim.sequential","Suggests",FALSE
"foreach","*","base/remote","Imports",FALSE
"foreach","*","modules/assim.sequential","Imports",FALSE
"foreach","*","modules/data.atmosphere","Suggests",FALSE
"foreach","*","modules/data.remote","Imports",FALSE
"fs","*","base/db","Imports",FALSE
@@ -83,7 +85,7 @@
"ggmcmc","*","modules/meta.analysis","Suggests",FALSE
"ggplot2","*","base/utils","Suggests",FALSE
"ggplot2","*","base/visualization","Imports",FALSE
"ggplot2","*","modules/assim.sequential","Imports",FALSE
"ggplot2","*","modules/assim.sequential","Suggests",FALSE
"ggplot2","*","modules/benchmark","Imports",FALSE
"ggplot2","*","modules/data.atmosphere","Imports",FALSE
"ggplot2","*","modules/data.remote","Suggests",FALSE
6 changes: 4 additions & 2 deletions modules/assim.sequential/DESCRIPTION
@@ -12,9 +12,9 @@ Description: The Predictive Ecosystem Carbon Analyzer (PEcAn) is a scientific
Imports:
coda,
dplyr,
foreach,
furrr,
future,
ggplot2,
lubridate (>= 1.6.0),
magrittr,
Matrix,
@@ -32,7 +32,9 @@ Imports:
stringr
Suggests:
corrplot,
doSNOW,
exactextractr,
ggplot2,
ggrepel,
emdbook,
glue,
@@ -65,4 +67,4 @@
License: BSD_3_clause + file LICENSE
Copyright: Authors
Encoding: UTF-8
RoxygenNote: 7.3.2
RoxygenNote: 7.3.2
2 changes: 2 additions & 0 deletions modules/assim.sequential/NAMESPACE
@@ -53,6 +53,7 @@ export(sampler_toggle)
export(sda.enkf)
export(sda.enkf.multisite)
export(sda.enkf.original)
export(sda.enkf_NorthAmerica)
export(sda_weights_site)
export(simple.local)
export(tobit.model)
@@ -63,6 +64,7 @@ import(furrr)
import(lubridate)
import(nimble)
importFrom(dplyr,"%>%")
importFrom(foreach,"%dopar%")
importFrom(lubridate,"%m+%")
importFrom(magrittr,"%>%")
importFrom(rlang,.data)
32 changes: 24 additions & 8 deletions modules/assim.sequential/R/Analysis_sda_block.R
@@ -58,9 +58,16 @@ analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov,

#parallel for loop over each block.
PEcAn.logger::logger.info(paste0("Running MCMC ", "for ", length(block.list.all[[t]]), " blocks"))
if ("try-error" %in% class(try(block.list.all[[t]] <- furrr::future_map(block.list.all[[t]], MCMC_block_function, .progress = T)))) {
PEcAn.logger::logger.severe("Something wrong within the MCMC_block_function function.")
return(0)
if (!is.null(settings$state.data.assimilation$batch.settings$analysis)) {
if ("try-error" %in% class(try(block.list.all[[t]] <- qsub_analysis_submission(settings = settings, block.list = block.list.all[[t]])))) {
PEcAn.logger::logger.severe("Something wrong within the qsub_analysis_submission function.")
return(0)
}
} else {
if ("try-error" %in% class(try(block.list.all[[t]] <- furrr::future_map(block.list.all[[t]], MCMC_block_function, .progress = T)))) {
PEcAn.logger::logger.severe("Something wrong within the MCMC_block_function function.")
return(0)
}
}
PEcAn.logger::logger.info("Completed!")

@@ -77,7 +84,8 @@
mu.a = V$mu.a,
Pa = V$Pa,
Y = Y,
R = R))
R = R,
analysis = V$analysis))
}

##' @title build.block.xy
@@ -104,7 +112,7 @@ build.block.xy <- function(settings, block.list.all, X, obs.mean, obs.cov, t) {
}
#grab basic arguments based on X.
site.ids <- unique(attributes(X)$Site)
var.names <- unique(attributes(X)$dimnames[[2]])
var.names <- unique(colnames(X))
mu.f <- colMeans(X)
Pf <- stats::cov(X)
if (length(diag(Pf)[which(diag(Pf)==0)]) > 0) {
@@ -120,7 +128,8 @@
`rownames<-`(site.ids)
#Finding the distance between the sites
dis.matrix <- sp::spDists(site.locs, longlat = TRUE)
if (!is.null(settings$state.data.assimilation$Localization.FUN)) {
if (!is.null(settings$state.data.assimilation$Localization.FUN) &&
! as.numeric(settings$state.data.assimilation$scalef) == 0) {
Localization.FUN <- get(settings$state.data.assimilation$Localization.FUN)
#turn that into a blocked matrix format
blocked.dis <- block_matrix(dis.matrix %>% as.numeric(), rep(length(var.names), length(site.ids)))
@@ -430,7 +439,6 @@ MCMC_block_function <- function(block) {
conf$addSampler(target = samplerLists[[X.mod.ind]]$target, type = "ess",
control = list(propCov= block$data$pf, adaptScaleOnly = TRUE,
latents = "X", pfOptimizeNparticles = TRUE))

#add toggle Y sampler.
for (i in 1:block$constant$YN) {
conf$addSampler(paste0("y.censored[", i, "]"), 'toggle', control=list(type='RW'))
@@ -615,6 +623,7 @@ block.2.vector <- function (block.list, X, H) {
site.ids <- attributes(X)$Site
mu.f <- mu.a <- c()
Pf <- Pa <- matrix(0, length(site.ids), length(site.ids))
analysis <- X
for (L in block.list) {
ind <- c()
for (id in L$site.ids) {
@@ -623,6 +632,12 @@
#convert mu.f and pf
mu.a[ind] <- mu.f[ind] <- L$update$mufa
Pa[ind, ind] <- Pf[ind, ind] <- L$update$pfa
# MVN sample based on block.
sample <- as.data.frame(mvtnorm::rmvnorm(nrow(X),
L$update$mufa,
L$update$pfa,
method = "svd"))
analysis[,ind] <- sample
#convert mu.a and pa
ind <- intersect(ind, H$H.ind)
mu.a[ind] <- L$update$mua
@@ -631,5 +646,6 @@
return(list(mu.f = mu.f,
Pf = Pf,
mu.a = mu.a,
Pa = Pa))
Pa = Pa,
analysis = analysis))
}
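The `analysis` matrix added to `block.2.vector()` is filled by drawing ensemble members from each block's posterior N(mu.a, Pa). A self-contained sketch of that one step, with made-up sizes and values: the PR uses `mvtnorm::rmvnorm(..., method = "svd")`, while `MASS::mvrnorm` (shipped with R) stands in here.

```r
library(MASS)  # mvrnorm; the PR itself uses mvtnorm::rmvnorm(method = "svd")

set.seed(42)
n.ens <- 100                            # ensemble size (rows of X)
mu.a  <- c(5, 10)                       # posterior mean for one block
Pa    <- matrix(c(2, 0.5, 0.5, 1), 2)   # posterior covariance for the block

analysis <- matrix(NA_real_, n.ens, 4)  # 4 state columns across all sites
ind <- c(2, 3)                          # columns belonging to this block
# One multivariate-normal draw per ensemble member, written block-wise.
analysis[, ind] <- MASS::mvrnorm(n.ens, mu = mu.a, Sigma = Pa)

round(colMeans(analysis[, ind]), 1)     # close to mu.a
```

Because the blocks partition the state columns, looping this assignment over every block (as the real function does) fills the whole analysis ensemble without any cross-block sampling.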
72 changes: 47 additions & 25 deletions modules/assim.sequential/R/Multi_Site_Constructors.R
@@ -6,9 +6,6 @@
##' @param var.names vector names of state variable names.
##' @param X a matrix of state variables. In this matrix rows represent ensembles, while columns show the variables for different sites.
##' @param localization.FUN This is the function that performs the localization of the Pf matrix and it returns a localized matrix with the same dimensions.
##' @param t not used
##' @param blocked.dis passed to `localization.FUN`
##' @param ... passed to `localization.FUN`
Member (infotroph, Sep 21, 2024):

This is undoing a fix I made in #3346 (and apparently forgot to update Rcheck_reference.log, sorry! That's why the checks didn't complain about this being undone.)

Member (infotroph):

...But if you have better descriptions for the parameters, please do improve my wording!

Contributor Author:

Fixed.
##' @description The argument X needs to have an attribute pointing the state variables to their corresponding site. This attribute needs to be called `Site`.
##' At the moment, the cov between state variables at blocks defining the cov between two sites are assumed zero.
##' @return It returns the var-cov matrix of state variables at multiple sites.
@@ -27,15 +24,15 @@ Contruct.Pf <- function(site.ids, var.names, X, localization.FUN=NULL, t=1, bloc
for (site in site.ids){
#let's find out where this cov (for the current site needs to go in the main cov matrix)
pos.in.matrix <- which(attr(X,"Site") %in% site)
#foreach site let's get the Xs
#foreach site let's get the Xs
pf.matrix [pos.in.matrix, pos.in.matrix] <- stats::cov( X [, pos.in.matrix] ,use="complete.obs")
}

# This is where we estimate the cov between state variables of different sites
  #I put this into a separate loop so we can have more control over it
site.cov.orders <- expand.grid(site.ids,site.ids) %>%
dplyr::filter( .data$Var1 != .data$Var2)

for (i in seq_len(nrow(site.cov.orders))){
# first we need to find out where to put it in the big matrix
rows.in.matrix <- which(attr(X,"Site") %in% site.cov.orders[i,1])
@@ -57,13 +54,13 @@

# adding labels to rownames and colnames
labelss <- paste0(rep(var.names, length(site.ids)) %>% as.character(),"(",
rep(site.ids, each=length(var.names)),")")
rep(site.ids, each=length(var.names)),")")

colnames(pf.matrix.out ) <-labelss
rownames(pf.matrix.out ) <-labelss

return(pf.matrix.out)

}

##' @title Construct.R
@@ -82,34 +79,59 @@ Contruct.Pf <- function(site.ids, var.names, X, localization.FUN=NULL, t=1, bloc
##' @export

Construct.R<-function(site.ids, var.names, obs.t.mean, obs.t.cov){

# foreach.
cores <- parallel::detectCores()
cl <- parallel::makeCluster(cores)
doSNOW::registerDoSNOW(cl)
#progress bar
pb <- utils::txtProgressBar(min=1, max=length(site.ids), style=3)
progress <- function(n) utils::setTxtProgressBar(pb, n)
opts <- list(progress=progress)

# keeps Hs of sites
site.specific.Rs <-list()
#
nsite <- length(site.ids)
#
nvariable <- length(var.names)
Y<-c()

for (site in site.ids){
choose <- sapply(var.names, agrep, x=names(obs.t.mean[[site]]), max=1, USE.NAMES = FALSE) %>% unlist
# if there is no obs for this site
if(length(choose) == 0){
next;
}else{
Y <- c(Y, unlist(obs.t.mean[[site]][choose]))
#collecting them
if (ncol(obs.t.mean[[site]]) > 1)
{
site.specific.Rs <- c(site.specific.Rs, list(as.matrix(obs.t.cov[[site]][choose,choose])))
} else {
site.specific.Rs <- c(site.specific.Rs, list(as.matrix(obs.t.cov[[site]][choose])))
}
# fix GitHub checks.
site <- NULL
res <- foreach::foreach(site = site.ids,
.packages=c("Kendall", "purrr"),
.options.snow=opts) %dopar% {
choose <- sapply(var.names, agrep, x=names(obs.t.mean[[site]]), max=1, USE.NAMES = FALSE) %>% unlist
# if there is no obs for this site
if(length(choose) == 0){
return(NA);
}else{
Y <- unlist(obs.t.mean[[site]][choose])
#collecting them
if (ncol(obs.t.mean[[site]]) > 1)
{
site.R <- list(as.matrix(obs.t.cov[[site]][choose,choose]))
} else {
site.R <- list(as.matrix(obs.t.cov[[site]][choose]))
}
}
return(list(site.R = site.R,
site.Y = Y,
choose = choose))
}
for (i in seq_along(site.ids)){
temp <- res[[i]]
if (is.na(temp)) {
next
} else {
Y <- c(Y, unlist(obs.t.mean[[site.ids[i]]][temp$choose]))
site.specific.Rs <- c(site.specific.Rs, temp$site.R)
}
}
#make block matrix out of our collection
R <- Matrix::bdiag(site.specific.Rs) %>% as.matrix()
}

# stop parallel.
parallel::stopCluster(cl)
foreach::registerDoSEQ()
return(list(Y=Y, R=R))
}

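The rewritten `Construct.R` splits observation collection into a parallel map (one result per site, `NA` for sites without observations) and a sequential gather. A base-R sketch of the gather logic under made-up observations; the real code runs the map with `foreach %dopar%` and builds `R` with `Matrix::bdiag()`:

```r
# Made-up per-site observations: site s2 has none and must be skipped.
obs.mean <- list(s1 = list(AbvGrndWood = 100),
                 s2 = list(),
                 s3 = list(AbvGrndWood = 80))
obs.cov  <- list(s1 = 25, s2 = NULL, s3 = 16)

# Map step: one result per site, NA when the site has no observations.
res <- lapply(names(obs.mean), function(site) {
  if (length(obs.mean[[site]]) == 0) return(NA)
  list(site.Y = obs.mean[[site]]$AbvGrndWood,
       site.R = obs.cov[[site]])
})

# Gather step: concatenate Y and collect the per-site R blocks in order.
Y <- c(); Rs <- list()
for (temp in res) {
  if (!is.list(temp)) next   # site without observations
  Y  <- c(Y, temp$site.Y)
  Rs <- c(Rs, list(temp$site.R))
}
# Blocks here are 1x1, so diag() stands in for Matrix::bdiag().
R <- diag(unlist(Rs))
Y  # 100 80
```

Returning `NA` sentinels from the workers keeps the parallel results positionally aligned with `site.ids`, so the gather loop can skip empty sites without losing the site order that the block-diagonal `R` depends on.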