Skip to content

Commit

Permalink
added first draft of communication between background go process and …
Browse files Browse the repository at this point in the history
…shiny app
  • Loading branch information
Konrad1991 committed Aug 16, 2024
1 parent a580f59 commit c38de59
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 11 deletions.
5 changes: 5 additions & 0 deletions ManageCores/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module ManageCores

go 1.23.0

require github.com/magiconair/properties v1.8.7
2 changes: 2 additions & 0 deletions ManageCores/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY=
github.com/magiconair/properties v1.8.7/go.mod h1:Dhd985XPs7jluiymwWYZ0G4Z61jb3vdS329zhj2hYo0=
Empty file added ManageCores/log.txt
Empty file.
150 changes: 150 additions & 0 deletions ManageCores/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package main

import (
"bufio"
"errors"
"fmt"
"log"
"net"
"os"
"runtime"
"strconv"
"strings"
"sync"
)

// Session list
type Session struct {
id string
cores int
}

// Status
const (
OK = "OK"
TOOMANYCORES = "Exceeded core limit"
CORESALLOCATED = "Cores allocated"
NOTENOUGHCORES = "Not enough cores"
)

// Core list
var totalCores = runtime.NumCPU()
var usedCores = 0

// Shiny-Session list
var shinySessions = make(map[string]Session)
var mutex = &sync.Mutex{}

// Request cores
func requestCores(session *Session) string {
mutex.Lock()
defer mutex.Unlock()
if session.cores > 0 && session.cores <= (totalCores-usedCores) {
usedCores += session.cores
return CORESALLOCATED
} else if session.cores > totalCores {
return TOOMANYCORES
} else {
return NOTENOUGHCORES
}
}

// Release cores
func releaseCores(session *Session) {
mutex.Lock()
defer mutex.Unlock()
usedCores -= session.cores
session.cores = 0
}

// Handle info from shiny
func handleInfo(info string) (string, error) {
status := OK
split := strings.Split(info, ":")
if (len(split) != 2) && (len(split) != 3) {
return status, errors.New("Invalid input from shiny")
}
if split[0] == "add" {
mutex.Lock()
shinySessions[split[1]] = Session{split[1], 0}
mutex.Unlock()
} else if split[0] == "remove" {
mutex.Lock()
session := shinySessions[split[1]]
releaseCores(&session)
delete(shinySessions, split[1])
mutex.Unlock()
} else if split[0] == "request" {
s := strings.Replace(split[2], "\n", "", -1)
cores, err := strconv.Atoi(s)
if err != nil {
return status, err
}
mutex.Lock()
session := shinySessions[split[1]]
session.cores = cores
shinySessions[split[1]] = session
mutex.Unlock()
status = requestCores(&session)
} else if split[0] == "release" {
mutex.Lock()
session := shinySessions[split[1]]
releaseCores(&session)
shinySessions[split[1]] = session
mutex.Unlock()
}
return status, nil
}

// Read info from shiny app
func readRInfo(conn net.Conn, errCh chan<- error) string {
reader := bufio.NewReader(conn)
message, _ := reader.ReadString('\n')
status, err := handleInfo(message)
if err != nil {
errCh <- err
}
return status
}

// Send info to shiny app
func sendRInfo(conn net.Conn, info string) error {
_, err := fmt.Fprintln(conn, info)
return err
}

func runTCP(errCh chan<- error) {
ln, err := net.Listen("tcp", ":8080")
if err != nil {
errCh <- err
}
defer ln.Close()

for {
conn, err := ln.Accept()
if err != nil {
errCh <- err
}
status := readRInfo(conn, errCh)
sendRInfo(conn, status)
conn.Close()
}
}

