From 8a629998240a45e131a9331d2454109877dbd6c5 Mon Sep 17 00:00:00 2001 From: Matt Toohey Date: Fri, 19 Apr 2024 16:22:46 +1000 Subject: [PATCH] feat: module context over grpc # Conflicts: # backend/controller/controller.go --- backend/controller/controller.go | 30 +- cmd/ftl/main.go | 18 +- common/configuration/in_memory_provider.go | 44 +++ common/configuration/in_memory_resolver.go | 45 +++ common/configuration/manager.go | 45 ++- common/modulecontext/builder.go | 313 +++++++++++++++++++++ common/modulecontext/builder_test.go | 55 ++++ common/modulecontext/db_provider.go | 81 ++++++ common/modulecontext/db_provider_test.go | 30 ++ common/modulecontext/modulecontext.go | 96 +++++++ go-runtime/ftl/database.go | 31 +- go-runtime/server/server.go | 18 +- 12 files changed, 764 insertions(+), 42 deletions(-) create mode 100644 common/configuration/in_memory_provider.go create mode 100644 common/configuration/in_memory_resolver.go create mode 100644 common/modulecontext/builder.go create mode 100644 common/modulecontext/builder_test.go create mode 100644 common/modulecontext/db_provider.go create mode 100644 common/modulecontext/db_provider_test.go create mode 100644 common/modulecontext/modulecontext.go diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 8f9884afac..9c4e8fe922 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -38,6 +38,8 @@ import ( "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema" "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/common/configuration" + "github.com/TBD54566975/ftl/common/modulecontext" frontend "github.com/TBD54566975/ftl/frontend" "github.com/TBD54566975/ftl/internal/cors" "github.com/TBD54566975/ftl/internal/log" @@ -642,7 +644,33 @@ nextModule: // GetModuleContext retrieves config, secrets and DSNs for a module. func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest]) (*connect.Response[ftlv1.ModuleContextResponse], error) { - return nil, fmt.Errorf("not implemented") + // get module schema + schemas, err := s.dal.GetActiveDeploymentSchemas(ctx) + if err != nil { + return nil, err + } + schemas = slices.Filter(schemas, func(s *schema.Module) bool { + return s.Name == req.Msg.Module + }) + if len(schemas) == 0 { + return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no schema found for module %q", req.Msg.Module)) + } else if len(schemas) > 1 { + return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("multiple schemas found for module %q", req.Msg.Module)) + } + + b := modulecontext.NewBuilder(req.Msg.Module) + b = b.AddConfigFromManager(configuration.ConfigFromContext(ctx)) + b = b.AddSecretsFromManager(configuration.SecretsFromContext(ctx)) + b = b.AddDSNsFromEnvarsForModule(schemas[0]) + moduleCtx, err := b.Build(ctx) + if err != nil { + return nil, err + } + out, err := moduleCtx.ToProto(ctx) + if err != nil { + return nil, err + } + return connect.NewResponse(out), nil } func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { diff --git a/cmd/ftl/main.go b/cmd/ftl/main.go index 697ab3ed05..59cb4991a6 100644 --- a/cmd/ftl/main.go +++ b/cmd/ftl/main.go @@ -8,7 +8,6 @@ import ( "os/signal" "runtime" "strconv" - "strings" "syscall" "github.com/alecthomas/kong" @@ -92,10 +91,19 @@ func main() { kctx.BindTo(sr, (*cf.Resolver[cf.Secrets])(nil)) kctx.BindTo(cr, (*cf.Resolver[cf.Configuration])(nil)) - // Propagate to runner processes. - // TODO: This is a bit of a hack until we get proper configuration - // management through the Controller. - os.Setenv("FTL_CONFIG", strings.Join(projectconfig.ConfigPaths(cli.ConfigFlag), ",")) + // Add config manager to context. + cm, err := cf.NewConfigurationManager(ctx, cr) + if err != nil { + kctx.Fatalf(err.Error()) + } + ctx = cf.ContextWithConfig(ctx, cm) + + // Add secrets manager to context. + sm, err := cf.NewSecretsManager(ctx, sr) + if err != nil { + kctx.Fatalf(err.Error()) + } + ctx = cf.ContextWithSecrets(ctx, sm) // Handle signals. sigch := make(chan os.Signal, 1) diff --git a/common/configuration/in_memory_provider.go b/common/configuration/in_memory_provider.go new file mode 100644 index 0000000000..3c62eff3a6 --- /dev/null +++ b/common/configuration/in_memory_provider.go @@ -0,0 +1,44 @@ +package configuration + +import ( + "context" + "fmt" + "net/url" +) + +// InMemoryProvider is a configuration provider that keeps values in memory +type InMemoryProvider[R Role] struct { + values map[Ref][]byte +} + +var _ MutableProvider[Configuration] = &InMemoryProvider[Configuration]{} + +func NewInMemoryProvider[R Role]() *InMemoryProvider[R] { + return &InMemoryProvider[R]{values: map[Ref][]byte{}} +} + +func (p *InMemoryProvider[R]) Role() R { var r R; return r } +func (p *InMemoryProvider[R]) Key() string { return "grpc" } + +func (p *InMemoryProvider[R]) Load(ctx context.Context, ref Ref, key *url.URL) ([]byte, error) { + if bytes, found := p.values[ref]; found { + return bytes, nil + } + return nil, fmt.Errorf("key %q not found", ref.Name) +} + +func (p *InMemoryProvider[R]) Writer() bool { + return true +} + +// Store a configuration value and return its key. +func (p *InMemoryProvider[R]) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) { + p.values[ref] = value + return &url.URL{Scheme: p.Key()}, nil +} + +// Delete a configuration value. +func (p *InMemoryProvider[R]) Delete(ctx context.Context, ref Ref) error { + delete(p.values, ref) + return nil +} diff --git a/common/configuration/in_memory_resolver.go b/common/configuration/in_memory_resolver.go new file mode 100644 index 0000000000..ee0ca59eb5 --- /dev/null +++ b/common/configuration/in_memory_resolver.go @@ -0,0 +1,45 @@ +package configuration + +import ( + "context" + "fmt" + "net/url" +) + +type InMemoryResolver[R Role] struct { + keyMap map[Ref]*url.URL +} + +var _ Resolver[Configuration] = InMemoryResolver[Configuration]{} +var _ Resolver[Secrets] = InMemoryResolver[Secrets]{} + +func NewInMemoryResolver[R Role]() *InMemoryResolver[R] { + return &InMemoryResolver[R]{keyMap: map[Ref]*url.URL{}} +} + +func (k InMemoryResolver[R]) Role() R { var r R; return r } + +func (k InMemoryResolver[R]) Get(ctx context.Context, ref Ref) (*url.URL, error) { + if key, found := k.keyMap[ref]; found { + return key, nil + } + return nil, fmt.Errorf("key %q not found", ref.Name) +} + +func (k InMemoryResolver[R]) List(ctx context.Context) ([]Entry, error) { + entries := []Entry{} + for ref, url := range k.keyMap { + entries = append(entries, Entry{Ref: ref, Accessor: url}) + } + return entries, nil +} + +func (k InMemoryResolver[R]) Set(ctx context.Context, ref Ref, key *url.URL) error { + k.keyMap[ref] = key + return nil +} + +func (k InMemoryResolver[R]) Unset(ctx context.Context, ref Ref) error { + delete(k.keyMap, ref) + return nil +} diff --git a/common/configuration/manager.go b/common/configuration/manager.go index 5718e54718..5fe6f872c6 100644 --- a/common/configuration/manager.go +++ b/common/configuration/manager.go @@ -64,10 +64,9 @@ func (m *Manager[R]) Mutable() error { return fmt.Errorf("no writeable configuration provider available, specify one of %s", strings.Join(writers, ", ")) } -// Get a configuration value from the active providers. -// -// "value" must be a pointer to a Go type that can be unmarshalled from JSON. -func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error { +// GetData returns a data value for a configuration from the active providers. +// The data can be unmarshalled from JSON. +func (m *Manager[R]) GetData(ctx context.Context, ref Ref) ([]byte, error) { key, err := m.resolver.Get(ctx, ref) // Try again at the global scope if the value is not found in module scope. if ref.Module.Ok() && errors.Is(err, ErrNotFound) { @@ -75,38 +74,56 @@ func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error { gref.Module = optional.None[string]() key, err = m.resolver.Get(ctx, gref) if err != nil { - return err + return nil, err } } else if err != nil { - return err + return nil, err } provider, ok := m.providers[key.Scheme] if !ok { - return fmt.Errorf("no provider for scheme %q", key.Scheme) + return nil, fmt.Errorf("no provider for scheme %q", key.Scheme) } data, err := provider.Load(ctx, ref, key) if err != nil { - return fmt.Errorf("%s: %w", ref, err) + return nil, fmt.Errorf("%s: %w", ref, err) + } + return data, nil +} + +// Get a configuration value from the active providers. +// +// "value" must be a pointer to a Go type that can be unmarshalled from JSON. +func (m *Manager[R]) Get(ctx context.Context, ref Ref, value any) error { + data, err := m.GetData(ctx, ref) + if err != nil { + return err } return json.Unmarshal(data, value) } -// Set a configuration value in the active writing provider. +// SetData updates the configuration value in the active writing provider as raw bytes. // -// "value" must be a Go type that can be marshalled to JSON. -func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error { +// "data" must be bytes that can be unmarshalled into a Go type. +func (m *Manager[R]) SetData(ctx context.Context, ref Ref, data []byte) error { if err := m.Mutable(); err != nil { return err } - data, err := json.Marshal(value) + key, err := m.writer.Store(ctx, ref, data) if err != nil { return err } - key, err := m.writer.Store(ctx, ref, data) + return m.resolver.Set(ctx, ref, key) +} + +// Set a configuration value in the active writing provider. +// +// "value" must be a Go type that can be marshalled to JSON. +func (m *Manager[R]) Set(ctx context.Context, ref Ref, value any) error { + data, err := json.Marshal(value) if err != nil { return err } - return m.resolver.Set(ctx, ref, key) + return m.SetData(ctx, ref, data) } // Unset a configuration value in all providers. diff --git a/common/modulecontext/builder.go b/common/modulecontext/builder.go new file mode 100644 index 0000000000..eaebb9eac6 --- /dev/null +++ b/common/modulecontext/builder.go @@ -0,0 +1,313 @@ +package modulecontext + +import ( + "context" + "fmt" + "os" + "strings" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + cf "github.com/TBD54566975/ftl/common/configuration" + "github.com/TBD54566975/ftl/internal/slices" + "github.com/alecthomas/types/optional" +) + +type ref struct { + Module optional.Option[string] + Name string +} + +type refValuePair struct { + ref ref + resolver resolver +} + +func (r refValuePair) configOrSecretItem() {} + +type dsnEntry struct { + name string + dbType DBType + resolver stringResolver +} + +type resolver interface { + Resolve() (isData bool, value any, data []byte, err error) +} + +var _ resolver = valueResolver{} +var _ resolver = dataResolver{} + +type stringResolver interface { + ResolveString() (string, error) +} + +var _ stringResolver = valueResolver{} +var _ stringResolver = envarResolver{} + +type configManager[R cf.Role] struct { + manager *cf.Manager[R] +} + +func (m configManager[R]) configOrSecretItem() {} + +// sumtype:decl +type configOrSecretItem interface { + configOrSecretItem() +} + +var _ configOrSecretItem = refValuePair{} +var _ configOrSecretItem = configManager[cf.Configuration]{} + +type ModuleContextBuilder struct { + moduleName string + configs []configOrSecretItem + secrets []configOrSecretItem + dsns []dsnEntry +} + +func NewBuilder(moduleName string) ModuleContextBuilder { + return ModuleContextBuilder{ + moduleName: moduleName, + configs: []configOrSecretItem{}, + secrets: []configOrSecretItem{}, + dsns: []dsnEntry{}, + } +} + +func NewBuilderFromProto(moduleName string, response *ftlv1.ModuleContextResponse) ModuleContextBuilder { + return ModuleContextBuilder{ + moduleName: moduleName, + configs: slices.Map(response.Configs, func(c *ftlv1.ModuleContextResponse_Config) configOrSecretItem { + return refValuePair{ref: refFromProto(c.Ref), resolver: dataResolver{data: c.Data}} + }), + secrets: slices.Map(response.Secrets, func(s *ftlv1.ModuleContextResponse_Secret) configOrSecretItem { + return refValuePair{ref: refFromProto(s.Ref), resolver: dataResolver{data: s.Data}} + }), + dsns: slices.Map(response.Databases, func(d *ftlv1.ModuleContextResponse_DSN) dsnEntry { + return dsnEntry{name: d.Name, dbType: DBType(d.Type), resolver: valueResolver{value: d.Dsn}} + }), + } +} + +func (b ModuleContextBuilder) Build(ctx context.Context) (*ModuleContext, error) { + cm, err := newInMemoryConfigManager[cf.Configuration](ctx) + if err != nil { + return nil, err + } + sm, err := newInMemoryConfigManager[cf.Secrets](ctx) + if err != nil { + return nil, err + } + moduleCtx := &ModuleContext{ + configManager: cm, + secretsManager: sm, + dbProvider: NewDBProvider(), + } + + if err := buildConfigOrSecrets[cf.Configuration](ctx, b, *moduleCtx.configManager, b.configs); err != nil { + return nil, err + } + if err := buildConfigOrSecrets[cf.Secrets](ctx, b, *moduleCtx.secretsManager, b.secrets); err != nil { + return nil, err + } + + for _, entry := range b.dsns { + dsn, err := entry.resolver.ResolveString() + if err != nil { + return nil, err + } + if err = moduleCtx.dbProvider.AddDSN(entry.name, entry.dbType, dsn); err != nil { + return nil, err + } + } + + return moduleCtx, nil +} + +func newInMemoryConfigManager[R cf.Role](ctx context.Context) (*cf.Manager[R], error) { + provider := cf.NewInMemoryProvider[R]() + resolver := cf.NewInMemoryResolver[R]() + manager, err := cf.New(ctx, resolver, []cf.Provider[R]{provider}) + if err != nil { + return nil, err + } + return manager, nil +} + +func buildConfigOrSecrets[R cf.Role](ctx context.Context, b ModuleContextBuilder, manager cf.Manager[R], items []configOrSecretItem) error { + for _, item := range items { + switch item := item.(type) { + case refValuePair: + isData, value, data, err := item.resolver.Resolve() + if err != nil { + return err + } + if isData { + if err := manager.SetData(ctx, cf.Ref(item.ref), data); err != nil { + return err + } + } else { + if err := manager.Set(ctx, cf.Ref(item.ref), value); err != nil { + return err + } + } + case configManager[R]: + list, err := item.manager.List(ctx) + if err != nil { + return err + } + for _, e := range list { + if m, isModuleSpecific := e.Module.Get(); isModuleSpecific && b.moduleName != m { + continue + } + data, err := item.manager.GetData(ctx, e.Ref) + if err != nil { + return err + } + if err := manager.SetData(ctx, e.Ref, data); err != nil { + return err + } + } + } + } + return nil +} + +func (b ModuleContextBuilder) AddGlobalConfig(name string, value any) ModuleContextBuilder { + b.configs = append(b.configs, refValuePair{ + ref: ref{ + Name: name, + }, + resolver: valueResolver{value: value}, + }) + return b +} + +func (b ModuleContextBuilder) AddConfig(name string, value any) ModuleContextBuilder { + return b.AddConfigForModule(name, b.moduleName, value) +} + +func (b ModuleContextBuilder) AddConfigForModule(name string, module string, value any) ModuleContextBuilder { + b.configs = append(b.configs, refValuePair{ + ref: ref{ + Module: optional.Some(module), + Name: name, + }, + resolver: valueResolver{value: value}, + }) + return b +} + +func (b ModuleContextBuilder) AddGlobalSecret(name string, value any) ModuleContextBuilder { + b.secrets = append(b.secrets, refValuePair{ + ref: ref{ + Name: name, + }, + resolver: valueResolver{value: value}, + }) + return b +} + +func (b ModuleContextBuilder) AddSecret(name string, value any) ModuleContextBuilder { + return b.AddSecretForModule(name, b.moduleName, value) +} + +func (b ModuleContextBuilder) AddSecretForModule(name string, module string, value any) ModuleContextBuilder { + b.secrets = append(b.secrets, refValuePair{ + ref: ref{ + Module: optional.Some(module), + Name: name, + }, + resolver: valueResolver{value: value}, + }) + return b +} + +func (b ModuleContextBuilder) AddDSN(name string, dbType DBType, dsn string) ModuleContextBuilder { + b.dsns = append(b.dsns, dsnEntry{ + name: name, + dbType: dbType, + resolver: valueResolver{value: dsn}, + }) + return b +} + +func (b ModuleContextBuilder) AddDSNsFromEnvarsForModule(module *schema.Module) ModuleContextBuilder { + // remove in favor of a non-envar approach once it is available + for _, decl := range module.Decls { + dbDecl, ok := decl.(*schema.Database) + if !ok { + continue + } + b.dsns = append(b.dsns, dsnEntry{ + name: dbDecl.Name, + dbType: DBTypePostgres, + resolver: envarResolver{ + name: fmt.Sprintf("FTL_POSTGRES_DSN_%s_%s", strings.ToUpper(module.Name), strings.ToUpper(dbDecl.Name)), + }, + }) + } + return b +} + +func (b ModuleContextBuilder) AddConfigFromManager(cm *cf.Manager[cf.Configuration]) ModuleContextBuilder { + b.configs = append(b.configs, configManager[cf.Configuration]{manager: cm}) + return b +} + +func (b ModuleContextBuilder) AddSecretsFromManager(sm *cf.Manager[cf.Secrets]) ModuleContextBuilder { + b.secrets = append(b.secrets, configManager[cf.Secrets]{manager: sm}) + return b +} + +func refFromProto(r *ftlv1.ModuleContextResponse_Ref) ref { + return ref{ + Module: optional.Ptr(r.Module), + Name: r.Name, + } +} + +type valueResolver struct { + value any +} + +func (r valueResolver) Resolve() (isData bool, value any, data []byte, err error) { + return false, r.value, nil, nil +} + +func (r valueResolver) ResolveString() (string, error) { + str, ok := r.value.(string) + if !ok { + return "", fmt.Errorf("value is not a string: %v", r.value) + } + return str, nil +} + +type dataResolver struct { + data []byte +} + +func (r dataResolver) Resolve() (isData bool, value any, data []byte, err error) { + return true, nil, r.data, nil +} + +type envarResolver struct { + name string +} + +func (r envarResolver) Resolve() (isData bool, value any, data []byte, err error) { + value, err = r.ResolveString() + if err != nil { + return false, nil, nil, err + } + return false, value, nil, nil +} + +func (r envarResolver) ResolveString() (string, error) { + value, ok := os.LookupEnv(r.name) + if !ok { + return "", fmt.Errorf("missing environment variable %q", r.name) + } + return value, nil +} diff --git a/common/modulecontext/builder_test.go b/common/modulecontext/builder_test.go new file mode 100644 index 0000000000..8f18f66d60 --- /dev/null +++ b/common/modulecontext/builder_test.go @@ -0,0 +1,55 @@ +package modulecontext + +import ( + "context" + "testing" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/alecthomas/assert/v2" + "github.com/alecthomas/types/optional" + + cf "github.com/TBD54566975/ftl/common/configuration" +) + +func TestReadLatestValue(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + + moduleName := "test" + b := NewBuilder(moduleName) + b = b.AddConfig("c", "c-value1") + b = b.AddConfig("c", "c-value2") + b = b.AddGlobalConfig("cg", "cg-value1") + b = b.AddGlobalConfig("cg", "cg-value2") + b = b.AddSecret("s", "s-value1") + b = b.AddSecret("s", "s-value2") + b = b.AddGlobalSecret("sg", "sg-value1") + b = b.AddGlobalSecret("sg", "sg-value2") + b = b.AddDSN("d", DBTypePostgres, "postgres://localhost:54320/echo1?sslmode=disable&user=postgres&password=secret") + b = b.AddDSN("d", DBTypePostgres, "postgres://localhost:54320/echo2?sslmode=disable&user=postgres&password=secret") + moduleCtx, err := b.Build(ctx) + assert.NoError(t, err, "there should be no build errors") + ctx = moduleCtx.ApplyToContext(ctx) + + cm := cf.ConfigFromContext(ctx) + sm := cf.SecretsFromContext(ctx) + + var str string + + err = cm.Get(ctx, cf.Ref{Module: optional.Some(moduleName), Name: "c"}, &str) + assert.NoError(t, err, "could not read config value") + assert.Equal(t, "c-value2", str, "latest config value should be read") + + err = cm.Get(ctx, cf.Ref{Name: "cg"}, &str) + assert.NoError(t, err, "could not read global config value") + assert.Equal(t, "cg-value2", str, "latest global config value should be read") + + err = sm.Get(ctx, cf.Ref{Module: optional.Some(moduleName), Name: "s"}, &str) + assert.NoError(t, err, "could not read secret value") + assert.Equal(t, "s-value2", str, "latest secret value should be read") + + err = sm.Get(ctx, cf.Ref{Name: "sg"}, &str) + assert.NoError(t, err, "could not read global secret value") + assert.Equal(t, "sg-value2", str, "latest global secret value should be read") + + assert.Equal(t, moduleCtx.dbProvider.entries["d"].dsn, "postgres://localhost:54320/echo2?sslmode=disable&user=postgres&password=secret", "latest DSN should be read") +} diff --git a/common/modulecontext/db_provider.go b/common/modulecontext/db_provider.go new file mode 100644 index 0000000000..d45eac0b7b --- /dev/null +++ b/common/modulecontext/db_provider.go @@ -0,0 +1,81 @@ +package modulecontext + +import ( + "context" + "database/sql" + "fmt" + "strconv" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + + _ "github.com/jackc/pgx/v5/stdlib" // SQL driver +) + +type DBType int32 + +const ( + DBTypePostgres = DBType(ftlv1.ModuleContextResponse_POSTGRES) +) + +func (x DBType) String() string { + switch x { + case DBTypePostgres: + return "Postgres" + default: + panic(fmt.Sprintf("unknown DB type: %s", strconv.Itoa(int(x)))) + } +} + +type dbEntry struct { + dsn string + dbType DBType + db *sql.DB +} + +// DBProvider takes in DSNs and holds a *sql.DB for each +// this allows us to: +// - pool db connections, rather than initializing anew each time +// - validate DSNs at startup, rather than returning errors or panicking at Database.Get() +type DBProvider struct { + entries map[string]dbEntry +} + +type contextKeyDSNProvider struct{} + +func NewDBProvider() *DBProvider { + return &DBProvider{ + entries: map[string]dbEntry{}, + } +} + +func ContextWithDBProvider(ctx context.Context, provider *DBProvider) context.Context { + return context.WithValue(ctx, contextKeyDSNProvider{}, provider) +} + +func DBProviderFromContext(ctx context.Context) *DBProvider { + m, ok := ctx.Value(contextKeyDSNProvider{}).(*DBProvider) + if !ok { + panic("no db provider in context") + } + return m +} + +func (d *DBProvider) AddDSN(name string, dbType DBType, dsn string) error { + db, err := sql.Open("pgx", dsn) + if err != nil { + return err + } + d.entries[name] = dbEntry{ + dsn: dsn, + db: db, + dbType: dbType, + } + return nil +} + +func (d *DBProvider) GetDB(name string) (*sql.DB, error) { + if entry, ok := d.entries[name]; ok { + return entry.db, nil + } + return nil, fmt.Errorf("missing DSN for database %s", name) +} diff --git a/common/modulecontext/db_provider_test.go b/common/modulecontext/db_provider_test.go new file mode 100644 index 0000000000..aec15b68ec --- /dev/null +++ b/common/modulecontext/db_provider_test.go @@ -0,0 +1,30 @@ +package modulecontext + +import ( + "context" + "testing" + + "github.com/TBD54566975/ftl/internal/log" + "github.com/alecthomas/assert/v2" +) + +func TestValidDSN(t *testing.T) { + dbProvider := NewDBProvider() + dsn := "postgres://localhost:54320/echo?sslmode=disable&user=postgres&password=secret" + err := dbProvider.AddDSN("test", DBTypePostgres, "in va lid") + assert.NoError(t, err, "expected no error for valid DSN") + assert.Equal(t, dbProvider.entries["test"].dsn, dsn, "expected DSN to be set and unmodified") +} + +func TestInvalidDSN(t *testing.T) { + dbProvider := NewDBProvider() + err := dbProvider.AddDSN("test", DBTypePostgres, "in va lid") + assert.Error(t, err, "expected error for invalid DSN") +} + +func TestGettingAndSettingFromContext(t *testing.T) { + ctx := log.ContextWithNewDefaultLogger(context.Background()) + dbProvider := NewDBProvider() + ctx = ContextWithDBProvider(ctx, dbProvider) + assert.Equal(t, dbProvider, DBProviderFromContext(ctx), "expected dbProvider to be set and retrieved correctly") +} diff --git a/common/modulecontext/modulecontext.go b/common/modulecontext/modulecontext.go new file mode 100644 index 0000000000..c5a19aaa23 --- /dev/null +++ b/common/modulecontext/modulecontext.go @@ -0,0 +1,96 @@ +package modulecontext + +import ( + "context" + + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + cf "github.com/TBD54566975/ftl/common/configuration" +) + +type ModuleContext struct { + configManager *cf.Manager[cf.Configuration] + secretsManager *cf.Manager[cf.Secrets] + dbProvider *DBProvider +} + +func (m *ModuleContext) ApplyToContext(ctx context.Context) context.Context { + ctx = ContextWithDBProvider(ctx, m.dbProvider) + ctx = cf.ContextWithConfig(ctx, m.configManager) + ctx = cf.ContextWithSecrets(ctx, m.secretsManager) + return ctx +} + +func (m *ModuleContext) ToProto(ctx context.Context) (*ftlv1.ModuleContextResponse, error) { + configList, err := m.configManager.List(ctx) + if err != nil { + return nil, err + } + configProtos := []*ftlv1.ModuleContextResponse_Config{} + for _, entry := range configList { + p, err := m.configEntryToConfigProto(ctx, entry) + if err != nil { + return nil, err + } + configProtos = append(configProtos, p) + } + + secretsList, err := m.secretsManager.List(ctx) + if err != nil { + return nil, err + } + secretProtos := []*ftlv1.ModuleContextResponse_Secret{} + for _, entry := range secretsList { + p, err := m.configEntryToSecretProto(ctx, entry) + if err != nil { + return nil, err + } + secretProtos = append(secretProtos, p) + } + + dsnProtos := []*ftlv1.ModuleContextResponse_DSN{} + for name, entry := range m.dbProvider.entries { + dsnProtos = append(dsnProtos, &ftlv1.ModuleContextResponse_DSN{ + Name: name, + Type: ftlv1.ModuleContextResponse_DBType(entry.dbType), + Dsn: entry.dsn, + }) + } + + return &ftlv1.ModuleContextResponse{ + Configs: configProtos, + Secrets: secretProtos, + Databases: dsnProtos, + }, nil +} + +func (m *ModuleContext) configEntryToConfigProto(ctx context.Context, e cf.Entry) (*ftlv1.ModuleContextResponse_Config, error) { + data, err := m.configManager.GetData(ctx, e.Ref) + if err != nil { + return nil, err + } + return &ftlv1.ModuleContextResponse_Config{ + Ref: m.configRefToProto(e.Ref), + Data: data, + }, nil +} + +func (m *ModuleContext) configEntryToSecretProto(ctx context.Context, e cf.Entry) (*ftlv1.ModuleContextResponse_Secret, error) { + data, err := m.secretsManager.GetData(ctx, e.Ref) + if err != nil { + return nil, err + } + return &ftlv1.ModuleContextResponse_Secret{ + Ref: m.configRefToProto(e.Ref), + Data: data, + }, nil +} + +func (m *ModuleContext) configRefToProto(r cf.Ref) *ftlv1.ModuleContextResponse_Ref { + protoRef := &ftlv1.ModuleContextResponse_Ref{ + Name: r.Name, + } + if module, ok := r.Module.Get(); ok { + protoRef.Module = &module + } + return protoRef +} diff --git a/go-runtime/ftl/database.go b/go-runtime/ftl/database.go index efdef16420..516bd242f5 100644 --- a/go-runtime/ftl/database.go +++ b/go-runtime/ftl/database.go @@ -1,25 +1,34 @@ package ftl import ( + "context" "database/sql" "fmt" - "os" - "strings" + + "github.com/TBD54566975/ftl/common/modulecontext" _ "github.com/jackc/pgx/v5/stdlib" // Register Postgres driver ) -// PostgresDatabase returns a Postgres database connection for the named database. -func PostgresDatabase(name string) *sql.DB { - module := strings.ToUpper(callerModule()) - key := fmt.Sprintf("FTL_POSTGRES_DSN_%s_%s", module, strings.ToUpper(name)) - dsn, ok := os.LookupEnv(key) - if !ok { - panic(fmt.Sprintf("missing DSN environment variable %s", key)) +type Database struct { + Name string +} + +// PostgresDatabase returns a handler for the named database. +func PostgresDatabase(name string) Database { + return Database{ + Name: name, } - db, err := sql.Open("pgx", dsn) +} + +func (d Database) String() string { return fmt.Sprintf("database %q", d.Name) } + +// Get returns the sql db connection for the database. +func (d Database) Get(ctx context.Context) *sql.DB { + provider := modulecontext.DBProviderFromContext(ctx) + db, err := provider.GetDB(d.Name) if err != nil { - panic(fmt.Sprintf("failed to open database: %s", err)) + panic(err.Error()) } return db } diff --git a/go-runtime/server/server.go b/go-runtime/server/server.go index 34810fb947..4855ea45e9 100644 --- a/go-runtime/server/server.go +++ b/go-runtime/server/server.go @@ -10,7 +10,7 @@ import ( ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" - cf "github.com/TBD54566975/ftl/common/configuration" + "github.com/TBD54566975/ftl/common/modulecontext" "github.com/TBD54566975/ftl/common/plugin" "github.com/TBD54566975/ftl/go-runtime/encoding" "github.com/TBD54566975/ftl/go-runtime/ftl" @@ -34,21 +34,17 @@ func NewUserVerbServer(moduleName string, handlers ...Handler) plugin.Constructo verbServiceClient := rpc.Dial(ftlv1connect.NewVerbServiceClient, uc.FTLEndpoint.String(), log.Error) ctx = rpc.ContextWithClient(ctx, verbServiceClient) - // Add config manager to context. - cr := &cf.ProjectConfigResolver[cf.Configuration]{Config: uc.Config} - cm, err := cf.NewConfigurationManager(ctx, cr) + resp, err := verbServiceClient.GetModuleContext(ctx, connect.NewRequest(&ftlv1.ModuleContextRequest{ + Module: moduleName, + })) if err != nil { - return nil, nil, err + return nil, nil, fmt.Errorf("could not get config: %w", err) } - ctx = cf.ContextWithConfig(ctx, cm) - - // Add secrets manager to context. - sr := &cf.ProjectConfigResolver[cf.Secrets]{Config: uc.Config} - sm, err := cf.NewSecretsManager(ctx, sr) + moduleCtx, err := modulecontext.NewBuilderFromProto(moduleName, resp.Msg).Build(ctx) if err != nil { return nil, nil, err } - ctx = cf.ContextWithSecrets(ctx, sm) + ctx = moduleCtx.ApplyToContext(ctx) err = observability.Init(ctx, moduleName, "HEAD", uc.ObservabilityConfig) if err != nil {