Skip to content

Commit

Permalink
feat: module context over grpc
Browse files Browse the repository at this point in the history
# Conflicts:
#	backend/controller/controller.go
  • Loading branch information
matt2e committed Apr 19, 2024
1 parent 7fc41f2 commit 8a62999
Show file tree
Hide file tree
Showing 12 changed files with 764 additions and 42 deletions.
30 changes: 29 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/common/configuration"
"github.com/TBD54566975/ftl/common/modulecontext"
frontend "github.com/TBD54566975/ftl/frontend"
"github.com/TBD54566975/ftl/internal/cors"
"github.com/TBD54566975/ftl/internal/log"
Expand Down Expand Up @@ -642,7 +644,33 @@ nextModule:

// GetModuleContext retrieves config, secrets and DSNs for a module.
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest]) (*connect.Response[ftlv1.ModuleContextResponse], error) {
return nil, fmt.Errorf("not implemented")
// get module schema
schemas, err := s.dal.GetActiveDeploymentSchemas(ctx)
if err != nil {
return nil, err
}
schemas = slices.Filter(schemas, func(s *schema.Module) bool {
return s.Name == req.Msg.Module
})
if len(schemas) == 0 {
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no schema found for module %q", req.Msg.Module))
} else if len(schemas) > 1 {
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("multiple schemas found for module %q", req.Msg.Module))
}

b := modulecontext.NewBuilder(req.Msg.Module)
b = b.AddConfigFromManager(configuration.ConfigFromContext(ctx))
b = b.AddSecretsFromManager(configuration.SecretsFromContext(ctx))
b = b.AddDSNsFromEnvarsForModule(schemas[0])
moduleCtx, err := b.Build(ctx)
if err != nil {
return nil, err
}
out, err := moduleCtx.ToProto(ctx)
if err != nil {
return nil, err
}
return connect.NewResponse(out), nil
}

func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) {
Expand Down
18 changes: 13 additions & 5 deletions cmd/ftl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os/signal"
"runtime"
"strconv"
"strings"
"syscall"

"github.com/alecthomas/kong"
Expand Down Expand Up @@ -92,10 +91,19 @@ func main() {
kctx.BindTo(sr, (*cf.Resolver[cf.Secrets])(nil))
kctx.BindTo(cr, (*cf.Resolver[cf.Configuration])(nil))

// Propagate to runner processes.
// TODO: This is a bit of a hack until we get proper configuration
// management through the Controller.
os.Setenv("FTL_CONFIG", strings.Join(projectconfig.ConfigPaths(cli.ConfigFlag), ","))
// Add config manager to context.
cm, err := cf.NewConfigurationManager(ctx, cr)
if err != nil {
kctx.Fatalf(err.Error())
}
ctx = cf.ContextWithConfig(ctx, cm)

// Add secrets manager to context.
sm, err := cf.NewSecretsManager(ctx, sr)
if err != nil {
kctx.Fatalf(err.Error())
}
ctx = cf.ContextWithSecrets(ctx, sm)

// Handle signals.
sigch := make(chan os.Signal, 1)
Expand Down
44 changes: 44 additions & 0 deletions common/configuration/in_memory_provider.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package configuration

import (
"context"
"fmt"
"net/url"
)

// InMemoryProvider is a configuration provider that keeps values in memory
type InMemoryProvider[R Role] struct {
values map[Ref][]byte
}

var _ MutableProvider[Configuration] = &InMemoryProvider[Configuration]{}

func NewInMemoryProvider[R Role]() *InMemoryProvider[R] {
return &InMemoryProvider[R]{values: map[Ref][]byte{}}
}

func (p *InMemoryProvider[R]) Role() R { var r R; return r }
func (p *InMemoryProvider[R]) Key() string { return "grpc" }

func (p *InMemoryProvider[R]) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) {
if bytes, found := p.values[ref]; found {
return bytes, nil
}
return nil, fmt.Errorf("key %q not found", ref.Name)
}

func (p *InMemoryProvider[R]) Writer() bool {
return true
}

// Store a configuration value and return its key.
func (p *InMemoryProvider[R]) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) {
p.values[ref] = value
return &url.URL{Scheme: p.Key()}, nil
}

