Skip to content

Commit

Permalink
fix: use callback instead of pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Feb 29, 2024
1 parent 8ed4d22 commit c1ca97a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 49 deletions.
4 changes: 1 addition & 3 deletions buildengine/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,13 @@ func Deploy(ctx context.Context, module Module, replicas int32, waitForDeployOnl
}

if waitForDeployOnline {
logger.Infof("Waiting for deployment %s to become ready", resp.Msg.DeploymentName)
logger.Debugf("Waiting for deployment %s to become ready", resp.Msg.DeploymentName)
err = checkReadiness(ctx, client, resp.Msg.DeploymentName, replicas)
if err != nil {
return err
}
}

logger.Infof("Successfully created deployment %s", resp.Msg.DeploymentName)

return nil
}

Expand Down
58 changes: 14 additions & 44 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/pubsub"
"github.com/jpillora/backoff"
"github.com/puzpuzpuz/xsync"
"golang.org/x/exp/maps"
Expand All @@ -27,7 +26,6 @@ type Engine struct {
modules map[string]Module
controllerSchema *xsync.MapOf[string, *schema.Module]
cancel func()
builds *pubsub.Topic[Module]
}

// New constructs a new [Engine].
Expand All @@ -43,7 +41,6 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs
client: client,
modules: map[string]Module{},
controllerSchema: xsync.NewMapOf[*schema.Module](),
builds: pubsub.New[Module](),
}
e.controllerSchema.Store("builtin", schema.Builtins())
ctx, cancel := context.WithCancel(ctx)
Expand Down Expand Up @@ -152,8 +149,11 @@ func (e *Engine) Import(ctx context.Context, schema *schema.Module) {
e.controllerSchema.Store(schema.Name, schema)
}

type BuildCallback func(ctx context.Context, module Module) error

// Build attempts to build the specified modules, or all local modules if none are provided.
func (e *Engine) Build(ctx context.Context, modules ...string) error {
// The callback is invoked for each module after it is built.
func (e *Engine) Build(ctx context.Context, callback BuildCallback, modules ...string) error {
mustBuild := map[string]bool{}
if len(modules) == 0 {
modules = maps.Keys(e.modules)
Expand All @@ -179,6 +179,7 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error {
built := map[string]*schema.Module{
"builtin": schema.Builtins(),
}

topology := TopologicalSort(graph)
for _, group := range topology {
// Collect schemas to be inserted into "built" map for subsequent groups.
Expand All @@ -188,7 +189,11 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error {
for _, name := range group {
wg.Go(func() error {
if mustBuild[name] {
return e.build(ctx, name, built, schemas)
err := e.build(ctx, name, built, schemas)
if err == nil && callback != nil {
return callback(ctx, e.modules[name])
}
return err
}
return e.mustSchema(ctx, name, built, schemas)
})
Expand All @@ -206,54 +211,20 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error {
return nil
}

// Deploy attempts to build and deploy the specified modules, or all local modules if none are provided.
func (e *Engine) Deploy(ctx context.Context, replicas int32, waitForDeployOnline bool, modules ...string) error {
if len(modules) == 0 {
modules = maps.Keys(e.modules)
}

buildSubscription := e.builds.SubscribeSync(nil)
defer e.builds.UnsubscribeSync(buildSubscription)

// Create a channel to signal when a module is deployed.
deployedModules := make(chan string, len(modules))
expectedBuilds := make(map[string]struct{}, len(modules))
for _, name := range modules {
expectedBuilds[name] = struct{}{}
}

wg, ctx := errgroup.WithContext(ctx)

// Listen for build events and deploy the module after it is built.
wg.Go(func() error {
for build := range buildSubscription {
if _, ok := expectedBuilds[build.Msg.Module]; ok {
err := Deploy(ctx, e.modules[build.Msg.Module], replicas, waitForDeployOnline, e.client)
if err != nil {
return err
}

build.Ack()
deployedModules <- build.Msg.Module
}
}
return nil
})

// Trigger builds for the specified modules.
if err := e.Build(ctx, modules...); err != nil {
return err
}

// Wait for all specified modules to be deployed.
for i := 0; i < len(modules); i++ {
select {
case <-deployedModules:
case <-ctx.Done():
return wg.Wait()
}
}

return nil
return e.Build(ctx, func(ctx context.Context, module Module) error {
return Deploy(ctx, module, replicas, waitForDeployOnline, e.client)
}, modules...)
}

// Publish either the schema from the FTL controller, or from a local build.
Expand Down Expand Up @@ -281,7 +252,6 @@ func (e *Engine) build(ctx context.Context, name string, built map[string]*schem
if err != nil {
return fmt.Errorf("load schema %s: %w", name, err)
}
e.builds.Publish(module)
schemas <- moduleSchema
return nil
}
Expand Down
7 changes: 6 additions & 1 deletion buildengine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ func TestEngine(t *testing.T) {
graph, err := engine.Graph()
assert.NoError(t, err)
assert.Equal(t, expected, graph)
err = engine.Build(ctx, "alpha")
var callbackModule string
err = engine.Build(ctx, func(ctx context.Context, module buildengine.Module) error {
callbackModule = module.Module
return nil
}, "alpha")
assert.NoError(t, err)
assert.Equal(t, "alpha", callbackModule)
}
2 changes: 1 addition & 1 deletion cmd/ftl/cmd_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ func (b *buildCmd) Run(ctx context.Context) error {
if err != nil {
return err
}
return engine.Build(ctx)
return engine.Build(ctx, nil)
}

0 comments on commit c1ca97a

Please sign in to comment.