Skip to content

Commit

Permalink
feat: module context over gRPC (#1311)
Browse files Browse the repository at this point in the history
Doc: https://hackmd.io/@ftl/SyynEK2eC
Other PRs
- [x] Pre-Req: protobuf changes
#1312
- [ ] Possible future change for tests moved here:
#1317

Changes:
- Module server requests module context, instead of calculating them
independently from envars
- controller alculates values for module context (config, secrets,
DSNs), returns it to module via gRPC
- Within a module, `ftl.PostgresDatabase(name string)` now returns a
handler, with `Get()` needing to be called to get `*sql.DB`
- Module context builder is used to compose a module context. It can
take in:
    - a GetModuleContext response, to read in values received over gRPC
    - manual entry (useful for testing) (see other PR above)
- Added in-memory implementations of `configuration.MutableProvider` and
`configuration.Resolver`. This allows us to have a backing store for
configs and secrets for `ModuleContext` which shares the same behaviour
for (un)marshalling json.
- `configuration.Manager` now has raw data versions of `Get()` and
`Set()` called `GetData()` and `SetData()` which expose the features
without needing unnecessary conversion steps. This helps when moving
values between configuration managers directly or over gRPC.
  • Loading branch information
matt2e authored Apr 22, 2024
1 parent 4ded60b commit a916327
Show file tree
Hide file tree
Showing 12 changed files with 496 additions and 42 deletions.
7 changes: 6 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,12 @@ nextModule:

// GetModuleContext retrieves config, secrets and DSNs for a module.
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest]) (*connect.Response[ftlv1.ModuleContextResponse], error) {
return nil, fmt.Errorf("not implemented")
// get module schema
schemas, err := s.dal.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, err
}
return moduleContextToProto(ctx, req.Msg.Module, schemas)
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down
97 changes: 97 additions & 0 deletions backend/controller/module_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package controller

import (
"context"
"fmt"
"os"
"strings"

"connectrpc.com/connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
cf "github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/internal/slices"
)

func moduleContextToProto(ctx context.Context, name string, schemas []*schema.Module) (*connect.Response[ftlv1.ModuleContextResponse], error) {
schemas = slices.Filter(schemas, func(s *schema.Module) bool {
return s.Name == name
})
if len(schemas) == 0 {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no schema found for module %q", name))
} else if len(schemas) > 1 {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("multiple schemas found for module %q", name))
}

// configs
configManager := cf.ConfigFromContext(ctx)
configMap, err := bytesMapFromConfigManager(ctx, configManager, name)
if err != nil {
return nil, err
}

// secrets
secretsManager := cf.SecretsFromContext(ctx)
secretsMap, err := bytesMapFromConfigManager(ctx, secretsManager, name)
if err != nil {
return nil, err
}

// DSNs
dsnProtos := []*ftlv1.ModuleContextResponse_DSN{}
for _, decl := range schemas[0].Decls {
dbDecl, ok := decl.(*schema.Database)
if !ok {
continue
}
key := fmt.Sprintf("FTL_POSTGRES_DSN_%s_%s", strings.ToUpper(name), strings.ToUpper(dbDecl.Name))
dsn, ok := os.LookupEnv(key)
if !ok {
return nil, fmt.Errorf("missing environment variable %q", key)
}
dsnProtos = append(dsnProtos, &ftlv1.ModuleContextResponse_DSN{
Name: dbDecl.Name,
Type: ftlv1.ModuleContextResponse_POSTGRES,
Dsn: dsn,
})
}

return connect.NewResponse(&ftlv1.ModuleContextResponse{
Configs: configMap,
Secrets: secretsMap,
Databases: dsnProtos,
}), nil
}

func bytesMapFromConfigManager[R cf.Role](ctx context.Context, manager *cf.Manager[R], moduleName string) (map[string][]byte, error) {
configList, err := manager.List(ctx)
if err != nil {
return nil, err
}

// module specific values must override global values
// put module specific values into moduleConfigMap, then merge with configMap
configMap := map[string][]byte{}
moduleConfigMap := map[string][]byte{}

for _, entry := range configList {
refModule, isModuleSpecific := entry.Module.Get()
if isModuleSpecific && refModule != moduleName {
continue
}
data, err := manager.GetData(ctx, entry.Ref)
if err != nil {
return nil, err
}
if !isModuleSpecific {
configMap[entry.Ref.Name] = data
} else {
moduleConfigMap[entry.Ref.Name] = data
}
}

for name, data := range moduleConfigMap {
configMap[name] = data
}
return configMap, nil
}
55 changes: 55 additions & 0 deletions backend/controller/module_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package controller

