Skip to content

Commit

Permalink
fix: stop output being swallowed on close (#2738)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Sep 20, 2024
1 parent 246395e commit f1ce9bc
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 18 deletions.
6 changes: 6 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,12 @@ func (s *Service) ReplaceDeploy(ctx context.Context, c *connect.Request[ftlv1.Re
return nil, connect.NewError(connect.CodeNotFound, errors.New("deployment not found"))
} else if errors.Is(err, dal.ErrReplaceDeploymentAlreadyActive) {
logger.Infof("Reusing deployment: %s", newDeploymentKey)
dep, err := s.dal.GetDeployment(ctx, newDeploymentKey)
if err == nil {
status.UpdateModuleState(ctx, dep.Module, status.BuildStateDeployed)
} else {
logger.Errorf(err, "Failed to get deployment from database: %s", newDeploymentKey)
}
} else {
logger.Errorf(err, "Could not replace deployment: %s", newDeploymentKey)
return nil, fmt.Errorf("could not replace deployment: %w", err)
Expand Down
4 changes: 0 additions & 4 deletions frontend/cli/cmd_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ type devCmd struct {
}

func (d *devCmd) Run(ctx context.Context, projConfig projectconfig.Config) error {
if !cli.Plain {
sm := status.NewStatusManager(ctx)
ctx = sm.IntoContext(ctx)
}

if len(d.Build.Dirs) == 0 {
d.Build.Dirs = projConfig.AbsModuleDirs()
Expand Down
6 changes: 6 additions & 0 deletions frontend/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/status"
)

type CLI struct {
Expand Down Expand Up @@ -89,6 +90,11 @@ func main() {
panic(err)
}

if !cli.Plain {
sm := status.NewStatusManager(ctx)
ctx = sm.IntoContext(ctx)
defer sm.Close()
}
rpc.InitialiseClients(cli.Authenticators, cli.Insecure)

// Set some envars for child processes.
Expand Down
3 changes: 3 additions & 0 deletions internal/status/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ var _ StatusLine = &noopStatusLine{}

type noopStatusManager struct{}

func (r *noopStatusManager) Close() {
}

func (r *noopStatusManager) SetModuleState(module string, state BuildState) {

}
Expand Down
33 changes: 19 additions & 14 deletions internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type terminalStatusManager struct {
moduleStates map[string]BuildState
height int
width int
exitWait sync.WaitGroup
}

type statusKey struct{}
Expand All @@ -80,7 +81,8 @@ func NewStatusManager(ctx context.Context) StatusManager {
if err != nil {
return &noopStatusManager{}
}
sm := &terminalStatusManager{statusLock: sync.RWMutex{}, moduleStates: map[string]BuildState{}, height: height, width: width}
sm := &terminalStatusManager{statusLock: sync.RWMutex{}, moduleStates: map[string]BuildState{}, height: height, width: width, exitWait: sync.WaitGroup{}}
sm.exitWait.Add(1)
sm.old = os.Stdout
sm.oldErr = os.Stderr
sm.read, sm.write, err = os.Pipe()
Expand All @@ -107,14 +109,17 @@ func NewStatusManager(ctx context.Context) StatusManager {
}()

go func() {
defer sm.exitWait.Done()
current := ""
for !sm.closed.Load() {
for {

buf := bytes.Buffer{}
rawData := make([]byte, 104)
n, err := sm.read.Read(rawData)
if err != nil {
// Not much we can do here
sm.Close()
if current != "" {
sm.writeLine(current)
}
return
}
buf.Write(rawData[:n])
Expand All @@ -140,7 +145,6 @@ func NewStatusManager(ctx context.Context) StatusManager {
current += string(d)
}
}

}
}()

Expand Down Expand Up @@ -195,8 +199,9 @@ func (r *terminalStatusManager) NewStatus(message string) StatusLine {
}

func (r *terminalStatusManager) newStatusInternal(line *terminalStatusLine) {
r.clearStatusMessages()
r.underlyingWrite("\n")
if r.closed.Load() {
return
}
for i, l := range r.lines {
if l.priority < line.priority {
r.lines = slices.Insert(r.lines, i, line)
Expand Down Expand Up @@ -226,12 +231,17 @@ func (r *terminalStatusManager) SetModuleState(module string, state BuildState)

func (r *terminalStatusManager) Close() {
r.statusLock.Lock()
defer r.statusLock.Unlock()
r.clearStatusMessages()
r.closed.Store(true)
r.totalStatusLines = 0
r.lines = []*terminalStatusLine{}
r.statusLock.Unlock()
os.Stdout = r.old // restoring the real stdout
os.Stderr = r.oldErr
r.closed.Store(true)
_ = r.write.Close() //nolint:errcheck
r.exitWait.Wait()
}

func (r *terminalStatusManager) writeLine(s string) {
if r.height < 7 || r.width < 20 || r.height-r.totalStatusLines < 5 {
// Not enough space to draw anything
Expand Down Expand Up @@ -293,7 +303,6 @@ func (r *terminalStatusManager) redrawStatus() {
func (r *terminalStatusManager) recalculateLines() {

if len(r.moduleStates) > 0 && r.moduleLine != nil {

entryLength := 0
keys := []string{}
for k := range r.moduleStates {
Expand Down Expand Up @@ -366,10 +375,6 @@ func countLinesAtPos(s string, cursorPos int, width int) int {
return lines
}

func (r *noopStatusManager) Close() {

}

type terminalStatusLine struct {
manager *terminalStatusManager
message string
Expand Down

0 comments on commit f1ce9bc

Please sign in to comment.