Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: module context over gRPC #1311

Merged
merged 10 commits into from
Apr 22, 2024
7 changes: 6 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,12 @@ nextModule:

// GetModuleContext retrieves config, secrets and DSNs for a module.
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest]) (*connect.Response[ftlv1.ModuleContextResponse], error) {
return nil, fmt.Errorf("not implemented")
// get module schema
schemas, err := s.dal.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, err
}
return moduleContextToProto(ctx, req.Msg.Module, schemas)
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down
97 changes: 97 additions & 0 deletions backend/controller/module_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package controller

import (
"context"
"fmt"
"os"
"strings"

"connectrpc.com/connect"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
cf "github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/internal/slices"
)

func moduleContextToProto(ctx context.Context, name string, schemas []*schema.Module) (*connect.Response[ftlv1.ModuleContextResponse], error) {
schemas = slices.Filter(schemas, func(s *schema.Module) bool {
return s.Name == name
})
if len(schemas) == 0 {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no schema found for module %q", name))
} else if len(schemas) > 1 {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("multiple schemas found for module %q", name))
}

// configs
configManager := cf.ConfigFromContext(ctx)
configMap, err := bytesMapFromConfigManager(ctx, configManager, name)
if err != nil {
return nil, err
}

// secrets
secretsManager := cf.SecretsFromContext(ctx)
secretsMap, err := bytesMapFromConfigManager(ctx, secretsManager, name)
if err != nil {
return nil, err
}

// DSNs
dsnProtos := []*ftlv1.ModuleContextResponse_DSN{}
for _, decl := range schemas[0].Decls {
dbDecl, ok := decl.(*schema.Database)
if !ok {
continue
}
key := fmt.Sprintf("FTL_POSTGRES_DSN_%s_%s", strings.ToUpper(name), strings.ToUpper(dbDecl.Name))
dsn, ok := os.LookupEnv(key)
if !ok {
return nil, fmt.Errorf("missing environment variable %q", key)
}
dsnProtos = append(dsnProtos, &ftlv1.ModuleContextResponse_DSN{
Name: dbDecl.Name,
Type: ftlv1.ModuleContextResponse_POSTGRES,
Dsn: dsn,
})
}

return connect.NewResponse(&ftlv1.ModuleContextResponse{
Configs: configMap,
Secrets: secretsMap,
Databases: dsnProtos,
}), nil
}

func bytesMapFromConfigManager[R cf.Role](ctx context.Context, manager *cf.Manager[R], moduleName string) (map[string][]byte, error) {
configList, err := manager.List(ctx)
if err != nil {
return nil, err
}

// module specific values must override global values
// put module specific values into moduleConfigMap, then merge with configMap
configMap := map[string][]byte{}
moduleConfigMap := map[string][]byte{}

for _, entry := range configList {
refModule, isModuleSpecific := entry.Module.Get()
if isModuleSpecific && refModule != moduleName {
continue
}
data, err := manager.GetData(ctx, entry.Ref)
if err != nil {
return nil, err
}
if !isModuleSpecific {
configMap[entry.Ref.Name] = data
} else {
moduleConfigMap[entry.Ref.Name] = data
}
}

for name, data := range moduleConfigMap {
configMap[name] = data
}
return configMap, nil
}
55 changes: 55 additions & 0 deletions backend/controller/module_context_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package controller

import (
"context"
"fmt"
"testing"

"github.com/TBD54566975/ftl/backend/schema"
cf "github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/internal/log"
"github.com/alecthomas/assert/v2"
"github.com/alecthomas/types/optional"
)