import (
"context"
"fmt"
"testing"

"github.com/TBD54566975/ftl/backend/schema"
cf "github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
)

func TestModuleContextProto(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())

moduleName := "test"

cp := cf.NewInMemoryProvider[cf.Configuration]()
cr := cf.NewInMemoryResolver[cf.Configuration]()
cm, err := cf.New(ctx, cr, []cf.Provider[cf.Configuration]{cp})
assert.NoError(t, err)
ctx = cf.ContextWithConfig(ctx, cm)

sp := cf.NewInMemoryProvider[cf.Secrets]()
sr := cf.NewInMemoryResolver[cf.Secrets]()
sm, err := cf.New(ctx, sr, []cf.Provider[cf.Secrets]{sp})
assert.NoError(t, err)
ctx = cf.ContextWithSecrets(ctx, sm)

// Set 50 configs and 50 global configs
// It's hard to tell if module config beats global configs because we are dealing with unordered maps, or because the logic is correct
// Repeating it 50 times hopefully gives us a good chance of catching inconsistencies
for i := range 50 {
key := fmt.Sprintf("key%d", i)

strValue := "HelloWorld"
globalStrValue := "GlobalHelloWorld"
assert.NoError(t, cm.Set(ctx, cf.Ref{Module: optional.Some(moduleName), Name: key}, strValue))
assert.NoError(t, cm.Set(ctx, cf.Ref{Module: optional.None[string](), Name: key}, globalStrValue))
}

response, err := moduleContextToProto(ctx, moduleName, []*schema.Module{
{
Name: moduleName,
},
})
assert.NoError(t, err)

for i := range 50 {
key := fmt.Sprintf("key%d", i)
assert.Equal(t, "\"HelloWorld\"", string(response.Msg.Configs[key]), "module configs should beat global configs")
}
}
18 changes: 13 additions & 5 deletions cmd/ftl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"

"github.com/alecthomas/kong"
Expand Down Expand Up @@ -92,10 +91,19 @@ func main() {
kctx.BindTo(sr, (*cf.Resolver[cf.Secrets])(nil))
kctx.BindTo(cr, (*cf.Resolver[cf.Configuration])(nil))

// Propagate to runner processes.
// TODO: This is a bit of a hack until we get proper configuration
// management through the Controller.
os.Setenv("FTL_CONFIG", strings.Join(projectconfig.ConfigPaths(cli.ConfigFlag), ","))
// Add config manager to context.
cm, err := cf.NewConfigurationManager(ctx, cr)
if err != nil {
kctx.Fatalf(err.Error())
}
ctx = cf.ContextWithConfig(ctx, cm)

// Add secrets manager to context.
sm, err := cf.NewSecretsManager(ctx, sr)
if err != nil {
kctx.Fatalf(err.Error())
}
ctx = cf.ContextWithSecrets(ctx, sm)

// Handle signals.
sigch := make(chan os.Signal, 1)
Expand Down
44 changes: 44 additions & 0 deletions common/configuration/in_memory_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package configuration

import (
"context"
"fmt"
"net/url"
)

// InMemoryProvider is a configuration provider that keeps values in memory
type InMemoryProvider[R Role] struct {
values map[Ref][]byte
}

var _ MutableProvider[Configuration] = &InMemoryProvider[Configuration]{}

func NewInMemoryProvider[R Role]() *InMemoryProvider[R] {
return &InMemoryProvider[R]{values: map[Ref][]byte{}}
}

func (p *InMemoryProvider[R]) Role() R { var r R; return r }
func (p *InMemoryProvider[R]) Key() string { return "grpc" }

func (p *InMemoryProvider[R]) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
if bytes, found := p.values[ref]; found {
return bytes, nil
}
return nil, fmt.Errorf("key %q not found", ref.Name)
}

func (p *InMemoryProvider[R]) Writer() bool {
return true
}

