Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Call setup and teardown only once for PLZ #358

Merged
merged 2 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/v1alpha1/k6conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ const (
// - if True, it's after successful starter but before all runners have finished
TestRunRunning = "TestRunRunning"

// TeardownExecuted indicates whether the `teardown()` has been executed on one of the runners.
// This condition can be used only in PLZ test runs.
TeardownExecuted = "TeardownExecuted"

// CloudTestRun indicates if this test run is supposed to be a cloud test run
// (i.e. with `--out cloud` option).
// - if empty / Unknown, the type of test is unknown yet
Expand Down Expand Up @@ -71,6 +75,13 @@ func Initialize(k6 TestRunI) {
Reason: "TestRunPreparation",
Message: "",
},
metav1.Condition{
Type: TeardownExecuted,
Status: metav1.ConditionFalse,
LastTransitionTime: t,
Reason: "TeardownExecutedFalse",
Message: "",
},
}

UpdateCondition(k6, CloudTestRunAborted, metav1.ConditionFalse)
Expand Down
11 changes: 11 additions & 0 deletions api/v1alpha1/testruni.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package v1alpha1

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
runtime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -31,3 +32,13 @@ func TestRunID(k6 TestRunI) string {
}
return k6.GetStatus().TestRunID
}

func ListOptions(k6 TestRunI) *client.ListOptions {
selector := labels.SelectorFromSet(map[string]string{
"app": "k6",
"k6_cr": k6.NamespacedName().Name,
"runner": "true",
})

return &client.ListOptions{LabelSelector: selector, Namespace: k6.NamespacedName().Namespace}
}
57 changes: 57 additions & 0 deletions controllers/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/go-logr/logr"
"github.com/grafana/k6-operator/api/v1alpha1"
"github.com/grafana/k6-operator/pkg/cloud"
"github.com/grafana/k6-operator/pkg/testrun"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -186,3 +188,58 @@ func getEnvVar(vars []corev1.EnvVar, name string) string {
}
return ""
}

func (r *TestRunReconciler) hostnames(ctx context.Context, log logr.Logger, abortOnUnready bool, opts *client.ListOptions) ([]string, error) {
var (
hostnames []string
err error
)

sl := &v1.ServiceList{}

if err = r.List(ctx, sl, opts); err != nil {
log.Error(err, "Could not list services")
return nil, err
}

for _, service := range sl.Items {
log.Info(fmt.Sprintf("Checking service %s", service.Name))
if isServiceReady(log, &service) {
log.Info(fmt.Sprintf("%v service is ready", service.ObjectMeta.Name))
hostnames = append(hostnames, service.Spec.ClusterIP)
} else {
err = fmt.Errorf("%v service is not ready", service.ObjectMeta.Name)
log.Info(err.Error())
if abortOnUnready {
return nil, err
}
}
}

return hostnames, nil
}

func runSetup(ctx context.Context, hostnames []string, log logr.Logger) error {
log.Info("Invoking setup() on the first runner")

setupData, err := testrun.RunSetup(ctx, hostnames[0])
if err != nil {
return err
}

log.Info("Sending setup data to the runners")

if err = testrun.SetSetupData(ctx, hostnames, setupData); err != nil {
return err
}

return nil
}

func runTeardown(ctx context.Context, hostnames []string, log logr.Logger) {
log.Info("Invoking teardown() on the first responsive runner")

if err := testrun.RunTeardown(ctx, hostnames); err != nil {
log.Error(err, "Failed to invoke teardown()")
}
}
36 changes: 16 additions & 20 deletions controllers/k6_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/grafana/k6-operator/pkg/resources/jobs"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func isServiceReady(log logr.Logger, service *v1.Service) bool {
Expand All @@ -40,13 +38,8 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te

log.Info("Waiting for pods to get ready")

selector := labels.SelectorFromSet(map[string]string{
"app": "k6",
"k6_cr": k6.NamespacedName().Name,
"runner": "true",
})
opts := v1alpha1.ListOptions(k6)

opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.NamespacedName().Namespace}
pl := &v1.PodList{}
if err = r.List(ctx, pl, opts); err != nil {
log.Error(err, "Could not list pods")
Expand Down Expand Up @@ -85,25 +78,28 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te
return res, nil
}

var hostnames []string
sl := &v1.ServiceList{}
// services

