diff --git a/buildengine/engine.go b/buildengine/engine.go index 050600ae50..969fdd61b1 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -1,7 +1,9 @@ package buildengine import ( + "bytes" "context" + "crypto/sha256" "fmt" "path/filepath" "runtime" @@ -75,10 +77,8 @@ func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context, if err == nil { sch, err := schema.FromProto(psch.Msg.Schema) if err == nil { - modules := map[string]*schema.Module{} for _, module := range sch.Modules { e.controllerSchema.Store(module.Name, module) - modules[module.Name] = module } } else { logger.Debugf("Failed to parse schema from controller: %s", err) @@ -92,7 +92,7 @@ func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context, switch msg.ChangeType { case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED: sch, err := schema.ModuleFromProto(msg.Schema) - if err == nil { + if err != nil { return err } e.controllerSchema.Store(sch.Name, sch) @@ -185,6 +185,48 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration defer watch.Close() watch.Subscribe(events) + moduleHashes := map[string][]byte{} + e.controllerSchema.Range(func(name string, sch *schema.Module) bool { + hash, err := computeModuleHash(sch) + if err != nil { + logger.Errorf(err, "compute hash for %s failed", name) + return false + } + moduleHashes[name] = hash + return true + }) + + // Watch for schema changes from the FTL controller. + go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, e.client.PullSchema, func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error { + if msg.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED { + return nil // Early return if the condition for inversion is met + } + + module, err := schema.ModuleFromProto(msg.Schema) + if err != nil { + return err + } + hash, err := computeModuleHash(module) + if err != nil { + logger.Errorf(err, "compute hash for %s failed", module.Name) + return nil + } + if bytes.Equal(hash, moduleHashes[msg.ModuleName]) { + return nil + } + + moduleHashes[msg.ModuleName] = hash + modulesToDeploy := e.getDependentModules(msg.ModuleName) + logger.Infof("%s's schema changed; redeploying: %+v", msg.ModuleName, modulesToDeploy) + err = e.buildAndDeploy(ctx, 1, true, modulesToDeploy...) + if err != nil { + logger.Errorf(err, "deploy %s failed", msg.ModuleName) + } + + return nil + }) + + // Watch for file system changes. for { select { case <-ctx.Done(): @@ -216,6 +258,29 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration } } +func computeModuleHash(module *schema.Module) ([]byte, error) { + hasher := sha256.New() + data := []byte(module.String()) + if _, err := hasher.Write(data); err != nil { + return nil, err // Handle errors that might occur during the write + } + + return hasher.Sum(nil), nil +} + +func (e *Engine) getDependentModules(moduleName string) []string { + dependentModules := map[string]bool{} + for key, module := range e.modules { + for _, dep := range module.Dependencies { + if dep == moduleName { + dependentModules[key] = true + } + } + } + + return maps.Keys(dependentModules) +} + func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, modules ...string) error { if len(modules) == 0 { modules = maps.Keys(e.modules)