// Delete a configuration value.
func (p *InMemoryProvider[R]) Delete(ctx context.Context, ref Ref) error {
delete(p.values, ref)
return nil
}
45 changes: 45 additions & 0 deletions common/configuration/in_memory_resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package configuration

import (
"context"
"fmt"
"net/url"
)

type InMemoryResolver[R Role] struct {
keyMap map[Ref]*url.URL
}

var _ Resolver[Configuration] = InMemoryResolver[Configuration]{}
var _ Resolver[Secrets] = InMemoryResolver[Secrets]{}

func NewInMemoryResolver[R Role]() *InMemoryResolver[R] {
return &InMemoryResolver[R]{keyMap: map[Ref]*url.URL{}}
}

func (k InMemoryResolver[R]) Role() R { var r R; return r }

func (k InMemoryResolver[R]) Get(ctx context.Context, ref Ref) (*url.URL, error) {
if key, found := k.keyMap[ref]; found {
return key, nil
}
return nil, fmt.Errorf("key %q not found", ref.Name)
}

func (k InMemoryResolver[R]) List(ctx context.Context) ([]Entry, error) {
entries := []Entry{}
for ref, url := range k.keyMap {
entries = append(entries, Entry{Ref: ref, Accessor: url})
}
return entries, nil
}

func (k InMemoryResolver[R]) Set(ctx context.Context, ref Ref, key *url.URL) error {
k.keyMap[ref] = key
return nil
}

func (k InMemoryResolver[R]) Unset(ctx context.Context, ref Ref) error {
delete(k.keyMap, ref)
return nil
}
45 changes: 31 additions & 14 deletions common/configuration/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,49 +64,66 @@ func (m *Manager[R]) Mutable() error {
return fmt.Errorf("no writeable configuration provider available, specify one of %s", strings.Join(writers, ", "))
}

// Get a configuration value from the active providers.
//
// "value" must be a pointer to a Go type that can be unmarshalled from JSON.
func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error {
// GetData returns a data value for a configuration from the active providers.
// The data can be unmarshalled from JSON.
func (m *Manager[R]) GetData(ctx context.Context, ref Ref) ([]byte, error) {
key, err := m.resolver.Get(ctx, ref)
// Try again at the global scope if the value is not found in module scope.
if ref.Module.Ok() && errors.Is(err, ErrNotFound) {
gref := ref
gref.Module = optional.None[string]()
key, err = m.resolver.Get(ctx, gref)
if err != nil {
return err
return nil, err
}
} else if err != nil {
return err
return nil, err
}
provider, ok := m.providers[key.Scheme]
if !ok {
return fmt.Errorf("no provider for scheme %q", key.Scheme)
return nil, fmt.Errorf("no provider for scheme %q", key.Scheme)
}
data, err := provider.Load(ctx, ref, key)
if err != nil {
return fmt.Errorf("%s: %w", ref, err)
return nil, fmt.Errorf("%s: %w", ref, err)
}
return data, nil
}

// Get a configuration value from the active providers.
//
// "value" must be a pointer to a Go type that can be unmarshalled from JSON.
func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error {
data, err := m.GetData(ctx, ref)
if err != nil {
return err
}
return json.Unmarshal(data, value)
}

// Set a configuration value in the active writing provider.
// SetData updates the configuration value in the active writing provider as raw bytes.
//
// "value" must be a Go type that can be marshalled to JSON.
func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error {
// "data" must be bytes that can be unmarshalled into a Go type.
func (m *Manager[R]) SetData(ctx context.Context, ref Ref, data []byte) error {
if err := m.Mutable(); err != nil {
return err
}
data, err := json.Marshal(value)
key, err := m.writer.Store(ctx, ref, data)
if err != nil {
return err
}
key, err := m.writer.Store(ctx, ref, data)
return m.resolver.Set(ctx, ref, key)
}

// Set a configuration value in the active writing provider.
//
// "value" must be a Go type that can be marshalled to JSON.
func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error {
data, err := json.Marshal(value)
if err != nil {
return err
}
return m.resolver.Set(ctx, ref, key)
return m.SetData(ctx, ref, data)
}

// Unset a configuration value in all providers.
Expand Down
Loading

0 comments on commit 8a62999

Please sign in to comment.