Skip to content

Commit

Permalink
Merge branch 'develop' into check-on-newer-R
Browse files Browse the repository at this point in the history
  • Loading branch information
infotroph authored Feb 14, 2024
2 parents 5647ae8 + 98d0703 commit 64c8877
Show file tree
Hide file tree
Showing 31 changed files with 668 additions and 400 deletions.
2 changes: 1 addition & 1 deletion Makefile.depends
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ $(call depends,modules/allometry): | .install/base/db
$(call depends,modules/assim.batch): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/base/workflow .install/modules/benchmark .install/modules/emulator .install/modules/meta.analysis .install/modules/uncertainty
$(call depends,modules/assim.sequential): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/workflow .install/modules/benchmark .install/modules/data.remote .install/modules/uncertainty
$(call depends,modules/benchmark): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/modules/data.land
$(call depends,modules/data.atmosphere): | .install/base/db .install/base/logger .install/base/remote .install/base/utils
$(call depends,modules/data.atmosphere): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils
$(call depends,modules/data.hydrology): | .install/base/logger .install/base/utils
$(call depends,modules/data.land): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/base/visualization .install/modules/benchmark .install/modules/data.atmosphere
$(call depends,modules/data.remote): | .install/base/db .install/base/logger .install/base/remote .install/base/utils
Expand Down
2 changes: 2 additions & 0 deletions base/remote/DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ Authors@R: c(person("David", "LeBauer", role = c("aut"),
Description: This package contains utilities for communicating with and executing code on local and remote hosts.
In particular, it has PEcAn-specific utilities for starting ecosystem model runs.
Imports:
dplyr,
foreach,
furrr,
PEcAn.logger,
httr,
jsonlite,
Expand Down
1 change: 1 addition & 0 deletions base/remote/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ export(start_qsub)
export(start_rabbitmq)
export(start_serial)
export(test_remote)
importFrom(dplyr,"%>%")
importFrom(foreach,"%dopar%")
134 changes: 104 additions & 30 deletions base/remote/R/qsub_parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#' @param files allow submit jobs based on job.sh file paths.
#' @param prefix used for detecting if jobs are completed or not.
#' @param sleep time (in second) that we wait each time for the jobs to be completed.
#' @param hybrid A Boolean argument that decides how job completion is detected. If TRUE, completion is detected by checking both the output files and the job IDs on the server. If FALSE, only the job IDs on the server are checked.
#' @export
#' @examples
#' \dontrun{
Expand All @@ -12,7 +13,8 @@
#' @author Dongchen Zhang
#'
#' @importFrom foreach %dopar%
qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep = 10) {
#' @importFrom dplyr %>%
qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep = 10, hybrid = TRUE) {
if("try-error" %in% class(try(find.package("doSNOW"), silent = T))){
PEcAn.logger::logger.info("Package doSNOW is not installed! Please install it and rerun the function!")
return(0)
Expand All @@ -22,6 +24,8 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
folder <- NULL
run_list <- readLines(con = file.path(settings$rundir, "runs.txt"))
is_local <- PEcAn.remote::is.localhost(settings$host)
is_qsub <- !is.null(settings$host$qsub)
is_rabbitmq <- !is.null(settings$host$rabbitmq)
# loop through runs and either call start run, or launch job on remote machine
# parallel submit jobs
cores <- parallel::detectCores()
Expand All @@ -34,25 +38,32 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
PEcAn.logger::logger.info("Submitting jobs!")
# if we want to submit jobs separately.
if(is.null(files)){
jobids <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(files))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
qsub <- settings$host$qsub
qsub <- gsub("@NAME@", paste0("PEcAn-", run_id_string), qsub)
qsub <- gsub("@STDOUT@", file.path(settings$host$outdir, run_id_string, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(settings$host$outdir, run_id_string, "stderr.log"), qsub)
qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
# start the actual model run
cmd <- qsub[[1]]
if(PEcAn.remote::is.localhost(settings$host)){
out <- system2(cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stdout = TRUE, stderr = TRUE)
}else{
out <- PEcAn.remote::remote.execute.cmd(settings$host, cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stderr = TRUE)
if (is_qsub) {
jobids <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(run_list))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
qsub <- settings$host$qsub
qsub <- gsub("@NAME@", paste0("PEcAn-", run_id_string), qsub)
qsub <- gsub("@STDOUT@", file.path(settings$host$outdir, run_id_string, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(settings$host$outdir, run_id_string, "stderr.log"), qsub)
qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
# start the actual model run
cmd <- qsub[[1]]
if(PEcAn.remote::is.localhost(settings$host)){
out <- system2(cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stdout = TRUE, stderr = TRUE)
}else{
out <- PEcAn.remote::remote.execute.cmd(settings$host, cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stderr = TRUE)
}
jobid <- PEcAn.remote::qsub_get_jobid(
out = out[length(out)],
qsub.jobid = settings$host$qsub.jobid,
stop.on.error = TRUE)
return(jobid)
}
} else if (is_rabbitmq) {
out <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(run_list))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
PEcAn.remote::start_rabbitmq(file.path(settings$host$rundir, run_id_string), settings$host$rabbitmq$uri, settings$host$rabbitmq$queue)
}
jobid <- PEcAn.remote::qsub_get_jobid(
out = out[length(out)],
qsub.jobid = settings$host$qsub.jobid,
stop.on.error = TRUE)
return(jobid)
}
}else{
# if we want to submit merged job files.
Expand Down Expand Up @@ -89,20 +100,83 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
PEcAn.logger::logger.info("Checking the qsub jobs status!")
PEcAn.logger::logger.info(paste0("Checking the file ", prefix))
## setup progressbar
pb <- utils::txtProgressBar(min = 0, max = length(unlist(run_list)), style = 3)
pbi <- 0
folders <- file.path(settings$host$outdir, run_list)
completed_folders <- c()
while (length(completed_folders) < length(folders)) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
L_folder <- length(folders)
pb <- utils::txtProgressBar(min = 0, max = L_folder, style = 3)
pbi <- 0
#Here we not only detect whether the target files have been generated,
#but also whether the jobs still exist on the server.
if (is_rabbitmq) {
while ((L_folder - length(folders)) < L_folder) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
}
} %>% unlist()
folders <- folders[which(!folders %in% completed_folders)]
pbi <- L_folder - length(folders)
utils::setTxtProgressBar(pb, pbi)
}
} else {
L_jobid <- length(jobids)
pb1 <- utils::txtProgressBar(min = 0, max = L_jobid, style = 3)
pb1 <- 0
if (hybrid) {
while ((L_folder - length(folders)) < L_folder &
(L_jobid - length(jobids)) < L_jobid) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
}
} %>% unlist()
folders <- folders[which(!folders %in% completed_folders)]

