Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: configurable provisioners in ftl-provisioner #2925

Merged
merged 2 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions backend/provisioner/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,13 @@ func (d *Deployment) Progress(ctx context.Context) (bool, error) {
return true, err
}
}
err := next.Progress(ctx)
return d.next().Ok(), err
if next.state != TaskStateDone {
err := next.Progress(ctx)
if err != nil {
return true, err
}
}
return d.next().Ok(), nil
}

type DeploymentState struct {
Expand Down
19 changes: 19 additions & 0 deletions backend/provisioner/deployment/noop_provisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package deployment

import (
"context"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
)

type NoopProvisioner struct{}

func (n *NoopProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
return "", nil
}

func (n *NoopProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
return TaskStateDone, desired, nil
}

var _ Provisioner = (*NoopProvisioner)(nil)
71 changes: 71 additions & 0 deletions backend/provisioner/deployment/plugin_provisioner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package deployment

import (
"context"
"fmt"

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/log"
)

// PluginProvisioner delegates provisioning to an external plugin
type PluginProvisioner struct {
cmdCtx context.Context
client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient]
}

var _ Provisioner = (*PluginProvisioner)(nil)

func NewPluginProvisioner(ctx context.Context, name string) (*PluginProvisioner, error) {
client, cmdCtx, err := plugin.Spawn(
ctx,
log.Debug,
"ftl-provisioner-"+name,
".",
"ftl-provisioner-"+name,
provisionerconnect.NewProvisionerPluginServiceClient,
)
if err != nil {
return nil, fmt.Errorf("error spawning plugin: %w", err)
}

return &PluginProvisioner{
cmdCtx: cmdCtx,
client: client,
}, nil
}

func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
DesiredResources: desired,
ExistingResources: existing,
FtlClusterId: "ftl",
Module: module,
}))
if err != nil {
return "", fmt.Errorf("error calling plugin: %w", err)
}
if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED {
return resp.Msg.ProvisioningToken, nil
}
return "", nil
}

func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{
ProvisioningToken: token,
}))
if err != nil {
return "", nil, fmt.Errorf("error getting status from plugin: %w", err)
}
if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok {
return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage)
} else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok {
return TaskStateDone, success.Success.UpdatedResources, nil
}
return TaskStateRunning, nil, nil
}
107 changes: 44 additions & 63 deletions backend/provisioner/deployment/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,8 @@ import (
"context"
"fmt"

"connectrpc.com/connect"

"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1beta1/provisioner/provisionerconnect"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/common/plugin"
"github.com/TBD54566975/ftl/internal/log"
)

// ResourceType is a type of resource used to configure provisioners
Expand All @@ -28,16 +23,60 @@ type Provisioner interface {
State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error)
}

// ProvisionerPluginConfig is a map of provisioner name to resources it supports
type ProvisionerPluginConfig struct {
// The default provisioner to use for all resources not matched here
Default string `toml:"default"`
Plugins []struct {
Name string `toml:"name"`
Resources []ResourceType `toml:"resources"`
} `toml:"plugins"`
}

func (cfg *ProvisionerPluginConfig) Validate() error {
registeredResources := map[ResourceType]bool{}
for _, plugin := range cfg.Plugins {
for _, r := range plugin.Resources {
if registeredResources[r] {
return fmt.Errorf("resource type %s is already registered. Trying to re-register for %s", r, plugin.Name)
}
registeredResources[r] = true
}
}
return nil
}

type provisionerConfig struct {
provisioner Provisioner
types []ResourceType
}

// ProvisionerRegistry contains all known resource handlers in the order they should be executed
type ProvisionerRegistry struct {
Default Provisioner
Provisioners []*provisionerConfig
}

