Skip to content

Commit

Permalink
fix: pass module schema to the pgproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
jvmakine committed Nov 22, 2024
1 parent eeafd05 commit f5554a1
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 51 deletions.
95 changes: 65 additions & 30 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/alecthomas/types/optional"
"github.com/jpillora/backoff"
"github.com/otiai10/copy"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"

Expand All @@ -39,6 +40,7 @@ import (
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
ftlobservability "github.com/TBD54566975/ftl/internal/observability"
"github.com/TBD54566975/ftl/internal/pgproxy"
"github.com/TBD54566975/ftl/internal/rpc"
"github.com/TBD54566975/ftl/internal/schema"
"github.com/TBD54566975/ftl/internal/slices"
Expand All @@ -61,6 +63,8 @@ type Config struct {
Registry artefacts.RegistryConfig `embed:"" prefix:"oci-"`
ObservabilityConfig ftlobservability.Config `embed:"" prefix:"o11y-"`
DevEndpoint optional.Option[url.URL] `help:"An existing endpoint to connect to in development mode" env:"FTL_DEV_ENDPOINT"`

PgProxyConfig pgproxy.Config `embed:"" prefix:"pgproxy-"`
}

func Start(ctx context.Context, config Config) error {
Expand Down Expand Up @@ -129,7 +133,20 @@ func Start(ctx context.Context, config Config) error {
cancelFunc: doneFunc,
devEndpoint: config.DevEndpoint,
}
err = svc.deploy(ctx)

deploymentKey, err := model.ParseDeploymentKey(config.Deployment)
if err != nil {
observability.Deployment.Failure(ctx, optional.None[string]())
svc.cancelFunc()
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
}

module, err := svc.GetModule(ctx, deploymentKey)
if err != nil {
return fmt.Errorf("failed to get module: %w", err)
}

err = svc.deploy(ctx, deploymentKey, module)
if err != nil {
// If we fail to deploy we just exit
// Kube or local scaling will start a new instance to continue
Expand All @@ -143,11 +160,19 @@ func Start(ctx context.Context, config Config) error {
go rpc.RetryStreamingClientStream(ctx, backoff.Backoff{}, controllerClient.StreamDeploymentLogs, svc.streamLogsLoop)
}()

return rpc.Serve(ctx, config.Bind,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, svc),
rpc.HTTP("/", svc),
rpc.HealthCheck(svc.healthCheck),
)
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return svc.StartPgProxy(ctx, module)
})
g.Go(func() error {
// TODO: Make sure pgproxy is ready before starting the runner
return rpc.Serve(ctx, config.Bind,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, svc),
rpc.HTTP("/", svc),
rpc.HealthCheck(svc.healthCheck),
)
})
return fmt.Errorf("failure in runner: %w", g.Wait())
}

func newIdentityStore(ctx context.Context, config Config, key model.RunnerKey, controllerClient ftlv1connect.ControllerServiceClient) (*identity.Store, error) {
Expand Down Expand Up @@ -294,52 +319,51 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
return connect.NewResponse(&ftlv1.PingResponse{}), nil
}

func (s *Service) deploy(ctx context.Context) error {
logger := log.FromContext(ctx)
if err, ok := s.registrationFailure.Load().Get(); ok {
observability.Deployment.Failure(ctx, optional.None[string]())
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to register runner: %w", err))
func (s *Service) GetModule(ctx context.Context, key model.DeploymentKey) (*schema.Module, error) {
gdResp, err := s.controllerClient.GetDeployment(ctx, connect.NewRequest(&ftlv1.GetDeploymentRequest{DeploymentKey: s.config.Deployment}))
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return nil, fmt.Errorf("failed to get deployment: %w", err)
}

key, err := model.ParseDeploymentKey(s.config.Deployment)
module, err := schema.ModuleFromProto(gdResp.Msg.Schema)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return nil, fmt.Errorf("invalid module: %w", err)
}
return module, nil
}

