diff --git a/controllers/slicegateway/slicegateway.go b/controllers/slicegateway/slicegateway.go
index 6ac3d10d6..b4e779b63 100644
--- a/controllers/slicegateway/slicegateway.go
+++ b/controllers/slicegateway/slicegateway.go
@@ -26,7 +26,6 @@ import (
 	"net"
 	"os"
 	"strconv"
-	"strings"
 	"sync"
 	"time"
 
@@ -55,6 +54,7 @@ import (
 )
 
 var (
+	vpnClientFileName        = "openvpn_client.ovpn"
 	gwSidecarImage           = os.Getenv("AVESHA_GW_SIDECAR_IMAGE")
 	gwSidecarImagePullPolicy = os.Getenv("AVESHA_GW_SIDECAR_IMAGE_PULLPOLICY")
 
@@ -383,7 +383,6 @@ func (r *SliceGwReconciler) deploymentForGatewayClient(g *kubeslicev1beta1.Slice
 	var vpnSecretDefaultMode int32 = 0644
 
-	certFileName := "openvpn_client.ovpn"
 	sidecarImg := "nexus.dev.aveshalabs.io/kubeslice/gw-sidecar:1.0.0"
 	sidecarPullPolicy := corev1.PullAlways
 
@@ -546,23 +545,7 @@ func (r *SliceGwReconciler) deploymentForGatewayClient(g *kubeslicev1beta1.Slice
 					Command: []string{
 						"/usr/local/bin/waitForConfigToRunCmd.sh",
 					},
-					Args: []string{
-						"/vpnclient/" + certFileName,
-						"90",
-						"openvpn",
-						"--remote",
-						g.Status.Config.SliceGatewayRemoteGatewayID,
-						"--port",
-						strconv.Itoa(remotePortNumber),
-						"--ping-restart",
-						"15",
-						"--proto",
-						strings.ToLower(g.Status.Config.SliceGatewayProtocol),
-						"--txqueuelen",
-						"5000",
-						"--config",
-						"/vpnclient/" + certFileName,
-					},
+					Args: getOVPNClientContainerArgs(remotePortNumber, g),
 					SecurityContext: &corev1.SecurityContext{
 						Privileged:               &privileged,
 						AllowPrivilegeEscalation: &privileged,
@@ -1392,13 +1375,43 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
 		for _, deployment := range deployments.Items {
 			found, nodePortInUse := getClientGwRemotePortInUse(ctx, r.Client, sliceGw, deployment.Name)
 			if found {
-				_, foundInMap := gwClientToRemotePortMap.Load(deployment.Name)
-				if !foundInMap {
+				// Check if the portInUse is valid.
+				// It is valid only if the list of remoteNodePorts in the slicegw object contains the portInUse.
+				if !checkIfNodePortIsValid(sliceGw.Status.Config.SliceGatewayRemoteNodePorts, nodePortInUse) {
+					// Get a valid port number for this deployment
+					portNumToUpdate, err := allocateNodePortToClient(sliceGw.Status.Config.SliceGatewayRemoteNodePorts, deployment.Name, &gwClientToRemotePortMap)
+					if err != nil {
+						return ctrl.Result{}, err, true
+					}
+					// Update the port map
+					gwClientToRemotePortMap.Store(deployment.Name, portNumToUpdate)
+					err = r.updateGatewayDeploymentNodePort(ctx, sliceGw, &deployment, portNumToUpdate)
+					if err != nil {
+						return ctrl.Result{}, err, true
+					}
+					// Requeue if the update was successful
+					return ctrl.Result{}, nil, true
+				}
+				// At this point, the node port in use is valid. Check if the port map is populated. The only
+				// case where it is not populated is after an operator restart. The populated value must match
+				// the port in use; if not, the deployment needs to be updated to match the state stored in the operator.
+				portInMap, foundInMap := gwClientToRemotePortMap.Load(deployment.Name)
+				if foundInMap {
+					if portInMap != nodePortInUse {
+						// Update the deployment since the port numbers do not match
+						err := r.updateGatewayDeploymentNodePort(ctx, sliceGw, &deployment, portInMap.(int))
+						if err != nil {
+							return ctrl.Result{}, err, true
+						}
+						// Requeue if the update was successful
+						return ctrl.Result{}, nil, true
+					}
+				} else {
					gwClientToRemotePortMap.Store(deployment.Name, nodePortInUse)
 				}
-				// TODO: Handle the case of the port number in the deployment and the one in the port map being different
 			}
 		}
+	}
 
 	// Delete any deployments marked for deletion. We could have an external orchestrator (like the workerslicegatewayrecycler) request
@@ -1599,3 +1612,26 @@ func (r *SliceGwReconciler) createPodDisruptionBudgetForSliceGatewayPods(ctx con
 
 	return nil
 }
+
+// updateGatewayDeploymentNodePort updates a gateway client deployment with the node port
+// assigned from the workersliceconfig. Both the sidecar's NODE_PORT env var and the
+// openvpn client container args must be rewritten so the two containers stay in sync.
+func (r *SliceGwReconciler) updateGatewayDeploymentNodePort(ctx context.Context, g *kubeslicev1beta1.SliceGateway, deployment *appsv1.Deployment, nodePort int) error {
+	containers := deployment.Spec.Template.Spec.Containers
+	for contIndex, cont := range containers {
+		if cont.Name == "kubeslice-sidecar" {
+			for index, key := range cont.Env {
+				if key.Name == "NODE_PORT" {
+					containers[contIndex].Env[index] = corev1.EnvVar{
+						Name:  "NODE_PORT",
+						Value: strconv.Itoa(nodePort),
+					}
+				}
+			}
+		} else if cont.Name == "kubeslice-openvpn-client" {
+			containers[contIndex].Args = getOVPNClientContainerArgs(nodePort, g)
+		}
+	}
+	deployment.Spec.Template.Spec.Containers = containers
+	return r.Update(ctx, deployment)
+}
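Review note: `checkIfNodePortIsValid` is called in the reconcile hunk above but is not part of this diff. Judging from the call site and the generic `contains` helper added to utils.go below, it is presumably a thin wrapper; a minimal sketch, with the signature inferred from the call site rather than taken from the actual source:

```go
// Sketch only: checkIfNodePortIsValid is assumed to report whether the node port
// currently used by a gw client deployment is still in the list advertised in
// the slicegw status. The real implementation lives outside this diff.
func checkIfNodePortIsValid(validNodePorts []int, portInUse int) bool {
	return contains(validNodePorts, portInUse)
}
```

With that check in place, the loop only rewrites a deployment when its port has become invalid, or when the in-memory port map disagrees with the port actually in use.
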
diff --git a/controllers/slicegateway/utils.go b/controllers/slicegateway/utils.go
index aa9c45580..1938411f1 100644
--- a/controllers/slicegateway/utils.go
+++ b/controllers/slicegateway/utils.go
@@ -22,16 +22,16 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"os"
-	"strconv"
-	"strings"
-
 	gwsidecarpb "github.com/kubeslice/gateway-sidecar/pkg/sidecar/sidecarpb"
 	kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
 	"github.com/kubeslice/worker-operator/controllers"
 	ossEvents "github.com/kubeslice/worker-operator/events"
 	"github.com/kubeslice/worker-operator/pkg/utils"
 	webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
 
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -77,15 +77,27 @@ func getGwSvcNameFromDepName(depName string) string {
 	return "svc-" + depName
 }
 
-func contains(s []string, e string) bool {
-	for _, a := range s {
-		if a == e {
+// contains reports whether slice s contains the element e.
+func contains[T comparable](s []T, e T) bool {
+	for _, element := range s {
+		if element == e {
 			return true
 		}
 	}
 	return false
 }
 
+// containsWithIndex reports whether slice s contains e and, if so, the index
+// of the first occurrence. The index is -1 when e is not present.
+func containsWithIndex[T comparable](s []T, e T) (bool, int) {
+	for index, element := range s {
+		if element == e {
+			return true, index
+		}
+	}
+	return false, -1
+}
+
 func getPodIPs(slicegateway *kubeslicev1beta1.SliceGateway) []string {
 	podIPs := make([]string, 0)
 	for i := range slicegateway.Status.GatewayPodStatus {
@@ -426,3 +435,46 @@ func (r *SliceGwReconciler) cleanupSliceGwResources(ctx context.Context, slicegw
 
 	return nil
 }
+
+// getOVPNClientContainerArgs returns the args needed for the ovpn client deployment container
+func getOVPNClientContainerArgs(remotePortNumber int, g *kubeslicev1beta1.SliceGateway) []string {
+	args := []string{
+		"/vpnclient/" + vpnClientFileName,
+		"90",
+		"openvpn",
+		"--remote",
+		g.Status.Config.SliceGatewayRemoteGatewayID,
+		"--port",
+		strconv.Itoa(remotePortNumber),
+		"--ping-restart",
+		"15",
+		"--proto",
+		strings.ToLower(g.Status.Config.SliceGatewayProtocol),
+		"--txqueuelen",
+		"5000",
+		"--config",
+		"/vpnclient/" + vpnClientFileName,
+	}
+	return args
+}
+
+// allocateNodePortToClient assigns a distinct node port to each gateway client
+// deployment: it returns the first valid port not already held by another deployment.
+func allocateNodePortToClient(correctNodePorts []int, depName string, nodePortsMap *sync.Map) (int, error) {
+	// Work on a copy so that the caller's slice is not mutated by the removals below.
+	freePorts := make([]int, len(correctNodePorts))
+	copy(freePorts, correctNodePorts)
+	nodePortsMap.Range(func(k, v interface{}) bool {
+		if ok, index := containsWithIndex(freePorts, v.(int)); ok {
+			freePorts = append(freePorts[:index], freePorts[index+1:]...)
+		}
+		return true
+	})
+	if len(freePorts) > 0 {
+		return freePorts[0], nil
+	}
+	// No free port left; fall back to the port already assigned to this deployment, if any.
+	if port, ok := nodePortsMap.Load(depName); ok {
+		return port.(int), nil
+	}
+	return 0, errors.New("could not allocate a port")
+}
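To make the allocation semantics concrete, here is a small usage sketch of `allocateNodePortToClient` (hypothetical deployment names and ports; the helper is unexported, so this only compiles inside the `slicegateway` package):

```go
// Hypothetical walkthrough of the allocator.
func ExampleAllocateNodePortToClient() {
	var portMap sync.Map
	portMap.Store("test-slicegw-0-0", 8080) // client 0 already holds 8080

	// The valid list is {8080, 8090}. 8080 is filtered out because client 0
	// holds it, so client 1 receives the first remaining port: 8090.
	port, _ := allocateNodePortToClient([]int{8080, 8090}, "test-slicegw-1-0", &portMap)
	fmt.Println(port)
	// Output: 8090
}
```

Copying the slice before pruning it matters here: the caller passes `sliceGw.Status.Config.SliceGatewayRemoteNodePorts` directly, and removing elements in place with `append` would otherwise shuffle the backing array of the status object.
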
"90", + "openvpn", + "--remote", + g.Status.Config.SliceGatewayRemoteGatewayID, + "--port", + strconv.Itoa(remotePortNumber), + "--ping-restart", + "15", + "--proto", + strings.ToLower(g.Status.Config.SliceGatewayProtocol), + "--txqueuelen", + "5000", + "--config", + "/vpnclient/" + vpnClientFileName, + } + return args +} + +// a helper to assign distinct port to each client deployment +func allocateNodePortToClient(correctNodePorts []int, depName string, nodePortsMap *sync.Map) (int, error) { + nodePortsMap.Range(func(k, v interface{}) bool { + if ok, index := containsWithIndex(correctNodePorts, v.(int)); ok { + correctNodePorts = append(correctNodePorts[:index], correctNodePorts[index+1:]...) + } + return true + }) + if len(correctNodePorts) > 0 { + return correctNodePorts[0], nil + } else { + port, ok := nodePortsMap.Load(depName) + if ok { + return port.(int), nil + } + return 0, errors.New("could not allocate a port") + } +} diff --git a/tests/spoke/slicegw_controller_test.go b/tests/spoke/slicegw_controller_test.go index 417cd9690..dbd04bb69 100644 --- a/tests/spoke/slicegw_controller_test.go +++ b/tests/spoke/slicegw_controller_test.go @@ -21,15 +21,11 @@ package spoke_test import ( "context" "fmt" - "reflect" - "time" - - nsmv1 "github.com/networkservicemesh/sdk-k8s/pkg/tools/k8s/apis/networkservicemesh.io/v1" - kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1" "github.com/kubeslice/worker-operator/controllers" slicegatewaycontroller "github.com/kubeslice/worker-operator/controllers/slicegateway" webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod" + nsmv1 "github.com/networkservicemesh/sdk-k8s/pkg/tools/k8s/apis/networkservicemesh.io/v1" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" @@ -39,7 +35,10 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" + "reflect" _ "sigs.k8s.io/controller-runtime/pkg/client" + "strconv" + "time" ) var sliceGwFinalizer = []string{ @@ -164,7 +163,8 @@ var _ = Describe("Worker SlicegwController", func() { createdSliceGw = &kubeslicev1beta1.SliceGateway{} createdPodDisruptionBudget = &policyv1.PodDisruptionBudget{} founddepl := &appsv1.Deployment{} - deplKey := types.NamespacedName{Name: "test-slicegw", Namespace: CONTROL_PLANE_NS} + deplKey1 := types.NamespacedName{Name: "test-slicegw-0-0", Namespace: CONTROL_PLANE_NS} + deplKey2 := types.NamespacedName{Name: "test-slicegw-1-0", Namespace: CONTROL_PLANE_NS} DeferCleanup(func() { ctx := context.Background() @@ -185,13 +185,18 @@ var _ = Describe("Worker SlicegwController", func() { }, time.Second*40, time.Millisecond*250).Should(BeTrue()) Expect(k8sClient.Delete(ctx, appPod)).Should(Succeed()) Eventually(func() bool { - err := k8sClient.Get(ctx, deplKey, founddepl) + err := k8sClient.Get(ctx, deplKey1, founddepl) + if err != nil { + return errors.IsNotFound(err) + } + Expect(k8sClient.Delete(ctx, founddepl)).Should(Succeed()) + err = k8sClient.Get(ctx, deplKey2, founddepl) if err != nil { return errors.IsNotFound(err) } Expect(k8sClient.Delete(ctx, founddepl)).Should(Succeed()) return true - }, time.Second*40, time.Millisecond*250).Should(BeTrue()) + }, time.Second*50, time.Millisecond*250).Should(BeTrue()) Expect(k8sClient.Delete(ctx, svc)).Should(Succeed()) Eventually(func() bool { err := k8sClient.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, svc) @@ -773,8 +778,104 @@ var _ = Describe("Worker SlicegwController", func() { //ip should be same 
@@ -773,8 +778,112 @@ var _ = Describe("Worker SlicegwController", func() {
 			//ip should be same as remote node IP
 			Expect(endpointFound.Subsets[0].Addresses[0].IP).To(Equal(createdSliceGw.Status.Config.SliceGatewayRemoteNodeIPs[0]))
 		})
-	})
 
+		It("Should restart the gw client deployment if there is a mismatch in the nodePorts", func() {
+			ctx := context.Background()
+			Expect(k8sClient.Create(ctx, svc)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, slice)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, vl3ServiceEndpoint)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, sliceGw)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, appPod)).Should(Succeed())
+			sliceKey := types.NamespacedName{Name: "test-slice-4", Namespace: CONTROL_PLANE_NS}
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, sliceKey, createdSlice)
+				return err == nil
+			}, time.Second*250, time.Millisecond*250).Should(BeTrue())
+
+			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+				err := k8sClient.Get(ctx, sliceKey, createdSlice)
+				if err != nil {
+					return err
+				}
+				// Update the minimum required values in the slice cr status field
+				if createdSlice.Status.SliceConfig == nil {
+					createdSlice.Status.SliceConfig = &kubeslicev1beta1.SliceConfig{
+						SliceDisplayName: slice.Name,
+						SliceSubnet:      "192.168.0.0/16",
+					}
+				}
+				if err := k8sClient.Status().Update(ctx, createdSlice); err != nil {
+					return err
+				}
+				return nil
+			})
+			Expect(err).To(BeNil())
+			Expect(createdSlice.Status.SliceConfig).NotTo(BeNil())
+			slicegwkey := types.NamespacedName{Name: "test-slicegw", Namespace: CONTROL_PLANE_NS}
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
+				return err == nil
+			}, time.Second*250, time.Millisecond*250).Should(BeTrue())
+
+			createdSliceGw.Status.Config.SliceGatewayHostType = "Client"
+			createdSliceGw.Status.Config.SliceGatewayRemoteGatewayID = "remote-gateway-id"
+			createdSliceGw.Status.Config.SliceGatewayRemoteNodeIPs = []string{"192.168.1.1"}
+			createdSliceGw.Status.Config.SliceGatewayRemoteNodePorts = []int{8080, 8090}
+
+			Eventually(func() bool {
+				err := k8sClient.Status().Update(ctx, createdSliceGw)
+				return err == nil
+			}, time.Second*30, time.Millisecond*250).Should(BeTrue())
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
+				return err == nil
+			}, time.Second*10, time.Millisecond*250).Should(BeTrue())
+
+			founddepl := &appsv1.Deployment{}
+			deplKey := types.NamespacedName{Name: "test-slicegw-0-0", Namespace: CONTROL_PLANE_NS}
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, deplKey, founddepl)
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+
+			founddepl = &appsv1.Deployment{}
+			deplKey = types.NamespacedName{Name: "test-slicegw-1-0", Namespace: CONTROL_PLANE_NS}
+
+			// Change the remote node ports in the slicegw status. The gw client
+			// deployments should be updated to use ports from the new list.
+			createdSliceGw.Status.Config.SliceGatewayRemoteNodePorts = []int{6080, 7090}
+
+			Eventually(func() bool {
+				err := k8sClient.Status().Update(ctx, createdSliceGw)
+				return err == nil
+			}, time.Second*30, time.Millisecond*250).Should(BeTrue())
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, deplKey, founddepl)
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+
+			// Poll until the NODE_PORT env var of the gw client deployment is set
+			// to one of the newly configured remote node ports.
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, deplKey, founddepl)
+				if err != nil {
+					return false
+				}
+				for _, cont := range founddepl.Spec.Template.Spec.Containers {
+					for _, env := range cont.Env {
+						if env.Name == "NODE_PORT" {
+							portFromDep, err := strconv.Atoi(env.Value)
+							if err != nil {
+								return false
+							}
+							for _, port := range createdSliceGw.Status.Config.SliceGatewayRemoteNodePorts {
+								if port == portFromDep {
+									return true
+								}
+							}
+							return false
+						}
+					}
+				}
+				return false
+			}, time.Second*120, time.Millisecond*250).Should(BeTrue())
+		})
+	})
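A note on the final assertion: Gomega's `Eventually` only polls when it is handed a function; passing it the return value of an immediately invoked closure freezes the observation at call time, and the `time.Second*120, time.Millisecond*250` arguments then buy nothing. The shape the test relies on is the following, with `fetchNodePort` and `expectedPort` as hypothetical stand-ins:

```go
// Correct polling: the closure is re-invoked every 250ms until it returns
// true or the 120s budget is exhausted.
Eventually(func() bool {
	return fetchNodePort() == expectedPort // fetchNodePort, expectedPort: illustrative only
}, time.Second*120, time.Millisecond*250).Should(BeTrue())
```
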
 
 	Context("With SliceGw CR deleted", func() {
 		BeforeEach(func() {