From df422e325371510c7621fe9142585dc04250154a Mon Sep 17 00:00:00 2001 From: Antonio Nuno Monteiro Date: Sat, 2 Mar 2024 00:00:06 -0800 Subject: [PATCH] feat: shutdown in reverse dependency order address code review fix: unused variable refactor: smaller functions test: shutdown in order Add shutdown order test fix: tests --- .../process-compose-shutdown-inorder.yaml | 29 ++++ src/app/project_opts.go | 18 +- src/app/project_runner.go | 155 ++++++++++++++---- src/app/system_test.go | 95 +++++++++++ src/cmd/project_runner.go | 1 + src/cmd/root.go | 1 + src/config/Flags.go | 66 ++++---- 7 files changed, 293 insertions(+), 72 deletions(-) create mode 100644 fixtures-code/process-compose-shutdown-inorder.yaml diff --git a/fixtures-code/process-compose-shutdown-inorder.yaml b/fixtures-code/process-compose-shutdown-inorder.yaml new file mode 100644 index 0000000..4c3d4cf --- /dev/null +++ b/fixtures-code/process-compose-shutdown-inorder.yaml @@ -0,0 +1,29 @@ +version: "0.5" + +log_level: debug +log_length: 1000 + +processes: + procA: + command: | + trap 'echo "A: exit"' SIGTERM + echo "A: starting" + sleep 3 + + procB: + command: | + trap 'echo "B: exit"' SIGTERM + echo "B: starting" + sleep 3 + depends_on: + procA: + condition: process_started + + procC: + command: | + trap 'echo "C: exit"' SIGTERM + echo "C: starting" + sleep 3 + depends_on: + procB: + condition: process_started diff --git a/src/app/project_opts.go b/src/app/project_opts.go index 340ff67..4315b8e 100644 --- a/src/app/project_opts.go +++ b/src/app/project_opts.go @@ -3,12 +3,13 @@ package app import "github.com/f1bonacc1/process-compose/src/types" type ProjectOpts struct { - project *types.Project - processesToRun []string - noDeps bool - mainProcess string - mainProcessArgs []string - isTuiOn bool + project *types.Project + processesToRun []string + noDeps bool + mainProcess string + mainProcessArgs []string + isTuiOn bool + isOrderedShutDown bool } func (p *ProjectOpts) WithProject(project *types.Project) *ProjectOpts { @@ -39,3 +40,8 @@ func (p *ProjectOpts) WithIsTuiOn(isTuiOn bool) *ProjectOpts { p.isTuiOn = isTuiOn return p } + +func (p *ProjectOpts) WithOrderedShutDown(isOrderedShutDown bool) *ProjectOpts { + p.isOrderedShutDown = isOrderedShutDown + return p +} diff --git a/src/app/project_runner.go b/src/app/project_runner.go index b0dc9b5..8b28262 100644 --- a/src/app/project_runner.go +++ b/src/app/project_runner.go @@ -8,6 +8,7 @@ import ( "os" "os/user" "runtime" + "slices" "sync" "time" @@ -15,21 +16,22 @@ import ( ) type ProjectRunner struct { - procConfMutex sync.Mutex - project *types.Project - logsMutex sync.Mutex - processLogs map[string]*pclog.ProcessLogBuffer - statesMutex sync.Mutex - processStates map[string]*types.ProcessState - runProcMutex sync.Mutex - runningProcesses map[string]*Process - logger pclog.PcLogger - waitGroup sync.WaitGroup - exitCode int - projectState *types.ProjectState - mainProcess string - mainProcessArgs []string - isTuiOn bool + procConfMutex sync.Mutex + project *types.Project + logsMutex sync.Mutex + processLogs map[string]*pclog.ProcessLogBuffer + statesMutex sync.Mutex + processStates map[string]*types.ProcessState + runProcMutex sync.Mutex + runningProcesses map[string]*Process + logger pclog.PcLogger + waitGroup sync.WaitGroup + exitCode int + projectState *types.ProjectState + mainProcess string + mainProcessArgs []string + isTuiOn bool + isOrderedShutDown bool } func (p *ProjectRunner) GetLexicographicProcessNames() ([]string, error) { @@ -322,27 +324,111 @@ func (p *ProjectRunner) GetProcessPorts(name string) (*types.ProcessPorts, error return ports, nil } +func (p *ProjectRunner) runningProcessesReverseDependencies() map[string]map[string]*Process { + reverseDependencies := make(map[string]map[string]*Process) + + // `p.runProcMutex` lock is assumed to have been acquired when calling + // this function. It is currently called by `ShutDownProject()`. + for _, process := range p.runningProcesses { + for k := range process.procConf.DependsOn { + if runningProc, ok := p.runningProcesses[k]; ok { + if _, ok := reverseDependencies[runningProc.getName()]; !ok { + dep := make(map[string]*Process) + dep[process.getName()] = process + reverseDependencies[runningProc.getName()] = dep + } + } else { + continue + } + } + } + + return reverseDependencies +} + +func (p *ProjectRunner) shutDownInOrder(wg *sync.WaitGroup, shutdownOrder []*Process) { + reverseDependencies := p.runningProcessesReverseDependencies() + for _, process := range shutdownOrder { + wg.Add(1) + go func(proc *Process) { + defer wg.Done() + waitForDepsWg := sync.WaitGroup{} + if revDeps, ok := reverseDependencies[proc.getName()]; ok { + for _, runningProc := range revDeps { + waitForDepsWg.Add(1) + go func(pr *Process) { + pr.waitForCompletion() + waitForDepsWg.Done() + }(runningProc) + } + } + waitForDepsWg.Wait() + log.Debug().Msgf("[%s]: waited for all dependencies to shut down", proc.getName()) + + err := proc.shutDown() + if err != nil { + log.Err(err).Msgf("failed to shutdown %s", proc.getName()) + return + } + proc.waitForCompletion() + }(process) + } +} + +func (p *ProjectRunner) shutDownAndWait(shutdownOrder []*Process) { + wg := sync.WaitGroup{} + if p.isOrderedShutDown { + p.shutDownInOrder(&wg, shutdownOrder) + } else { + for _, proc := range shutdownOrder { + err := proc.shutDown() + if err != nil { + log.Err(err).Msgf("failed to shutdown %s", proc.getName()) + continue + } + wg.Add(1) + go func(pr *Process) { + pr.waitForCompletion() + wg.Done() + }(proc) + } + } + + wg.Wait() +} + func (p *ProjectRunner) ShutDownProject() error { p.runProcMutex.Lock() defer p.runProcMutex.Unlock() - runProc := p.runningProcesses - for _, proc := range runProc { - proc.prepareForShutDown() - } - wg := sync.WaitGroup{} - for _, proc := range runProc { - err := proc.shutDown() + + shutdownOrder := []*Process{} + if p.isOrderedShutDown { + err := p.project.WithProcesses([]string{}, func(process types.ProcessConfig) error { + if runningProc, ok := p.runningProcesses[process.ReplicaName]; ok { + shutdownOrder = append(shutdownOrder, runningProc) + } + return nil + }) if err != nil { - log.Err(err).Msgf("failed to shutdown %s", proc.getName()) - continue + log.Error().Msgf("Failed to build project run order: %s", err.Error()) + } + slices.Reverse(shutdownOrder) + } else { + for _, proc := range p.runningProcesses { + shutdownOrder = append(shutdownOrder, proc) } - wg.Add(1) - go func(pr *Process) { - pr.waitForCompletion() - wg.Done() - }(proc) } - wg.Wait() + + var nameOrder []string + for _, v := range shutdownOrder { + nameOrder = append(nameOrder, v.getName()) + } + log.Debug().Msgf("Shutting down %d processes. Order: %q", len(shutdownOrder), nameOrder) + for _, proc := range shutdownOrder { + proc.prepareForShutDown() + } + + p.shutDownAndWait(shutdownOrder) return nil } @@ -644,10 +730,11 @@ func NewProjectRunner(opts *ProjectOpts) (*ProjectRunner, error) { username = current.Username } runner := &ProjectRunner{ - project: opts.project, - mainProcess: opts.mainProcess, - mainProcessArgs: opts.mainProcessArgs, - isTuiOn: opts.isTuiOn, + project: opts.project, + mainProcess: opts.mainProcess, + mainProcessArgs: opts.mainProcessArgs, + isTuiOn: opts.isTuiOn, + isOrderedShutDown: opts.isOrderedShutDown, projectState: &types.ProjectState{ FileNames: opts.project.FileNames, StartTime: time.Now(), diff --git a/src/app/system_test.go b/src/app/system_test.go index b75e8f3..4ad83ed 100644 --- a/src/app/system_test.go +++ b/src/app/system_test.go @@ -1,10 +1,14 @@ package app import ( + "bufio" "github.com/f1bonacc1/process-compose/src/loader" + "github.com/f1bonacc1/process-compose/src/types" "os" "path/filepath" "reflect" + "slices" + "strings" "testing" "time" ) @@ -406,3 +410,94 @@ func TestSystem_TestProcListToRun(t *testing.T) { } }) } + +func TestSystem_TestProcListShutsDownInOrder(t *testing.T) { + fixture1 := filepath.Join("..", "..", "fixtures-code", "process-compose-shutdown-inorder.yaml") + t.Run("Single Proc with deps", func(t *testing.T) { + + project, err := loader.Load(&loader.LoaderOptions{ + FileNames: []string{fixture1}, + }) + if err != nil { + t.Errorf(err.Error()) + return + } + numProc := len(project.Processes) + runner, err := NewProjectRunner(&ProjectOpts{ + project: project, + processesToRun: []string{}, + mainProcessArgs: []string{}, + isOrderedShutDown: true, + }) + if err != nil { + t.Errorf(err.Error()) + return + } + if len(runner.project.Processes) != numProc { + t.Errorf("should have %d processes", numProc) + } + for name, proc := range runner.project.Processes { + if proc.Disabled { + t.Errorf("process %s is disabled", name) + } + } + file, err := os.CreateTemp("/tmp", "pc_log.*.log") + defer os.Remove(file.Name()) + project.LogLocation = file.Name() + project.LoggerConfig = &types.LoggerConfig{ + FieldsOrder: []string{"message"}, + DisableJSON: true, + TimestampFormat: "", + NoMetadata: true, + FlushEachLine: true, + NoColor: true, + } + go runner.Run() + time.Sleep(10 * time.Millisecond) + states, err := runner.GetProcessesState() + if err != nil { + t.Errorf(err.Error()) + return + } + want := 3 + if len(states.States) != want { + t.Errorf("len(states.States) = %d, want %d", len(states.States), want) + } + + time.Sleep(10 * time.Millisecond) + err = runner.ShutDownProject() + if err != nil { + t.Errorf(err.Error()) + return + } + states, err = runner.GetProcessesState() + if err != nil { + t.Errorf(err.Error()) + return + } + runningProcesses := 0 + for _, processState := range states.States { + if processState.IsRunning { + runningProcesses++ + } + } + want = 0 + if runningProcesses != want { + t.Errorf("runningProcesses = %d, want %d", runningProcesses, want) + } + //read file and validate the shutdown order + scanner := bufio.NewScanner(file) + order := make([]string, 0) + for scanner.Scan() { + line := scanner.Text() + if strings.Contains(line, "exit") { + order = append(order, line) + } + } + wantOrder := []string{"C: exit", "B: exit", "A: exit"} + if !slices.Equal(order, wantOrder) { + t.Errorf("content = %v, want %v", order, wantOrder) + return + } + }) +} diff --git a/src/cmd/project_runner.go b/src/cmd/project_runner.go index 60160c3..bdcf27e 100644 --- a/src/cmd/project_runner.go +++ b/src/cmd/project_runner.go @@ -29,6 +29,7 @@ func getProjectRunner(process []string, noDeps bool, mainProcess string, mainPro WithMainProcessArgs(mainProcessArgs). WithProject(project). WithProcessesToRun(process). + WithOrderedShutDown(*pcFlags.IsOrderedShutDown). WithNoDeps(noDeps), ) if err != nil { diff --git a/src/cmd/root.go b/src/cmd/root.go index c16a220..2a3e62e 100644 --- a/src/cmd/root.go +++ b/src/cmd/root.go @@ -63,6 +63,7 @@ func init() { rootCmd.Flags().BoolVarP(pcFlags.Headless, "tui", "t", *pcFlags.Headless, "enable TUI (-t=false) (env: "+config.EnvVarNameTui+")") rootCmd.PersistentFlags().BoolVar(pcFlags.KeepTuiOn, "keep-tui", *pcFlags.KeepTuiOn, "keep TUI running even after all processes exit") rootCmd.PersistentFlags().BoolVar(pcFlags.NoServer, "no-server", *pcFlags.NoServer, "disable HTTP server (env: "+config.EnvVarNameNoServer+")") + rootCmd.PersistentFlags().BoolVar(pcFlags.IsOrderedShutDown, "ordered-shutdown", *pcFlags.IsOrderedShutDown, "shut down processes in reverse dependency order") rootCmd.Flags().BoolVarP(pcFlags.HideDisabled, "hide-disabled", "d", *pcFlags.HideDisabled, "hide disabled processes") rootCmd.Flags().IntVarP(pcFlags.RefreshRate, "ref-rate", "r", *pcFlags.RefreshRate, "TUI refresh rate in seconds") rootCmd.PersistentFlags().IntVarP(pcFlags.PortNum, "port", "p", *pcFlags.PortNum, "port number (env: "+config.EnvVarNamePort+")") diff --git a/src/config/Flags.go b/src/config/Flags.go index d00fa2a..7c379b7 100644 --- a/src/config/Flags.go +++ b/src/config/Flags.go @@ -33,43 +33,45 @@ const ( // Flags represents PC configuration flags. type Flags struct { - RefreshRate *int - PortNum *int - Address *string - LogLevel *string - LogFile *string - LogLength *int - LogFollow *bool - LogTailLength *int - Headless *bool - Command *string - Write *bool - NoDependencies *bool - HideDisabled *bool - SortColumn *string - IsReverseSort *bool - NoServer *bool - KeepTuiOn *bool + RefreshRate *int + PortNum *int + Address *string + LogLevel *string + LogFile *string + LogLength *int + LogFollow *bool + LogTailLength *int + Headless *bool + Command *string + Write *bool + NoDependencies *bool + HideDisabled *bool + SortColumn *string + IsReverseSort *bool + NoServer *bool + KeepTuiOn *bool + IsOrderedShutDown *bool } // NewFlags returns new configuration flags. func NewFlags() *Flags { return &Flags{ - RefreshRate: toPtr(DefaultRefreshRate), - Headless: toPtr(getTuiDefault()), - PortNum: toPtr(getPortDefault()), - Address: toPtr(DefaultAddress), - LogLength: toPtr(DefaultLogLength), - LogLevel: toPtr(DefaultLogLevel), - LogFile: toPtr(GetLogFilePath()), - LogFollow: toPtr(false), - LogTailLength: toPtr(math.MaxInt), - NoDependencies: toPtr(false), - HideDisabled: toPtr(false), - SortColumn: toPtr(DefaultSortColumn), - IsReverseSort: toPtr(false), - NoServer: toPtr(getNoServerDefault()), - KeepTuiOn: toPtr(false), + RefreshRate: toPtr(DefaultRefreshRate), + Headless: toPtr(getTuiDefault()), + PortNum: toPtr(getPortDefault()), + Address: toPtr(DefaultAddress), + LogLength: toPtr(DefaultLogLength), + LogLevel: toPtr(DefaultLogLevel), + LogFile: toPtr(GetLogFilePath()), + LogFollow: toPtr(false), + LogTailLength: toPtr(math.MaxInt), + NoDependencies: toPtr(false), + HideDisabled: toPtr(false), + SortColumn: toPtr(DefaultSortColumn), + IsReverseSort: toPtr(false), + NoServer: toPtr(getNoServerDefault()), + KeepTuiOn: toPtr(false), + IsOrderedShutDown: toPtr(false), } }