Skip to content

Commit

Permalink
address review
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Sep 4, 2024
1 parent 5db48ac commit 00f5ec4
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 36 deletions.
1 change: 0 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre

// Check if we can contact the runner.
func (s *Service) pingRunner(ctx context.Context, endpoint *url.URL) error {
// TODO: do we really need to ping the runner first thing? We should revisit this later
client := rpc.Dial(ftlv1connect.NewVerbServiceClient, endpoint.String(), log.Error)
retry := backoff.Backoff{}
heartbeatCtx, cancel := context.WithTimeout(ctx, s.config.RunnerTimeout)
Expand Down
4 changes: 0 additions & 4 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 1 addition & 6 deletions backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ WHERE a.id = @id;
-- name: UpsertRunner :one
-- Upsert a runner and return the deployment ID that it is assigned to, if any.
WITH deployment_rel AS (
-- If the deployment key is null, then deployment_rel.id will be null,
-- otherwise we try to retrieve the deployments.id using the key. If
-- there is no corresponding deployment, then the deployment ID is -1
-- and the parent statement will fail due to a foreign key constraint.
SELECT id FROM deployments d
WHERE d.key = sqlc.arg('deployment_key')::deployment_key
LIMIT 1)
Expand All @@ -95,8 +91,7 @@ RETURNING deployment_id;
-- name: KillStaleRunners :one
WITH matches AS (
UPDATE runners
SET state = 'dead',
deployment_id = NULL
SET state = 'dead'
WHERE state <> 'dead' AND last_seen < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL
RETURNING 1)
SELECT COUNT(*)
Expand Down
7 changes: 1 addition & 6 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 12 additions & 9 deletions backend/controller/scaling/k8sscaling/deployment_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package k8sscaling
import (
"context"
"fmt"
"os"
"strings"
"time"

Expand Down Expand Up @@ -104,7 +103,7 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
return nil
}
logger := log.FromContext(ctx)
logger.Infof("handling schema change for %s", msg.DeploymentKey)
logger.Infof("Handling schema change for %s", msg.DeploymentKey)
deploymentClient := r.Client.AppsV1().Deployments(r.Namespace)
deployment, err := deploymentClient.Get(ctx, msg.DeploymentKey, v1.GetOptions{})
deploymentExists := true
Expand Down Expand Up @@ -143,12 +142,6 @@ func (r *DeploymentProvisioner) handleSchemaChange(ctx context.Context, msg *ftl
}

func (r *DeploymentProvisioner) thisContainerImage(ctx context.Context) (string, error) {
// This is only used for testing to enable local development outside of a cluster
// Which is why it is not a proper kong flag
testContainerImage := os.Getenv("FTL_TEST_CONTAINER_IMAGE")
if testContainerImage != "" {
return testContainerImage, nil
}
deploymentClient := r.Client.AppsV1().Deployments(r.Namespace)
thisDeployment, err := deploymentClient.Get(ctx, thisDeploymentName, v1.GetOptions{})
if err != nil {
Expand All @@ -162,6 +155,7 @@ func (r *DeploymentProvisioner) handleNewDeployment(ctx context.Context, dep *sc
if dep.Runtime == nil {
return nil
}
deploymentClient := r.Client.AppsV1().Deployments(r.Namespace)
logger := log.FromContext(ctx)
logger.Infof("creating new kube deployment %s", name)
thisImage, err := r.thisContainerImage(ctx)
Expand All @@ -187,8 +181,18 @@ func (r *DeploymentProvisioner) handleNewDeployment(ctx context.Context, dep *sc
}
runnerImage := strings.ReplaceAll(ourImage, "controller", "runner")

thisDeployment, err := deploymentClient.Get(ctx, thisDeploymentName, v1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get deployment %s: %w", thisDeploymentName, err)
}
deployment.Name = name
deployment.OwnerReferences = []v1.OwnerReference{{APIVersion: "apps/v1", Kind: "deployment", Name: thisDeploymentName, UID: thisDeployment.UID}}
deployment.Spec.Template.Spec.Containers[0].Image = fmt.Sprintf("%s:%s", runnerImage, ourVersion)
deployment.Spec.Selector = &v1.LabelSelector{MatchLabels: map[string]string{"app": name}}
if deployment.Spec.Template.ObjectMeta.Labels == nil {
deployment.Spec.Template.ObjectMeta.Labels = map[string]string{}
}
deployment.Spec.Template.ObjectMeta.Labels["app"] = name
changes, err := r.syncDeployment(ctx, thisImage, deployment, dep)
if err != nil {
return err
Expand All @@ -201,7 +205,6 @@ func (r *DeploymentProvisioner) handleNewDeployment(ctx context.Context, dep *sc
}
deployment.Labels[deploymentLabel] = name

deploymentClient := r.Client.AppsV1().Deployments(r.Namespace)
_, err = deploymentClient.Create(ctx, deployment, v1.CreateOptions{})
if err != nil {
return fmt.Errorf("failed to create deployment %s: %w", deployment.Name, err)
Expand Down
13 changes: 6 additions & 7 deletions backend/controller/scaling/k8sscaling/k8s_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ func NewK8sScaling(ctx context.Context, controller url.URL, leaser leases.Leaser
// if we're not in a cluster, use the kubeconfig
config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
panic(err.Error())
return fmt.Errorf("failed to get kubeconfig: %w", err)
}
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)

if err != nil {
panic(err.Error())
return fmt.Errorf("failed to create clientset: %w", err)
}

namespace, err := getCurrentNamespace()
Expand All @@ -56,11 +56,10 @@ func NewK8sScaling(ctx context.Context, controller url.URL, leaser leases.Leaser

func getCurrentNamespace() (string, error) {
namespaceFile := "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
if _, err := os.Stat(namespaceFile); err == nil {
namespace, err := os.ReadFile(namespaceFile)
if err != nil {
return "", fmt.Errorf("failed to read namespace file: %w", err)
}
namespace, err := os.ReadFile(namespaceFile)
if err != nil && !os.IsNotExist(err) {
return "", fmt.Errorf("failed to read namespace file: %w", err)
} else if err == nil {
return string(namespace), nil
}

Expand Down
2 changes: 1 addition & 1 deletion backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (l *localScaling) handleSchemaChange(ctx context.Context, msg *ftlv1.PullSc
defer l.lock.Unlock()
logger := log.FromContext(ctx).Scope("localScaling")
ctx = log.ContextWithLogger(ctx, logger)
logger.Infof("handling schema change for %s", msg.DeploymentKey)
logger.Infof("Handling schema change for %s", msg.DeploymentKey)
moduleDeployments := l.runners[msg.ModuleName]
if moduleDeployments == nil {
moduleDeployments = map[string]*deploymentInfo{}
Expand Down
4 changes: 4 additions & 0 deletions backend/controller/scaling/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package scaling

import (
"context"
"errors"
"net/url"
"time"

Expand Down Expand Up @@ -32,6 +33,9 @@ func BeginGrpcScaling(ctx context.Context, url url.URL, leaser leases.Leaser, ha
}(lease)
// If we get it then we take over runner scaling
runGrpcScaling(leaseContext, url, handler)
} else if !errors.Is(err, leases.ErrConflict) {
logger := log.FromContext(ctx)
logger.Errorf(err, "Failed to acquire lease")
}
select {
case <-ctx.Done():
Expand Down
4 changes: 2 additions & 2 deletions backend/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func (s *Service) deploy(ctx context.Context) error {
logger.Debugf("Deployed %s", key)
setState(ftlv1.RunnerState_RUNNER_ASSIGNED)
context.AfterFunc(ctx, func() {
err := s.Terminate()
err := s.Close()
if err != nil {
logger := log.FromContext(ctx)
logger.Errorf(err, "failed to terminate deployment")
Expand All @@ -323,7 +323,7 @@ func (s *Service) deploy(ctx context.Context) error {
return nil
}

func (s *Service) Terminate() error {
func (s *Service) Close() error {
s.lock.Lock()
defer s.lock.Unlock()
depl, ok := s.deployment.Load().Get()
Expand Down

0 comments on commit 00f5ec4

Please sign in to comment.