Skip to content

Commit

Permalink
chore: use ftl serve config with dev provisioner
Browse files Browse the repository at this point in the history
  • Loading branch information
jvmakine committed Oct 6, 2024
1 parent f653c76 commit 35a524c
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 84 deletions.
12 changes: 6 additions & 6 deletions backend/provisioner/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *Task) Start(ctx context.Context) error {

resp, err := t.handler.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
Module: t.module,
// TODO: We need a proper cluster speicific ID here
// TODO: We need a proper cluster specific ID here
FtlClusterId: "ftl",
ExistingResources: t.existing,
DesiredResources: t.constructResourceContext(t.desired),
Expand Down Expand Up @@ -83,13 +83,13 @@ func (t *Task) Progress(ctx context.Context) error {
if err != nil {
return fmt.Errorf("error getting state: %w", err)
}
if succ, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok {
switch s := resp.Msg.Status.(type) {
case *provisioner.StatusResponse_Success:
t.state = TaskStateDone
t.output = succ.Success.UpdatedResources
}
if fail, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok {
t.output = s.Success.UpdatedResources
case *provisioner.StatusResponse_Failed:
t.state = TaskStateFailed
return errors.New(fail.Failed.ErrorMessage)
return errors.New(s.Failed.ErrorMessage)
}
return nil
}
Expand Down
37 changes: 19 additions & 18 deletions backend/provisioner/dev/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dev

import (
"context"
"fmt"
"math/rand/v2"
"strconv"
"sync/atomic"
Expand All @@ -12,7 +13,9 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/internal/dev"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/schema/strcase"
"github.com/XSAM/otelsql"
"github.com/puzpuzpuz/xsync/v3"
)

type task struct {
Expand All @@ -34,19 +37,18 @@ type step struct {
done atomic.Bool
}

// Provisioner is a provisioner for running FTL locally
// Provisioner for running FTL locally
type Provisioner struct {
running map[string]*task
running *xsync.MapOf[string, *task]
postgresDSN string

postgresImage string
postgresPort int
postgresPort int
}

func NewProvisioner(postgresImage string, postgresPort int) *Provisioner {
func NewProvisioner(postgresPort int) *Provisioner {
return &Provisioner{
postgresImage: postgresImage,
postgresPort: postgresPort,
postgresPort: postgresPort,
running: xsync.NewMapOf[string, *task](),
}
}

Expand All @@ -61,10 +63,6 @@ func (d *Provisioner) Plan(context.Context, *connect.Request[provisioner.PlanReq
}

func (d *Provisioner) Provision(ctx context.Context, req *connect.Request[provisioner.ProvisionRequest]) (*connect.Response[provisioner.ProvisionResponse], error) {
if d.running == nil {
d.running = map[string]*task{}
}

previous := map[string]*provisioner.Resource{}
for _, r := range req.Msg.ExistingResources {
previous[r.ResourceId] = r
Expand All @@ -80,6 +78,8 @@ func (d *Provisioner) Provision(ctx context.Context, req *connect.Request[provis

d.provisionPostgres(ctx, tr, req.Msg.Module, r.Resource.ResourceId, step)
default:
err := fmt.Errorf("unsupported resource type: %T", r.Resource.Resource)
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
}
}
Expand All @@ -90,8 +90,8 @@ func (d *Provisioner) Provision(ctx context.Context, req *connect.Request[provis
}), nil
}

token := strconv.Itoa(rand.Int())
d.running[token] = task
token := strconv.Itoa(rand.Int()) //nolint:gosec
d.running.Store(token, task)

return connect.NewResponse(&provisioner.ProvisionResponse{
ProvisioningToken: token,
Expand All @@ -101,7 +101,7 @@ func (d *Provisioner) Provision(ctx context.Context, req *connect.Request[provis

func (d *Provisioner) Status(ctx context.Context, req *connect.Request[provisioner.StatusRequest]) (*connect.Response[provisioner.StatusResponse], error) {
token := req.Msg.ProvisioningToken
task, ok := d.running[token]
task, ok := d.running.Load(token)
if !ok {
return statusFailure("unknown token")
}
Expand All @@ -118,7 +118,7 @@ func (d *Provisioner) Status(ctx context.Context, req *connect.Request[provision
}
resources = append(resources, step.resource)
}
delete(d.running, token)
d.running.Delete(token)

return connect.NewResponse(&provisioner.StatusResponse{
Status: &provisioner.StatusResponse_Success{
Expand Down Expand Up @@ -146,14 +146,15 @@ func (d *Provisioner) provisionPostgres(ctx context.Context, tr *provisioner.Res
go func() {
defer step.done.Store(true)
if d.postgresDSN == "" {
dsn, err := dev.SetupDB(ctx, d.postgresImage, d.postgresPort, false)
// We assume that the DB hsas already been started when running in dev mode
dsn, err := dev.WaitForDBReady(ctx, d.postgresPort)
if err != nil {
step.err = err
return
}
d.postgresDSN = dsn
}
dbName := module + "_" + id
dbName := strcase.ToLowerSnake(module) + "_" + strcase.ToLowerSnake(id)
conn, err := otelsql.Open("pgx", d.postgresDSN)
if err != nil {
step.err = err
Expand All @@ -166,14 +167,14 @@ func (d *Provisioner) provisionPostgres(ctx context.Context, tr *provisioner.Res
step.err = err
return
}
defer res.Close()
if !res.Next() {
_, err = conn.ExecContext(ctx, "CREATE DATABASE "+dbName)
if err != nil {
step.err = err
return
}
}
res.Close()

if tr.Postgres == nil {
tr.Postgres = &provisioner.PostgresResource{}
Expand Down
5 changes: 3 additions & 2 deletions backend/provisioner/provisioner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ import (

func TestDeploymentThroughNoopProvisioner(t *testing.T) {
in.Run(t,
in.WithProvisioner(`
in.WithProvisioner(),
in.WithProvisionerConfig(`
default = "noop"
plugins = [
{ id = "noop", resources = ["postgres"] },
Expand All @@ -28,7 +29,7 @@ func TestDeploymentThroughNoopProvisioner(t *testing.T) {

func TestDeploymentThrougDevProvisionerCreatePostgresDB(t *testing.T) {
in.Run(t,
in.WithProvisioner(`default = "dev"`),
in.WithProvisioner(),
in.CopyModule("echo"),
in.DropDBAction(t, "echo_echodb"),
in.Deploy("echo"),
Expand Down
31 changes: 14 additions & 17 deletions backend/provisioner/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/backend/provisioner/dev"
"github.com/TBD54566975/ftl/backend/provisioner/noop"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -22,8 +21,8 @@ const (
ResourceTypeMysql ResourceType = "mysql"
)

// ProvisionerPluginConfig is a map of provisioner name to resources it supports
type ProvisionerPluginConfig struct {
// provisionerPluginConfig is a map of provisioner name to resources it supports
type provisionerPluginConfig struct {
// The default provisioner to use for all resources not matched here
Default string `toml:"default"`
Plugins []struct {
Expand All @@ -32,7 +31,7 @@ type ProvisionerPluginConfig struct {
} `toml:"plugins"`
}

func (cfg *ProvisionerPluginConfig) Validate() error {
func (cfg *provisionerPluginConfig) Validate() error {
registeredResources := map[ResourceType]bool{}
for _, plugin := range cfg.Plugins {
for _, r := range plugin.Resources {
Expand All @@ -45,18 +44,19 @@ func (cfg *ProvisionerPluginConfig) Validate() error {
return nil
}

type provisionerConfig struct {
provisioner provisionerconnect.ProvisionerPluginServiceClient
types []ResourceType
// ProvisionerBinding is a Provisioner and the types it supports
type ProvisionerBinding struct {
Provisioner provisionerconnect.ProvisionerPluginServiceClient
Types []ResourceType
}

// ProvisionerRegistry contains all known resource handlers in the order they should be executed
type ProvisionerRegistry struct {
Default provisionerconnect.ProvisionerPluginServiceClient
Provisioners []*provisionerConfig
Provisioners []*ProvisionerBinding
}

func NewProvisionerRegistry(ctx context.Context, cfg *ProvisionerPluginConfig) (*ProvisionerRegistry, error) {
func registryFromConfig(ctx context.Context, cfg *provisionerPluginConfig) (*ProvisionerRegistry, error) {
def, err := provisionerIDToProvisioner(ctx, cfg.Default)
if err != nil {
return nil, err
Expand All @@ -79,9 +79,6 @@ func provisionerIDToProvisioner(ctx context.Context, id string) (provisionerconn
switch id {
case "noop":
return &noop.Provisioner{}, nil
case "dev":
// TODO: Wire in settings from ftl serve
return dev.NewProvisioner("postgres:15.8", 15432), nil
default:
plugin, _, err := plugin.Spawn(
ctx,
Expand All @@ -101,9 +98,9 @@ func provisionerIDToProvisioner(ctx context.Context, id string) (provisionerconn

// Register to the registry, to be executed after all the previously added handlers
func (reg *ProvisionerRegistry) Register(handler provisionerconnect.ProvisionerPluginServiceClient, types ...ResourceType) {
reg.Provisioners = append(reg.Provisioners, &provisionerConfig{
provisioner: handler,
types: types,
reg.Provisioners = append(reg.Provisioners, &ProvisionerBinding{
Provisioner: handler,
Types: types,
})
}

Expand Down Expand Up @@ -155,10 +152,10 @@ func (reg *ProvisionerRegistry) groupByProvisioner(resources []*provisioner.Reso
for _, r := range resources {
found := false
for _, cfg := range reg.Provisioners {
for _, t := range cfg.types {
for _, t := range cfg.Types {
typed := typeOf(r)
if t == typed {
result[cfg.provisioner] = append(result[cfg.provisioner], r)
result[cfg.Provisioner] = append(result[cfg.Provisioner], r)
found = true
break
}
Expand Down
37 changes: 13 additions & 24 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,7 @@ type Service struct {

var _ provisionerconnect.ProvisionerServiceHandler = (*Service)(nil)

func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, pluginConfig *ProvisionerPluginConfig) (*Service, error) {
registry, err := NewProvisionerRegistry(ctx, pluginConfig)
if err != nil {
return nil, fmt.Errorf("error creating provisioner registry: %w", err)
}

func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, registry *ProvisionerRegistry) (*Service, error) {
return &Service{
controllerClient: controllerClient,
currentResources: map[string][]*proto.Resource{},
Expand Down Expand Up @@ -129,27 +124,15 @@ func replaceOutputs(to []*proto.Resource, from []*proto.Resource) error {
}

// Start the Provisioner. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config, devel bool) error {
func Start(ctx context.Context, config Config, registry *ProvisionerRegistry) error {
config.SetDefaults()

logger := log.FromContext(ctx)
logger.Debugf("Starting FTL provisioner")

controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error)

pluginConfig := &ProvisionerPluginConfig{Default: "noop"}
if devel {
pluginConfig = &ProvisionerPluginConfig{Default: "dev"}
}
if config.PluginConfigFile != nil {
pc, err := readPluginConfig(config.PluginConfigFile)
if err != nil {
return fmt.Errorf("error reading plugin configuration: %w", err)
}
pluginConfig = pc
}

svc, err := New(ctx, config, controllerClient, pluginConfig)
svc, err := New(ctx, config, controllerClient, registry)
if err != nil {
return err
}
Expand All @@ -169,16 +152,22 @@ func Start(ctx context.Context, config Config, devel bool) error {
return nil
}

func readPluginConfig(file *os.File) (*ProvisionerPluginConfig, error) {
result := ProvisionerPluginConfig{}
func RegistryFromConfigFile(ctx context.Context, file *os.File) (*ProvisionerRegistry, error) {
config := provisionerPluginConfig{}
bytes, err := io.ReadAll(bufio.NewReader(file))
if err != nil {
return nil, fmt.Errorf("error reading plugin configuration: %w", err)
}
if err := toml.Unmarshal(bytes, &result); err != nil {
if err := toml.Unmarshal(bytes, &config); err != nil {
return nil, fmt.Errorf("error parsing plugin configuration: %w", err)
}
return &result, nil

registry, err := registryFromConfig(ctx, &config)
if err != nil {
return nil, fmt.Errorf("error creating provisioner registry: %w", err)
}

return registry, nil
}

// Deployment client calls to ftl-controller
Expand Down
5 changes: 4 additions & 1 deletion cmd/ftl-provisioner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func main() {
err = observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")

err = provisioner.Start(ctx, cli.ProvisionerConfig, false)
registry, err := provisioner.RegistryFromConfigFile(ctx, cli.ProvisionerConfig.PluginConfigFile)
kctx.FatalIfErrorf(err, "failed to create provisioner registry")

err = provisioner.Start(ctx, cli.ProvisionerConfig, registry)
kctx.FatalIfErrorf(err, "failed to start provisioner")
}
18 changes: 17 additions & 1 deletion frontend/cli/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/backend/provisioner"
devprovisioner "github.com/TBD54566975/ftl/backend/provisioner/dev"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
Expand Down Expand Up @@ -198,8 +199,23 @@ func (s *serveCmd) run(ctx context.Context, projConfig projectconfig.Config, ini

scope := fmt.Sprintf("provisioner%d", i)
provisionerCtx := log.ContextWithLogger(ctx, logger.Scope(scope))

// read provisioners from a config file if provided
registry := &provisioner.ProvisionerRegistry{}
if s.PluginConfigFile != nil {
r, err := provisioner.RegistryFromConfigFile(provisionerCtx, s.PluginConfigFile)
if err != nil {
return fmt.Errorf("failed to create provisioner registry: %w", err)
}
registry = r
}

if registry.Default == nil {
registry.Default = devprovisioner.NewProvisioner(s.DBPort)
}

wg.Go(func() error {
if err := provisioner.Start(provisionerCtx, config, true); err != nil {
if err := provisioner.Start(provisionerCtx, config, registry); err != nil {
logger.Errorf(err, "provisioner%d failed: %v", i, err)
return fmt.Errorf("provisioner%d failed: %w", i, err)
}
Expand Down
Loading

0 comments on commit 35a524c

Please sign in to comment.