Skip to content

Commit

Permalink
fix: hot reload endpoint uses runner proxy (#3636)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Dec 12, 2024
1 parent 3199ff0 commit 7927bb5
Show file tree
Hide file tree
Showing 24 changed files with 909 additions and 528 deletions.
13 changes: 1 addition & 12 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,18 +663,7 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
logger := log.FromContext(ctx)
updates := s.routeTable.Subscribe()
defer s.routeTable.Unsubscribe(updates)
view := s.controllerState.View()
depName := req.Msg.Deployment
if !strings.HasPrefix(depName, "dpl-") {
// For hot reload endponts we might not have a deployment key
deps := view.GetActiveDeployments()
for _, dep := range deps {
if dep.Module == depName {
depName = dep.Key.String()
break
}
}
}
key, err := model.ParseDeploymentKey(depName)
if err != nil {
return connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("invalid deployment key: %w", err))
Expand Down Expand Up @@ -723,7 +712,7 @@ func (s *Service) GetDeploymentContext(ctx context.Context, req *connect.Request
}
}
if deployment.Schema.Runtime != nil && deployment.Schema.Runtime.Deployment != nil {
routeTable[module] = deployment.Schema.Runtime.Deployment.Endpoint
routeTable[deployment.Key.String()] = deployment.Schema.Runtime.Deployment.Endpoint
}

