diff --git a/buildengine/engine.go b/buildengine/engine.go index 8cf430f750..157e9e2bf6 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -10,6 +10,7 @@ import ( "time" "connectrpc.com/connect" + "github.com/alecthomas/types/pubsub" "github.com/jpillora/backoff" "github.com/puzpuzpuz/xsync/v3" "golang.org/x/exp/maps" @@ -22,12 +23,18 @@ import ( "github.com/TBD54566975/ftl/internal/rpc" ) +type schemaChange struct { + ChangeType ftlv1.DeploymentChangeType + *schema.Module +} + // Engine for building a set of modules. type Engine struct { client ftlv1connect.ControllerServiceClient modules map[string]Module dirs []string controllerSchema *xsync.MapOf[string, *schema.Module] + schemaChanges *pubsub.Topic[schemaChange] cancel func() } @@ -45,6 +52,7 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, dirs dirs: dirs, modules: map[string]Module{}, controllerSchema: xsync.NewMapOf[string, *schema.Module](), + schemaChanges: pubsub.New[schemaChange](), } e.controllerSchema.Store("builtin", schema.Builtins()) ctx, cancel := context.WithCancel(ctx) @@ -96,9 +104,11 @@ func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context, return err } e.controllerSchema.Store(sch.Name, sch) + e.schemaChanges.Publish(schemaChange{ChangeType: msg.ChangeType, Module: sch}) case ftlv1.DeploymentChangeType_DEPLOYMENT_REMOVED: e.controllerSchema.Delete(msg.ModuleName) + e.schemaChanges.Publish(schemaChange{ChangeType: msg.ChangeType, Module: nil}) } return nil } @@ -191,47 +201,22 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration return true }) - // Watch for schema changes from the FTL controller. - go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, e.client.PullSchema, func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error { - if msg.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED { - return nil // Early return if the condition for inversion is met - } - - module, err := schema.ModuleFromProto(msg.Schema) - if err != nil { - return err - } - hash, err := computeModuleHash(module) - if err != nil { - logger.Errorf(err, "compute hash for %s failed", module.Name) - return nil - } - if bytes.Equal(hash, moduleHashes[msg.ModuleName]) { - return nil - } - - moduleHashes[msg.ModuleName] = hash - modulesToDeploy := e.getDependentModules(msg.ModuleName) - logger.Infof("%s's schema changed; redeploying: %+v", msg.ModuleName, modulesToDeploy) - err = e.buildAndDeploy(ctx, 1, true, modulesToDeploy...) - if err != nil { - logger.Errorf(err, "deploy %s failed", msg.ModuleName) - } - - return nil - }) + schemaChanges := make(chan schemaChange, 128) + e.schemaChanges.Subscribe(schemaChanges) + defer e.schemaChanges.Unsubscribe(schemaChanges) - events := make(chan WatchEvent, 128) + watchEvents := make(chan WatchEvent, 128) watch := Watch(ctx, period, e.dirs...) + watch.Subscribe(watchEvents) + defer watch.Unsubscribe(watchEvents) defer watch.Close() - watch.Subscribe(events) - // Watch for file system changes. + // Watch for file and schema changes for { select { case <-ctx.Done(): return ctx.Err() - case event := <-events: + case event := <-watchEvents: switch event := event.(type) { case WatchEventModuleAdded: if _, exists := e.modules[event.Module.Module]; !exists { @@ -254,7 +239,31 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration logger.Errorf(err, "deploy %s failed", event.Module.Module) } } + case change := <-schemaChanges: + if change.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED { + continue + } + + hash, err := computeModuleHash(change.Module) + if err != nil { + logger.Errorf(err, "compute hash for %s failed", change.Name) + continue + } + + if bytes.Equal(hash, moduleHashes[change.Name]) { + logger.Tracef("schema for %s has not changed", change.Name) + continue + } + + moduleHashes[change.Name] = hash + modulesToDeploy := e.getDependentModules(change.Name) + logger.Infof("%s's schema changed; redeploying: %+v", change.Name, modulesToDeploy) + err = e.buildAndDeploy(ctx, 1, true, modulesToDeploy...) + if err != nil { + logger.Errorf(err, "deploy %s failed", change.Name) + } } + } }