From 00f5ec45ab0cd8fddfa48bc83919f7d172c7ca18 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Wed, 4 Sep 2024 19:33:14 +1000 Subject: [PATCH] address review --- backend/controller/controller.go | 1 - .../controller/dal/internal/sql/querier.go | 4 ---- .../controller/dal/internal/sql/queries.sql | 7 +------ .../dal/internal/sql/queries.sql.go | 7 +------ .../k8sscaling/deployment_provisioner.go | 21 +++++++++++-------- .../scaling/k8sscaling/k8s_scaling.go | 13 ++++++------ .../scaling/localscaling/local_scaling.go | 2 +- backend/controller/scaling/scaling.go | 4 ++++ backend/runner/runner.go | 4 ++-- 9 files changed, 27 insertions(+), 36 deletions(-) diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 900d364820..0a5ae5ace6 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -618,7 +618,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre // Check if we can contact the runner. func (s *Service) pingRunner(ctx context.Context, endpoint *url.URL) error { - // TODO: do we really need to ping the runner first thing? We should revisit this later client := rpc.Dial(ftlv1connect.NewVerbServiceClient, endpoint.String(), log.Error) retry := backoff.Backoff{} heartbeatCtx, cancel := context.WithTimeout(ctx, s.config.RunnerTimeout) diff --git a/backend/controller/dal/internal/sql/querier.go b/backend/controller/dal/internal/sql/querier.go index 705a67a75a..b8b39d5f1e 100644 --- a/backend/controller/dal/internal/sql/querier.go +++ b/backend/controller/dal/internal/sql/querier.go @@ -108,10 +108,6 @@ type Querier interface { UpsertController(ctx context.Context, key model.ControllerKey, endpoint string) (int64, error) UpsertModule(ctx context.Context, language string, name string) (int64, error) // Upsert a runner and return the deployment ID that it is assigned to, if any. - // If the deployment key is null, then deployment_rel.id will be null, - // otherwise we try to retrieve the deployments.id using the key. If - // there is no corresponding deployment, then the deployment ID is -1 - // and the parent statement will fail due to a foreign key constraint. UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (int64, error) UpsertSubscription(ctx context.Context, arg UpsertSubscriptionParams) (UpsertSubscriptionRow, error) UpsertTopic(ctx context.Context, arg UpsertTopicParams) error diff --git a/backend/controller/dal/internal/sql/queries.sql b/backend/controller/dal/internal/sql/queries.sql index 28cbb70278..52b581e2ba 100644 --- a/backend/controller/dal/internal/sql/queries.sql +++ b/backend/controller/dal/internal/sql/queries.sql @@ -71,10 +71,6 @@ WHERE a.id = @id; -- name: UpsertRunner :one -- Upsert a runner and return the deployment ID that it is assigned to, if any. WITH deployment_rel AS ( --- If the deployment key is null, then deployment_rel.id will be null, --- otherwise we try to retrieve the deployments.id using the key. If --- there is no corresponding deployment, then the deployment ID is -1 --- and the parent statement will fail due to a foreign key constraint. SELECT id FROM deployments d WHERE d.key = sqlc.arg('deployment_key')::deployment_key LIMIT 1) @@ -95,8 +91,7 @@ RETURNING deployment_id; -- name: KillStaleRunners :one WITH matches AS ( UPDATE runners - SET state = 'dead', - deployment_id = NULL + SET state = 'dead' WHERE state <> 'dead' AND last_seen < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL RETURNING 1) SELECT COUNT(*) diff --git a/backend/controller/dal/internal/sql/queries.sql.go b/backend/controller/dal/internal/sql/queries.sql.go index b007274202..a71dabc13d 100644 --- a/backend/controller/dal/internal/sql/queries.sql.go +++ b/backend/controller/dal/internal/sql/queries.sql.go @@ -2052,8 +2052,7 @@ func (q *Queries) KillStaleControllers(ctx context.Context, timeout sqltypes.Dur const killStaleRunners = `-- name: KillStaleRunners :one WITH matches AS ( UPDATE runners - SET state = 'dead', - deployment_id = NULL + SET state = 'dead' WHERE state <> 'dead' AND last_seen < (NOW() AT TIME ZONE 'utc') - $1::INTERVAL RETURNING 1) SELECT COUNT(*) @@ -2418,10 +2417,6 @@ type UpsertRunnerParams struct { } // Upsert a runner and return the deployment ID that it is assigned to, if any. -// If the deployment key is null, then deployment_rel.id will be null, -// otherwise we try to retrieve the deployments.id using the key. If -// there is no corresponding deployment, then the deployment ID is -1 -// and the parent statement will fail due to a foreign key constraint. func (q *Queries) UpsertRunner(ctx context.Context, arg UpsertRunnerParams) (int64, error) { row := q.db.QueryRowContext(ctx, upsertRunner, arg.Key, diff --git a/backend/controller/scaling/k8sscaling/deployment_provisioner.go b/backend/controller/scaling/k8sscaling/deployment_provisioner.go index e1a9cef7e8..1d4e6dfb57 100644 --- a/backend/controller/scaling/k8sscaling/deployment_provisioner.go +++ b/backend/controller/scaling/k8sscaling/deployment_provisioner.go @@ -19,7 +19,6 @@ package k8sscaling import ( "context" "fmt" - "os" "strings" "time" @@ -104,7 +103,7 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl return nil } logger := log.FromContext(ctx) - logger.Infof("handling schema change for %s", msg.DeploymentKey) + logger.Infof("Handling schema change for %s", msg.DeploymentKey) deploymentClient := r.Client.AppsV1().Deployments(r.Namespace) deployment, err := deploymentClient.Get(ctx, msg.DeploymentKey, v1.GetOptions{}) deploymentExists := true @@ -143,12 +142,6 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl } func (r *DeploymentProvisioner) thisContainerImage(ctx context.Context) (string, error) { - // This is only used for testing to enable local development outside of a cluster - // Which is why it is not a proper kong flag - testContainerImage := os.Getenv("FTL_TEST_CONTAINER_IMAGE") - if testContainerImage != "" { - return testContainerImage, nil - } deploymentClient := r.Client.AppsV1().Deployments(r.Namespace) thisDeployment, err := deploymentClient.Get(ctx, thisDeploymentName, v1.GetOptions{}) if err != nil { @@ -162,6 +155,7 @@ func (r *DeploymentProvisioner) handleNewDeployment(ctx context.Context, dep *sc if dep.Runtime == nil { return nil } + deploymentClient := r.Client.AppsV1().Deployments(r.Namespace) logger := log.FromContext(ctx) logger.Infof("creating new kube deployment %s", name) thisImage, err := r.thisContainerImage(ctx) @@ -187,8 +181,18 @@ func (r *DeploymentProvisioner) handleNewDeployment(ctx context.Context, dep *sc } runnerImage := strings.ReplaceAll(ourImage, "controller", "runner") + thisDeployment, err := deploymentClient.Get(ctx, thisDeploymentName, v1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get deployment %s: %w", thisDeploymentName, err) + } deployment.Name = name + deployment.OwnerReferences = []v1.OwnerReference{{APIVersion: "apps/v1", Kind: "deployment", Name: thisDeploymentName, UID: thisDeployment.UID}} deployment.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s", runnerImage, ourVersion) + deployment.Spec.Selector = &v1.LabelSelector{MatchLabels: map[string]string{"app": name}} + if deployment.Spec.Template.ObjectMeta.Labels == nil { + deployment.Spec.Template.ObjectMeta.Labels = map[string]string{} + } + deployment.Spec.Template.ObjectMeta.Labels["app"] = name changes, err := r.syncDeployment(ctx, thisImage, deployment, dep) if err != nil { return err @@ -201,7 +205,6 @@ func (r *DeploymentProvisioner) handleNewDeployment(ctx context.Context, dep *sc } deployment.Labels[deploymentLabel] = name - deploymentClient := r.Client.AppsV1().Deployments(r.Namespace) _, err = deploymentClient.Create(ctx, deployment, v1.CreateOptions{}) if err != nil { return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err) diff --git a/backend/controller/scaling/k8sscaling/k8s_scaling.go b/backend/controller/scaling/k8sscaling/k8s_scaling.go index f6d628eb59..4cbe41bdcb 100644 --- a/backend/controller/scaling/k8sscaling/k8s_scaling.go +++ b/backend/controller/scaling/k8sscaling/k8s_scaling.go @@ -28,14 +28,14 @@ func NewK8sScaling(ctx context.Context, controller url.URL, leaser leases.Leaser // if we're not in a cluster, use the kubeconfig config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { - panic(err.Error()) + return fmt.Errorf("failed to get kubeconfig: %w", err) } } // creates the clientset clientset, err := kubernetes.NewForConfig(config) if err != nil { - panic(err.Error()) + return fmt.Errorf("failed to create clientset: %w", err) } namespace, err := getCurrentNamespace() @@ -56,11 +56,10 @@ func NewK8sScaling(ctx context.Context, controller url.URL, leaser leases.Leaser func getCurrentNamespace() (string, error) { namespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace" - if _, err := os.Stat(namespaceFile); err == nil { - namespace, err := os.ReadFile(namespaceFile) - if err != nil { - return "", fmt.Errorf("failed to read namespace file: %w", err) - } + namespace, err := os.ReadFile(namespaceFile) + if err != nil && !os.IsNotExist(err) { + return "", fmt.Errorf("failed to read namespace file: %w", err) + } else if err == nil { return string(namespace), nil } diff --git a/backend/controller/scaling/localscaling/local_scaling.go b/backend/controller/scaling/localscaling/local_scaling.go index 563f627414..a7ea5fa830 100644 --- a/backend/controller/scaling/localscaling/local_scaling.go +++ b/backend/controller/scaling/localscaling/local_scaling.go @@ -69,7 +69,7 @@ func (l *localScaling) handleSchemaChange(ctx context.Context, msg *ftlv1.PullSc defer l.lock.Unlock() logger := log.FromContext(ctx).Scope("localScaling") ctx = log.ContextWithLogger(ctx, logger) - logger.Infof("handling schema change for %s", msg.DeploymentKey) + logger.Infof("Handling schema change for %s", msg.DeploymentKey) moduleDeployments := l.runners[msg.ModuleName] if moduleDeployments == nil { moduleDeployments = map[string]*deploymentInfo{} diff --git a/backend/controller/scaling/scaling.go b/backend/controller/scaling/scaling.go index 943dc2a778..4d2a4dcef9 100644 --- a/backend/controller/scaling/scaling.go +++ b/backend/controller/scaling/scaling.go @@ -2,6 +2,7 @@ package scaling import ( "context" + "errors" "net/url" "time" @@ -32,6 +33,9 @@ func BeginGrpcScaling(ctx context.Context, url url.URL, leaser leases.Leaser, ha }(lease) // If we get it then we take over runner scaling runGrpcScaling(leaseContext, url, handler) + } else if !errors.Is(err, leases.ErrConflict) { + logger := log.FromContext(ctx) + logger.Errorf(err, "Failed to acquire lease") } select { case <-ctx.Done(): diff --git a/backend/runner/runner.go b/backend/runner/runner.go index 9c75d65b1f..e10a6971bc 100644 --- a/backend/runner/runner.go +++ b/backend/runner/runner.go @@ -313,7 +313,7 @@ func (s *Service) deploy(ctx context.Context) error { logger.Debugf("Deployed %s", key) setState(ftlv1.RunnerState_RUNNER_ASSIGNED) context.AfterFunc(ctx, func() { - err := s.Terminate() + err := s.Close() if err != nil { logger := log.FromContext(ctx) logger.Errorf(err, "failed to terminate deployment") @@ -323,7 +323,7 @@ func (s *Service) deploy(ctx context.Context) error { return nil } -func (s *Service) Terminate() error { +func (s *Service) Close() error { s.lock.Lock() defer s.lock.Unlock() depl, ok := s.deployment.Load().Get()