diff --git a/backend/provisioner/deployment.go b/backend/provisioner/deployment.go index 821aa59365..2fd270b0f7 100644 --- a/backend/provisioner/deployment.go +++ b/backend/provisioner/deployment.go @@ -43,7 +43,7 @@ func (t *Task) Start(ctx context.Context) error { resp, err := t.handler.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{ Module: t.module, - // TODO: We need a proper cluster speicific ID here + // TODO: We need a proper cluster specific ID here FtlClusterId: "ftl", ExistingResources: t.existing, DesiredResources: t.constructResourceContext(t.desired), @@ -83,13 +83,13 @@ func (t *Task) Progress(ctx context.Context) error { if err != nil { return fmt.Errorf("error getting state: %w", err) } - if succ, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok { + switch s := resp.Msg.Status.(type) { + case *provisioner.StatusResponse_Success: t.state = TaskStateDone - t.output = succ.Success.UpdatedResources - } - if fail, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok { + t.output = s.Success.UpdatedResources + case *provisioner.StatusResponse_Failed: t.state = TaskStateFailed - return errors.New(fail.Failed.ErrorMessage) + return errors.New(s.Failed.ErrorMessage) } return nil } diff --git a/backend/provisioner/dev/dev_provisioner.go b/backend/provisioner/dev/dev_provisioner.go index ad757f99e4..93a85df9a4 100644 --- a/backend/provisioner/dev/dev_provisioner.go +++ b/backend/provisioner/dev/dev_provisioner.go @@ -34,19 +34,17 @@ type step struct { done atomic.Bool } -// Provisioner is a provisioner for running FTL locally +// Provisioner for running FTL locally type Provisioner struct { running map[string]*task postgresDSN string - postgresImage string - postgresPort int + postgresPort int } -func NewProvisioner(postgresImage string, postgresPort int) *Provisioner { +func NewProvisioner(postgresPort int) *Provisioner { return &Provisioner{ - postgresImage: postgresImage, - postgresPort: postgresPort, + postgresPort: postgresPort, } } @@ -90,7 +88,7 @@ func (d *Provisioner) Provision(ctx context.Context, req *connect.Request[provis }), nil } - token := strconv.Itoa(rand.Int()) + token := strconv.Itoa(rand.Int()) //nolint:gosec d.running[token] = task return connect.NewResponse(&provisioner.ProvisionResponse{ @@ -146,7 +144,8 @@ func (d *Provisioner) provisionPostgres(ctx context.Context, tr *provisioner.Res go func() { defer step.done.Store(true) if d.postgresDSN == "" { - dsn, err := dev.SetupDB(ctx, d.postgresImage, d.postgresPort, false) + // We assume that the DB hsas already been started when running in dev mode + dsn, err := dev.WaitForDBReady(ctx, d.postgresPort) if err != nil { step.err = err return diff --git a/backend/provisioner/provisioner_integration_test.go b/backend/provisioner/provisioner_integration_test.go index 0722accbbf..f20dd577ef 100644 --- a/backend/provisioner/provisioner_integration_test.go +++ b/backend/provisioner/provisioner_integration_test.go @@ -12,7 +12,8 @@ import ( func TestDeploymentThroughNoopProvisioner(t *testing.T) { in.Run(t, - in.WithProvisioner(` + in.WithProvisioner(), + in.WithProvisionerConfig(` default = "noop" plugins = [ { id = "noop", resources = ["postgres"] }, @@ -28,7 +29,7 @@ func TestDeploymentThroughNoopProvisioner(t *testing.T) { func TestDeploymentThrougDevProvisionerCreatePostgresDB(t *testing.T) { in.Run(t, - in.WithProvisioner(`default = "dev"`), + in.WithProvisioner(), in.CopyModule("echo"), in.DropDBAction(t, "echo_echodb"), in.Deploy("echo"), diff --git a/backend/provisioner/registry.go b/backend/provisioner/registry.go index 19b77ed341..cb569818e3 100644 --- a/backend/provisioner/registry.go +++ b/backend/provisioner/registry.go @@ -6,7 +6,6 @@ import ( "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect" - "github.com/TBD54566975/ftl/backend/provisioner/dev" "github.com/TBD54566975/ftl/backend/provisioner/noop" "github.com/TBD54566975/ftl/common/plugin" "github.com/TBD54566975/ftl/internal/log" @@ -22,8 +21,8 @@ const ( ResourceTypeMysql ResourceType = "mysql" ) -// ProvisionerPluginConfig is a map of provisioner name to resources it supports -type ProvisionerPluginConfig struct { +// provisionerPluginConfig is a map of provisioner name to resources it supports +type provisionerPluginConfig struct { // The default provisioner to use for all resources not matched here Default string `toml:"default"` Plugins []struct { @@ -32,7 +31,7 @@ type ProvisionerPluginConfig struct { } `toml:"plugins"` } -func (cfg *ProvisionerPluginConfig) Validate() error { +func (cfg *provisionerPluginConfig) Validate() error { registeredResources := map[ResourceType]bool{} for _, plugin := range cfg.Plugins { for _, r := range plugin.Resources { @@ -45,18 +44,19 @@ func (cfg *ProvisionerPluginConfig) Validate() error { return nil } -type provisionerConfig struct { - provisioner provisionerconnect.ProvisionerPluginServiceClient - types []ResourceType +// ProvisionerBinding is a Provisioner and the types it supports +type ProvisionerBinding struct { + Provisioner provisionerconnect.ProvisionerPluginServiceClient + Types []ResourceType } // ProvisionerRegistry contains all known resource handlers in the order they should be executed type ProvisionerRegistry struct { Default provisionerconnect.ProvisionerPluginServiceClient - Provisioners []*provisionerConfig + Provisioners []*ProvisionerBinding } -func NewProvisionerRegistry(ctx context.Context, cfg *ProvisionerPluginConfig) (*ProvisionerRegistry, error) { +func registryFromConfig(ctx context.Context, cfg *provisionerPluginConfig) (*ProvisionerRegistry, error) { def, err := provisionerIDToProvisioner(ctx, cfg.Default) if err != nil { return nil, err @@ -79,9 +79,6 @@ func provisionerIDToProvisioner(ctx context.Context, id string) (provisionerconn switch id { case "noop": return &noop.Provisioner{}, nil - case "dev": - // TODO: Wire in settings from ftl serve - return dev.NewProvisioner("postgres:15.8", 15432), nil default: plugin, _, err := plugin.Spawn( ctx, @@ -101,9 +98,9 @@ func provisionerIDToProvisioner(ctx context.Context, id string) (provisionerconn // Register to the registry, to be executed after all the previously added handlers func (reg *ProvisionerRegistry) Register(handler provisionerconnect.ProvisionerPluginServiceClient, types ...ResourceType) { - reg.Provisioners = append(reg.Provisioners, &provisionerConfig{ - provisioner: handler, - types: types, + reg.Provisioners = append(reg.Provisioners, &ProvisionerBinding{ + Provisioner: handler, + Types: types, }) } @@ -155,10 +152,10 @@ func (reg *ProvisionerRegistry) groupByProvisioner(resources []*provisioner.Reso for _, r := range resources { found := false for _, cfg := range reg.Provisioners { - for _, t := range cfg.types { + for _, t := range cfg.Types { typed := typeOf(r) if t == typed { - result[cfg.provisioner] = append(result[cfg.provisioner], r) + result[cfg.Provisioner] = append(result[cfg.Provisioner], r) found = true break } diff --git a/backend/provisioner/service.go b/backend/provisioner/service.go index cbf3db0b82..999003c08b 100644 --- a/backend/provisioner/service.go +++ b/backend/provisioner/service.go @@ -48,12 +48,7 @@ type Service struct { var _ provisionerconnect.ProvisionerServiceHandler = (*Service)(nil) -func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, pluginConfig *ProvisionerPluginConfig) (*Service, error) { - registry, err := NewProvisionerRegistry(ctx, pluginConfig) - if err != nil { - return nil, fmt.Errorf("error creating provisioner registry: %w", err) - } - +func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, registry *ProvisionerRegistry) (*Service, error) { return &Service{ controllerClient: controllerClient, currentResources: map[string][]*proto.Resource{}, @@ -129,7 +124,7 @@ func replaceOutputs(to []*proto.Resource, from []*proto.Resource) error { } // Start the Provisioner. Blocks until the context is cancelled. -func Start(ctx context.Context, config Config, devel bool) error { +func Start(ctx context.Context, config Config, registry *ProvisionerRegistry) error { config.SetDefaults() logger := log.FromContext(ctx) @@ -137,19 +132,7 @@ func Start(ctx context.Context, config Config, devel bool) error { controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error) - pluginConfig := &ProvisionerPluginConfig{Default: "noop"} - if devel { - pluginConfig = &ProvisionerPluginConfig{Default: "dev"} - } - if config.PluginConfigFile != nil { - pc, err := readPluginConfig(config.PluginConfigFile) - if err != nil { - return fmt.Errorf("error reading plugin configuration: %w", err) - } - pluginConfig = pc - } - - svc, err := New(ctx, config, controllerClient, pluginConfig) + svc, err := New(ctx, config, controllerClient, registry) if err != nil { return err } @@ -169,16 +152,22 @@ func Start(ctx context.Context, config Config, devel bool) error { return nil } -func readPluginConfig(file *os.File) (*ProvisionerPluginConfig, error) { - result := ProvisionerPluginConfig{} +func RegistryFromConfigFile(ctx context.Context, file *os.File) (*ProvisionerRegistry, error) { + config := provisionerPluginConfig{} bytes, err := io.ReadAll(bufio.NewReader(file)) if err != nil { return nil, fmt.Errorf("error reading plugin configuration: %w", err) } - if err := toml.Unmarshal(bytes, &result); err != nil { + if err := toml.Unmarshal(bytes, &config); err != nil { return nil, fmt.Errorf("error parsing plugin configuration: %w", err) } - return &result, nil + + registry, err := registryFromConfig(ctx, &config) + if err != nil { + return nil, fmt.Errorf("error creating provisioner registry: %w", err) + } + + return registry, nil } // Deployment client calls to ftl-controller diff --git a/cmd/ftl-provisioner/main.go b/cmd/ftl-provisioner/main.go index b51ddf9a85..97b3dac5c4 100644 --- a/cmd/ftl-provisioner/main.go +++ b/cmd/ftl-provisioner/main.go @@ -40,6 +40,9 @@ func main() { err = observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") - err = provisioner.Start(ctx, cli.ProvisionerConfig, false) + registry, err := provisioner.RegistryFromConfigFile(ctx, cli.ProvisionerConfig.PluginConfigFile) + kctx.FatalIfErrorf(err, "failed to create provisioner registry") + + err = provisioner.Start(ctx, cli.ProvisionerConfig, registry) kctx.FatalIfErrorf(err, "failed to start provisioner") } diff --git a/frontend/cli/cmd_serve.go b/frontend/cli/cmd_serve.go index 4079b9e15d..f7f97b4729 100644 --- a/frontend/cli/cmd_serve.go +++ b/frontend/cli/cmd_serve.go @@ -25,6 +25,7 @@ import ( "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect" "github.com/TBD54566975/ftl/backend/provisioner" + devprovisioner "github.com/TBD54566975/ftl/backend/provisioner/dev" "github.com/TBD54566975/ftl/internal/bind" "github.com/TBD54566975/ftl/internal/configuration" "github.com/TBD54566975/ftl/internal/configuration/manager" @@ -198,8 +199,23 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini scope := fmt.Sprintf("provisioner%d", i) provisionerCtx := log.ContextWithLogger(ctx, logger.Scope(scope)) + + // read provisioners from a config file if provided + registry := &provisioner.ProvisionerRegistry{} + if s.PluginConfigFile != nil { + r, err := provisioner.RegistryFromConfigFile(provisionerCtx, s.PluginConfigFile) + if err != nil { + return fmt.Errorf("failed to create provisioner registry: %w", err) + } + registry = r + } + + if registry.Default == nil { + registry.Default = devprovisioner.NewProvisioner(s.DBPort) + } + wg.Go(func() error { - if err := provisioner.Start(provisionerCtx, config, true); err != nil { + if err := provisioner.Start(provisionerCtx, config, registry); err != nil { logger.Errorf(err, "provisioner%d failed: %v", i, err) return fmt.Errorf("provisioner%d failed: %w", i, err) } diff --git a/internal/dev/db.go b/internal/dev/db.go index b263a614c5..2af056fc55 100644 --- a/internal/dev/db.go +++ b/internal/dev/db.go @@ -19,7 +19,7 @@ func SetupDB(ctx context.Context, image string, port int, recreate bool) (string exists, err := container.DoesExist(ctx, ftlContainerName, optional.Some(image)) if err != nil { - return "", err + return "", fmt.Errorf("failed to check if container exists: %w", err) } if !exists { @@ -34,7 +34,7 @@ func SetupDB(ctx context.Context, image string, port int, recreate bool) (string err = container.RunDB(ctx, ftlContainerName, port, image) if err != nil { - return "", err + return "", fmt.Errorf("failed to run db container: %w", err) } recreate = true @@ -42,30 +42,39 @@ func SetupDB(ctx context.Context, image string, port int, recreate bool) (string // Start the existing container err = container.Start(ctx, ftlContainerName) if err != nil { - return "", err + return "", fmt.Errorf("failed to start existing db container: %w", err) } // Grab the port from the existing container port, err = container.GetContainerPort(ctx, ftlContainerName, 5432) if err != nil { - return "", err + return "", fmt.Errorf("failed to get port from existing db container: %w", err) } logger.Debugf("Reusing existing docker container %s on port %d for postgres db", ftlContainerName, port) } - err = container.PollContainerHealth(ctx, ftlContainerName, 10*time.Second) + dsn, err := WaitForDBReady(ctx, port) if err != nil { return "", fmt.Errorf("db container failed to be healthy: %w", err) } - dsn := fmt.Sprintf("postgres://postgres:secret@localhost:%d/ftl?sslmode=disable", port) - logger.Debugf("Postgres DSN: %s", dsn) - _, err = databasetesting.CreateForDevel(ctx, dsn, recreate) if err != nil { - return "", err + return "", fmt.Errorf("failed to create database: %w", err) } return dsn, nil } + +func WaitForDBReady(ctx context.Context, port int) (string, error) { + logger := log.FromContext(ctx) + err := container.PollContainerHealth(ctx, ftlContainerName, 10*time.Second) + if err != nil { + return "", fmt.Errorf("db container failed to be healthy: %w", err) + } + + dsn := fmt.Sprintf("postgres://postgres:secret@localhost:%d/ftl?sslmode=disable", port) + logger.Debugf("Postgres DSN: %s", dsn) + return dsn, nil +} diff --git a/internal/integration/harness.go b/internal/integration/harness.go index 209965afad..e576072d2e 100644 --- a/internal/integration/harness.go +++ b/internal/integration/harness.go @@ -124,15 +124,21 @@ func WithoutController() Option { // WithProvisioner is a Run* option that starts the provisioner service. // if set, all deployments are done through the provisioner -func WithProvisioner(config string) Option { +func WithProvisioner() Option { return func(o *options) { - o.provisionerConfig = config o.startProvisioner = true // provisioner always needs a controller to talk to o.startController = true } } +// WithProvisionerConfig is a Run* option that specifies the provisioner config to use. +func WithProvisionerConfig(config string) Option { + return func(o *options) { + o.provisionerConfig = config + } +} + type options struct { languages []string testDataDir string @@ -247,11 +253,13 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) { Infof("Starting ftl cluster") args := []string{filepath.Join(binDir, "ftl"), "serve", "--recreate"} if opts.startProvisioner { - configFile := filepath.Join(tmpDir, "provisioner-plugin-config.toml") - os.WriteFile(configFile, []byte(opts.provisionerConfig), 0644) - args = append(args, "--provisioners=1") - args = append(args, "--provisioner-plugin-config="+configFile) + + if opts.provisionerConfig != "" { + configFile := filepath.Join(tmpDir, "provisioner-plugin-config.toml") + os.WriteFile(configFile, []byte(opts.provisionerConfig), 0644) + args = append(args, "--provisioner-plugin-config="+configFile) + } } ctx = startProcess(ctx, t, args...) }