func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *schema.Module) error {
logger := log.FromContext(ctx)

if err, ok := s.registrationFailure.Load().Get(); ok {
observability.Deployment.Failure(ctx, optional.None[string]())
s.cancelFunc()
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
return connect.NewError(connect.CodeUnavailable, fmt.Errorf("failed to register runner: %w", err))
}

observability.Deployment.Started(ctx, key.String())
defer observability.Deployment.Completed(ctx, key.String())

deploymentLogger := s.getDeploymentLogger(ctx, key)
ctx = log.ContextWithLogger(ctx, deploymentLogger)

s.lock.Lock()
defer s.lock.Unlock()
if s.deployment.Load().Ok() {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return errors.New("already deployed")
}

gdResp, err := s.controllerClient.GetDeployment(ctx, connect.NewRequest(&ftlv1.GetDeploymentRequest{DeploymentKey: s.config.Deployment}))
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to get deployment: %w", err)
}
module, err := schema.ModuleFromProto(gdResp.Msg.Schema)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("invalid module: %w", err)
}
deploymentLogger := s.getDeploymentLogger(ctx, key)
ctx = log.ContextWithLogger(ctx, deploymentLogger)

deploymentDir := filepath.Join(s.config.DeploymentDir, module.Name, key.String())
if s.config.TemplateDir != "" {
err = copy.Copy(s.config.TemplateDir, deploymentDir)
err := copy.Copy(s.config.TemplateDir, deploymentDir)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to copy template directory: %w", err)
}
} else {
err = os.MkdirAll(deploymentDir, 0700)
err := os.MkdirAll(deploymentDir, 0700)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to create deployment directory: %w", err)
Expand Down Expand Up @@ -377,7 +401,7 @@ func (s *Service) deploy(ctx context.Context) error {
deployment, cmdCtx, err := plugin.Spawn(
unstoppable.Context(verbCtx),
log.FromContext(ctx).GetLevel(),
gdResp.Msg.Schema.Name,
module.Name,
deploymentDir,
"./launch",
ftlv1connect.NewVerbServiceClient,
Expand Down Expand Up @@ -568,3 +592,14 @@ func (s *Service) healthCheck(writer http.ResponseWriter, request *http.Request)
}
writer.WriteHeader(http.StatusServiceUnavailable)
}

func (s *Service) StartPgProxy(ctx context.Context, module *schema.Module) error {
if err := pgproxy.New(s.config.PgProxyConfig, func(ctx context.Context, params map[string]string) (string, error) {
db := params["database"]

return "postgres://127.0.0.1:5432/" + db + "?user=" + params["user"], nil
}).Start(ctx); err != nil {
return fmt.Errorf("failed to start pgproxy: %w", err)
}
return nil
}
22 changes: 1 addition & 21 deletions cmd/ftl-runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@ package main

import (
"context"
"fmt"
"os"
"path/filepath"

"github.com/alecthomas/kong"
"golang.org/x/sync/errgroup"

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/runner"
_ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/pgproxy"
)

var cli struct {
Version kong.VersionFlag `help:"Show version."`
LogConfig log.Config `prefix:"log-" embed:""`
RunnerConfig runner.Config `embed:""`
ProxyConfig pgproxy.Config `embed:"" prefix:"pgproxy-"`
}

func main() {
Expand All @@ -47,21 +43,5 @@ and route to user code.
logger := log.Configure(os.Stderr, cli.LogConfig)
ctx := log.ContextWithLogger(context.Background(), logger)

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
return runPGProxy(ctx, cli.ProxyConfig)
})
g.Go(func() error {
return runner.Start(ctx, cli.RunnerConfig)
})
kctx.FatalIfErrorf(g.Wait())
}

func runPGProxy(ctx context.Context, config pgproxy.Config) error {
if err := pgproxy.New(config, func(ctx context.Context, params map[string]string) (string, error) {
return "postgres://127.0.0.1:5432/postgres?user=" + params["user"], nil
}).Start(ctx); err != nil {
return fmt.Errorf("failed to start pgproxy: %w", err)
}
return nil
kctx.FatalIfErrorf(runner.Start(ctx, cli.RunnerConfig))
}

0 comments on commit f5554a1

Please sign in to comment.