func main() {
errCh := make(chan error)
logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
if err != nil {
log.Fatal("Failed to open log file:", err)
}
log.SetOutput(logFile)

go func() {
for err := range errCh {
if err != nil {
log.Println("Error: ", err)
}
}
}()
runTCP(errCh)
}
26 changes: 15 additions & 11 deletions tsf/R/IDA_Server.R
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ idaServer <- function(id, df, df_list, com, com_sense, com_batch,

# Batch analysis
# ===============================================================================
# TODO: remove this helper fct
destroy_files <- reactive({
l <- com_batch$list
if (length(l) >= 1) {
Expand All @@ -359,7 +360,6 @@ idaServer <- function(id, df, df_list, com, com_sense, com_batch,
return(NULL)
}
session$sendCustomMessage(type = "IDAclearFieldBatch", list(message = NULL))
nclicks(nclicks() + 1)
result_val(data.frame(Status = "Running..."))
session$sendCustomMessage(type = "IDAclearFieldBatch", list(message = NULL, arg = 1))
# check input
Expand Down Expand Up @@ -391,6 +391,7 @@ idaServer <- function(id, df, df_list, com, com_sense, com_batch,
Only for the first analysis of each dataset respectivly,
the seed which will be used.")
}
# FIX: seed stuff is currently broken. When a seed is set app crushes
if (is.na(seed)) {
seed_case <- 1
} else {
Expand Down Expand Up @@ -420,13 +421,18 @@ idaServer <- function(id, df, df_list, com, com_sense, com_batch,
})

size <- length(df_list) * num_rep
# TODO: handle load on server
if (size > 10) {
print_noti("The number of replications is too high.
Please reduce the number of replications", type = "error")
nclicks(0)
return(NULL)
status <- send_and_read_info(paste0("request: ", session$token, " :", size))
if (status == "Exceeded core limit") {
rwn(
FALSE,
"Exceed core limit. Please reduce the number of datasets or replications."
)
}
rwn(
status == "Cores allocated",
"Could not allocate cores. Please try again later"
)
nclicks(nclicks() + 1)
process_list <- vector("list", size)
seeds <- numeric(size)
seeds_from <- 1:1e6
Expand Down Expand Up @@ -457,11 +463,7 @@ idaServer <- function(id, df, df_list, com, com_sense, com_batch,
}

result_val_batch$result <- process_list

# TODO: how to handle this?
# nclicks(0)
batch_done(TRUE)

NULL
})

Expand Down Expand Up @@ -492,6 +494,7 @@ idaServer <- function(id, df, df_list, com, com_sense, com_batch,
type = "IDAupdateFieldBatch",
list(message = "")
)
send_and_read_info(paste0("release: ", session$token))
return(NULL)
}
# check status
Expand Down Expand Up @@ -542,6 +545,7 @@ idaServer <- function(id, df, df_list, com, com_sense, com_batch,
lapply(result_val_batch$result, function(process) {
process$kill()
})
send_and_read_info(paste0("release: ", session$token))
result_val_batch$result <- NULL
})

Expand Down
23 changes: 23 additions & 0 deletions tsf/R/Utils_App.R
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
# Send information to golang
# ========================================================================================
send_and_read_info <- function(message) {
if (length(message) == 0) {
return()
}
con <- socketConnection(
host = "localhost",
port = 8080, blocking = TRUE,
server = FALSE,
open = "w+b"
)
bind <- function(a, b) {
paste(a, b, collapse = " , ")
}
m <- Reduce(bind, as.character(message))
writeLines(m, con)
response <- readLines(con, warn = FALSE)
close(con)
return(response)
}


format_scientific <- function(x) {
formatC(x, format = "e", digits = 3)
}
Expand Down
10 changes: 10 additions & 0 deletions tsf/R/server.R
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
server <- function(input, output, session) {
isolate({
send_and_read_info(paste0("add:", session$token))
})

onSessionEnded(function() {
isolate({
send_and_read_info(paste0("remove:", session$token))
})
})

# data import
# ============================================================================
data <- reactiveValues(df = NULL)
Expand Down

0 comments on commit c38de59

Please sign in to comment.