diff --git a/src/app/project_runner.go b/src/app/project_runner.go index b0dc9b5..56eb7c9 100644 --- a/src/app/project_runner.go +++ b/src/app/project_runner.go @@ -8,6 +8,7 @@ import ( "os" "os/user" "runtime" + "slices" "sync" "time" @@ -322,25 +323,87 @@ func (p *ProjectRunner) GetProcessPorts(name string) (*types.ProcessPorts, error return ports, nil } +func (p *ProjectRunner) runningProcessesReverseDependencies() (map[string]map[string]struct{}, error) { + reverseDependencies := make(map[string]map[string]struct{}) + + for _, process := range p.runningProcesses { + + for k := range process.procConf.DependsOn { + if runningProc, ok := p.runningProcesses[k]; ok { + if _, ok := reverseDependencies[runningProc.getName()]; !ok { + dep := map[string]struct{}{} + dep[process.getName()] = struct{}{} + reverseDependencies[runningProc.getName()] = dep + } + } else { + log.Error().Msgf("Error: process %s depends on %s, but it isn't running", process.getName(), k) + return nil, fmt.Errorf("process %s depends on %s, but it isn't running", process.getName(), k) + } + } + } + + return reverseDependencies, nil +} + func (p *ProjectRunner) ShutDownProject() error { p.runProcMutex.Lock() defer p.runProcMutex.Unlock() - runProc := p.runningProcesses - for _, proc := range runProc { + shutdownOrder := []*Process{} + err := p.project.WithProcesses([]string{}, func(process types.ProcessConfig) error { + if runningProc, ok := p.runningProcesses[process.ReplicaName]; ok { + shutdownOrder = append(shutdownOrder, runningProc) + } + return nil + }) + slices.Reverse(shutdownOrder) + if err != nil { + log.Error().Msgf("Failed to build project run order: %s", err.Error()) + } + + var nameOrder []string + for _, v := range shutdownOrder { + nameOrder = append(nameOrder, v.getName()) + } + log.Debug().Msgf("Shutting down %d processes. Order: %q", len(shutdownOrder), nameOrder) + for _, proc := range shutdownOrder { proc.prepareForShutDown() } wg := sync.WaitGroup{} - for _, proc := range runProc { - err := proc.shutDown() - if err != nil { - log.Err(err).Msgf("failed to shutdown %s", proc.getName()) - continue - } - wg.Add(1) - go func(pr *Process) { - pr.waitForCompletion() - wg.Done() - }(proc) + + reverseDependencies, err := p.runningProcessesReverseDependencies() + if err != nil { + log.Error().Msgf("Failed to build running processes reverse dependencies: %s", err.Error()) + } + + for _, process := range shutdownOrder { + wg.Add(1) + go func(proc *Process) { + log.Debug().Msgf("[%s]: shutdown initiated", proc.getName()) + waitForDepsWg := sync.WaitGroup{} + if revDeps, ok := reverseDependencies[proc.getName()]; ok { + for revDep, _ := range revDeps { + if runningProc, ok := p.runningProcesses[revDep]; ok { + waitForDepsWg.Add(1) + go func(pr * Process) { + log.Debug().Msgf("[%s]: waiting for %s to shut down first", proc.getName(), revDep) + pr.waitForCompletion() + waitForDepsWg.Done() + }(runningProc) + } + } + } + waitForDepsWg.Wait() + log.Debug().Msgf("[%s]: waited for all deps", proc.getName()) + + err := proc.shutDown() + if err != nil { + log.Err(err).Msgf("failed to shutdown %s", proc.getName()) + return + } + log.Debug().Msgf("[%s]: shutdown complete", proc.getName()) + proc.waitForCompletion() + wg.Done() + }(process) } wg.Wait() return nil