From 1be2e1f1ce5940738b7dc79704d77708b599ff5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juho=20M=C3=A4kinen?= Date: Fri, 22 Nov 2024 11:57:34 +1100 Subject: [PATCH] feat: run pgproxy as a part of the runner (#3456) The proxy does nothing yet --- cmd/ftl-proxy-pg/main.go | 4 ++-- cmd/ftl-runner/main.go | 24 ++++++++++++++++++++++-- internal/pgproxy/pgproxy.go | 8 ++++++-- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/cmd/ftl-proxy-pg/main.go b/cmd/ftl-proxy-pg/main.go index a48542da66..781fda75da 100644 --- a/cmd/ftl-proxy-pg/main.go +++ b/cmd/ftl-proxy-pg/main.go @@ -20,7 +20,7 @@ var cli struct { ObservabilityConfig observability.Config `embed:"" prefix:"o11y-"` LogConfig log.Config `embed:"" prefix:"log-"` - Listen string `name:"listen" short:"l" help:"Address to listen on." env:"FTL_PROXY_PG_LISTEN" default:"127.0.0.1:5678"` + pgproxy.Config } func main() { @@ -38,7 +38,7 @@ func main() { err = observability.Init(ctx, false, "", "ftl-provisioner", ftl.Version, cli.ObservabilityConfig) kctx.FatalIfErrorf(err, "failed to initialize observability") - proxy := pgproxy.New(cli.Listen, func(ctx context.Context, params map[string]string) (string, error) { + proxy := pgproxy.New(cli.Config, func(ctx context.Context, params map[string]string) (string, error) { return "postgres://localhost:5432/postgres?user=" + params["user"], nil }) if err := proxy.Start(ctx); err != nil { diff --git a/cmd/ftl-runner/main.go b/cmd/ftl-runner/main.go index b75bf402cf..b1126290fc 100644 --- a/cmd/ftl-runner/main.go +++ b/cmd/ftl-runner/main.go @@ -2,21 +2,25 @@ package main import ( "context" + "fmt" "os" "path/filepath" "github.com/alecthomas/kong" + "golang.org/x/sync/errgroup" "github.com/TBD54566975/ftl" "github.com/TBD54566975/ftl/backend/runner" _ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota. "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/pgproxy" ) var cli struct { Version kong.VersionFlag `help:"Show version."` LogConfig log.Config `prefix:"log-" embed:""` RunnerConfig runner.Config `embed:""` + ProxyConfig pgproxy.Config `embed:"" prefix:"pgproxy-"` } func main() { @@ -42,6 +46,22 @@ and route to user code. }) logger := log.Configure(os.Stderr, cli.LogConfig) ctx := log.ContextWithLogger(context.Background(), logger) - err = runner.Start(ctx, cli.RunnerConfig) - kctx.FatalIfErrorf(err) + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return runPGProxy(ctx, cli.ProxyConfig) + }) + g.Go(func() error { + return runner.Start(ctx, cli.RunnerConfig) + }) + kctx.FatalIfErrorf(g.Wait()) +} + +func runPGProxy(ctx context.Context, config pgproxy.Config) error { + if err := pgproxy.New(config, func(ctx context.Context, params map[string]string) (string, error) { + return "postgres://127.0.0.1:5432/postgres?user=" + params["user"], nil + }).Start(ctx); err != nil { + return fmt.Errorf("failed to start pgproxy: %w", err) + } + return nil } diff --git a/internal/pgproxy/pgproxy.go b/internal/pgproxy/pgproxy.go index 13008a79e8..916bdcaec7 100644 --- a/internal/pgproxy/pgproxy.go +++ b/internal/pgproxy/pgproxy.go @@ -11,6 +11,10 @@ import ( "github.com/TBD54566975/ftl/internal/log" ) +type Config struct { + Listen string `name:"listen" short:"l" help:"Address to listen on." env:"FTL_PROXY_PG_LISTEN" default:"127.0.0.1:5678"` +} + // PgProxy is a configurable proxy for PostgreSQL connections type PgProxy struct { listenAddress string @@ -26,9 +30,9 @@ type DSNConstructor func(ctx context.Context, params map[string]string) (string, // // address is the address to listen on for incoming connections. // connectionFn is a function that constructs a new connection string from parameters of the incoming connection. -func New(address string, connectionFn DSNConstructor) *PgProxy { +func New(config Config, connectionFn DSNConstructor) *PgProxy { return &PgProxy{ - listenAddress: address, + listenAddress: config.Listen, connectionStringFn: connectionFn, } }