#Alternatively, detect whether the jobs are still present on the server.
#Specify the host and qstat arguments for the future_map function.
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

#Compare the two progress measures and set the progress bars accordingly.
pbi <- L_folder - length(folders)
utils::setTxtProgressBar(pb, pbi)

pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
}
} else {
#Special case that only detects the job IDs on the server.
while ((L_jobid - length(jobids)) < L_jobid) {
#detect if the jobs are still on the server.
#specify the host and qstat arguments for the future_map function.
Sys.sleep(sleep)
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

#Update the job-ID progress and set the progress bar accordingly.
pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
}
}
completed_folders <- unlist(completed_folders)
pbi <- length(completed_folders)
utils::setTxtProgressBar(pb, pbi)
}
close(pb)
parallel::stopCluster(cl)
Expand Down
10 changes: 9 additions & 1 deletion base/remote/man/qsub_parallel.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions base/workflow/R/start_model_runs.R
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ start_model_runs <- function(settings, write = TRUE, stop.on.error = TRUE) {
}

if (job_finished) {

# TODO check output log
if (is_rabbitmq) {
data <- readLines(file.path(jobids[run], "rabbitmq.out"))
Expand Down Expand Up @@ -372,4 +372,4 @@ runModule_start_model_runs <- function(settings, stop.on.error=TRUE) {
PEcAn.logger::logger.severe(
"runModule_start_model_runs only works with Settings or MultiSettings")
}
} # runModule_start_model_runs
} # runModule_start_model_runs
10 changes: 9 additions & 1 deletion docker/depends/pecan_package_dependencies.csv
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
"doSNOW","*","base/remote","Suggests",FALSE
"dplR","*","modules/data.land","Imports",FALSE
"dplyr","*","base/qaqc","Imports",FALSE
"dplyr","*","base/remote","Imports",FALSE
"dplyr","*","base/utils","Imports",FALSE
"dplyr","*","base/workflow","Imports",FALSE
"dplyr","*","models/biocro","Imports",FALSE
"dplyr","*","models/ed","Imports",FALSE
"dplyr","*","models/ldndc","Imports",FALSE
"dplyr","*","models/stics","Imports",FALSE
"dplyr","*","modules/assim.batch","Imports",FALSE
"dplyr","*","modules/assim.sequential","Imports",FALSE
"dplyr","*","modules/benchmark","Imports",FALSE
"dplyr","*","modules/data.atmosphere","Imports",FALSE
Expand All @@ -69,8 +69,15 @@
"foreach","*","modules/data.remote","Imports",FALSE
"fs","*","base/db","Imports",FALSE
"fs","*","modules/data.land","Imports",FALSE
"furrr","*","base/remote","Imports",FALSE
"furrr","*","modules/assim.sequential","Imports",FALSE
"furrr","*","modules/data.atmosphere","Suggests",FALSE
"furrr","*","modules/data.land","Imports",FALSE
"furrr","*","modules/data.remote","Imports",FALSE
"future","*","modules/assim.sequential","Imports",FALSE
"future","*","modules/data.atmosphere","Suggests",FALSE
"future","*","modules/data.land","Imports",FALSE
"future","*","modules/data.remote","Imports",FALSE
"geonames","> 0.998","modules/data.atmosphere","Imports",FALSE
"getPass","*","base/remote","Suggests",FALSE
"ggmcmc","*","modules/meta.analysis","Suggests",FALSE
Expand Down Expand Up @@ -358,6 +365,7 @@
"PEcAn.settings","*","modules/assim.batch","Imports",TRUE
"PEcAn.settings","*","modules/assim.sequential","Imports",TRUE
"PEcAn.settings","*","modules/benchmark","Imports",TRUE
"PEcAn.settings","*","modules/data.atmosphere","Suggests",TRUE
"PEcAn.settings","*","modules/data.land","Suggests",TRUE
"PEcAn.settings","*","modules/meta.analysis","Imports",TRUE
"PEcAn.settings","*","modules/uncertainty","Imports",TRUE
Expand Down
22 changes: 21 additions & 1 deletion models/sipnet/R/write.configs.SIPNET.R
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,29 @@ write.config.SIPNET <- function(defaults, trait.values, settings, run.id, inputs
cdosetup <- paste(cdosetup, sep = "\n", paste(settings$host$cdosetup, collapse = "\n"))
}