func TestModuleContextProto(t *testing.T) {
ctx := log.ContextWithNewDefaultLogger(context.Background())

moduleName := "test"

cp := cf.NewInMemoryProvider[cf.Configuration]()
cr := cf.NewInMemoryResolver[cf.Configuration]()
cm, err := cf.New(ctx, cr, []cf.Provider[cf.Configuration]{cp})
assert.NoError(t, err)
ctx = cf.ContextWithConfig(ctx, cm)

sp := cf.NewInMemoryProvider[cf.Secrets]()
sr := cf.NewInMemoryResolver[cf.Secrets]()
sm, err := cf.New(ctx, sr, []cf.Provider[cf.Secrets]{sp})
assert.NoError(t, err)
ctx = cf.ContextWithSecrets(ctx, sm)

// Set 50 configs and 50 global configs
// It's hard to tell if module config beats global configs because we are dealing with unordered maps, or because the logic is correct
// Repeating it 50 times hopefully gives us a good chance of catching inconsistencies
for i := range 50 {
key := fmt.Sprintf("key%d", i)

strValue := "HelloWorld"
globalStrValue := "GlobalHelloWorld"
assert.NoError(t, cm.Set(ctx, cf.Ref{Module: optional.Some(moduleName), Name: key}, strValue))
assert.NoError(t, cm.Set(ctx, cf.Ref{Module: optional.None[string](), Name: key}, globalStrValue))
}

response, err := moduleContextToProto(ctx, moduleName, []*schema.Module{
{
Name: moduleName,
},
})
assert.NoError(t, err)

for i := range 50 {
key := fmt.Sprintf("key%d", i)
assert.Equal(t, "\"HelloWorld\"", string(response.Msg.Configs[key]), "module configs should beat global configs")
}
}
18 changes: 13 additions & 5 deletions cmd/ftl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"

"github.com/alecthomas/kong"
Expand Down Expand Up @@ -92,10 +91,19 @@ func main() {
kctx.BindTo(sr, (*cf.Resolver[cf.Secrets])(nil))
kctx.BindTo(cr, (*cf.Resolver[cf.Configuration])(nil))

// Propagate to runner processes.
// TODO: This is a bit of a hack until we get proper configuration
// management through the Controller.
os.Setenv("FTL_CONFIG", strings.Join(projectconfig.ConfigPaths(cli.ConfigFlag), ","))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Noice!

// Add config manager to context.
cm, err := cf.NewConfigurationManager(ctx, cr)
if err != nil {
kctx.Fatalf(err.Error())
}
ctx = cf.ContextWithConfig(ctx, cm)

// Add secrets manager to context.
sm, err := cf.NewSecretsManager(ctx, sr)
if err != nil {
kctx.Fatalf(err.Error())
}
ctx = cf.ContextWithSecrets(ctx, sm)

// Handle signals.
sigch := make(chan os.Signal, 1)
Expand Down
44 changes: 44 additions & 0 deletions common/configuration/in_memory_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package configuration

import (
"context"
"fmt"
"net/url"
)

// InMemoryProvider is a configuration provider that keeps values in memory
type InMemoryProvider[R Role] struct {
values map[Ref][]byte
}

var _ MutableProvider[Configuration] = &InMemoryProvider[Configuration]{}

func NewInMemoryProvider[R Role]() *InMemoryProvider[R] {
return &InMemoryProvider[R]{values: map[Ref][]byte{}}
}

func (p *InMemoryProvider[R]) Role() R { var r R; return r }
func (p *InMemoryProvider[R]) Key() string { return "grpc" }

func (p *InMemoryProvider[R]) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
if bytes, found := p.values[ref]; found {
return bytes, nil
}
return nil, fmt.Errorf("key %q not found", ref.Name)
}

func (p *InMemoryProvider[R]) Writer() bool {
return true
}

// Store a configuration value and return its key.
func (p *InMemoryProvider[R]) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) {
p.values[ref] = value
return &url.URL{Scheme: p.Key()}, nil
}

// Delete a configuration value.
func (p *InMemoryProvider[R]) Delete(ctx context.Context, ref Ref) error {
delete(p.values, ref)
return nil
}
45 changes: 45 additions & 0 deletions common/configuration/in_memory_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package configuration