// Store a configuration value and return its key.
func (p *InMemoryProvider[R]) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) {
p.values[ref] = value
return &url.URL{Scheme: p.Key()}, nil
}

// Delete a configuration value.
func (p *InMemoryProvider[R]) Delete(ctx context.Context, ref Ref) error {
delete(p.values, ref)
return nil
}
45 changes: 45 additions & 0 deletions common/configuration/in_memory_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package configuration

import (
"context"
"fmt"
"net/url"
)

type InMemoryResolver[R Role] struct {
keyMap map[Ref]*url.URL
}

var _ Resolver[Configuration] = InMemoryResolver[Configuration]{}
var _ Resolver[Secrets] = InMemoryResolver[Secrets]{}

func NewInMemoryResolver[R Role]() *InMemoryResolver[R] {
return &InMemoryResolver[R]{keyMap: map[Ref]*url.URL{}}
}

func (k InMemoryResolver[R]) Role() R { var r R; return r }

func (k InMemoryResolver[R]) Get(ctx context.Context, ref Ref) (*url.URL, error) {
if key, found := k.keyMap[ref]; found {
return key, nil
}
return nil, fmt.Errorf("key %q not found", ref.Name)
}

func (k InMemoryResolver[R]) List(ctx context.Context) ([]Entry, error) {
entries := []Entry{}
for ref, url := range k.keyMap {
entries = append(entries, Entry{Ref: ref, Accessor: url})
}
return entries, nil
}

func (k InMemoryResolver[R]) Set(ctx context.Context, ref Ref, key *url.URL) error {
k.keyMap[ref] = key
return nil
}

func (k InMemoryResolver[R]) Unset(ctx context.Context, ref Ref) error {
delete(k.keyMap, ref)
return nil
}
45 changes: 31 additions & 14 deletions common/configuration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,49 +64,66 @@ func (m *Manager[R]) Mutable() error {
return fmt.Errorf("no writeable configuration provider available, specify one of %s", strings.Join(writers, ", "))
}

// Get a configuration value from the active providers.
//
// "value" must be a pointer to a Go type that can be unmarshalled from JSON.
func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error {
// GetData returns a data value for a configuration from the active providers.
// The data can be unmarshalled from JSON.
func (m *Manager[R]) GetData(ctx context.Context, ref Ref) ([]byte, error) {
key, err := m.resolver.Get(ctx, ref)
// Try again at the global scope if the value is not found in module scope.
if ref.Module.Ok() && errors.Is(err, ErrNotFound) {
gref := ref
gref.Module = optional.None[string]()
key, err = m.resolver.Get(ctx, gref)
if err != nil {
return err
return nil, err
}
} else if err != nil {
return err
return nil, err
}
provider, ok := m.providers[key.Scheme]
if !ok {
return fmt.Errorf("no provider for scheme %q", key.Scheme)
return nil, fmt.Errorf("no provider for scheme %q", key.Scheme)
}
data, err := provider.Load(ctx, ref, key)
if err != nil {
return fmt.Errorf("%s: %w", ref, err)
return nil, fmt.Errorf("%s: %w", ref, err)
}
return data, nil
}

// Get a configuration value from the active providers.
//
// "value" must be a pointer to a Go type that can be unmarshalled from JSON.
func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error {
data, err := m.GetData(ctx, ref)
if err != nil {
return err
}
return json.Unmarshal(data, value)
}

// Set a configuration value in the active writing provider.
// SetData updates the configuration value in the active writing provider as raw bytes.
//
// "value" must be a Go type that can be marshalled to JSON.
func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error {
// "data" must be bytes that can be unmarshalled into a Go type.
func (m *Manager[R]) SetData(ctx context.Context, ref Ref, data []byte) error {
if err := m.Mutable(); err != nil {
return err
}
data, err := json.Marshal(value)
key, err := m.writer.Store(ctx, ref, data)
if err != nil {
return err
}
key, err := m.writer.Store(ctx, ref, data)
return m.resolver.Set(ctx, ref, key)
}

// Set a configuration value in the active writing provider.
//
// "value" must be a Go type that can be marshalled to JSON.
func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
return m.resolver.Set(ctx, ref, key)
return m.SetData(ctx, ref, data)
}

// Unset a configuration value in all providers.
Expand Down
Loading

0 comments on commit a916327

Please sign in to comment.