Skip to content

Commit

Permalink
Process manager mode
Browse files Browse the repository at this point in the history
To allow config reload and binary upgrade
  • Loading branch information
setaou committed Aug 30, 2023
1 parent c0d42f7 commit f02663c
Show file tree
Hide file tree
Showing 2 changed files with 206 additions and 100 deletions.
188 changes: 88 additions & 100 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"flag"
"fmt"
"mlb/backend"
"mlb/backends_inventory"
"mlb/backends_processor"
Expand All @@ -17,7 +16,6 @@ import (
"net/http"
"os"
"os/signal"
"slices"
"sync"
"syscall"
"time"
Expand All @@ -29,13 +27,11 @@ import (

// Main
func main() {
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

// Parse CLI args
arg_config := flag.String("config", "config.hcl", "config file")
arg_kill := flag.Int("kill", 0, "Kill process PID")
arg_debug := flag.Bool("debug", false, "sets log level to debug")
arg_process_manager := flag.Bool("process-manager", false, "enable process manager mode")
arg_notify_parent := flag.Bool("notify-parent", false, "send SIGUSR1 to parent once everything is running")
flag.Parse()

// Setup logger
Expand All @@ -45,116 +41,108 @@ func main() {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}

// Parse conf
conf, diags := config.LoadConfig(*arg_config)
if diags.HasErrors() {
os.Exit(1)
}

// Adjust max allowed file descriptors
if conf.System.RLimit.NOFile > 0 {
system.SetRlimitNOFILE(conf.System.RLimit.NOFile)
// CLI args validation
if *arg_process_manager && *arg_notify_parent {
log.Fatal().Msg("Parameters process-manager and notify-parent are mutually exclusives")
}

// Start serious business
backendUpdatesProviders := make(map[string]backend.BackendUpdateProvider, 0)
backendUpdateSubscribers := make(map[string]backend.BackendUpdateSubscriber, 0)
backendProviders := make(map[string]backend.BackendProvider, 0)
backendListProviders := make(map[string]backend.BackendListProvider, 0)

for _, tc := range conf.BackendsInventoryList {
i := backends_inventory.New(tc, &wg, ctx)
id := i.(misc.GetIDInterface).GetID()
backendUpdatesProviders[id] = i.(backend.BackendUpdateProvider)
backendListProviders[id] = i.(backend.BackendListProvider)
}
if *arg_process_manager { // Process manager mode
processManager()

for _, tc := range conf.BackendsProcessorList {
f := backends_processor.New(tc, &wg, ctx)
id := f.(misc.GetIDInterface).GetID()
backendUpdatesProviders[id] = f.(backend.BackendUpdateProvider)
backendListProviders[id] = f.(backend.BackendListProvider)
backendUpdateSubscribers[id] = f.(backend.BackendUpdateSubscriber)
}
} else { // Normal mode
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())

for _, tc := range conf.BalancerList {
b := balancer.New(tc, &wg, ctx)
id := b.(misc.GetIDInterface).GetID()
backendProviders[id] = b.(backend.BackendProvider)
backendListProviders[id] = b.(backend.BackendListProvider)
backendUpdateSubscribers[id] = b.(backend.BackendUpdateSubscriber)
}
// Parse conf
conf, diags := config.LoadConfig(*arg_config)
if diags.HasErrors() {
os.Exit(1)
}

for _, c := range conf.ProxyList {
proxy.New(c, backendProviders, &wg, ctx)
}
// Adjust max allowed file descriptors
if conf.System.RLimit.NOFile > 0 {
system.SetRlimitNOFILE(conf.System.RLimit.NOFile)
}

// Plug update subscribers to providers
for _, bus := range backendUpdateSubscribers {
source := bus.GetUpdateSource()
provider, ok := backendUpdatesProviders[source]
if !ok {
log.Panic().Str("subscriber", bus.(misc.GetIDInterface).GetID()).Str("provider", source).Msg("Backend update provider not found !")
// Start serious business
backendUpdatesProviders := make(map[string]backend.BackendUpdateProvider, 0)
backendUpdateSubscribers := make(map[string]backend.BackendUpdateSubscriber, 0)
backendProviders := make(map[string]backend.BackendProvider, 0)
backendListProviders := make(map[string]backend.BackendListProvider, 0)

for _, tc := range conf.BackendsInventoryList {
i := backends_inventory.New(tc, &wg, ctx)
id := i.(misc.GetIDInterface).GetID()
backendUpdatesProviders[id] = i.(backend.BackendUpdateProvider)
backendListProviders[id] = i.(backend.BackendListProvider)
}
bus.SubscribeTo(provider)
}

// HTTP Metrics
http.HandleFunc("/backends", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
backendsByProvider := make(map[string]backend.BackendsList, len(backendListProviders))
for id := range backendListProviders {
backendsByProvider[id] = backendListProviders[id].GetBackendList()
for _, tc := range conf.BackendsProcessorList {
f := backends_processor.New(tc, &wg, ctx)
id := f.(misc.GetIDInterface).GetID()
backendUpdatesProviders[id] = f.(backend.BackendUpdateProvider)
backendListProviders[id] = f.(backend.BackendListProvider)
backendUpdateSubscribers[id] = f.(backend.BackendUpdateSubscriber)
}
out, _ := json.Marshal(backendsByProvider)
w.Write(out)
})
http.Handle("/metrics", metrics.HttpLogWrapper(promhttp.Handler()))

metrics.NewHTTPServer(conf.Metrics.Address, &wg, ctx)

// Termination signals
chan_signals := make(chan os.Signal, 1)
signal.Notify(chan_signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGUSR1)
go func() {
for {
switch <-chan_signals {
case syscall.SIGINT, syscall.SIGTERM:
log.Info().Msg("Termination signal received")
cancel()

case syscall.SIGUSR1:
log.Info().Msg("Restart signal received")

procAttr := os.ProcAttr{
Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
}

// Ensure the children has the kill switch with current PID
var args = make([]string, len(os.Args))
copy(args, os.Args)
if i := slices.Index(args, "--kill"); i >= 0 { // Update the PID if the switch was present
args[i+1] = fmt.Sprintf("%d", os.Getpid())
} else { // Add the switch if it was not present
args = append(args, "--kill", fmt.Sprintf("%d", os.Getpid()))
}
for _, tc := range conf.BalancerList {
b := balancer.New(tc, &wg, ctx)
id := b.(misc.GetIDInterface).GetID()
backendProviders[id] = b.(backend.BackendProvider)
backendListProviders[id] = b.(backend.BackendListProvider)
backendUpdateSubscribers[id] = b.(backend.BackendUpdateSubscriber)
}

_, err := os.StartProcess(args[0], args, &procAttr)
for _, c := range conf.ProxyList {
proxy.New(c, backendProviders, &wg, ctx)
}

if err != nil {
log.Error().Err(err).Msg("Error while starting the new process")
}
// Plug update subscribers to providers
for _, bus := range backendUpdateSubscribers {
source := bus.GetUpdateSource()
provider, ok := backendUpdatesProviders[source]
if !ok {
log.Panic().Str("subscriber", bus.(misc.GetIDInterface).GetID()).Str("provider", source).Msg("Backend update provider not found !")
}
bus.SubscribeTo(provider)
}
}()

// If we have the kill switch, kill the given PID after a short delay
if *arg_kill != 0 {
// HTTP Metrics
http.HandleFunc("/backends", func(w http.ResponseWriter, r *http.Request) {
w.Header().Add("Content-Type", "application/json")
backendsByProvider := make(map[string]backend.BackendsList, len(backendListProviders))
for id := range backendListProviders {
backendsByProvider[id] = backendListProviders[id].GetBackendList()
}
out, _ := json.Marshal(backendsByProvider)
w.Write(out)
})
http.Handle("/metrics", metrics.HttpLogWrapper(promhttp.Handler()))

metrics.NewHTTPServer(conf.Metrics.Address, &wg, ctx)

// Termination signals
chan_signals := make(chan os.Signal, 1)
signal.Notify(chan_signals, syscall.SIGINT, syscall.SIGTERM)
go func() {
time.Sleep(5 * time.Second)
syscall.Kill(*arg_kill, syscall.SIGTERM)
for {
switch <-chan_signals {
case syscall.SIGINT, syscall.SIGTERM:
log.Info().Msg("Termination signal received")
cancel()
}
}
}()
}

wg.Wait()
// If requested, once everything is loaded, notify parent
if *arg_notify_parent {
go func() {
// Add a small delay to ensure modules are all started
time.Sleep(5 * time.Second)
syscall.Kill(syscall.Getppid(), syscall.SIGUSR1)
}()
}

wg.Wait()
}
}
118 changes: 118 additions & 0 deletions process_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package main

import (
"os"
"os/signal"
"syscall"

"github.com/rs/zerolog/log"
)

type Process struct {
Process *os.Process
FinalState *os.ProcessState
}

func startProcess(chan_process chan *Process) (*Process, error) {
log.Info().Msg("Starting new process")

procAttr := os.ProcAttr{
Files: []*os.File{os.Stdin, os.Stdout, os.Stderr},
}

// Ensure the children does not have the --process-manager switch
var args = make([]string, 0, len(os.Args))
for _, v := range os.Args {
if v == "--process-manager" || v == "-process-manager" {
continue
}
args = append(args, v)
}
args = append(args, "--notify-parent")

p := &Process{}

proc, err := os.StartProcess(args[0], args, &procAttr)
if err != nil {
log.Error().Err(err).Msg("Error while starting new process")
return p, err
}
p.Process = proc

go func() {
s, _ := proc.Wait()
p.FinalState = s
chan_process <- p
}()

return p, err
}

func processManager() {
processes := map[*Process]*Process{}
chan_process := make(chan *Process)
var starting *Process

// Start first worker process
proc, err := startProcess(chan_process)
if err != nil {
log.Panic().Err(err).Msg("Unable to start the worker process")
}
processes[proc] = proc
starting = proc

// Signals
chan_signals := make(chan os.Signal, 1)
signal.Notify(chan_signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGUSR1)

process_manager_loop:
for {
select {
case s := <-chan_signals:
switch s {
case syscall.SIGINT, syscall.SIGTERM:
log.Info().Msg("Termination signal received, forwarding to worker processes")
for _, p := range processes {
p.Process.Signal(syscall.SIGTERM)
}

case syscall.SIGHUP:
if starting != nil {
log.Warn().Msg("Restart signal received but a restart is already ongoing")

} else {
log.Info().Msg("Restart signal received, starting new worker process")
proc, err := startProcess(chan_process)
if err != nil {
log.Panic().Err(err).Msg("Unable to start the new worker process")
}
processes[proc] = proc
starting = proc
}

case syscall.SIGUSR1:
log.Info().Msg("New worker successfully started")
for _, p := range processes {
if p != starting {
p.Process.Signal(syscall.SIGTERM)
}
}
starting = nil
}
case p := <-chan_process:
if p == starting {
log.Error().Int("worker_pid", p.Process.Pid).Int("exit_code", p.FinalState.ExitCode()).Msg("New worker process exited unexpectedly")
starting = nil
} else {
log.Info().Int("worker_pid", p.Process.Pid).Int("exit_code", p.FinalState.ExitCode()).Msg("Worker process exited")
}

delete(processes, p)

if len(processes) == 0 {
log.Info().Msg("All worker processes have ended")
break process_manager_loop
}
}
}
}

0 comments on commit f02663c

Please sign in to comment.