diff --git a/Dockerfile.box b/Dockerfile.box index fca5f065ab..c794549d4f 100644 --- a/Dockerfile.box +++ b/Dockerfile.box @@ -47,4 +47,4 @@ RUN mkdir modules EXPOSE 8891 EXPOSE 8892 -CMD ["/root/ftl", "box", "run"] +CMD ["/root/ftl", "box-run", "/root/deployments"] diff --git a/buildengine/engine.go b/buildengine/engine.go index 93755907d4..33e5ed0845 100644 --- a/buildengine/engine.go +++ b/buildengine/engine.go @@ -115,7 +115,6 @@ func New(ctx context.Context, client ftlv1connect.ControllerServiceClient, modul e.moduleMetas.Store(module.Config.Module, moduleMeta{module: module}) e.modulesToBuild.Store(module.Config.Module, true) } - if client == nil { return e, nil } @@ -233,9 +232,50 @@ func (e *Engine) Each(fn func(Module) error) (err error) { return } -// Deploy attempts to build and deploy all local modules. +// Deploy attempts to deploy all (already compiled) local modules. +// +// If waitForDeployOnline is true, this function will block until all deployments are online. func (e *Engine) Deploy(ctx context.Context, replicas int32, waitForDeployOnline bool) error { - return e.buildAndDeploy(ctx, replicas, waitForDeployOnline) + graph, err := e.Graph(e.Modules()...) + if err != nil { + return err + } + + groups, err := TopologicalSort(graph) + if err != nil { + return fmt.Errorf("topological sort failed: %w", err) + } + + for _, group := range groups { + deployGroup, ctx := errgroup.WithContext(ctx) + for _, moduleName := range group { + if moduleName == "builtin" { + continue + } + deployGroup.Go(func() error { + module, ok := e.moduleMetas.Load(moduleName) + if !ok { + return fmt.Errorf("module %q not found", moduleName) + } + return Deploy(ctx, module.module, replicas, waitForDeployOnline, e.client) + }) + } + if err := deployGroup.Wait(); err != nil { + return fmt.Errorf("deploy failed: %w", err) + } + } + log.FromContext(ctx).Infof("All modules deployed") + return nil +} + +// Modules returns the names of all modules. +func (e *Engine) Modules() []string { + var moduleNames []string + e.moduleMetas.Range(func(name string, meta moduleMeta) bool { + moduleNames = append(moduleNames, name) + return true + }) + return moduleNames } // Dev builds and deploys all local modules and watches for changes, redeploying as necessary. @@ -278,7 +318,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration }() // Build and deploy all modules first. - err = e.buildAndDeploy(ctx, 1, true) + err = e.BuildAndDeploy(ctx, 1, true) if err != nil { logger.Errorf(err, "initial deploy failed") e.reportBuildFailed(err) @@ -326,7 +366,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration if _, exists := e.moduleMetas.Load(config.Module); !exists { e.moduleMetas.Store(config.Module, moduleMeta{module: event.Module}) didError = false - err := e.buildAndDeploy(ctx, 1, true, config.Module) + err := e.BuildAndDeploy(ctx, 1, true, config.Module) if err != nil { didError = true e.reportBuildFailed(err) @@ -362,7 +402,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration continue // Skip this event as it's outdated } didError = false - err := e.buildAndDeploy(ctx, 1, true, config.Module) + err := e.BuildAndDeploy(ctx, 1, true, config.Module) if err != nil { didError = true e.reportBuildFailed(err) @@ -395,7 +435,7 @@ func (e *Engine) watchForModuleChanges(ctx context.Context, period time.Duration if len(dependentModuleNames) > 0 { logger.Infof("%s's schema changed; processing %s", change.Name, strings.Join(dependentModuleNames, ", ")) didError = false - err = e.buildAndDeploy(ctx, 1, true, dependentModuleNames...) + err = e.BuildAndDeploy(ctx, 1, true, dependentModuleNames...) if err != nil { didError = true e.reportBuildFailed(err) @@ -431,13 +471,11 @@ func (e *Engine) getDependentModuleNames(moduleName string) []string { return maps.Keys(dependentModuleNames) } -func (e *Engine) buildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, moduleNames ...string) error { +// BuildAndDeploy attempts to build and deploy all local modules. +func (e *Engine) BuildAndDeploy(ctx context.Context, replicas int32, waitForDeployOnline bool, moduleNames ...string) error { logger := log.FromContext(ctx) if len(moduleNames) == 0 { - e.moduleMetas.Range(func(name string, meta moduleMeta) bool { - moduleNames = append(moduleNames, name) - return true - }) + moduleNames = e.Modules() } buildGroup := errgroup.Group{} diff --git a/cmd/ftl/cmd_box.go b/cmd/ftl/cmd_box.go index 68affdec28..53e55a7bd9 100644 --- a/cmd/ftl/cmd_box.go +++ b/cmd/ftl/cmd_box.go @@ -27,34 +27,22 @@ COPY modules /root EXPOSE 8891 EXPOSE 8892 -CMD ["/root/ftl", "dev"] +ENTRYPOINT ["/root/ftl", "box-run", "/root/modules"] ` type boxCmd struct { - Build boxBuildCmd `cmd:"" help:"Build a self-contained Docker container (FTL-in-a-box) for running a set of modules."` - Run boxRunCmd `cmd:"" help:"Run an FTL-in-a-box container."` -} - -type boxRunCmd struct { -} - -func (b *boxRunCmd) Run() error { - return fmt.Errorf("not implemented") -} - -type boxBuildCmd struct { BaseImage string `help:"Name of the ftl-box Docker image to use as a base." default:"ftl0/ftl-box:${version}"` Parallelism int `short:"j" help:"Number of modules to build in parallel." default:"${numcpu}"` Image string `arg:"" help:"Name of image to build."` Dirs []string `arg:"" help:"Base directories containing modules (defaults to modules in project config)." type:"existingdir" optional:""` } -func (b *boxBuildCmd) Help() string { +func (b *boxCmd) Help() string { return `` } -func (b *boxBuildCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceClient, projConfig projectconfig.Config) error { +func (b *boxCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceClient, projConfig projectconfig.Config) error { if len(b.Dirs) == 0 { b.Dirs = projConfig.AbsModuleDirs() } @@ -121,5 +109,6 @@ func (b *boxBuildCmd) Run(ctx context.Context, client ftlv1connect.ControllerSer if err != nil { return fmt.Errorf("failed to write Dockerfile: %w", err) } - return exec.Command(ctx, log.Info, workDir, "docker", "build", "-t", b.Image, "--progress=plain", "--platform=linux/amd64", ".").Run() + logger.Infof("Building image %s", b.Image) + return exec.Command(ctx, log.Debug, workDir, "docker", "build", "-t", b.Image, "--progress=plain", "--platform=linux/amd64", ".").RunBuffered(ctx) } diff --git a/cmd/ftl/cmd_box_run.go b/cmd/ftl/cmd_box_run.go new file mode 100644 index 0000000000..d825a026ab --- /dev/null +++ b/cmd/ftl/cmd_box_run.go @@ -0,0 +1,84 @@ +package main + +import ( + "context" + "fmt" + "net/url" + "time" + + "github.com/alecthomas/kong" + "github.com/jpillora/backoff" + "golang.org/x/sync/errgroup" + + "github.com/TBD54566975/ftl/backend/controller" + "github.com/TBD54566975/ftl/backend/controller/dal" + "github.com/TBD54566975/ftl/backend/controller/scaling/localscaling" + "github.com/TBD54566975/ftl/backend/controller/sql/databasetesting" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/buildengine" + "github.com/TBD54566975/ftl/internal/bind" + "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/rpc" +) + +type boxRunCmd struct { + Recreate bool `help:"Recreate the database."` + DSN string `help:"DSN for the database." default:"postgres://postgres:secret@localhost:5432/ftl?sslmode=disable" env:"FTL_CONTROLLER_DSN"` + IngressBind *url.URL `help:"Bind address for the ingress server." default:"http://0.0.0.0:8891" env:"FTL_INGRESS_BIND"` + Bind *url.URL `help:"Bind address for the FTL controller." default:"http://0.0.0.0:8892" env:"FTL_BIND"` + RunnerBase *url.URL `help:"Base bind address for FTL runners." default:"http://127.0.0.1:8893" env:"FTL_RUNNER_BIND"` + Dir string `arg:"" help:"Directory to scan for precompiled modules." default:"."` + ControllerTimeout time.Duration `help:"Timeout for Controller start." default:"30s"` +} + +func (b *boxRunCmd) Run(ctx context.Context) error { + conn, err := databasetesting.CreateForDevel(ctx, b.DSN, b.Recreate) + if err != nil { + return fmt.Errorf("failed to create database: %w", err) + } + dal, err := dal.New(ctx, conn) + if err != nil { + return fmt.Errorf("failed to create DAL: %w", err) + } + config := controller.Config{ + Bind: b.Bind, + IngressBind: b.IngressBind, + Key: model.NewLocalControllerKey(0), + DSN: b.DSN, + } + if err := kong.ApplyDefaults(&config); err != nil { + return err + } + + // Start the controller. + runnerPortAllocator, err := bind.NewBindAllocator(b.RunnerBase) + if err != nil { + return fmt.Errorf("failed to create runner port allocator: %w", err) + } + runnerScaling, err := localscaling.NewLocalScaling(runnerPortAllocator, []*url.URL{b.Bind}) + if err != nil { + return fmt.Errorf("failed to create runner autoscaler: %w", err) + } + wg := errgroup.Group{} + wg.Go(func() error { + return controller.Start(ctx, config, runnerScaling, dal) + }) + + // Wait for the controller to come up. + client := ftlv1connect.NewControllerServiceClient(rpc.GetHTTPClient(b.Bind.String()), b.Bind.String()) + waitCtx, cancel := context.WithTimeout(ctx, b.ControllerTimeout) + defer cancel() + if err := rpc.Wait(waitCtx, backoff.Backoff{}, client); err != nil { + return fmt.Errorf("controller failed to start: %w", err) + } + + engine, err := buildengine.New(ctx, client, []string{b.Dir}) + if err != nil { + return fmt.Errorf("failed to create build engine: %w", err) + } + + if err := engine.Deploy(ctx, 1, true); err != nil { + return fmt.Errorf("failed to deploy: %w", err) + } + return wg.Wait() +} diff --git a/cmd/ftl/cmd_deploy.go b/cmd/ftl/cmd_deploy.go index 8ea47241c3..0ecd47a6ed 100644 --- a/cmd/ftl/cmd_deploy.go +++ b/cmd/ftl/cmd_deploy.go @@ -21,5 +21,5 @@ func (d *deployCmd) Run(ctx context.Context) error { if err != nil { return err } - return engine.Deploy(ctx, d.Replicas, !d.NoWait) + return engine.BuildAndDeploy(ctx, d.Replicas, !d.NoWait) } diff --git a/cmd/ftl/main.go b/cmd/ftl/main.go index b114725da7..e5a2d68a41 100644 --- a/cmd/ftl/main.go +++ b/cmd/ftl/main.go @@ -45,6 +45,7 @@ type CLI struct { Schema schemaCmd `cmd:"" help:"FTL schema commands."` Build buildCmd `cmd:"" help:"Build all modules found in the specified directories."` Box boxCmd `cmd:"" help:"Build a self-contained Docker container for running a set of module."` + BoxRun boxRunCmd `cmd:"" hidden:"" help:"Run FTL inside an ftl-in-a-box container"` Deploy deployCmd `cmd:"" help:"Build and deploy all modules found in the specified directories."` Download downloadCmd `cmd:"" help:"Download a deployment."` Secret secretCmd `cmd:"" help:"Manage secrets."` diff --git a/internal/modulecontext/module_context.go b/internal/modulecontext/module_context.go index 275e358389..e3783c9177 100644 --- a/internal/modulecontext/module_context.go +++ b/internal/modulecontext/module_context.go @@ -5,16 +5,18 @@ import ( "database/sql" "encoding/json" "fmt" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" - "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" - "github.com/TBD54566975/ftl/internal/rpc" - "github.com/alecthomas/atomic" - "github.com/jpillora/backoff" - "golang.org/x/sync/errgroup" "strings" "sync" "time" + "github.com/alecthomas/atomic" + "github.com/jpillora/backoff" + "golang.org/x/sync/errgroup" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" + "github.com/TBD54566975/ftl/internal/rpc" + "github.com/alecthomas/types/optional" _ "github.com/jackc/pgx/v5/stdlib" // SQL driver diff --git a/internal/rpc/rpc.go b/internal/rpc/rpc.go index effc8adb17..ac21bdeb98 100644 --- a/internal/rpc/rpc.go +++ b/internal/rpc/rpc.go @@ -162,7 +162,7 @@ func RetryStreamingClientStream[Req, Resp any]( break } if errored { - logger.Debugf("Stream recovered") + logger.Debugf("Client stream recovered") errored = false } select { @@ -218,7 +218,7 @@ func RetryStreamingServerStream[Req, Resp any]( break } if errored { - logger.Debugf("Stream recovered") + logger.Debugf("Server stream recovered") errored = false } select { @@ -238,6 +238,8 @@ func RetryStreamingServerStream[Req, Resp any]( delay := retry.Duration() if err != nil && !errors.Is(err, context.Canceled) { logger.Logf(logLevel, "Stream handler failed, retrying in %s: %s", delay, err) + } else if err == nil { + logger.Debugf("Stream finished, retrying in %s", delay) } select { case <-ctx.Done():