Skip to content

Commit

Permalink
chore: use ftl serve config with dev provisioner
Browse files Browse the repository at this point in the history
  • Loading branch information
jvmakine committed Oct 6, 2024
1 parent f653c76 commit 0002ebe
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 63 deletions.
2 changes: 1 addition & 1 deletion backend/provisioner/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *Task) Start(ctx context.Context) error {

resp, err := t.handler.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
Module: t.module,
// TODO: We need a proper cluster speicific ID here
// TODO: We need a proper cluster specific ID here
FtlClusterId: "ftl",
ExistingResources: t.existing,
DesiredResources: t.constructResourceContext(t.desired),
Expand Down
5 changes: 3 additions & 2 deletions backend/provisioner/dev/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (d *Provisioner) Provision(ctx context.Context, req *connect.Request[provis
}), nil
}

token := strconv.Itoa(rand.Int())
token := strconv.Itoa(rand.Int()) //nolint:gosec
d.running[token] = task

return connect.NewResponse(&provisioner.ProvisionResponse{
Expand Down Expand Up @@ -146,7 +146,8 @@ func (d *Provisioner) provisionPostgres(ctx context.Context, tr *provisioner.Res
go func() {
defer step.done.Store(true)
if d.postgresDSN == "" {
dsn, err := dev.SetupDB(ctx, d.postgresImage, d.postgresPort, false)
// We assume that the DB hsas already been started when running in dev mode
dsn, err := dev.WaitForDBReady(ctx, d.postgresPort)
if err != nil {
step.err = err
return
Expand Down
5 changes: 3 additions & 2 deletions backend/provisioner/provisioner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (

func TestDeploymentThroughNoopProvisioner(t *testing.T) {
in.Run(t,
in.WithProvisioner(`
in.WithProvisioner(),
in.WithProvisionerConfig(`
default = "noop"
plugins = [
{ id = "noop", resources = ["postgres"] },
Expand All @@ -28,7 +29,7 @@ func TestDeploymentThroughNoopProvisioner(t *testing.T) {

func TestDeploymentThrougDevProvisionerCreatePostgresDB(t *testing.T) {
in.Run(t,
in.WithProvisioner(`default = "dev"`),
in.WithProvisioner(),
in.CopyModule("echo"),
in.DropDBAction(t, "echo_echodb"),
in.Deploy("echo"),
Expand Down
31 changes: 14 additions & 17 deletions backend/provisioner/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/backend/provisioner/dev"
"github.com/TBD54566975/ftl/backend/provisioner/noop"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -22,8 +21,8 @@ const (
ResourceTypeMysql ResourceType = "mysql"
)

// ProvisionerPluginConfig is a map of provisioner name to resources it supports
type ProvisionerPluginConfig struct {
// provisionerPluginConfig is a map of provisioner name to resources it supports
type provisionerPluginConfig struct {
// The default provisioner to use for all resources not matched here
Default string `toml:"default"`
Plugins []struct {
Expand All @@ -32,7 +31,7 @@ type ProvisionerPluginConfig struct {
} `toml:"plugins"`
}

func (cfg *ProvisionerPluginConfig) Validate() error {
func (cfg *provisionerPluginConfig) Validate() error {
registeredResources := map[ResourceType]bool{}
for _, plugin := range cfg.Plugins {
for _, r := range plugin.Resources {
Expand All @@ -45,18 +44,19 @@ func (cfg *ProvisionerPluginConfig) Validate() error {
return nil
}

type provisionerConfig struct {
provisioner provisionerconnect.ProvisionerPluginServiceClient
types []ResourceType
// ProvisionerBinding is a Provisioner and the types it supports
type ProvisionerBinding struct {
Provisioner provisionerconnect.ProvisionerPluginServiceClient
Types []ResourceType
}

// ProvisionerRegistry contains all known resource handlers in the order they should be executed
type ProvisionerRegistry struct {
Default provisionerconnect.ProvisionerPluginServiceClient
Provisioners []*provisionerConfig
Provisioners []*ProvisionerBinding
}

func NewProvisionerRegistry(ctx context.Context, cfg *ProvisionerPluginConfig) (*ProvisionerRegistry, error) {
func registryFromConfig(ctx context.Context, cfg *provisionerPluginConfig) (*ProvisionerRegistry, error) {
def, err := provisionerIDToProvisioner(ctx, cfg.Default)
if err != nil {
return nil, err
Expand All @@ -79,9 +79,6 @@ func provisionerIDToProvisioner(ctx context.Context, id string) (provisionerconn
switch id {
case "noop":
return &noop.Provisioner{}, nil
case "dev":
// TODO: Wire in settings from ftl serve
return dev.NewProvisioner("postgres:15.8", 15432), nil
default:
plugin, _, err := plugin.Spawn(
ctx,
Expand All @@ -101,9 +98,9 @@ func provisionerIDToProvisioner(ctx context.Context, id string) (provisionerconn

// Register to the registry, to be executed after all the previously added handlers
func (reg *ProvisionerRegistry) Register(handler provisionerconnect.ProvisionerPluginServiceClient, types ...ResourceType) {
reg.Provisioners = append(reg.Provisioners, &provisionerConfig{
provisioner: handler,
types: types,
reg.Provisioners = append(reg.Provisioners, &ProvisionerBinding{
Provisioner: handler,
Types: types,
})
}

Expand Down Expand Up @@ -155,10 +152,10 @@ func (reg *ProvisionerRegistry) groupByProvisioner(resources []*provisioner.Reso
for _, r := range resources {
found := false
for _, cfg := range reg.Provisioners {
for _, t := range cfg.types {
for _, t := range cfg.Types {
typed := typeOf(r)
if t == typed {
result[cfg.provisioner] = append(result[cfg.provisioner], r)
result[cfg.Provisioner] = append(result[cfg.Provisioner], r)
found = true
break
}
Expand Down
37 changes: 13 additions & 24 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ type Service struct {

var _ provisionerconnect.ProvisionerServiceHandler = (*Service)(nil)

func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, pluginConfig *ProvisionerPluginConfig) (*Service, error) {
registry, err := NewProvisionerRegistry(ctx, pluginConfig)
if err != nil {
return nil, fmt.Errorf("error creating provisioner registry: %w", err)
}

func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, registry *ProvisionerRegistry) (*Service, error) {
return &Service{
controllerClient: controllerClient,
currentResources: map[string][]*proto.Resource{},
Expand Down Expand Up @@ -129,27 +124,15 @@ func replaceOutputs(to []*proto.Resource, from []*proto.Resource) error {
}

// Start the Provisioner. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, devel bool) error {
func Start(ctx context.Context, config Config, registry *ProvisionerRegistry) error {
config.SetDefaults()

logger := log.FromContext(ctx)
logger.Debugf("Starting FTL provisioner")

controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error)

pluginConfig := &ProvisionerPluginConfig{Default: "noop"}
if devel {
pluginConfig = &ProvisionerPluginConfig{Default: "dev"}
}
if config.PluginConfigFile != nil {
pc, err := readPluginConfig(config.PluginConfigFile)
if err != nil {
return fmt.Errorf("error reading plugin configuration: %w", err)
}
pluginConfig = pc
}

svc, err := New(ctx, config, controllerClient, pluginConfig)
svc, err := New(ctx, config, controllerClient, registry)
if err != nil {
return err
}
Expand All @@ -169,16 +152,22 @@ func Start(ctx context.Context, config Config, devel bool) error {
return nil
}

func readPluginConfig(file *os.File) (*ProvisionerPluginConfig, error) {
result := ProvisionerPluginConfig{}
func RegistryFromConfigFile(ctx context.Context, file *os.File) (*ProvisionerRegistry, error) {
config := provisionerPluginConfig{}
bytes, err := io.ReadAll(bufio.NewReader(file))
if err != nil {
return nil, fmt.Errorf("error reading plugin configuration: %w", err)
}
if err := toml.Unmarshal(bytes, &result); err != nil {
if err := toml.Unmarshal(bytes, &config); err != nil {
return nil, fmt.Errorf("error parsing plugin configuration: %w", err)
}
return &result, nil

registry, err := registryFromConfig(ctx, &config)
if err != nil {
return nil, fmt.Errorf("error creating provisioner registry: %w", err)
}

return registry, nil
}

// Deployment client calls to ftl-controller
Expand Down
5 changes: 4 additions & 1 deletion cmd/ftl-provisioner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func main() {
err = observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")

err = provisioner.Start(ctx, cli.ProvisionerConfig, false)
registry, err := provisioner.RegistryFromConfigFile(ctx, cli.ProvisionerConfig.PluginConfigFile)
kctx.FatalIfErrorf(err, "failed to create provisioner registry")

err = provisioner.Start(ctx, cli.ProvisionerConfig, registry)
kctx.FatalIfErrorf(err, "failed to start provisioner")
}
18 changes: 17 additions & 1 deletion frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/backend/provisioner"
devprovisioner "github.com/TBD54566975/ftl/backend/provisioner/dev"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
Expand Down Expand Up @@ -198,8 +199,23 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini

scope := fmt.Sprintf("provisioner%d", i)
provisionerCtx := log.ContextWithLogger(ctx, logger.Scope(scope))

// read provisioners from a config file if provided
registry := &provisioner.ProvisionerRegistry{}
if s.PluginConfigFile != nil {
r, err := provisioner.RegistryFromConfigFile(provisionerCtx, s.PluginConfigFile)
if err != nil {
return fmt.Errorf("failed to create provisioner registry: %w", err)
}
registry = r
}

if registry.Default == nil {
registry.Default = devprovisioner.NewProvisioner(s.DatabaseImage, s.DBPort)
}

wg.Go(func() error {
if err := provisioner.Start(provisionerCtx, config, true); err != nil {
if err := provisioner.Start(provisionerCtx, config, registry); err != nil {
logger.Errorf(err, "provisioner%d failed: %v", i, err)
return fmt.Errorf("provisioner%d failed: %w", i, err)
}
Expand Down
27 changes: 18 additions & 9 deletions internal/dev/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func SetupDB(ctx context.Context, image string, port int, recreate bool) (string

exists, err := container.DoesExist(ctx, ftlContainerName, optional.Some(image))
if err != nil {
return "", err
return "", fmt.Errorf("failed to check if container exists: %w", err)
}

if !exists {
Expand All @@ -34,38 +34,47 @@ func SetupDB(ctx context.Context, image string, port int, recreate bool) (string

err = container.RunDB(ctx, ftlContainerName, port, image)
if err != nil {
return "", err
return "", fmt.Errorf("failed to run db container: %w", err)
}

recreate = true
} else {
// Start the existing container
err = container.Start(ctx, ftlContainerName)
if err != nil {
return "", err
return "", fmt.Errorf("failed to start existing db container: %w", err)
}

// Grab the port from the existing container
port, err = container.GetContainerPort(ctx, ftlContainerName, 5432)
if err != nil {
return "", err
return "", fmt.Errorf("failed to get port from existing db container: %w", err)
}

logger.Debugf("Reusing existing docker container %s on port %d for postgres db", ftlContainerName, port)
}

err = container.PollContainerHealth(ctx, ftlContainerName, 10*time.Second)
dsn, err := WaitForDBReady(ctx, port)
if err != nil {
return "", fmt.Errorf("db container failed to be healthy: %w", err)
}

dsn := fmt.Sprintf("postgres://postgres:secret@localhost:%d/ftl?sslmode=disable", port)
logger.Debugf("Postgres DSN: %s", dsn)

_, err = databasetesting.CreateForDevel(ctx, dsn, recreate)
if err != nil {
return "", err
return "", fmt.Errorf("failed to create database: %w", err)
}

return dsn, nil
}

func WaitForDBReady(ctx context.Context, port int) (string, error) {
logger := log.FromContext(ctx)
err := container.PollContainerHealth(ctx, ftlContainerName, 10*time.Second)
if err != nil {
return "", fmt.Errorf("db container failed to be healthy: %w", err)
}

dsn := fmt.Sprintf("postgres://postgres:secret@localhost:%d/ftl?sslmode=disable", port)
logger.Debugf("Postgres DSN: %s", dsn)
return dsn, nil
}
20 changes: 14 additions & 6 deletions internal/integration/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,21 @@ func WithoutController() Option {

// WithProvisioner is a Run* option that starts the provisioner service.
// if set, all deployments are done through the provisioner
func WithProvisioner(config string) Option {
func WithProvisioner() Option {
return func(o *options) {
o.provisionerConfig = config
o.startProvisioner = true
// provisioner always needs a controller to talk to
o.startController = true
}
}

// WithProvisionerConfig is a Run* option that specifies the provisioner config to use.
func WithProvisionerConfig(config string) Option {
return func(o *options) {
o.provisionerConfig = config
}
}

type options struct {
languages []string
testDataDir string
Expand Down Expand Up @@ -247,11 +253,13 @@ func run(t *testing.T, actionsOrOptions ...ActionOrOption) {
Infof("Starting ftl cluster")
args := []string{filepath.Join(binDir, "ftl"), "serve", "--recreate"}
if opts.startProvisioner {
configFile := filepath.Join(tmpDir, "provisioner-plugin-config.toml")
os.WriteFile(configFile, []byte(opts.provisionerConfig), 0644)

args = append(args, "--provisioners=1")
args = append(args, "--provisioner-plugin-config="+configFile)

if opts.provisionerConfig != "" {
configFile := filepath.Join(tmpDir, "provisioner-plugin-config.toml")
os.WriteFile(configFile, []byte(opts.provisionerConfig), 0644)
args = append(args, "--provisioner-plugin-config="+configFile)
}
}
ctx = startProcess(ctx, t, args...)
}
Expand Down

0 comments on commit 0002ebe

Please sign in to comment.