Deploy required manifests from within the e2e tests
TheSpiritXIII committed Dec 14, 2023
1 parent 03d823e commit f6284ea
Showing 3 changed files with 365 additions and 164 deletions.
179 changes: 27 additions & 152 deletions e2e/operator_context_test.go
@@ -23,19 +23,16 @@ import (
"errors"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap/zapcore"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -56,10 +53,6 @@ import (
)

const (
collectorManifest = "../cmd/operator/deploy/operator/10-collector.yaml"
ruleEvalManifest = "../cmd/operator/deploy/operator/11-rule-evaluator.yaml"
alertmanagerManifest = "../cmd/operator/deploy/operator/12-alertmanager.yaml"

testLabel = "monitoring.googleapis.com/prometheus-test"
)

@@ -76,6 +69,7 @@ var (
portForward bool
leakResources bool
cleanup bool
deployOperator bool
)

func init() {
@@ -90,6 +84,9 @@ func newClient() (client.Client, error) {
if err != nil {
return nil, fmt.Errorf("operator schema: %w", err)
}
if err := apiextensionsv1.AddToScheme(scheme); err != nil {
return nil, err
}

return client.New(kubeconfig, client.Options{
Scheme: scheme,
@@ -119,11 +116,12 @@ func TestMain(m *testing.M) {
flag.StringVar(&projectID, "project-id", "", "The GCP project to write metrics to.")
flag.StringVar(&cluster, "cluster", "", "The name of the Kubernetes cluster that's tested against.")
flag.StringVar(&location, "location", "", "The location of the Kubernetes cluster that's tested against.")
flag.BoolVar(&skipGCM, "skip-gcm", false, "Skip validating GCM ingested points.")
flag.BoolVar(&skipGCM, "skip-gcm", true, "Skip validating GCM ingested points.")
flag.StringVar(&gcpServiceAccount, "gcp-service-account", "", "Path to GCP service account file for usage by deployed containers.")
flag.BoolVar(&portForward, "port-forward", true, "Whether to port-forward Kubernetes HTTP requests.")
flag.BoolVar(&leakResources, "leak-resources", true, "If set, prevents deleting resources. Useful for debugging.")
flag.BoolVar(&cleanup, "cleanup-resources", true, "If set, cleans resources before running tests.")
flag.BoolVar(&deployOperator, "deploy-operator", false, "If set, deploys a GMP operator to more closely mimic an actual environment.")

flag.Parse()

@@ -163,6 +161,12 @@ func TestMain(m *testing.M) {
}
}

ctx := context.Background()
if err := DeployGlobalResources(ctx, c); err != nil {
fmt.Fprintln(os.Stderr, "create global resources:", err)
os.Exit(1)
}
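
The body of DeployGlobalResources lives in one of the changed files that is not expanded in this view, so only the call site is visible here. A minimal sketch of what such a helper could look like, assuming the kubeutil, apierrors, client, context, and fmt imports already present in this test file; the manifest path and the create-or-skip behaviour are assumptions, not taken from the commit:

```go
// Hypothetical sketch only: the real DeployGlobalResources is defined in a
// file not shown in this diff. Assumed behaviour: apply cluster-scoped
// manifests (e.g. CRDs) once per test run and tolerate objects that already
// exist. The manifest path below is an assumption.
func deployGlobalResourcesSketch(ctx context.Context, c client.Client) error {
	objs, err := kubeutil.ResourcesFromFile(c.Scheme(), "../cmd/operator/deploy/crds.yaml")
	if err != nil {
		return fmt.Errorf("read global manifests: %w", err)
	}
	for i, obj := range objs {
		if err := c.Create(ctx, obj); err != nil && !apierrors.IsAlreadyExists(err) {
			return fmt.Errorf("create global object at index %d: %w", i, err)
		}
	}
	return nil
}
```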

go func() {
os.Exit(m.Run())
}()
@@ -180,7 +184,7 @@ func TestMain(m *testing.M) {
return
}
fmt.Fprintln(os.Stdout, "cleaning up abandoned resources...")
if err := cleanupResources(context.Background(), kubeconfig, c, ""); err != nil {
if err := cleanupResources(ctx, kubeconfig, c, ""); err != nil {
fmt.Fprintln(os.Stderr, "Cleaning up namespaces failed:", err)
os.Exit(1)
}
@@ -233,40 +237,10 @@ func newOperatorContext(t *testing.T) *OperatorContext {
cancel()
})

if err := createBaseResources(ctx, tctx.Client(), namespace, pubNamespace, userNamespace, tctx.GetOperatorTestLabelValue()); err != nil {
t.Fatalf("create resources: %s", err)
}

var httpClient *http.Client
if portForward {
var err error
httpClient, err = kubeutil.PortForwardClient(t, kubeconfig, tctx.Client())
if err != nil {
t.Fatalf("creating HTTP client: %s", err)
}
}

op, err := operator.New(globalLogger, kubeconfig, operator.Options{
ProjectID: projectID,
Cluster: cluster,
Location: location,
OperatorNamespace: tctx.namespace,
PublicNamespace: tctx.pubNamespace,
// Pick a random available port.
ListenAddr: ":0",
CollectorHTTPClient: httpClient,
})
if err != nil {
t.Fatalf("instantiating operator: %s", err)
if err := createLocalResources(t, ctx, tctx.Client(), namespace, pubNamespace, userNamespace, tctx.GetOperatorTestLabelValue()); err != nil {
t.Fatalf("create local resources: %s", err)
}

go func() {
if err := op.Run(ctx, prometheus.NewRegistry()); err != nil {
// Since we aren't in the main test goroutine we cannot fail with Fatal here.
t.Errorf("running operator: %s", err)
}
}()

return tctx
}

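With the operator no longer started in-process, newOperatorContext reduces to namespace setup plus manifest deployment via createLocalResources. A minimal usage sketch; the test name and the assertion are invented for illustration, and the call pattern follows the signature shown above:

```go
// Hypothetical usage sketch: TestCollectorsDeployed is an invented test name.
func TestCollectorsDeployed(t *testing.T) {
	tctx := newOperatorContext(t)

	// Namespaces, the GCP secret, and the collector/Alertmanager manifests have
	// already been applied by createLocalResources at this point.
	var pods corev1.PodList
	if err := tctx.Client().List(context.Background(), &pods, client.InNamespace(tctx.namespace)); err != nil {
		t.Fatalf("listing pods in %s: %s", tctx.namespace, err)
	}
}
```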
@@ -314,46 +288,27 @@ func (tctx *OperatorContext) GetOperatorTestLabelValue() string {
return strings.SplitN(tctx.T.Name(), "/", 2)[0]
}

// createBaseResources creates resources the operator requires to exist already.
// createLocalResources creates resources the operator requires to exist already.
// These are resources which don't depend on runtime state and can thus be deployed
// statically, allowing to run the operator without critical write permissions.
func createBaseResources(ctx context.Context, kubeClient client.Client, opNamespace, publicNamespace, userNamespace, labelValue string) error {
if err := createNamespaces(ctx, kubeClient, opNamespace, publicNamespace, userNamespace); err != nil {
return err
}

if err := createGCPSecretResources(ctx, kubeClient, opNamespace); err != nil {
return err
}
if err := createCollectorResources(ctx, kubeClient, opNamespace, labelValue); err != nil {
return err
}
if err := createAlertmanagerResources(ctx, kubeClient, opNamespace); err != nil {
func createLocalResources(t testing.TB, ctx context.Context, kubeClient client.Client, opNamespace, publicNamespace, userNamespace, labelValue string) error {
if err := DeployCollectorResources(t, ctx, kubeClient, opNamespace, publicNamespace, testLabel, labelValue, skipGCM, !deployOperator); err != nil {
return err
}
return nil
}

func createNamespaces(ctx context.Context, kubeClient client.Client, opNamespace, publicNamespace, userNamespace string) error {
if err := kubeClient.Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: opNamespace,
Name: userNamespace,
},
}); err != nil {
return fmt.Errorf("create user namespace: %w", err)
}
if err := createGCPSecretResources(ctx, kubeClient, opNamespace); err != nil {
return err
}
if err := kubeClient.Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: publicNamespace,
},
}); err != nil {
if err := createAlertmanagerResources(ctx, kubeClient, opNamespace); err != nil {
return err
}
return kubeClient.Create(ctx, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: userNamespace,
},
})
return nil
}

func createGCPSecretResources(ctx context.Context, kubeClient client.Client, namespace string) error {
@@ -377,90 +332,10 @@ func createGCPSecretResources(ctx context.Context, kubeClient client.Client, namespace string) error {
return nil
}

func createCollectorResources(ctx context.Context, kubeClient client.Client, namespace, labelValue string) error {
if err := kubeClient.Create(ctx, &corev1.ServiceAccount{
ObjectMeta: metav1.ObjectMeta{
Name: operator.NameCollector,
Namespace: namespace,
},
}); err != nil {
return err
}

// The cluster role expected to exist already.
const clusterRoleName = operator.DefaultOperatorNamespace + ":" + operator.NameCollector

if err := kubeClient.Create(ctx, &rbacv1.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: clusterRoleName + ":" + namespace,
},
RoleRef: rbacv1.RoleRef{
APIGroup: "rbac.authorization.k8s.io",
Kind: "ClusterRole",
// The ClusterRole is expected to exist in the cluster already.
Name: clusterRoleName,
},
Subjects: []rbacv1.Subject{
{
Kind: "ServiceAccount",
Namespace: namespace,
Name: operator.NameCollector,
},
},
}); err != nil {
return err
}

obj, err := kubeutil.ResourceFromFile(kubeClient.Scheme(), collectorManifest)
if err != nil {
return fmt.Errorf("decode collector: %w", err)
}
collector := obj.(*appsv1.DaemonSet)
collector.Namespace = namespace
if collector.Spec.Template.Labels == nil {
collector.Spec.Template.Labels = map[string]string{}
}
collector.Spec.Template.Labels[testLabel] = labelValue
if skipGCM {
for i := range collector.Spec.Template.Spec.Containers {
container := &collector.Spec.Template.Spec.Containers[i]
if container.Name == "prometheus" {
container.Args = append(container.Args, "--export.debug.disable-auth")
break
}
}
}

if err = kubeClient.Create(ctx, collector); err != nil {
return fmt.Errorf("create collector DaemonSet: %w", err)
}
return nil
}

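The skipGCM special-casing that createCollectorResources performed inline presumably moves into the shared DeployCollectorResources helper, which now receives skipGCM as a parameter (see the call in createLocalResources above). A small sketch of that argument tweak in isolation; the helper name is an assumption, while the body is lifted from the code removed above:

```go
// Hypothetical helper name; the loop body is taken verbatim from the removed
// createCollectorResources. When GCM validation is skipped, auth on the
// Prometheus export path is disabled, presumably so the collector can run
// without GCP credentials.
func disableExportAuth(collector *appsv1.DaemonSet) {
	for i := range collector.Spec.Template.Spec.Containers {
		container := &collector.Spec.Template.Spec.Containers[i]
		if container.Name == "prometheus" {
			container.Args = append(container.Args, "--export.debug.disable-auth")
			break
		}
	}
}
```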
func createAlertmanagerResources(ctx context.Context, kubeClient client.Client, namespace string) error {
obj, err := kubeutil.ResourceFromFile(kubeClient.Scheme(), ruleEvalManifest)
if err != nil {
return fmt.Errorf("decode evaluator: %w", err)
}
evaluator := obj.(*appsv1.Deployment)
evaluator.Namespace = namespace

if err := kubeClient.Create(ctx, evaluator); err != nil {
return fmt.Errorf("create rule-evaluator Deployment: %w", err)
}

objs, err := kubeutil.ResourcesFromFile(kubeClient.Scheme(), alertmanagerManifest)
if err != nil {
return fmt.Errorf("read alertmanager YAML: %w", err)
}
for i, obj := range objs {
obj.SetNamespace(namespace)

if err := kubeClient.Create(ctx, obj); err != nil {
return fmt.Errorf("create object at index %d: %w", i, err)
}
if err := DeployAlertmanagerResources(ctx, kubeClient, namespace); err != nil {
return err
}

return nil
}

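The new DeployAlertmanagerResources helper called from createLocalResources is defined in a file not expanded here; given the code removed above, it plausibly wraps the same rule-evaluator and Alertmanager manifest handling. A minimal sketch under that assumption, reusing the kubeutil helpers and the manifest paths that this file previously hard-coded:

```go
// Hypothetical sketch: the real DeployAlertmanagerResources lives in a file not
// shown in this diff. Assumed to mirror the removed createAlertmanagerResources,
// deploying the rule-evaluator and Alertmanager manifests into the given
// namespace. The manifest paths match the constants removed at the top of this file.
func deployAlertmanagerResourcesSketch(ctx context.Context, kubeClient client.Client, namespace string) error {
	const (
		ruleEvalManifest     = "../cmd/operator/deploy/operator/11-rule-evaluator.yaml"
		alertmanagerManifest = "../cmd/operator/deploy/operator/12-alertmanager.yaml"
	)

	obj, err := kubeutil.ResourceFromFile(kubeClient.Scheme(), ruleEvalManifest)
	if err != nil {
		return fmt.Errorf("decode evaluator: %w", err)
	}
	evaluator := obj.(*appsv1.Deployment)
	evaluator.Namespace = namespace
	if err := kubeClient.Create(ctx, evaluator); err != nil {
		return fmt.Errorf("create rule-evaluator Deployment: %w", err)
	}

	objs, err := kubeutil.ResourcesFromFile(kubeClient.Scheme(), alertmanagerManifest)
	if err != nil {
		return fmt.Errorf("read alertmanager YAML: %w", err)
	}
	for i, obj := range objs {
		obj.SetNamespace(namespace)
		if err := kubeClient.Create(ctx, obj); err != nil {
			return fmt.Errorf("create object at index %d: %w", i, err)
		}
	}
	return nil
}
```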