if err = r.List(ctx, sl, opts); err != nil {
log.Error(err, "Could not list services")
return res, nil
log.Info("Waiting for services to get ready")

hostnames, err := r.hostnames(ctx, log, true, opts)
log.Info(fmt.Sprintf("err: %v, hostnames: %v", err, hostnames))
if err != nil {
return ctrl.Result{}, err
}

for _, service := range sl.Items {
hostnames = append(hostnames, service.Spec.ClusterIP)
log.Info(fmt.Sprintf("%d/%d services ready", len(hostnames), k6.GetSpec().Parallelism))

if !isServiceReady(log, &service) {
log.Info(fmt.Sprintf("%v service is not ready, aborting", service.ObjectMeta.Name))
return res, nil
} else {
log.Info(fmt.Sprintf("%v service is ready", service.ObjectMeta.Name))
// setup

if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
if err := runSetup(ctx, hostnames, log); err != nil {
return ctrl.Result{}, err
}
}

// starter

starter := jobs.NewStarterJob(k6, hostnames)

if err = ctrl.SetControllerReference(k6, starter, r.Scheme); err != nil {
Expand Down
10 changes: 5 additions & 5 deletions controllers/k6_stopped_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func isJobRunning(log logr.Logger, service *v1.Service) bool {
return true
}

return status.Status().Stopped
return status.Status().Running
}

// StoppedJobs checks if the runners pods have stopped execution.
Expand All @@ -70,17 +70,17 @@ func StoppedJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *
return
}

var count int32
var runningJobs int32
for _, service := range sl.Items {

if isJobRunning(log, &service) {
count++
runningJobs++
}
}

log.Info(fmt.Sprintf("%d/%d runners stopped execution", k6.GetSpec().Parallelism-count, k6.GetSpec().Parallelism))
log.Info(fmt.Sprintf("%d/%d runners stopped execution", k6.GetSpec().Parallelism-runningJobs, k6.GetSpec().Parallelism))

if count > 0 {
if runningJobs > 0 {
return
}

Expand Down
34 changes: 31 additions & 3 deletions controllers/testrun_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,36 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log
return ctrl.Result{}, nil
}

// wait for the test to finish
if !FinishJobs(ctx, log, k6, r) {
if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
runningTime, _ := v1alpha1.LastUpdate(k6, v1alpha1.TestRunRunning)

if v1alpha1.IsFalse(k6, v1alpha1.TeardownExecuted) {
var allJobsStopped bool
// TODO: figure out baseline time
if time.Since(runningTime) > time.Second*30 {
allJobsStopped = StoppedJobs(ctx, log, k6, r)
}

// The test run reached a regular stop in execution so execute teardown
if v1alpha1.IsFalse(k6, v1alpha1.CloudTestRunAborted) && allJobsStopped {
hostnames, err := r.hostnames(ctx, log, false, v1alpha1.ListOptions(k6))
if err != nil {
return ctrl.Result{}, nil
}
runTeardown(ctx, hostnames, log)
v1alpha1.UpdateCondition(k6, v1alpha1.TeardownExecuted, metav1.ConditionTrue)

_, err = r.UpdateStatus(ctx, k6, log)
return ctrl.Result{}, err
// NOTE: we proceed here regardless whether teardown() is successful or not
} else {
// Test runs can take a long time and usually they aren't supposed
// to be too quick. So check in only periodically.
return ctrl.Result{RequeueAfter: time.Second * 15}, nil
}
}
} else if !FinishJobs(ctx, log, k6, r) {
// wait for the test to finish

// TODO: confirm if this check is needed given the check in the beginning of reconcile
if v1alpha1.IsTrue(k6, v1alpha1.CloudTestRun) && v1alpha1.IsFalse(k6, v1alpha1.CloudTestRunAborted) {
Expand Down Expand Up @@ -285,7 +313,7 @@ func (r *TestRunReconciler) reconcile(ctx context.Context, req ctrl.Request, log
log.Info("Changing stage of TestRun status to finished")
k6.GetStatus().Stage = "finished"

_, err := r.UpdateStatus(ctx, k6, log)
_, err = r.UpdateStatus(ctx, k6, log)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/resources/jobs/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func NewRunnerJob(k6 v1alpha1.TestRunI, index int, token string) (*batchv1.Job,
// Add an job tag: in case metrics are stored, they need to be distinguished by job
command = append(command, "--tag", fmt.Sprintf("job_name=%s", name))

if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
command = append(command, "--no-setup", "--no-teardown", "--linger")
}

command = script.UpdateCommand(command)

var (
Expand Down
Loading
Loading