diff --git a/CHANGELOG.md b/CHANGELOG.md
index bcc702afb0e..23c984c7321 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -44,6 +44,7 @@ see if you need to change any of these:
- Added new features of the SDA function including: 1) allow user-defined free-run mode;
2) allow user-defined parallel mode for the qsub submission; 3) allow user-defined email option to report the progress.
- The analysis function now supports the parallelization of multi-chain MCMC sampling with the fully randomized inits function.
+- Added the new feature of the block-based SDA workflow, which supports the parallel computation.
### Fixed
diff --git a/base/remote/R/qsub_parallel.R b/base/remote/R/qsub_parallel.R
index aca657d1ba8..6b9826033fb 100644
--- a/base/remote/R/qsub_parallel.R
+++ b/base/remote/R/qsub_parallel.R
@@ -3,6 +3,7 @@
#' @param settings pecan settings object
#' @param files allow submit jobs based on job.sh file paths.
#' @param prefix used for detecting if jobs are completed or not.
+#' @param sleep time (in second) that we wait each time for the jobs to be completed.
#' @export
#' @examples
#' \dontrun{
@@ -11,7 +12,7 @@
#' @author Dongchen Zhang
#'
#' @importFrom foreach %dopar%
-qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out") {
+qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep = 10) {
if("try-error" %in% class(try(find.package("doSNOW"), silent = T))){
PEcAn.logger::logger.info("Package doSNOW is not installed! Please install it and rerun the function!")
return(0)
@@ -91,19 +92,17 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out") {
pb <- utils::txtProgressBar(min = 0, max = length(unlist(run_list)), style = 3)
pbi <- 0
folders <- file.path(settings$host$outdir, run_list)
- while (length(folders) > 0) {
- Sys.sleep(10)
- completed_folders <- foreach::foreach(folder = folders, settings = rep(settings, length(run_list))) %dopar% {
+ completed_folders <- c()
+ while (length(completed_folders) < length(folders)) {
+ Sys.sleep(sleep)
+ completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
}
}
- if(length(unlist(completed_folders)) > 0){
- Ind <- which(unlist(completed_folders) %in% folders)
- folders <- folders[-Ind]
- pbi <- pbi + length(completed_folders)
- utils::setTxtProgressBar(pb, pbi)
- }
+ completed_folders <- unlist(completed_folders)
+ pbi <- length(completed_folders)
+ utils::setTxtProgressBar(pb, pbi)
}
close(pb)
parallel::stopCluster(cl)
diff --git a/base/remote/man/qsub_parallel.Rd b/base/remote/man/qsub_parallel.Rd
index cbd2c6614d4..a7d2f8bd751 100644
--- a/base/remote/man/qsub_parallel.Rd
+++ b/base/remote/man/qsub_parallel.Rd
@@ -4,7 +4,7 @@
\alias{qsub_parallel}
\title{qsub_parallel}
\usage{
-qsub_parallel(settings, files = NULL, prefix = "sipnet.out")
+qsub_parallel(settings, files = NULL, prefix = "sipnet.out", sleep = 10)
}
\arguments{
\item{settings}{pecan settings object}
@@ -12,6 +12,8 @@ qsub_parallel(settings, files = NULL, prefix = "sipnet.out")
\item{files}{allow submit jobs based on job.sh file paths.}
\item{prefix}{used for detecting if jobs are completed or not.}
+
+\item{sleep}{time (in second) that we wait each time for the jobs to be completed.}
}
\description{
qsub_parallel
diff --git a/book_source/02_demos_tutorials_workflows/04_more_web_interface/02_hidden_analyses.Rmd b/book_source/02_demos_tutorials_workflows/04_more_web_interface/02_hidden_analyses.Rmd
index dd60e020fc6..9eaaed7a847 100644
--- a/book_source/02_demos_tutorials_workflows/04_more_web_interface/02_hidden_analyses.Rmd
+++ b/book_source/02_demos_tutorials_workflows/04_more_web_interface/02_hidden_analyses.Rmd
@@ -105,7 +105,7 @@ This is the main ensemble Kalman filter and generalized filter code. Originally,
* IC - (optional) initial condition matrix (dimensions: ensemble memeber # by state variables). Default is NULL.
-* Q - (optional) process covariance matrix (dimensions: state variable by state variables). Defualt is NULL.
+* Q - (optional) process covariance matrix (dimensions: state variable by state variables). Default is NULL.
#### State Data Assimilation Workflow
Before running sda.enkf, these tasks must be completed (in no particular order),
@@ -182,7 +182,7 @@ Create diagnostics
#### State Data Assimilation Tags Descriptions
-* **adjustment** : [optional] TRUE/FLASE flag for if ensembles needs to be adjusted based on weights estimated given their likelihood during analysis step. The defualt is TRUE for this flag.
+* **adjustment** : [optional] TRUE/FLASE flag for if ensembles needs to be adjusted based on weights estimated given their likelihood during analysis step. The Default is TRUE for this flag.
* **process.variance** : [optional] TRUE/FLASE flag for if process variance should be estimated (TRUE) or not (FALSE). If TRUE, a generalized ensemble filter will be used. If FALSE, an ensemble Kalman filter will be used. Default is FALSE. If you use the TRUE argument you can set three more optional tags to control the MCMCs built for the generalized esnsemble filter.
* **nitrGEF** : [optional] numeric defining the length of the MCMC chains.
* **nthin** : [optional] numeric defining thining length for the MCMC chains.
@@ -289,14 +289,50 @@ $n_{t+1} = \frac{\sum_{i=1}^{I}\sum_{j=1}^{J}\frac{v_{ijt}^{2}+v_{iit}v_{jjt}}{V
where we calculate the mean of the process covariance matrix $\left(\bar{\boldsymbol{Q}_{t}}\right)$ from the posterior samples at time t. Degrees of freedom for the Wishart are typically calculated element by element where $v_{ij}$ are the elements of $\boldsymbol{V}_{t}$. $I$ and $J$ index rows and columns of $\boldsymbol{V}$. Here, we calculate a mean number of degrees of freedom for $t+1$ by summing over all the elements of the scale matrix $\left(\boldsymbol{V}\right)$ and dividing by the count of those elements $\left(I\times J\right)$. We fit this model sequentially through time in the R computing environment using R package 'rjags.'
-Users have control over how they think is the best way to estimate $Q$. Our code will look for the tag `q.type` in the XML settings under `state.data.assimilation` which can take 3 values of Single, PFT or Site. If `q.type` is set to single then one value of process variance will be estimated across all different sites or PFTs. On the other hand, when q.type` is set to Site or PFT then a process variance will be estimated for each site or PFT at a cost of more time and computation power.
+Users have control over how they think is the best way to estimate $Q$. Our code will look for the tag `q.type` in the XML settings under `state.data.assimilation` which can take 4 values of Single, Site, Vector, or Wishart. If `q.type` is set to single then one value of process variance will be estimated across all different sites and variables. When `q.type` is set to Site then a process variance will be estimated for each siteat a cost of more time and computation power. When the `q.type` is set to Vector or Wishart then process errors for each variable of each site will be estimated and propagated through time, while the Wishart Q support the estimation of covariance between sites and variables through the MCMC sampling of wishart distributions, which further support the propagation of process error through not just time but space and variables.
#### Multi-site State data assimilation.
+`sda.enkf.multisite` is housed within: `/pecan/modules/assim.sequential/R`
+
+The 4-site tutorial is housed within: `~/pecan/modules/assim.sequential/inst/MultiSite-Exs`
+
+#### **sda.enkf.multisite.R Description**
`sda.enkf.multisite` function allows for assimilation of observed data at multiple sites at the same time. In order to run a multi-site SDA, one needs to send a multisettings pecan xml file to this function. This multisettings xml file needs to contain information required for running at least two sites under `run` tag. The code will automatically run the ensembles for all the sites and reformats the outputs matching the required formats for analysis step.
+#### **sda.enkf.multisite.R Arguments**
+* settings - (required) [State Data Assimilation Tags Example] settings object
+
+* obs.mean - (required) Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation means for each state variables of each site for each time point.
+
+* obs.cov - (required) Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation covariances for all state variables of each site for each time point.
+
+* Q - (optional) Process covariance matrix given if there is no data to estimate it. Default is NULL.
+
+* restart - (optional) Used for iterative updating previous forecasts. Default NULL. List object includes file path to previous runs and start date for SDA. Default is NULL.
+
+* pre_enkf_params - (optional) Used for carrying out SDA with pre-existed `enkf.params`, in which the `Pf`, `aqq`, and `bqq` can be used for the analysis step. Default is NULL.
+
+* ensemble.samples - (optional) Pass ensemble.samples from outside, which are lists of calibrated parameters for different plant functional types. This also helps to avoid GitHub check issues. Default is NULL.
+
+* control - (optional) List of flags controlling the behavior of the SDA. Here is an example of the `control` list. The detailed explanation of them are shown inside the `sda.enkf.multisite` function in the `assim.sequential` package.
+```
+control=list(trace = TRUE,
+ TimeseriesPlot = FALSE,
+ debug = FALSE,
+ pause = FALSE,
+ Profiling = FALSE,
+ OutlierDetection=FALSE,
+ parallel_qsub = TRUE,
+ send_email = NULL,
+ keepNC = TRUE,
+ run_parallel = TRUE,
+ MCMC.args = NULL)
+```
-The observed mean and cov needs to be formatted as list of different dates with observations. For each element of this list also there needs to be a list with mean and cov matrices of different sites named by their siteid. In case that zero variance was estimated for a variable inside the obs.cov, the SDA code will automatically replace that with half of the minimum variance from other non-zero variables in that time step.
+#### **obs.mean and obs.cov Description**
+The observations are required for passing the time points into the SDA workflow, which should match the start and end date in the settings object. For the generations of obs.mean and obs.cov, please use the function `SDA_OBS_Assembler` inside the `assim.sequential` package. For the unconstrained runs, please specify the `free.run` flag as TRUE inside the `settings$state.data.assimilation$Obs_Prep` section. Otherwise, please specify the arguments that are needed for preparations of different observations (note that, the observation preparation functions currently only support `MODIS LAI`, `Landtrendr AGB`, `SMAP soil moisture`, and `SoilGrid soil organic carbon`). For the details about how to setup those arguments, please reference the `Create_Multi_settings.R` script inside `~/pecan/modules/assim.sequential/inst/MultiSite-Exs/SDA` directory.
+The observed mean and covariance need to be formatted as list of different dates with observations. For each element of this list also there needs to be a list with mean and cov matrices of different sites named by their siteid. In case that zero variance was estimated for a variable inside the obs.cov, the SDA code will automatically replace that with half of the minimum variance from other non-zero variables in that time step.
-This would look like something like this:
+Here are examples of the `obs.mean` and `obs.cov` for single time point, two sites, and two observations.
```
> obs.mean
@@ -324,34 +360,127 @@ $`2010/12/31`$`1000000651`
[1,] 15.2821691 0.513584319
[2,] 0.1213583 0.001162113
```
-An example of multi-settings pecan xml file also may look like below:
+#### Anlysis SDA workflow
+Before running the SDA analysis functions, the ensemble forecast results have to be generated, and arguments such as H matrix, MCMC arguments, and multi-site Y and R (by `Construct.R` function) have to be generated as well. Here are the workflows for three types of SDA analysis functions that we are currently used.
+
+* Decide which analysis function to be used. Here we have three options: 1) traditional ensemble Kalman Filter (`EnKF.MultiSite`) with analytical solution, within which the process error needs to be prescribed from outside (see `Q` arguments in the `sda.enkf.multisite` function); 2) generalized ensemble Kalman Filter (`GEF.MultiSite`); and 3) block-based generalized ensemble Kalman Filter (`analysis_sda_block`). The latter two methods support the feature of propagating process variance across space and time. To choose the analysis method 1, we need to set the `process.variance` as FALSE. Otherwise, if we set the `process.variance` as TRUE and provide the `q.type` as either `SINGLE` or `SITE` the method `GEF.MultiSite` will be used, and if we provide the `q.type` as either `vector` or `wishart` the method `analysis_sda_block` will be used. The explanations for different Q types can be found in the `The Generalized Ensemble Filter` section in this documentation. For the `analysis_sda_block` method, there is also a special case for complete or partial missing of observations.
+
+* If we decide to use `EnKF.MultiSite`, then the analysis results will be calculated based on equations.
+
+* If we decide to use `GEF.MultiSite`, then it will first do censoring process based on how you setup the `censored.data` flag within settings xml file. Then, if t equals to 1, we will first initialize the `aqq`, `bqq`, `aq`, and `bq` based on how you setup the `q.type` argument within settings xml file. After preparing the initial conditions (ICs), data, and constants for the `GEF.MultiSite.Nimble` nimble model, the MCMC sampling will happen afterwards. Finally, the process variance and the analysis results will be calculated and updated and returned to the `sda.enkf.multisite` function.
+
+* If we decide to use `analysis_sda_block` method, then if t equals 1, the workflow will first build blocks using the `matrix_network` function for the calculations for indexes of variables by networks of site groups based on the spatial scale (see `scalef` argument in the `Example of multi-settings pecan xml file` section) that we specify inside the `state.data.assimilation` section. Then, the workflow will execute the `build.block.xy` to automatically initialize the overall block-based MCMC lists (`block.list.all`) and fill in datasets (`mu.f`, `y.censored`, or `Pf`) for each block and for all time points to facilitate the process of passing arguments between scripts/functions. After that, the `MCMC_args` (see the explanations inside the roxygen structure of the `sda.enkf.multisite` function) will be specified either from the `control` list or by default (see below). Then, the workflow will update the process error using the `update_q` function. If t equals 1, it will initialize the arguments for the process error. Otherwise, it will grab the previously updated process error. After that, in the `MCMC_Init` function, it will create the initial conditions (ICs) for the MCMC sampling based on randomly sampled data. Then, the completed block-based arguments (`block.list.all`) will be used by the `MCMC_block_function` function under the parallel mode. Finally, the block-based results will be converted into long vectors and large matrices by the `block.2.vector` function and values such as `block.list.all`, `mu.f`, `mu.a`, `Pf`, `Pa` will be returned to the `sda.enkf.multisite` function.
+
+```
+MCMC.args <- list(niter = 1e5,
+ nthin = 10,
+ nchain = 3,
+ nburnin = 1e4)
+```
+
+* There is a special case for the `analysis_sda_block` method where NA values appear in the observations, which provides the opportunity for the estimations of process error without any observations. This special case currently is only working under restricted conditions when we set the `scalef` as 0 (that feature currently only works for isolated-site SDA runs) and turn on the `free.run` flag in the settings, which will then automatically fill in NA values to the observations by each site (see bellow).
+
+```
+
+ 0
+ TRUE
+
+```
+
+
+#### Example of multi-settings pecan xml file
+Here is an example of what does a multi-settings pecan xml file look like. The detailed explanation for the xml file can be found under the `Multi-Settings` section in the `03_pecan_xml.Rmd` documentation.
+
```
- FALSE
- TRUE
-
- 1000000040
- 1000013298
-
-
-
- GWBI
- KgC/m^2
- 0
- 9999
-
-
- AbvGrndWood
- KgC/m^2
- 0
- 9999
-
-
- 1960/01/01
- 2000/12/31
-
+ TRUE
+ 1
+ 1
+ FALSE
+ TRUE
+ FALSE
+ TRUE
+ FALSE
+ sipnet.out
+ SINGLE
+ FALSE
+ Local.support
+ 1
+ 5
+
+
+ AbvGrndWood
+ MgC/ha
+ 0
+ 9999
+
+
+ LAI
+
+ 0
+ 9999
+
+
+ SoilMoistFrac
+
+ 0
+ 100
+
+
+ TotSoilCarb
+ kg/m^2
+ 0
+ 9999
+
+
+
+
+ /projectnb/dietzelab/dongchen/Multi-site/download_500_sites/AGB
+ TRUE
+ TRUE
+
+ year
+ 1
+
+
+
+ 30
+ TRUE
+ TRUE
+
+ year
+ 1
+
+
+
+ 30
+ TRUE
+ FALSE
+
+ year
+ 1
+
+
+
+
+ year
+ 1
+
+
+ /projectnb/dietzelab/dongchen/All_NEON_SDA/test_OBS
+ 2012-07-15
+ 2021-07-15
+
+
+ 2004/01/01
+ 2006/12/31
+
+ 1
+ 2004/01/01
+ 2006/12/31
+
-1
diff --git a/book_source/03_topical_pages/03_pecan_xml.Rmd b/book_source/03_topical_pages/03_pecan_xml.Rmd
index 442c3189d37..30eb369ff51 100644
--- a/book_source/03_topical_pages/03_pecan_xml.Rmd
+++ b/book_source/03_topical_pages/03_pecan_xml.Rmd
@@ -750,6 +750,8 @@ The following tags can be used for state data assimilation. More detailed inform
```xml
TRUE
+ 1
+ 1
FALSE
TRUE
FALSE
@@ -757,6 +759,7 @@ The following tags can be used for state data assimilation. More detailed inform
FALSE
sipnet.out
SINGLE
+ FALSE
Local.support
1
5
@@ -835,6 +838,8 @@ The following tags can be used for state data assimilation. More detailed inform
```
* **process.variance** : [optional] TRUE/FLASE flag for if process variance should be estimated (TRUE) or not (FALSE). If TRUE, a generalized ensemble filter will be used. If FALSE, an ensemble Kalman filter will be used. Default is FALSE.
+* **aqq.Init** : [optional] The initial value of `aqq` used for estimate the Q distribution, the default value is 1 (note that, the `aqq.init` and `bqq.init` right now only work on the `VECTOR` q type, and we didn't account for the variabilities of them across sites or variables, meaning we initialize the `aqq` and `bqq` given single value).
+* **bqq.Init** : [optional] The initial value of `bqq` used for estimate the Q distribution, the default value is 1.
* **sample.parameters** : [optional] TRUE/FLASE flag for if parameters should be sampled for each ensemble member or not. This allows for more spread in the intial conditions of the forecast.
* **adjustment** : [optional] Bool variable decide if you want to adjust analysis results by the likelihood.
* **censored.data** : [optional] Bool variable decide if you want to do MCMC sampling for the forecast ensemble space, the default is FALSE.
@@ -842,6 +847,7 @@ The following tags can be used for state data assimilation. More detailed inform
* **NC.Overwrite** : [optional] Bool variable decide if you want to overwrite the previous netcdf file when there is a overlap in time, the default is FALSE.
* **NC.Prefix** : [optional] The prefix for the generation of the full-year netcdf file, the default is sipnet.out.
* **q.type** : [optional] The type of process variance that will be estimated, the default is SINGLE.
+* **free.run** : [optional] If it's a free run without any observations, the default is FALSE.
* **Localization.FUN** : [optional] The localization function name for the localization operation, the default is Local.support.
* **scalef** : [optional] The scale parameter used for the localization operation, the smaller the value is, the sites are more isolated.
* **chains** : [optional] The number of chains needed to be estimated during the MCMC sampling process.
diff --git a/models/sipnet/R/write.configs.SIPNET.R b/models/sipnet/R/write.configs.SIPNET.R
index 91c5224c9a0..f26cb01cad7 100755
--- a/models/sipnet/R/write.configs.SIPNET.R
+++ b/models/sipnet/R/write.configs.SIPNET.R
@@ -441,7 +441,12 @@ write.config.SIPNET <- function(defaults, trait.values, settings, run.id, inputs
plant_wood_vars <- c("AbvGrndWood", "abvGrndWoodFrac", "coarseRootFrac", "fineRootFrac")
if (all(plant_wood_vars %in% ic.names)) {
# reconstruct total wood C
- wood_total_C <- IC$AbvGrndWood / IC$abvGrndWoodFrac
+ if(IC$abvGrndWoodFrac < 0.05){
+ wood_total_C <- IC$AbvGrndWood
+ }else{
+ wood_total_C <- IC$AbvGrndWood / IC$abvGrndWoodFrac
+ }
+
#Sanity check
if (is.infinite(wood_total_C) | is.nan(wood_total_C) | wood_total_C < 0) {
wood_total_C <- 0
diff --git a/modules/assim.sequential/NAMESPACE b/modules/assim.sequential/NAMESPACE
index 6c567538bce..aa4d926ecb5 100644
--- a/modules/assim.sequential/NAMESPACE
+++ b/modules/assim.sequential/NAMESPACE
@@ -11,6 +11,7 @@ export(EnKF.MultiSite)
export(GEF)
export(GEF.MultiSite)
export(GEF.MultiSite.Nimble)
+export(GrabFillMatrix)
export(Local.support)
export(Obs.data.prepare.MultiSite)
export(Prep_OBS_SDA)
@@ -25,6 +26,7 @@ export(alr)
export(assessParams)
export(block_matrix)
export(conj_wt_wishart_sampler)
+export(construct_nimble_H)
export(dwtmnorm)
export(generate_colors_sda)
export(get_ensemble_weights)
@@ -32,6 +34,7 @@ export(hop_test)
export(interactive.plotting.sda)
export(inv.alr)
export(load_data_paleon_sda)
+export(matrix_network)
export(metSplit)
export(obs_timestep2timepoint)
export(outlier.detector.boxplot)
@@ -57,6 +60,7 @@ export(y_star_create)
import(furrr)
import(lubridate)
import(nimble)
+importFrom(dplyr,"%>%")
importFrom(lubridate,"%m+%")
importFrom(magrittr,"%>%")
importFrom(rlang,.data)
diff --git a/modules/assim.sequential/R/Analysis_sda_block.R b/modules/assim.sequential/R/Analysis_sda_block.R
new file mode 100644
index 00000000000..69ccbfeb79f
--- /dev/null
+++ b/modules/assim.sequential/R/Analysis_sda_block.R
@@ -0,0 +1,558 @@
+##' @title analysis_sda_block
+##' @name analysis_sda_block
+##' @author Dongchen Zhang
+##'
+##' @param settings pecan standard multi-site settings list.
+##' @param block.list.all Lists of forecast and analysis outputs for each time point of each block. If t=1, we initialize those outputs of each block with NULL from the `sda.enkf.multisite` function.
+##' @param X A matrix contains ensemble forecasts with the dimensions of `[ensemble number, site number * number of state variables]`. The columns are matched with the site.ids and state variable names of the inside the `FORECAST` object in the `sda.enkf.multisite` script.
+##' @param obs.mean Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation means for each state variables of each site for each time point.
+##' @param obs.cov Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation covariances for all state variables of each site for each time point.
+##' @param t time point in format of YYYY-MM-DD.
+##' @param nt total length of time steps, corresponding to the `nt` variable in the `sda.enkf.multisite` function.
+##' @param MCMC.args arguments for the MCMC sampling, details can be found in the roxygen strucutre for control list in the `sda.enkf.multisite` function.
+##' @param block.list.all.pre pre-existed block.list.all object for passing the aqq and bqq to the current SDA run, the default is NULL. Details can be found in the roxygen structure for `pre_enkf_params` of the `sda.enkf.multisite` function
+##' @details This function will add data and constants into each block that are needed for the MCMC sampling.
+##'
+##' @description This function provides the block-based MCMC sampling approach.
+##'
+##' @return It returns the `build.block.xy` object and the analysis results.
+##' @importFrom dplyr %>%
+analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov, t, nt, MCMC.args, block.list.all.pre = NULL) {
+ #convert from vector values to block lists.
+ if ("try-error" %in% class(try(block.results <- build.block.xy(settings = settings,
+ block.list.all = block.list.all,
+ X = X,
+ obs.mean = obs.mean,
+ obs.cov = obs.cov,
+ t = t)))) {
+ PEcAn.logger::logger.severe("Something wrong within the build.block.xy function.")
+ return(0)
+ }
+ #grab block.list and H from the results.
+ block.list.all <- block.results[[1]]
+ H <- block.results[[2]]
+ Y <- block.results[[3]]
+ R <- block.results[[4]]
+
+ #update q.
+ if ("try-error" %in% class(try(block.list.all <- update_q(block.list.all, t, nt, aqq.Init = as.numeric(settings$state.data.assimilation$aqq.Init),
+ bqq.Init = as.numeric(settings$state.data.assimilation$bqq.Init),
+ MCMC_dat = NULL,
+ block.list.all.pre)))) {
+ PEcAn.logger::logger.severe("Something wrong within the update_q function.")
+ return(0)
+ }
+
+ #add initial conditions for the MCMC sampling.
+ if ("try-error" %in% class(try(block.list.all[[t]] <- MCMC_Init(block.list.all[[t]], X)))) {
+ PEcAn.logger::logger.severe("Something wrong within the MCMC_Init function.")
+ return(0)
+ }
+
+ #update MCMC args.
+ block.list.all[[t]] <- block.list.all[[t]] %>%
+ purrr::map(function(l){
+ l$MCMC <- MCMC.args
+ l
+ })
+
+ #parallel for loop over each block.
+ PEcAn.logger::logger.info(paste0("Running MCMC ", "for ", length(block.list.all[[t]]), " blocks"))
+ if ("try-error" %in% class(try(block.list.all[[t]] <- furrr::future_map(block.list.all[[t]], MCMC_block_function, .progress = T)))) {
+ PEcAn.logger::logger.severe("Something wrong within the MCMC_block_function function.")
+ return(0)
+ }
+ PEcAn.logger::logger.info("Completed!")
+
+ #convert from block lists to vector values.
+ if ("try-error" %in% class(try(V <- block.2.vector(block.list.all[[t]], X, H)))) {
+ PEcAn.logger::logger.severe("Something wrong within the block.2.vector function.")
+ return(0)
+ }
+
+ #return values
+ return(list(block.list.all = block.list.all,
+ mu.f = V$mu.f,
+ Pf = V$Pf,
+ mu.a = V$mu.a,
+ Pa = V$Pa,
+ Y = Y,
+ R = R))
+}
+
+##' @title build.block.xy
+##' @name build.block.xy
+##' @author Dongchen Zhang
+##'
+##' @param settings pecan standard multi-site settings list.
+##' @param block.list.all List contains nt empty sub-elements.
+##' @param X A matrix contains ensemble forecasts.
+##' @param obs.mean List of dataframe of observation means, named with observation datetime.
+##' @param obs.cov List of covariance matrices of state variables , named with observation datetime.
+##' @param t time point.
+##' @details This function will add data and constants into each block that are needed for the MCMC sampling.
+##'
+##' @description This function split long vector and covariance matrix into blocks corresponding to the localization.
+##'
+##' @return It returns the `build.block.xy` object with data and constants filled in.
+build.block.xy <- function(settings, block.list.all, X, obs.mean, obs.cov, t) {
+ #set q.type from settings.
+ if (settings$state.data.assimilation$q.type == "vector") {
+ q.type <- 3
+ } else if (settings$state.data.assimilation$q.type == "wishart") {
+ q.type <- 4
+ }
+
+ #grab basic arguments based on X.
+ site.ids <- unique(attributes(X)$Site)
+ var.names <- unique(attributes(X)$dimnames[[2]])
+ mu.f <- colMeans(X)
+ Pf <- stats::cov(X)
+ if (length(diag(Pf)[which(diag(Pf)==0)]) > 0) {
+ diag(Pf)[which(diag(Pf)==0)] <- min(diag(Pf)[which(diag(Pf) != 0)])/5 #fixing det(Pf)==0
+ PEcAn.logger::logger.warn("The zero variances in Pf is being replaced by one fifth of the minimum variance in those matrices respectively.")
+ }
+
+ #distance calculations and localization
+ site.locs <- settings$run %>%
+ purrr::map('site') %>%
+ purrr::map_dfr(~c(.x[['lon']],.x[['lat']]) %>% as.numeric)%>%
+ t %>%
+ `colnames<-`(c("Lon","Lat")) %>%
+ `rownames<-`(site.ids)
+ #Finding the distance between the sites
+ dis.matrix <- sp::spDists(site.locs, longlat = TRUE)
+ if (!is.null(settings$state.data.assimilation$Localization.FUN)) {
+ Localization.FUN <- get(settings$state.data.assimilation$Localization.FUN)
+ #turn that into a blocked matrix format
+ blocked.dis <- block_matrix(dis.matrix %>% as.numeric(), rep(length(var.names), length(site.ids)))
+ Pf <- Localization.FUN(Pf, blocked.dis, settings$state.data.assimilation$scalef %>% as.numeric())
+ }
+
+ #Handle observation
+ #if we do free run or the current obs.mean are all NULL.
+ if (as.logical(settings$state.data.assimilation$free.run) | all(is.null(unlist(obs.mean[[t]])))) {
+ obs.mean[[t]] <- vector("list", length(site.ids)) %>% `names<-`(site.ids)
+ obs.cov[[t]] <- vector("list", length(site.ids)) %>% `names<-`(site.ids)
+ H <- list(ind = seq_along(rep(var.names, length(site.ids))))
+ Y <- rep(NA, length(H$ind))
+ R <- diag(1, length(H$ind))
+ } else if (!as.logical(settings$state.data.assimilation$free.run) && all(is.null(unlist(obs.mean[[t]])))) {
+ PEcAn.logger::logger.error("Please set the settings$state.data.assimilation$free.run as TRUE if you don't have any observations!")
+ return(0)
+ } else {
+ Obs.cons <- Construct.R(site.ids, var.names, obs.mean[[t]], obs.cov[[t]])
+ Y <- Obs.cons$Y
+ R <- Obs.cons$R
+ if (length(Y) > 1) {
+ if (length(diag(R)[which(diag(R)==0)]) > 0) {
+ diag(R)[which(diag(R)==0)] <- min(diag(R)[which(diag(R) != 0)])/2
+ PEcAn.logger::logger.warn("The zero variances in R is being replaced by half of the minimum variance in those matrices respectively.")
+ }
+ }
+ #create matrix the describes the support for each observed state variable at time t
+ min_max <- settings$state.data.assimilation$state.variables %>%
+ purrr::map(function(state.variable){
+ c(as.numeric(state.variable$min_value),
+ as.numeric(state.variable$max_value))
+ }) %>% unlist() %>% as.vector() %>%
+ matrix(length(settings$state.data.assimilation$state.variables), 2, byrow = T) %>%
+ `rownames<-`(var.names)
+ #Create y.censored and y.ind
+ #describing if the obs are within the defined range.
+ y.ind <- y.censored <- c()
+ for (i in seq_along(Y)) {
+ if (Y[i] > min_max[names(Y[i]), 1]) {
+ y.ind[i] = 1; y.censored[i] = Y[i]
+ } else {y.ind[i] <- y.censored[i] <- 0}
+ }
+ #create H
+ H <- construct_nimble_H(site.ids = site.ids,
+ var.names = var.names,
+ obs.t = obs.mean[[t]],
+ pft.path = settings[[1]]$run$inputs$pft.site$path,
+ by = "block_pft_var")
+ }
+ #observation number per site
+ obs_per_site <- obs.mean[[t]] %>%
+ purrr::map(function(site.obs){length(site.obs)}) %>%
+ unlist()
+
+ #start the blocking process
+ #should we consider interactions between sites?
+ if(as.numeric(settings$state.data.assimilation$scalef) == 0){
+ block.list <- vector("list", length(site.ids))
+ #loop over sites
+ for (i in seq_along(site.ids)) {
+ #store which block contains which sites.
+ block.list[[i]]$sites.per.block <- i
+ block.list[[i]]$site.ids <- site.ids[i]
+ block.list[[i]]$t <- t
+
+ #fill in mu.f and Pf
+ f.start <- (i - 1) * length(var.names) + 1
+ f.end <- i * length(var.names)
+ block.list[[i]]$data$muf <- mu.f[f.start:f.end]
+ block.list[[i]]$data$pf <- Pf[f.start:f.end, f.start:f.end]
+
+ #fill in y and r
+ if (obs_per_site[i] == 0) {
+ y.start <- 1
+ y.end <- length(var.names)
+ block.list[[i]]$data$y.censored <- rep(NA, length(var.names))
+ block.list[[i]]$data$r <- diag(1, length(var.names))
+ block.h <- matrix(1, 1, length(var.names))
+ } else {
+ y.start <- sum(obs_per_site[1:i-1]) + 1#obs_per_site[i] * (i - 1) + 1
+ y.end <- y.start + obs_per_site[i] - 1#obs_per_site[i] * i
+ block.list[[i]]$data$y.censored <- y.censored[y.start:y.end]
+ block.list[[i]]$data$r <- solve(R[y.start:y.end, y.start:y.end])
+ block.h <- Construct.H.multisite(site.ids[i], var.names, obs.mean[[t]])
+ }
+
+ #fill in constants.
+ block.list[[i]]$H <- block.h
+ block.list[[i]]$constant$H <- which(apply(block.h, 2, sum) == 1)
+ block.list[[i]]$constant$N <- length(f.start:f.end)
+ block.list[[i]]$constant$YN <- length(y.start:y.end)
+ block.list[[i]]$constant$q.type <- q.type
+ }
+ names(block.list) <- site.ids
+ } else {
+ #find networks given TRUE/FALSE matrix representing sites' interactions.
+ block.vec <- matrix_network(dis.matrix <= as.numeric(settings$state.data.assimilation$scalef))
+ #check if the matrix_network function is working correctly.
+ #check if the blocks are calculated correctly.
+ if (block.vec %>%
+ purrr::map(function(l){length(l)}) %>%
+ unlist %>%
+ sum() != length(site.ids)) {
+ PEcAn.logger::logger.severe("Block calculation failed, please check the matrix_network function!")
+ return(0)
+ }
+ block.list <- vector("list", length(block.vec))
+ #loop over sites
+ for (i in seq_along(block.vec)) {#i is site index
+ #store which block contains which sites.
+ ids <- block.vec[[i]]
+ block.list[[i]]$sites.per.block <- ids
+ block.list[[i]]$site.ids <- site.ids[ids]
+ block.list[[i]]$t <- t
+ y.ind <- f.ind <- c()
+ for (j in seq_along(ids)) {
+ f.start <- (ids[j] - 1) * length(var.names) + 1
+ f.end <- ids[j] * length(var.names)
+ y.start <- obs_per_site[ids[j]] * (ids[j] - 1) + 1
+ y.end <- obs_per_site[ids[j]] * ids[j]
+
+ f.ind <- c(f.ind, f.start:f.end)
+ y.ind <- c(y.ind, y.start:y.end)
+ }
+ #fill in mu.f and Pf
+ block.list[[i]]$data$muf <- mu.f[f.ind]
+ block.list[[i]]$data$pf <- GrabFillMatrix(Pf, f.ind)
+
+ #fill in y and R
+ block.list[[i]]$data$y.censored <- y.censored[y.ind]
+ block.list[[i]]$data$r <- GrabFillMatrix(solve(R), y.ind)
+
+ #fill in constants
+ block.h <- Construct.H.multisite(site.ids[ids], var.names, obs.mean[[t]])
+ block.list[[i]]$H <- block.h
+ block.list[[i]]$constant$H <- which(apply(block.h, 2, sum) == 1)
+ block.list[[i]]$constant$N <- length(f.ind)
+ block.list[[i]]$constant$YN <- length(y.ind)
+ block.list[[i]]$constant$q.type <- q.type
+ }
+ }
+
+ #return values.
+ block.list.all[[t]] <- block.list
+ return(list(block.list.all = block.list.all, H = H, Y = Y, R = R))
+}
+
+##' @title MCMC_Init
+##' @name MCMC_Init
+##' @author Dongchen Zhang
+##'
+##' @param block.list lists of blocks generated by the `build.block.xy` function.
+##' @param X A matrix contains ensemble forecasts.
+##' @details This function helps create initial conditions for the MCMC sampling.
+##'
+##' @return It returns the `block.list` object with initial conditions filled in.
+MCMC_Init <- function (block.list, X) {
+ var.names <- unique(attributes(X)$dimnames[[2]])
+ #sample mu.f from X.
+ sample.mu.f <- colMeans(X)
+ for (i in seq_along(block.list)) {
+ #number of observations.
+ num.obs <- length(block.list[[i]]$data$y.censored)
+ #loop over each site within each block
+ for (j in seq_along(block.list[[i]]$sites.per.block)) {
+ #initialize mu.f
+ start <- (block.list[[i]]$sites.per.block[j] - 1) * length(var.names) + 1
+ end <- (block.list[[i]]$sites.per.block[j]) * length(var.names)
+ block.list[[i]]$Inits$X.mod <- c(block.list[[i]]$Inits$X.mod, sample.mu.f[start:end])
+ #initialize X
+ block.list[[i]]$Inits$X <- block.list[[i]]$data$y.censored
+ #initialize Xs
+ block.list[[i]]$Inits$Xs <- block.list[[i]]$Inits$X.mod[block.list[[i]]$constant$H]
+ }
+ #initialize q.
+ #if we want the vector q.
+ if (block.list[[i]]$constant$q.type == 1) {
+ for (j in seq_along(block.list[[i]]$data$y.censored)) {
+ block.list[[i]]$Inits$q <- c(block.list[[i]]$Inits$q, stats::rgamma(1, shape = block.list[[i]]$data$aq[j], rate = block.list[[i]]$data$bq[j]))
+ }
+ } else if (block.list[[i]]$constant$q.type == 2) {
+ #if we want the wishart Q.
+ if ("try-error" %in% class(try(block.list[[i]]$Inits$q <-
+ stats::rWishart(1, df = block.list[[i]]$data$bq, Sigma = block.list[[i]]$data$aq)[,,1], silent = T))) {
+ block.list[[i]]$Inits$q <-
+ stats::rWishart(1, df = block.list[[i]]$data$bq, Sigma = stats::toeplitz((block.list[[i]]$constant$YN:1)/block.list[[i]]$constant$YN))[,,1]
+ }
+ }
+ }
+ #return values.
+ return(block.list)
+}
+
+##' @title MCMC_block_function
+##' @name MCMC_block_function
+##' @author Dongchen Zhang
+##'
+##' @param block each block within the `block.list` lists.
+##'
+##' @return It returns the `block` object with analysis results filled in.
+MCMC_block_function <- function(block) {
+ #build nimble model
+ #TODO: harmonize the MCMC code between block-based and general analysis functions to reduce the complexity of code.
+ model_pred <- nimble::nimbleModel(GEF.MultiSite.Nimble,
+ data = block$data,
+ inits = block$Inits,
+ constants = block$constant,
+ name = 'base')
+ #configure MCMC
+ conf <- nimble::configureMCMC(model_pred, print=FALSE)
+ conf$setMonitors(c("X", "X.mod", "q"))
+
+ #Handle samplers
+ #hear we change the RW_block sampler to the ess sampler
+ #because it has a better performance of MVN sampling
+ samplerLists <- conf$getSamplers()
+ samplerNumberOffset <- length(samplerLists)
+ if (block$constant$q.type == 1) {
+ #if we have vector q
+ #only X.mod should be sampled with ess sampler.
+ X.mod.ind <- which(grepl("X.mod", samplerLists %>% purrr::map(~ .x$target) %>% unlist()))
+ samplerLists[[X.mod.ind]]$setName("ess")
+ } else if (block$constant$q.type == 2) {
+ #if we have wishart q
+ #everything should be sampled with ess sampler.
+ samplerLists %>% purrr::map(function(l){l$setName("ess")})
+ }
+ conf$setSamplers(samplerLists)
+
+ #add toggle Y sampler.
+ for (i in 1:block$constant$YN) {
+ conf$addSampler(paste0("y.censored[", i, "]"), 'toggle', control=list(type='RW'))
+ }
+ conf$printSamplers()
+ #compile MCMC
+ Rmcmc <- nimble::buildMCMC(conf)
+ Cmodel <- nimble::compileNimble(model_pred)
+ Cmcmc <- nimble::compileNimble(Rmcmc, project = model_pred, showCompilerOutput = FALSE)
+
+ #if we don't have any NA in the Y.
+ if (!any(is.na(block$data$y.censored))) {
+ #add toggle Y sampler.
+ for(i in 1:block$constant$YN) {
+ valueInCompiledNimbleFunction(Cmcmc$samplerFunctions[[samplerNumberOffset+i]], 'toggle', 0)
+ }
+ }
+
+ #run MCMC
+ dat <- runMCMC(Cmcmc, niter = block$MCMC$niter, nburnin = block$MCMC$nburnin, thin = block$MCMC$nthin, nchains = block$MCMC$nchain)
+
+ #update aq, bq, mua, and pa
+ M <- colMeans(dat)
+ block$update$aq <- block$Inits$q
+ if (block$constant$q.type == 1) {
+ #if it's a vector q case
+ aq <- bq <- rep(NA, length(block$data$y.censored))
+ for (i in seq_along(aq)) {
+ CHAR <- paste0("[", i, "]")
+ aq[i] <- (mean(dat[, paste0("q", CHAR)]))^2/stats::var(dat[, paste0("q", CHAR)])
+ bq[i] <- mean(dat[, paste0("q", CHAR)])/stats::var(dat[, paste0("q", CHAR)])
+ }
+ #update aqq and bqq
+ block$aqq[,block$t+1] <- block$aqq[, block$t]
+ block$aqq[block$constant$H, block$t+1] <- aq
+ block$bqq[,block$t+1] <- block$bqq[, block$t]
+ block$bqq[block$constant$H, block$t+1] <- bq
+ } else if (block$constant$q.type == 2) {
+ #previous updates
+ mq <- dat[, grep("q", colnames(dat))] # Omega, Precision
+ q.bar <- matrix(apply(mq, 2, mean),
+ length(block$constant$H),
+ length(block$constant$H)
+ )
+ wish.df <- function(Om, X, i, j, col) {
+ (Om[i, j]^2 + Om[i, i] * Om[j, j]) / stats::var(X[, col])
+ }
+ col <- matrix(1:length(block$constant$H) ^ 2,
+ length(block$constant$H),
+ length(block$constant$H))
+ WV <- matrix(0, length(block$constant$H), length(block$constant$H))
+ for (i in seq_along(block$constant$H)) {
+ for (j in seq_along(block$constant$H)) {
+ WV[i, j] <- wish.df(q.bar, X = mq, i = i, j = j, col = col[i, j])
+ }
+ }
+ bq <- mean(WV)
+ if (bq < block$constant$YN) {
+ bq <- block$constant$YN
+ }
+ aq <- solve(q.bar) * bq
+ block$aqq[,,block$t+1] <- GrabFillMatrix(block$aqq[,,block$t], block$constant$H, aq)
+ block$bqq[block$t+1] <- bq
+ }
+ #update mua and pa; mufa, and pfa
+ iX <- grep("X[", colnames(dat), fixed = TRUE)
+ iX.mod <- grep("X.mod[", colnames(dat), fixed = TRUE)
+ if (length(iX) == 1) {
+ mua <- mean(dat[, iX])
+ pa <- stats::var(dat[, iX])
+ } else {
+ mua <- colMeans(dat[, iX])
+ pa <- stats::cov(dat[, iX])
+ }
+
+ if (length(iX.mod) == 1) {
+ mufa <- mean(dat[, iX.mod])
+ pfa <- stats::var(dat[, iX.mod])
+ } else {
+ mufa <- colMeans(dat[, iX.mod])
+ pfa <- stats::cov(dat[, iX.mod])
+ }
+
+ #return values.
+ block$update <- list(aq = aq, bq = bq, mua = mua, pa = pa, mufa = mufa, pfa = pfa)
+ return(block)
+}
+
+##' @title update_q
+##' @name update_q
+##' @author Dongchen Zhang
+##'
+##' @param block.list.all each block within the `block.list` lists.
+##' @param t time point.
+##' @param nt total length of time steps.
+##' @param aqq.Init the initial values of aqq, the default is NULL.
+##' @param bqq.Init the initial values of bqq, the default is NULL.
+##' @param MCMC_dat data frame of MCMC samples, the default it NULL.
+##' @param block.list.all.pre pre-existed block.list.all object for passing the aqq and bqq to the current SDA run, the default is NULL.
+##'
+##' @return It returns the `block.list.all` object with initialized/updated Q filled in.
+update_q <- function (block.list.all, t, nt, aqq.Init = NULL, bqq.Init = NULL, MCMC_dat = NULL, block.list.all.pre = NULL) {
+ block.list <- block.list.all[[t]]
+ #if it's an update.
+ if (is.null(MCMC_dat)) {
+ #loop over blocks
+ if (t == 1) {
+ for (i in seq_along(block.list)) {
+ nvar <- length(block.list[[i]]$data$muf)
+ nobs <- length(block.list[[i]]$data$y.censored)
+ if (block.list[[i]]$constant$q.type == 1) {
+ #initialize aqq and bqq for nt
+ if (!is.null(aqq.Init) && !is.null(bqq.Init)) {
+ block.list[[i]]$aqq <- array(aqq.Init, dim = c(nvar, nt + 1))
+ block.list[[i]]$bqq <- array(bqq.Init, dim = c(nvar, nt + 1))
+ } else {
+ block.list[[i]]$aqq <- array(1, dim = c(nvar, nt + 1))
+ block.list[[i]]$bqq <- array(1, dim = c(nvar, nt + 1))
+ }
+ #update aq and bq based on aqq and bqq
+ block.list[[i]]$data$aq <- block.list[[i]]$aqq[block.list[[i]]$constant$H, t]
+ block.list[[i]]$data$bq <- block.list[[i]]$bqq[block.list[[i]]$constant$H, t]
+ } else if (block.list[[i]]$constant$q.type == 2) {
+ #initialize aqq and bqq for nt
+ block.list[[i]]$aqq <- array(1, dim = c(nvar, nvar, nt + 1))
+ block.list[[i]]$aqq[,,t] <- stats::toeplitz((nvar:1)/nvar)
+ block.list[[i]]$bqq <- rep(nobs, nt + 1)
+ #update aq and bq based on aqq and bqq
+ block.list[[i]]$data$aq <- GrabFillMatrix(block.list[[i]]$aqq[,,t], block.list[[i]]$constant$H)
+ block.list[[i]]$data$bq <- block.list[[i]]$bqq[t]
+ }
+ }
+ } else if (t > 1) {
+ if (!is.null(block.list.all.pre)) {
+ block.list.pre <- block.list.all.pre[[t - 1]]
+ } else {
+ #if we want to update q from previous SDA runs.
+ block.list.pre <- block.list.all[[t - 1]]
+ }
+ for (i in seq_along(block.list)) {
+ nvar <- length(block.list[[i]]$data$muf)
+ nobs <- length(block.list[[i]]$data$y.censored)
+ if (block.list[[i]]$constant$q.type == 1) {
+ #copy previous aqq and bqq to the current t
+ block.list[[i]]$aqq <- block.list.pre[[i]]$aqq
+ block.list[[i]]$bqq <- block.list.pre[[i]]$bqq
+ #update aq and bq
+ block.list[[i]]$data$aq <- block.list[[i]]$aqq[block.list[[i]]$constant$H, t]
+ block.list[[i]]$data$bq <- block.list[[i]]$bqq[block.list[[i]]$constant$H, t]
+ } else if (block.list[[i]]$constant$q.type == 2) {
+ #initialize aqq and bqq for nt
+ block.list[[i]]$aqq <- block.list.pre[[i]]$aqq
+ block.list[[i]]$bqq <- block.list.pre[[i]]$bqq
+ #if previous Q is smaller than the actual YN.
+ if (block.list.pre[[i]]$bqq[t] <= block.list[[i]]$constant$YN) {
+ block.list[[i]]$bqq[t] <- block.list[[i]]$constant$YN
+ }
+ #update aq and bq based on aqq and bqq
+ block.list[[i]]$data$aq <- GrabFillMatrix(block.list[[i]]$aqq[,,t], block.list[[i]]$constant$H)
+ block.list[[i]]$data$bq <- block.list[[i]]$bqq[t]
+ }
+ }
+ }
+ } else {
+ #TODO: Implement the feature that Q can be updated based on the pft types.
+ }
+
+ #return values.
+ block.list.all[[t]] <- block.list
+ return(block.list.all)
+}
+
+##' @title block.2.vector
+##' @name block.2.vector
+##' @author Dongchen Zhang
+##'
+##' @param block.list lists of blocks generated by the `build.block.xy` function.
+##' @param X A matrix contains ensemble forecasts.
+##' @param H H index created by the `construct_nimble_H` function.
+##'
+##' @return It returns a list of analysis results by MCMC sampling.
+block.2.vector <- function (block.list, X, H) {
+ site.ids <- attributes(X)$Site
+ mu.f <- mu.a <- c()
+ Pf <- Pa <- matrix(0, length(site.ids), length(site.ids))
+ for (L in block.list) {
+ ind <- c()
+ for (id in L$site.ids) {
+ ind <- c(ind, which(site.ids == id))
+ }
+ #convert mu.f and pf
+ mu.a[ind] <- mu.f[ind] <- L$update$mufa
+ Pa[ind, ind] <- Pf[ind, ind] <- L$update$pfa
+ #convert mu.a and pa
+ ind <- intersect(ind, H$H.ind)
+ mu.a[ind] <- L$update$mua
+ Pa[ind, ind] <- L$update$pa
+ }
+ return(list(mu.f = mu.f,
+ Pf = Pf,
+ mu.a = mu.a,
+ Pa = Pa))
+}
\ No newline at end of file
diff --git a/modules/assim.sequential/R/Analysis_sda_multiSite.R b/modules/assim.sequential/R/Analysis_sda_multiSite.R
index 4b4c67f92c4..219e8983e80 100644
--- a/modules/assim.sequential/R/Analysis_sda_multiSite.R
+++ b/modules/assim.sequential/R/Analysis_sda_multiSite.R
@@ -15,7 +15,7 @@
##'
##' @return It returns a list with estimated mean and cov matrix of forecast state variables as well as mean and cov estimated as a result of assimilation/analysis .
##' @export
-EnKF.MultiSite <-function(settings, Forecast, Observed, H, extraArg=NULL, ...){
+EnKF.MultiSite <- function(settings, Forecast, Observed, H, extraArg=NULL, ...){
#------------------------------Setup
Localization.FUN <- settings$state.data.assimilation$Localization.FUN # localization function
scalef <- settings$state.data.assimilation$scalef %>% as.numeric() # scale factor for localization
@@ -72,7 +72,7 @@ EnKF.MultiSite <-function(settings, Forecast, Observed, H, extraArg=NULL, ...){
##' @rdname GEF
##' @export
-GEF.MultiSite<-function(settings, Forecast, Observed, H, extraArg,...){
+GEF.MultiSite <- function(settings, Forecast, Observed, H, extraArg,...){
#-- reading the dots and exposing them to the inside of the function
dots<-list(...)
if (length(dots) > 0) lapply(names(dots),function(name){assign(name,dots[[name]], pos = 1 )})
diff --git a/modules/assim.sequential/R/Multi_Site_Constructors.R b/modules/assim.sequential/R/Multi_Site_Constructors.R
index c8eeba37af3..e231750ae42 100755
--- a/modules/assim.sequential/R/Multi_Site_Constructors.R
+++ b/modules/assim.sequential/R/Multi_Site_Constructors.R
@@ -200,3 +200,96 @@ Construct.H.multisite <- function(site.ids, var.names, obs.t.mean){
}
H
}
+
+##' @title construct_nimble_H
+##' @name construct_nimble_H
+##' @author Dongchen Zhang
+##'
+##' @param site.ids a vector name of site ids
+##' @param var.names vector names of state variable names
+##' @param obs.t list of vector of means for the time t for different sites.
+##' @param pft.path physical path to the pft.csv file.
+##' @param by criteria, it supports by variable, site, pft, all, and single Q.
+##'
+##' @description This function is an upgrade to the Construct.H.multisite function which provides the index by different criteria.
+##'
+##' @return Returns one vector containing index for which Q to be estimated for which variable,
+##' and the other vector gives which state variable has which observation (= element.W.Data).
+##' @export
+construct_nimble_H <- function(site.ids, var.names, obs.t, pft.path = NULL, by = "single"){
+ if(by == "pft" | by == "block_pft_var" & is.null(pft.path)){
+ PEcAn.logger::logger.info("please provide pft path.")
+ return(0)
+ }
+ H <- Construct.H.multisite(site.ids, var.names, obs.t)
+ if (by == "var") {
+ total_var_name <- rep(var.names, length(site.ids))
+ Ind <- rep(0, dim(H)[2])
+ for (i in seq_along(var.names)) {
+ Ind[which(total_var_name == var.names[i])] <- i
+ }
+ } else if (by == "site") {
+ total_site_id <- rep(site.ids, each = length(var.names))
+ Ind <- rep(0, dim(H)[2])
+ for (i in seq_along(site.ids)) {
+ Ind[which(total_site_id == site.ids[i])] <- i
+ }
+ } else if (by == "pft") {
+ pft <- utils::read.csv(pft.path)
+ rownames(pft) <- pft$site
+ total_site_id <- rep(site.ids, each = length(var.names))
+ total_pft <- pft[total_site_id, 2]
+ Ind <- rep(0, dim(H)[2])
+ pft.names <- sort(unique(pft$pft))
+ for (i in seq_along(pft.names)) {
+ Ind[which(total_pft == pft.names[i])] <- i
+ }
+ } else if (by == "block_pft_var") {
+ #by pft
+ pft <- utils::read.csv(pft.path)
+ rownames(pft) <- pft$site
+ total_site_id <- rep(site.ids, each = length(var.names))
+ total_pft <- pft[total_site_id, 2]
+ Ind_pft <- rep(0, dim(H)[2])
+ pft.names <- sort(unique(pft$pft))
+ for (i in seq_along(pft.names)) {
+ Ind_pft[which(total_pft == pft.names[i])] <- i
+ }
+ #by var
+ total_var_name <- rep(var.names, length(site.ids))
+ Ind_var <- rep(0, dim(H)[2])
+ for (i in seq_along(var.names)) {
+ Ind_var[which(total_var_name == var.names[i])] <- i
+ }
+ #by site
+ total_site_id <- rep(site.ids, each = length(var.names))
+ Ind_site <- rep(0, dim(H)[2])
+ for (i in seq_along(site.ids)) {
+ Ind_site[which(total_site_id == site.ids[i])] <- i
+ }
+ # #create reference to which block and which var
+ # #Ind for which site should use which block
+ # block.index <- var.index <- Ind_site
+ # for (i in seq_along(Ind_site)) {
+ # Ind_block[i] <- Ind_pft[i]
+ # }
+ } else if (by == "all") {
+ Ind <- 1:dim(H)[2]
+ } else if (by == "single") {
+ Ind <- rep(1, dim(H)[2])
+ } else {
+ PEcAn.logger::logger.info("Couldn't find the proper by argument!")
+ return(0)
+ }
+ if (by == "block_pft_var") {
+ return(list(Ind_pft = Ind_pft[which(apply(H, 2, sum) == 1)],
+ Ind_site = Ind_site[which(apply(H, 2, sum) == 1)],
+ Ind_var = Ind_var[which(apply(H, 2, sum) == 1)],
+ H.ind = which(apply(H, 2, sum) == 1)))
+ } else {
+ return(list(Q.ind = Ind[which(apply(H, 2, sum) == 1)],
+ H.ind = which(apply(H, 2, sum) == 1),
+ H.matrix = H))
+ }
+
+}
\ No newline at end of file
diff --git a/modules/assim.sequential/R/Nimble_codes.R b/modules/assim.sequential/R/Nimble_codes.R
index 38929c17e25..4a2e8fc6760 100644
--- a/modules/assim.sequential/R/Nimble_codes.R
+++ b/modules/assim.sequential/R/Nimble_codes.R
@@ -176,48 +176,61 @@ tobit.model <- nimbleCode({
#' @format TBD
#' @export
GEF.MultiSite.Nimble <- nimbleCode({
- if (q.type == 1) {
- # Sorting out qs
- qq ~ dgamma(aq, bq) ## aq and bq are estimated over time
- q[1:YN, 1:YN] <- qq * diag(YN)
- } else if (q.type == 2) {
- # Sorting out qs
- q[1:YN, 1:YN] ~ dwish(R = aq[1:YN, 1:YN], df = bq) ## aq and bq are estimated over time
-
- }
-
# X model
X.mod[1:N] ~ dmnorm(mean = muf[1:N], cov = pf[1:N, 1:N])
-
- for (i in 1:nH) {
- tmpX[i] <- X.mod[H[i]]
- Xs[i] <- tmpX[i]
- }
- ## add process error to x model but just for the state variables that we have data and H knows who
- X[1:YN] ~ dmnorm(Xs[1:YN], prec = q[1:YN, 1:YN])
-
- ## Likelihood
- y.censored[1:YN] ~ dmnorm(X[1:YN], prec = r[1:YN, 1:YN])
-
- # #puting the ones that they don't have q in Xall - They come from X.model
- # # If I don't have data on then then their q is not identifiable, so we use the same Xs as Xmodel
- if(nNotH > 0){
- for (j in 1:nNotH) {
- tmpXmod[j] <- X.mod[NotH[j]]
- Xall[NotH[j]] <- tmpXmod[j]
+ if (q.type == 1 | q.type == 2) {
+ if (q.type == 1) {#single Q
+ # Sorting out qs
+ qq ~ dgamma(aq, bq) ## aq and bq are estimated over time
+ q[1:YN, 1:YN] <- qq * diag(YN)
+ } else if (q.type == 2) {#site Q
+ # Sorting out qs
+ q[1:YN, 1:YN] ~ dwish(R = aq[1:YN, 1:YN], df = bq) ## aq and bq are estimated over time
}
+
+ for (i in 1:nH) {
+ tmpX[i] <- X.mod[H[i]]
+ Xs[i] <- tmpX[i]
+ }
+ ## add process error to x model but just for the state variables that we have data and H knows who
+ X[1:YN] ~ dmnorm(Xs[1:YN], prec = q[1:YN, 1:YN])
+
+ ## Likelihood
+ y.censored[1:YN] ~ dmnorm(X[1:YN], prec = r[1:YN, 1:YN])
+
+ # #puting the ones that they don't have q in Xall - They come from X.model
+ # # If I don't have data on then then their q is not identifiable, so we use the same Xs as Xmodel
+ if(nNotH > 0){
+ for (j in 1:nNotH) {
+ tmpXmod[j] <- X.mod[NotH[j]]
+ Xall[NotH[j]] <- tmpXmod[j]
+ }
+ }
+ } else if (q.type == 3) {#Vector Q
+ for (i in 1:YN) {
+ #sample Q.
+ q[i] ~ dgamma(shape = aq[i], rate = bq[i])
+ if (length(H) == 1) {
+ X[i] ~ dnorm(X.mod[H], sd = 1/sqrt(q[i]))
+ } else {
+ #sample latent variable X.
+ X[i] ~ dnorm(X.mod[H[i]], sd = 1/sqrt(q[i]))
+ }
+ #likelihood
+ y.censored[i] ~ dnorm(X[i], sd = 1/sqrt(r[i, i]))
+ }
+ } else if (q.type == 4) {#Wishart Q
+ #if it's a Wishart Q.
+ #sample Q.
+ q[1:YN, 1:YN] ~ dwishart(R = aq[1:YN, 1:YN], df = bq)
+ #sample latent variable X.
+ for (i in 1:YN) {
+ Xs[i] <- X.mod[H[i]]
+ }
+ X[1:YN] ~ dmnorm(Xs[1:YN], prec = q[1:YN, 1:YN])
+ #likelihood
+ y.censored[1:YN] ~ dmnorm(X[1:YN], prec = r[1:YN, 1:YN])
}
-
- #These are the one that they have data and their q can be estimated.
- for (i in 1:nH) {
- tmpXH[i] <- X[i]
- Xall[H[i]] <- tmpXH[i]
- }
-
- for (i in 1:YN) {
- y.ind[i] ~ dinterval(y.censored[i], 0)
- }
-
})
#sampler_toggle------------------------------------------------------------------------------------------------
diff --git a/modules/assim.sequential/R/SDA_OBS_Assembler.R b/modules/assim.sequential/R/SDA_OBS_Assembler.R
index 9510a2ad89a..e9e778e9af8 100644
--- a/modules/assim.sequential/R/SDA_OBS_Assembler.R
+++ b/modules/assim.sequential/R/SDA_OBS_Assembler.R
@@ -19,6 +19,21 @@ SDA_OBS_Assembler <- function(settings){
#extract Obs_Prep object from settings.
Obs_Prep <- settings$state.data.assimilation$Obs_Prep
+ #check if we want to proceed the free run without any observations.
+ if (as.logical(settings$state.data.assimilation$free.run)) {
+ #calculate time points.
+ time_points <- obs_timestep2timepoint(Obs_Prep$start.date, Obs_Prep$end.date, Obs_Prep$timestep)
+
+ #generate obs.mean and obs.cov with NULL filled.
+ obs.mean = vector("list", length(time_points)) %>% `names<-`(time_points)
+ obs.cov = vector("list", length(time_points)) %>% `names<-`(time_points)
+
+ #save files.
+ save(obs.mean, file = file.path(Obs_Prep$outdir, "Rdata", "obs.mean.Rdata"))
+ save(obs.cov, file = file.path(Obs_Prep$outdir, "Rdata", "obs.cov.Rdata"))
+ return(list(obs.mean = obs.mean, obs.cov = obs.cov))
+ }
+
#prepare site_info offline, because we need to submit this to server remotely, which might not support the Bety connection.
site_info <- settings %>%
purrr::map(~.x[['run']] ) %>%
diff --git a/modules/assim.sequential/R/matrix_operation.R b/modules/assim.sequential/R/matrix_operation.R
new file mode 100644
index 00000000000..c5177c21ece
--- /dev/null
+++ b/modules/assim.sequential/R/matrix_operation.R
@@ -0,0 +1,77 @@
+##' @title GrabFillMatrix
+##' @name GrabFillMatrix
+##' @author Dongchen Zhang
+##'
+##' @param M source matrix that will be either subtracted or filled in.
+##' @param ind vector of index that of where to be subtracted or filled in.
+##' @param M1 additional matrix used to fill in the source matrix, the default it NULL.
+##' @details This function helps subtract or fill in a matrix given the index.
+##'
+##' @export
+GrabFillMatrix <- function (M, ind, M1 = NULL) {
+ if (is.null(M1)) {
+ #grab a sub-matrix
+ m <- matrix(NA, length(ind), length(ind))
+ for (i in seq_along(ind)) {
+ for (j in seq_along(ind)) {
+ m[i, j] <- M[ind[i], ind[j]]
+ }
+ }
+ } else {
+ #fill into a larger matrix
+ m <- M
+ for (i in seq_along(ind)) {
+ for (j in seq_along(ind)) {
+ m[ind[i], ind[j]] <- M1[i, j]
+ }
+ }
+ }
+ m
+}
+
+##' @title matrix_network
+##' @name matrix_network
+##' @author Dongchen Zhang
+##'
+##' @param mat a boolean matrix representing the interactions between any sites.
+##'
+##' @return It returns lists of index representing each network.
+##'
+##' @export
+matrix_network <- function (mat) {
+ #initialize the final returned list.
+ vec_group <- vector("list", ncol(mat))
+ #initialize the vector for sites that are completed.
+ sites.complete <- c()
+ for (i in 1:ncol(mat)) {
+ #if we already completed the ith site, go next.
+ if (i %in% sites.complete) {
+ next
+ }
+ #initialize the arguments for the while loop.
+ vec <- c()
+ stop <- FALSE
+ inits <- i
+ #while loop
+ while (!stop) {
+ Inits <- c()
+ for (init in inits) {
+ Inits <- c(Inits, which(mat[init,]))
+ }
+ Inits <- Inits[which(!Inits %in% vec)]
+ vec <- sort(unique(c(vec, Inits)))
+ #if we don't have any new site that belongs to this network.
+ if (length(Inits) == 0) {
+ #then stop.
+ stop <- !stop
+ } else {
+ #else we initialize a new round of searching by new sites.
+ inits <- Inits
+ }
+ }
+ sites.complete <- c(sites.complete, vec)
+ vec_group[[i]] <- sort(vec)
+ }
+ vec_group[sapply(vec_group, is.null)] <- NULL
+ return(vec_group)
+}
\ No newline at end of file
diff --git a/modules/assim.sequential/R/sda.enkf_MultiSite.R b/modules/assim.sequential/R/sda.enkf_MultiSite.R
index a1614aa93af..569aea85880 100644
--- a/modules/assim.sequential/R/sda.enkf_MultiSite.R
+++ b/modules/assim.sequential/R/sda.enkf_MultiSite.R
@@ -3,20 +3,25 @@
#' @author Michael Dietze, Ann Raiho and Alexis Helgeson \email{dietze@@bu.edu}
#'
#' @param settings PEcAn settings object
-#' @param obs.mean List of dataframe of observation means, named with observation datetime.
-#' @param obs.cov List of covariance matrices of state variables , named with observation datetime.
+#' @param obs.mean Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation means for each state variables of each site for each time point.
+#' @param obs.cov Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation covariances for all state variables of each site for each time point.
#' @param Q Process covariance matrix given if there is no data to estimate it.
-#' @param restart Used for iterative updating previous forecasts. Default NULL. List object includes file path to previous runs and start date for SDA
-#' @param forceRun Used to force job.sh files that were not run for ensembles in SDA (quick fix)
-#' @param keepNC Used for debugging issues. .nc files are usually removed after each year in the out folder. This flag will keep the .nc + .nc.var files for futher investigations.
-#' @param pre_enkf_params Used for carrying out SDA with pre-existed enkf.params, in which the Pf, aqq, and bqq can be used for the analysis step.
-#' @param run_parallel If allows to proceed under parallel mode, default is TRUE.
+#' @param restart Used for iterative updating previous forecasts. Default NULL. List object includes file path to previous runs and start date for SDA.
+#' @param pre_enkf_params Used for passing pre-existing time-series of process error into the current SDA runs to ignore the impact by the differences between process errors.
#' @param ensemble.samples Pass ensemble.samples from outside to avoid GitHub check issues.
-#' @param parallel_qsub Bool variable decide if you want to submit the bash jobs under the parallel mode, the default value is TRUE.
-#' @param free_run Bool variable decide if the run is a free run with no analysis been used, the default value is FALSE.
-#' @param send_email List object containing the "from", "to", and "body", the default value is NULL.
-#' @param control List of flags controlling the behaviour of the SDA. trace for reporting back the SDA outcomes, interactivePlot for plotting the outcomes after each step,
-#' TimeseriesPlot for post analysis examination, BiasPlot for plotting the correlation between state variables, plot.title is the title of post analysis plots and debug mode allows for pausing the code and examining the variables inside the function.
+#' @param control List of flags controlling the behavior of the SDA.
+#' `trace` for reporting back the SDA outcomes;
+#' `TimeseriesPlot` for post analysis examination;
+#' `debug` decide if we want to pause the code and examining the variables inside the function;
+#' `pause` decide if we want to pause the SDA workflow at current time point t;
+#' `Profiling` decide if we want to export the temporal SDA outputs in CSV file;
+#' `OutlierDetection` decide if we want to execute the outlier detection each time after the model forecasting;
+#' `parallel_qsub` decide if we want to execute the `qsub` job submission under parallel mode;
+#' `send_email` contains lists for sending email to report the SDA progress;
+#' `keepNC` decide if we want to keep the NetCDF files inside the out directory;
+#' `forceRun` decide if we want to proceed the Bayesian MCMC sampling without observations;
+#' `run_parallel` decide if we want to run the SDA under parallel mode for the `future_map` function;
+#' `MCMC.args` include lists for controling the MCMC sampling process (iteration, nchains, burnin, and nthin.).
#'
#’ @details
#’ Restart mode: Basic idea is that during a restart (primary case envisioned as an iterative forecast), a new workflow folder is created and the previous forecast for the start_time is copied over. During restart the initial run before the loop is skipped, with the info being populated from the previous run. The function then dives right into the first Analysis, then continues on like normal.
@@ -32,25 +37,20 @@ sda.enkf.multisite <- function(settings,
obs.cov,
Q = NULL,
restart = NULL,
- forceRun = TRUE,
- keepNC = TRUE,
pre_enkf_params = NULL,
- run_parallel = TRUE,
ensemble.samples = NULL,
- parallel_qsub = TRUE,
- free_run = FALSE,
- send_email = NULL,
control=list(trace = TRUE,
- FF = FALSE,
- interactivePlot = FALSE,
TimeseriesPlot = FALSE,
- BiasPlot = FALSE,
- plot.title = NULL,
- facet.plots = FALSE,
debug = FALSE,
pause = FALSE,
Profiling = FALSE,
- OutlierDetection=FALSE),
+ OutlierDetection=FALSE,
+ parallel_qsub = TRUE,
+ send_email = NULL,
+ keepNC = TRUE,
+ forceRun = TRUE,
+ run_parallel = TRUE,
+ MCMC.args = NULL),
...) {
#add if/else for when restart points to folder instead if T/F set restart as T
if(is.list(restart)){
@@ -60,7 +60,7 @@ sda.enkf.multisite <- function(settings,
}else{
restart_flag = FALSE
}
- if(run_parallel){
+ if(control$run_parallel){
if (future::supportsMulticore()) {
future::plan(future::multicore)
} else {
@@ -431,15 +431,12 @@ sda.enkf.multisite <- function(settings,
writeLines(runs.tmp[runs.tmp != ''], file.path(rundir, 'runs.txt'))
paste(file.path(rundir, 'runs.txt')) ## testing
Sys.sleep(0.01) ## testing
- if(parallel_qsub){
- PEcAn.remote::qsub_parallel(settings, files=PEcAn.remote::merge_job_files(settings, 10), prefix = paste0(obs.year, ".nc"))
+ if(control$parallel_qsub){
+ PEcAn.remote::qsub_parallel(settings, files=PEcAn.remote::merge_job_files(settings, control$jobs.per.file), prefix = paste0(obs.year, ".nc"))
}else{
PEcAn.workflow::start_model_runs(settings, write=settings$database$bety$write)
}
-
-
#------------- Reading - every iteration and for SDA
-
#put building of X into a function that gets called
max_t <- 0
while("try-error" %in% class(
@@ -456,9 +453,10 @@ sda.enkf.multisite <- function(settings,
){
Sys.sleep(10)
max_t <- max_t + 1
- if(max_t > 20){
+ if(max_t > 3){
PEcAn.logger::logger.info("Can't find outputed NC file! Please rerun the code!")
break
+ return(0)
}
PEcAn.logger::logger.info("Empty folder, try again!")
}
@@ -496,117 +494,143 @@ sda.enkf.multisite <- function(settings,
###-------------------------------------------------------------------###
### preparing OBS ###
###-------------------------------------------------------------------###----
- if (!is.null(obs.mean[[t]][[1]]) & !control$free_run) { ## | any(sapply(obs.mean[[t]],function(x){any(!is.na(x))}))
+ #To trigger the analysis function with free run, you need to first specify the control$forceRun as TRUE,
+ #Then specify the settings$state.data.assimilation$scalef as 0, and settings$state.data.assimilation$free.run as TRUE.
+ if (!is.null(obs.mean[[t]][[1]]) | as.logical(settings$state.data.assimilation$free.run) & control$forceRun) {
# TODO: as currently configured, Analysis runs even if all obs are NA,
# which clearly should be triggering the `else` of this if, but the
# `else` has not been invoked in a while an may need updating
- if (control$debug) browser()
- #Making R and Y
- Obs.cons <- Construct.R(site.ids, var.names, obs.mean[[t]], obs.cov[[t]])
-
- Y <- Obs.cons$Y
- R <- Obs.cons$R
- if (length(Y) > 1) {
- PEcAn.logger::logger.info("The zero variances in R and Pf is being replaced by half and one fifth of the minimum variance in those matrices respectively.")
- diag(R)[which(diag(R)==0)] <- min(diag(R)[which(diag(R) != 0)])/2
+ #decide if we want to estimate the process variance and choose the according function.
+ if(processvar == FALSE) {
+ an.method<-EnKF
+ } else if (processvar == TRUE && settings$state.data.assimilation$q.type %in% c("SINGLE", "SITE")) {
+ an.method<-GEF.MultiSite
}
- # making the mapping operator
- H <- Construct.H.multisite(site.ids, var.names, obs.mean[[t]])
- #Pass aqq and bqq.
- aqq <- NULL
- bqq <- numeric(nt + 1)
- Pf <- NULL
- #if t>1
- if(is.null(pre_enkf_params) && t>1){
- aqq <- enkf.params[[t-1]]$aqq
- bqq <- enkf.params[[t-1]]$bqq
- X.new<-enkf.params[[t-1]]$X.new
- }
- if(!is.null(pre_enkf_params) && t>1){
- aqq <- pre_enkf_params[[t-1]]$aqq
- bqq <- pre_enkf_params[[t-1]]$bqq
- X.new<-pre_enkf_params[[t-1]]$X.new
- }
- if(!is.null(pre_enkf_params)){
- Pf <- pre_enkf_params[[t]]$Pf
- }
-
- recompileTobit = !exists('Cmcmc_tobit2space')
- recompileGEF = !exists('Cmcmc')
-
- #weight list
- # This reads ensemble weights generated by `get_ensemble_weights` function from assim.sequential package
- weight_list <- list()
- if(!file.exists(file.path(settings$outdir, "ensemble_weights.Rdata"))){
- PEcAn.logger::logger.warn("ensemble_weights.Rdata cannot be found. Make sure you generate samples by running the get.ensemble.weights function before running SDA if you want the ensembles to be weighted.")
- #create null list
- for(tt in 1:length(obs.times)){
- weight_list[[tt]] <- rep(1,nens) #no weights
+ #decide if we want the block analysis function or multi-site analysis function.
+ if (processvar == TRUE && settings$state.data.assimilation$q.type %in% c("vector", "wishart")) {
+ #initialize block.list.all.
+ if (t == 1 | !exists("block.list.all")) {
+ block.list.all <- obs.mean %>% purrr::map(function(l){NULL})
}
- } else{
- load(file.path(settings$outdir, "ensemble_weights.Rdata")) ## loads ensemble.samples
- }
- wts <- unlist(weight_list[[t]])
- ###-------------------------------------------------------------------###
- ### Analysis ###
- ###-------------------------------------------------------------------###----
- if(processvar == FALSE){an.method<-EnKF}else{an.method<-GEF.MultiSite}
-
- #-analysis function
- enkf.params[[obs.t]] <- GEF.MultiSite(
- settings,
- FUN = an.method,
- Forecast = list(Q = Q, X = X),
- Observed = list(R = R, Y = Y),
- H = H,
- extraArg = list(
- aqq = aqq,
- bqq = bqq,
- Pf = Pf,
- t = t,
- nitr.GEF = nitr.GEF,
- nthin = nthin,
- nburnin = nburnin,
- censored.data = censored.data,
- recompileGEF = recompileGEF,
- recompileTobit = recompileTobit,
- wts = wts
- ),
- choose = choose,
- nt = nt,
- obs.mean = obs.mean,
- nitr = 100000,
- nburnin = 10000,
- obs.cov = obs.cov,
- site.ids = site.ids,
- blocked.dis = blocked.dis,
- distances = distances
- )
- tictoc::tic(paste0("Preparing for Adjustment for cycle = ", t))
- #Forecast
- mu.f <- enkf.params[[obs.t]]$mu.f
- Pf <- enkf.params[[obs.t]]$Pf
- #Analysis
- Pa <- enkf.params[[obs.t]]$Pa
- mu.a <- enkf.params[[obs.t]]$mu.a
- #extracting extra outputs
- if (control$debug) browser()
- if (processvar) {
- aqq <- enkf.params[[obs.t]]$aqq
- bqq <- enkf.params[[obs.t]]$bqq
- }
- # Adding obs elements to the enkf.params
- #This can later on help with diagnostics
- enkf.params[[obs.t]] <-
- c(
- enkf.params[[obs.t]],
- R = list(R),
- Y = list(Y),
- RestartList = list(restart.list %>% stats::setNames(site.ids))
+ #initialize MCMC arguments.
+ if (is.null(control$MCMC.args)) {
+ MCMC.args <- list(niter = 1e5,
+ nthin = 10,
+ nchain = 3,
+ nburnin = 5e4)
+ } else {
+ MCMC.args <- control$MCMC.args
+ }
+ #running analysis function.
+ enkf.params[[obs.t]] <- analysis_sda_block(settings, block.list.all, X, obs.mean, obs.cov, t, nt, MCMC.args, pre_enkf_params)
+ enkf.params[[obs.t]] <- c(enkf.params[[obs.t]], RestartList = list(restart.list %>% stats::setNames(site.ids)))
+ block.list.all <- enkf.params[[obs.t]]$block.list.all
+ #Forecast
+ mu.f <- enkf.params[[obs.t]]$mu.f
+ Pf <- enkf.params[[obs.t]]$Pf
+ #Analysis
+ Pa <- enkf.params[[obs.t]]$Pa
+ mu.a <- enkf.params[[obs.t]]$mu.a
+ } else if (exists("an.method")) {
+ #Making R and Y
+ Obs.cons <- Construct.R(site.ids, var.names, obs.mean[[t]], obs.cov[[t]])
+ Y <- Obs.cons$Y
+ R <- Obs.cons$R
+ if (length(Y) > 1) {
+ PEcAn.logger::logger.info("The zero variances in R and Pf is being replaced by half and one fifth of the minimum variance in those matrices respectively.")
+ diag(R)[which(diag(R)==0)] <- min(diag(R)[which(diag(R) != 0)])/2
+ }
+ # making the mapping operator
+ H <- Construct.H.multisite(site.ids, var.names, obs.mean[[t]])
+ #Pass aqq and bqq.
+ aqq <- NULL
+ bqq <- numeric(nt + 1)
+ Pf <- NULL
+ #if t>1
+ if(is.null(pre_enkf_params) && t>1){
+ aqq <- enkf.params[[t-1]]$aqq
+ bqq <- enkf.params[[t-1]]$bqq
+ X.new<-enkf.params[[t-1]]$X.new
+ }
+ if(!is.null(pre_enkf_params) && t>1){
+ aqq <- pre_enkf_params[[t-1]]$aqq
+ bqq <- pre_enkf_params[[t-1]]$bqq
+ X.new<-pre_enkf_params[[t-1]]$X.new
+ }
+ if(!is.null(pre_enkf_params)){
+ Pf <- pre_enkf_params[[t]]$Pf
+ }
+ recompileTobit = !exists('Cmcmc_tobit2space')
+ recompileGEF = !exists('Cmcmc')
+ #weight list
+ # This reads ensemble weights generated by `get_ensemble_weights` function from assim.sequential package
+ weight_list <- list()
+ if(!file.exists(file.path(settings$outdir, "ensemble_weights.Rdata"))){
+ PEcAn.logger::logger.warn("ensemble_weights.Rdata cannot be found. Make sure you generate samples by running the get.ensemble.weights function before running SDA if you want the ensembles to be weighted.")
+ #create null list
+ for(tt in 1:length(obs.times)){
+ weight_list[[tt]] <- rep(1,nens) #no weights
+ }
+ } else{
+ load(file.path(settings$outdir, "ensemble_weights.Rdata")) ## loads ensemble.samples
+ }
+ wts <- unlist(weight_list[[t]])
+ #-analysis function
+ enkf.params[[obs.t]] <- GEF.MultiSite(
+ settings,
+ FUN = an.method,
+ Forecast = list(Q = Q, X = X),
+ Observed = list(R = R, Y = Y),
+ H = H,
+ extraArg = list(
+ aqq = aqq,
+ bqq = bqq,
+ Pf = Pf,
+ t = t,
+ nitr.GEF = nitr.GEF,
+ nthin = nthin,
+ nburnin = nburnin,
+ censored.data = censored.data,
+ recompileGEF = recompileGEF,
+ recompileTobit = recompileTobit,
+ wts = wts
+ ),
+ choose = choose,
+ nt = nt,
+ obs.mean = obs.mean,
+ nitr = 100000,
+ nburnin = 10000,
+ obs.cov = obs.cov,
+ site.ids = site.ids,
+ blocked.dis = blocked.dis,
+ distances = distances
)
+ tictoc::tic(paste0("Preparing for Adjustment for cycle = ", t))
+ #Forecast
+ mu.f <- enkf.params[[obs.t]]$mu.f
+ Pf <- enkf.params[[obs.t]]$Pf
+ #Analysis
+ Pa <- enkf.params[[obs.t]]$Pa
+ mu.a <- enkf.params[[obs.t]]$mu.a
+ #extracting extra outputs
+ if (control$debug) browser()
+ if (processvar) {
+ aqq <- enkf.params[[obs.t]]$aqq
+ bqq <- enkf.params[[obs.t]]$bqq
+ }
+ # Adding obs elements to the enkf.params
+ #This can later on help with diagnostics
+ enkf.params[[obs.t]] <-
+ c(
+ enkf.params[[obs.t]],
+ R = list(R),
+ Y = list(Y),
+ RestartList = list(restart.list %>% stats::setNames(site.ids))
+ )
+ }
###-------------------------------------------------------------------###
### Trace ###
@@ -615,11 +639,9 @@ sda.enkf.multisite <- function(settings,
if(control$trace) {
PEcAn.logger::logger.warn ("\n --------------------------- ",obs.year," ---------------------------\n")
PEcAn.logger::logger.warn ("\n --------------Obs mean----------- \n")
- print(Y)
+ print(enkf.params[[obs.t]]$Y)
PEcAn.logger::logger.warn ("\n --------------Obs Cov ----------- \n")
- print(R)
- PEcAn.logger::logger.warn ("\n --------------Obs H ----------- \n")
- print(H)
+ print(enkf.params[[obs.t]]$R)
PEcAn.logger::logger.warn ("\n --------------Forecast mean ----------- \n")
print(enkf.params[[obs.t]]$mu.f)
PEcAn.logger::logger.warn ("\n --------------Forecast Cov ----------- \n")
@@ -652,11 +674,12 @@ sda.enkf.multisite <- function(settings,
#will throw an error when q.bar and Pf are different sizes i.e. when you are running with no obs and do not variance for all state variables
#Pa <- Pf + solve(q.bar)
#hack have Pa = Pf for now
- if(!is.null(pre_enkf_params)){
- Pf <- pre_enkf_params[[t]]$Pf
- }else{
- Pf <- stats::cov(X) # Cov Forecast - This is used as an initial condition
- }
+ # if(!is.null(pre_enkf_params)){
+ # Pf <- pre_enkf_params[[t]]$Pf
+ # }else{
+ # Pf <- stats::cov(X) # Cov Forecast - This is used as an initial condition
+ # }
+ Pf <- stats::cov(X)
Pa <- Pf
}
enkf.params[[obs.t]] <- list(mu.f = mu.f, Pf = Pf, mu.a = mu.a, Pa = Pa)
@@ -701,14 +724,18 @@ sda.enkf.multisite <- function(settings,
tictoc::tic(paste0("Visulization for cycle = ", t))
#writing down the image - either you asked for it or nor :)
- if ((t%%2 == 0 | t == nt) & (control$TimeseriesPlot) & !is.null(obs.mean[[t]][[1]])){
- SDA_timeseries_plot(ANALYSIS, FORECAST, obs.mean, obs.cov, settings$outdir, by = "var", types = c("FORECAST", "ANALYSIS", "OBS"))
- }
+ if ((t%%2 == 0 | t == nt) & (control$TimeseriesPlot)){
+ if (as.logical(settings$state.data.assimilation$free.run)) {
+ SDA_timeseries_plot(ANALYSIS, FORECAST, obs.mean, obs.cov, settings$outdir, by = "var", types = c("FORECAST", "ANALYSIS"))
+ } else {
+ SDA_timeseries_plot(ANALYSIS, FORECAST, obs.mean, obs.cov, settings$outdir, by = "var", types = c("FORECAST", "ANALYSIS", "OBS"))
+ }
+ }
#Saving the profiling result
if (control$Profiling) alltocs(file.path(settings$outdir,"SDA", "Profiling.csv"))
# remove files as SDA runs
- if (!(keepNC)){
+ if (!(control$keepNC) && t == 1){
unlink(list.files(outdir, "*.nc", recursive = TRUE, full.names = TRUE))
}
if(!is.null(control$send_email)){
diff --git a/modules/assim.sequential/inst/MultiSite-Exs/SDA/Create_Multi_settings.R b/modules/assim.sequential/inst/MultiSite-Exs/SDA/Create_Multi_settings.R
index e259db3343c..eda487b4cf1 100644
--- a/modules/assim.sequential/inst/MultiSite-Exs/SDA/Create_Multi_settings.R
+++ b/modules/assim.sequential/inst/MultiSite-Exs/SDA/Create_Multi_settings.R
@@ -58,12 +58,16 @@ template <- PEcAn.settings::Settings(list(
############################################################################
state.data.assimilation = structure(list(
process.variance = TRUE,
+ aqq.Init = 1,
+ bqq.Init = 1,
adjustment = TRUE,
censored.data = FALSE,
+ free.run = FALSE,
FullYearNC = TRUE,
NC.Overwrite = FALSE,
NC.Prefix = "sipnet.out",
q.type = "SINGLE",
+ by.site = FALSE,
Localization.FUN = "Local.support",
scalef = 1,
chains = 5,
diff --git a/modules/assim.sequential/man/GrabFillMatrix.Rd b/modules/assim.sequential/man/GrabFillMatrix.Rd
new file mode 100644
index 00000000000..1f1ee4f6c58
--- /dev/null
+++ b/modules/assim.sequential/man/GrabFillMatrix.Rd
@@ -0,0 +1,24 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/matrix_operation.R
+\name{GrabFillMatrix}
+\alias{GrabFillMatrix}
+\title{GrabFillMatrix}
+\usage{
+GrabFillMatrix(M, ind, M1 = NULL)
+}
+\arguments{
+\item{M}{source matrix that will be either subtracted or filled in.}
+
+\item{ind}{vector of index that of where to be subtracted or filled in.}
+
+\item{M1}{additional matrix used to fill in the source matrix, the default it NULL.}
+}
+\description{
+GrabFillMatrix
+}
+\details{
+This function helps subtract or fill in a matrix given the index.
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/MCMC_Init.Rd b/modules/assim.sequential/man/MCMC_Init.Rd
new file mode 100644
index 00000000000..df583db642f
--- /dev/null
+++ b/modules/assim.sequential/man/MCMC_Init.Rd
@@ -0,0 +1,25 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/Analysis_sda_block.R
+\name{MCMC_Init}
+\alias{MCMC_Init}
+\title{MCMC_Init}
+\usage{
+MCMC_Init(block.list, X)
+}
+\arguments{
+\item{block.list}{lists of blocks generated by the `build.block.xy` function.}
+
+\item{X}{A matrix contains ensemble forecasts.}
+}
+\value{
+It returns the `block.list` object with initial conditions filled in.
+}
+\description{
+MCMC_Init
+}
+\details{
+This function helps create initial conditions for the MCMC sampling.
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/MCMC_block_function.Rd b/modules/assim.sequential/man/MCMC_block_function.Rd
new file mode 100644
index 00000000000..7fffaad194f
--- /dev/null
+++ b/modules/assim.sequential/man/MCMC_block_function.Rd
@@ -0,0 +1,20 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/Analysis_sda_block.R
+\name{MCMC_block_function}
+\alias{MCMC_block_function}
+\title{MCMC_block_function}
+\usage{
+MCMC_block_function(block)
+}
+\arguments{
+\item{block}{each block within the `block.list` lists.}
+}
+\value{
+It returns the `block` object with analysis results filled in.
+}
+\description{
+MCMC_block_function
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/analysis_sda_block.Rd b/modules/assim.sequential/man/analysis_sda_block.Rd
new file mode 100644
index 00000000000..4be6f3ba235
--- /dev/null
+++ b/modules/assim.sequential/man/analysis_sda_block.Rd
@@ -0,0 +1,49 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/Analysis_sda_block.R
+\name{analysis_sda_block}
+\alias{analysis_sda_block}
+\title{analysis_sda_block}
+\usage{
+analysis_sda_block(
+ settings,
+ block.list.all,
+ X,
+ obs.mean,
+ obs.cov,
+ t,
+ nt,
+ MCMC.args,
+ block.list.all.pre = NULL
+)
+}
+\arguments{
+\item{settings}{pecan standard multi-site settings list.}
+
+\item{block.list.all}{Lists of forecast and analysis outputs for each time point of each block. If t=1, we initialize those outputs of each block with NULL from the `sda.enkf.multisite` function.}
+
+\item{X}{A matrix contains ensemble forecasts with the dimensions of `[ensemble number, site number * number of state variables]`. The columns are matched with the site.ids and state variable names of the inside the `FORECAST` object in the `sda.enkf.multisite` script.}
+
+\item{obs.mean}{Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation means for each state variables of each site for each time point.}
+
+\item{obs.cov}{Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation covariances for all state variables of each site for each time point.}
+
+\item{t}{time point in format of YYYY-MM-DD.}
+
+\item{nt}{total length of time steps, corresponding to the `nt` variable in the `sda.enkf.multisite` function.}
+
+\item{MCMC.args}{arguments for the MCMC sampling, details can be found in the roxygen strucutre for control list in the `sda.enkf.multisite` function.}
+
+\item{block.list.all.pre}{pre-existed block.list.all object for passing the aqq and bqq to the current SDA run, the default is NULL. Details can be found in the roxygen structure for `pre_enkf_params` of the `sda.enkf.multisite` function}
+}
+\value{
+It returns the `build.block.xy` object and the analysis results.
+}
+\description{
+This function provides the block-based MCMC sampling approach.
+}
+\details{
+This function will add data and constants into each block that are needed for the MCMC sampling.
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/block.2.vector.Rd b/modules/assim.sequential/man/block.2.vector.Rd
new file mode 100644
index 00000000000..cf9b1687396
--- /dev/null
+++ b/modules/assim.sequential/man/block.2.vector.Rd
@@ -0,0 +1,24 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/Analysis_sda_block.R
+\name{block.2.vector}
+\alias{block.2.vector}
+\title{block.2.vector}
+\usage{
+block.2.vector(block.list, X, H)
+}
+\arguments{
+\item{block.list}{lists of blocks generated by the `build.block.xy` function.}
+
+\item{X}{A matrix contains ensemble forecasts.}
+
+\item{H}{H index created by the `construct_nimble_H` function.}
+}
+\value{
+It returns a list of analysis results by MCMC sampling.
+}
+\description{
+block.2.vector
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/build.block.xy.Rd b/modules/assim.sequential/man/build.block.xy.Rd
new file mode 100644
index 00000000000..261301277d8
--- /dev/null
+++ b/modules/assim.sequential/man/build.block.xy.Rd
@@ -0,0 +1,33 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/Analysis_sda_block.R
+\name{build.block.xy}
+\alias{build.block.xy}
+\title{build.block.xy}
+\usage{
+build.block.xy(settings, block.list.all, X, obs.mean, obs.cov, t)
+}
+\arguments{
+\item{settings}{pecan standard multi-site settings list.}
+
+\item{block.list.all}{List contains nt empty sub-elements.}
+
+\item{X}{A matrix contains ensemble forecasts.}
+
+\item{obs.mean}{List of dataframe of observation means, named with observation datetime.}
+
+\item{obs.cov}{List of covariance matrices of state variables , named with observation datetime.}
+
+\item{t}{time point.}
+}
+\value{
+It returns the `build.block.xy` object with data and constants filled in.
+}
+\description{
+This function split long vector and covariance matrix into blocks corresponding to the localization.
+}
+\details{
+This function will add data and constants into each block that are needed for the MCMC sampling.
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/construct_nimble_H.Rd b/modules/assim.sequential/man/construct_nimble_H.Rd
new file mode 100644
index 00000000000..f7705188a1b
--- /dev/null
+++ b/modules/assim.sequential/man/construct_nimble_H.Rd
@@ -0,0 +1,29 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/Multi_Site_Constructors.R
+\name{construct_nimble_H}
+\alias{construct_nimble_H}
+\title{construct_nimble_H}
+\usage{
+construct_nimble_H(site.ids, var.names, obs.t, pft.path = NULL, by = "single")
+}
+\arguments{
+\item{site.ids}{a vector name of site ids}
+
+\item{var.names}{vector names of state variable names}
+
+\item{obs.t}{list of vector of means for the time t for different sites.}
+
+\item{pft.path}{physical path to the pft.csv file.}
+
+\item{by}{criteria, it supports by variable, site, pft, all, and single Q.}
+}
+\value{
+Returns one vector containing index for which Q to be estimated for which variable,
+and the other vector gives which state variable has which observation (= element.W.Data).
+}
+\description{
+This function is an upgrade to the Construct.H.multisite function which provides the index by different criteria.
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/matrix_network.Rd b/modules/assim.sequential/man/matrix_network.Rd
new file mode 100644
index 00000000000..d07b08bb94f
--- /dev/null
+++ b/modules/assim.sequential/man/matrix_network.Rd
@@ -0,0 +1,20 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/matrix_operation.R
+\name{matrix_network}
+\alias{matrix_network}
+\title{matrix_network}
+\usage{
+matrix_network(mat)
+}
+\arguments{
+\item{mat}{a boolean matrix representing the interactions between any sites.}
+}
+\value{
+It returns lists of index representing each network.
+}
+\description{
+matrix_network
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/assim.sequential/man/sda.enkf.multisite.Rd b/modules/assim.sequential/man/sda.enkf.multisite.Rd
index 3f216df4803..9c3560b3e89 100644
--- a/modules/assim.sequential/man/sda.enkf.multisite.Rd
+++ b/modules/assim.sequential/man/sda.enkf.multisite.Rd
@@ -10,49 +10,42 @@ sda.enkf.multisite(
obs.cov,
Q = NULL,
restart = NULL,
- forceRun = TRUE,
- keepNC = TRUE,
pre_enkf_params = NULL,
- run_parallel = TRUE,
ensemble.samples = NULL,
- parallel_qsub = TRUE,
- free_run = FALSE,
- send_email = NULL,
- control = list(trace = TRUE, FF = FALSE, interactivePlot = FALSE, TimeseriesPlot =
- FALSE, BiasPlot = FALSE, plot.title = NULL, facet.plots = FALSE, debug = FALSE, pause
- = FALSE, Profiling = FALSE, OutlierDetection = FALSE),
+ control = list(trace = TRUE, TimeseriesPlot = FALSE, debug = FALSE, pause = FALSE,
+ Profiling = FALSE, OutlierDetection = FALSE, parallel_qsub = TRUE, send_email = NULL,
+ keepNC = TRUE, forceRun = TRUE, run_parallel = TRUE, MCMC.args = NULL),
...
)
}
\arguments{
\item{settings}{PEcAn settings object}
-\item{obs.mean}{List of dataframe of observation means, named with observation datetime.}
+\item{obs.mean}{Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation means for each state variables of each site for each time point.}
-\item{obs.cov}{List of covariance matrices of state variables , named with observation datetime.}
+\item{obs.cov}{Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation covariances for all state variables of each site for each time point.}
\item{Q}{Process covariance matrix given if there is no data to estimate it.}
-\item{restart}{Used for iterative updating previous forecasts. Default NULL. List object includes file path to previous runs and start date for SDA}
+\item{restart}{Used for iterative updating previous forecasts. Default NULL. List object includes file path to previous runs and start date for SDA.}
-\item{forceRun}{Used to force job.sh files that were not run for ensembles in SDA (quick fix)}
-
-\item{keepNC}{Used for debugging issues. .nc files are usually removed after each year in the out folder. This flag will keep the .nc + .nc.var files for futher investigations.}
-
-\item{pre_enkf_params}{Used for carrying out SDA with pre-existed enkf.params, in which the Pf, aqq, and bqq can be used for the analysis step.}
-
-\item{run_parallel}{If allows to proceed under parallel mode, default is TRUE.}
+\item{pre_enkf_params}{Used for passing pre-existing time-series of process error into the current SDA runs to ignore the impact by the differences between process errors.}
\item{ensemble.samples}{Pass ensemble.samples from outside to avoid GitHub check issues.}
-\item{parallel_qsub}{Bool variable decide if you want to submit the bash jobs under the parallel mode, the default value is TRUE.}
-
-\item{free_run}{Bool variable decide if the run is a free run with no analysis been used, the default value is FALSE.}
-
-\item{send_email}{List object containing the "from", "to", and "body", the default value is NULL.}
-
-\item{control}{List of flags controlling the behaviour of the SDA. trace for reporting back the SDA outcomes, interactivePlot for plotting the outcomes after each step,
-TimeseriesPlot for post analysis examination, BiasPlot for plotting the correlation between state variables, plot.title is the title of post analysis plots and debug mode allows for pausing the code and examining the variables inside the function.}
+\item{control}{List of flags controlling the behavior of the SDA.
+`trace` for reporting back the SDA outcomes;
+`TimeseriesPlot` for post analysis examination;
+`debug` decide if we want to pause the code and examining the variables inside the function;
+`pause` decide if we want to pause the SDA workflow at current time point t;
+`Profiling` decide if we want to export the temporal SDA outputs in CSV file;
+`OutlierDetection` decide if we want to execute the outlier detection each time after the model forecasting;
+`parallel_qsub` decide if we want to execute the `qsub` job submission under parallel mode;
+`send_email` contains lists for sending email to report the SDA progress;
+`keepNC` decide if we want to keep the NetCDF files inside the out directory;
+`forceRun` decide if we want to proceed the Bayesian MCMC sampling without observations;
+`run_parallel` decide if we want to run the SDA under parallel mode for the `future_map` function;
+`MCMC.args` include lists for controling the MCMC sampling process (iteration, nchains, burnin, and nthin.).}
}
\value{
NONE
diff --git a/modules/assim.sequential/man/update_q.Rd b/modules/assim.sequential/man/update_q.Rd
new file mode 100644
index 00000000000..c342c7bbe56
--- /dev/null
+++ b/modules/assim.sequential/man/update_q.Rd
@@ -0,0 +1,40 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/Analysis_sda_block.R
+\name{update_q}
+\alias{update_q}
+\title{update_q}
+\usage{
+update_q(
+ block.list.all,
+ t,
+ nt,
+ aqq.Init = NULL,
+ bqq.Init = NULL,
+ MCMC_dat = NULL,
+ block.list.all.pre = NULL
+)
+}
+\arguments{
+\item{block.list.all}{each block within the `block.list` lists.}
+
+\item{t}{time point.}
+
+\item{nt}{total length of time steps.}
+
+\item{aqq.Init}{the initial values of aqq, the default is NULL.}
+
+\item{bqq.Init}{the initial values of bqq, the default is NULL.}
+
+\item{MCMC_dat}{data frame of MCMC samples, the default it NULL.}
+
+\item{block.list.all.pre}{pre-existed block.list.all object for passing the aqq and bqq to the current SDA run, the default is NULL.}
+}
+\value{
+It returns the `block.list.all` object with initialized/updated Q filled in.
+}
+\description{
+update_q
+}
+\author{
+Dongchen Zhang
+}
diff --git a/modules/data.land/R/pool_ic_netcdf2list.R b/modules/data.land/R/pool_ic_netcdf2list.R
index acf2a3850d3..871d3132e78 100644
--- a/modules/data.land/R/pool_ic_netcdf2list.R
+++ b/modules/data.land/R/pool_ic_netcdf2list.R
@@ -20,6 +20,7 @@ pool_ic_netcdf2list <- function(nc.path){
for(varname in names(vals)){
vals[[varname]] <- ncdf4::ncvar_get(IC.nc,varname)
}
+ on.exit(ncdf4::nc_close(IC.nc), add = FALSE)
return(list(dims = dims, vals = vals))
}
else{