Merge pull request #3240 from DongchenZ/develop
Bug fixes and improvements for the ERA5 and observation prep functions, and automation of the RabbitMQ job submission.
mdietze authored Feb 14, 2024
2 parents 3664f75 + 79d7e28 commit 98d0703
Showing 33 changed files with 669 additions and 410 deletions.
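
Several of the changes below read a rabbitmq block from the host settings. For orientation only (this is not part of the diff), the fields consulted by the new code paths look roughly like the sketch below; the field names come from the diff, while all values are hypothetical placeholders:

# Sketch of the host settings consumed by the new RabbitMQ code paths.
# Field names are taken from the diff below; all values are hypothetical placeholders.
settings <- list(host = list(rabbitmq = list(
  uri    = "amqp://guest:guest@rabbitmq:5672/%2F",     # message-broker URI
  queue  = "SIPNET_r136",                              # model job queue
  cpfcmd = "rsync -az remote:@OUTFOLDER@/ @OUTDIR@/"   # copy-file template with placeholders
)))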
6 changes: 3 additions & 3 deletions Makefile.depends
@@ -5,7 +5,7 @@ $(call depends,base/qaqc): | .install/base/db .install/base/logger .install/base
$(call depends,base/remote): | .install/base/logger
$(call depends,base/settings): | .install/base/db .install/base/logger .install/base/remote .install/base/utils
$(call depends,base/utils): | .install/base/logger
$(call depends,base/visualization): | .install/base/db .install/base/logger .install/base/utils
$(call depends,base/visualization): | .install/base/logger
$(call depends,base/workflow): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/modules/data.atmosphere .install/modules/data.land .install/modules/uncertainty
$(call depends,models/basgra): | .install/base/logger .install/base/utils .install/modules/data.atmosphere
$(call depends,models/biocro): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/modules/data.atmosphere .install/modules/data.land
@@ -17,7 +17,7 @@ $(call depends,models/ed): | .install/base/logger .install/base/remote .install/
$(call depends,models/fates): | .install/base/logger .install/base/remote .install/base/utils
$(call depends,models/gday): | .install/base/logger .install/base/remote .install/base/utils
$(call depends,models/jules): | .install/base/logger .install/base/remote .install/base/utils .install/modules/data.atmosphere
$(call depends,models/ldndc): | .install/base/db .install/base/logger .install/base/remote .install/base/utils .install/modules/data.atmosphere
$(call depends,models/ldndc): | .install/base/logger .install/base/remote .install/base/utils .install/modules/data.atmosphere
$(call depends,models/linkages): | .install/base/logger .install/base/remote .install/base/utils .install/modules/data.atmosphere
$(call depends,models/lpjguess): | .install/base/logger .install/base/remote .install/base/utils
$(call depends,models/maat): | .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/modules/data.atmosphere
@@ -31,7 +31,7 @@ $(call depends,modules/allometry): | .install/base/db
$(call depends,modules/assim.batch): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/base/workflow .install/modules/benchmark .install/modules/emulator .install/modules/meta.analysis .install/modules/uncertainty
$(call depends,modules/assim.sequential): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/workflow .install/modules/benchmark .install/modules/data.remote .install/modules/uncertainty
$(call depends,modules/benchmark): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/modules/data.land
$(call depends,modules/data.atmosphere): | .install/base/db .install/base/logger .install/base/remote .install/base/utils
$(call depends,modules/data.atmosphere): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils
$(call depends,modules/data.hydrology): | .install/base/logger .install/base/utils
$(call depends,modules/data.land): | .install/base/db .install/base/logger .install/base/remote .install/base/settings .install/base/utils .install/base/visualization .install/modules/benchmark .install/modules/data.atmosphere
$(call depends,modules/data.remote): | .install/base/db .install/base/logger .install/base/remote .install/base/utils
2 changes: 2 additions & 0 deletions base/remote/DESCRIPTION
@@ -17,7 +17,9 @@ Authors@R: c(person("David", "LeBauer", role = c("aut"),
Description: This package contains utilities for communicating with and executing code on local and remote hosts.
In particular, it has PEcAn-specific utilities for starting ecosystem model runs.
Imports:
dplyr,
foreach,
furrr,
PEcAn.logger,
httr,
jsonlite,
1 change: 1 addition & 0 deletions base/remote/NAMESPACE
@@ -22,4 +22,5 @@ export(start_qsub)
export(start_rabbitmq)
export(start_serial)
export(test_remote)
importFrom(dplyr,"%>%")
importFrom(foreach,"%dopar%")
134 changes: 104 additions & 30 deletions base/remote/R/qsub_parallel.R
@@ -4,6 +4,7 @@
#' @param files allow submit jobs based on job.sh file paths.
#' @param prefix used for detecting if jobs are completed or not.
#' @param sleep time (in second) that we wait each time for the jobs to be completed.
#' @param hybrid A Boolean flag that decides how job completion is detected. If TRUE, completion is detected from both the output files and the job IDs on the server; if FALSE, only the job IDs on the server are checked.
#' @export
#' @examples
#' \dontrun{
@@ -12,7 +13,8 @@
#' @author Dongchen Zhang
#'
#' @importFrom foreach %dopar%
qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep = 10) {
#' @importFrom dplyr %>%
qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep = 10, hybrid = TRUE) {
if("try-error" %in% class(try(find.package("doSNOW"), silent = T))){
PEcAn.logger::logger.info("Package doSNOW is not installed! Please install it and rerun the function!")
return(0)
@@ -22,6 +24,8 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
folder <- NULL
run_list <- readLines(con = file.path(settings$rundir, "runs.txt"))
is_local <- PEcAn.remote::is.localhost(settings$host)
is_qsub <- !is.null(settings$host$qsub)
is_rabbitmq <- !is.null(settings$host$rabbitmq)
# loop through runs and either call start run, or launch job on remote machine
# parallel submit jobs
cores <- parallel::detectCores()
@@ -34,25 +38,32 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
PEcAn.logger::logger.info("Submitting jobs!")
# if we want to submit jobs separately.
if(is.null(files)){
jobids <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(files))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
qsub <- settings$host$qsub
qsub <- gsub("@NAME@", paste0("PEcAn-", run_id_string), qsub)
qsub <- gsub("@STDOUT@", file.path(settings$host$outdir, run_id_string, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(settings$host$outdir, run_id_string, "stderr.log"), qsub)
qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
# start the actual model run
cmd <- qsub[[1]]
if(PEcAn.remote::is.localhost(settings$host)){
out <- system2(cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stdout = TRUE, stderr = TRUE)
}else{
out <- PEcAn.remote::remote.execute.cmd(settings$host, cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stderr = TRUE)
if (is_qsub) {
jobids <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(run_list))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
qsub <- settings$host$qsub
qsub <- gsub("@NAME@", paste0("PEcAn-", run_id_string), qsub)
qsub <- gsub("@STDOUT@", file.path(settings$host$outdir, run_id_string, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(settings$host$outdir, run_id_string, "stderr.log"), qsub)
qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
# start the actual model run
cmd <- qsub[[1]]
if(PEcAn.remote::is.localhost(settings$host)){
out <- system2(cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stdout = TRUE, stderr = TRUE)
}else{
out <- PEcAn.remote::remote.execute.cmd(settings$host, cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stderr = TRUE)
}
jobid <- PEcAn.remote::qsub_get_jobid(
out = out[length(out)],
qsub.jobid = settings$host$qsub.jobid,
stop.on.error = TRUE)
return(jobid)
}
} else if (is_rabbitmq) {
out <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(run_list))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
PEcAn.remote::start_rabbitmq(file.path(settings$host$rundir, run_id_string), settings$host$rabbitmq$uri, settings$host$rabbitmq$queue)
}
jobid <- PEcAn.remote::qsub_get_jobid(
out = out[length(out)],
qsub.jobid = settings$host$qsub.jobid,
stop.on.error = TRUE)
return(jobid)
}
}else{
# if we want to submit merged job files.
@@ -89,20 +100,83 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
PEcAn.logger::logger.info("Checking the qsub jobs status!")
PEcAn.logger::logger.info(paste0("Checking the file ", prefix))
## setup progressbar
pb <- utils::txtProgressBar(min = 0, max = length(unlist(run_list)), style = 3)
pbi <- 0
folders <- file.path(settings$host$outdir, run_list)
completed_folders <- c()
while (length(completed_folders) < length(folders)) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
L_folder <- length(folders)
pb <- utils::txtProgressBar(min = 0, max = L_folder, style = 3)
pbi <- 0
# Here we detect not only whether the target files have been generated,
# but also whether the jobs still exist on the server.
if (is_rabbitmq) {
while ((L_folder - length(folders)) < L_folder) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
}
} %>% unlist()
folders <- folders[which(!folders %in% completed_folders)]
pbi <- L_folder - length(folders)
utils::setTxtProgressBar(pb, pbi)
}
} else {
L_jobid <- length(jobids)
pb1 <- utils::txtProgressBar(min = 0, max = L_jobid, style = 3)
pbi1 <- 0
if (hybrid) {
while ((L_folder - length(folders)) < L_folder &
(L_jobid - length(jobids)) < L_jobid) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
}
} %>% unlist()
folders <- folders[which(!folders %in% completed_folders)]

# Alternatively, we can detect whether the jobs are still listed on the server.
# Specify the host and qstat arguments for the future_map function.
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

# Compare the two progress counts and update each progress bar.
pbi <- L_folder - length(folders)
utils::setTxtProgressBar(pb, pbi)

pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
}
} else {
# Special case: only check the job IDs on the server.
while ((L_jobid - length(jobids)) < L_jobid) {
# Detect whether the jobs are still on the server.
# Specify the host and qstat arguments for the future_map function.
Sys.sleep(sleep)
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

# Update the progress bar with the number of completed jobs.
pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
}
}
completed_folders <- unlist(completed_folders)
pbi <- length(completed_folders)
utils::setTxtProgressBar(pb, pbi)
}
close(pb)
parallel::stopCluster(cl)
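
Taken together, these changes let qsub_parallel() be driven entirely from the settings object, whether the host is configured for qsub or RabbitMQ. A minimal usage sketch (hypothetical settings object; not part of the commit):

# Assumes a fully populated PEcAn settings list with settings$rundir,
# settings$host$outdir, and either settings$host$qsub or settings$host$rabbitmq set.
PEcAn.remote::qsub_parallel(
  settings,
  prefix = "sipnet.out",  # file whose presence marks a completed run
  sleep  = 10,            # seconds between completion checks
  hybrid = TRUE           # TRUE: check output files and server job IDs; FALSE: job IDs only
)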
10 changes: 9 additions & 1 deletion base/remote/man/qsub_parallel.Rd

Some generated files are not rendered by default.

3 changes: 0 additions & 3 deletions base/visualization/DESCRIPTION
@@ -31,11 +31,8 @@ Description: The Predictive Ecosystem Carbon Analyzer (PEcAn) is a scientific
Imports:
data.table,
ggplot2,
maps,
ncdf4 (>= 1.15),
PEcAn.DB,
PEcAn.logger,
PEcAn.utils,
plyr (>= 1.8.4),
reshape2,
rlang,
4 changes: 2 additions & 2 deletions base/workflow/R/start_model_runs.R
@@ -301,7 +301,7 @@ start_model_runs <- function(settings, write = TRUE, stop.on.error = TRUE) {
}

if (job_finished) {

# TODO check output log
if (is_rabbitmq) {
data <- readLines(file.path(jobids[run], "rabbitmq.out"))
@@ -372,4 +372,4 @@ runModule_start_model_runs <- function(settings, stop.on.error=TRUE) {
PEcAn.logger::logger.severe(
"runModule_start_model_runs only works with Settings or MultiSettings")
}
} # runModule_start_model_runs
} # runModule_start_model_runs
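
The new TODO above notes that the RabbitMQ output log is read but not yet inspected. One possible shape for that check, purely illustrative and not part of this commit (the error-marker string is an assumption):

# Illustrative only: scan rabbitmq.out for a failure marker after the job finishes.
data <- readLines(file.path(jobids[run], "rabbitmq.out"))
if (any(grepl("ERROR IN MODEL RUN", data))) {  # assumed marker string
  PEcAn.logger::logger.warn("Run", run, "reported an error; see rabbitmq.out")
}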
14 changes: 9 additions & 5 deletions docker/depends/pecan_package_dependencies.csv
@@ -49,13 +49,13 @@
"dplR","*","modules/data.land","Imports",FALSE
"dplyr","*","base/db","Imports",FALSE
"dplyr","*","base/qaqc","Imports",FALSE
"dplyr","*","base/remote","Imports",FALSE
"dplyr","*","base/utils","Imports",FALSE
"dplyr","*","base/workflow","Imports",FALSE
"dplyr","*","models/biocro","Imports",FALSE
"dplyr","*","models/ed","Imports",FALSE
"dplyr","*","models/ldndc","Imports",FALSE
"dplyr","*","models/stics","Imports",FALSE
"dplyr","*","modules/assim.batch","Imports",FALSE
"dplyr","*","modules/assim.sequential","Imports",FALSE
"dplyr","*","modules/benchmark","Imports",FALSE
"dplyr","*","modules/data.atmosphere","Imports",FALSE
@@ -69,8 +69,15 @@
"foreach","*","modules/data.remote","Imports",FALSE
"fs","*","base/db","Imports",FALSE
"fs","*","modules/data.land","Imports",FALSE
"furrr","*","base/remote","Imports",FALSE
"furrr","*","modules/assim.sequential","Imports",FALSE
"furrr","*","modules/data.atmosphere","Suggests",FALSE
"furrr","*","modules/data.land","Imports",FALSE
"furrr","*","modules/data.remote","Imports",FALSE
"future","*","modules/assim.sequential","Imports",FALSE
"future","*","modules/data.atmosphere","Suggests",FALSE
"future","*","modules/data.land","Imports",FALSE
"future","*","modules/data.remote","Imports",FALSE
"geonames","> 0.998","modules/data.atmosphere","Imports",FALSE
"getPass","*","base/remote","Suggests",FALSE
"ggmcmc","*","modules/meta.analysis","Suggests",FALSE
@@ -155,7 +162,6 @@
"magrittr","*","modules/data.atmosphere","Imports",FALSE
"magrittr","*","modules/data.land","Imports",FALSE
"magrittr","*","modules/data.remote","Imports",FALSE
"maps","*","base/visualization","Imports",FALSE
"markdown","*","modules/allometry","Suggests",FALSE
"markdown","*","modules/photosynthesis","Suggests",FALSE
"MASS","*","base/utils","Suggests",FALSE
@@ -262,10 +268,8 @@
"PEcAn.DB","*","base/all","Depends",TRUE
"PEcAn.DB","*","base/qaqc","Imports",TRUE
"PEcAn.DB","*","base/settings","Imports",TRUE
"PEcAn.DB","*","base/visualization","Imports",TRUE
"PEcAn.DB","*","base/workflow","Imports",TRUE
"PEcAn.DB","*","models/biocro","Suggests",TRUE
"PEcAn.DB","*","models/ldndc","Imports",TRUE
"PEcAn.DB","*","models/stics","Imports",TRUE
"PEcAn.DB","*","models/template","Imports",TRUE
"PEcAn.DB","*","modules/allometry","Imports",TRUE
@@ -361,6 +365,7 @@
"PEcAn.settings","*","modules/assim.batch","Imports",TRUE
"PEcAn.settings","*","modules/assim.sequential","Imports",TRUE
"PEcAn.settings","*","modules/benchmark","Imports",TRUE
"PEcAn.settings","*","modules/data.atmosphere","Suggests",TRUE
"PEcAn.settings","*","modules/data.land","Suggests",TRUE
"PEcAn.settings","*","modules/meta.analysis","Imports",TRUE
"PEcAn.settings","*","modules/uncertainty","Imports",TRUE
@@ -374,7 +379,6 @@
"PEcAn.utils","*","base/db","Imports",TRUE
"PEcAn.utils","*","base/qaqc","Suggests",TRUE
"PEcAn.utils","*","base/settings","Imports",TRUE
"PEcAn.utils","*","base/visualization","Imports",TRUE
"PEcAn.utils","*","base/workflow","Imports",TRUE
"PEcAn.utils","*","models/biocro","Imports",TRUE
"PEcAn.utils","*","models/clm45","Depends",TRUE
1 change: 0 additions & 1 deletion models/ldndc/DESCRIPTION
@@ -7,7 +7,6 @@ Authors@R: c(person("Henri", "Kajasilta", role = c("aut", "cre"),
email = "[email protected]"))
Description: This module provides functions to link the (LDNDC) to PEcAn.
Imports:
PEcAn.DB,
PEcAn.logger,
PEcAn.utils (>= 1.4.8),
dplyr,
22 changes: 21 additions & 1 deletion models/sipnet/R/write.configs.SIPNET.R
@@ -61,14 +61,29 @@ write.config.SIPNET <- function(defaults, trait.values, settings, run.id, inputs
cdosetup <- paste(cdosetup, sep = "\n", paste(settings$host$cdosetup, collapse = "\n"))
}


hostteardown <- ""
if (!is.null(settings$model$postrun)) {
hostteardown <- paste(hostteardown, sep = "\n", paste(settings$model$postrun, collapse = "\n"))
}
if (!is.null(settings$host$postrun)) {
hostteardown <- paste(hostteardown, sep = "\n", paste(settings$host$postrun, collapse = "\n"))
}

# create rabbitmq specific setup.
cpruncmd <- cpoutcmd <- rmoutdircmd <- rmrundircmd <- ""
if (!is.null(settings$host$rabbitmq)) {
# rsync command from the remote host to the local host.
cpruncmd <- gsub("@OUTDIR@", settings$host$rundir, settings$host$rabbitmq$cpfcmd)
cpruncmd <- gsub("@OUTFOLDER@", rundir, cpruncmd)

cpoutcmd <- gsub("@OUTDIR@", settings$host$outdir, settings$host$rabbitmq$cpfcmd)
cpoutcmd <- gsub("@OUTFOLDER@", outdir, cpoutcmd)

#delete files within rundir and outdir.
rmoutdircmd <- paste("rm", file.path(outdir, "*"))
rmrundircmd <- paste("rm", file.path(rundir, "*"))
}

# create job.sh
jobsh <- gsub("@HOST_SETUP@", hostsetup, jobsh)
jobsh <- gsub("@CDO_SETUP@", cdosetup, jobsh)
@@ -87,6 +102,11 @@ write.config.SIPNET <- function(defaults, trait.values, settings, run.id, inputs
jobsh <- gsub("@BINARY@", settings$model$binary, jobsh)
jobsh <- gsub("@REVISION@", settings$model$revision, jobsh)

jobsh <- gsub("@CPRUNCMD@", cpruncmd, jobsh)
jobsh <- gsub("@CPOUTCMD@", cpoutcmd, jobsh)
jobsh <- gsub("@RMOUTDIRCMD@", rmoutdircmd, jobsh)
jobsh <- gsub("@RMRUNDIRCMD@", rmrundircmd, jobsh)

if(is.null(settings$state.data.assimilation$NC.Prefix)){
settings$state.data.assimilation$NC.Prefix <- "sipnet.out"
}
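
To make the new substitution flow concrete, here is a sketch of how the copy-file template might resolve for a single run; the template string and paths are hypothetical, while the gsub() calls mirror the code added above:

# Hypothetical values showing how the new placeholders resolve for one run.
cpfcmd   <- "rsync -az remote:@OUTFOLDER@/ @OUTDIR@/"  # assumed cpfcmd template
hostdir  <- "/data/workflows/PEcAn_1/run"              # stands in for settings$host$rundir
rundir   <- "/data/workflows/PEcAn_1/run/99000000001"  # this run's folder
cpruncmd <- gsub("@OUTDIR@", hostdir, cpfcmd)          # fills @OUTDIR@
cpruncmd <- gsub("@OUTFOLDER@", rundir, cpruncmd)      # fills @OUTFOLDER@
# The expanded command is then written into job.sh via the @CPRUNCMD@ placeholder.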