From 800bbee1690dd90c92c7186fce9fcbdd40b8bc0b Mon Sep 17 00:00:00 2001 From: Konrad1991 Date: Thu, 5 Sep 2024 14:59:37 +0200 Subject: [PATCH] proceed implementing new parallel fct where several optimizsations are conducted by one background process --- Rplots.pdf | Bin 4085 -> 0 bytes ToDo.md | 10 +- tsf/R/BatchPlotting.R | 177 +++++++++-------------------- tsf/R/BatchPlottingPlotly.R | 41 ++----- tsf/R/DBA_Server.R | 160 ++++++++++++++------------ tsf/R/DBA_UI.R | 5 + tsf/R/GDA_Server.R | 161 ++++++++++++++------------ tsf/R/GDA_UI.R | 5 + tsf/R/HG_Server.R | 160 ++++++++++++++------------ tsf/R/HG_UI.R | 5 + tsf/R/IDA_Server.R | 138 ++++++++-------------- tsf/R/IDA_UI.R | 11 +- tsf/R/TaskQueue.R | 220 ++++++++++++++++++++++++++++++++++++ tsf/R/Utils_App.R | 23 ++-- 14 files changed, 630 insertions(+), 486 deletions(-) create mode 100644 tsf/R/TaskQueue.R diff --git a/Rplots.pdf b/Rplots.pdf index 7d8b34fe41e243aad6f8058e9a8a44ba224a4d3a..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 100644 GIT binary patch literal 0 HcmV?d00001 literal 4085 zcmb7Hc{o&k{}v(BB3qg#z2Oygjl#>_NJmW(}Q`$eUcHATzo zm5?n;vXmHGN@>Frk)g}rEa{bOVb6w}$-~0R7|M=XR_9P1(j4lqL88sL+ z6eW!6@#G*d02*K&_!D7dgaFMr5S2$~F-cS&1fWQUSTq)gHbCPr`dEUVfffR^W%>S( zXHIk;1HMHDEICwm0G-AK?D#x3p9f#F=dgVFG>9WQ#p&X7F);Sc7fS?bA$IOWKuljz z6xbip0H9L{8v;NxDv!!w`2(Om)gR&ldN6?u{O1DUw+kYkh|PhRyuaz78H>q-Ke+($ z+jWo(aanv04Yr5-#h!@py&QWXravzLz@h=rg3jQ<@q-o&xHKe)#`1+kQFD24#R3t* zS$a;>WVBN&88wU;78jH45D$bC`up_n)C_)D{+`gkhF*3K&^twW-OKJyUzza=DU#lPMEoDdfPZRoyS zzylDqh75?(CIen@BUya7Ay@#kg?#B$Q&uqGAxcpnjT8O7e(I^HC*SjeM~CJJ@c<7H z9$wfF6wLENfEI8+0V5;01E6^@&(e`68dxJEkqy`ngT--VQ)%B2kW02S1@ylK0iVLZ zzuO^#L{2=wJ!WR6R4(KT;6*I2?*!4;_w;s)y3Z6j0ZGtdIt>yT1^2<@1%4kAeFOY| zTY6DUKgJ|GBgo0Ie_gz=j!=fME7xYg=9t2Eth7@4)!w~vdd<4I0zl0#bI z3M}enWPw>)se0nkWe%m|%aNqsLu|zj8GC$-kuPqBOtqhTp*k0SFt<+l%zsMIS0i?O zDMs?nURRmORr#)puSS{@Qj+|Aq?fIl7L#Czy=0e_nXa3+5YH43dz{S3RG3$;t62kl zTHMyNE4e}JYwpQvH+77d{h8#E_w)UpNacG(V@j50rQN+VH*mHcXzLDjw`i=(3GRDs zkFbxKkgUNBHr5E)#_bq{riTZ%4y72#*C(g>AeBc}%vWRUx6opEd*V!*S;M8suzh$j zRQiDKYp$a&Lx+}*!ba;1g^0bhy0?G28+3gjRA`fJ>UiL;E3Z+MfZXEg7?*NFA?cM+ zXgiYB^Z>nvkg|CY^Eeafrt*p-<|kZoQw#}2URU4om}o#m2kHdYqk1E}md>Tm9nSEM zjhIA|ROcH%Qo=J=i(M;(D*6}V1L;a)UgOK`Yd^=g`+#DJ*X#Lf1cx@&gn5(i4r=>v zzdA7BX|pW%pcXhfwUE0e!fbz-kX-+4>Mv^W%n4#T4`b#{VETm?lJ2&446;A1dYv0T zuPXh8ziFM^l16<3c+#BIR=5|~n^ED>s*oU=X&PrTpL=#*w(-k_{_2clPv+%1GhD?l zs{LUSzDW$VO%9Mb;=8?bO0croQsPs4Qdj1g>_uVp7XhVe@0Zqf8WE{9OA@w9-l5~| z*(G;z93WAj z>m@0Eo*XU)&tA`qqP@fsrFEm)e3lO^Bcs>kL~2bV9+<3+J=(ffi5$-Z%vx2hA9X#x zVOT=aOwL~AX8}3bGiK@EMmA6l^CBoKD#@aQWektdQ*cDYI7r9o>@O%89BA9$K@}G(@mZn-*iE>I?*bvsMSMJ%G?}dzxU+I`>IMu2hV5ZyX4~wt~f~B zXJ6Xc>)ea_6Y)XL*5cs#_O6amF`l*|GCN7vf|7TxbK_{)sN5`ZK}jkmpO%bCT_3Sm zWs6docp0*6LrYAA(dnSaJwnQo)% z`bKSNaHLd8fy%)vX=IIbzm_Wff%{qDh<4lQbelY2f z)w|2`-uJAI72kDmzvxuJb9xikU5<6`w=i%rFbgT;B)ymX%TiIhDPhp;l|v$G5#^7f zImKSFgB+`S`MnlnZezsNW_pE^on-pOa#lc0xt3C5Vzza;b&hq){X&Lb{#6R4ArR3r3OCguuJ+$t9^3bC9`DMLcL2plQ<>jEuFM3Sq zXXvp9hyQ%rMZEfswCEcu?t9;JzxztJVZjHd zbN`&k&%RP;wix^I!W!omTg$vMvylrOv^YfLiL{POzO$G@o<9FGU$f>zO-zkmXmhAT z=+V%L(79Qe1?|ropEoYlEjUUUN~uXnN)=00u6`G%-Kf~;756!=BqcfJKr((kA8Cwr z(OZM=ATago51Qf+5#0Q`eK_rkwEI|Fy=Q@;{j2b01RKm%%m!?H`^E>7uWQ@mbvoDW zTW6Xim9$~6e9wu@Mh2AX-mMd(vr?y4JJ${A+T;;fHdeV15Xj1AJ_``E#2ruwa17|7 z&0H1sDJb&CZ-^!HNTzO-E|w& zRoeGXDybPk3-rqlGTjkkP<+_%^=w~%gWKD%$Dfpy3vJHYB()WUG|$Y>{4tVJJyy-} z9QV4&TvbV`tYeacyn<4X#4Jk43S@IEPqf4qUG~2+vRmly-RG@dvZmw_!~di8M=jRf zr^4F1M>^vN75;LZ;J%;;3H_B1gYFBTvyQ&diYSjb{uTdq!!q|}1Cg&IFPpU7J{2X{ zeIZYB_ru+?(e2UZ4aRD0E1%Yn$7^R+kE|RCIvU*>**TFvcPF_q+;XzTidKAp((Ikslb~>y z-$ENI7&$MtMQjEAU<9gMR~rng*7ftUOLOTud%pBT__#-s zQlfbi0bj+q^fciK>M^m3K3RChR@Ek0Q`x52X1nzZ)@x?{{Hb=zsr`Z{>E+P12l))4 z%LA8JedA9DDmS7ix89&-Txpu+z0}h?PQSfrPo+3iCS9|+ZsU&h?DvA~D=NkW zeCrQZDHPiy#eNfoPaf5V)Rw$`_qfNU$eH3k$1L%gY7f1&&>>Y2mnb+RFj(A#w;+6s zXkVzQ-cxY7{7qx{tME5b2@-<#)b@6rvpP^Q`|1V$d!IWEZ7+f&dY&~iuCj_MPj3>B z*NEQ~ac+L%L*wrFBmZ?RwOrso?-eLrQE)+F%=5Xy!&BXw6RG2ycK#7uUfFW7Xy{1W z)SPOMu?yGpui$=}JIBr|G=h zbtO5q>(~?BtxIXI#vLB9o?BI^)!nQ8H0rWYJW|PJ3J2V{%7V(_obI~B#j|r8!f~HV zy~Y6Jonyv^(>@I`J7@!?TX#WUXTxufs(qq1#deTpHb1p&*&^5q?(&`wUfR=lBD1Jz z*Y>ZTiw}nTgKh;cVbMxE^uKt{F7=e0+kffS}LwH#LxTmO@|2XOP%q+ zueE1_6!fa6N~gB(IukznY5nZG$)aSt+;EAx*qO!OwrtGL0`1?D$eao54>}XpM54X* zKVK9&b2pPq{|+EP65Y=af_0Ec=y(7)eK3&DmTp`w z9RU3~)Wh(SOQrEeyDp8+q45L#7*H?(^5_g-7z(7)I4mXr9^gPCB7BodqruV&0s7Km zS6n*0|LZbYJYUEU06E`+q`~_>gF*cX58yNXsT_VFgUaUtAj_Y{gbu>qsNXDeMXS5l zPn&u$2l7Lp0Sp#_{_y~KeSJNBzz_I=Vc|N79)S4+!{A|w@(ZSK@H-DrgoVg2JOc4| z9uWg8lV5mP3@llH#SDJO^oYM>xZmT`(?`RC>AO81he~Ha99WwJpd&pLf@=VPdzgMK jKr|F&03@?mB1t8B|0Xl29NxET=@IY*gr=ssodx1wFU7Y= diff --git a/ToDo.md b/ToDo.md index 11386b4..d244083 100644 --- a/ToDo.md +++ b/ToDo.md @@ -1,4 +1,6 @@ -Running IDA with Host, Dye and KaHD all set to 0 in batch mode results in -Error: ! Could not evaluate the loss function --> is not shown to user - ---> The error messages from the callr r_bg have to be catched and reported +- [x] interrupt partially run batch process (first optimisation not yet finished) +- [x] interrupt partially run batch process (at least one dataset finished) +- [x] interrupt batch process within initialisation phase +- [x] adapt plotting functions + * [x] plotly functionality + * [x] ggplot functionality diff --git a/tsf/R/BatchPlotting.R b/tsf/R/BatchPlotting.R index 2f6e8f8..b6a2792 100644 --- a/tsf/R/BatchPlotting.R +++ b/tsf/R/BatchPlotting.R @@ -129,16 +129,9 @@ combinePlots <- function(p1, p2, p3, index, base_size = 6) { return(p) } -plotStates <- function(list, num_rep = 1) { +plotStates <- function(list) { base_size <- baseSize() list <- list[[1]] - num_data_sets <- length(list) / num_rep - repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1 - data_sets <- rep(1:num_data_sets, each = num_rep) - for (i in seq_along(list)) { - list[[i]]$dataset <- data_sets[i] - list[[i]]$repetition <- repetitions[i] - } df <- Reduce(rbind, list) groups <- unique(df$dataset) plot_list <- lapply(groups, function(x) { @@ -151,15 +144,8 @@ plotStates <- function(list, num_rep = 1) { return(plot_list) } -plotParams <- function(list, num_rep = 1, base_size = 12) { +plotParams <- function(list) { list <- list[[2]] - num_data_sets <- length(list) / num_rep - repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1 - data_sets <- rep(1:num_data_sets, each = num_rep) - for (i in seq_along(list)) { - list[[i]]$dataset <- data_sets[i] - list[[i]]$repetition <- repetitions[i] - } df <- Reduce(rbind, list) data <- data.frame( x = rep(df[, 5], 4), @@ -172,56 +158,33 @@ plotParams <- function(list, num_rep = 1, base_size = 12) { ), repetition = rep(df$repetition, 4) ) - if (num_rep > 1) { - p <- ggplot() + - geom_boxplot( - data = data, - aes( - y = y, fill = "Entire data", x = factor(0) - ) - ) + - geom_boxplot( - data = data, - aes( - x = factor(x), y = y, - group = factor(x), - fill = factor(x) - ) - ) + - facet_wrap(. ~ names, - scales = "free_y", - strip.position = "left" - ) + - xlab(NULL) + - ylab(NULL) + - theme( - panel.spacing = unit(2, "lines"), - strip.background = element_blank(), - strip.placement = "outside" - ) + - guides(fill = guide_legend(title = "Datasets")) - } else { - p <- ggplot() + - geom_boxplot( - data = data, - aes( - x = factor(x), - y = y, - group = names - ) - ) + - facet_wrap(~names, - scales = "free_y", - strip.position = "left" - ) + - xlab(NULL) + - ylab(NULL) + - theme( - panel.spacing = unit(2, "lines"), - strip.background = element_blank(), - strip.placement = "outside" + p <- ggplot() + + geom_boxplot( + data = data, + aes( + y = y, fill = "Entire data", x = factor(0) + ) + ) + + geom_boxplot( + data = data, + aes( + x = factor(x), y = y, + group = factor(x), + fill = factor(x) ) - } + ) + + facet_wrap(. ~ names, + scales = "free_y", + strip.position = "left" + ) + + xlab(NULL) + + ylab(NULL) + + theme( + panel.spacing = unit(2, "lines"), + strip.background = element_blank(), + strip.placement = "outside" + ) + + guides(fill = guide_legend(title = "Datasets")) p <- addTheme(p) p <- p + theme( plot.background = element_rect(color = "grey", fill = NA, size = 2) @@ -229,15 +192,8 @@ plotParams <- function(list, num_rep = 1, base_size = 12) { return(p) } -plotMetrices <- function(list, num_rep = 1, base_size = 12) { +plotMetrices <- function(list) { list <- list[[3]] - num_data_sets <- length(list) / num_rep - repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1 - data_sets <- rep(1:num_data_sets, each = num_rep) - for (i in seq_along(list)) { - list[[i]]$dataset <- data_sets[i] - list[[i]]$repetition <- repetitions[i] - } df <- Reduce(rbind, list) data <- data.frame( x = rep(df[, 6], 5), @@ -251,56 +207,33 @@ plotMetrices <- function(list, num_rep = 1, base_size = 12) { ), repetition = rep(df$repetition, 5) ) - if (num_rep > 1) { - p <- ggplot() + - geom_boxplot( - data = data, - aes( - y = y, fill = "Entire data", x = factor(0) - ) - ) + - geom_boxplot( - data = data, - aes( - x = factor(x), y = y, - group = factor(x), - fill = factor(x) - ) - ) + - facet_wrap(. ~ names, - scales = "free_y", - strip.position = "left" - ) + - xlab(NULL) + - ylab(NULL) + - theme( - panel.spacing = unit(2, "lines"), - strip.background = element_blank(), - strip.placement = "outside" - ) + - guides(fill = guide_legend(title = "Datasets")) - } else { - p <- ggplot() + - geom_boxplot( - data = data, - aes( - x = factor(x), - y = y, - group = names - ) - ) + - facet_wrap(~names, - scales = "free_y", - strip.position = "left" - ) + - xlab(NULL) + - ylab(NULL) + - theme( - panel.spacing = unit(2, "lines"), - strip.background = element_blank(), - strip.placement = "outside" + p <- ggplot() + + geom_boxplot( + data = data, + aes( + y = y, fill = "Entire data", x = factor(0) + ) + ) + + geom_boxplot( + data = data, + aes( + x = factor(x), y = y, + group = factor(x), + fill = factor(x) ) - } + ) + + facet_wrap(. ~ names, + scales = "free_y", + strip.position = "left" + ) + + xlab(NULL) + + ylab(NULL) + + theme( + panel.spacing = unit(2, "lines"), + strip.background = element_blank(), + strip.placement = "outside" + ) + + guides(fill = guide_legend(title = "Datasets")) p <- addTheme(p) p <- p + theme( plot.background = element_rect(color = "grey", fill = NA, size = 2) diff --git a/tsf/R/BatchPlottingPlotly.R b/tsf/R/BatchPlottingPlotly.R index 84127a0..b423f78 100644 --- a/tsf/R/BatchPlottingPlotly.R +++ b/tsf/R/BatchPlottingPlotly.R @@ -113,15 +113,8 @@ plotHostDyePlotly <- function(df, Dataset) { return(p) } -plotStatesPlotly <- function(list, num_rep = 1, ncols = 4) { +plotStatesPlotly <- function(list) { list <- list[[1]] - num_data_sets <- length(list) / num_rep - repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1 - data_sets <- rep(1:num_data_sets, each = num_rep) - for (i in seq_along(list)) { - list[[i]]$dataset <- data_sets[i] - list[[i]]$repetition <- repetitions[i] - } df <- Reduce(rbind, list) groups <- unique(df$dataset) plot_list <- lapply(groups, function(x) { @@ -187,15 +180,8 @@ plotStatesPlotly <- function(list, num_rep = 1, ncols = 4) { return(p) } -plotParamsPlotly <- function(list, num_rep = 1) { +plotParamsPlotly <- function(list) { list <- list[[2]] - num_data_sets <- length(list) / num_rep - repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1 - data_sets <- rep(1:num_data_sets, each = num_rep) - for (i in seq_along(list)) { - list[[i]]$dataset <- data_sets[i] - list[[i]]$repetition <- repetitions[i] - } df <- Reduce(rbind, list) data <- data.frame( x = rep(df$dataset, 4), @@ -240,15 +226,8 @@ plotParamsPlotly <- function(list, num_rep = 1) { return(p) } -plotMetricesPlotly <- function(list, num_rep = 1, base_size = 12) { +plotMetricesPlotly <- function(list) { list <- list[[3]] - num_data_sets <- length(list) / num_rep - repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1 - data_sets <- rep(1:num_data_sets, each = num_rep) - for (i in seq_along(list)) { - list[[i]]$dataset <- data_sets[i] - list[[i]]$repetition <- repetitions[i] - } df <- Reduce(rbind, list) data <- data.frame( x = rep(df[, 6], 5), @@ -295,12 +274,12 @@ plotMetricesPlotly <- function(list, num_rep = 1, base_size = 12) { } -entirePlotPlotly <- function(list, num_rep = 1, ncols = 4) { - states <- plotStatesPlotly(list, num_rep, ncols) - params <- plotParamsPlotly(list, num_rep) - metrices <- plotMetricesPlotly(list, num_rep) +entirePlotPlotly <- function(list) { + states <- plotStatesPlotly(list) + params <- plotParamsPlotly(list) + metrices <- plotMetricesPlotly(list) subplot(states, params, metrices, nrows = 1, - shareX = FALSE, titleX = TRUE, titleY = TRUE, - widths = c(0.7, 0.15, 0.15), - margin = 0.02) + shareX = FALSE, titleX = TRUE, titleY = TRUE, + widths = c(0.7, 0.15, 0.15), + margin = 0.02) } diff --git a/tsf/R/DBA_Server.R b/tsf/R/DBA_Server.R index 344f24a..c41195c 100644 --- a/tsf/R/DBA_Server.R +++ b/tsf/R/DBA_Server.R @@ -32,6 +32,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked <- reactiveVal(FALSE) setup_done <- reactiveVal(FALSE) + # NOTE: Start of model specific code + # =============================================================================== check_inputs <- function() { rwn(input$D0 != "", "Please enter a value for the Dye") rwn(!is.na(input$npop), @@ -139,6 +141,9 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { "DBAupdateFieldBatch" } + # NOTE: End of model specific code + # =============================================================================== + get_opti_result <- function() { opti_result()$parameter } @@ -487,11 +492,12 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { # Batch analysis # =============================================================================== setup_batch_done <- reactiveVal(FALSE) - result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL) batch_results_created <- reactiveVal(FALSE) cancel_batch_clicked <- reactiveVal(FALSE) num_rep_batch <- reactiveVal() stdout <- reactiveVal(NULL) + task_queue <- reactiveVal(NULL) + result_batch <- reactiveVal() batch_message <-function(message) { session$sendCustomMessage( @@ -514,6 +520,18 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { length(df_list()) > 0, "The dataset list seems to be empty. Please upload a file" ) + rwn( # TODO: update also other server code + !is_integer(input$NumCores), + "Please provide an integer entry for number of cores" + ) + } + + get_num_core <- function() { + res <- convert_num_to_int(input$NumCores) + if(res == 0) { + res <- 1 + } + return(res) } observeEvent(input$Start_Batch, { @@ -532,6 +550,7 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { ngen <- create_ngen() topo <- create_topology() et <- create_error_threshold() + num_cores <- get_num_core() # check seed case seed <- input$Seed num_rep <- as.integer(input$NumRepDataset) @@ -542,7 +561,8 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { seed_origin <- seed } # clear everything - stdout (NULL) + stdout(NULL) + result_batch(NULL) invalid_time(1100) setup_batch_done(FALSE) batch_results_created(FALSE) @@ -558,10 +578,12 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { }) size <- length(df_list()) * num_rep - stdout(character(size)) - request_cores(size, session$token) + if (num_cores > size) { + num_cores <- size + } + stdout(character(num_cores)) + request_cores(num_cores, session$token) nclicks(nclicks() + 1) - process_list <- vector("list", size) seeds <- numeric(size) seeds_from <- 1:1e6 session$sendCustomMessage( @@ -569,6 +591,7 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { list(message = "Initializing...") ) + # 1. create seeds in loop for (i in seq_len(size)) { if (seed_case == 1) { seed <- sample(seeds_from, 1) @@ -579,35 +602,57 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { seed <- sample(seeds_from, 1) } } else if (seed_case == 2) { - seed <- seed + seed <- seed # TODO: check is this correct } seeds[i] <- seed - process_list[[i]] <- callr::r_bg( - function(case, lb, ub, df, ap, - seed, npop, ngen, Topology, errorThreshold) { - res <- tsf::opti( - case, lb, ub, df, ap, seed, npop, ngen, Topology, errorThreshold - ) - return(res) - }, - args = list( - get_Model(), lb, ub, df_list()[[(i - 1) %% length(df_list()) + 1]], - additionalParameters, seed, npop, ngen, topo, et - ) - ) } - result_val_batch$result <- process_list + # 2. Create message lists for each process + messages <- character(size) + counter_messages <- 1 + for (i in seq_len(length(df_list()))) { + for (j in seq_len(num_rep)) { + messages[counter_messages] <- + paste0("Dataset = ", i, "; Replicate = ", j) + counter_messages <- counter_messages + 1 + } + } + + # 3. Fill task queue + groups <- ceiling(1:size / (size / num_cores)) + dfs <- df_list() + if (length(groups) > length(dfs)) { + counter <- 1 + while (length(groups) > length(dfs)) { + dfs <- c(dfs, dfs[counter]) + counter <- counter + 1 + } + } + task_queue(TaskQueue$new( + get_Model(), + lb, ub, dfs, + additionalParameters, seeds, + npop, ngen, topo, et, + messages, num_cores + )) + + # 4. assign tasks + task_queue()$assign() + setup_batch_done(TRUE) NULL }) batch_process_done <- function() { req(setup_batch_done()) - req(length(result_val_batch$result) > 0) - lapply(result_val_batch$result, function(x) { - if (x$is_alive()) req(x$get_status() != "running") - }) + req(!is.null(task_queue())) + if (task_queue()$check() && + !task_queue()$queue_empty()) { + task_queue()$assign() + } + if (!task_queue()$queue_empty()) { + return(FALSE) + } invalid_time(invalid_time() + 1000) nclicks(0) return(TRUE) @@ -618,7 +663,6 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked_batch = TRUE ) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) cancel_batch_clicked(TRUE) }) @@ -626,43 +670,20 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { observe({ invalidateLater(invalid_time()) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) + req(!is.null(task_queue())) + req(task_queue()$filled) # is cancel_batch_clicked if (cancel_batch_clicked()) { + task_queue()$interrupt() setup_batch_done(TRUE) cancel_batch_clicked(FALSE) nclicks(0) - lapply(result_val_batch$result, function(process) { - process$interrupt() - process$wait() - }) - session$sendCustomMessage( - type = get_update_field_batch(), - list(message = "") - ) send_and_read_info(paste0("release: ", session$token)) return(NULL) } - # check status - # NOTE: errors are not printed otherwise screen is full of errors - counter_dataset <- 0 - counter_rep <- 0 - temp_status <- character(length(result_val_batch$result)) - for (i in seq_along(temp_status)) { - if (((i - 1) %% num_rep_batch()) == 0) { - counter_dataset <- counter_dataset + 1 - counter_rep <- 1 - } else { - counter_rep <- counter_rep + 1 - } - temp_status[i] <- print_status( - result_val_batch$result[[i]]$read_output(), - counter_dataset, - counter_rep, - get_Model() - ) - } - stdout(format_batch_status(stdout(), temp_status)) + # NOTE: check status + # (errors are not printed otherwise screen is full of errors) + stdout(task_queue()$get_status(stdout())) bind <- function(a, b) { if (is.null(a) && is.null(b)) { return("Initialisation") @@ -686,37 +707,33 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { ) }) - plot_data <- reactive({ - values <- try(lapply(result_val_batch$result, function(process) { - process$get_result() - })) + get_data <- reactive({ + values <- try({ + task_queue()$seperate_results() + }) if (inherits(values, "try-error")) { - result_val_batch$result_splitted <- NULL batch_message("") print_error("Error in background process") - } else { - result_val_batch$result_splitted <- seperate_batch_results(values) } - lapply(result_val_batch$result, function(process) { - process$kill() - }) + task_queue()$kill() send_and_read_info(paste0("release: ", session$token)) - result_val_batch$result <- NULL }) # observe results observe({ invalidateLater(invalid_time()) if (batch_process_done() && !batch_results_created()) { - plot_data() + get_data() batch_results_created(TRUE) stdout(NULL) + values <- task_queue()$results + result_batch(values) output$batch_data_plot <- renderPlotly({ entirePlotPlotly( - result_val_batch$result_splitted, - num_rep_batch(), ncols = 4 + values ) }) + task_queue(NULL) } }) @@ -726,13 +743,12 @@ dbaServer <- function(id, df_reactive, df_list_reactive, nclicks) { }, content = function(file) { req(batch_results_created()) - req(length(result_val_batch$result_splitted) > 0) - + req(!is.null(result_batch())) + values <- result_batch() download_batch_file( get_Model_capital(), file, - result_val_batch$result_splitted, - num_rep_batch() + values ) } ) diff --git a/tsf/R/DBA_UI.R b/tsf/R/DBA_UI.R index 8fc7da9..6e015cb 100644 --- a/tsf/R/DBA_UI.R +++ b/tsf/R/DBA_UI.R @@ -160,6 +160,11 @@ dbaUI <- function(id) { "How often should each dataset be analysed (using different seeds)", value = 1 ), + numericInput(NS(id, "NumCores"), + min = 1, max = 20, + "How many cores should be used for the batch analysis?", + value = 1 + ), actionButton(NS(id, "Start_Batch"), "Start batch analysis"), actionButton(NS(id, "cancel_Batch"), "Stop optimization"), downloadButton(NS(id, "batch_download"), "Save result of batch analysis"), diff --git a/tsf/R/GDA_Server.R b/tsf/R/GDA_Server.R index ec52ea6..270193e 100644 --- a/tsf/R/GDA_Server.R +++ b/tsf/R/GDA_Server.R @@ -31,6 +31,9 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked <- reactiveVal(FALSE) setup_done <- reactiveVal(FALSE) + # NOTE: Start of model specific code + # =============================================================================== + check_inputs <- function() { rwn(input$H0 != "", "Please enter a value for the Host") rwn(input$G0 != "", "Please enter a value for the Guest") @@ -144,6 +147,9 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { "GDAupdateFieldBatch" } + # NOTE: End of model specific code + # =============================================================================== + get_opti_result <- function() { opti_result()$parameter } @@ -492,11 +498,12 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { # Batch analysis # =============================================================================== setup_batch_done <- reactiveVal(FALSE) - result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL) batch_results_created <- reactiveVal(FALSE) cancel_batch_clicked <- reactiveVal(FALSE) num_rep_batch <- reactiveVal() stdout <- reactiveVal(NULL) + task_queue <- reactiveVal(NULL) + result_batch <- reactiveVal() batch_message <-function(message) { session$sendCustomMessage( @@ -519,6 +526,18 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { length(df_list()) > 0, "The dataset list seems to be empty. Please upload a file" ) + rwn( # TODO: update also other server code + !is_integer(input$NumCores), + "Please provide an integer entry for number of cores" + ) + } + + get_num_core <- function() { + res <- convert_num_to_int(input$NumCores) + if(res == 0) { + res <- 1 + } + return(res) } observeEvent(input$Start_Batch, { @@ -537,6 +556,7 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { ngen <- create_ngen() topo <- create_topology() et <- create_error_threshold() + num_cores <- get_num_core() # check seed case seed <- input$Seed num_rep <- as.integer(input$NumRepDataset) @@ -547,7 +567,8 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { seed_origin <- seed } # clear everything - stdout (NULL) + stdout(NULL) + result_batch(NULL) invalid_time(1100) setup_batch_done(FALSE) batch_results_created(FALSE) @@ -563,10 +584,12 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { }) size <- length(df_list()) * num_rep - stdout(character(size)) - request_cores(size, session$token) + if (num_cores > size) { + num_cores <- size + } + stdout(character(num_cores)) + request_cores(num_cores, session$token) nclicks(nclicks() + 1) - process_list <- vector("list", size) seeds <- numeric(size) seeds_from <- 1:1e6 session$sendCustomMessage( @@ -574,6 +597,7 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { list(message = "Initializing...") ) + # 1. create seeds in loop for (i in seq_len(size)) { if (seed_case == 1) { seed <- sample(seeds_from, 1) @@ -584,35 +608,57 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { seed <- sample(seeds_from, 1) } } else if (seed_case == 2) { - seed <- seed + seed <- seed # TODO: check is this correct } seeds[i] <- seed - process_list[[i]] <- callr::r_bg( - function(case, lb, ub, df, ap, - seed, npop, ngen, Topology, errorThreshold) { - res <- tsf::opti( - case, lb, ub, df, ap, seed, npop, ngen, Topology, errorThreshold - ) - return(res) - }, - args = list( - get_Model(), lb, ub, df_list()[[(i - 1) %% length(df_list()) + 1]], - additionalParameters, seed, npop, ngen, topo, et - ) - ) } - result_val_batch$result <- process_list + # 2. Create message lists for each process + messages <- character(size) + counter_messages <- 1 + for (i in seq_len(length(df_list()))) { + for (j in seq_len(num_rep)) { + messages[counter_messages] <- + paste0("Dataset = ", i, "; Replicate = ", j) + counter_messages <- counter_messages + 1 + } + } + + # 3. Fill task queue + groups <- ceiling(1:size / (size / num_cores)) + dfs <- df_list() + if (length(groups) > length(dfs)) { + counter <- 1 + while (length(groups) > length(dfs)) { + dfs <- c(dfs, dfs[counter]) + counter <- counter + 1 + } + } + task_queue(TaskQueue$new( + get_Model(), + lb, ub, dfs, + additionalParameters, seeds, + npop, ngen, topo, et, + messages, num_cores + )) + + # 4. assign tasks + task_queue()$assign() + setup_batch_done(TRUE) NULL }) batch_process_done <- function() { req(setup_batch_done()) - req(length(result_val_batch$result) > 0) - lapply(result_val_batch$result, function(x) { - if (x$is_alive()) req(x$get_status() != "running") - }) + req(!is.null(task_queue())) + if (task_queue()$check() && + !task_queue()$queue_empty()) { + task_queue()$assign() + } + if (!task_queue()$queue_empty()) { + return(FALSE) + } invalid_time(invalid_time() + 1000) nclicks(0) return(TRUE) @@ -623,7 +669,6 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked_batch = TRUE ) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) cancel_batch_clicked(TRUE) }) @@ -631,43 +676,20 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { observe({ invalidateLater(invalid_time()) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) + req(!is.null(task_queue())) + req(task_queue()$filled) # is cancel_batch_clicked if (cancel_batch_clicked()) { + task_queue()$interrupt() setup_batch_done(TRUE) cancel_batch_clicked(FALSE) nclicks(0) - lapply(result_val_batch$result, function(process) { - process$interrupt() - process$wait() - }) - session$sendCustomMessage( - type = get_update_field_batch(), - list(message = "") - ) send_and_read_info(paste0("release: ", session$token)) return(NULL) } - # check status - # NOTE: errors are not printed otherwise screen is full of errors - counter_dataset <- 0 - counter_rep <- 0 - temp_status <- character(length(result_val_batch$result)) - for (i in seq_along(temp_status)) { - if (((i - 1) %% num_rep_batch()) == 0) { - counter_dataset <- counter_dataset + 1 - counter_rep <- 1 - } else { - counter_rep <- counter_rep + 1 - } - temp_status[i] <- print_status( - result_val_batch$result[[i]]$read_output(), - counter_dataset, - counter_rep, - get_Model() - ) - } - stdout(format_batch_status(stdout(), temp_status)) + # NOTE: check status + # (errors are not printed otherwise screen is full of errors) + stdout(task_queue()$get_status(stdout())) bind <- function(a, b) { if (is.null(a) && is.null(b)) { return("Initialisation") @@ -691,37 +713,33 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { ) }) - plot_data <- reactive({ - values <- try(lapply(result_val_batch$result, function(process) { - process$get_result() - })) + get_data <- reactive({ + values <- try({ + task_queue()$seperate_results() + }) if (inherits(values, "try-error")) { - result_val_batch$result_splitted <- NULL batch_message("") print_error("Error in background process") - } else { - result_val_batch$result_splitted <- seperate_batch_results(values) } - lapply(result_val_batch$result, function(process) { - process$kill() - }) + task_queue()$kill() send_and_read_info(paste0("release: ", session$token)) - result_val_batch$result <- NULL }) # observe results observe({ invalidateLater(invalid_time()) if (batch_process_done() && !batch_results_created()) { - plot_data() + get_data() batch_results_created(TRUE) stdout(NULL) + values <- task_queue()$results + result_batch(values) output$batch_data_plot <- renderPlotly({ entirePlotPlotly( - result_val_batch$result_splitted, - num_rep_batch(), ncols = 4 + values ) }) + task_queue(NULL) } }) @@ -731,13 +749,12 @@ gdaServer <- function(id, df_reactive, df_list_reactive, nclicks) { }, content = function(file) { req(batch_results_created()) - req(length(result_val_batch$result_splitted) > 0) - + req(!is.null(result_batch())) + values <- result_batch() download_batch_file( get_Model_capital(), file, - result_val_batch$result_splitted, - num_rep_batch() + values ) } ) diff --git a/tsf/R/GDA_UI.R b/tsf/R/GDA_UI.R index 62f34c2..860d58f 100644 --- a/tsf/R/GDA_UI.R +++ b/tsf/R/GDA_UI.R @@ -160,6 +160,11 @@ gdaUI <- function(id) { "How often should each dataset be analysed (using different seeds)", value = 1 ), + numericInput(NS(id, "NumCores"), + min = 1, max = 20, + "How many cores should be used for the batch analysis?", + value = 1 + ), actionButton(NS(id, "Start_Batch"), "Start batch analysis"), actionButton(NS(id, "cancel_Batch"), "Stop optimization"), downloadButton(NS(id, "batch_download"), "Save result of batch analysis"), diff --git a/tsf/R/HG_Server.R b/tsf/R/HG_Server.R index 78f3a5c..061c59f 100644 --- a/tsf/R/HG_Server.R +++ b/tsf/R/HG_Server.R @@ -31,6 +31,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked <- reactiveVal(FALSE) setup_done <- reactiveVal(FALSE) + # NOTE: Start of model specific code + # =============================================================================== check_inputs <- function() { rwn(input$H0 != "", "Please enter a value for the Host") rwn(!is.na(input$npop), @@ -138,6 +140,9 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { "HGupdateFieldBatch" } + # NOTE: End of model specific code + # =============================================================================== + get_opti_result <- function() { opti_result()$parameter } @@ -486,11 +491,12 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { # Batch analysis # =============================================================================== setup_batch_done <- reactiveVal(FALSE) - result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL) batch_results_created <- reactiveVal(FALSE) cancel_batch_clicked <- reactiveVal(FALSE) num_rep_batch <- reactiveVal() stdout <- reactiveVal(NULL) + task_queue <- reactiveVal(NULL) + result_batch <- reactiveVal() batch_message <-function(message) { session$sendCustomMessage( @@ -513,6 +519,18 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { length(df_list()) > 0, "The dataset list seems to be empty. Please upload a file" ) + rwn( # TODO: update also other server code + !is_integer(input$NumCores), + "Please provide an integer entry for number of cores" + ) + } + + get_num_core <- function() { + res <- convert_num_to_int(input$NumCores) + if(res == 0) { + res <- 1 + } + return(res) } observeEvent(input$Start_Batch, { @@ -531,6 +549,7 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { ngen <- create_ngen() topo <- create_topology() et <- create_error_threshold() + num_cores <- get_num_core() # check seed case seed <- input$Seed num_rep <- as.integer(input$NumRepDataset) @@ -541,7 +560,8 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { seed_origin <- seed } # clear everything - stdout (NULL) + stdout(NULL) + result_batch(NULL) invalid_time(1100) setup_batch_done(FALSE) batch_results_created(FALSE) @@ -557,10 +577,12 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { }) size <- length(df_list()) * num_rep - stdout(character(size)) - request_cores(size, session$token) + if (num_cores > size) { + num_cores <- size + } + stdout(character(num_cores)) + request_cores(num_cores, session$token) nclicks(nclicks() + 1) - process_list <- vector("list", size) seeds <- numeric(size) seeds_from <- 1:1e6 session$sendCustomMessage( @@ -568,6 +590,7 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { list(message = "Initializing...") ) + # 1. create seeds in loop for (i in seq_len(size)) { if (seed_case == 1) { seed <- sample(seeds_from, 1) @@ -578,35 +601,57 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { seed <- sample(seeds_from, 1) } } else if (seed_case == 2) { - seed <- seed + seed <- seed # TODO: check is this correct } seeds[i] <- seed - process_list[[i]] <- callr::r_bg( - function(case, lb, ub, df, ap, - seed, npop, ngen, Topology, errorThreshold) { - res <- tsf::opti( - case, lb, ub, df, ap, seed, npop, ngen, Topology, errorThreshold - ) - return(res) - }, - args = list( - get_Model(), lb, ub, df_list()[[(i - 1) %% length(df_list()) + 1]], - additionalParameters, seed, npop, ngen, topo, et - ) - ) } - result_val_batch$result <- process_list + # 2. Create message lists for each process + messages <- character(size) + counter_messages <- 1 + for (i in seq_len(length(df_list()))) { + for (j in seq_len(num_rep)) { + messages[counter_messages] <- + paste0("Dataset = ", i, "; Replicate = ", j) + counter_messages <- counter_messages + 1 + } + } + + # 3. Fill task queue + groups <- ceiling(1:size / (size / num_cores)) + dfs <- df_list() + if (length(groups) > length(dfs)) { + counter <- 1 + while (length(groups) > length(dfs)) { + dfs <- c(dfs, dfs[counter]) + counter <- counter + 1 + } + } + task_queue(TaskQueue$new( + get_Model(), + lb, ub, dfs, + additionalParameters, seeds, + npop, ngen, topo, et, + messages, num_cores + )) + + # 4. assign tasks + task_queue()$assign() + setup_batch_done(TRUE) NULL }) batch_process_done <- function() { req(setup_batch_done()) - req(length(result_val_batch$result) > 0) - lapply(result_val_batch$result, function(x) { - if (x$is_alive()) req(x$get_status() != "running") - }) + req(!is.null(task_queue())) + if (task_queue()$check() && + !task_queue()$queue_empty()) { + task_queue()$assign() + } + if (!task_queue()$queue_empty()) { + return(FALSE) + } invalid_time(invalid_time() + 1000) nclicks(0) return(TRUE) @@ -617,7 +662,6 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked_batch = TRUE ) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) cancel_batch_clicked(TRUE) }) @@ -625,43 +669,20 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { observe({ invalidateLater(invalid_time()) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) + req(!is.null(task_queue())) + req(task_queue()$filled) # is cancel_batch_clicked if (cancel_batch_clicked()) { + task_queue()$interrupt() setup_batch_done(TRUE) cancel_batch_clicked(FALSE) nclicks(0) - lapply(result_val_batch$result, function(process) { - process$interrupt() - process$wait() - }) - session$sendCustomMessage( - type = get_update_field_batch(), - list(message = "") - ) send_and_read_info(paste0("release: ", session$token)) return(NULL) } - # check status - # NOTE: errors are not printed otherwise screen is full of errors - counter_dataset <- 0 - counter_rep <- 0 - temp_status <- character(length(result_val_batch$result)) - for (i in seq_along(temp_status)) { - if (((i - 1) %% num_rep_batch()) == 0) { - counter_dataset <- counter_dataset + 1 - counter_rep <- 1 - } else { - counter_rep <- counter_rep + 1 - } - temp_status[i] <- print_status( - result_val_batch$result[[i]]$read_output(), - counter_dataset, - counter_rep, - get_Model() - ) - } - stdout(format_batch_status(stdout(), temp_status)) + # NOTE: check status + # (errors are not printed otherwise screen is full of errors) + stdout(task_queue()$get_status(stdout())) bind <- function(a, b) { if (is.null(a) && is.null(b)) { return("Initialisation") @@ -685,37 +706,33 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { ) }) - plot_data <- reactive({ - values <- try(lapply(result_val_batch$result, function(process) { - process$get_result() - })) + get_data <- reactive({ + values <- try({ + task_queue()$seperate_results() + }) if (inherits(values, "try-error")) { - result_val_batch$result_splitted <- NULL batch_message("") print_error("Error in background process") - } else { - result_val_batch$result_splitted <- seperate_batch_results(values) } - lapply(result_val_batch$result, function(process) { - process$kill() - }) + task_queue()$kill() send_and_read_info(paste0("release: ", session$token)) - result_val_batch$result <- NULL }) # observe results observe({ invalidateLater(invalid_time()) if (batch_process_done() && !batch_results_created()) { - plot_data() + get_data() batch_results_created(TRUE) stdout(NULL) + values <- task_queue()$results + result_batch(values) output$batch_data_plot <- renderPlotly({ entirePlotPlotly( - result_val_batch$result_splitted, - num_rep_batch(), ncols = 4 + values ) }) + task_queue(NULL) } }) @@ -725,13 +742,12 @@ hgServer <- function(id, df_reactive, df_list_reactive, nclicks) { }, content = function(file) { req(batch_results_created()) - req(length(result_val_batch$result_splitted) > 0) - + req(!is.null(result_batch())) + values <- result_batch() download_batch_file( get_Model_capital(), file, - result_val_batch$result_splitted, - num_rep_batch() + values ) } ) diff --git a/tsf/R/HG_UI.R b/tsf/R/HG_UI.R index a72b835..b95f4de 100644 --- a/tsf/R/HG_UI.R +++ b/tsf/R/HG_UI.R @@ -163,6 +163,11 @@ hgUI <- function(id) { "How often should each dataset be analysed (using different seeds)", value = 1 ), + numericInput(NS(id, "NumCores"), + min = 1, max = 20, + "How many cores should be used for the batch analysis?", + value = 1 + ), actionButton(NS(id, "Start_Batch"), "Start batch analysis"), actionButton(NS(id, "cancel_Batch"), "Stop optimization"), downloadButton(NS(id, "batch_download"), "Save result of batch analysis"), diff --git a/tsf/R/IDA_Server.R b/tsf/R/IDA_Server.R index 3da050a..d82c3f5 100644 --- a/tsf/R/IDA_Server.R +++ b/tsf/R/IDA_Server.R @@ -33,6 +33,8 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked <- reactiveVal(FALSE) setup_done <- reactiveVal(FALSE) + # NOTE: Start of model specific code + # =============================================================================== check_inputs <- function() { rwn(input$H0 != "", "Please enter a value for the Host") rwn(input$D0 != "", "Please enter a value for the Dye") @@ -145,6 +147,8 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { get_update_field_batch <- function() { "IDAupdateFieldBatch" } + # NOTE: End of model specific code + # =============================================================================== get_opti_result <- function() { opti_result()$parameter @@ -494,11 +498,12 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { # Batch analysis # =============================================================================== setup_batch_done <- reactiveVal(FALSE) - result_val_batch <- reactiveValues(result = NULL, result_splitted = NULL) batch_results_created <- reactiveVal(FALSE) cancel_batch_clicked <- reactiveVal(FALSE) num_rep_batch <- reactiveVal() stdout <- reactiveVal(NULL) + task_queue <- reactiveVal(NULL) + result_batch <- reactiveVal() batch_message <-function(message) { session$sendCustomMessage( @@ -563,6 +568,7 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { } # clear everything stdout(NULL) + result_batch(NULL) invalid_time(1100) setup_batch_done(FALSE) batch_results_created(FALSE) @@ -584,7 +590,6 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { stdout(character(num_cores)) request_cores(num_cores, session$token) nclicks(nclicks() + 1) - process_list <- vector("list", size) seeds <- numeric(size) seeds_from <- 1:1e6 session$sendCustomMessage( @@ -613,12 +618,13 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { counter_messages <- 1 for (i in seq_len(length(df_list()))) { for (j in seq_len(num_rep)) { - messages[counter_messages] <- paste0("Dataset = ", i, "; Replicate = ", j) + messages[counter_messages] <- + paste0("Dataset = ", i, "; Replicate = ", j) counter_messages <- counter_messages + 1 } } - # 3. split df_list, seed_list and messages by number of cores + # 3. Fill task queue groups <- ceiling(1:size / (size / num_cores)) dfs <- df_list() if (length(groups) > length(dfs)) { @@ -628,32 +634,31 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { counter <- counter + 1 } } - dfs <- split(dfs, groups) - seeds <- split(seeds, groups) - messages <- split(messages, groups) - - # 4. create process list by calling call_several_opti_in_bg - process_list <- lapply(seq_len(length(dfs)), function(x) { - temp_dfs <- dfs[[x]] - temp_seeds <- seeds[[x]] - temp_messages <- messages[[x]] - call_several_opti_in_bg( - get_Model(), lb, ub, temp_dfs, additionalParameters, - temp_seeds, npop, ngen, topo, et, temp_messages - ) - }) + task_queue(TaskQueue$new( + get_Model(), + lb, ub, dfs, + additionalParameters, seeds, + npop, ngen, topo, et, + messages, num_cores + )) + + # 4. assign tasks + task_queue()$assign() - result_val_batch$result <- process_list setup_batch_done(TRUE) NULL }) batch_process_done <- function() { req(setup_batch_done()) - req(length(result_val_batch$result) > 0) - lapply(result_val_batch$result, function(x) { - if (x$is_alive()) req(x$get_status() != "running") - }) + req(!is.null(task_queue())) + if (task_queue()$check() && + !task_queue()$queue_empty()) { + task_queue()$assign() + } + if (!task_queue()$queue_empty()) { + return(FALSE) + } invalid_time(invalid_time() + 1000) nclicks(0) return(TRUE) @@ -664,7 +669,6 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { cancel_clicked_batch = TRUE ) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) cancel_batch_clicked(TRUE) }) @@ -672,39 +676,20 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { observe({ invalidateLater(invalid_time()) req(nclicks() != 0) - req(!is.null(result_val_batch$result)) + req(!is.null(task_queue())) + req(task_queue()$filled) # is cancel_batch_clicked if (cancel_batch_clicked()) { - lapply(result_val_batch$result, function(process) { - process$interrupt() - process$wait() - }) - - # FIX: press cancel in init phase --> crash - # FIX: press cancel like a morron --> crash - # FIX: press cancel in two following runs --> crash - + task_queue()$interrupt() setup_batch_done(TRUE) cancel_batch_clicked(FALSE) nclicks(0) - session$sendCustomMessage( - type = get_update_field_batch(), - list(message = "Shutdown process") # TODO: clean up - ) send_and_read_info(paste0("release: ", session$token)) return(NULL) } - # check status - # NOTE: errors are not printed otherwise screen is full of errors - temp_status <- character(length(result_val_batch$result)) - for (i in seq_along(temp_status)) { - # TODO: add core and add progress of core - temp_status[i] <- print_status( - result_val_batch$result[[i]]$read_output(), - get_Model() - ) - } - stdout(format_batch_status(stdout(), temp_status)) + # NOTE: check status + # (errors are not printed otherwise screen is full of errors) + stdout(task_queue()$get_status(stdout())) bind <- function(a, b) { if (is.null(a) && is.null(b)) { return("Initialisation") @@ -728,63 +713,33 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { ) }) - plot_data <- reactive({ + get_data <- reactive({ values <- try({ - res <- list() - counter <- 1 - for (i in seq_along(result_val_batch$result)) { - temp <- result_val_batch$result[[i]]$get_result() - if (length(temp) == 1) { - res[[counter]] <- temp[[1]] - counter <- counter + 1 - } else { - for (j in seq_along(temp)) { - res[[counter]] <- temp[[j]] - counter <- counter + 1 - } - } - } - res + task_queue()$seperate_results() }) - - values <- Filter(Negate(is.integer), values) - - if (!all(is.list(values))) { - result_val_batch$result_splitted <- NULL - batch_message("") - print_error("Error in background process") - } else if(length(values) == 0) { - result_val_batch$result_splitted <- NULL + if (inherits(values, "try-error")) { batch_message("") print_error("Error in background process") - } else if (inherits(values, "try-error")) { - result_val_batch$result_splitted <- NULL - batch_message("") - print_error("Error in background process") - } else { - result_val_batch$result_splitted <- seperate_batch_results(values) } - lapply(result_val_batch$result, function(process) { - process$kill() - }) + task_queue()$kill() send_and_read_info(paste0("release: ", session$token)) - result_val_batch$result <- NULL }) # observe results observe({ invalidateLater(invalid_time()) if (batch_process_done() && !batch_results_created()) { - plot_data() + get_data() batch_results_created(TRUE) stdout(NULL) - req(is.list(result_val_batch$result_splitted)) + values <- task_queue()$results + result_batch(values) output$batch_data_plot <- renderPlotly({ entirePlotPlotly( - result_val_batch$result_splitted, - num_rep_batch(), ncols = 4 + values ) }) + task_queue(NULL) } }) @@ -794,13 +749,12 @@ idaServer <- function(id, df_reactive, df_list_reactive, nclicks) { }, content = function(file) { req(batch_results_created()) - req(length(result_val_batch$result_splitted) > 0) - + req(!is.null(result_batch())) + values <- result_batch() download_batch_file( get_Model_capital(), file, - result_val_batch$result_splitted, - num_rep_batch() + values ) } ) diff --git a/tsf/R/IDA_UI.R b/tsf/R/IDA_UI.R index ed41712..157edca 100644 --- a/tsf/R/IDA_UI.R +++ b/tsf/R/IDA_UI.R @@ -37,16 +37,15 @@ idaUI <- function(id) { ), fluidRow( box( - # TODO: change back to 0 - textInput(NS(id, "H0"), "Host conc. [M]", value = 1e-6), - textInput(NS(id, "D0"), "Dye conc. [M]", value = "1e-6"), - textInput(NS(id, "kHD"), HTML("Ka(HD) [1/M]"), value = "3e6"), + textInput(NS(id, "H0"), "Host conc. [M]", value = 0), + textInput(NS(id, "D0"), "Dye conc. [M]", value = "0"), + textInput(NS(id, "kHD"), HTML("Ka(HD) [1/M]"), value = "0"), box( title = "Advanced options", collapsible = TRUE, collapsed = TRUE, box( numericInput(NS(id, "npop"), "Number of particles", value = 40), - numericInput(NS(id, "ngen"), "Number of generations", value = 20) #TODO: change back to 1000; Better error message when invalid ngen is set + numericInput(NS(id, "ngen"), "Number of generations", value = 1000) ), box( selectInput(NS(id, "topology"), "Topology of particle swarm", @@ -168,7 +167,7 @@ idaUI <- function(id) { value = 1 ), numericInput(NS(id, "NumCores"), - min = 1, max = 20, # NOTE: maybe adapt to number of cores availabel + min = 1, max = 20, "How many cores should be used for the batch analysis?", value = 1 ), diff --git a/tsf/R/TaskQueue.R b/tsf/R/TaskQueue.R new file mode 100644 index 0000000..25bf5e9 --- /dev/null +++ b/tsf/R/TaskQueue.R @@ -0,0 +1,220 @@ +DfRepCombi <- R6::R6Class( + "DfRepCombi", + public = list( + df = NULL, + rep = NULL, + initialize = function(message) { + m <- strsplit(message, ";")[[1]] + df_m <- gsub("Dataset = ", "", m[[1]]) |> as.numeric() + rep_m <- gsub("Replicate = ", "", m[[2]]) |> as.numeric() + self$df <- df_m + self$rep <- rep_m + } + ) +) + +Process <- R6::R6Class( + "Process", + public = list( + process = NULL, + idx = 0, + initialize = function(process, idx) { + self$process <- process + self$idx <- idx + }, + get_status = function() { + self$process$get_status() + }, + is_alive = function() { + self$process$is_alive() + }, + get_result = function() { + self$process$get_result() + }, + read_output = function() { + self$process$read_output() + }, + interrupt = function() { + self$process$interrupt() + }, + wait = function() { + self$process$wait() + }, + kill = function() { + self$process$kill() + } + ) +) + +TaskQueue <- R6::R6Class( + "TaskQueue", + public = list( + dfs = NULL, + seeds = NULL, + messages = NULL, + assigned = NULL, + sz = NULL, + df_reps = NULL, + case = NULL, + lb = NULL, + ub = NULL, + ap = NULL, + npop = NULL, + ngen = NULL, + topo = NULL, + et = NULL, + num_cores = NULL, + + processes = list(), + results = NULL, + filled = FALSE, + interrupted_at = NULL, + current_opti = 0, + + initialize = function(case, lb, ub, dfs, + ap, seeds, npop, ngen, + topo, et, + messages, num_cores) { + self$case <- case + self$lb <- lb + self$ub <- ub + self$dfs <- dfs + self$ap <- ap + self$seeds <- seeds + self$npop <- npop + self$ngen <- ngen + self$topo <- topo + self$et <- et + self$messages <- messages + self$sz <- length(dfs) + self$results <- vector("list", self$sz) + self$assigned <- logical(self$sz) + self$df_reps <- lapply(seq_len(self$sz), function(i) { + DfRepCombi$new(messages[[i]]) + }) + self$num_cores <- num_cores + self$processes <- vector("list", num_cores) + self$filled <- TRUE + }, + + set_results = function(process) { + results <- process$get_result() + idx_res <- length(self$results) + 1 + self$results[[idx_res]] <- results + idx <- process$idx + self$results[[idx_res]]$data$repetition <- self$df_reps[[idx]]$rep + self$results[[idx_res]]$data$dataset <- self$df_reps[[idx]]$df + self$results[[idx_res]]$parameter$repetition <- self$df_reps[[idx]]$rep + self$results[[idx_res]]$parameter$dataset <- self$df_reps[[idx]]$df + self$results[[idx_res]]$metrices$repetition <- self$df_reps[[idx]]$rep + self$results[[idx_res]]$metrices$dataset <- self$df_reps[[idx]]$df + }, + + assign_task = function() { + idx <- which(!self$assigned)[1] + if (is.na(idx)) return(NULL) + df <- self$dfs[[idx]] + seed <- self$seeds[idx] + m <- self$messages[idx] + process <- callr::r_bg( + function(case, lb, ub, df, ap, + seed, npop, + ngen, Topology, errorThreshold, messages) { + res <- tsf::opti( + case, lb, ub, df, ap, seed, + npop, ngen, Topology, errorThreshold, messages + ) + return(res) + }, + args = list( + self$case, self$lb, self$ub, df, self$ap, + seed, self$npop, self$ngen, self$topo, self$et, m + ) + ) + self$assigned[idx] <- TRUE + self$current_opti <- self$current_opti + 1 + return(Process$new(process, self$current_opti)) + }, + + assign = function() { + if (any(!self$assigned)) { + for (i in seq_len(self$num_cores)) { + if (inherits(self$processes[[i]], "Process")) { + self$set_results(self$processes[[i]]) + if (self$processes[[i]]$is_alive()) self$processes[[i]]$kill() + } + if (self$queue_empty()) { + next + } + process <- self$assign_task() + self$processes[[i]] <- process + } + self$processes <- Filter(Negate(is.null), self$processes) + } + }, + + check = function() { + for (i in seq_len(length(self$processes))) { + if (self$processes[[i]]$is_alive()) { + return(FALSE) + } + } + return(TRUE) + }, + + get_results = function() { + for (i in seq_len(length(self$processes))) { + if (!self$processes[[i]]$is_alive()) { + self$set_results(self$processes[[i]]) + } + } + }, + + interrupt = function() { + for (i in seq_len(length(self$processes))) { + if (self$processes[[i]]$is_alive()) { + status <- self$processes[[i]]$get_status() + self$processes[[i]]$interrupt() + self$processes[[i]]$wait() + if (status == "running" && self$processes[[i]]$is_alive()) { + self$set_results(self$processes[[i]]$get_result()) + } + self$processes[[i]]$kill() + } + } + self$interrupted_at <- which(!self$assigned)[1] + self$assigned <- TRUE + }, + + get_status = function(stdout) { + status <- character(self$num_cores) + for (i in seq_len(length(self$processes))) { + if (self$processes[[i]]$is_alive()) { + status[i] <- + print_status(self$processes[[i]]$read_output(), self$case) + } + } + return(format_batch_status(stdout, status)) + }, + + kill = function() { + for (i in seq_len(length(self$processes))) { + if (self$processes[[i]]$is_alive()) { + self$processes[[i]]$kill() + } + } + }, + + seperate_results = function() { + self$get_results() + self$results <- Filter(Negate(is.null), self$results) + self$results <- seperate_batch_results(self$results) + return(self$results) + }, + + queue_empty = function() { + all(self$assigned) && + all(sapply(self$processes, function(p) !p$is_alive())) + } + ) +) diff --git a/tsf/R/Utils_App.R b/tsf/R/Utils_App.R index c52eb97..19a05f9 100644 --- a/tsf/R/Utils_App.R +++ b/tsf/R/Utils_App.R @@ -411,15 +411,8 @@ download_csv <- function(model, file, result_val) { # download batch file # ======================================================================================== -create_df_for_batch <- function(list, what, num_rep) { # TODO: use this fct also in the plotting fcts of Batch.R +create_df_for_batch <- function(list, what) { list <- list[[what]] - num_data_sets <- length(list) / num_rep - repetitions <- (seq_len(length(list)) - 1) %% num_rep + 1 - data_sets <- rep(1:num_data_sets, each = num_rep) - for (i in seq_along(list)) { - list[[i]]$dataset <- data_sets[i] - list[[i]]$repetition <- repetitions[i] - } df <- Reduce(rbind, list) return(df) } @@ -445,7 +438,7 @@ adjust_theme <- function(p) { return(p) } -download_batch_file <- function(model, file, result_val, num_rep) { +download_batch_file <- function(model, file, result_val) { wb <- openxlsx::createWorkbook() addWorksheet(wb, "Results") writeData(wb, "Results", @@ -455,11 +448,11 @@ download_batch_file <- function(model, file, result_val, num_rep) { ) curr_row <- 3 - data_trajectories <- create_df_for_batch(result_val, "states", num_rep) + data_trajectories <- create_df_for_batch(result_val, "states") writeData(wb, "Results", data_trajectories, startRow = curr_row) curr_row <- curr_row + dim(data_trajectories)[1] + 5 - parameter <- create_df_for_batch(result_val, "params", num_rep) + parameter <- create_df_for_batch(result_val, "params") writeData(wb, "Results", parameter, startRow = curr_row) curr_row <- curr_row + dim(parameter)[1] + 5 @@ -471,11 +464,11 @@ download_batch_file <- function(model, file, result_val, num_rep) { writeData(wb, "Results", boundaries, startRow = curr_row) curr_row <- curr_row + dim(boundaries)[1] + 5 - metrices <- create_df_for_batch(result_val, "metrices", num_rep) + metrices <- create_df_for_batch(result_val, "metrices") writeData(wb, "Results", metrices, startRow = curr_row) curr_row <- curr_row + dim(metrices)[1] + 5 - p1 <- plotStates(result_val, num_rep) + p1 <- plotStates(result_val) temp_files_p1 <- lapply(seq_len(length(p1)), function(x) { tempfile(fileext = ".png") }) @@ -492,8 +485,8 @@ download_batch_file <- function(model, file, result_val, num_rep) { curr_row <- curr_row + 20 } - p2 <- plotParams(result_val, num_rep, 4) - p3 <- plotMetrices(result_val, num_rep, 4) + p2 <- plotParams(result_val) + p3 <- plotMetrices(result_val) tempfile_plot2 <- tempfile(fileext = ".png") ggsave(tempfile_plot2,