Skip to content

Commit

Permalink
feat: shutdown in reverse dependency order
Browse files Browse the repository at this point in the history
  • Loading branch information
anmonteiro committed Mar 2, 2024
1 parent b7989c7 commit 1b97fe4
Showing 1 changed file with 76 additions and 13 deletions.
89 changes: 76 additions & 13 deletions src/app/project_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/user"
"runtime"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -322,25 +323,87 @@ func (p *ProjectRunner) GetProcessPorts(name string) (*types.ProcessPorts, error
return ports, nil
}

func (p *ProjectRunner) runningProcessesReverseDependencies() (map[string]map[string]struct{}, error) {
reverseDependencies := make(map[string]map[string]struct{})

for _, process := range p.runningProcesses {

for k := range process.procConf.DependsOn {
if runningProc, ok := p.runningProcesses[k]; ok {
if _, ok := reverseDependencies[runningProc.getName()]; !ok {
dep := map[string]struct{}{}
dep[process.getName()] = struct{}{}
reverseDependencies[runningProc.getName()] = dep
}
} else {
log.Error().Msgf("Error: process %s depends on %s, but it isn't running", process.getName(), k)
return nil, fmt.Errorf("process %s depends on %s, but it isn't running", process.getName(), k)
}
}
}

return reverseDependencies, nil
}

func (p *ProjectRunner) ShutDownProject() error {
p.runProcMutex.Lock()
defer p.runProcMutex.Unlock()
runProc := p.runningProcesses
for _, proc := range runProc {
shutdownOrder := []*Process{}
err := p.project.WithProcesses([]string{}, func(process types.ProcessConfig) error {
if runningProc, ok := p.runningProcesses[process.ReplicaName]; ok {
shutdownOrder = append(shutdownOrder, runningProc)
}
return nil
})
slices.Reverse(shutdownOrder)
if err != nil {
log.Error().Msgf("Failed to build project run order: %s", err.Error())
}

var nameOrder []string
for _, v := range shutdownOrder {
nameOrder = append(nameOrder, v.getName())
}
log.Debug().Msgf("Shutting down %d processes. Order: %q", len(shutdownOrder), nameOrder)
for _, proc := range shutdownOrder {
proc.prepareForShutDown()
}
wg := sync.WaitGroup{}
for _, proc := range runProc {
err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
continue
}
wg.Add(1)
go func(pr *Process) {
pr.waitForCompletion()
wg.Done()
}(proc)

reverseDependencies, err := p.runningProcessesReverseDependencies()
if err != nil {
log.Error().Msgf("Failed to build running processes reverse dependencies: %s", err.Error())
}

for _, process := range shutdownOrder {
wg.Add(1)
go func(proc *Process) {
log.Debug().Msgf("[%s]: shutdown initiated", proc.getName())
waitForDepsWg := sync.WaitGroup{}
if revDeps, ok := reverseDependencies[proc.getName()]; ok {
for revDep, _ := range revDeps {
if runningProc, ok := p.runningProcesses[revDep]; ok {
waitForDepsWg.Add(1)
go func(pr * Process) {
log.Debug().Msgf("[%s]: waiting for %s to shut down first", proc.getName(), revDep)
pr.waitForCompletion()
waitForDepsWg.Done()
}(runningProc)
}
}
}
waitForDepsWg.Wait()
log.Debug().Msgf("[%s]: waited for all deps", proc.getName())

err := proc.shutDown()
if err != nil {
log.Err(err).Msgf("failed to shutdown %s", proc.getName())
return
}
log.Debug().Msgf("[%s]: shutdown complete", proc.getName())
proc.waitForCompletion()
wg.Done()
}(process)
}
wg.Wait()
return nil
Expand Down

0 comments on commit 1b97fe4

Please sign in to comment.