Skip to content

Commit

Permalink
fix: build errors canceling deploy contexts (#1225)
Browse files Browse the repository at this point in the history
Fixes an issue where build errors would cancel the deploys for modules
that build successfully.

For example, if we have `time` and `echo` (echo depends on time).

```sh
build: time
(time builds successfully)
deploy: time
build: echo
(echo fails to build)
>>> time's deploy ctx would get canceled here so it would never deploy
```

Then, we fix the build error in `echo`, but time never deployed and
since it doesn't depend on `echo` it's never triggered to rebuild
(unless a change to time is made).

This change introduces a `buildCtx` and `deployCtx` If there are any
build failures we will report them after the deploys for successful
builds complete.
  • Loading branch information
wesbillman authored Apr 11, 2024
1 parent dfc8380 commit e397c88
Showing 1 changed file with 85 additions and 49 deletions.
134 changes: 85 additions & 49 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -45,12 +46,12 @@ type Engine struct {
projects map[ProjectKey]Project
moduleDirs []string
externalDirs []string
commands map[string]string
controllerSchema *xsync.MapOf[string, *schema.Module]
schemaChanges *pubsub.Topic[schemaChange]
cancel func()
parallelism int
listener Listener
projectsToBuild *xsync.MapOf[ProjectKey, bool]
}

type Option func(o *Engine)
Expand Down Expand Up @@ -85,6 +86,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
controllerSchema: xsync.NewMapOf[string, *schema.Module](),
schemaChanges: pubsub.New[schemaChange](),
parallelism: runtime.NumCPU(),
projectsToBuild: xsync.NewMapOf[ProjectKey, bool](),
}
for _, option := range options {
option(e)
Expand All @@ -103,6 +105,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul
return nil, err
}
e.projects[project.Config().Key] = project
e.projectsToBuild.Store(project.Config().Key, true)
}

if client == nil {
Expand Down Expand Up @@ -240,8 +243,10 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
err := e.buildAndDeploy(ctx, 1, true)
if err != nil {
logger.Errorf(err, "initial deploy failed")
} else {
logger.Infof("All modules deployed, watching for changes...")
}
logger.Infof("All modules deployed, watching for changes...")

moduleHashes := map[string][]byte{}
e.controllerSchema.Range(func(name string, sch *schema.Module) bool {
hash, err := computeModuleHash(sch)
Expand Down Expand Up @@ -319,7 +324,6 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
}
}
}

}
}

Expand All @@ -346,52 +350,46 @@ func (e *Engine) getDependentProjectKeys(name string) []ProjectKey {
}

func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, projects ...ProjectKey) error {
logger := log.FromContext(ctx)
if len(projects) == 0 {
projects = maps.Keys(e.projects)
}

deployQueue := make(chan Project, len(projects))
wg, ctx := errgroup.WithContext(ctx)
buildGroup := errgroup.Group{}
deployGroup := errgroup.Group{}

// Build all projects and enqueue the modules for deployment.
wg.Go(func() error {
defer close(deployQueue)

return e.buildWithCallback(ctx, func(ctx context.Context, project Project) error {
if _, ok := project.(Module); ok {
select {
case deployQueue <- project:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
buildGroup.Go(func() error {
return e.buildWithCallback(ctx, func(buildCtx context.Context, builtProject Project) error {
deployGroup.Go(func() error {
e.projectsToBuild.Store(builtProject.Config().Key, false)
return Deploy(buildCtx, builtProject.(Module), replicas, waitForDeployOnline, e.client) //nolint:forcetypeassert
})
return nil
}, projects...)
})

// Process deployment queue.
for range len(projects) {
wg.Go(func() error {
for {
select {
case project, ok := <-deployQueue:
if !ok {
return nil
}
if module, ok := project.(Module); ok {
err := Deploy(ctx, module, replicas, waitForDeployOnline, e.client)
if err != nil {
return err
}
}
case <-ctx.Done():
return ctx.Err()
}
}
})
// Wait for all build and deploy attempts to complete
buildErr := buildGroup.Wait()
deployErr := deployGroup.Wait()

pendingInitialBuilds := []string{}
e.projectsToBuild.Range(func(key ProjectKey, value bool) bool {
if value {
pendingInitialBuilds = append(pendingInitialBuilds, string(key))
}
return true
})

// Print out all modules that have yet to build if there are any errors
if len(pendingInitialBuilds) > 0 {
logger.Infof("Modules waiting to build: %s", strings.Join(pendingInitialBuilds, ", "))
}

if buildErr != nil {
return buildErr
}
return wg.Wait()

return deployErr
}

type buildCallback func(ctx context.Context, project Project) error
Expand Down Expand Up @@ -424,39 +422,77 @@ func (e *Engine) buildWithCallback(ctx context.Context, callback buildCallback,
}

topology := TopologicalSort(graph)

errCh := make(chan error, 1024)
for _, group := range topology {
// Collect schemas to be inserted into "built" map for subsequent groups.
schemas := make(chan *schema.Module, len(group))
wg, ctx := errgroup.WithContext(ctx)

wg := errgroup.Group{}
wg.SetLimit(e.parallelism)
for _, keyStr := range group {
key := ProjectKey(keyStr)
wg.Go(func() error {
if !mustBuild[key] {
return e.mustSchema(ctx, key, builtModules, schemas)
}

err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projects[key])
wg.Go(func() error {
err := e.tryBuild(ctx, mustBuild, key, builtModules, schemas, callback)
if err != nil {
errCh <- err
}
return err
return nil
})
}
err := wg.Wait()

err = wg.Wait()
if err != nil {
return err
}

// Now this group is built, collect all the schemas.
close(schemas)
for sch := range schemas {
builtModules[sch.Name] = sch
}
}

close(errCh)
allErrors := []error{}
for err := range errCh {
allErrors = append(allErrors, err)
}

if len(allErrors) > 0 {
return errors.Join(allErrors...)
}

return nil
}

func (e *Engine) tryBuild(ctx context.Context, mustBuild map[ProjectKey]bool, key ProjectKey, builtModules map[string]*schema.Module, schemas chan *schema.Module, callback buildCallback) error {
logger := log.FromContext(ctx)
if !mustBuild[key] {
return e.mustSchema(ctx, key, builtModules, schemas)
}

project, ok := e.projects[key]
if !ok {
return fmt.Errorf("project %q not found", key)
}

for _, dep := range project.Config().Dependencies {
if _, ok := builtModules[dep]; !ok {
logger.Warnf("%q build skipped because its dependency %q failed to build", key, dep)
return nil
}
}

err := e.build(ctx, key, builtModules, schemas)
if err == nil && callback != nil {
return callback(ctx, e.projects[key])
}

return err
}

// Publish either the schema from the FTL controller, or from a local build.
func (e *Engine) mustSchema(ctx context.Context, key ProjectKey, builtModules map[string]*schema.Module, schemas chan<- *schema.Module) error {
if sch, ok := e.controllerSchema.Load(string(key)); ok {
Expand Down

0 comments on commit e397c88

Please sign in to comment.