diff --git a/api/v1alpha1/k6conditions.go b/api/v1alpha1/k6conditions.go
index 6a8862c2..3b9dfb2f 100644
--- a/api/v1alpha1/k6conditions.go
+++ b/api/v1alpha1/k6conditions.go
@@ -16,6 +16,10 @@ const (
 	// - if True, it's after successful starter but before all runners have finished
 	TestRunRunning = "TestRunRunning"
 
+	// TeardownExecuted indicates whether teardown() has been executed on one of the runners.
+	// This condition can be used only in PLZ test runs.
+	TeardownExecuted = "TeardownExecuted"
+
 	// CloudTestRun indicates if this test run is supposed to be a cloud test run
 	// (i.e. with `--out cloud` option).
 	// - if empty / Unknown, the type of test is unknown yet
@@ -71,6 +75,13 @@ func Initialize(k6 TestRunI) {
 			Reason:             "TestRunPreparation",
 			Message:            "",
 		},
+		metav1.Condition{
+			Type:               TeardownExecuted,
+			Status:             metav1.ConditionFalse,
+			LastTransitionTime: t,
+			Reason:             "TeardownExecutedFalse",
+			Message:            "",
+		},
 	}
 
 	UpdateCondition(k6, CloudTestRunAborted, metav1.ConditionFalse)
diff --git a/api/v1alpha1/testruni.go b/api/v1alpha1/testruni.go
index 0f88463f..8dd78394 100644
--- a/api/v1alpha1/testruni.go
+++ b/api/v1alpha1/testruni.go
@@ -2,6 +2,7 @@ package v1alpha1
 
 import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	runtime "k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -31,3 +32,13 @@ func TestRunID(k6 TestRunI) string {
 	}
 	return k6.GetStatus().TestRunID
 }
+
+func ListOptions(k6 TestRunI) *client.ListOptions {
+	selector := labels.SelectorFromSet(map[string]string{
+		"app":    "k6",
+		"k6_cr":  k6.NamespacedName().Name,
+		"runner": "true",
+	})
+
+	return &client.ListOptions{LabelSelector: selector, Namespace: k6.NamespacedName().Namespace}
+}
diff --git a/controllers/common.go b/controllers/common.go
index c90f2b8e..1cc6f888 100644
--- a/controllers/common.go
+++ b/controllers/common.go
@@ -11,7 +11,8 @@ import (
 	"github.com/go-logr/logr"
 	"github.com/grafana/k6-operator/api/v1alpha1"
 	"github.com/grafana/k6-operator/pkg/cloud"
+	"github.com/grafana/k6-operator/pkg/testrun"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/kubernetes"
@@ -186,3 +187,58 @@ func getEnvVar(vars []corev1.EnvVar, name string) string {
 	}
 	return ""
 }
+
+func (r *TestRunReconciler) hostnames(ctx context.Context, log logr.Logger, abortOnUnready bool, opts *client.ListOptions) ([]string, error) {
+	var (
+		hostnames []string
+		err       error
+	)
+
+	sl := &corev1.ServiceList{}
+
+	if err = r.List(ctx, sl, opts); err != nil {
+		log.Error(err, "Could not list services")
+		return nil, err
+	}
+
+	for _, service := range sl.Items {
+		log.Info(fmt.Sprintf("Checking service %s", service.Name))
+		if isServiceReady(log, &service) {
+			log.Info(fmt.Sprintf("%v service is ready", service.ObjectMeta.Name))
+			hostnames = append(hostnames, service.Spec.ClusterIP)
+		} else {
+			err = fmt.Errorf("%v service is not ready", service.ObjectMeta.Name)
+			log.Info(err.Error())
+			if abortOnUnready {
+				return nil, err
+			}
+		}
+	}
+
+	return hostnames, nil
+}
+
+func runSetup(ctx context.Context, hostnames []string, log logr.Logger) error {
+	log.Info("Invoking setup() on the first runner")
+
+	setupData, err := testrun.RunSetup(ctx, hostnames[0])
+	if err != nil {
+		return err
+	}
+
+	log.Info("Sending setup data to the runners")
+
+	if err = testrun.SetSetupData(ctx, hostnames, setupData); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func runTeardown(ctx context.Context, hostnames []string, log logr.Logger) {
+	log.Info("Invoking teardown() on the first responsive runner")
+
+	if err := testrun.RunTeardown(ctx, hostnames); err != nil {
+		log.Error(err, "Failed to invoke teardown()")
+	}
+}
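The helpers above are the whole PLZ orchestration surface: one shared label selector, one Service-to-ClusterIP resolver, and thin wrappers over the k6 REST calls. A condensed sketch of how they compose, where startPLZ is a hypothetical name and ctx, log, r, k6 stand for the reconciler's usual parameters (the real call sites are in the controllers below):

// Hypothetical composition of the helpers above; not part of the diff itself.
// Lives conceptually in package controllers, so all symbols resolve as above.
func startPLZ(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *TestRunReconciler) error {
	// One label selector covers all runner Pods and Services of this test run.
	opts := v1alpha1.ListOptions(k6)

	// abortOnUnready=true: give up as soon as a single Service is not ready.
	hostnames, err := r.hostnames(ctx, log, true, opts)
	if err != nil {
		return err
	}

	// setup() executes on hostnames[0] only (so at least one ready Service is
	// required); its result is then copied to every runner.
	return runSetup(ctx, hostnames, log)
}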
runTeardown(ctx context.Context, hostnames []string, log logr.Logger) { + log.Info("Invoking teardown() on the first responsive runner") + + if err := testrun.RunTeardown(ctx, hostnames); err != nil { + log.Error(err, "Failed to invoke teardown()") + } +} diff --git a/controllers/k6_start.go b/controllers/k6_start.go index 3e1199d6..8b20108a 100644 --- a/controllers/k6_start.go +++ b/controllers/k6_start.go @@ -13,9 +13,7 @@ import ( "github.com/grafana/k6-operator/pkg/resources/jobs" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" ) func isServiceReady(log logr.Logger, service *v1.Service) bool { @@ -40,13 +38,8 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te log.Info("Waiting for pods to get ready") - selector := labels.SelectorFromSet(map[string]string{ - "app": "k6", - "k6_cr": k6.NamespacedName().Name, - "runner": "true", - }) + opts := v1alpha1.ListOptions(k6) - opts := &client.ListOptions{LabelSelector: selector, Namespace: k6.NamespacedName().Namespace} pl := &v1.PodList{} if err = r.List(ctx, pl, opts); err != nil { log.Error(err, "Could not list pods") @@ -85,25 +78,28 @@ func StartJobs(ctx context.Context, log logr.Logger, k6 v1alpha1.TestRunI, r *Te return res, nil } - var hostnames []string - sl := &v1.ServiceList{} + // services - if err = r.List(ctx, sl, opts); err != nil { - log.Error(err, "Could not list services") - return res, nil + log.Info("Waiting for services to get ready") + + hostnames, err := r.hostnames(ctx, log, true, opts) + log.Info(fmt.Sprintf("err: %v, hostnames: %v", err, hostnames)) + if err != nil { + return ctrl.Result{}, err } - for _, service := range sl.Items { - hostnames = append(hostnames, service.Spec.ClusterIP) + log.Info(fmt.Sprintf("%d/%d services ready", len(hostnames), k6.GetSpec().Parallelism)) - if !isServiceReady(log, &service) { - log.Info(fmt.Sprintf("%v service is not ready, aborting", service.ObjectMeta.Name)) - return res, nil - } else { - log.Info(fmt.Sprintf("%v service is ready", service.ObjectMeta.Name)) + // setup + + if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) { + if err := runSetup(ctx, hostnames, log); err != nil { + return ctrl.Result{}, err } } + // starter + starter := jobs.NewStarterJob(k6, hostnames) if err = ctrl.SetControllerReference(k6, starter, r.Scheme); err != nil { diff --git a/controllers/k6_stopped_jobs.go b/controllers/k6_stopped_jobs.go index d010e0a1..301d392f 100644 --- a/controllers/k6_stopped_jobs.go +++ b/controllers/k6_stopped_jobs.go @@ -44,7 +44,7 @@ func isJobRunning(log logr.Logger, service *v1.Service) bool { return true } - return status.Status().Stopped + return status.Status().Running } // StoppedJobs checks if the runners pods have stopped execution. 
diff --git a/pkg/resources/jobs/runner.go b/pkg/resources/jobs/runner.go
index 9e56fb26..70e357dc 100644
--- a/pkg/resources/jobs/runner.go
+++ b/pkg/resources/jobs/runner.go
@@ -73,6 +73,10 @@ func NewRunnerJob(k6 v1alpha1.TestRunI, index int, token string) (*batchv1.Job,
 	// Add an job tag: in case metrics are stored, they need to be distinguished by job
 	command = append(command, "--tag", fmt.Sprintf("job_name=%s", name))
 
+	if v1alpha1.IsTrue(k6, v1alpha1.CloudPLZTestRun) {
+		command = append(command, "--no-setup", "--no-teardown", "--linger")
+	}
+
 	command = script.UpdateCommand(command)
 
 	var (
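Taken together, these flags move lifecycle control from the individual k6 instances to the operator: --no-setup and --no-teardown stop each runner from executing setup()/teardown() on its own, and --linger keeps the k6 process (and with it the REST API on port 6565) alive after the script finishes, so the controller can still call teardown() afterwards. For the TestRun named `test` in the test below, the resulting container command is:

    k6 run --quiet /test/test.js --address=0.0.0.0:6565 --paused --tag instance_id=1 --tag job_name=test-1 --no-setup --no-teardown --linger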
diff --git a/pkg/resources/jobs/runner_test.go b/pkg/resources/jobs/runner_test.go
index 9aa1e882..f6fce5a6 100644
--- a/pkg/resources/jobs/runner_test.go
+++ b/pkg/resources/jobs/runner_test.go
@@ -1593,3 +1593,147 @@ func TestNewRunnerJobWithVolume(t *testing.T) {
 		t.Errorf("NewRunnerJob returned unexpected data, diff: %s", diff)
 	}
 }
+
+func TestNewRunnerJobPLZTestRun(t *testing.T) {
+	// NewRunnerJob does not validate the type of Script for
+	// internal consistency (like in the PLZ case), so it can be anything.
+	script := &types.Script{
+		Name:     "test",
+		Filename: "thing.js",
+		Type:     "ConfigMap",
+	}
+
+	var zero int64 = 0
+	automountServiceAccountToken := true
+
+	expectedLabels := map[string]string{
+		"app":    "k6",
+		"k6_cr":  "test",
+		"runner": "true",
+		"label1": "awesome",
+	}
+
+	expectedOutcome := &batchv1.Job{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test-1",
+			Namespace: "test",
+			Labels:    expectedLabels,
+			Annotations: map[string]string{
+				"awesomeAnnotation": "dope",
+			},
+		},
+		Spec: batchv1.JobSpec{
+			BackoffLimit: new(int32),
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Labels: expectedLabels,
+					Annotations: map[string]string{
+						"awesomeAnnotation": "dope",
+					},
+				},
+				Spec: corev1.PodSpec{
+					Hostname:                     "test-1",
+					RestartPolicy:                corev1.RestartPolicyNever,
+					SecurityContext:              &corev1.PodSecurityContext{},
+					Affinity:                     nil,
+					NodeSelector:                 nil,
+					Tolerations:                  nil,
+					TopologySpreadConstraints:    nil,
+					ServiceAccountName:           "default",
+					AutomountServiceAccountToken: &automountServiceAccountToken,
+					Containers: []corev1.Container{{
+						Image:           "ghcr.io/grafana/k6-operator:latest-runner",
+						ImagePullPolicy: corev1.PullNever,
+						Name:            "k6",
+						Command:         []string{"k6", "run", "--quiet", "/test/test.js", "--address=0.0.0.0:6565", "--paused", "--tag", "instance_id=1", "--tag", "job_name=test-1", "--no-setup", "--no-teardown", "--linger"},
+						Env:             []corev1.EnvVar{},
+						Resources:       corev1.ResourceRequirements{},
+						VolumeMounts:    script.VolumeMount(),
+						Ports:           []corev1.ContainerPort{{ContainerPort: 6565}},
+						EnvFrom: []corev1.EnvFromSource{
+							{
+								ConfigMapRef: &corev1.ConfigMapEnvSource{
+									LocalObjectReference: corev1.LocalObjectReference{
+										Name: "env",
+									},
+								},
+							},
+						},
+						LivenessProbe: &corev1.Probe{
+							ProbeHandler: corev1.ProbeHandler{
+								HTTPGet: &corev1.HTTPGetAction{
+									Path:   "/v1/status",
+									Port:   intstr.IntOrString{IntVal: 6565},
+									Scheme: "HTTP",
+								},
+							},
+						},
+						ReadinessProbe: &corev1.Probe{
+							ProbeHandler: corev1.ProbeHandler{
+								HTTPGet: &corev1.HTTPGetAction{
+									Path:   "/v1/status",
+									Port:   intstr.IntOrString{IntVal: 6565},
+									Scheme: "HTTP",
+								},
+							},
+						},
+					}},
+					TerminationGracePeriodSeconds: &zero,
+					Volumes:                       script.Volume(),
+				},
+			},
+		},
+	}
+
+	k6 := &v1alpha1.TestRun{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      "test",
+			Namespace: "test",
+		},
+		Spec: v1alpha1.TestRunSpec{
+			Script: v1alpha1.K6Script{
+				ConfigMap: v1alpha1.K6Configmap{
+					Name: "test",
+					File: "test.js",
+				},
+			},
+			Runner: v1alpha1.Pod{
+				Metadata: v1alpha1.PodMetadata{
+					Labels: map[string]string{
+						"label1": "awesome",
+					},
+					Annotations: map[string]string{
+						"awesomeAnnotation": "dope",
+					},
+				},
+				EnvFrom: []corev1.EnvFromSource{
+					{
+						ConfigMapRef: &corev1.ConfigMapEnvSource{
+							LocalObjectReference: corev1.LocalObjectReference{
+								Name: "env",
+							},
+						},
+					},
+				},
+				ImagePullPolicy: corev1.PullNever,
+			},
+		},
+		Status: v1alpha1.TestRunStatus{
+			Conditions: []metav1.Condition{
+				{
+					Type:               v1alpha1.CloudPLZTestRun,
+					Status:             metav1.ConditionTrue,
+					LastTransitionTime: metav1.Now(),
+					// Reason: "CloudPLZTestRunTrue",
+				},
+			},
+		},
+	}
+
+	job, err := NewRunnerJob(k6, 1, "")
+	if err != nil {
+		t.Errorf("NewRunnerJob errored, got: %v", err)
+	}
+	if diff := deep.Equal(job, expectedOutcome); diff != nil {
+		t.Errorf("NewRunnerJob returned unexpected data, diff: %s", diff)
+	}
+}
diff --git a/pkg/testrun/k6client.go b/pkg/testrun/k6client.go
new file mode 100644
index 00000000..e336d2f8
--- /dev/null
+++ b/pkg/testrun/k6client.go
@@ -0,0 +1,70 @@
+package testrun
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/url"
+
+	"github.com/grafana/k6-operator/pkg/types"
+	k6Client "go.k6.io/k6/api/v1/client"
+)
+
+// This will probably be removed once distributed mode in k6 is implemented.
+
+func RunSetup(ctx context.Context, hostname string) (_ json.RawMessage, err error) {
+	c, err := k6Client.New(fmt.Sprintf("%v:6565", hostname), k6Client.WithHTTPClient(&http.Client{
+		Timeout: 0,
+	}))
+	if err != nil {
+		return
+	}
+
+	var response types.SetupData
+	if err = c.CallAPI(ctx, "POST", &url.URL{Path: "/v1/setup"}, nil, &response); err != nil {
+		return nil, err
+	}
+
+	if response.Data.Attributes.Data != nil {
+		var tmpSetupDataObj interface{}
+		if err := json.Unmarshal(response.Data.Attributes.Data, &tmpSetupDataObj); err != nil {
+			return nil, err
+		}
+	}
+
+	return response.Data.Attributes.Data, nil
+}
+
+func SetSetupData(ctx context.Context, hostnames []string, data json.RawMessage) (err error) {
+	for _, hostname := range hostnames {
+		c, err := k6Client.New(fmt.Sprintf("%v:6565", hostname), k6Client.WithHTTPClient(&http.Client{
+			Timeout: 0,
+		}))
+		if err != nil {
+			return err
+		}
+
+		if err = c.CallAPI(ctx, "PUT", &url.URL{Path: "/v1/setup"}, data, nil); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func RunTeardown(ctx context.Context, hostnames []string) (err error) {
+	if len(hostnames) == 0 {
+		return errors.New("no k6 Service is available to run teardown")
+	}
+
+	c, err := k6Client.New(fmt.Sprintf("%v:6565", hostnames[0]), k6Client.WithHTTPClient(&http.Client{
+		Timeout: 0,
+	}))
+	if err != nil {
+		return
+	}
+
+	return c.CallAPI(ctx, "POST", &url.URL{Path: "/v1/teardown"}, nil, nil)
+}
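A minimal usage sketch of this package; driveLifecycle is a hypothetical name and the ClusterIPs are made up (in the operator itself they come from r.hostnames):

// Illustrative driver for one full setup/teardown cycle; not part of the diff.
func driveLifecycle(ctx context.Context) error {
	hostnames := []string{"10.96.0.12", "10.96.0.13"}

	// POST /v1/setup on the first runner only.
	data, err := testrun.RunSetup(ctx, hostnames[0])
	if err != nil {
		return err
	}

	// PUT /v1/setup on every runner, so all instances share the same data.
	if err := testrun.SetSetupData(ctx, hostnames, data); err != nil {
		return err
	}

	// ... the load test itself executes here ...

	// POST /v1/teardown, again on the first responsive runner only.
	return testrun.RunTeardown(ctx, hostnames)
}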
diff --git a/pkg/types/conditions.go b/pkg/types/conditions.go
index b2633d8c..a93f6ecf 100644
--- a/pkg/types/conditions.go
+++ b/pkg/types/conditions.go
@@ -70,6 +70,10 @@ var reasons = map[string]string{
 	"TestRunRunningTrue":  "TestRunRunningTrue",
 	"TestRunRunningFalse": "TestRunRunningFalse",
 
+	"TeardownExecutedUnknown": "TestRunPreparation",
+	"TeardownExecutedFalse":   "TeardownExecutedFalse",
+	"TeardownExecutedTrue":    "TeardownExecutedTrue",
+
 	"CloudTestRunUnknown": "TestRunTypeUnknown",
 	"CloudTestRunTrue":    "CloudTestRunTrue",
 	"CloudTestRunFalse":   "CloudTestRunFalse",
diff --git a/pkg/types/k6status.go b/pkg/types/k6status.go
index 40ba6e91..4821c59a 100644
--- a/pkg/types/k6status.go
+++ b/pkg/types/k6status.go
@@ -1,5 +1,7 @@
 package types
 
+import "encoding/json"
+
 // k6 REST API types.
 // TODO: refactor with existing definitions in k6 api/v1?
 
@@ -17,3 +19,17 @@ type StatusAPIRequestDataAttributes struct {
 	Paused  bool `json:"paused"`
 	Stopped bool `json:"stopped"`
 }
+
+type SetupData struct {
+	Data setUpData `json:"data"`
+}
+
+type setUpData struct {
+	Type       string                  `json:"type"`
+	ID         string                  `json:"id"`
+	Attributes setupResponseAttributes `json:"attributes"`
+}
+
+type setupResponseAttributes struct {
+	Data json.RawMessage `json:"data"`
+}
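These types mirror the JSON-API style envelope that k6's REST API wraps setup data in. A self-contained sketch of decoding such a response; the payload is invented, and the exact type/id values may differ on a real k6 instance:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/grafana/k6-operator/pkg/types"
)

func main() {
	// Invented response body following the shape modelled by SetupData above.
	raw := []byte(`{"data":{"type":"setupData","id":"default","attributes":{"data":{"token":"abc"}}}}`)

	var sd types.SetupData
	if err := json.Unmarshal(raw, &sd); err != nil {
		panic(err)
	}

	// Prints the raw setup data exactly as the runners would receive it.
	fmt.Println(string(sd.Data.Attributes.Data)) // {"token":"abc"}
}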