From 1f66d735b09b2ca4a06bcf1bd83da32bd1f397f7 Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Wed, 7 Feb 2024 15:30:38 -0800 Subject: [PATCH] feat: add wait flag on deploy cmd fixes #885 --- backend/controller/controller.go | 13 + cmd/ftl/cmd_deploy.go | 46 ++ cmd/ftl/cmd_dev.go | 1 + .../src/protos/xyz/block/ftl/v1/ftl_pb.ts | 6 + integration/integration_test.go | 15 +- protos/xyz/block/ftl/v1/ftl.pb.go | 406 +++++++++--------- protos/xyz/block/ftl/v1/ftl.proto | 1 + 7 files changed, 280 insertions(+), 208 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 8e450c388c..5851dde084 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -337,11 +337,13 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR return out }) s.routesMu.RUnlock() + replicas := map[string]int32{} protoRunners, err := slices.MapErr(status.Runners, func(r dal.Runner) (*ftlv1.StatusResponse_Runner, error) { var deployment *string if d, ok := r.Deployment.Get(); ok { asString := d.String() deployment = &asString + replicas[asString] = replicas[asString] + 1 } labels, err := structpb.NewStruct(r.Labels) if err != nil { @@ -368,6 +370,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR Language: d.Language, Name: d.Module, MinReplicas: int32(d.MinReplicas), + Replicas: replicas[d.Name.String()], Schema: d.Schema.ToProto().(*schemapb.Module), //nolint:forcetypeassert Labels: labels, }, nil @@ -551,6 +554,16 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre } else if err != nil { return nil, err } + + routes, err := s.dal.GetRoutingTable(ctx, nil) + if errors.Is(err, dal.ErrNotFound) { + routes = map[string][]dal.Route{} + } else if err != nil { + return nil, err + } + s.routesMu.Lock() + s.routes = routes + s.routesMu.Unlock() } if stream.Err() != nil { return nil, stream.Err() diff --git a/cmd/ftl/cmd_deploy.go b/cmd/ftl/cmd_deploy.go index f62ec495a7..84ac57f83f 100644 --- a/cmd/ftl/cmd_deploy.go +++ b/cmd/ftl/cmd_deploy.go @@ -6,9 +6,11 @@ import ( "os" "path/filepath" "strings" + "time" "connectrpc.com/connect" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/timestamppb" @@ -24,6 +26,7 @@ import ( type deployCmd struct { Replicas int32 `short:"n" help:"Number of replicas to deploy." default:"1"` ModuleDir string `arg:"" help:"Directory containing ftl.toml" type:"existingdir" default:"."` + Wait bool `help:"Only complete the deploy command when sufficient runners have been provisioned, i.e. the deployment is online and reachable." default:"false"` } func (d *deployCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceClient) error { @@ -100,10 +103,53 @@ func (d *deployCmd) Run(ctx context.Context, client ftlv1connect.ControllerServi return err } + if d.Wait { + logger.Infof("Waiting for deployment %s to become ready", resp.Msg.DeploymentName) + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(d.checkReadiness(ctx, client, resp.Msg.DeploymentName)) + if err := wg.Wait(); err != nil { + return fmt.Errorf("deployment %s failed to become ready: %w", resp.Msg.DeploymentName, err) + } + } + logger.Infof("Successfully created deployment %s", resp.Msg.DeploymentName) return nil } +func (d *deployCmd) checkReadiness(ctx context.Context, client ftlv1connect.ControllerServiceClient, deploymentName string) func() error { + return func() error { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + status, err := client.Status(ctx, connect.NewRequest(&ftlv1.StatusRequest{ + AllDeployments: true, + })) + if err != nil { + return err + } + + var found bool + for _, deployment := range status.Msg.Deployments { + if deployment.Key == deploymentName { + found = true + if deployment.Replicas >= d.Replicas { + return nil + } + } + } + if !found { + return fmt.Errorf("deployment %s not found: %v", deploymentName, status.Msg.Deployments) + } + case <-ctx.Done(): + return ctx.Err() + } + } + } +} func (d *deployCmd) loadProtoSchema(deployDir string, config moduleconfig.ModuleConfig) (*schemapb.Module, error) { schema := filepath.Join(deployDir, config.Schema) content, err := os.ReadFile(schema) diff --git a/cmd/ftl/cmd_dev.go b/cmd/ftl/cmd_dev.go index fd7756d004..f7fac62ec9 100644 --- a/cmd/ftl/cmd_dev.go +++ b/cmd/ftl/cmd_dev.go @@ -138,6 +138,7 @@ func (d *devCmd) Run(ctx context.Context, client ftlv1connect.ControllerServiceC deploy := deployCmd{ Replicas: 1, ModuleDir: dir, + Wait: d.ExitAfterDeploy, } err = deploy.Run(ctx, client) if err != nil { diff --git a/frontend/src/protos/xyz/block/ftl/v1/ftl_pb.ts b/frontend/src/protos/xyz/block/ftl/v1/ftl_pb.ts index d04da3d91a..e0024b4401 100644 --- a/frontend/src/protos/xyz/block/ftl/v1/ftl_pb.ts +++ b/frontend/src/protos/xyz/block/ftl/v1/ftl_pb.ts @@ -1630,6 +1630,11 @@ export class StatusResponse_Deployment extends Message