if err != nil {
Expand Down
282 changes: 148 additions & 134 deletions backend/protos/xyz/block/ftl/language/v1/language.pb.go

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions backend/protos/xyz/block/ftl/language/v1/language.proto
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,9 @@ message BuildSuccess {

// Dev mode debug port
optional int32 debug_port = 8;

// Dev mode runner info file, this file is used to allow the runner to communicate provisioner info back to the plugin
optional string dev_runner_info_file = 9;
}

// BuildFailure should be sent when a build fails.
Expand Down
13 changes: 9 additions & 4 deletions backend/provisioner/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ func (l *localScaling) TerminatePreviousDeployments(ctx context.Context, module
type devModeRunner struct {
uri url.URL
// The deployment key of the deployment that is currently running
deploymentKey optional.Option[model.DeploymentKey]
debugPort int
deploymentKey optional.Option[model.DeploymentKey]
debugPort int
runnerInfoFile optional.Option[string]
}

func (l *localScaling) Start(ctx context.Context) error {
Expand All @@ -127,8 +128,9 @@ func (l *localScaling) Start(ctx context.Context) error {
// Must be called under lock
func (l *localScaling) updateDevModeEndpoint(ctx context.Context, devEndpoints dev.LocalEndpoint) {
l.devModeEndpoints[devEndpoints.Module] = &devModeRunner{
uri: devEndpoints.Endpoint,
debugPort: devEndpoints.DebugPort,
uri: devEndpoints.Endpoint,
debugPort: devEndpoints.DebugPort,
runnerInfoFile: devEndpoints.RunnerInfoFile,
}
if ide, ok := l.ideSupport.Get(); ok {
if devEndpoints.DebugPort != 0 {
Expand Down Expand Up @@ -262,9 +264,11 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey model.Depl

devEndpoint := l.devModeEndpoints[info.module]
devURI := optional.None[url.URL]()
devRunnerInfoFile := optional.None[string]()
debugPort := 0
if devEndpoint != nil {
devURI = optional.Some(devEndpoint.uri)
devRunnerInfoFile = devEndpoint.runnerInfoFile
if devKey, ok := devEndpoint.deploymentKey.Get(); ok && devKey.Equal(deploymentKey) {
// Already running, don't start another
return nil
Expand Down Expand Up @@ -304,6 +308,7 @@ func (l *localScaling) startRunner(ctx context.Context, deploymentKey model.Depl
Deployment: deploymentKey,
DebugPort: debugPort,
DevEndpoint: devURI,
DevRunnerInfoFile: devRunnerInfoFile,
}

simpleName := fmt.Sprintf("runner%d", keySuffix)
Expand Down
122 changes: 73 additions & 49 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
mysql "github.com/block/ftl-mysql-auth-proxy"
"github.com/jpillora/backoff"
"github.com/otiai10/copy"
"github.com/puzpuzpuz/xsync/v3"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -67,7 +68,8 @@ type Config struct {
HeartbeatJitter time.Duration `help:"Jitter to add to heartbeat period." default:"2s"`
Deployment model.DeploymentKey `help:"The deployment this runner is for." env:"FTL_DEPLOYMENT"`
DebugPort int `help:"The port to use for debugging." env:"FTL_DEBUG_PORT"`
DevEndpoint optional.Option[url.URL] `help:"An existing endpoint to connect to in development mode" env:"FTL_DEV_ENDPOINT"`
DevEndpoint optional.Option[url.URL] `help:"An existing endpoint to connect to in development mode" hidden:""`
DevRunnerInfoFile optional.Option[string] `help:"The path to a file that we write dev endpoint information to." hidden:""`
}

func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactService) error {
Expand Down Expand Up @@ -121,6 +123,7 @@ func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactSer
deploymentLogQueue: make(chan log.Entry, 10000),
cancelFunc: doneFunc,
devEndpoint: config.DevEndpoint,
devRunnerInfoFile: config.DevRunnerInfoFile,
}

module, err := svc.getModule(ctx, config.Deployment)
Expand All @@ -131,27 +134,28 @@ func Start(ctx context.Context, config Config, storage *artefacts.OCIArtefactSer
startedLatch := &sync.WaitGroup{}
startedLatch.Add(2)
g, ctx := errgroup.WithContext(ctx)
dbAddresses := xsync.NewMapOf[string, string]()
g.Go(func() error {
return svc.startPgProxy(ctx, module, startedLatch)
return svc.startPgProxy(ctx, module, startedLatch, dbAddresses)
})
g.Go(func() error {
return svc.startMySQLProxy(ctx, module, startedLatch)
return svc.startMySQLProxy(ctx, module, startedLatch, dbAddresses)
})
g.Go(func() error {
startedLatch.Wait()
select {
case <-ctx.Done():
return ctx.Err()
default:
return svc.startDeployment(ctx, config.Deployment, module)
return svc.startDeployment(ctx, config.Deployment, module, dbAddresses)
}
})

return fmt.Errorf("failure in runner: %w", g.Wait())
}

func (s *Service) startDeployment(ctx context.Context, key model.DeploymentKey, module *schema.Module) error {
err := s.deploy(ctx, key, module)
func (s *Service) startDeployment(ctx context.Context, key model.DeploymentKey, module *schema.Module, dbAddresses *xsync.MapOf[string, string]) error {
err := s.deploy(ctx, key, module, dbAddresses)
if err != nil {
// If we fail to deploy we just exit
// Kube or local scaling will start a new instance to continue
Expand Down Expand Up @@ -252,6 +256,7 @@ type Service struct {
deploymentLogQueue chan log.Entry
cancelFunc func()
devEndpoint optional.Option[url.URL]
devRunnerInfoFile optional.Option[string]
proxy *proxy.Service
pubSub *pubsub.Service
proxyBindAddress *url.URL
Expand Down Expand Up @@ -296,7 +301,7 @@ func (s *Service) getModule(ctx context.Context, key model.DeploymentKey) (*sche
return module, nil
}

func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *schema.Module) error {
func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *schema.Module, dbAddresses *xsync.MapOf[string, string]) error {
logger := log.FromContext(ctx)

if err, ok := s.registrationFailure.Load().Get(); ok {
Expand Down Expand Up @@ -331,6 +336,45 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s
return fmt.Errorf("failed to create deployment directory: %w", err)
}
}

deploymentServiceClient := rpc.Dial(ftldeploymentconnect.NewDeploymentServiceClient, s.config.ControllerEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, deploymentServiceClient)

leaseServiceClient := rpc.Dial(ftlleaseconnect.NewLeaseServiceClient, s.config.LeaseEndpoint.String(), log.Error)

timelineClient := timeline.NewClient(ctx, s.config.TimelineEndpoint)
s.proxy = proxy.New(deploymentServiceClient, leaseServiceClient, timelineClient)

pubSub, err := pubsub.New(module, key, s, timelineClient)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to create pubsub service: %w", err)
}
s.pubSub = pubSub

parse, err := url.Parse("http://127.0.0.1:0")
if err != nil {
return fmt.Errorf("failed to parse url: %w", err)
}
proxyServer, err := rpc.NewServer(ctx, parse,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, s.proxy),
rpc.GRPC(ftldeploymentconnect.NewDeploymentServiceHandler, s.proxy),
rpc.GRPC(ftlleaseconnect.NewLeaseServiceHandler, s.proxy),
rpc.GRPC(pubconnect.NewPublishServiceHandler, s.pubSub),
)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
urls := proxyServer.Bind.Subscribe(nil)
go func() {
err := proxyServer.Serve(ctx)
if err != nil {
logger.Errorf(err, "failed to serve")
return
}
}()
s.proxyBindAddress = <-urls

var dep *deployment
if ep, ok := s.devEndpoint.Get(); ok {
client := rpc.Dial(ftlv1connect.NewVerbServiceClient, ep.String(), log.Error)
Expand All @@ -341,51 +385,25 @@ func (s *Service) deploy(ctx context.Context, key model.DeploymentKey, module *s
endpoint: &ep,
client: client,
}
if file, ok := s.devRunnerInfoFile.Get(); ok {
fileContents := "proxy.bind.address=" + s.proxyBindAddress.String()
fileContents += fmt.Sprintf("\ndeployment=%s", s.config.Deployment.String())
dbAddresses.Range(func(key string, value string) bool {
fileContents += fmt.Sprintf("\ndatabase.%s.url=%s", key, value)
return true
})
err = os.WriteFile(file, []byte(fileContents), 0660) // #nosec
if err != nil {
logger.Errorf(err, "could not create FTL dev Config")
}
}
} else {
err := download.ArtefactsFromOCI(ctx, s.controllerClient, key, deploymentDir, s.storage)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to download artefacts: %w", err)
}

deploymentServiceClient := rpc.Dial(ftldeploymentconnect.NewDeploymentServiceClient, s.config.ControllerEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, deploymentServiceClient)

leaseServiceClient := rpc.Dial(ftlleaseconnect.NewLeaseServiceClient, s.config.LeaseEndpoint.String(), log.Error)

timelineClient := timeline.NewClient(ctx, s.config.TimelineEndpoint)
s.proxy = proxy.New(deploymentServiceClient, leaseServiceClient, timelineClient)

pubSub, err := pubsub.New(module, key, s, timelineClient)
if err != nil {
observability.Deployment.Failure(ctx, optional.Some(key.String()))
return fmt.Errorf("failed to create pubsub service: %w", err)
}
s.pubSub = pubSub

parse, err := url.Parse("http://127.0.0.1:0")
if err != nil {
return fmt.Errorf("failed to parse url: %w", err)
}
proxyServer, err := rpc.NewServer(ctx, parse,
rpc.GRPC(ftlv1connect.NewVerbServiceHandler, s.proxy),
rpc.GRPC(ftldeploymentconnect.NewDeploymentServiceHandler, s.proxy),
rpc.GRPC(ftlleaseconnect.NewLeaseServiceHandler, s.proxy),
rpc.GRPC(pubconnect.NewPublishServiceHandler, s.pubSub),
)
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}
urls := proxyServer.Bind.Subscribe(nil)
go func() {
err := proxyServer.Serve(ctx)
if err != nil {
logger.Errorf(err, "failed to serve")
return
}
}()
s.proxyBindAddress = <-urls

logger.Debugf("Setting FTL_ENDPOINT to %s", s.proxyBindAddress.String())
envVars := []string{"FTL_ENDPOINT=" + s.proxyBindAddress.String(),
"FTL_CONFIG=" + strings.Join(s.config.Config, ","),
Expand Down Expand Up @@ -600,7 +618,7 @@ func (s *Service) healthCheck(writer http.ResponseWriter, request *http.Request)
writer.WriteHeader(http.StatusServiceUnavailable)
}

func (s *Service) startPgProxy(ctx context.Context, module *schema.Module, started *sync.WaitGroup) error {
func (s *Service) startPgProxy(ctx context.Context, module *schema.Module, started *sync.WaitGroup, addresses *xsync.MapOf[string, string]) error {
logger := log.FromContext(ctx)

databases := map[string]*schema.Database{}
Expand All @@ -620,7 +638,11 @@ func (s *Service) startPgProxy(ctx context.Context, module *schema.Module, start
go func() {
select {
case pgProxy := <-channel:
os.Setenv("FTL_PROXY_POSTGRES_ADDRESS", fmt.Sprintf("127.0.0.1:%d", pgProxy.Address.Port))
address := fmt.Sprintf("127.0.0.1:%d", pgProxy.Address.Port)
for db := range databases {
addresses.Store(db, address)
}
os.Setenv("FTL_PROXY_POSTGRES_ADDRESS", address)
started.Done()
case <-ctx.Done():
started.Done()
Expand Down Expand Up @@ -649,7 +671,7 @@ func (s *Service) startPgProxy(ctx context.Context, module *schema.Module, start
return nil
}

func (s *Service) startMySQLProxy(ctx context.Context, module *schema.Module, latch *sync.WaitGroup) error {
func (s *Service) startMySQLProxy(ctx context.Context, module *schema.Module, latch *sync.WaitGroup, addresses *xsync.MapOf[string, string]) error {
defer latch.Done()
logger := log.FromContext(ctx)

Expand Down Expand Up @@ -689,7 +711,9 @@ func (s *Service) startMySQLProxy(ctx context.Context, module *schema.Module, la
case port = <-portC:
}

os.Setenv(strings.ToUpper("FTL_PROXY_MYSQL_ADDRESS_"+decl.Name), fmt.Sprintf("127.0.0.1:%d", port))
address := fmt.Sprintf("127.0.0.1:%d", port)
addresses.Store(decl.Name, address)
os.Setenv(strings.ToUpper("FTL_PROXY_MYSQL_ADDRESS_"+decl.Name), address)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion examples/java/echo/src/main/java/ftl/echo/Echo.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ public class Echo {
@Verb
public EchoResponse echo(EchoRequest req, TimeClient time) {
var response = time.time();
return new EchoResponse("Hello, " + req.name().orElse("anonymous") + "! The time is " + response.toString() + ".");
return new EchoResponse("Hello, " + req.name().orElse("anonymous") + "! The time is " + response.getTime() + ".");
}
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/buildengine/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func handleBuildResult(ctx context.Context, projectConfig projectconfig.Config,
if devModeEndpoints != nil {
parsed, err := url.Parse(endpoint)
if err == nil {
devModeEndpoints <- dev.LocalEndpoint{Module: config.Module, Endpoint: *parsed, DebugPort: result.DebugPort, Language: config.Language}
devModeEndpoints <- dev.LocalEndpoint{Module: config.Module, Endpoint: *parsed, DebugPort: result.DebugPort, Language: config.Language, RunnerInfoFile: result.DevRunnerInfoFile}
}
}
}
Expand Down
16 changes: 10 additions & 6 deletions internal/buildengine/languageplugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ type BuildResult struct {
// Endpoint of an instance started by the plugin to use in dev mode
DevEndpoint optional.Option[string]

// File that the runner can use to pass info into the hot reload endpoint
DevRunnerInfoFile optional.Option[string]

DebugPort int
}

Expand Down Expand Up @@ -572,12 +575,13 @@ func buildResultFromProto(result either.Either[*langpb.BuildResponse_BuildSucces
port = int(*buildSuccess.DebugPort)
}
return BuildResult{
Errors: errs,
Schema: moduleSch,
Deploy: buildSuccess.Deploy,
StartTime: startTime,
DevEndpoint: optional.Ptr(buildSuccess.DevEndpoint),
DebugPort: port,
Errors: errs,
Schema: moduleSch,
Deploy: buildSuccess.Deploy,
StartTime: startTime,
DevEndpoint: optional.Ptr(buildSuccess.DevEndpoint),
DevRunnerInfoFile: optional.Ptr(buildSuccess.DevRunnerInfoFile),
DebugPort: port,
}, nil
case either.Right[*langpb.BuildResponse_BuildSuccess, *langpb.BuildResponse_BuildFailure]:
buildFailure := result.Get().BuildFailure
Expand Down
Loading

0 comments on commit 7927bb5

Please sign in to comment.