Skip to content

Commit

Permalink
fix: redeploy all modules on schema changes
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Jan 17, 2024
1 parent e47581a commit a541198
Showing 1 changed file with 90 additions and 23 deletions.
113 changes: 90 additions & 23 deletions cmd/ftl/cmd_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,63 @@ import (
"strings"
"time"

"connectrpc.com/connect"
"github.com/bmatcuk/doublestar/v4"

"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/moduleconfig"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/ftlv1connect"
)

type moduleFolderInfo struct {
ModuleName string
NumFiles int
LastModTime time.Time
}

type devCmd struct {
BaseDir string `arg:"" help:"Directory to watch for FTL modules" type:"existingdir" default:"."`
Watch time.Duration `help:"Watch template directory at this frequency and regenerate on change." default:"500ms"`
FailureDelay time.Duration `help:"Delay before retrying a failed deploy." default:"5s"`
modules map[string]moduleFolderInfo
client ftlv1connect.ControllerServiceClient
BaseDir string `arg:"" help:"Directory to watch for FTL modules" type:"existingdir" default:"."`
Watch time.Duration `help:"Watch template directory at this frequency and regenerate on change." default:"500ms"`
FailureDelay time.Duration `help:"Delay before retrying a failed deploy." default:"5s"`
ReconnectDelay time.Duration `help:"Delay before attempting to reconnect to FTL." default:"5s"`
}

type moduleMap map[string]*moduleFolderInfo

func (m *moduleMap) ForceRebuild(dir string) {
(*m)[dir].NumFiles = 0
(*m)[dir].LastModTime = time.Now()
}

func (m *moduleMap) AddModule(dir string, module string) {
(*m)[dir] = &moduleFolderInfo{
ModuleName: module,
LastModTime: time.Now(),
}
}

func (m *moduleMap) RemoveModule(dir string) {
delete(*m, dir)
}

func (m *moduleMap) ForceRebuildFromDependent(module string) {
for dir, moduleInfo := range *m {
if moduleInfo.ModuleName != module {
(*m).ForceRebuild(dir)
}
}
}

func (d *devCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceClient) error {
logger := log.FromContext(ctx)
logger.Infof("Watching %s for FTL modules", d.BaseDir)

d.modules = make(map[string]moduleFolderInfo)
d.client = client
schemaChanges := make(chan string)
modules := make(moduleMap)

// Start a goroutine to watch for schema changes
go d.watchForSchemaChanges(ctx, client, schemaChanges)

lastScanTime := time.Now()
for {
Expand All @@ -48,24 +79,27 @@ func (d *devCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceC
return err
}

d.addOrRemoveModules(tomls)
err = d.addOrRemoveModules(tomls, modules)
if err != nil {
return err
}

for dir := range d.modules {
currentModule := d.modules[dir]
err := d.updateFileInfo(ctx, dir)
for dir := range modules {
currentModule := modules[dir]
err := d.updateFileInfo(ctx, dir, modules)
if err != nil {
return err
}

if currentModule.NumFiles != d.modules[dir].NumFiles || d.modules[dir].LastModTime.After(lastScanTime) {
if currentModule.NumFiles != modules[dir].NumFiles || modules[dir].LastModTime.After(lastScanTime) {
deploy := deployCmd{
Replicas: 1,
ModuleDir: dir,
}
err = deploy.Run(ctx, client)
if err != nil {
logger.Errorf(err, "Error deploying module %s. Will retry", dir)
delete(d.modules, dir)
modules.RemoveModule(dir)
// Increase delay when there's a compile failure.
delay = d.FailureDelay
}
Expand All @@ -74,13 +108,44 @@ func (d *devCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceC

lastScanTime = iterationStartTime
select {
case moduleName := <-schemaChanges:
logger.Infof("Schema change detected for module %s, rebuilding other modules.", moduleName)
modules.ForceRebuildFromDependent(moduleName)
case <-time.After(delay):
case <-ctx.Done():
return nil
}
}
}

func (d *devCmd) watchForSchemaChanges(ctx context.Context, client ftlv1connect.ControllerServiceClient, schemaChanges chan string) {
logger := log.FromContext(ctx)
for {
stream, err := client.PullSchema(ctx, connect.NewRequest(&ftlv1.PullSchemaRequest{}))
if err != nil {
logger.Errorf(err, "Error connecting to FTL. Will retry...")
time.Sleep(d.ReconnectDelay)
continue
}

for {
ok := stream.Receive()
if !ok {
stream.Close()
logger.Infof("Stream disconnected, attempting to reconnect...")
time.Sleep(d.ReconnectDelay)
break
}

msg := stream.Msg()

if msg.ChangeType == ftlv1.DeploymentChangeType_DEPLOYMENT_CHANGED {
schemaChanges <- msg.ModuleName
}
}
}
}

func (d *devCmd) getTomls(ctx context.Context) ([]string, error) {
baseDir := d.BaseDir
ignores := initGitIgnore(ctx, baseDir)
Expand All @@ -100,17 +165,19 @@ func (d *devCmd) getTomls(ctx context.Context) ([]string, error) {
return tomls, nil
}

func (d *devCmd) addOrRemoveModules(tomls []string) {
func (d *devCmd) addOrRemoveModules(tomls []string, modules moduleMap) error {
for _, toml := range tomls {
dir := filepath.Dir(toml)
if _, ok := d.modules[dir]; !ok {
d.modules[dir] = moduleFolderInfo{
LastModTime: time.Now(),
if _, ok := modules[dir]; !ok {
config, err := moduleconfig.LoadConfig(dir)
if err != nil {
return err
}
modules.AddModule(dir, config.Module)
}
}

for dir := range d.modules {
for dir := range modules {
found := false
for _, toml := range tomls {
if filepath.Dir(toml) == dir {
Expand All @@ -119,19 +186,19 @@ func (d *devCmd) addOrRemoveModules(tomls []string) {
}
}
if !found {
delete(d.modules, dir) // Remove deleted module from d.modules
modules.RemoveModule(dir)
}
}
return nil
}

func (d *devCmd) updateFileInfo(ctx context.Context, dir string) error {
func (d *devCmd) updateFileInfo(ctx context.Context, dir string, modules moduleMap) error {
config, err := moduleconfig.LoadConfig(dir)
if err != nil {
return err
}

ignores := initGitIgnore(ctx, dir)
d.modules[dir] = moduleFolderInfo{}

var changed string
err = walkDir(dir, ignores, func(srcPath string, entry fs.DirEntry) error {
Expand All @@ -152,13 +219,13 @@ func (d *devCmd) updateFileInfo(ctx context.Context, dir string) error {
return err
}

module := d.modules[dir]
module := modules[dir]
module.NumFiles++
if fileInfo.ModTime().After(module.LastModTime) {
changed = srcPath
module.LastModTime = fileInfo.ModTime()
}
d.modules[dir] = module
modules[dir] = module
}
}

Expand Down

0 comments on commit a541198

Please sign in to comment.