func NewProvisionerRegistry(ctx context.Context, cfg *ProvisionerPluginConfig) (*ProvisionerRegistry, error) {
result := &ProvisionerRegistry{}
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("error validating provisioner config: %w", err)
}
for _, plugin := range cfg.Plugins {
switch plugin.Name {
case "noop":
result.Register(&NoopProvisioner{}, plugin.Resources...)
default:
provisioner, err := NewPluginProvisioner(ctx, plugin.Name)
if err != nil {
return nil, fmt.Errorf("error creating provisioner plugin %s: %w", plugin.Name, err)
}
result.Register(provisioner, plugin.Resources...)
}
}
return result, nil
}

// Register to the registry, to be executed after all the previously added handlers
func (reg *ProvisionerRegistry) Register(handler Provisioner, types ...ResourceType) {
reg.Provisioners = append(reg.Provisioners, &provisionerConfig{
Expand Down Expand Up @@ -112,61 +151,3 @@ func typeOf(r *provisioner.Resource) ResourceType {
}
return ResourceTypeUnknown
}

// PluginProvisioner delegates provisioning to an external plugin
type PluginProvisioner struct {
cmdCtx context.Context
client *plugin.Plugin[provisionerconnect.ProvisionerPluginServiceClient]
}

func NewPluginProvisioner(ctx context.Context, name, dir, exe string) (*PluginProvisioner, error) {
client, cmdCtx, err := plugin.Spawn(
ctx,
log.Debug,
name,
dir,
exe,
provisionerconnect.NewProvisionerPluginServiceClient,
)
if err != nil {
return nil, fmt.Errorf("error spawning plugin: %w", err)
}

return &PluginProvisioner{
cmdCtx: cmdCtx,
client: client,
}, nil
}

func (p *PluginProvisioner) Provision(ctx context.Context, module string, desired []*provisioner.ResourceContext, existing []*provisioner.Resource) (string, error) {
resp, err := p.client.Client.Provision(ctx, connect.NewRequest(&provisioner.ProvisionRequest{
DesiredResources: desired,
ExistingResources: existing,
FtlClusterId: "ftl",
Module: module,
}))
if err != nil {
return "", fmt.Errorf("error calling plugin: %w", err)
}
if resp.Msg.Status != provisioner.ProvisionResponse_SUBMITTED {
return resp.Msg.ProvisioningToken, nil
}
return "", nil
}

func (p *PluginProvisioner) State(ctx context.Context, token string, desired []*provisioner.Resource) (TaskState, []*provisioner.Resource, error) {
resp, err := p.client.Client.Status(ctx, connect.NewRequest(&provisioner.StatusRequest{
ProvisioningToken: token,
}))
if err != nil {
return "", nil, fmt.Errorf("error getting status from plugin: %w", err)
}
if failed, ok := resp.Msg.Status.(*provisioner.StatusResponse_Failed); ok {
return TaskStateFailed, nil, fmt.Errorf("provisioning failed: %s", failed.Failed.ErrorMessage)
} else if success, ok := resp.Msg.Status.(*provisioner.StatusResponse_Success); ok {
return TaskStateDone, success.Success.UpdatedResources, nil
}
return TaskStateRunning, nil, nil
}

var _ Provisioner = (*PluginProvisioner)(nil)
9 changes: 7 additions & 2 deletions backend/provisioner/provisioner_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
"github.com/alecthomas/assert/v2"
)

func TestDeploymentThroughProvisioner(t *testing.T) {
func TestDeploymentThroughNoopProvisioner(t *testing.T) {
in.Run(t,
in.WithProvisioner(),
in.WithProvisioner(`
default = "noop"
plugins = [
{ name = "noop", resources = ["postgres"] },
]
`),
in.CopyModule("echo"),
in.Deploy("echo"),
in.Call("echo", "echo", "Bob", func(t testing.TB, response string) {
Expand Down
48 changes: 40 additions & 8 deletions backend/provisioner/service.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package provisioner

import (
"bufio"
"context"
"fmt"
"io"
"net/url"
"os"

"connectrpc.com/connect"
"github.com/BurntSushi/toml"
"github.com/alecthomas/kong"
"golang.org/x/sync/errgroup"

Expand All @@ -19,34 +23,42 @@ import (
"github.com/TBD54566975/ftl/internal/rpc"
)

// CommonProvisionerConfig is shared config between the production controller and development server.
type CommonProvisionerConfig struct {
PluginConfigFile *os.File `name:"provisioner-plugin-config" help:"Path to the plugin configuration file." env:"FTL_PROVISIONER_PLUGIN_CONFIG_FILE"`
}

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8893" env:"FTL_PROVISIONER_BIND"`
Advertise *url.URL `help:"Endpoint the Provisioner should advertise (must be unique across the cluster, defaults to --bind if omitted)." env:"FTL_PROVISIONER_ADVERTISE"`
ControllerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
CommonProvisionerConfig
}

func (c *Config) SetDefaults() {
if err := kong.ApplyDefaults(c); err != nil {
panic(err)
}
if c.Advertise == nil {
c.Advertise = c.Bind
}
}

type Service struct {
controllerClient ftlv1connect.ControllerServiceClient
// TODO: Store in a resource graph
currentResources map[string][]*provisioner.Resource
registry deployment.ProvisionerRegistry
registry *deployment.ProvisionerRegistry
}

var _ provisionerconnect.ProvisionerServiceHandler = (*Service)(nil)

func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, devel bool) (*Service, error) {
func New(ctx context.Context, config Config, controllerClient ftlv1connect.ControllerServiceClient, pluginConfig *deployment.ProvisionerPluginConfig) (*Service, error) {
registry, err := deployment.NewProvisionerRegistry(ctx, pluginConfig)
if err != nil {
return nil, fmt.Errorf("error creating provisioner registry: %w", err)
}

return &Service{
controllerClient: controllerClient,
currentResources: map[string][]*provisioner.Resource{},
registry: registry,
}, nil
}

Expand Down Expand Up @@ -126,12 +138,20 @@ func Start(ctx context.Context, config Config, devel bool) error {

controllerClient := rpc.Dial(ftlv1connect.NewControllerServiceClient, config.ControllerEndpoint.String(), log.Error)

svc, err := New(ctx, config, controllerClient, devel)
pluginConfig := &deployment.ProvisionerPluginConfig{Default: "noop"}
if config.PluginConfigFile != nil {
pc, err := readPluginConfig(config.PluginConfigFile)
if err != nil {
return fmt.Errorf("error reading plugin configuration: %w", err)
}
pluginConfig = pc
}

svc, err := New(ctx, config, controllerClient, pluginConfig)
if err != nil {
return err
}
logger.Debugf("Provisioner available at: %s", config.Bind)
logger.Debugf("Advertising as %s", config.Advertise)
logger.Debugf("Using FTL endpoint: %s", config.ControllerEndpoint)

g, ctx := errgroup.WithContext(ctx)
Expand All @@ -147,6 +167,18 @@ func Start(ctx context.Context, config Config, devel bool) error {
return nil
}

func readPluginConfig(file *os.File) (*deployment.ProvisionerPluginConfig, error) {
result := deployment.ProvisionerPluginConfig{}
bytes, err := io.ReadAll(bufio.NewReader(file))
if err != nil {
return nil, fmt.Errorf("error reading plugin configuration: %w", err)
}
if err := toml.Unmarshal(bytes, &result); err != nil {
return nil, fmt.Errorf("error parsing plugin configuration: %w", err)
}
return &result, nil
}

// Deployment client calls to ftl-controller

func (s *Service) GetArtefactDiffs(ctx context.Context, req *connect.Request[ftlv1.GetArtefactDiffsRequest]) (*connect.Response[ftlv1.GetArtefactDiffsResponse], error) {
Expand Down
4 changes: 4 additions & 0 deletions backend/provisioner/testdata/go/echo/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package echo
import (
"context"
"fmt"

"github.com/TBD54566975/ftl/go-runtime/ftl"
)

var db = ftl.PostgresDatabase("echodb")

// Echo returns a greeting with the current time.
//
//ftl:verb export
Expand Down
Loading
Loading