hostteardown <- ""
if (!is.null(settings$model$postrun)) {
hostteardown <- paste(hostteardown, sep = "\n", paste(settings$model$postrun, collapse = "\n"))
}
if (!is.null(settings$host$postrun)) {
hostteardown <- paste(hostteardown, sep = "\n", paste(settings$host$postrun, collapse = "\n"))
}

# create rabbitmq specific setup.
cpruncmd <- cpoutcmd <- rmoutdircmd <- rmrundircmd <- ""
if (!is.null(settings$host$rabbitmq)) {
#rsync cmd from remote to local host.
cpruncmd <- gsub("@OUTDIR@", settings$host$rundir, settings$host$rabbitmq$cpfcmd)
cpruncmd <- gsub("@OUTFOLDER@", rundir, cpruncmd)

cpoutcmd <- gsub("@OUTDIR@", settings$host$outdir, settings$host$rabbitmq$cpfcmd)
cpoutcmd <- gsub("@OUTFOLDER@", outdir, cpoutcmd)

#delete files within rundir and outdir.
rmoutdircmd <- paste("rm", file.path(outdir, "*"))
rmrundircmd <- paste("rm", file.path(rundir, "*"))
}

# create job.sh
jobsh <- gsub("@HOST_SETUP@", hostsetup, jobsh)
jobsh <- gsub("@CDO_SETUP@", cdosetup, jobsh)
Expand All @@ -87,6 +102,11 @@ write.config.SIPNET <- function(defaults, trait.values, settings, run.id, inputs
jobsh <- gsub("@BINARY@", settings$model$binary, jobsh)
jobsh <- gsub("@REVISION@", settings$model$revision, jobsh)

jobsh <- gsub("@CPRUNCMD@", cpruncmd, jobsh)
jobsh <- gsub("@CPOUTCMD@", cpoutcmd, jobsh)
jobsh <- gsub("@RMOUTDIRCMD@", rmoutdircmd, jobsh)
jobsh <- gsub("@RMRUNDIRCMD@", rmrundircmd, jobsh)

if(is.null(settings$state.data.assimilation$NC.Prefix)){
settings$state.data.assimilation$NC.Prefix <- "sipnet.out"
}
Expand Down
25 changes: 14 additions & 11 deletions models/sipnet/R/write_restart.SIPNET.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@
##' @param RENAME flag to either rename output file or not
##' @param new.params list of parameters to convert between different states
##' @param inputs list of model inputs to use in write.configs.SIPNET
##' @param verbose decide if we want to print the outputs.
##'
##' @description Write restart files for SIPNET. WARNING: Some variables produce illegal values < 0 and have been hardcoded to correct these values!!
##'
##' @return NONE
##' @export
write_restart.SIPNET <- function(outdir, runid, start.time, stop.time, settings, new.state,
RENAME = TRUE, new.params = FALSE, inputs) {
RENAME = TRUE, new.params = FALSE, inputs, verbose = FALSE) {

rundir <- settings$host$rundir
variables <- colnames(new.state)
Expand Down Expand Up @@ -58,16 +59,16 @@ write_restart.SIPNET <- function(outdir, runid, start.time, stop.time, settings,
analysis.save[[length(analysis.save) + 1]] <- PEcAn.utils::ud_convert(new.state$NPP, "kg/m^2/s", "Mg/ha/yr") #*unit.conv -> Mg/ha/yr
names(analysis.save[[length(analysis.save)]]) <- c("NPP")
}

if ("NEE" %in% variables) {
analysis.save[[length(analysis.save) + 1]] <- new.state$NEE
names(analysis.save[[length(analysis.save)]]) <- c("NEE")
}

if ("AbvGrndWood" %in% variables) {
AbvGrndWood <- PEcAn.utils::ud_convert(new.state$AbvGrndWood, "Mg/ha", "g/m^2")
analysis.save[[length(analysis.save) + 1]] <- AbvGrndWood
names(analysis.save[[length(analysis.save)]]) <- c("AbvGrndWood")
if ("AbvGrndWood" %in% variables) {
AbvGrndWood <- PEcAn.utils::ud_convert(new.state$AbvGrndWood, "Mg/ha", "g/m^2")
analysis.save[[length(analysis.save) + 1]] <- AbvGrndWood
names(analysis.save[[length(analysis.save)]]) <- c("AbvGrndWood")
}

if ("LeafC" %in% variables) {
Expand Down Expand Up @@ -105,7 +106,7 @@ write_restart.SIPNET <- function(outdir, runid, start.time, stop.time, settings,
if (analysis.save[[length(analysis.save)]] < 0) analysis.save[[length(analysis.save)]] <- 0
names(analysis.save[[length(analysis.save)]]) <- c("SWE")
}

if ("LAI" %in% variables) {
analysis.save[[length(analysis.save) + 1]] <- new.state$LAI
if (new.state$LAI < 0) analysis.save[[length(analysis.save)]] <- 0
Expand All @@ -119,14 +120,16 @@ write_restart.SIPNET <- function(outdir, runid, start.time, stop.time, settings,
}else{
analysis.save.mat <- NULL
}

print(runid %>% as.character())
print(analysis.save.mat)

if (verbose) {
print(runid %>% as.character())
print(analysis.save.mat)
}
do.call(write.config.SIPNET, args = list(defaults = NULL,
trait.values = new.params,
settings = settings,
run.id = runid,
inputs = inputs,
IC = analysis.save.mat))
print(runid)
} # write_restart.SIPNET
} # write_restart.SIPNET
Loading

0 comments on commit 64c8877

Please sign in to comment.