import (
"context"
"fmt"
"net/url"
)

type InMemoryResolver[R Role] struct {
keyMap map[Ref]*url.URL
}

var _ Resolver[Configuration] = InMemoryResolver[Configuration]{}
var _ Resolver[Secrets] = InMemoryResolver[Secrets]{}

func NewInMemoryResolver[R Role]() *InMemoryResolver[R] {
return &InMemoryResolver[R]{keyMap: map[Ref]*url.URL{}}
}

func (k InMemoryResolver[R]) Role() R { var r R; return r }

func (k InMemoryResolver[R]) Get(ctx context.Context, ref Ref) (*url.URL, error) {
if key, found := k.keyMap[ref]; found {
return key, nil
}
return nil, fmt.Errorf("key %q not found", ref.Name)
}

func (k InMemoryResolver[R]) List(ctx context.Context) ([]Entry, error) {
entries := []Entry{}
for ref, url := range k.keyMap {
entries = append(entries, Entry{Ref: ref, Accessor: url})
}
return entries, nil
}

func (k InMemoryResolver[R]) Set(ctx context.Context, ref Ref, key *url.URL) error {
k.keyMap[ref] = key
return nil
}

func (k InMemoryResolver[R]) Unset(ctx context.Context, ref Ref) error {
delete(k.keyMap, ref)
return nil
}
45 changes: 31 additions & 14 deletions common/configuration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,49 +64,66 @@ func (m *Manager[R]) Mutable() error {
return fmt.Errorf("no writeable configuration provider available, specify one of %s", strings.Join(writers, ", "))
}

// Get a configuration value from the active providers.
//
// "value" must be a pointer to a Go type that can be unmarshalled from JSON.
func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error {
// GetData returns a data value for a configuration from the active providers.
// The data can be unmarshalled from JSON.
func (m *Manager[R]) GetData(ctx context.Context, ref Ref) ([]byte, error) {
key, err := m.resolver.Get(ctx, ref)
// Try again at the global scope if the value is not found in module scope.
if ref.Module.Ok() && errors.Is(err, ErrNotFound) {
gref := ref
gref.Module = optional.None[string]()
key, err = m.resolver.Get(ctx, gref)
if err != nil {
return err
return nil, err
}
} else if err != nil {
return err
return nil, err
}
provider, ok := m.providers[key.Scheme]
if !ok {
return fmt.Errorf("no provider for scheme %q", key.Scheme)
return nil, fmt.Errorf("no provider for scheme %q", key.Scheme)
}
data, err := provider.Load(ctx, ref, key)
if err != nil {
return fmt.Errorf("%s: %w", ref, err)
return nil, fmt.Errorf("%s: %w", ref, err)
}
return data, nil
}

// Get a configuration value from the active providers.
//
// "value" must be a pointer to a Go type that can be unmarshalled from JSON.
func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error {
data, err := m.GetData(ctx, ref)
if err != nil {
return err
}
return json.Unmarshal(data, value)
}

// Set a configuration value in the active writing provider.
// SetData updates the configuration value in the active writing provider as raw bytes.
//
// "value" must be a Go type that can be marshalled to JSON.
func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error {
// "data" must be bytes that can be unmarshalled into a Go type.
func (m *Manager[R]) SetData(ctx context.Context, ref Ref, data []byte) error {
if err := m.Mutable(); err != nil {
return err
}
data, err := json.Marshal(value)
key, err := m.writer.Store(ctx, ref, data)
if err != nil {
return err
}
key, err := m.writer.Store(ctx, ref, data)
return m.resolver.Set(ctx, ref, key)
}

// Set a configuration value in the active writing provider.
//
// "value" must be a Go type that can be marshalled to JSON.
func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
return m.resolver.Set(ctx, ref, key)
return m.SetData(ctx, ref, data)
}

// Unset a configuration value in all providers.
Expand Down
Loading
Loading