diff --git a/Rplots.pdf b/Rplots.pdf
index 7d8b34f..e69de29 100644
Binary files a/Rplots.pdf and b/Rplots.pdf differ
diff --git a/ToDo.md b/ToDo.md
index 11386b4..d244083 100644
--- a/ToDo.md
+++ b/ToDo.md
@@ -1,4 +1,6 @@
-Running IDA with Host, Dye and KaHD all set to 0 in batch mode results in
-Error: ! Could not evaluate the loss function --> is not shown to user
-
---> The error messages from the callr r_bg have to be catched and reported
+- [x] interrupt partially run batch process (first optimisation not yet finished)
+- [x] interrupt partially run batch process (at least one dataset finished)
+- [x] interrupt batch process within initialisation phase
+- [x] adapt plotting functions
+ * [x] plotly functionality
+ * [x] ggplot functionality
diff --git a/tsf/R/BatchPlotting.R b/tsf/R/BatchPlotting.R
index 2f6e8f8..b6a2792 100644
--- a/tsf/R/BatchPlotting.R
+++ b/tsf/R/BatchPlotting.R
@@ -129,16 +129,9 @@ combinePlots <- function(p1, p2, p3, index, base_size = 6) {
return(p)
}
-plotStates <- function(list, num_rep = 1) {
+plotStates <- function(list) {
base_size <- baseSize()
list <- list[[1]]
- num_data_sets <- length(list) / num_rep
- repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1
- data_sets <- rep(1:num_data_sets, each = num_rep)
- for (i in seq_along(list)) {
- list[[i]]$dataset <- data_sets[i]
- list[[i]]$repetition <- repetitions[i]
- }
df <- Reduce(rbind, list)
groups <- unique(df$dataset)
plot_list <- lapply(groups, function(x) {
@@ -151,15 +144,8 @@ plotStates <- function(list, num_rep = 1) {
return(plot_list)
}
-plotParams <- function(list, num_rep = 1, base_size = 12) {
+plotParams <- function(list) {
list <- list[[2]]
- num_data_sets <- length(list) / num_rep
- repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1
- data_sets <- rep(1:num_data_sets, each = num_rep)
- for (i in seq_along(list)) {
- list[[i]]$dataset <- data_sets[i]
- list[[i]]$repetition <- repetitions[i]
- }
df <- Reduce(rbind, list)
data <- data.frame(
x = rep(df[, 5], 4),
@@ -172,56 +158,33 @@ plotParams <- function(list, num_rep = 1, base_size = 12) {
),
repetition = rep(df$repetition, 4)
)
- if (num_rep > 1) {
- p <- ggplot() +
- geom_boxplot(
- data = data,
- aes(
- y = y, fill = "Entire data", x = factor(0)
- )
- ) +
- geom_boxplot(
- data = data,
- aes(
- x = factor(x), y = y,
- group = factor(x),
- fill = factor(x)
- )
- ) +
- facet_wrap(. ~ names,
- scales = "free_y",
- strip.position = "left"
- ) +
- xlab(NULL) +
- ylab(NULL) +
- theme(
- panel.spacing = unit(2, "lines"),
- strip.background = element_blank(),
- strip.placement = "outside"
- ) +
- guides(fill = guide_legend(title = "Datasets"))
- } else {
- p <- ggplot() +
- geom_boxplot(
- data = data,
- aes(
- x = factor(x),
- y = y,
- group = names
- )
- ) +
- facet_wrap(~names,
- scales = "free_y",
- strip.position = "left"
- ) +
- xlab(NULL) +
- ylab(NULL) +
- theme(
- panel.spacing = unit(2, "lines"),
- strip.background = element_blank(),
- strip.placement = "outside"
+ p <- ggplot() +
+ geom_boxplot(
+ data = data,
+ aes(
+ y = y, fill = "Entire data", x = factor(0)
+ )
+ ) +
+ geom_boxplot(
+ data = data,
+ aes(
+ x = factor(x), y = y,
+ group = factor(x),
+ fill = factor(x)
)
- }
+ ) +
+ facet_wrap(. ~ names,
+ scales = "free_y",
+ strip.position = "left"
+ ) +
+ xlab(NULL) +
+ ylab(NULL) +
+ theme(
+ panel.spacing = unit(2, "lines"),
+ strip.background = element_blank(),
+ strip.placement = "outside"
+ ) +
+ guides(fill = guide_legend(title = "Datasets"))
p <- addTheme(p)
p <- p + theme(
plot.background = element_rect(color = "grey", fill = NA, size = 2)
@@ -229,15 +192,8 @@ plotParams <- function(list, num_rep = 1, base_size = 12) {
return(p)
}
-plotMetrices <- function(list, num_rep = 1, base_size = 12) {
+plotMetrices <- function(list) {
list <- list[[3]]
- num_data_sets <- length(list) / num_rep
- repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1
- data_sets <- rep(1:num_data_sets, each = num_rep)
- for (i in seq_along(list)) {
- list[[i]]$dataset <- data_sets[i]
- list[[i]]$repetition <- repetitions[i]
- }
df <- Reduce(rbind, list)
data <- data.frame(
x = rep(df[, 6], 5),
@@ -251,56 +207,33 @@ plotMetrices <- function(list, num_rep = 1, base_size = 12) {
),
repetition = rep(df$repetition, 5)
)
- if (num_rep > 1) {
- p <- ggplot() +
- geom_boxplot(
- data = data,
- aes(
- y = y, fill = "Entire data", x = factor(0)
- )
- ) +
- geom_boxplot(
- data = data,
- aes(
- x = factor(x), y = y,
- group = factor(x),
- fill = factor(x)
- )
- ) +
- facet_wrap(. ~ names,
- scales = "free_y",
- strip.position = "left"
- ) +
- xlab(NULL) +
- ylab(NULL) +
- theme(
- panel.spacing = unit(2, "lines"),
- strip.background = element_blank(),
- strip.placement = "outside"
- ) +
- guides(fill = guide_legend(title = "Datasets"))
- } else {
- p <- ggplot() +
- geom_boxplot(
- data = data,
- aes(
- x = factor(x),
- y = y,
- group = names
- )
- ) +
- facet_wrap(~names,
- scales = "free_y",
- strip.position = "left"
- ) +
- xlab(NULL) +
- ylab(NULL) +
- theme(
- panel.spacing = unit(2, "lines"),
- strip.background = element_blank(),
- strip.placement = "outside"
+ p <- ggplot() +
+ geom_boxplot(
+ data = data,
+ aes(
+ y = y, fill = "Entire data", x = factor(0)
+ )
+ ) +
+ geom_boxplot(
+ data = data,
+ aes(
+ x = factor(x), y = y,
+ group = factor(x),
+ fill = factor(x)
)
- }
+ ) +
+ facet_wrap(. ~ names,
+ scales = "free_y",
+ strip.position = "left"
+ ) +
+ xlab(NULL) +
+ ylab(NULL) +
+ theme(
+ panel.spacing = unit(2, "lines"),
+ strip.background = element_blank(),
+ strip.placement = "outside"
+ ) +
+ guides(fill = guide_legend(title = "Datasets"))
p <- addTheme(p)
p <- p + theme(
plot.background = element_rect(color = "grey", fill = NA, size = 2)
diff --git a/tsf/R/BatchPlottingPlotly.R b/tsf/R/BatchPlottingPlotly.R
index 84127a0..b423f78 100644
--- a/tsf/R/BatchPlottingPlotly.R
+++ b/tsf/R/BatchPlottingPlotly.R
@@ -113,15 +113,8 @@ plotHostDyePlotly <- function(df, Dataset) {
return(p)
}
-plotStatesPlotly <- function(list, num_rep = 1, ncols = 4) {
+plotStatesPlotly <- function(list) {
list <- list[[1]]
- num_data_sets <- length(list) / num_rep
- repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1
- data_sets <- rep(1:num_data_sets, each = num_rep)
- for (i in seq_along(list)) {
- list[[i]]$dataset <- data_sets[i]
- list[[i]]$repetition <- repetitions[i]
- }
df <- Reduce(rbind, list)
groups <- unique(df$dataset)
plot_list <- lapply(groups, function(x) {
@@ -187,15 +180,8 @@ plotStatesPlotly <- function(list, num_rep = 1, ncols = 4) {
return(p)
}
-plotParamsPlotly <- function(list, num_rep = 1) {
+plotParamsPlotly <- function(list) {
list <- list[[2]]
- num_data_sets <- length(list) / num_rep
- repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1
- data_sets <- rep(1:num_data_sets, each = num_rep)
- for (i in seq_along(list)) {
- list[[i]]$dataset <- data_sets[i]
- list[[i]]$repetition <- repetitions[i]
- }
df <- Reduce(rbind, list)
data <- data.frame(
x = rep(df$dataset, 4),
@@ -240,15 +226,8 @@ plotParamsPlotly <- function(list, num_rep = 1) {
return(p)
}
-plotMetricesPlotly <- function(list, num_rep = 1, base_size = 12) {
+plotMetricesPlotly <- function(list) {
list <- list[[3]]
- num_data_sets <- length(list) / num_rep
- repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1
- data_sets <- rep(1:num_data_sets, each = num_rep)
- for (i in seq_along(list)) {
- list[[i]]$dataset <- data_sets[i]
- list[[i]]$repetition <- repetitions[i]
- }
df <- Reduce(rbind, list)
data <- data.frame(
x = rep(df[, 6], 5),
@@ -295,12 +274,12 @@ plotMetricesPlotly <- function(list, num_rep = 1, base_size = 12) {
}
-entirePlotPlotly <- function(list, num_rep = 1, ncols = 4) {
- states <- plotStatesPlotly(list, num_rep, ncols)
- params <- plotParamsPlotly(list, num_rep)
- metrices <- plotMetricesPlotly(list, num_rep)
+entirePlotPlotly <- function(list) {
+ states <- plotStatesPlotly(list)
+ params <- plotParamsPlotly(list)
+ metrices <- plotMetricesPlotly(list)
subplot(states, params, metrices, nrows = 1,
- shareX = FALSE, titleX = TRUE, titleY = TRUE,
- widths = c(0.7, 0.15, 0.15),
- margin = 0.02)
+ shareX = FALSE, titleX = TRUE, titleY = TRUE,
+ widths = c(0.7, 0.15, 0.15),
+ margin = 0.02)
}
diff --git a/tsf/R/DBA_Server.R b/tsf/R/DBA_Server.R
index 344f24a..c41195c 100644
--- a/tsf/R/DBA_Server.R
+++ b/tsf/R/DBA_Server.R
@@ -32,6 +32,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked <- reactiveVal(FALSE)
setup_done <- reactiveVal(FALSE)
+  # NOTE: Start of model-specific code
+ # ===============================================================================
check_inputs <- function() {
rwn(input$D0 != "", "Please enter a value for the Dye")
rwn(!is.na(input$npop),
@@ -139,6 +141,9 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
"DBAupdateFieldBatch"
}
+  # NOTE: End of model-specific code
+ # ===============================================================================
+
get_opti_result <- function() {
opti_result()$parameter
}
@@ -487,11 +492,12 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
# Batch analysis
# ===============================================================================
setup_batch_done <- reactiveVal(FALSE)
- result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL)
batch_results_created <- reactiveVal(FALSE)
cancel_batch_clicked <- reactiveVal(FALSE)
num_rep_batch <- reactiveVal()
stdout <- reactiveVal(NULL)
+ task_queue <- reactiveVal(NULL)
+ result_batch <- reactiveVal()
batch_message <-function(message) {
session$sendCustomMessage(
@@ -514,6 +520,18 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
length(df_list()) > 0,
"The dataset list seems to be empty. Please upload a file"
)
+      rwn( # TODO: also update the other server code
+ !is_integer(input$NumCores),
+ "Please provide an integer entry for number of cores"
+ )
+ }
+
+ get_num_core <- function() {
+ res <- convert_num_to_int(input$NumCores)
+    if (res == 0) {
+ res <- 1
+ }
+ return(res)
}
observeEvent(input$Start_Batch, {
@@ -532,6 +550,7 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
ngen <- create_ngen()
topo <- create_topology()
et <- create_error_threshold()
+ num_cores <- get_num_core()
# check seed case
seed <- input$Seed
num_rep <- as.integer(input$NumRepDataset)
@@ -542,7 +561,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
seed_origin <- seed
}
# clear everything
- stdout (NULL)
+ stdout(NULL)
+ result_batch(NULL)
invalid_time(1100)
setup_batch_done(FALSE)
batch_results_created(FALSE)
@@ -558,10 +578,12 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
})
size <- length(df_list()) * num_rep
- stdout(character(size))
- request_cores(size, session$token)
+ if (num_cores > size) {
+ num_cores <- size
+ }
+ stdout(character(num_cores))
+ request_cores(num_cores, session$token)
nclicks(nclicks() + 1)
- process_list <- vector("list", size)
seeds <- numeric(size)
seeds_from <- 1:1e6
session$sendCustomMessage(
@@ -569,6 +591,7 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
list(message = "Initializing...")
)
+ # 1. create seeds in loop
for (i in seq_len(size)) {
if (seed_case == 1) {
seed <- sample(seeds_from, 1)
@@ -579,35 +602,57 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
seed <- sample(seeds_from, 1)
}
} else if (seed_case == 2) {
- seed <- seed
+        seed <- seed # TODO: check whether this is correct
}
seeds[i] <- seed
- process_list[[i]] <- callr::r_bg(
- function(case, lb, ub, df, ap,
- seed, npop, ngen, Topology, errorThreshold) {
- res <- tsf::opti(
- case, lb, ub, df, ap, seed, npop, ngen, Topology, errorThreshold
- )
- return(res)
- },
- args = list(
- get_Model(), lb, ub, df_list()[[(i - 1) %% length(df_list()) + 1]],
- additionalParameters, seed, npop, ngen, topo, et
- )
- )
}
- result_val_batch$result <- process_list
+      # 2. Create a status message for each task (dataset/replicate pair)
+ messages <- character(size)
+ counter_messages <- 1
+ for (i in seq_len(length(df_list()))) {
+ for (j in seq_len(num_rep)) {
+ messages[counter_messages] <-
+ paste0("Dataset = ", i, "; Replicate = ", j)
+ counter_messages <- counter_messages + 1
+ }
+ }
+
+ # 3. Fill task queue
+ groups <- ceiling(1:size / (size / num_cores))
+ dfs <- df_list()
+ if (length(groups) > length(dfs)) {
+ counter <- 1
+ while (length(groups) > length(dfs)) {
+ dfs <- c(dfs, dfs[counter])
+ counter <- counter + 1
+ }
+ }
+ task_queue(TaskQueue$new(
+ get_Model(),
+ lb, ub, dfs,
+ additionalParameters, seeds,
+ npop, ngen, topo, et,
+ messages, num_cores
+ ))
+
+ # 4. assign tasks
+ task_queue()$assign()
+
setup_batch_done(TRUE)
NULL
})
batch_process_done <- function() {
req(setup_batch_done())
- req(length(result_val_batch$result) > 0)
- lapply(result_val_batch$result, function(x) {
- if (x$is_alive()) req(x$get_status() != "running")
- })
+ req(!is.null(task_queue()))
+ if (task_queue()$check() &&
+ !task_queue()$queue_empty()) {
+ task_queue()$assign()
+ }
+ if (!task_queue()$queue_empty()) {
+ return(FALSE)
+ }
invalid_time(invalid_time() + 1000)
nclicks(0)
return(TRUE)
@@ -618,7 +663,6 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked_batch = TRUE
)
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
cancel_batch_clicked(TRUE)
})
@@ -626,43 +670,20 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
+ req(!is.null(task_queue()))
+ req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
+ task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
- lapply(result_val_batch$result, function(process) {
- process$interrupt()
- process$wait()
- })
- session$sendCustomMessage(
- type = get_update_field_batch(),
- list(message = "")
- )
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
- # check status
- # NOTE: errors are not printed otherwise screen is full of errors
- counter_dataset <- 0
- counter_rep <- 0
- temp_status <- character(length(result_val_batch$result))
- for (i in seq_along(temp_status)) {
- if (((i - 1) %% num_rep_batch()) == 0) {
- counter_dataset <- counter_dataset + 1
- counter_rep <- 1
- } else {
- counter_rep <- counter_rep + 1
- }
- temp_status[i] <- print_status(
- result_val_batch$result[[i]]$read_output(),
- counter_dataset,
- counter_rep,
- get_Model()
- )
- }
- stdout(format_batch_status(stdout(), temp_status))
+      # NOTE: check status
+      # (errors are not printed, otherwise the screen would be full of errors)
+ stdout(task_queue()$get_status(stdout()))
bind <- function(a, b) {
if (is.null(a) && is.null(b)) {
return("Initialisation")
@@ -686,37 +707,33 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
)
})
- plot_data <- reactive({
- values <- try(lapply(result_val_batch$result, function(process) {
- process$get_result()
- }))
+ get_data <- reactive({
+ values <- try({
+ task_queue()$seperate_results()
+ })
if (inherits(values, "try-error")) {
- result_val_batch$result_splitted <- NULL
batch_message("")
print_error("Error in background process")
- } else {
- result_val_batch$result_splitted <- seperate_batch_results(values)
}
- lapply(result_val_batch$result, function(process) {
- process$kill()
- })
+ task_queue()$kill()
send_and_read_info(paste0("release: ", session$token))
- result_val_batch$result <- NULL
})
# observe results
observe({
invalidateLater(invalid_time())
if (batch_process_done() && !batch_results_created()) {
- plot_data()
+ get_data()
batch_results_created(TRUE)
stdout(NULL)
+ values <- task_queue()$results
+ result_batch(values)
output$batch_data_plot <- renderPlotly({
entirePlotPlotly(
- result_val_batch$result_splitted,
- num_rep_batch(), ncols = 4
+ values
)
})
+ task_queue(NULL)
}
})
@@ -726,13 +743,12 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
},
content = function(file) {
req(batch_results_created())
- req(length(result_val_batch$result_splitted) > 0)
-
+ req(!is.null(result_batch()))
+ values <- result_batch()
download_batch_file(
get_Model_capital(),
file,
- result_val_batch$result_splitted,
- num_rep_batch()
+ values
)
}
)
diff --git a/tsf/R/DBA_UI.R b/tsf/R/DBA_UI.R
index 8fc7da9..6e015cb 100644
--- a/tsf/R/DBA_UI.R
+++ b/tsf/R/DBA_UI.R
@@ -160,6 +160,11 @@ dbaUI <- function(id) {
"How often should each dataset be analysed (using different seeds)",
value = 1
),
+ numericInput(NS(id, "NumCores"),
+ min = 1, max = 20,
+ "How many cores should be used for the batch analysis?",
+ value = 1
+ ),
actionButton(NS(id, "Start_Batch"), "Start batch analysis"),
actionButton(NS(id, "cancel_Batch"), "Stop optimization"),
downloadButton(NS(id, "batch_download"), "Save result of batch analysis"),
diff --git a/tsf/R/GDA_Server.R b/tsf/R/GDA_Server.R
index ec52ea6..270193e 100644
--- a/tsf/R/GDA_Server.R
+++ b/tsf/R/GDA_Server.R
@@ -31,6 +31,9 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked <- reactiveVal(FALSE)
setup_done <- reactiveVal(FALSE)
+  # NOTE: Start of model-specific code
+ # ===============================================================================
+
check_inputs <- function() {
rwn(input$H0 != "", "Please enter a value for the Host")
rwn(input$G0 != "", "Please enter a value for the Guest")
@@ -144,6 +147,9 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
"GDAupdateFieldBatch"
}
+  # NOTE: End of model-specific code
+ # ===============================================================================
+
get_opti_result <- function() {
opti_result()$parameter
}
@@ -492,11 +498,12 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
# Batch analysis
# ===============================================================================
setup_batch_done <- reactiveVal(FALSE)
- result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL)
batch_results_created <- reactiveVal(FALSE)
cancel_batch_clicked <- reactiveVal(FALSE)
num_rep_batch <- reactiveVal()
stdout <- reactiveVal(NULL)
+ task_queue <- reactiveVal(NULL)
+ result_batch <- reactiveVal()
batch_message <-function(message) {
session$sendCustomMessage(
@@ -519,6 +526,18 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
length(df_list()) > 0,
"The dataset list seems to be empty. Please upload a file"
)
+      rwn( # TODO: also update the other server code
+ !is_integer(input$NumCores),
+ "Please provide an integer entry for number of cores"
+ )
+ }
+
+ get_num_core <- function() {
+ res <- convert_num_to_int(input$NumCores)
+    if (res == 0) {
+ res <- 1
+ }
+ return(res)
}
observeEvent(input$Start_Batch, {
@@ -537,6 +556,7 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
ngen <- create_ngen()
topo <- create_topology()
et <- create_error_threshold()
+ num_cores <- get_num_core()
# check seed case
seed <- input$Seed
num_rep <- as.integer(input$NumRepDataset)
@@ -547,7 +567,8 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
seed_origin <- seed
}
# clear everything
- stdout (NULL)
+ stdout(NULL)
+ result_batch(NULL)
invalid_time(1100)
setup_batch_done(FALSE)
batch_results_created(FALSE)
@@ -563,10 +584,12 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
})
size <- length(df_list()) * num_rep
- stdout(character(size))
- request_cores(size, session$token)
+ if (num_cores > size) {
+ num_cores <- size
+ }
+ stdout(character(num_cores))
+ request_cores(num_cores, session$token)
nclicks(nclicks() + 1)
- process_list <- vector("list", size)
seeds <- numeric(size)
seeds_from <- 1:1e6
session$sendCustomMessage(
@@ -574,6 +597,7 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
list(message = "Initializing...")
)
+ # 1. create seeds in loop
for (i in seq_len(size)) {
if (seed_case == 1) {
seed <- sample(seeds_from, 1)
@@ -584,35 +608,57 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
seed <- sample(seeds_from, 1)
}
} else if (seed_case == 2) {
- seed <- seed
+        seed <- seed # TODO: check whether this is correct
}
seeds[i] <- seed
- process_list[[i]] <- callr::r_bg(
- function(case, lb, ub, df, ap,
- seed, npop, ngen, Topology, errorThreshold) {
- res <- tsf::opti(
- case, lb, ub, df, ap, seed, npop, ngen, Topology, errorThreshold
- )
- return(res)
- },
- args = list(
- get_Model(), lb, ub, df_list()[[(i - 1) %% length(df_list()) + 1]],
- additionalParameters, seed, npop, ngen, topo, et
- )
- )
}
- result_val_batch$result <- process_list
+      # 2. Create a status message for each task (dataset/replicate pair)
+ messages <- character(size)
+ counter_messages <- 1
+ for (i in seq_len(length(df_list()))) {
+ for (j in seq_len(num_rep)) {
+ messages[counter_messages] <-
+ paste0("Dataset = ", i, "; Replicate = ", j)
+ counter_messages <- counter_messages + 1
+ }
+ }
+
+ # 3. Fill task queue
+ groups <- ceiling(1:size / (size / num_cores))
+ dfs <- df_list()
+ if (length(groups) > length(dfs)) {
+ counter <- 1
+ while (length(groups) > length(dfs)) {
+ dfs <- c(dfs, dfs[counter])
+ counter <- counter + 1
+ }
+ }
+ task_queue(TaskQueue$new(
+ get_Model(),
+ lb, ub, dfs,
+ additionalParameters, seeds,
+ npop, ngen, topo, et,
+ messages, num_cores
+ ))
+
+ # 4. assign tasks
+ task_queue()$assign()
+
setup_batch_done(TRUE)
NULL
})
batch_process_done <- function() {
req(setup_batch_done())
- req(length(result_val_batch$result) > 0)
- lapply(result_val_batch$result, function(x) {
- if (x$is_alive()) req(x$get_status() != "running")
- })
+ req(!is.null(task_queue()))
+ if (task_queue()$check() &&
+ !task_queue()$queue_empty()) {
+ task_queue()$assign()
+ }
+ if (!task_queue()$queue_empty()) {
+ return(FALSE)
+ }
invalid_time(invalid_time() + 1000)
nclicks(0)
return(TRUE)
@@ -623,7 +669,6 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked_batch = TRUE
)
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
cancel_batch_clicked(TRUE)
})
@@ -631,43 +676,20 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
+ req(!is.null(task_queue()))
+ req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
+ task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
- lapply(result_val_batch$result, function(process) {
- process$interrupt()
- process$wait()
- })
- session$sendCustomMessage(
- type = get_update_field_batch(),
- list(message = "")
- )
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
- # check status
- # NOTE: errors are not printed otherwise screen is full of errors
- counter_dataset <- 0
- counter_rep <- 0
- temp_status <- character(length(result_val_batch$result))
- for (i in seq_along(temp_status)) {
- if (((i - 1) %% num_rep_batch()) == 0) {
- counter_dataset <- counter_dataset + 1
- counter_rep <- 1
- } else {
- counter_rep <- counter_rep + 1
- }
- temp_status[i] <- print_status(
- result_val_batch$result[[i]]$read_output(),
- counter_dataset,
- counter_rep,
- get_Model()
- )
- }
- stdout(format_batch_status(stdout(), temp_status))
+      # NOTE: check status
+      # (errors are not printed, otherwise the screen would be full of errors)
+ stdout(task_queue()$get_status(stdout()))
bind <- function(a, b) {
if (is.null(a) && is.null(b)) {
return("Initialisation")
@@ -691,37 +713,33 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
)
})
- plot_data <- reactive({
- values <- try(lapply(result_val_batch$result, function(process) {
- process$get_result()
- }))
+ get_data <- reactive({
+ values <- try({
+ task_queue()$seperate_results()
+ })
if (inherits(values, "try-error")) {
- result_val_batch$result_splitted <- NULL
batch_message("")
print_error("Error in background process")
- } else {
- result_val_batch$result_splitted <- seperate_batch_results(values)
}
- lapply(result_val_batch$result, function(process) {
- process$kill()
- })
+ task_queue()$kill()
send_and_read_info(paste0("release: ", session$token))
- result_val_batch$result <- NULL
})
# observe results
observe({
invalidateLater(invalid_time())
if (batch_process_done() && !batch_results_created()) {
- plot_data()
+ get_data()
batch_results_created(TRUE)
stdout(NULL)
+ values <- task_queue()$results
+ result_batch(values)
output$batch_data_plot <- renderPlotly({
entirePlotPlotly(
- result_val_batch$result_splitted,
- num_rep_batch(), ncols = 4
+ values
)
})
+ task_queue(NULL)
}
})
@@ -731,13 +749,12 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
},
content = function(file) {
req(batch_results_created())
- req(length(result_val_batch$result_splitted) > 0)
-
+ req(!is.null(result_batch()))
+ values <- result_batch()
download_batch_file(
get_Model_capital(),
file,
- result_val_batch$result_splitted,
- num_rep_batch()
+ values
)
}
)
diff --git a/tsf/R/GDA_UI.R b/tsf/R/GDA_UI.R
index 62f34c2..860d58f 100644
--- a/tsf/R/GDA_UI.R
+++ b/tsf/R/GDA_UI.R
@@ -160,6 +160,11 @@ gdaUI <- function(id) {
"How often should each dataset be analysed (using different seeds)",
value = 1
),
+ numericInput(NS(id, "NumCores"),
+ min = 1, max = 20,
+ "How many cores should be used for the batch analysis?",
+ value = 1
+ ),
actionButton(NS(id, "Start_Batch"), "Start batch analysis"),
actionButton(NS(id, "cancel_Batch"), "Stop optimization"),
downloadButton(NS(id, "batch_download"), "Save result of batch analysis"),
diff --git a/tsf/R/HG_Server.R b/tsf/R/HG_Server.R
index 78f3a5c..061c59f 100644
--- a/tsf/R/HG_Server.R
+++ b/tsf/R/HG_Server.R
@@ -31,6 +31,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked <- reactiveVal(FALSE)
setup_done <- reactiveVal(FALSE)
+  # NOTE: Start of model-specific code
+ # ===============================================================================
check_inputs <- function() {
rwn(input$H0 != "", "Please enter a value for the Host")
rwn(!is.na(input$npop),
@@ -138,6 +140,9 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
"HGupdateFieldBatch"
}
+  # NOTE: End of model-specific code
+ # ===============================================================================
+
get_opti_result <- function() {
opti_result()$parameter
}
@@ -486,11 +491,12 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
# Batch analysis
# ===============================================================================
setup_batch_done <- reactiveVal(FALSE)
- result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL)
batch_results_created <- reactiveVal(FALSE)
cancel_batch_clicked <- reactiveVal(FALSE)
num_rep_batch <- reactiveVal()
stdout <- reactiveVal(NULL)
+ task_queue <- reactiveVal(NULL)
+ result_batch <- reactiveVal()
batch_message <-function(message) {
session$sendCustomMessage(
@@ -513,6 +519,18 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
length(df_list()) > 0,
"The dataset list seems to be empty. Please upload a file"
)
+      rwn( # TODO: also update the other server code
+ !is_integer(input$NumCores),
+ "Please provide an integer entry for number of cores"
+ )
+ }
+
+ get_num_core <- function() {
+ res <- convert_num_to_int(input$NumCores)
+    if (res == 0) {
+ res <- 1
+ }
+ return(res)
}
observeEvent(input$Start_Batch, {
@@ -531,6 +549,7 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
ngen <- create_ngen()
topo <- create_topology()
et <- create_error_threshold()
+ num_cores <- get_num_core()
# check seed case
seed <- input$Seed
num_rep <- as.integer(input$NumRepDataset)
@@ -541,7 +560,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
seed_origin <- seed
}
# clear everything
- stdout (NULL)
+ stdout(NULL)
+ result_batch(NULL)
invalid_time(1100)
setup_batch_done(FALSE)
batch_results_created(FALSE)
@@ -557,10 +577,12 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
})
size <- length(df_list()) * num_rep
- stdout(character(size))
- request_cores(size, session$token)
+ if (num_cores > size) {
+ num_cores <- size
+ }
+ stdout(character(num_cores))
+ request_cores(num_cores, session$token)
nclicks(nclicks() + 1)
- process_list <- vector("list", size)
seeds <- numeric(size)
seeds_from <- 1:1e6
session$sendCustomMessage(
@@ -568,6 +590,7 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
list(message = "Initializing...")
)
+ # 1. create seeds in loop
for (i in seq_len(size)) {
if (seed_case == 1) {
seed <- sample(seeds_from, 1)
@@ -578,35 +601,57 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
seed <- sample(seeds_from, 1)
}
} else if (seed_case == 2) {
- seed <- seed
+        seed <- seed # TODO: check whether this is correct
}
seeds[i] <- seed
- process_list[[i]] <- callr::r_bg(
- function(case, lb, ub, df, ap,
- seed, npop, ngen, Topology, errorThreshold) {
- res <- tsf::opti(
- case, lb, ub, df, ap, seed, npop, ngen, Topology, errorThreshold
- )
- return(res)
- },
- args = list(
- get_Model(), lb, ub, df_list()[[(i - 1) %% length(df_list()) + 1]],
- additionalParameters, seed, npop, ngen, topo, et
- )
- )
}
- result_val_batch$result <- process_list
+      # 2. Create a status message for each task (dataset/replicate pair)
+ messages <- character(size)
+ counter_messages <- 1
+ for (i in seq_len(length(df_list()))) {
+ for (j in seq_len(num_rep)) {
+ messages[counter_messages] <-
+ paste0("Dataset = ", i, "; Replicate = ", j)
+ counter_messages <- counter_messages + 1
+ }
+ }
+
+ # 3. Fill task queue
+ groups <- ceiling(1:size / (size / num_cores))
+ dfs <- df_list()
+ if (length(groups) > length(dfs)) {
+ counter <- 1
+ while (length(groups) > length(dfs)) {
+ dfs <- c(dfs, dfs[counter])
+ counter <- counter + 1
+ }
+ }
+ task_queue(TaskQueue$new(
+ get_Model(),
+ lb, ub, dfs,
+ additionalParameters, seeds,
+ npop, ngen, topo, et,
+ messages, num_cores
+ ))
+
+ # 4. assign tasks
+ task_queue()$assign()
+
setup_batch_done(TRUE)
NULL
})
batch_process_done <- function() {
req(setup_batch_done())
- req(length(result_val_batch$result) > 0)
- lapply(result_val_batch$result, function(x) {
- if (x$is_alive()) req(x$get_status() != "running")
- })
+ req(!is.null(task_queue()))
+ if (task_queue()$check() &&
+ !task_queue()$queue_empty()) {
+ task_queue()$assign()
+ }
+ if (!task_queue()$queue_empty()) {
+ return(FALSE)
+ }
invalid_time(invalid_time() + 1000)
nclicks(0)
return(TRUE)
@@ -617,7 +662,6 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked_batch = TRUE
)
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
cancel_batch_clicked(TRUE)
})
@@ -625,43 +669,20 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
+ req(!is.null(task_queue()))
+ req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
+ task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
- lapply(result_val_batch$result, function(process) {
- process$interrupt()
- process$wait()
- })
- session$sendCustomMessage(
- type = get_update_field_batch(),
- list(message = "")
- )
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
- # check status
- # NOTE: errors are not printed otherwise screen is full of errors
- counter_dataset <- 0
- counter_rep <- 0
- temp_status <- character(length(result_val_batch$result))
- for (i in seq_along(temp_status)) {
- if (((i - 1) %% num_rep_batch()) == 0) {
- counter_dataset <- counter_dataset + 1
- counter_rep <- 1
- } else {
- counter_rep <- counter_rep + 1
- }
- temp_status[i] <- print_status(
- result_val_batch$result[[i]]$read_output(),
- counter_dataset,
- counter_rep,
- get_Model()
- )
- }
- stdout(format_batch_status(stdout(), temp_status))
+      # NOTE: check status
+      # (errors are not printed, otherwise the screen would be full of errors)
+ stdout(task_queue()$get_status(stdout()))
bind <- function(a, b) {
if (is.null(a) && is.null(b)) {
return("Initialisation")
@@ -685,37 +706,33 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
)
})
- plot_data <- reactive({
- values <- try(lapply(result_val_batch$result, function(process) {
- process$get_result()
- }))
+ get_data <- reactive({
+ values <- try({
+ task_queue()$seperate_results()
+ })
if (inherits(values, "try-error")) {
- result_val_batch$result_splitted <- NULL
batch_message("")
print_error("Error in background process")
- } else {
- result_val_batch$result_splitted <- seperate_batch_results(values)
}
- lapply(result_val_batch$result, function(process) {
- process$kill()
- })
+ task_queue()$kill()
send_and_read_info(paste0("release: ", session$token))
- result_val_batch$result <- NULL
})
# observe results
observe({
invalidateLater(invalid_time())
if (batch_process_done() && !batch_results_created()) {
- plot_data()
+ get_data()
batch_results_created(TRUE)
stdout(NULL)
+ values <- task_queue()$results
+ result_batch(values)
output$batch_data_plot <- renderPlotly({
entirePlotPlotly(
- result_val_batch$result_splitted,
- num_rep_batch(), ncols = 4
+ values
)
})
+ task_queue(NULL)
}
})
@@ -725,13 +742,12 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) {
},
content = function(file) {
req(batch_results_created())
- req(length(result_val_batch$result_splitted) > 0)
-
+ req(!is.null(result_batch()))
+ values <- result_batch()
download_batch_file(
get_Model_capital(),
file,
- result_val_batch$result_splitted,
- num_rep_batch()
+ values
)
}
)
diff --git a/tsf/R/HG_UI.R b/tsf/R/HG_UI.R
index a72b835..b95f4de 100644
--- a/tsf/R/HG_UI.R
+++ b/tsf/R/HG_UI.R
@@ -163,6 +163,11 @@ hgUI <- function(id) {
"How often should each dataset be analysed (using different seeds)",
value = 1
),
+ numericInput(NS(id, "NumCores"),
+ min = 1, max = 20,
+ "How many cores should be used for the batch analysis?",
+ value = 1
+ ),
actionButton(NS(id, "Start_Batch"), "Start batch analysis"),
actionButton(NS(id, "cancel_Batch"), "Stop optimization"),
downloadButton(NS(id, "batch_download"), "Save result of batch analysis"),
diff --git a/tsf/R/IDA_Server.R b/tsf/R/IDA_Server.R
index 3da050a..d82c3f5 100644
--- a/tsf/R/IDA_Server.R
+++ b/tsf/R/IDA_Server.R
@@ -33,6 +33,8 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked <- reactiveVal(FALSE)
setup_done <- reactiveVal(FALSE)
+  # NOTE: Start of model-specific code
+ # ===============================================================================
check_inputs <- function() {
rwn(input$H0 != "", "Please enter a value for the Host")
rwn(input$D0 != "", "Please enter a value for the Dye")
@@ -145,6 +147,8 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
get_update_field_batch <- function() {
"IDAupdateFieldBatch"
}
+  # NOTE: End of model-specific code
+ # ===============================================================================
get_opti_result <- function() {
opti_result()$parameter
@@ -494,11 +498,12 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
# Batch analysis
# ===============================================================================
setup_batch_done <- reactiveVal(FALSE)
- result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL)
batch_results_created <- reactiveVal(FALSE)
cancel_batch_clicked <- reactiveVal(FALSE)
num_rep_batch <- reactiveVal()
stdout <- reactiveVal(NULL)
+ task_queue <- reactiveVal(NULL)
+ result_batch <- reactiveVal()
batch_message <-function(message) {
session$sendCustomMessage(
@@ -563,6 +568,7 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
}
# clear everything
stdout(NULL)
+ result_batch(NULL)
invalid_time(1100)
setup_batch_done(FALSE)
batch_results_created(FALSE)
@@ -584,7 +590,6 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
stdout(character(num_cores))
request_cores(num_cores, session$token)
nclicks(nclicks() + 1)
- process_list <- vector("list", size)
seeds <- numeric(size)
seeds_from <- 1:1e6
session$sendCustomMessage(
@@ -613,12 +618,13 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
counter_messages <- 1
for (i in seq_len(length(df_list()))) {
for (j in seq_len(num_rep)) {
- messages[counter_messages] <- paste0("Dataset = ", i, "; Replicate = ", j)
+ messages[counter_messages] <-
+ paste0("Dataset = ", i, "; Replicate = ", j)
counter_messages <- counter_messages + 1
}
}
- # 3. split df_list, seed_list and messages by number of cores
+ # 3. Fill task queue
groups <- ceiling(1:size / (size / num_cores))
dfs <- df_list()
if (length(groups) > length(dfs)) {
@@ -628,32 +634,31 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
counter <- counter + 1
}
}
- dfs <- split(dfs, groups)
- seeds <- split(seeds, groups)
- messages <- split(messages, groups)
-
- # 4. create process list by calling call_several_opti_in_bg
- process_list <- lapply(seq_len(length(dfs)), function(x) {
- temp_dfs <- dfs[[x]]
- temp_seeds <- seeds[[x]]
- temp_messages <- messages[[x]]
- call_several_opti_in_bg(
- get_Model(), lb, ub, temp_dfs, additionalParameters,
- temp_seeds, npop, ngen, topo, et, temp_messages
- )
- })
+ task_queue(TaskQueue$new(
+ get_Model(),
+ lb, ub, dfs,
+ additionalParameters, seeds,
+ npop, ngen, topo, et,
+ messages, num_cores
+ ))
+
+ # 4. assign tasks
+ task_queue()$assign()
- result_val_batch$result <- process_list
setup_batch_done(TRUE)
NULL
})
batch_process_done <- function() {
req(setup_batch_done())
- req(length(result_val_batch$result) > 0)
- lapply(result_val_batch$result, function(x) {
- if (x$is_alive()) req(x$get_status() != "running")
- })
+ req(!is.null(task_queue()))
+ if (task_queue()$check() &&
+ !task_queue()$queue_empty()) {
+ task_queue()$assign()
+ }
+ if (!task_queue()$queue_empty()) {
+ return(FALSE)
+ }
invalid_time(invalid_time() + 1000)
nclicks(0)
return(TRUE)
@@ -664,7 +669,6 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
cancel_clicked_batch = TRUE
)
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
cancel_batch_clicked(TRUE)
})
@@ -672,39 +676,20 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
observe({
invalidateLater(invalid_time())
req(nclicks() != 0)
- req(!is.null(result_val_batch$result))
+ req(!is.null(task_queue()))
+ req(task_queue()$filled)
# is cancel_batch_clicked
if (cancel_batch_clicked()) {
- lapply(result_val_batch$result, function(process) {
- process$interrupt()
- process$wait()
- })
-
- # FIX: press cancel in init phase --> crash
- # FIX: press cancel like a morron --> crash
- # FIX: press cancel in two following runs --> crash
-
+ task_queue()$interrupt()
setup_batch_done(TRUE)
cancel_batch_clicked(FALSE)
nclicks(0)
- session$sendCustomMessage(
- type = get_update_field_batch(),
- list(message = "Shutdown process") # TODO: clean up
- )
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
- # check status
- # NOTE: errors are not printed otherwise screen is full of errors
- temp_status <- character(length(result_val_batch$result))
- for (i in seq_along(temp_status)) {
- # TODO: add core and add progress of core
- temp_status[i] <- print_status(
- result_val_batch$result[[i]]$read_output(),
- get_Model()
- )
- }
- stdout(format_batch_status(stdout(), temp_status))
+      # NOTE: check status
+      # (errors are not printed, otherwise the screen would be full of errors)
+ stdout(task_queue()$get_status(stdout()))
bind <- function(a, b) {
if (is.null(a) && is.null(b)) {
return("Initialisation")
@@ -728,63 +713,33 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
)
})
- plot_data <- reactive({
+ get_data <- reactive({
values <- try({
- res <- list()
- counter <- 1
- for (i in seq_along(result_val_batch$result)) {
- temp <- result_val_batch$result[[i]]$get_result()
- if (length(temp) == 1) {
- res[[counter]] <- temp[[1]]
- counter <- counter + 1
- } else {
- for (j in seq_along(temp)) {
- res[[counter]] <- temp[[j]]
- counter <- counter + 1
- }
- }
- }
- res
+ task_queue()$seperate_results()
})
-
- values <- Filter(Negate(is.integer), values)
-
- if (!all(is.list(values))) {
- result_val_batch$result_splitted <- NULL
- batch_message("")
- print_error("Error in background process")
- } else if(length(values) == 0) {
- result_val_batch$result_splitted <- NULL
+ if (inherits(values, "try-error")) {
batch_message("")
print_error("Error in background process")
- } else if (inherits(values, "try-error")) {
- result_val_batch$result_splitted <- NULL
- batch_message("")
- print_error("Error in background process")
- } else {
- result_val_batch$result_splitted <- seperate_batch_results(values)
}
- lapply(result_val_batch$result, function(process) {
- process$kill()
- })
+ task_queue()$kill()
send_and_read_info(paste0("release: ", session$token))
- result_val_batch$result <- NULL
})
# observe results
observe({
invalidateLater(invalid_time())
if (batch_process_done() && !batch_results_created()) {
- plot_data()
+ get_data()
batch_results_created(TRUE)
stdout(NULL)
- req(is.list(result_val_batch$result_splitted))
+ values <- task_queue()$results
+ result_batch(values)
output$batch_data_plot <- renderPlotly({
entirePlotPlotly(
- result_val_batch$result_splitted,
- num_rep_batch(), ncols = 4
+ values
)
})
+ task_queue(NULL)
}
})
@@ -794,13 +749,12 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) {
},
content = function(file) {
req(batch_results_created())
- req(length(result_val_batch$result_splitted) > 0)
-
+ req(!is.null(result_batch()))
+ values <- result_batch()
download_batch_file(
get_Model_capital(),
file,
- result_val_batch$result_splitted,
- num_rep_batch()
+ values
)
}
)
diff --git a/tsf/R/IDA_UI.R b/tsf/R/IDA_UI.R
index ed41712..157edca 100644
--- a/tsf/R/IDA_UI.R
+++ b/tsf/R/IDA_UI.R
@@ -37,16 +37,15 @@ idaUI <- function(id) {
),
fluidRow(
box(
- # TODO: change back to 0
- textInput(NS(id, "H0"), "Host conc. [M]", value = 1e-6),
- textInput(NS(id, "D0"), "Dye conc. [M]", value = "1e-6"),
- textInput(NS(id, "kHD"), HTML("Ka(HD) [1/M]"), value = "3e6"),
+ textInput(NS(id, "H0"), "Host conc. [M]", value = 0),
+ textInput(NS(id, "D0"), "Dye conc. [M]", value = "0"),
+ textInput(NS(id, "kHD"), HTML("Ka(HD) [1/M]"), value = "0"),
box(
title = "Advanced options",
collapsible = TRUE, collapsed = TRUE,
box(
numericInput(NS(id, "npop"), "Number of particles", value = 40),
- numericInput(NS(id, "ngen"), "Number of generations", value = 20) #TODO: change back to 1000; Better error message when invalid ngen is set
+ numericInput(NS(id, "ngen"), "Number of generations", value = 1000)
),
box(
selectInput(NS(id, "topology"), "Topology of particle swarm",
@@ -168,7 +167,7 @@ idaUI <- function(id) {
value = 1
),
numericInput(NS(id, "NumCores"),
- min = 1, max = 20, # NOTE: maybe adapt to number of cores availabel
+ min = 1, max = 20,
"How many cores should be used for the batch analysis?",
value = 1
),
diff --git a/tsf/R/TaskQueue.R b/tsf/R/TaskQueue.R
new file mode 100644
index 0000000..25bf5e9
--- /dev/null
+++ b/tsf/R/TaskQueue.R
@@ -0,0 +1,220 @@
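+# Helper classes for the batch analysis: a TaskQueue runs the queued
+# optimisations on a limited number of callr background processes and
+# re-assigns free slots until every dataset/replicate task has finished.
+
+# DfRepCombi parses a status message of the form "Dataset = i; Replicate = j"
+# back into its numeric dataset and replicate indices.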
+DfRepCombi <- R6::R6Class(
+ "DfRepCombi",
+ public = list(
+ df = NULL,
+ rep = NULL,
+ initialize = function(message) {
+ m <- strsplit(message, ";")[[1]]
+ df_m <- gsub("Dataset = ", "", m[[1]]) |> as.numeric()
+ rep_m <- gsub("Replicate = ", "", m[[2]]) |> as.numeric()
+ self$df <- df_m
+ self$rep <- rep_m
+ }
+ )
+)
+
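+# Process is a thin wrapper around a callr::r_bg handle that also stores the
+# index of the task it works on, so a finished result can be matched back to
+# its dataset/replicate combination.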
+Process <- R6::R6Class(
+ "Process",
+ public = list(
+ process = NULL,
+ idx = 0,
+ initialize = function(process, idx) {
+ self$process <- process
+ self$idx <- idx
+ },
+ get_status = function() {
+ self$process$get_status()
+ },
+ is_alive = function() {
+ self$process$is_alive()
+ },
+ get_result = function() {
+ self$process$get_result()
+ },
+ read_output = function() {
+ self$process$read_output()
+ },
+ interrupt = function() {
+ self$process$interrupt()
+ },
+ wait = function() {
+ self$process$wait()
+ },
+ kill = function() {
+ self$process$kill()
+ }
+ )
+)
+
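+# TaskQueue holds one optimisation task per dataset/replicate pair and runs
+# them on at most num_cores background processes. The caller repeatedly
+# invokes assign() once the running processes have finished, until
+# queue_empty() returns TRUE, and then collects everything via
+# seperate_results().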
+TaskQueue <- R6::R6Class(
+ "TaskQueue",
+ public = list(
+ dfs = NULL,
+ seeds = NULL,
+ messages = NULL,
+ assigned = NULL,
+ sz = NULL,
+ df_reps = NULL,
+ case = NULL,
+ lb = NULL,
+ ub = NULL,
+ ap = NULL,
+ npop = NULL,
+ ngen = NULL,
+ topo = NULL,
+ et = NULL,
+ num_cores = NULL,
+
+ processes = list(),
+ results = NULL,
+ filled = FALSE,
+ interrupted_at = NULL,
+ current_opti = 0,
+
+ initialize = function(case, lb, ub, dfs,
+ ap, seeds, npop, ngen,
+ topo, et,
+ messages, num_cores) {
+ self$case <- case
+ self$lb <- lb
+ self$ub <- ub
+ self$dfs <- dfs
+ self$ap <- ap
+ self$seeds <- seeds
+ self$npop <- npop
+ self$ngen <- ngen
+ self$topo <- topo
+ self$et <- et
+ self$messages <- messages
+ self$sz <- length(dfs)
+ self$results <- vector("list", self$sz)
+ self$assigned <- logical(self$sz)
+ self$df_reps <- lapply(seq_len(self$sz), function(i) {
+ DfRepCombi$new(messages[[i]])
+ })
+ self$num_cores <- num_cores
+ self$processes <- vector("list", num_cores)
+ self$filled <- TRUE
+ },
+
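+  # Store the result of a finished process and annotate it with the dataset
+  # and replicate indices of the task it belonged to.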
+ set_results = function(process) {
+ results <- process$get_result()
+ idx_res <- length(self$results) + 1
+ self$results[[idx_res]] <- results
+ idx <- process$idx
+ self$results[[idx_res]]$data$repetition <- self$df_reps[[idx]]$rep
+ self$results[[idx_res]]$data$dataset <- self$df_reps[[idx]]$df
+ self$results[[idx_res]]$parameter$repetition <- self$df_reps[[idx]]$rep
+ self$results[[idx_res]]$parameter$dataset <- self$df_reps[[idx]]$df
+ self$results[[idx_res]]$metrices$repetition <- self$df_reps[[idx]]$rep
+ self$results[[idx_res]]$metrices$dataset <- self$df_reps[[idx]]$df
+ },
+
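+  # Start the next unassigned task as a background R process (callr::r_bg)
+  # and return it wrapped in a Process object, or NULL if the queue is empty.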
+ assign_task = function() {
+ idx <- which(!self$assigned)[1]
+ if (is.na(idx)) return(NULL)
+ df <- self$dfs[[idx]]
+ seed <- self$seeds[idx]
+ m <- self$messages[idx]
+ process <- callr::r_bg(
+ function(case, lb, ub, df, ap,
+ seed, npop,
+ ngen, Topology, errorThreshold, messages) {
+ res <- tsf::opti(
+ case, lb, ub, df, ap, seed,
+ npop, ngen, Topology, errorThreshold, messages
+ )
+ return(res)
+ },
+ args = list(
+ self$case, self$lb, self$ub, df, self$ap,
+ seed, self$npop, self$ngen, self$topo, self$et, m
+ )
+ )
+ self$assigned[idx] <- TRUE
+ self$current_opti <- self$current_opti + 1
+ return(Process$new(process, self$current_opti))
+ },
+
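+  # Collect the results of the previously assigned processes and refill every
+  # free slot (up to num_cores) with the next tasks from the queue.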
+ assign = function() {
+ if (any(!self$assigned)) {
+ for (i in seq_len(self$num_cores)) {
+ if (inherits(self$processes[[i]], "Process")) {
+ self$set_results(self$processes[[i]])
+ if (self$processes[[i]]$is_alive()) self$processes[[i]]$kill()
+ }
+ if (self$queue_empty()) {
+ next
+ }
+ process <- self$assign_task()
+ self$processes[[i]] <- process
+ }
+ self$processes <- Filter(Negate(is.null), self$processes)
+ }
+ },
+
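+  # TRUE once none of the currently assigned processes is alive any more.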
+ check = function() {
+ for (i in seq_len(length(self$processes))) {
+ if (self$processes[[i]]$is_alive()) {
+ return(FALSE)
+ }
+ }
+ return(TRUE)
+ },
+
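+  # Harvest the results of all processes that have finished.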
+ get_results = function() {
+ for (i in seq_len(length(self$processes))) {
+ if (!self$processes[[i]]$is_alive()) {
+ self$set_results(self$processes[[i]])
+ }
+ }
+ },
+
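+  # Stop all running processes, remember where the batch was interrupted and
+  # mark the remaining queue as done so no further tasks are started.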
+ interrupt = function() {
+ for (i in seq_len(length(self$processes))) {
+ if (self$processes[[i]]$is_alive()) {
+ status <- self$processes[[i]]$get_status()
+ self$processes[[i]]$interrupt()
+ self$processes[[i]]$wait()
+ if (status == "running" && self$processes[[i]]$is_alive()) {
+          self$set_results(self$processes[[i]])
+ }
+ self$processes[[i]]$kill()
+ }
+ }
+ self$interrupted_at <- which(!self$assigned)[1]
+    self$assigned[] <- TRUE
+ },
+
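+  # Build the status text shown in the app from the stdout of the running
+  # processes.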
+ get_status = function(stdout) {
+ status <- character(self$num_cores)
+ for (i in seq_len(length(self$processes))) {
+ if (self$processes[[i]]$is_alive()) {
+ status[i] <-
+ print_status(self$processes[[i]]$read_output(), self$case)
+ }
+ }
+ return(format_batch_status(stdout, status))
+ },
+
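+  # Kill any processes that are still alive.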
+ kill = function() {
+ for (i in seq_len(length(self$processes))) {
+ if (self$processes[[i]]$is_alive()) {
+ self$processes[[i]]$kill()
+ }
+ }
+ },
+
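+  # Combine all collected results and split them via seperate_batch_results()
+  # for plotting and export.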
+ seperate_results = function() {
+ self$get_results()
+ self$results <- Filter(Negate(is.null), self$results)
+ self$results <- seperate_batch_results(self$results)
+ return(self$results)
+ },
+
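+  # TRUE once every task has been assigned and no process is running any more.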
+ queue_empty = function() {
+ all(self$assigned) &&
+ all(sapply(self$processes, function(p) !p$is_alive()))
+ }
+ )
+)
diff --git a/tsf/R/Utils_App.R b/tsf/R/Utils_App.R
index c52eb97..19a05f9 100644
--- a/tsf/R/Utils_App.R
+++ b/tsf/R/Utils_App.R
@@ -411,15 +411,8 @@ download_csv <- function(model, file, result_val) {
# download batch file
# ========================================================================================
-create_df_for_batch <- function(list, what, num_rep) { # TODO: use this fct also in the plotting fcts of Batch.R
+create_df_for_batch <- function(list, what) {
list <- list[[what]]
- num_data_sets <- length(list) / num_rep
- repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1
- data_sets <- rep(1:num_data_sets, each = num_rep)
- for (i in seq_along(list)) {
- list[[i]]$dataset <- data_sets[i]
- list[[i]]$repetition <- repetitions[i]
- }
df <- Reduce(rbind, list)
return(df)
}
@@ -445,7 +438,7 @@ adjust_theme <- function(p) {
return(p)
}
-download_batch_file <- function(model, file, result_val, num_rep) {
+download_batch_file <- function(model, file, result_val) {
wb <- openxlsx::createWorkbook()
addWorksheet(wb, "Results")
writeData(wb, "Results",
@@ -455,11 +448,11 @@ download_batch_file <- function(model, file, result_val, num_rep) {
)
curr_row <- 3
- data_trajectories <- create_df_for_batch(result_val, "states", num_rep)
+ data_trajectories <- create_df_for_batch(result_val, "states")
writeData(wb, "Results", data_trajectories, startRow = curr_row)
curr_row <- curr_row + dim(data_trajectories)[1] + 5
- parameter <- create_df_for_batch(result_val, "params", num_rep)
+ parameter <- create_df_for_batch(result_val, "params")
writeData(wb, "Results", parameter, startRow = curr_row)
curr_row <- curr_row + dim(parameter)[1] + 5
@@ -471,11 +464,11 @@ download_batch_file <- function(model, file, result_val, num_rep) {
writeData(wb, "Results", boundaries, startRow = curr_row)
curr_row <- curr_row + dim(boundaries)[1] + 5
- metrices <- create_df_for_batch(result_val, "metrices", num_rep)
+ metrices <- create_df_for_batch(result_val, "metrices")
writeData(wb, "Results", metrices, startRow = curr_row)
curr_row <- curr_row + dim(metrices)[1] + 5
- p1 <- plotStates(result_val, num_rep)
+ p1 <- plotStates(result_val)
temp_files_p1 <- lapply(seq_len(length(p1)), function(x) {
tempfile(fileext = ".png")
})
@@ -492,8 +485,8 @@ download_batch_file <- function(model, file, result_val, num_rep) {
curr_row <- curr_row + 20
}
- p2 <- plotParams(result_val, num_rep, 4)
- p3 <- plotMetrices(result_val, num_rep, 4)
+ p2 <- plotParams(result_val)
+ p3 <- plotMetrices(result_val)
tempfile_plot2 <- tempfile(fileext = ".png")
ggsave(tempfile_plot2,