Skip to content

Commit

Permalink
feat: save module schemas to .ftl/schemas (#3182)
Browse files Browse the repository at this point in the history
This removes the need for some cli commands to have a bind allocator, as
they no longer need a language plugin to declare where the deploy dir
is.
  • Loading branch information
matt2e authored Oct 24, 2024
1 parent 09e923f commit 5a99916
Show file tree
Hide file tree
Showing 21 changed files with 75 additions and 107 deletions.
17 changes: 6 additions & 11 deletions backend/controller/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"fmt"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/go-runtime/encoding"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
Expand All @@ -23,25 +21,22 @@ type AdminService struct {
schr SchemaRetriever
cm *manager.Manager[configuration.Configuration]
sm *manager.Manager[configuration.Secrets]
// bindAllocator needs to be set for local client to retrieve schemas from disk using language plugins
bindAllocator optional.Option[*bind.BindAllocator]
}

var _ ftlv1connect.AdminServiceHandler = (*AdminService)(nil)

type SchemaRetriever interface {
// BindAllocator is required if the schema is retrieved from disk using language plugins
GetActiveSchema(ctx context.Context, bindAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error)
GetActiveSchema(ctx context.Context) (*schema.Schema, error)
}

// NewAdminService creates a new AdminService.
// bindAllocator is optional and should be set if a local client is to be used that accesses schema from disk using language plugins.
func NewAdminService(cm *manager.Manager[configuration.Configuration], sm *manager.Manager[configuration.Secrets], schr SchemaRetriever, bindAllocator optional.Option[*bind.BindAllocator]) *AdminService {
func NewAdminService(cm *manager.Manager[configuration.Configuration], sm *manager.Manager[configuration.Secrets], schr SchemaRetriever) *AdminService {
return &AdminService{
schr: schr,
cm: cm,
sm: sm,
bindAllocator: bindAllocator,
schr: schr,
cm: cm,
sm: sm,
}
}

Expand Down Expand Up @@ -254,7 +249,7 @@ func (s *AdminService) validateAgainstSchema(ctx context.Context, isSecret bool,
}

// If we can't retrieve an active schema, skip validation.
sch, err := s.schr.GetActiveSchema(ctx, s.bindAllocator)
sch, err := s.schr.GetActiveSchema(ctx)
if err != nil {
logger.Debugf("skipping validation; could not get the active schema: %v", err)
return nil
Expand Down
7 changes: 3 additions & 4 deletions backend/controller/admin/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/alecthomas/types/optional"

ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
Expand All @@ -36,7 +35,7 @@ func TestAdminService(t *testing.T) {
providers.Inline[configuration.Secrets]{},
})
assert.NoError(t, err)
admin := NewAdminService(cm, sm, &diskSchemaRetriever{}, optional.None[*bind.BindAllocator]())
admin := NewAdminService(cm, sm, &diskSchemaRetriever{})
assert.NotZero(t, admin)

expectedEnvarValue, err := json.MarshalIndent(map[string]string{"bar": "barfoo"}, "", " ")
Expand Down Expand Up @@ -200,7 +199,7 @@ var testSchema = schema.MustValidate(&schema.Schema{
type mockSchemaRetriever struct {
}

func (d *mockSchemaRetriever) GetActiveSchema(ctx context.Context, bindAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error) {
func (d *mockSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
return testSchema, nil
}

Expand All @@ -218,7 +217,7 @@ func TestAdminValidation(t *testing.T) {
providers.Inline[configuration.Secrets]{},
})
assert.NoError(t, err)
admin := NewAdminService(cm, sm, &mockSchemaRetriever{}, optional.None[*bind.BindAllocator]())
admin := NewAdminService(cm, sm, &mockSchemaRetriever{})
assert.NotZero(t, admin)

testSetConfig(t, ctx, admin, "batmobile", "color", "Black", "")
Expand Down
30 changes: 4 additions & 26 deletions backend/controller/admin/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/buildengine/languageplugin"
cf "github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/errors"
Expand All @@ -30,15 +28,11 @@ type diskSchemaRetriever struct {
}

// NewLocalClient creates a admin client that reads and writes from the provided config and secret managers
func NewLocalClient(cm *manager.Manager[cf.Configuration], sm *manager.Manager[cf.Secrets], bindAllocator *bind.BindAllocator) Client {
return &localClient{NewAdminService(cm, sm, &diskSchemaRetriever{}, optional.Some(bindAllocator))}
func NewLocalClient(cm *manager.Manager[cf.Configuration], sm *manager.Manager[cf.Secrets]) Client {
return &localClient{NewAdminService(cm, sm, &diskSchemaRetriever{})}
}

func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context, bAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error) {
bindAllocator, ok := bAllocator.Get()
if !ok {
return nil, fmt.Errorf("no bind allocator available")
}
func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
path, ok := projectconfig.DefaultConfigPath().Get()
if !ok {
return nil, fmt.Errorf("no project config path available")
Expand All @@ -57,23 +51,7 @@ func (s *diskSchemaRetriever) GetActiveSchema(ctx context.Context, bAllocator op

for _, m := range modules {
go func() {
// Loading a plugin can be expensive. Is there a better way?
plugin, err := languageplugin.New(ctx, bindAllocator, m.Language)
if err != nil {
moduleSchemas <- either.RightOf[*schema.Module](fmt.Errorf("could not load plugin for %s: %w", m.Module, err))
}
defer plugin.Kill() // nolint:errcheck

customDefaults, err := plugin.ModuleConfigDefaults(ctx, m.Dir)
if err != nil {
moduleSchemas <- either.RightOf[*schema.Module](fmt.Errorf("could not get module config defaults for %s: %w", m.Module, err))
}

config, err := m.FillDefaultsAndValidate(customDefaults)
if err != nil {
moduleSchemas <- either.RightOf[*schema.Module](fmt.Errorf("could not validate module config for %s: %w", m.Module, err))
}
module, err := schema.ModuleFromProtoFile(config.Abs().Schema())
module, err := schema.ModuleFromProtoFile(projConfig.SchemaPath(m.Module))
if err != nil {
moduleSchemas <- either.RightOf[*schema.Module](fmt.Errorf("could not load module schema: %w", err))
return
Expand Down
13 changes: 3 additions & 10 deletions backend/controller/admin/local_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,11 @@ package admin

import (
"context"
"net/url"
"testing"

"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/internal/bind"
cf "github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
Expand All @@ -22,13 +20,8 @@ import (

func getDiskSchema(t testing.TB, ctx context.Context) (*schema.Schema, error) {
t.Helper()

bindURL, err := url.Parse("http://127.0.0.1:8893")
assert.NoError(t, err)
bindAllocator, err := bind.NewBindAllocator(bindURL)
assert.NoError(t, err)
dsr := &diskSchemaRetriever{}
return dsr.GetActiveSchema(ctx, optional.Some(bindAllocator))
return dsr.GetActiveSchema(ctx)
}

func TestDiskSchemaRetrieverWithBuildArtefact(t *testing.T) {
Expand Down Expand Up @@ -76,10 +69,10 @@ func TestAdminNoValidationWithNoSchema(t *testing.T) {
assert.NoError(t, err)

dsr := &diskSchemaRetriever{deployRoot: optional.Some(string(t.TempDir()))}
_, err = dsr.GetActiveSchema(ctx, optional.None[*bind.BindAllocator]())
_, err = dsr.GetActiveSchema(ctx)
assert.Error(t, err)

admin := NewAdminService(cm, sm, dsr, optional.None[*bind.BindAllocator]())
admin := NewAdminService(cm, sm, dsr)
testSetConfig(t, ctx, admin, "batmobile", "color", "Red", "")
testSetSecret(t, ctx, admin, "batmobile", "owner", 99, "")
}
3 changes: 1 addition & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
frontend "github.com/TBD54566975/ftl/frontend/console"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/providers"
Expand Down Expand Up @@ -164,7 +163,7 @@ func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScali
cm := cf.ConfigFromContext(ctx)
sm := cf.SecretsFromContext(ctx)

admin := admin.NewAdminService(cm, sm, svc.dal, optional.None[*bind.BindAllocator]())
admin := admin.NewAdminService(cm, sm, svc.dal)
console := console.NewService(svc.dal, svc.timeline)

ingressHandler := otelhttp.NewHandler(http.Handler(svc), "ftl.ingress")
Expand Down
3 changes: 1 addition & 2 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltypes"
"github.com/TBD54566975/ftl/backend/libdal"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -584,7 +583,7 @@ func (d *DAL) GetActiveDeployments(ctx context.Context) ([]dalmodel.Deployment,
}

// GetActiveSchema returns the schema for all active deployments.
func (d *DAL) GetActiveSchema(ctx context.Context, bindAllocator optional.Option[*bind.BindAllocator]) (*schema.Schema, error) {
func (d *DAL) GetActiveSchema(ctx context.Context) (*schema.Schema, error) {
deployments, err := d.GetActiveDeployments(ctx)
if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli/cmd_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (b *boxCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceC
}
_ = bindAllocator.Next()

engine, err := buildengine.New(ctx, client, projConfig.Root(), b.Build.Dirs, bindAllocator, buildengine.BuildEnv(b.Build.BuildEnv), buildengine.Parallelism(b.Build.Parallelism))
engine, err := buildengine.New(ctx, client, projConfig, b.Build.Dirs, bindAllocator, buildengine.BuildEnv(b.Build.BuildEnv), buildengine.Parallelism(b.Build.Parallelism))
if err != nil {
return err
}
Expand Down Expand Up @@ -163,7 +163,7 @@ func (b *boxCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceC
return err
}
files = append(files, filepath.Join(config.Dir, "ftl.toml"))
files = append(files, config.Schema())
files = append(files, projConfig.SchemaPath(config.Module))
for _, file := range files {
relFile, err := filepath.Rel(config.Dir, file)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions frontend/cli/cmd_box_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er
return fmt.Errorf("controller failed to start: %w", err)
}

engine, err := buildengine.New(ctx, client, projConfig.Root(), []string{b.Dir}, runnerPortAllocator)
engine, err := buildengine.New(ctx, client, projConfig, []string{b.Dir}, runnerPortAllocator)
if err != nil {
return fmt.Errorf("failed to create build engine: %w", err)
}
Expand All @@ -85,7 +85,7 @@ func (b *boxRunCmd) Run(ctx context.Context, projConfig projectconfig.Config) er
// Manually import the schema for each module to get the dependency graph.
err = engine.Each(func(m buildengine.Module) error {
logger.Debugf("Loading schema for module %q", m.Config.Module)
mod, err := schema.ModuleFromProtoFile(m.Config.Abs().Schema())
mod, err := schema.ModuleFromProtoFile(projConfig.SchemaPath(m.Config.Module))
if err != nil {
return fmt.Errorf("failed to read schema for module %q: %w", m.Config.Module, err)
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_build.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (b *buildCmd) Run(ctx context.Context, client ftlv1connect.ControllerServic
}
_ = bindAllocator.Next()

engine, err := buildengine.New(ctx, client, projConfig.Root(), b.Dirs, bindAllocator, buildengine.BuildEnv(b.BuildEnv), buildengine.Parallelism(b.Parallelism))
engine, err := buildengine.New(ctx, client, projConfig, b.Dirs, bindAllocator, buildengine.BuildEnv(b.BuildEnv), buildengine.Parallelism(b.Parallelism))
if err != nil {
return err
}
Expand Down
11 changes: 1 addition & 10 deletions frontend/cli/cmd_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/admin"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/internal/bind"
"github.com/TBD54566975/ftl/internal/configuration"
"github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/configuration/routers"
Expand Down Expand Up @@ -87,15 +86,7 @@ func setUpAdminClient(ctx context.Context, config projectconfig.Config) (ctxOut
return ctx, client, fmt.Errorf("could not create secrets manager: %w", err)
}
ctx = manager.ContextWithSecrets(ctx, sm)

// use the cli endpoint to create the bind allocator, but leave the first port unused as it is meant to be reserved by a controller
bindAllocator, err := bind.NewBindAllocator(cli.Endpoint)
if err != nil {
return ctx, client, fmt.Errorf("could not create bind allocator: %w", err)
}
_ = bindAllocator.Next()

return ctx, admin.NewLocalClient(cm, sm, bindAllocator), nil
return ctx, admin.NewLocalClient(cm, sm), nil
}
return ctx, adminServiceClient, nil
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (d *deployCmd) Run(ctx context.Context, projConfig projectconfig.Config) er
}
_ = bindAllocator.Next()

engine, err := buildengine.New(ctx, client, projConfig.Root(), d.Build.Dirs, bindAllocator, buildengine.BuildEnv(d.Build.BuildEnv), buildengine.Parallelism(d.Build.Parallelism))
engine, err := buildengine.New(ctx, client, projConfig, d.Build.Dirs, bindAllocator, buildengine.BuildEnv(d.Build.BuildEnv), buildengine.Parallelism(d.Build.Parallelism))
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (d *devCmd) Run(ctx context.Context, k *kong.Kong, projConfig projectconfig
})
}

engine, err := buildengine.New(ctx, client, projConfig.Root(), d.Build.Dirs, bindAllocator, opts...)
engine, err := buildengine.New(ctx, client, projConfig, d.Build.Dirs, bindAllocator, opts...)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion frontend/cli/cmd_schema_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func localSchema(ctx context.Context, projectConfig projectconfig.Config, bindAl
if err != nil {
moduleSchemas <- either.RightOf[*schema.Module](err)
}
module, err := schema.ModuleFromProtoFile(config.Abs().Schema())
module, err := schema.ModuleFromProtoFile(projectConfig.SchemaPath(config.Module))
if err != nil {
moduleSchemas <- either.RightOf[*schema.Module](err)
return
Expand Down
17 changes: 12 additions & 5 deletions internal/buildengine/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"time"

"github.com/alecthomas/types/result"
Expand All @@ -14,6 +15,7 @@ import (
"github.com/TBD54566975/ftl/internal/errors"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/moduleconfig"
"github.com/TBD54566975/ftl/internal/projectconfig"
"github.com/TBD54566975/ftl/internal/schema"
)

Expand All @@ -24,16 +26,16 @@ var errInvalidateDependencies = errors.New("dependencies need to be updated")
// Plugins must use a lock file to ensure that only one build is running at a time.
//
// Returns invalidateDependenciesError if the build failed due to a change in dependencies.
func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectRootDir string, bctx languageplugin.BuildContext, buildEnv []string, devMode bool) (moduleSchema *schema.Module, deploy []string, err error) {
func build(ctx context.Context, plugin languageplugin.LanguagePlugin, projectConfig projectconfig.Config, bctx languageplugin.BuildContext, buildEnv []string, devMode bool) (moduleSchema *schema.Module, deploy []string, err error) {
logger := log.FromContext(ctx).Module(bctx.Config.Module).Scope("build")
ctx = log.ContextWithLogger(ctx, logger)

stubsRoot := stubsLanguageDir(projectRootDir, bctx.Config.Language)
return handleBuildResult(ctx, bctx.Config, result.From(plugin.Build(ctx, projectRootDir, stubsRoot, bctx, buildEnv, devMode)))
stubsRoot := stubsLanguageDir(projectConfig.Root(), bctx.Config.Language)
return handleBuildResult(ctx, projectConfig, bctx.Config, result.From(plugin.Build(ctx, projectConfig.Root(), stubsRoot, bctx, buildEnv, devMode)))
}

// handleBuildResult processes the result of a build
func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherResult result.Result[languageplugin.BuildResult]) (moduleSchema *schema.Module, deploy []string, err error) {
func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config, c moduleconfig.ModuleConfig, eitherResult result.Result[languageplugin.BuildResult]) (moduleSchema *schema.Module, deploy []string, err error) {
logger := log.FromContext(ctx)
config := c.Abs()

Expand Down Expand Up @@ -66,7 +68,12 @@ func handleBuildResult(ctx context.Context, c moduleconfig.ModuleConfig, eitherR
if err != nil {
return nil, nil, fmt.Errorf("failed to marshal schema: %w", err)
}
if err := os.WriteFile(config.Schema(), schemaBytes, 0600); err != nil {
schemaPath := projectConfig.SchemaPath(config.Module)
err = os.MkdirAll(filepath.Dir(schemaPath), 0700)
if err != nil {
return nil, nil, fmt.Errorf("failed to create schema directory: %w", err)
}
if err := os.WriteFile(schemaPath, schemaBytes, 0600); err != nil {
return nil, nil, fmt.Errorf("failed to write schema: %w", err)
}
return result.Schema, result.Deploy, nil
Expand Down
Loading

0 comments on commit 5a99916

Please sign in to comment.