Skip to content

Commit

Permalink
fix: rebuild dependent modules when schema changes
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Mar 11, 2024
1 parent f047efc commit 96b5d81
Showing 1 changed file with 68 additions and 3 deletions.
71 changes: 68 additions & 3 deletions buildengine/engine.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package buildengine

import (
"bytes"
"context"
"crypto/sha256"
"fmt"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -75,10 +77,8 @@ func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context,
if err == nil {
sch, err := schema.FromProto(psch.Msg.Schema)
if err == nil {
modules := map[string]*schema.Module{}
for _, module := range sch.Modules {
e.controllerSchema.Store(module.Name, module)
modules[module.Name] = module
}
} else {
logger.Debugf("Failed to parse schema from controller: %s", err)
Expand All @@ -92,7 +92,7 @@ func (e *Engine) startSchemaSync(ctx context.Context) func(ctx context.Context,
switch msg.ChangeType {
case ftlv1.DeploymentChangeType_DEPLOYMENT_ADDED, ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED:
sch, err := schema.ModuleFromProto(msg.Schema)
if err == nil {
if err != nil {
return err
}
e.controllerSchema.Store(sch.Name, sch)
Expand Down Expand Up @@ -185,6 +185,48 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
defer watch.Close()
watch.Subscribe(events)

moduleHashes := map[string][]byte{}
e.controllerSchema.Range(func(name string, sch *schema.Module) bool {
hash, err := computeModuleHash(sch)
if err != nil {
logger.Errorf(err, "compute hash for %s failed", name)
return false
}
moduleHashes[name] = hash
return true
})

// Watch for schema changes from the FTL controller.
go rpc.RetryStreamingServerStream(ctx, backoff.Backoff{Max: time.Second}, &ftlv1.PullSchemaRequest{}, e.client.PullSchema, func(ctx context.Context, msg *ftlv1.PullSchemaResponse) error {
if msg.ChangeType != ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED {
return nil // Early return if the condition for inversion is met
}

module, err := schema.ModuleFromProto(msg.Schema)
if err != nil {
return err
}
hash, err := computeModuleHash(module)
if err != nil {
logger.Errorf(err, "compute hash for %s failed", module.Name)
return nil
}
if bytes.Equal(hash, moduleHashes[msg.ModuleName]) {
return nil
}

moduleHashes[msg.ModuleName] = hash
modulesToDeploy := e.getDependentModules(msg.ModuleName)
logger.Infof("%s's schema changed; redeploying: %+v", msg.ModuleName, modulesToDeploy)
err = e.buildAndDeploy(ctx, 1, true, modulesToDeploy...)
if err != nil {
logger.Errorf(err, "deploy %s failed", msg.ModuleName)
}

return nil
})

// Watch for file system changes.
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -216,6 +258,29 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration
}
}

func computeModuleHash(module *schema.Module) ([]byte, error) {
hasher := sha256.New()
data := []byte(module.String())
if _, err := hasher.Write(data); err != nil {
return nil, err // Handle errors that might occur during the write
}

return hasher.Sum(nil), nil
}

func (e *Engine) getDependentModules(moduleName string) []string {
dependentModules := map[string]bool{}
for key, module := range e.modules {
for _, dep := range module.Dependencies {
if dep == moduleName {
dependentModules[key] = true
}
}
}

return maps.Keys(dependentModules)
}

func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, modules ...string) error {
if len(modules) == 0 {
modules = maps.Keys(e.modules)
Expand Down

0 comments on commit 96b5d81

Please sign in to comment.