diff --git a/Makefile b/Makefile index bf261dd8..87326eb1 100755 --- a/Makefile +++ b/Makefile @@ -422,6 +422,7 @@ e2e-test: e2e-test/teardown e2e-test/setup ${PWD}/test/e2e/pkg -- \ -api-server=https://$(shell cat ${PWD}/test/e2e/.external_host_ip):30080 \ -grpc-server=$(shell cat ${PWD}/test/e2e/.external_host_ip):30090 \ + -server-kubeconfig=${PWD}/test/e2e/.kubeconfig \ -consumer-name=$(shell cat ${PWD}/test/e2e/.consumer_name) \ - -consumer-kubeconfig=${PWD}/test/e2e/.kubeconfig + -agent-kubeconfig=${PWD}/test/e2e/.kubeconfig .PHONY: e2e-test diff --git a/test/e2e/pkg/grpc_test.go b/test/e2e/pkg/grpc_test.go index 5670c3a4..34c1ea95 100644 --- a/test/e2e/pkg/grpc_test.go +++ b/test/e2e/pkg/grpc_test.go @@ -95,7 +95,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { }) It("publish a resource spec using grpc client", func() { - evt := helper.NewEvent(sourceID, "create_request", consumer.Name, resourceID, deployName, 1, 1) + evt := helper.NewEvent(sourceID, "create_request", agentTestOpts.consumerName, resourceID, deployName, 1, 1) pbEvt := &pbv1.CloudEvent{} err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt) Expect(err).To(BeNil(), "failed to convert spec from cloudevent to protobuf") @@ -132,7 +132,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { It("get the nginx deployment from cluster", func() { Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -152,7 +152,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { }) It("publish a resource spec with update request using grpc client", func() { - evt := helper.NewEvent(sourceID, "update_request", consumer.Name, resourceID, deployName, 1, 2) + evt := helper.NewEvent(sourceID, "update_request", agentTestOpts.consumerName, resourceID, deployName, 1, 2) pbEvt := &pbv1.CloudEvent{} err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt) Expect(err).To(BeNil(), "failed to convert spec from cloudevent to protobuf") @@ -189,7 +189,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { It("get the nginx deployment from cluster", func() { Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -209,7 +209,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { }) It("publish a resource spec with delete request using grpc client", func() { - evt := helper.NewEvent(sourceID, "delete_request", consumer.Name, resourceID, deployName, 2, 2) + evt := helper.NewEvent(sourceID, "delete_request", agentTestOpts.consumerName, resourceID, deployName, 2, 2) pbEvt := &pbv1.CloudEvent{} err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt) Expect(err).To(BeNil(), "failed to convert spec from cloudevent to protobuf") @@ -233,7 +233,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { It("get the nginx deployment from cluster", func() { Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -302,7 +302,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { }) It("publish a resource bundle spec using grpc client", func() { - evt := helper.NewBundleEvent(sourceID, "create_request", consumer.Name, resourceID, deployName, 1, 1) + evt := helper.NewBundleEvent(sourceID, "create_request", agentTestOpts.consumerName, resourceID, deployName, 1, 1) pbEvt := &pbv1.CloudEvent{} err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt) Expect(err).To(BeNil(), "failed to convert spec from cloudevent to protobuf") @@ -354,7 +354,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { It("get the nginx deployment from cluster", func() { Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -374,7 +374,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { }) It("publish a resource bundle spec with update request using grpc client", func() { - evt := helper.NewBundleEvent(sourceID, "update_request", consumer.Name, resourceID, deployName, 1, 2) + evt := helper.NewBundleEvent(sourceID, "update_request", agentTestOpts.consumerName, resourceID, deployName, 1, 2) pbEvt := &pbv1.CloudEvent{} err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt) Expect(err).To(BeNil(), "failed to convert spec from cloudevent to protobuf") @@ -426,7 +426,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { It("get the nginx deployment from cluster", func() { Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -446,7 +446,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { }) It("publish a resource bundle spec with delete request using grpc client", func() { - evt := helper.NewBundleEvent(sourceID, "delete_request", consumer.Name, resourceID, deployName, 2, 2) + evt := helper.NewBundleEvent(sourceID, "delete_request", agentTestOpts.consumerName, resourceID, deployName, 2, 2) pbEvt := &pbv1.CloudEvent{} err := grpcprotocol.WritePBMessage(ctx, binding.ToMessage(evt), pbEvt) Expect(err).To(BeNil(), "failed to convert spec from cloudevent to protobuf") @@ -470,7 +470,7 @@ var _ = Describe("GRPC", Ordered, Label("e2e-tests-grpc"), func() { It("get the nginx deployment from cluster", func() { Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil diff --git a/test/e2e/pkg/resources_test.go b/test/e2e/pkg/resources_test.go index fb190b7c..9fca6f7f 100644 --- a/test/e2e/pkg/resources_test.go +++ b/test/e2e/pkg/resources_test.go @@ -25,7 +25,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { deployName := fmt.Sprintf("nginx-%s", rand.String(5)) var resource *openapi.Resource It("post the nginx resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployName, 1) + res := helper.NewAPIResource(agentTestOpts.consumerName, deployName, 1) var resp *http.Response var err error resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() @@ -35,7 +35,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(*resource.Version).To(Equal(int32(1))) Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -55,7 +55,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { }) It("patch the nginx resource with the maestro api", func() { - newRes := helper.NewAPIResource(consumer.Name, deployName, 2) + newRes := helper.NewAPIResource(agentTestOpts.consumerName, deployName, 2) patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(ctx, *resource.Id). ResourcePatchRequest(openapi.ResourcePatchRequest{Version: resource.Version, Manifest: newRes.Manifest}).Execute() Expect(err).ShouldNot(HaveOccurred()) @@ -63,7 +63,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(*patchedResource.Version).To(Equal(*resource.Version + 1)) Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -80,7 +80,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -96,7 +96,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { deployName := fmt.Sprintf("nginx-%s", rand.String(5)) var resource *openapi.Resource It("post the nginx resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployName, 1) + res := helper.NewAPIResource(agentTestOpts.consumerName, deployName, 1) res.DeleteOption = map[string]interface{}{"propagationPolicy": "Orphan"} var resp *http.Response var err error @@ -106,7 +106,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(*resource.Id).ShouldNot(BeEmpty()) Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -124,7 +124,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { // ensure the "nginx" deployment in the "default" namespace is not deleted Consistently(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return fmt.Errorf("nginx deployment is deleted") @@ -135,11 +135,11 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { }) It("delete the nginx deployment", func() { - err := consumer.ClientSet.AppsV1().Deployments("default").Delete(ctx, deployName, metav1.DeleteOptions{}) + err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Delete(ctx, deployName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -155,7 +155,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { deployName := fmt.Sprintf("nginx-%s", rand.String(5)) var resource *openapi.Resource It("post the nginx resource to the maestro api with createOnly updateStrategy", func() { - res := helper.NewAPIResource(consumer.Name, deployName, 1) + res := helper.NewAPIResource(agentTestOpts.consumerName, deployName, 1) res.UpdateStrategy = map[string]interface{}{"type": "CreateOnly"} var resp *http.Response var err error @@ -165,7 +165,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(*resource.Id).ShouldNot(BeEmpty()) Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return err } @@ -177,7 +177,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { }) It("patch the nginx resource", func() { - newRes := helper.NewAPIResource(consumer.Name, deployName, 2) + newRes := helper.NewAPIResource(agentTestOpts.consumerName, deployName, 2) patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(ctx, *resource.Id). ResourcePatchRequest(openapi.ResourcePatchRequest{Version: resource.Version, Manifest: newRes.Manifest}).Execute() Expect(err).ShouldNot(HaveOccurred()) @@ -186,7 +186,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { // ensure the "nginx" deployment in the "default" namespace is not updated Consistently(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { return nil } @@ -203,7 +203,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -222,7 +222,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { nginxDeploy := &appsv1.Deployment{} err := json.Unmarshal([]byte(helper.NewResourceManifestJSON(deployName, 1)), nginxDeploy) Expect(err).ShouldNot(HaveOccurred()) - _, err = consumer.ClientSet.AppsV1().Deployments("default").Create(ctx, nginxDeploy, metav1.CreateOptions{}) + _, err = agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Create(ctx, nginxDeploy, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) @@ -230,13 +230,13 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { var resp *http.Response var err error // post the resource with readonly updateStrategy and foreground delete option should fail - invalidRes := helper.NewReadOnlyAPIResource(consumer.Name, deployName) + invalidRes := helper.NewReadOnlyAPIResource(agentTestOpts.consumerName, deployName) invalidRes.DeleteOption = map[string]interface{}{"propagationPolicy": "Foreground"} resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(invalidRes).Execute() Expect(err).Should(HaveOccurred()) Expect(resp.StatusCode).To(Equal(http.StatusBadRequest)) - res := helper.NewReadOnlyAPIResource(consumer.Name, deployName) + res := helper.NewReadOnlyAPIResource(agentTestOpts.consumerName, deployName) resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() Expect(err).ShouldNot(HaveOccurred()) Expect(resp.StatusCode).To(Equal(http.StatusCreated)) @@ -286,11 +286,11 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { Expect(err).ShouldNot(HaveOccurred()) Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) - err = consumer.ClientSet.AppsV1().Deployments("default").Delete(ctx, deployName, metav1.DeleteOptions{}) + err = agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Delete(ctx, deployName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -307,7 +307,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { secretName := "auth-" + rand.String(5) manifest := fmt.Sprintf("{\"apiVersion\":\"v1\",\"kind\":\"Secret\",\"metadata\":{\"name\":\"%s\",\"namespace\":\"default\"}}", secretName) It("create a secret in the target cluster", func() { - _, err := consumer.ClientSet.CoreV1().Secrets("default").Create(ctx, &corev1.Secret{ + _, err := agentTestOpts.kubeClientSet.CoreV1().Secrets("default").Create(ctx, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secretName, Namespace: "default", @@ -360,7 +360,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { }, } Eventually(func() error { - _, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) return err }, 5*time.Minute, 5*time.Second).ShouldNot(HaveOccurred()) }) @@ -380,7 +380,7 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { It("get the resource status back", func() { Eventually(func() error { - work, err := workClient.ManifestWorks(consumer.Name).Get(ctx, workName, metav1.GetOptions{}) + work, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Get(ctx, workName, metav1.GetOptions{}) if err != nil { return err } @@ -402,14 +402,14 @@ var _ = Describe("Resources", Ordered, Label("e2e-tests-resources"), func() { }) It("delete the readonly resource", func() { - err := workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) - err = consumer.ClientSet.CoreV1().Secrets("default").Delete(ctx, secretName, metav1.DeleteOptions{}) + err = agentTestOpts.kubeClientSet.CoreV1().Secrets("default").Delete(ctx, secretName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { - _, err := consumer.ClientSet.CoreV1().Secrets("default").Get(ctx, secretName, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.CoreV1().Secrets("default").Get(ctx, secretName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil diff --git a/test/e2e/pkg/serverside_test.go b/test/e2e/pkg/serverside_test.go index d753bc2d..e4859a5d 100644 --- a/test/e2e/pkg/serverside_test.go +++ b/test/e2e/pkg/serverside_test.go @@ -64,7 +64,7 @@ var _ = Describe("Server Side Apply", Ordered, Label("e2e-tests-serverside-apply res := openapi.Resource{ Manifest: manifest, - ConsumerName: &consumer.Name, + ConsumerName: &agentTestOpts.consumerName, } created, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() @@ -121,12 +121,12 @@ var _ = Describe("Server Side Apply", Ordered, Label("e2e-tests-serverside-apply nestedWorkNamespace := "default" work := NewNestedManifestWork(nestedWorkNamespace, workName, nestedWorkName) - _, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) // make sure the nested work is created Eventually(func() error { - _, err := kubeWorkClient.WorkV1().ManifestWorks(nestedWorkNamespace).Get(ctx, nestedWorkName, metav1.GetOptions{}) + _, err := agentTestOpts.workClientSet.WorkV1().ManifestWorks(nestedWorkNamespace).Get(ctx, nestedWorkName, metav1.GetOptions{}) if err != nil { return err } @@ -136,7 +136,7 @@ var _ = Describe("Server Side Apply", Ordered, Label("e2e-tests-serverside-apply // make sure the nested work is not updated Consistently(func() error { - nestedWork, err := kubeWorkClient.WorkV1().ManifestWorks(nestedWorkNamespace).Get(ctx, nestedWorkName, metav1.GetOptions{}) + nestedWork, err := agentTestOpts.workClientSet.WorkV1().ManifestWorks(nestedWorkNamespace).Get(ctx, nestedWorkName, metav1.GetOptions{}) if err != nil { return err } @@ -148,7 +148,7 @@ var _ = Describe("Server Side Apply", Ordered, Label("e2e-tests-serverside-apply return nil }, 1*time.Minute, 1*time.Second).Should(BeNil()) - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) }) diff --git a/test/e2e/pkg/sourceclient_test.go b/test/e2e/pkg/sourceclient_test.go index ec6a0d20..3a1c9086 100644 --- a/test/e2e/pkg/sourceclient_test.go +++ b/test/e2e/pkg/sourceclient_test.go @@ -33,7 +33,7 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- workName = "work-" + rand.String(5) work := NewManifestWork(workName) Eventually(func() error { - _, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) return err }, 2*time.Minute, 2*time.Second).ShouldNot(HaveOccurred()) @@ -42,7 +42,7 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- }) AfterEach(func() { - err := workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { @@ -53,7 +53,7 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- It("Should return an error when updating an obsolete work", func() { By("update a work by work client") - work, err := workClient.ManifestWorks(consumer.Name).Get(ctx, workName, metav1.GetOptions{}) + work, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Get(ctx, workName, metav1.GetOptions{}) Expect(err).ShouldNot(HaveOccurred()) newWork := work.DeepCopy() @@ -61,7 +61,7 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- patchData, err := grpcsource.ToWorkPatch(work, newWork) Expect(err).ShouldNot(HaveOccurred()) - _, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) Expect(err).ShouldNot(HaveOccurred()) By("update the work by work client again") @@ -70,7 +70,7 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- patchData, err = grpcsource.ToWorkPatch(work, obsoleteWork) Expect(err).ShouldNot(HaveOccurred()) - _, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) Expect(err).Should(HaveOccurred()) Expect(strings.Contains(err.Error(), "the resource version is not the latest")).Should(BeTrue()) @@ -93,20 +93,20 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- initWorkAName = "init-work-a-" + rand.String(5) work := NewManifestWorkWithLabels(initWorkAName, map[string]string{"app": "test"}) - _, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) initWorkBName = "init-work-b-" + rand.String(5) work = NewManifestWorkWithLabels(initWorkBName, map[string]string{"app": "test"}) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) AfterEach(func() { - err := workClient.ManifestWorks(consumer.Name).Delete(ctx, initWorkAName, metav1.DeleteOptions{}) + err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, initWorkAName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, initWorkBName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, initWorkBName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { @@ -131,20 +131,20 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- Expect(err).ShouldNot(HaveOccurred()) By("start watching") - watcher, err := watcherClient.ManifestWorks(consumer.Name).Watch(watcherCtx, metav1.ListOptions{}) + watcher, err := watcherClient.ManifestWorks(agentTestOpts.consumerName).Watch(watcherCtx, metav1.ListOptions{}) Expect(err).ShouldNot(HaveOccurred()) result := StartWatch(watcherCtx, watcher) By("create a work by work client") workName := "work-" + rand.String(5) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) // wait for few seconds to ensure the creation is finished <-time.After(5 * time.Second) By("update a work by work client") - work, err := workClient.ManifestWorks(consumer.Name).Get(ctx, workName, metav1.GetOptions{}) + work, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Get(ctx, workName, metav1.GetOptions{}) Expect(err).ShouldNot(HaveOccurred()) newWork := work.DeepCopy() @@ -152,14 +152,14 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- patchData, err := grpcsource.ToWorkPatch(work, newWork) Expect(err).ShouldNot(HaveOccurred()) - _, err = workClient.ManifestWorks(consumer.Name).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Patch(ctx, workName, types.MergePatchType, patchData, metav1.PatchOptions{}) Expect(err).ShouldNot(HaveOccurred()) // wait for few seconds to ensure the work status is updated by agent <-time.After(5 * time.Second) By("delete the work by work client") - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { @@ -181,8 +181,8 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- Expect(err).ShouldNot(HaveOccurred()) allConsumerWatcherResult := StartWatch(watcherCtx, allConsumerWatcher) - By("start watching works from consumer" + consumer.Name) - consumerWatcher, err := watcherClient.ManifestWorks(consumer.Name).Watch(watcherCtx, metav1.ListOptions{}) + By("start watching works from consumer" + agentTestOpts.consumerName) + consumerWatcher, err := watcherClient.ManifestWorks(agentTestOpts.consumerName).Watch(watcherCtx, metav1.ListOptions{}) Expect(err).ShouldNot(HaveOccurred()) consumerWatcherResult := StartWatch(watcherCtx, consumerWatcher) @@ -193,14 +193,14 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- By("create a work by work client") workName := "work-" + rand.String(5) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, NewManifestWork(workName), metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) // wait for few seconds to ensure the creation is finished <-time.After(5 * time.Second) By("delete the work by work client") - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { @@ -229,7 +229,7 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- Expect(err).ShouldNot(HaveOccurred()) By("start watching with label app=test") - watcher, err := watcherClient.ManifestWorks(consumer.Name).Watch(watcherCtx, metav1.ListOptions{ + watcher, err := watcherClient.ManifestWorks(agentTestOpts.consumerName).Watch(watcherCtx, metav1.ListOptions{ LabelSelector: "app=test", }) Expect(err).ShouldNot(HaveOccurred()) @@ -238,14 +238,14 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- By("create a work by work client") workName := "work-" + rand.String(5) work := NewManifestWorkWithLabels(workName, map[string]string{"app": "test"}) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) // wait for few seconds to ensure the creation is finished <-time.After(5 * time.Second) By("delete the work by work client") - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { @@ -265,27 +265,27 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- // prepare works firstly workName = "work-" + rand.String(5) work := NewManifestWork(workName) - _, err := workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) prodWorkName = "work-production" + rand.String(5) work = NewManifestWorkWithLabels(prodWorkName, map[string]string{"app": "test", "env": "production"}) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) testWorkAName = "work-integration-a-" + rand.String(5) work = NewManifestWorkWithLabels(testWorkAName, map[string]string{"app": "test", "env": "integration", "val": "a"}) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) testWorkBName = "work-integration-b-" + rand.String(5) work = NewManifestWorkWithLabels(testWorkBName, map[string]string{"app": "test", "env": "integration", "val": "b"}) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) testWorkCName = "work-integration-c-" + rand.String(5) work = NewManifestWorkWithLabels(testWorkCName, map[string]string{"app": "test", "env": "integration", "val": "c"}) - _, err = workClient.ManifestWorks(consumer.Name).Create(ctx, work, metav1.CreateOptions{}) + _, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Create(ctx, work, metav1.CreateOptions{}) Expect(err).ShouldNot(HaveOccurred()) // wait for few seconds to ensure the creation is finished @@ -293,19 +293,19 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- }) AfterEach(func() { - err := workClient.ManifestWorks(consumer.Name).Delete(ctx, workName, metav1.DeleteOptions{}) + err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, workName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, prodWorkName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, prodWorkName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, testWorkAName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, testWorkAName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, testWorkBName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, testWorkBName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) - err = workClient.ManifestWorks(consumer.Name).Delete(ctx, testWorkCName, metav1.DeleteOptions{}) + err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Delete(ctx, testWorkCName, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) Eventually(func() error { @@ -331,57 +331,57 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- It("List works with options", func() { By("list all works") - works, err := workClient.ManifestWorks(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) + works, err := sourceWorkClient.ManifestWorks(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items, workName, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) By("list works by consumer name") - works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{}) + works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{}) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items, workName, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) By("list works by nonexistent consumer") - works, err = workClient.ManifestWorks("nonexistent").List(ctx, metav1.ListOptions{}) + works, err = sourceWorkClient.ManifestWorks("nonexistent").List(ctx, metav1.ListOptions{}) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred()) By("list works with nonexistent labels") - works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{ + works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{ LabelSelector: "nonexistent=true", }) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items)).ShouldNot(HaveOccurred()) By("list works with app label") - works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{ + works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{ LabelSelector: "app=test", }) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) By("list works without test env") - works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{ + works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{ LabelSelector: "app=test,env!=integration", }) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items, prodWorkName)).ShouldNot(HaveOccurred()) By("list works in prod and test env") - works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{ + works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{ LabelSelector: "env in (production, integration)", }) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items, prodWorkName, testWorkAName, testWorkBName, testWorkCName)).ShouldNot(HaveOccurred()) By("list works in test env and val not in a and b") - works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{ + works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{ LabelSelector: "env=integration,val notin (a,b)", }) Expect(err).ShouldNot(HaveOccurred()) Expect(AssertWorks(works.Items, testWorkCName)).ShouldNot(HaveOccurred()) By("list works with val label") - works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{ + works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{ LabelSelector: "val", }) Expect(err).ShouldNot(HaveOccurred()) @@ -389,7 +389,7 @@ var _ = Describe("Source ManifestWork Client", Ordered, Label("e2e-tests-source- // TODO support does not exist // By("list works without val label") - // works, err = workClient.ManifestWorks(consumer.Name).List(ctx, metav1.ListOptions{ + // works, err = sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).List(ctx, metav1.ListOptions{ // LabelSelector: "!val", // }) // Expect(err).ShouldNot(HaveOccurred()) @@ -476,7 +476,7 @@ func AssertWatchResult(result *WatchedResult) error { } func AssertWorkNotFound(name string) error { - _, err := workClient.ManifestWorks(consumer.Name).Get(ctx, name, metav1.GetOptions{}) + _, err := sourceWorkClient.ManifestWorks(agentTestOpts.consumerName).Get(ctx, name, metav1.GetOptions{}) if errors.IsNotFound(err) { return nil } diff --git a/test/e2e/pkg/spec_resync_test.go b/test/e2e/pkg/spec_resync_test.go index 5d374a43..a6a7aa6a 100644 --- a/test/e2e/pkg/spec_resync_test.go +++ b/test/e2e/pkg/spec_resync_test.go @@ -8,11 +8,9 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/openshift-online/maestro/pkg/api/openapi" - corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" ) @@ -24,7 +22,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res deployB := fmt.Sprintf("nginx-%s", rand.String(5)) deployC := fmt.Sprintf("nginx-%s", rand.String(5)) It("post the nginx A resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployA, 1) + res := helper.NewAPIResource(agentTestOpts.consumerName, deployA, 1) var resp *http.Response var err error resourceA, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() @@ -33,7 +31,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res Expect(*resourceA.Id).ShouldNot(BeEmpty()) Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) if err != nil { return err } @@ -45,7 +43,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res }) It("post the nginx B resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployB, 1) + res := helper.NewAPIResource(agentTestOpts.consumerName, deployB, 1) var resp *http.Response var err error resourceB, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() @@ -54,7 +52,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res Expect(*resourceB.Id).ShouldNot(BeEmpty()) Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) if err != nil { return err } @@ -66,20 +64,20 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res }) It("shut down maestro agent", func() { - deploy, err := consumer.ClientSet.AppsV1().Deployments("maestro-agent").Get(ctx, "maestro-agent", metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments(agentTestOpts.agentNamespace).Get(ctx, "maestro-agent", metav1.GetOptions{}) Expect(err).ShouldNot(HaveOccurred()) maestroAgentReplicas = int(*deploy.Spec.Replicas) // patch maestro agent replicas to 0 - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro-agent").Patch(ctx, "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ - FieldManager: "testconsumer.ClientSet", + deploy, err = agentTestOpts.kubeClientSet.AppsV1().Deployments(agentTestOpts.agentNamespace).Patch(ctx, "maestro-agent", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "testagentTestOpts.kubeClientSet", }) Expect(err).ShouldNot(HaveOccurred()) Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) // ensure no running maestro agent pods Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro-agent").List(ctx, metav1.ListOptions{ + pods, err := agentTestOpts.kubeClientSet.CoreV1().Pods(agentTestOpts.agentNamespace).List(ctx, metav1.ListOptions{ LabelSelector: "app=maestro-agent", }) if err != nil { @@ -93,7 +91,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res }) It("patch the nginx A resource", func() { - newRes := helper.NewAPIResource(consumer.Name, deployA, 2) + newRes := helper.NewAPIResource(agentTestOpts.consumerName, deployA, 2) patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(ctx, *resourceA.Id). ResourcePatchRequest(openapi.ResourcePatchRequest{Version: resourceA.Version, Manifest: newRes.Manifest}).Execute() Expect(err).ShouldNot(HaveOccurred()) @@ -103,7 +101,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res It("ensure the nginx A resource is not updated", func() { Consistently(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) if err != nil { return nil } @@ -122,7 +120,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res It("ensure the nginx B resource is not deleted", func() { Consistently(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return fmt.Errorf("nginx B deployment %s is deleted", deployB) @@ -133,7 +131,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res }) It("post the nginx C resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployC, 1) + res := helper.NewAPIResource(agentTestOpts.consumerName, deployC, 1) var resp *http.Response var err error resourceC, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() @@ -144,7 +142,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res It("ensure the nginx C resource is not created", func() { Consistently(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) if err == nil { return fmt.Errorf("nginx C deployment %s is created", deployC) } @@ -154,15 +152,15 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res It("restart maestro agent", func() { // patch maestro agent replicas back - deploy, err := consumer.ClientSet.AppsV1().Deployments("maestro-agent").Patch(ctx, "maestro-agent", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroAgentReplicas)), metav1.PatchOptions{ - FieldManager: "testconsumer.ClientSet", + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments(agentTestOpts.agentNamespace).Patch(ctx, "maestro-agent", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroAgentReplicas)), metav1.PatchOptions{ + FieldManager: "testagentTestOpts.kubeClientSet", }) Expect(err).ShouldNot(HaveOccurred()) Expect(*deploy.Spec.Replicas).To(Equal(int32(maestroAgentReplicas))) // ensure maestro agent pod is up and running Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro-agent").List(ctx, metav1.ListOptions{ + pods, err := agentTestOpts.kubeClientSet.CoreV1().Pods(agentTestOpts.agentNamespace).List(ctx, metav1.ListOptions{ LabelSelector: "app=maestro-agent", }) if err != nil { @@ -185,7 +183,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res It("ensure the nginx A resource is updated", func() { Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) if err != nil { return err } @@ -198,7 +196,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res It("ensure the nginx B resource is deleted", func() { Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -211,7 +209,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res It("ensure the nginx C resource is created", func() { Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) if err != nil { return err } @@ -228,7 +226,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -243,7 +241,7 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -256,362 +254,362 @@ var _ = Describe("Spec Resync After Restart", Ordered, Label("e2e-tests-spec-res }) }) -var _ = Describe("Spec Resync After Reconnect", Ordered, Label("e2e-tests-spec-resync-reconnect"), func() { - Context("Resource resync resource spec after maestro agent reconnects", func() { - var maestroServerReplicas, mqttReplicas int - var resourceA, resourceB, resourceC *openapi.Resource - deployA := fmt.Sprintf("nginx-%s", rand.String(5)) - deployB := fmt.Sprintf("nginx-%s", rand.String(5)) - deployC := fmt.Sprintf("nginx-%s", rand.String(5)) - It("post the nginx A resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployA, 1) - var resp *http.Response - var err error - resourceA, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusCreated)) - Expect(*resourceA.Id).ShouldNot(BeEmpty()) - - Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) - if err != nil { - return err - } - if *deploy.Spec.Replicas != 1 { - return fmt.Errorf("unexpected replicas for nginx A deployment %s, expected 1, got %d", deployA, *deploy.Spec.Replicas) - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("post the nginx B resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployB, 1) - var resp *http.Response - var err error - resourceB, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusCreated)) - Expect(*resourceB.Id).ShouldNot(BeEmpty()) - - Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) - if err != nil { - return err - } - if *deploy.Spec.Replicas != 1 { - return fmt.Errorf("unexpected replicas for nginx B deployment %s, expected 1, got %d", deployB, *deploy.Spec.Replicas) - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("delete the grpc-broker service for agent", func() { - err := consumer.ClientSet.CoreV1().Services("maestro").Delete(ctx, "maestro-grpc-broker", metav1.DeleteOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - - It("delete the mqtt-broker service for agent", func() { - err := consumer.ClientSet.CoreV1().Services("maestro").Delete(ctx, "maestro-mqtt-agent", metav1.DeleteOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - - It("rollout maestro server", func() { - deploy, err := consumer.ClientSet.AppsV1().Deployments("maestro").Get(ctx, "maestro", metav1.GetOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - maestroServerReplicas = int(*deploy.Spec.Replicas) - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ - FieldManager: "testconsumer.ClientSet", - }) - Expect(err).ShouldNot(HaveOccurred()) - Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) - - // ensure no running maestro server pods - Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ - LabelSelector: "app=maestro", - }) - if err != nil { - return err - } - if len(pods.Items) > 0 { - return fmt.Errorf("maestro server pods still running") - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - - // patch maestro server replicas to maestroServerReplicas - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroServerReplicas)), metav1.PatchOptions{ - FieldManager: "testconsumer.ClientSet", - }) - Expect(err).ShouldNot(HaveOccurred()) - Expect(*deploy.Spec.Replicas).To(Equal(int32(maestroServerReplicas))) - - // ensure maestro server pod is up and running - Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ - LabelSelector: "app=maestro", - }) - if err != nil { - return err - } - if len(pods.Items) != maestroServerReplicas { - return fmt.Errorf("unexpected maestro server pod count, expected %d, got %d", maestroServerReplicas, len(pods.Items)) - } - for _, pod := range pods.Items { - if pod.Status.Phase != "Running" { - return fmt.Errorf("maestro server pod not in running state") - } - if pod.Status.ContainerStatuses[0].State.Running == nil { - return fmt.Errorf("maestro server container not in running state") - } - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("rollout the mqtt-broker", func() { - deploy, err := consumer.ClientSet.AppsV1().Deployments("maestro").Get(ctx, "maestro-mqtt", metav1.GetOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - mqttReplicas = int(*deploy.Spec.Replicas) - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ - FieldManager: "testconsumer.ClientSet", - }) - Expect(err).ShouldNot(HaveOccurred()) - Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) - - // ensure no running mqtt-broker pods - Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ - LabelSelector: "name=maestro-mqtt", - }) - if err != nil { - return err - } - if len(pods.Items) > 0 { - return fmt.Errorf("maestro-mqtt pods still running") - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - - // patch mqtt-broker replicas to mqttReplicas - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{ - FieldManager: "testconsumer.ClientSet", - }) - Expect(err).ShouldNot(HaveOccurred()) - Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas))) - - // ensure mqtt-broker pod is up and running - Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ - LabelSelector: "name=maestro-mqtt", - }) - if err != nil { - return err - } - if len(pods.Items) != mqttReplicas { - return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items)) - } - for _, pod := range pods.Items { - if pod.Status.Phase != "Running" { - return fmt.Errorf("maestro-mqtt pod not in running state") - } - if pod.Status.ContainerStatuses[0].State.Running == nil { - return fmt.Errorf("maestro-mqtt container not in running state") - } - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("patch the nginx A resource", func() { - newRes := helper.NewAPIResource(consumer.Name, deployA, 2) - Eventually(func() error { - patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(ctx, *resourceA.Id). - ResourcePatchRequest(openapi.ResourcePatchRequest{Version: resourceA.Version, Manifest: newRes.Manifest}).Execute() - if err != nil { - return err - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("unexpected status code, expected 200, got %d", resp.StatusCode) - } - if *patchedResource.Version != *resourceA.Version+1 { - return fmt.Errorf("unexpected version, expected %d, got %d", *resourceA.Version+1, *patchedResource.Version) - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("ensure the nginx A resource is not updated", func() { - Consistently(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) - if err != nil { - return nil - } - if *deploy.Spec.Replicas != 1 { - return fmt.Errorf("unexpected replicas for nginx A deployment %s, expected 1, got %d", deployA, *deploy.Spec.Replicas) - } - return nil - }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("delete the nginx B resource", func() { - resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resourceB.Id).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) - }) - - It("ensure the nginx B resource is not deleted", func() { - Consistently(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return fmt.Errorf("nginx B deployment %s is deleted", deployB) - } - } - return nil - }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("post the nginx C resource to the maestro api", func() { - res := helper.NewAPIResource(consumer.Name, deployC, 1) - var resp *http.Response - var err error - resourceC, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusCreated)) - Expect(*resourceC.Id).ShouldNot(BeEmpty()) - }) - - It("ensure the nginx C resource is not created", func() { - Consistently(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) - if err == nil { - return fmt.Errorf("nginx C deployment %s is created", deployC) - } - return nil - }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("recreate the mqtt-broker service for agent", func() { - mqttAgentService := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "maestro-mqtt-agent", - Namespace: "maestro", - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "name": "maestro-mqtt", - }, - Ports: []corev1.ServicePort{ - { - Name: "mosquitto", - Protocol: corev1.ProtocolTCP, - Port: 1883, - TargetPort: intstr.FromInt(1883), - }, - }, - Type: corev1.ServiceTypeClusterIP, - }, - } - - _, err := consumer.ClientSet.CoreV1().Services("maestro").Create(ctx, mqttAgentService, metav1.CreateOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - - It("recreate the grpc-broker service for agent", func() { - grpcBrokerService := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "maestro-grpc-broker", - Namespace: "maestro", - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "app": "maestro", - }, - Ports: []corev1.ServicePort{ - { - Name: "grpc-broker", - Protocol: corev1.ProtocolTCP, - Port: 8091, - TargetPort: intstr.FromInt(8091), - }, - }, - Type: corev1.ServiceTypeClusterIP, - }, - } - _, err := consumer.ClientSet.CoreV1().Services("maestro").Create(ctx, grpcBrokerService, metav1.CreateOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - - It("ensure the nginx A resource is updated", func() { - Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) - if err != nil { - return err - } - if *deploy.Spec.Replicas != 2 { - return fmt.Errorf("unexpected replicas for nginx A deployment %s, expected 2, got %d", deployA, *deploy.Spec.Replicas) - } - return nil - }, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred()) - }) - - It("ensure the nginx B resource is deleted", func() { - Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("nginx B deployment %s still exists", deployB) - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("ensure the nginx C resource is created", func() { - Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) - if err != nil { - return err - } - if *deploy.Spec.Replicas != 1 { - return fmt.Errorf("unexpected replicas for nginx C deployment %s, expected 1, got %d", deployC, *deploy.Spec.Replicas) - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("delete the nginx A and nginx C resources", func() { - resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resourceA.Id).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) - - Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("nginx A deployment %s still exists", deployA) - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - - resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resourceC.Id).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) - - Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("nginx C deployment %s still exists", deployC) - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - }) -}) +// var _ = Describe("Spec Resync After Reconnect", Ordered, Label("e2e-tests-spec-resync-reconnect"), func() { +// Context("Resource resync resource spec after maestro agent reconnects", func() { +// var maestroServerReplicas, mqttReplicas int +// var resourceA, resourceB, resourceC *openapi.Resource +// deployA := fmt.Sprintf("nginx-%s", rand.String(5)) +// deployB := fmt.Sprintf("nginx-%s", rand.String(5)) +// deployC := fmt.Sprintf("nginx-%s", rand.String(5)) +// It("post the nginx A resource to the maestro api", func() { +// res := helper.NewAPIResource(agentTestOpts.consumerName, deployA, 1) +// var resp *http.Response +// var err error +// resourceA, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusCreated)) +// Expect(*resourceA.Id).ShouldNot(BeEmpty()) + +// Eventually(func() error { +// deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) +// if err != nil { +// return err +// } +// if *deploy.Spec.Replicas != 1 { +// return fmt.Errorf("unexpected replicas for nginx A deployment %s, expected 1, got %d", deployA, *deploy.Spec.Replicas) +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("post the nginx B resource to the maestro api", func() { +// res := helper.NewAPIResource(agentTestOpts.consumerName, deployB, 1) +// var resp *http.Response +// var err error +// resourceB, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusCreated)) +// Expect(*resourceB.Id).ShouldNot(BeEmpty()) + +// Eventually(func() error { +// deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) +// if err != nil { +// return err +// } +// if *deploy.Spec.Replicas != 1 { +// return fmt.Errorf("unexpected replicas for nginx B deployment %s, expected 1, got %d", deployB, *deploy.Spec.Replicas) +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("delete the grpc-broker service for agent", func() { +// err := serverTestOpts.kubeClientSet.CoreV1().Services(serverTestOpts.serverNamespace).Delete(ctx, "maestro-grpc-broker", metav1.DeleteOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) + +// It("delete the mqtt-broker service for agent", func() { +// err := serverTestOpts.kubeClientSet.CoreV1().Services(serverTestOpts.serverNamespace).Delete(ctx, "maestro-mqtt-agent", metav1.DeleteOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) + +// It("rollout maestro server", func() { +// deploy, err := serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Get(ctx, "maestro", metav1.GetOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// maestroServerReplicas = int(*deploy.Spec.Replicas) +// deploy, err = serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ +// FieldManager: "serverTestOpts.kubeClientSet", +// }) +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + +// // ensure no running maestro server pods +// Eventually(func() error { +// pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ +// LabelSelector: "app=maestro", +// }) +// if err != nil { +// return err +// } +// if len(pods.Items) > 0 { +// return fmt.Errorf("maestro server pods still running") +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + +// // patch maestro server replicas to maestroServerReplicas +// deploy, err = serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroServerReplicas)), metav1.PatchOptions{ +// FieldManager: "serverTestOpts.kubeClientSet", +// }) +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(*deploy.Spec.Replicas).To(Equal(int32(maestroServerReplicas))) + +// // ensure maestro server pod is up and running +// Eventually(func() error { +// pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ +// LabelSelector: "app=maestro", +// }) +// if err != nil { +// return err +// } +// if len(pods.Items) != maestroServerReplicas { +// return fmt.Errorf("unexpected maestro server pod count, expected %d, got %d", maestroServerReplicas, len(pods.Items)) +// } +// for _, pod := range pods.Items { +// if pod.Status.Phase != "Running" { +// return fmt.Errorf("maestro server pod not in running state") +// } +// if pod.Status.ContainerStatuses[0].State.Running == nil { +// return fmt.Errorf("maestro server container not in running state") +// } +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("rollout the mqtt-broker", func() { +// deploy, err := serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Get(ctx, "maestro-mqtt", metav1.GetOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// mqttReplicas = int(*deploy.Spec.Replicas) +// deploy, err = serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ +// FieldManager: "serverTestOpts.kubeClientSet", +// }) +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + +// // ensure no running mqtt-broker pods +// Eventually(func() error { +// pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ +// LabelSelector: "name=maestro-mqtt", +// }) +// if err != nil { +// return err +// } +// if len(pods.Items) > 0 { +// return fmt.Errorf("maestro-mqtt pods still running") +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + +// // patch mqtt-broker replicas to mqttReplicas +// deploy, err = serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{ +// FieldManager: "serverTestOpts.kubeClientSet", +// }) +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas))) + +// // ensure mqtt-broker pod is up and running +// Eventually(func() error { +// pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ +// LabelSelector: "name=maestro-mqtt", +// }) +// if err != nil { +// return err +// } +// if len(pods.Items) != mqttReplicas { +// return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items)) +// } +// for _, pod := range pods.Items { +// if pod.Status.Phase != "Running" { +// return fmt.Errorf("maestro-mqtt pod not in running state") +// } +// if pod.Status.ContainerStatuses[0].State.Running == nil { +// return fmt.Errorf("maestro-mqtt container not in running state") +// } +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("patch the nginx A resource", func() { +// newRes := helper.NewAPIResource(agentTestOpts.consumerName, deployA, 2) +// Eventually(func() error { +// patchedResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdPatch(ctx, *resourceA.Id). +// ResourcePatchRequest(openapi.ResourcePatchRequest{Version: resourceA.Version, Manifest: newRes.Manifest}).Execute() +// if err != nil { +// return err +// } +// if resp.StatusCode != http.StatusOK { +// return fmt.Errorf("unexpected status code, expected 200, got %d", resp.StatusCode) +// } +// if *patchedResource.Version != *resourceA.Version+1 { +// return fmt.Errorf("unexpected version, expected %d, got %d", *resourceA.Version+1, *patchedResource.Version) +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("ensure the nginx A resource is not updated", func() { +// Consistently(func() error { +// deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) +// if err != nil { +// return nil +// } +// if *deploy.Spec.Replicas != 1 { +// return fmt.Errorf("unexpected replicas for nginx A deployment %s, expected 1, got %d", deployA, *deploy.Spec.Replicas) +// } +// return nil +// }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("delete the nginx B resource", func() { +// resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resourceB.Id).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) +// }) + +// It("ensure the nginx B resource is not deleted", func() { +// Consistently(func() error { +// _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) +// if err != nil { +// if errors.IsNotFound(err) { +// return fmt.Errorf("nginx B deployment %s is deleted", deployB) +// } +// } +// return nil +// }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("post the nginx C resource to the maestro api", func() { +// res := helper.NewAPIResource(agentTestOpts.consumerName, deployC, 1) +// var resp *http.Response +// var err error +// resourceC, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusCreated)) +// Expect(*resourceC.Id).ShouldNot(BeEmpty()) +// }) + +// It("ensure the nginx C resource is not created", func() { +// Consistently(func() error { +// _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) +// if err == nil { +// return fmt.Errorf("nginx C deployment %s is created", deployC) +// } +// return nil +// }, 10*time.Second, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("recreate the mqtt-broker service for agent", func() { +// mqttAgentService := &corev1.Service{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: "maestro-mqtt-agent", +// Namespace: serverTestOpts.serverNamespace, +// }, +// Spec: corev1.ServiceSpec{ +// Selector: map[string]string{ +// "name": "maestro-mqtt", +// }, +// Ports: []corev1.ServicePort{ +// { +// Name: "mosquitto", +// Protocol: corev1.ProtocolTCP, +// Port: 1883, +// TargetPort: intstr.FromInt(1883), +// }, +// }, +// Type: corev1.ServiceTypeClusterIP, +// }, +// } + +// _, err := serverTestOpts.kubeClientSet.CoreV1().Services(serverTestOpts.serverNamespace).Create(ctx, mqttAgentService, metav1.CreateOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) + +// It("recreate the grpc-broker service for agent", func() { +// grpcBrokerService := &corev1.Service{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: "maestro-grpc-broker", +// Namespace: serverTestOpts.serverNamespace, +// }, +// Spec: corev1.ServiceSpec{ +// Selector: map[string]string{ +// "app": "maestro", +// }, +// Ports: []corev1.ServicePort{ +// { +// Name: "grpc-broker", +// Protocol: corev1.ProtocolTCP, +// Port: 8091, +// TargetPort: intstr.FromInt(8091), +// }, +// }, +// Type: corev1.ServiceTypeClusterIP, +// }, +// } +// _, err := serverTestOpts.kubeClientSet.CoreV1().Services(serverTestOpts.serverNamespace).Create(ctx, grpcBrokerService, metav1.CreateOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) + +// It("ensure the nginx A resource is updated", func() { +// Eventually(func() error { +// deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) +// if err != nil { +// return err +// } +// if *deploy.Spec.Replicas != 2 { +// return fmt.Errorf("unexpected replicas for nginx A deployment %s, expected 2, got %d", deployA, *deploy.Spec.Replicas) +// } +// return nil +// }, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("ensure the nginx B resource is deleted", func() { +// Eventually(func() error { +// _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployB, metav1.GetOptions{}) +// if err != nil { +// if errors.IsNotFound(err) { +// return nil +// } +// return err +// } +// return fmt.Errorf("nginx B deployment %s still exists", deployB) +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("ensure the nginx C resource is created", func() { +// Eventually(func() error { +// deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) +// if err != nil { +// return err +// } +// if *deploy.Spec.Replicas != 1 { +// return fmt.Errorf("unexpected replicas for nginx C deployment %s, expected 1, got %d", deployC, *deploy.Spec.Replicas) +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("delete the nginx A and nginx C resources", func() { +// resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resourceA.Id).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + +// Eventually(func() error { +// _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployA, metav1.GetOptions{}) +// if err != nil { +// if errors.IsNotFound(err) { +// return nil +// } +// return err +// } +// return fmt.Errorf("nginx A deployment %s still exists", deployA) +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + +// resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resourceC.Id).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + +// Eventually(func() error { +// _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, deployC, metav1.GetOptions{}) +// if err != nil { +// if errors.IsNotFound(err) { +// return nil +// } +// return err +// } +// return fmt.Errorf("nginx C deployment %s still exists", deployC) +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) +// }) +// }) diff --git a/test/e2e/pkg/status_resync_test.go b/test/e2e/pkg/status_resync_test.go index a2292333..0b500274 100644 --- a/test/e2e/pkg/status_resync_test.go +++ b/test/e2e/pkg/status_resync_test.go @@ -14,7 +14,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/rand" ) @@ -24,7 +23,7 @@ var _ = Describe("Status Resync After Restart", Ordered, Label("e2e-tests-status var resource *openapi.Resource name := fmt.Sprintf("nginx-%s", rand.String(5)) It("post the nginx resource with non-default service account to the maestro api", func() { - res := helper.NewAPIResourceWithSA(consumer.Name, name, name, 1) + res := helper.NewAPIResourceWithSA(agentTestOpts.consumerName, name, name, 1) var resp *http.Response var err error resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() @@ -33,7 +32,7 @@ var _ = Describe("Status Resync After Restart", Ordered, Label("e2e-tests-status Expect(*resource.Id).ShouldNot(BeEmpty()) Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) + deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) if err != nil { return err } @@ -66,20 +65,20 @@ var _ = Describe("Status Resync After Restart", Ordered, Label("e2e-tests-status }) It("shut down maestro server", func() { - deploy, err := consumer.ClientSet.AppsV1().Deployments("maestro").Get(ctx, "maestro", metav1.GetOptions{}) + deploy, err := serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Get(ctx, "maestro", metav1.GetOptions{}) Expect(err).ShouldNot(HaveOccurred()) maestroServerReplicas = int(*deploy.Spec.Replicas) // patch maestro server replicas to 0 - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ - FieldManager: "testConsumer.ClientSet", + deploy, err = serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ + FieldManager: "serverTestOpts.kubeClientSet", }) Expect(err).ShouldNot(HaveOccurred()) Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) // ensure no running maestro server pods Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ + pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ LabelSelector: "app=maestro", }) if err != nil { @@ -93,7 +92,7 @@ var _ = Describe("Status Resync After Restart", Ordered, Label("e2e-tests-status }) It("create serviceaccount for nginx deployment", func() { - _, err := consumer.ClientSet.CoreV1().ServiceAccounts("default").Create(ctx, &corev1.ServiceAccount{ + _, err := agentTestOpts.kubeClientSet.CoreV1().ServiceAccounts("default").Create(ctx, &corev1.ServiceAccount{ ObjectMeta: metav1.ObjectMeta{ Name: name, }, @@ -101,21 +100,21 @@ var _ = Describe("Status Resync After Restart", Ordered, Label("e2e-tests-status Expect(err).ShouldNot(HaveOccurred()) // delete the nginx deployment to tigger recreating - err = consumer.ClientSet.AppsV1().Deployments("default").Delete(ctx, name, metav1.DeleteOptions{}) + err = agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Delete(ctx, name, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) It("restart maestro server", func() { // patch maestro server replicas back - deploy, err := consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroServerReplicas)), metav1.PatchOptions{ - FieldManager: "testConsumer.ClientSet", + deploy, err := serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, maestroServerReplicas)), metav1.PatchOptions{ + FieldManager: "serverTestOpts.kubeClientSet", }) Expect(err).ShouldNot(HaveOccurred()) Expect(*deploy.Spec.Replicas).To(Equal(int32(maestroServerReplicas))) // ensure maestro server pod is up and running Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ + pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ LabelSelector: "app=maestro", }) if err != nil { @@ -162,7 +161,7 @@ var _ = Describe("Status Resync After Restart", Ordered, Label("e2e-tests-status Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) + _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { return nil @@ -172,195 +171,195 @@ var _ = Describe("Status Resync After Restart", Ordered, Label("e2e-tests-status return fmt.Errorf("nginx deployment still exists") }, 2*time.Minute, 2*time.Second).ShouldNot(HaveOccurred()) - err = consumer.ClientSet.CoreV1().ServiceAccounts("default").Delete(ctx, name, metav1.DeleteOptions{}) + err = agentTestOpts.kubeClientSet.CoreV1().ServiceAccounts("default").Delete(ctx, name, metav1.DeleteOptions{}) Expect(err).ShouldNot(HaveOccurred()) }) }) }) -var _ = Describe("Status Resync After Reconnect", Ordered, Label("e2e-tests-status-resync-reconnect"), func() { - Context("Resource resync resource status after maestro server reconnects", func() { - var mqttReplicas int - var resource *openapi.Resource - name := fmt.Sprintf("nginx-%s", rand.String(5)) - It("post the nginx resource with non-default service account to the maestro api", func() { - res := helper.NewAPIResourceWithSA(consumer.Name, name, name, 1) - var resp *http.Response - var err error - resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusCreated)) - Expect(*resource.Id).ShouldNot(BeEmpty()) - - Eventually(func() error { - deploy, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) - if err != nil { - return err - } - if *deploy.Spec.Replicas != 1 { - return fmt.Errorf("unexpected replicas for nginx deployment %s, expected 1, got %d", name, *deploy.Spec.Replicas) - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - - gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(ctx, *resource.Id).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusOK)) - Expect(*gotResource.Id).To(Equal(*resource.Id)) - Expect(*gotResource.Version).To(Equal(*resource.Version)) - - Eventually(func() error { - gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(ctx, *resource.Id).Execute() - if err != nil { - return err - } - statusJSON, err := json.Marshal(gotResource.Status) - if err != nil { - return err - } - if !strings.Contains(string(statusJSON), "error looking up service account default/nginx") { - return fmt.Errorf("unexpected status, expected error looking up service account default/nginx, got %s", string(statusJSON)) - } - return nil - }, 2*time.Minute, 2*time.Second).ShouldNot(HaveOccurred()) - }) - - It("delete the mqtt-broker service for server", func() { - err := consumer.ClientSet.CoreV1().Services("maestro").Delete(ctx, "maestro-mqtt-server", metav1.DeleteOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - - It("create serviceaccount for nginx deployment", func() { - _, err := consumer.ClientSet.CoreV1().ServiceAccounts("default").Create(ctx, &corev1.ServiceAccount{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - }, metav1.CreateOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - - // delete the nginx deployment to tigger recreating - err = consumer.ClientSet.AppsV1().Deployments("default").Delete(ctx, name, metav1.DeleteOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - - It("Rollout the mqtt-broker", func() { - deploy, err := consumer.ClientSet.AppsV1().Deployments("maestro").Get(ctx, "maestro-mqtt", metav1.GetOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - mqttReplicas = int(*deploy.Spec.Replicas) - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ - FieldManager: "testConsumer.ClientSet", - }) - Expect(err).ShouldNot(HaveOccurred()) - Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) - - // ensure no running mqtt-broker pods - Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ - LabelSelector: "name=maestro-mqtt", - }) - if err != nil { - return err - } - if len(pods.Items) > 0 { - return fmt.Errorf("maestro-mqtt pods still running") - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - - // patch mqtt-broker replicas to mqttReplicas - deploy, err = consumer.ClientSet.AppsV1().Deployments("maestro").Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{ - FieldManager: "testConsumer.ClientSet", - }) - Expect(err).ShouldNot(HaveOccurred()) - Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas))) - - // ensure mqtt-broker pod is up and running - Eventually(func() error { - pods, err := consumer.ClientSet.CoreV1().Pods("maestro").List(ctx, metav1.ListOptions{ - LabelSelector: "name=maestro-mqtt", - }) - if err != nil { - return err - } - if len(pods.Items) != mqttReplicas { - return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items)) - } - for _, pod := range pods.Items { - if pod.Status.Phase != "Running" { - return fmt.Errorf("maestro-mqtt pod not in running state") - } - if pod.Status.ContainerStatuses[0].State.Running == nil { - return fmt.Errorf("maestro-mqtt container not in running state") - } - } - return nil - }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) - }) - - It("recreate the mqtt-broker service for server", func() { - mqttServerService := &corev1.Service{ - ObjectMeta: metav1.ObjectMeta{ - Name: "maestro-mqtt-server", - Namespace: "maestro", - }, - Spec: corev1.ServiceSpec{ - Selector: map[string]string{ - "name": "maestro-mqtt", - }, - Ports: []corev1.ServicePort{ - { - Name: "mosquitto", - Protocol: corev1.ProtocolTCP, - Port: 1883, - TargetPort: intstr.FromInt(1883), - }, - }, - Type: corev1.ServiceTypeClusterIP, - }, - } - - _, err := consumer.ClientSet.CoreV1().Services("maestro").Create(ctx, mqttServerService, metav1.CreateOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - - It("ensure the resource status is resynced", func() { - Eventually(func() error { - gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(ctx, *resource.Id).Execute() - if err != nil { - return err - } - if _, ok := gotResource.Status["ContentStatus"]; !ok { - return fmt.Errorf("unexpected status, expected contains ContentStatus, got %v", gotResource.Status) - } - statusJSON, err := json.Marshal(gotResource.Status) - if err != nil { - return err - } - if strings.Contains(string(statusJSON), "error looking up service account default/nginx") { - return fmt.Errorf("unexpected status, should not contain error looking up service account default/nginx, got %s", string(statusJSON)) - } - return nil - }, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred()) - }) - - It("delete the nginx resource", func() { - resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resource.Id).Execute() - Expect(err).ShouldNot(HaveOccurred()) - Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) - - Eventually(func() error { - _, err := consumer.ClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - return nil - } - return err - } - return fmt.Errorf("nginx deployment still exists") - }, 2*time.Minute, 2*time.Second).ShouldNot(HaveOccurred()) - - err = consumer.ClientSet.CoreV1().ServiceAccounts("default").Delete(ctx, name, metav1.DeleteOptions{}) - Expect(err).ShouldNot(HaveOccurred()) - }) - }) -}) +// var _ = Describe("Status Resync After Reconnect", Ordered, Label("e2e-tests-status-resync-reconnect"), func() { +// Context("Resource resync resource status after maestro server reconnects", func() { +// var mqttReplicas int +// var resource *openapi.Resource +// name := fmt.Sprintf("nginx-%s", rand.String(5)) +// It("post the nginx resource with non-default service account to the maestro api", func() { +// res := helper.NewAPIResourceWithSA(agentTestOpts.consumerName, name, name, 1) +// var resp *http.Response +// var err error +// resource, resp, err = apiClient.DefaultApi.ApiMaestroV1ResourcesPost(ctx).Resource(res).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusCreated)) +// Expect(*resource.Id).ShouldNot(BeEmpty()) + +// Eventually(func() error { +// deploy, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) +// if err != nil { +// return err +// } +// if *deploy.Spec.Replicas != 1 { +// return fmt.Errorf("unexpected replicas for nginx deployment %s, expected 1, got %d", name, *deploy.Spec.Replicas) +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + +// gotResource, resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(ctx, *resource.Id).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusOK)) +// Expect(*gotResource.Id).To(Equal(*resource.Id)) +// Expect(*gotResource.Version).To(Equal(*resource.Version)) + +// Eventually(func() error { +// gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(ctx, *resource.Id).Execute() +// if err != nil { +// return err +// } +// statusJSON, err := json.Marshal(gotResource.Status) +// if err != nil { +// return err +// } +// if !strings.Contains(string(statusJSON), "error looking up service account default/nginx") { +// return fmt.Errorf("unexpected status, expected error looking up service account default/nginx, got %s", string(statusJSON)) +// } +// return nil +// }, 2*time.Minute, 2*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("delete the mqtt-broker service for server", func() { +// err := serverTestOpts.kubeClientSet.CoreV1().Services(serverTestOpts.serverNamespace).Delete(ctx, "maestro-mqtt-server", metav1.DeleteOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) + +// It("create serviceaccount for nginx deployment", func() { +// _, err := agentTestOpts.kubeClientSet.CoreV1().ServiceAccounts("default").Create(ctx, &corev1.ServiceAccount{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: name, +// }, +// }, metav1.CreateOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) + +// // delete the nginx deployment to tigger recreating +// err = agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Delete(ctx, name, metav1.DeleteOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) + +// It("Rollout the mqtt-broker", func() { +// deploy, err := serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Get(ctx, "maestro-mqtt", metav1.GetOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// mqttReplicas = int(*deploy.Spec.Replicas) +// deploy, err = serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(`{"spec":{"replicas":0}}`), metav1.PatchOptions{ +// FieldManager: "serverTestOpts.kubeClientSet", +// }) +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(*deploy.Spec.Replicas).To(Equal(int32(0))) + +// // ensure no running mqtt-broker pods +// Eventually(func() error { +// pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ +// LabelSelector: "name=maestro-mqtt", +// }) +// if err != nil { +// return err +// } +// if len(pods.Items) > 0 { +// return fmt.Errorf("maestro-mqtt pods still running") +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) + +// // patch mqtt-broker replicas to mqttReplicas +// deploy, err = serverTestOpts.kubeClientSet.AppsV1().Deployments(serverTestOpts.serverNamespace).Patch(ctx, "maestro-mqtt", types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"replicas":%d}}`, mqttReplicas)), metav1.PatchOptions{ +// FieldManager: "serverTestOpts.kubeClientSet", +// }) +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(*deploy.Spec.Replicas).To(Equal(int32(mqttReplicas))) + +// // ensure mqtt-broker pod is up and running +// Eventually(func() error { +// pods, err := serverTestOpts.kubeClientSet.CoreV1().Pods(serverTestOpts.serverNamespace).List(ctx, metav1.ListOptions{ +// LabelSelector: "name=maestro-mqtt", +// }) +// if err != nil { +// return err +// } +// if len(pods.Items) != mqttReplicas { +// return fmt.Errorf("unexpected maestro-mqtt pod count, expected %d, got %d", mqttReplicas, len(pods.Items)) +// } +// for _, pod := range pods.Items { +// if pod.Status.Phase != "Running" { +// return fmt.Errorf("maestro-mqtt pod not in running state") +// } +// if pod.Status.ContainerStatuses[0].State.Running == nil { +// return fmt.Errorf("maestro-mqtt container not in running state") +// } +// } +// return nil +// }, 1*time.Minute, 1*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("recreate the mqtt-broker service for server", func() { +// mqttServerService := &corev1.Service{ +// ObjectMeta: metav1.ObjectMeta{ +// Name: "maestro-mqtt-server", +// Namespace: serverTestOpts.serverNamespace, +// }, +// Spec: corev1.ServiceSpec{ +// Selector: map[string]string{ +// "name": "maestro-mqtt", +// }, +// Ports: []corev1.ServicePort{ +// { +// Name: "mosquitto", +// Protocol: corev1.ProtocolTCP, +// Port: 1883, +// TargetPort: intstr.FromInt(1883), +// }, +// }, +// Type: corev1.ServiceTypeClusterIP, +// }, +// } + +// _, err := serverTestOpts.kubeClientSet.CoreV1().Services(serverTestOpts.serverNamespace).Create(ctx, mqttServerService, metav1.CreateOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) + +// It("ensure the resource status is resynced", func() { +// Eventually(func() error { +// gotResource, _, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdGet(ctx, *resource.Id).Execute() +// if err != nil { +// return err +// } +// if _, ok := gotResource.Status["ContentStatus"]; !ok { +// return fmt.Errorf("unexpected status, expected contains ContentStatus, got %v", gotResource.Status) +// } +// statusJSON, err := json.Marshal(gotResource.Status) +// if err != nil { +// return err +// } +// if strings.Contains(string(statusJSON), "error looking up service account default/nginx") { +// return fmt.Errorf("unexpected status, should not contain error looking up service account default/nginx, got %s", string(statusJSON)) +// } +// return nil +// }, 3*time.Minute, 3*time.Second).ShouldNot(HaveOccurred()) +// }) + +// It("delete the nginx resource", func() { +// resp, err := apiClient.DefaultApi.ApiMaestroV1ResourcesIdDelete(ctx, *resource.Id).Execute() +// Expect(err).ShouldNot(HaveOccurred()) +// Expect(resp.StatusCode).To(Equal(http.StatusNoContent)) + +// Eventually(func() error { +// _, err := agentTestOpts.kubeClientSet.AppsV1().Deployments("default").Get(ctx, name, metav1.GetOptions{}) +// if err != nil { +// if errors.IsNotFound(err) { +// return nil +// } +// return err +// } +// return fmt.Errorf("nginx deployment still exists") +// }, 2*time.Minute, 2*time.Second).ShouldNot(HaveOccurred()) + +// err = agentTestOpts.kubeClientSet.CoreV1().ServiceAccounts("default").Delete(ctx, name, metav1.DeleteOptions{}) +// Expect(err).ShouldNot(HaveOccurred()) +// }) +// }) +// }) diff --git a/test/e2e/pkg/suite_test.go b/test/e2e/pkg/suite_test.go index 0af54542..e33b5e4b 100644 --- a/test/e2e/pkg/suite_test.go +++ b/test/e2e/pkg/suite_test.go @@ -15,6 +15,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/kubernetes" @@ -32,21 +33,35 @@ import ( "github.com/openshift-online/maestro/test" ) +type agentTestOptions struct { + agentNamespace string + consumerName string + kubeConfig string + kubeClientSet kubernetes.Interface + workClientSet workclientset.Interface +} + +type serverTestOptions struct { + serverNamespace string + kubeConfig string + kubeClientSet kubernetes.Interface +} + var ( + serverTestOpts *serverTestOptions + agentTestOpts *agentTestOptions apiServerAddress string - grpcServerAddress string - grpcClient pbv1.CloudEventServiceClient - workClient workv1client.WorkV1Interface apiClient *openapi.APIClient - sourceID string + grpcServerAddress string + grpcCertDir string grpcConn *grpc.ClientConn + grpcClient pbv1.CloudEventServiceClient grpcOptions *grpcoptions.GRPCOptions - consumer *ConsumerOptions + sourceID string + sourceWorkClient workv1client.WorkV1Interface helper *test.Helper cancel context.CancelFunc ctx context.Context - grpcCertDir string - kubeWorkClient workclientset.Interface ) func TestE2E(t *testing.T) { @@ -56,12 +71,16 @@ func TestE2E(t *testing.T) { } func init() { - consumer = &ConsumerOptions{} + serverTestOpts = &serverTestOptions{} + agentTestOpts = &agentTestOptions{} klog.SetOutput(GinkgoWriter) - flag.StringVar(&apiServerAddress, "api-server", "", "Maestro API server address") + flag.StringVar(&apiServerAddress, "api-server", "", "Maestro Restful API server address") flag.StringVar(&grpcServerAddress, "grpc-server", "", "Maestro gRPC server address") - flag.StringVar(&consumer.Name, "consumer-name", "", "Consumer name is used to identify the consumer") - flag.StringVar(&consumer.KubeConfig, "consumer-kubeconfig", "", "Path to kubeconfig file") + flag.StringVar(&serverTestOpts.serverNamespace, "server-namespace", "maestro", "Namespace where the maestro server is running") + flag.StringVar(&serverTestOpts.kubeConfig, "server-kubeconfig", "", "Path to the kubeconfig file for the maestro server") + flag.StringVar(&agentTestOpts.agentNamespace, "agent-namespace", "maestro-agent", "Namespace where the maestro agent is running") + flag.StringVar(&agentTestOpts.consumerName, "consumer-name", "", "Consumer name is used to identify the consumer") + flag.StringVar(&agentTestOpts.kubeConfig, "agent-kubeconfig", "", "Path to the kubeconfig file for the maestro agent") } var _ = BeforeSuite(func() { @@ -89,45 +108,57 @@ var _ = BeforeSuite(func() { } apiClient = openapi.NewAPIClient(cfg) - grpcCertDir, err := os.MkdirTemp("/tmp", "maestro-grpc-certs-") + var err error + grpcCertDir, err = os.MkdirTemp("/tmp", "maestro-grpc-certs-") Expect(err).To(Succeed()) - // validate the consumer kubeconfig and name - restConfig, err := clientcmd.BuildConfigFromFlags("", consumer.KubeConfig) - Expect(err).To(Succeed()) - consumer.ClientSet, err = kubernetes.NewForConfig(restConfig) + // validate the server kubeconfig and initialize the kube client + serverRestConfig, err := clientcmd.BuildConfigFromFlags("", serverTestOpts.kubeConfig) Expect(err).To(Succeed()) - kubeWorkClient, err = workclientset.NewForConfig(restConfig) + serverTestOpts.kubeClientSet, err = kubernetes.NewForConfig(serverRestConfig) Expect(err).To(Succeed()) - Expect(consumer.Name).NotTo(BeEmpty(), "consumer name is not provided") - grpcCertSrt, err := consumer.ClientSet.CoreV1().Secrets("maestro").Get(ctx, "maestro-grpc-cert", metav1.GetOptions{}) + // validate the agent consumer name && kubeconfig and initialize the kube client & work client + Expect(agentTestOpts.consumerName).NotTo(BeEmpty(), "consumer name is not provided") + agentRestConfig, err := clientcmd.BuildConfigFromFlags("", agentTestOpts.kubeConfig) Expect(err).To(Succeed()) - grpcServerCAFile := fmt.Sprintf("%s/ca.crt", grpcCertDir) - grpcClientCert := fmt.Sprintf("%s/client.crt", grpcCertDir) - grpcClientKey := fmt.Sprintf("%s/client.key", grpcCertDir) - Expect(os.WriteFile(grpcServerCAFile, grpcCertSrt.Data["ca.crt"], 0644)).To(Succeed()) - Expect(os.WriteFile(grpcClientCert, grpcCertSrt.Data["client.crt"], 0644)).To(Succeed()) - Expect(os.WriteFile(grpcClientKey, grpcCertSrt.Data["client.key"], 0644)).To(Succeed()) - - grpcClientTokenSrt, err := consumer.ClientSet.CoreV1().Secrets("maestro").Get(ctx, "grpc-client-token", metav1.GetOptions{}) + agentTestOpts.kubeClientSet, err = kubernetes.NewForConfig(agentRestConfig) + Expect(err).To(Succeed()) + agentTestOpts.workClientSet, err = workclientset.NewForConfig(agentRestConfig) Expect(err).To(Succeed()) - grpcClientTokenFile := fmt.Sprintf("%s/token", grpcCertDir) - Expect(os.WriteFile(grpcClientTokenFile, grpcClientTokenSrt.Data["token"], 0644)).To(Succeed()) + // initialize the grpc source options grpcOptions = grpcoptions.NewGRPCOptions() grpcOptions.URL = grpcServerAddress - grpcOptions.CAFile = grpcServerCAFile - grpcOptions.TokenFile = grpcClientTokenFile sourceID = "sourceclient-test" + rand.String(5) + grpcCertSrt, err := serverTestOpts.kubeClientSet.CoreV1().Secrets(serverTestOpts.serverNamespace).Get(ctx, "maestro-grpc-cert", metav1.GetOptions{}) + if !errors.IsNotFound(err) { + // retrieve the grpc cert from the maestro server and write to the grpc cert dir + grpcServerCAFile := fmt.Sprintf("%s/ca.crt", grpcCertDir) + grpcClientCert := fmt.Sprintf("%s/client.crt", grpcCertDir) + grpcClientKey := fmt.Sprintf("%s/client.key", grpcCertDir) + Expect(os.WriteFile(grpcServerCAFile, grpcCertSrt.Data["ca.crt"], 0644)).To(Succeed()) + Expect(os.WriteFile(grpcClientCert, grpcCertSrt.Data["client.crt"], 0644)).To(Succeed()) + Expect(os.WriteFile(grpcClientKey, grpcCertSrt.Data["client.key"], 0644)).To(Succeed()) + grpcClientTokenSrt, err := serverTestOpts.kubeClientSet.CoreV1().Secrets(serverTestOpts.serverNamespace).Get(ctx, "grpc-client-token", metav1.GetOptions{}) + Expect(err).To(Succeed()) + grpcClientTokenFile := fmt.Sprintf("%s/token", grpcCertDir) + Expect(os.WriteFile(grpcClientTokenFile, grpcClientTokenSrt.Data["token"], 0644)).To(Succeed()) + // set CAFile and TokenFile for grpc authz + grpcOptions.CAFile = grpcServerCAFile + grpcOptions.TokenFile = grpcClientTokenFile + // create the clusterrole for grpc authz + Expect(helper.CreateGRPCAuthRule(ctx, serverTestOpts.kubeClientSet, "grpc-pub-sub", "source", sourceID, []string{"pub", "sub"})).To(Succeed()) + + grpcConn, err = helper.CreateGRPCConn(grpcServerAddress, grpcServerCAFile, grpcClientTokenFile) + Expect(err).To(Succeed()) + } else { + grpcConn, err = helper.CreateGRPCConn(grpcServerAddress, "", "") + Expect(err).To(Succeed()) + } - // create the clusterrole for grpc authz - Expect(helper.CreateGRPCAuthRule(ctx, consumer.ClientSet, "grpc-pub-sub", "source", sourceID, []string{"pub", "sub"})).To(Succeed()) - grpcConn, err = helper.CreateGRPCConn(grpcServerAddress, grpcServerCAFile, grpcClientTokenFile) - Expect(err).To(Succeed()) grpcClient = pbv1.NewCloudEventServiceClient(grpcConn) - - workClient, err = grpcsource.NewMaestroGRPCSourceWorkClient( + sourceWorkClient, err = grpcsource.NewMaestroGRPCSourceWorkClient( ctx, apiClient, grpcOptions, @@ -144,17 +175,11 @@ var _ = AfterSuite(func() { cancel() }) -type ConsumerOptions struct { - Name string - KubeConfig string - ClientSet *kubernetes.Clientset -} - func dumpDebugInfo() { // dump the maestro server logs - dumpPodLogs(ctx, consumer.ClientSet, "app=maestro", "maestro") + dumpPodLogs(ctx, serverTestOpts.kubeClientSet, "app=maestro", serverTestOpts.serverNamespace) // dump the maestro agent ogs - dumpPodLogs(ctx, consumer.ClientSet, "app=maestro-agent", "maestro-agent") + dumpPodLogs(ctx, agentTestOpts.kubeClientSet, "app=maestro-agent", agentTestOpts.agentNamespace) } func dumpPodLogs(ctx context.Context, kubeClient kubernetes.Interface, podSelector, podNamespace string) error { diff --git a/test/factories.go b/test/factories.go index 85a6f97e..96c51328 100755 --- a/test/factories.go +++ b/test/factories.go @@ -500,36 +500,41 @@ func (helper *Helper) CreateGRPCAuthRule(ctx context.Context, kubeClient kuberne } func (helper *Helper) CreateGRPCConn(serverAddr, serverCAFile, tokenFile string) (*grpc.ClientConn, error) { - certPool, err := x509.SystemCertPool() - if err != nil { - return nil, err - } + if serverCAFile == "" || tokenFile == "" { + // no TLS and authz + return grpc.Dial(serverAddr, grpc.WithInsecure()) + } else { + certPool, err := x509.SystemCertPool() + if err != nil { + return nil, err + } - caPEM, err := os.ReadFile(serverCAFile) - if err != nil { - return nil, err - } + caPEM, err := os.ReadFile(serverCAFile) + if err != nil { + return nil, err + } - ok := certPool.AppendCertsFromPEM(caPEM) - if !ok { - return nil, fmt.Errorf("failed to append server CA certificate") - } + ok := certPool.AppendCertsFromPEM(caPEM) + if !ok { + return nil, fmt.Errorf("failed to append server CA certificate") + } - tlsConfig := &tls.Config{ - RootCAs: certPool, - MinVersion: tls.VersionTLS13, - MaxVersion: tls.VersionTLS13, - } + tlsConfig := &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS13, + MaxVersion: tls.VersionTLS13, + } - token, err := os.ReadFile(tokenFile) - if err != nil { - return nil, err - } + token, err := os.ReadFile(tokenFile) + if err != nil { + return nil, err + } - perRPCCred := oauth.TokenSource{ - TokenSource: oauth2.StaticTokenSource(&oauth2.Token{ - AccessToken: string(token), - })} + perRPCCred := oauth.TokenSource{ + TokenSource: oauth2.StaticTokenSource(&oauth2.Token{ + AccessToken: string(token), + })} - return grpc.Dial(serverAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), grpc.WithPerRPCCredentials(perRPCCred)) + return grpc.Dial(serverAddr, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), grpc.WithPerRPCCredentials(perRPCCred)) + } }