From c1ca97a935598047f3064d8eb2d08d8b7cc1b7ca Mon Sep 17 00:00:00 2001 From: Wes Date: Thu, 29 Feb 2024 09:19:57 -0800 Subject: [PATCH] fix: use callback instead of pubsub --- buildengine/deploy.go | 4 +-- buildengine/engine.go | 58 +++++++++----------------------------- buildengine/engine_test.go | 7 ++++- cmd/ftl/cmd_build.go | 2 +- 4 files changed, 22 insertions(+), 49 deletions(-) diff --git a/buildengine/deploy.go b/buildengine/deploy.go index be01b0a58a..e3989c4dc9 100644 --- a/buildengine/deploy.go +++ b/buildengine/deploy.go @@ -85,15 +85,13 @@ func Deploy(ctx context.Context, module Module, replicas int32, waitForDeployOnl } if waitForDeployOnline { - logger.Infof("Waiting for deployment %s to become ready", resp.Msg.DeploymentName) + logger.Debugf("Waiting for deployment %s to become ready", resp.Msg.DeploymentName) err = checkReadiness(ctx, client, resp.Msg.DeploymentName, replicas) if err != nil { return err } } - logger.Infof("Successfully created deployment %s", resp.Msg.DeploymentName) - return nil } diff --git a/buildengine/engine.go b/buildengine/engine.go index b55ca3fc18..95e924b72f 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -8,7 +8,6 @@ import ( "time" "connectrpc.com/connect" - "github.com/alecthomas/types/pubsub" "github.com/jpillora/backoff" "github.com/puzpuzpuz/xsync" "golang.org/x/exp/maps" @@ -27,7 +26,6 @@ type Engine struct { modules map[string]Module controllerSchema *xsync.MapOf[string, *schema.Module] cancel func() - builds *pubsub.Topic[Module] } // New constructs a new [Engine]. @@ -43,7 +41,6 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs client: client, modules: map[string]Module{}, controllerSchema: xsync.NewMapOf[*schema.Module](), - builds: pubsub.New[Module](), } e.controllerSchema.Store("builtin", schema.Builtins()) ctx, cancel := context.WithCancel(ctx) @@ -152,8 +149,11 @@ func (e *Engine) Import(ctx context.Context, schema *schema.Module) { e.controllerSchema.Store(schema.Name, schema) } +type BuildCallback func(ctx context.Context, module Module) error + // Build attempts to build the specified modules, or all local modules if none are provided. -func (e *Engine) Build(ctx context.Context, modules ...string) error { +// The callback is invoked for each module after it is built. +func (e *Engine) Build(ctx context.Context, callback BuildCallback, modules ...string) error { mustBuild := map[string]bool{} if len(modules) == 0 { modules = maps.Keys(e.modules) @@ -179,6 +179,7 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error { built := map[string]*schema.Module{ "builtin": schema.Builtins(), } + topology := TopologicalSort(graph) for _, group := range topology { // Collect schemas to be inserted into "built" map for subsequent groups. @@ -188,7 +189,11 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error { for _, name := range group { wg.Go(func() error { if mustBuild[name] { - return e.build(ctx, name, built, schemas) + err := e.build(ctx, name, built, schemas) + if err == nil && callback != nil { + return callback(ctx, e.modules[name]) + } + return err } return e.mustSchema(ctx, name, built, schemas) }) @@ -206,54 +211,20 @@ func (e *Engine) Build(ctx context.Context, modules ...string) error { return nil } +// Deploy attempts to build and deploy the specified modules, or all local modules if none are provided. func (e *Engine) Deploy(ctx context.Context, replicas int32, waitForDeployOnline bool, modules ...string) error { if len(modules) == 0 { modules = maps.Keys(e.modules) } - buildSubscription := e.builds.SubscribeSync(nil) - defer e.builds.UnsubscribeSync(buildSubscription) - - // Create a channel to signal when a module is deployed. - deployedModules := make(chan string, len(modules)) expectedBuilds := make(map[string]struct{}, len(modules)) for _, name := range modules { expectedBuilds[name] = struct{}{} } - wg, ctx := errgroup.WithContext(ctx) - - // Listen for build events and deploy the module after it is built. - wg.Go(func() error { - for build := range buildSubscription { - if _, ok := expectedBuilds[build.Msg.Module]; ok { - err := Deploy(ctx, e.modules[build.Msg.Module], replicas, waitForDeployOnline, e.client) - if err != nil { - return err - } - - build.Ack() - deployedModules <- build.Msg.Module - } - } - return nil - }) - - // Trigger builds for the specified modules. - if err := e.Build(ctx, modules...); err != nil { - return err - } - - // Wait for all specified modules to be deployed. - for i := 0; i < len(modules); i++ { - select { - case <-deployedModules: - case <-ctx.Done(): - return wg.Wait() - } - } - - return nil + return e.Build(ctx, func(ctx context.Context, module Module) error { + return Deploy(ctx, module, replicas, waitForDeployOnline, e.client) + }, modules...) } // Publish either the schema from the FTL controller, or from a local build. @@ -281,7 +252,6 @@ func (e *Engine) build(ctx context.Context, name string, built map[string]*schem if err != nil { return fmt.Errorf("load schema %s: %w", name, err) } - e.builds.Publish(module) schemas <- moduleSchema return nil } diff --git a/buildengine/engine_test.go b/buildengine/engine_test.go index 17a7251b07..8af61bd0f1 100644 --- a/buildengine/engine_test.go +++ b/buildengine/engine_test.go @@ -52,6 +52,11 @@ func TestEngine(t *testing.T) { graph, err := engine.Graph() assert.NoError(t, err) assert.Equal(t, expected, graph) - err = engine.Build(ctx, "alpha") + var callbackModule string + err = engine.Build(ctx, func(ctx context.Context, module buildengine.Module) error { + callbackModule = module.Module + return nil + }, "alpha") assert.NoError(t, err) + assert.Equal(t, "alpha", callbackModule) } diff --git a/cmd/ftl/cmd_build.go b/cmd/ftl/cmd_build.go index 4b10d5737a..06c7abca66 100644 --- a/cmd/ftl/cmd_build.go +++ b/cmd/ftl/cmd_build.go @@ -18,5 +18,5 @@ func (b *buildCmd) Run(ctx context.Context) error { if err != nil { return err } - return engine.Build(ctx) + return engine.Build(ctx, nil) }