Skip to content

Commit

Permalink
fix: reconcile gateway client's node port.
Browse files Browse the repository at this point in the history
  • Loading branch information
narmidm authored Apr 17, 2024
2 parents c16590e + 5c9b367 commit 82d7622
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 38 deletions.
84 changes: 62 additions & 22 deletions controllers/slicegateway/slicegateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"net"
"os"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -55,6 +54,7 @@ import (
)

var (
vpnClientFileName = "openvpn_client.ovpn"
gwSidecarImage = os.Getenv("AVESHA_GW_SIDECAR_IMAGE")
gwSidecarImagePullPolicy = os.Getenv("AVESHA_GW_SIDECAR_IMAGE_PULLPOLICY")

Expand Down Expand Up @@ -383,7 +383,6 @@ func (r *SliceGwReconciler) deploymentForGatewayClient(g *kubeslicev1beta1.Slice

var vpnSecretDefaultMode int32 = 0644

certFileName := "openvpn_client.ovpn"
sidecarImg := "nexus.dev.aveshalabs.io/kubeslice/gw-sidecar:1.0.0"
sidecarPullPolicy := corev1.PullAlways

Expand Down Expand Up @@ -546,23 +545,7 @@ func (r *SliceGwReconciler) deploymentForGatewayClient(g *kubeslicev1beta1.Slice
Command: []string{
"/usr/local/bin/waitForConfigToRunCmd.sh",
},
Args: []string{
"/vpnclient/" + certFileName,
"90",
"openvpn",
"--remote",
g.Status.Config.SliceGatewayRemoteGatewayID,
"--port",
strconv.Itoa(remotePortNumber),
"--ping-restart",
"15",
"--proto",
strings.ToLower(g.Status.Config.SliceGatewayProtocol),
"--txqueuelen",
"5000",
"--config",
"/vpnclient/" + certFileName,
},
Args: getOVPNClientContainerArgs(remotePortNumber, g),
SecurityContext: &corev1.SecurityContext{
Privileged: &privileged,
AllowPrivilegeEscalation: &privileged,
Expand Down Expand Up @@ -1392,13 +1375,43 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
for _, deployment := range deployments.Items {
found, nodePortInUse := getClientGwRemotePortInUse(ctx, r.Client, sliceGw, deployment.Name)
if found {
_, foundInMap := gwClientToRemotePortMap.Load(deployment.Name)
if !foundInMap {
// Check if the portInUse is valid.
// It is valid only if the list of remoteNodePorts in the slicegw object contains the portInUse.
if !checkIfNodePortIsValid(sliceGw.Status.Config.SliceGatewayRemoteNodePorts, nodePortInUse) {
// Get a valid port number for this deployment
portNumToUpdate, err := allocateNodePortToClient(sliceGw.Status.Config.SliceGatewayRemoteNodePorts, deployment.Name, &gwClientToRemotePortMap)
if err != nil {
return ctrl.Result{}, err, true
}
// Update the port map
gwClientToRemotePortMap.Store(deployment.Name, portNumToUpdate)
err = r.updateGatewayDeploymentNodePort(ctx, r.Client, sliceGw, &deployment, portNumToUpdate)
if err != nil {
return ctrl.Result{}, err, true
}
// Requeue if the update was successful
return ctrl.Result{}, nil, true
}
// At this point, the node port in use is valid. Check if the port map is populated. Only case
// where it is not populated is if the operator restarts. The populated value must match the
// port in use. If not, the deploy needs to be updated to match the state stored in the operator.
portInMap, foundInMap := gwClientToRemotePortMap.Load(deployment.Name)
if foundInMap {
if portInMap != nodePortInUse {
// Update the deployment since the port numbers do not match
err := r.updateGatewayDeploymentNodePort(ctx, r.Client, sliceGw, &deployment, portInMap.(int))
if err != nil {
return ctrl.Result{}, err, true
}
// Requeue if the update was successful
return ctrl.Result{}, nil, true
}
} else {
gwClientToRemotePortMap.Store(deployment.Name, nodePortInUse)
}
// TODO: Handle the case of the port number in the deployment and the one in the port map being different
}
}

}

// Delete any deployments marked for deletion. We could have an external orchestrator (like the workerslicegatewayrecycler) request
Expand Down Expand Up @@ -1599,3 +1612,30 @@ func (r *SliceGwReconciler) createPodDisruptionBudgetForSliceGatewayPods(ctx con

return nil
}

// updateGatewayDeploymentNodePort updates the gateway client deployment so that
// both the sidecar's NODE_PORT env var and the openvpn client container's args
// reflect the given nodePort, then persists the deployment.
//
// Note: the c parameter is currently unused; the update is issued through the
// reconciler's own client (r.Update). It is kept for signature compatibility.
// Returns any error from the API update; callers are expected to requeue on error.
func (r *SliceGwReconciler) updateGatewayDeploymentNodePort(ctx context.Context, c client.Client, g *kubeslicev1beta1.SliceGateway, deployment *appsv1.Deployment, nodePort int) error {
	containers := deployment.Spec.Template.Spec.Containers
	// Index into the slice directly: ranging by value would hand us a copy of
	// the container struct, and mutating a copy is fragile (it only "works"
	// for Env because slices alias their backing array).
	for i := range containers {
		switch containers[i].Name {
		case "kubeslice-sidecar":
			// Rewrite the NODE_PORT env var in place with the new port value.
			for j, env := range containers[i].Env {
				if env.Name == "NODE_PORT" {
					containers[i].Env[j] = corev1.EnvVar{
						Name:  "NODE_PORT",
						Value: strconv.Itoa(nodePort),
					}
				}
			}
		case "kubeslice-openvpn-client":
			// Regenerate the full arg list so the --port flag matches nodePort.
			containers[i].Args = getOVPNClientContainerArgs(nodePort, g)
		}
	}
	deployment.Spec.Template.Spec.Containers = containers
	return r.Update(ctx, deployment)
}
64 changes: 57 additions & 7 deletions controllers/slicegateway/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"

gwsidecarpb "github.com/kubeslice/gateway-sidecar/pkg/sidecar/sidecarpb"
kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
"github.com/kubeslice/worker-operator/controllers"
ossEvents "github.com/kubeslice/worker-operator/events"
"github.com/kubeslice/worker-operator/pkg/utils"
webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
"os"
"strconv"
"strings"
"sync"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -77,15 +77,24 @@ func getGwSvcNameFromDepName(depName string) string {
return "svc-" + depName
}

// contains reports whether e is present in s.
func contains[T comparable](s []T, e T) bool {
	for _, element := range s {
		if element == e {
			return true
		}
	}
	return false
}

// containsWithIndex reports whether e is present in s and, if so, the index of
// its first occurrence. When e is absent it returns (false, 0); callers must
// check the boolean before trusting the index.
func containsWithIndex[T comparable](s []T, e T) (bool, int) {
	for i := range s {
		if s[i] == e {
			return true, i
		}
	}
	return false, 0
}

func getPodIPs(slicegateway *kubeslicev1beta1.SliceGateway) []string {
podIPs := make([]string, 0)
for i := range slicegateway.Status.GatewayPodStatus {
Expand Down Expand Up @@ -426,3 +435,44 @@ func (r *SliceGwReconciler) cleanupSliceGwResources(ctx context.Context, slicegw

return nil
}

// getOVPNClientContainerArgs builds the argument list for the openvpn client
// container of a gateway client deployment: the config path and timeout for
// the wait script, followed by the openvpn command line pointing at the remote
// gateway ID/port from the slice gateway's status config.
func getOVPNClientContainerArgs(remotePortNumber int, g *kubeslicev1beta1.SliceGateway) []string {
	cfgPath := "/vpnclient/" + vpnClientFileName
	return []string{
		cfgPath,
		"90",
		"openvpn",
		"--remote", g.Status.Config.SliceGatewayRemoteGatewayID,
		"--port", strconv.Itoa(remotePortNumber),
		"--ping-restart", "15",
		"--proto", strings.ToLower(g.Status.Config.SliceGatewayProtocol),
		"--txqueuelen", "5000",
		"--config", cfgPath,
	}
}

// a helper to assign distinct port to each client deployment
func allocateNodePortToClient(correctNodePorts []int, depName string, nodePortsMap *sync.Map) (int, error) {
nodePortsMap.Range(func(k, v interface{}) bool {
if ok, index := containsWithIndex(correctNodePorts, v.(int)); ok {
correctNodePorts = append(correctNodePorts[:index], correctNodePorts[index+1:]...)
}
return true
})
if len(correctNodePorts) > 0 {
return correctNodePorts[0], nil
} else {
port, ok := nodePortsMap.Load(depName)
if ok {
return port.(int), nil
}
return 0, errors.New("could not allocate a port")
}
}
119 changes: 110 additions & 9 deletions tests/spoke/slicegw_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,11 @@ package spoke_test
import (
"context"
"fmt"
"reflect"
"time"

nsmv1 "github.com/networkservicemesh/sdk-k8s/pkg/tools/k8s/apis/networkservicemesh.io/v1"

kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
"github.com/kubeslice/worker-operator/controllers"
slicegatewaycontroller "github.com/kubeslice/worker-operator/controllers/slicegateway"
webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
nsmv1 "github.com/networkservicemesh/sdk-k8s/pkg/tools/k8s/apis/networkservicemesh.io/v1"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -39,7 +35,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/retry"
"reflect"
_ "sigs.k8s.io/controller-runtime/pkg/client"
"strconv"
"time"
)

var sliceGwFinalizer = []string{
Expand Down Expand Up @@ -164,7 +163,8 @@ var _ = Describe("Worker SlicegwController", func() {
createdSliceGw = &kubeslicev1beta1.SliceGateway{}
createdPodDisruptionBudget = &policyv1.PodDisruptionBudget{}
founddepl := &appsv1.Deployment{}
deplKey := types.NamespacedName{Name: "test-slicegw", Namespace: CONTROL_PLANE_NS}
deplKey1 := types.NamespacedName{Name: "test-slicegw-0-0", Namespace: CONTROL_PLANE_NS}
deplKey2 := types.NamespacedName{Name: "test-slicegw-1-0", Namespace: CONTROL_PLANE_NS}

DeferCleanup(func() {
ctx := context.Background()
Expand All @@ -185,13 +185,18 @@ var _ = Describe("Worker SlicegwController", func() {
}, time.Second*40, time.Millisecond*250).Should(BeTrue())
Expect(k8sClient.Delete(ctx, appPod)).Should(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, deplKey, founddepl)
err := k8sClient.Get(ctx, deplKey1, founddepl)
if err != nil {
return errors.IsNotFound(err)
}
Expect(k8sClient.Delete(ctx, founddepl)).Should(Succeed())
err = k8sClient.Get(ctx, deplKey2, founddepl)
if err != nil {
return errors.IsNotFound(err)
}
Expect(k8sClient.Delete(ctx, founddepl)).Should(Succeed())
return true
}, time.Second*40, time.Millisecond*250).Should(BeTrue())
}, time.Second*50, time.Millisecond*250).Should(BeTrue())
Expect(k8sClient.Delete(ctx, svc)).Should(Succeed())
Eventually(func() bool {
err := k8sClient.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, svc)
Expand Down Expand Up @@ -773,8 +778,104 @@ var _ = Describe("Worker SlicegwController", func() {
//ip should be same as remote node IP
Expect(endpointFound.Subsets[0].Addresses[0].IP).To(Equal(createdSliceGw.Status.Config.SliceGatewayRemoteNodeIPs[0]))
})
})
It("Should restart the gw client deployment if there is mismatch in the nodePorts", func() {
ctx := context.Background()
Expect(k8sClient.Create(ctx, svc)).Should(Succeed())
Expect(k8sClient.Create(ctx, slice)).Should(Succeed())
Expect(k8sClient.Create(ctx, vl3ServiceEndpoint)).Should(Succeed())
Expect(k8sClient.Create(ctx, sliceGw)).Should(Succeed())
Expect(k8sClient.Create(ctx, appPod)).Should(Succeed())
sliceKey := types.NamespacedName{Name: "test-slice-4", Namespace: CONTROL_PLANE_NS}
Eventually(func() bool {
err := k8sClient.Get(ctx, sliceKey, createdSlice)
return err == nil
}, time.Second*250, time.Millisecond*250).Should(BeTrue())

err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := k8sClient.Get(ctx, sliceKey, createdSlice)
if err != nil {
return err
}
// Update the minimum required values in the slice cr status field
if createdSlice.Status.SliceConfig == nil {
createdSlice.Status.SliceConfig = &kubeslicev1beta1.SliceConfig{
SliceDisplayName: slice.Name,
SliceSubnet: "192.168.0.0/16",
}
}
if err := k8sClient.Status().Update(ctx, createdSlice); err != nil {
return err
}
return nil
})
Expect(err).To(BeNil())
Expect(createdSlice.Status.SliceConfig).NotTo(BeNil())

slicegwkey := types.NamespacedName{Name: "test-slicegw", Namespace: CONTROL_PLANE_NS}
Eventually(func() bool {
err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
return err == nil
}, time.Second*250, time.Millisecond*250).Should(BeTrue())

createdSliceGw.Status.Config.SliceGatewayHostType = "Client"
createdSliceGw.Status.Config.SliceGatewayRemoteGatewayID = "remote-gateway-id"
createdSliceGw.Status.Config.SliceGatewayRemoteNodeIPs = []string{"192.168.1.1"}
createdSliceGw.Status.Config.SliceGatewayRemoteNodePorts = []int{8080, 8090}

Eventually(func() bool {
err := k8sClient.Status().Update(ctx, createdSliceGw)
return err == nil
}, time.Second*30, time.Millisecond*250).Should(BeTrue())

Eventually(func() bool {
err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
return err == nil
}, time.Second*10, time.Millisecond*250).Should(BeTrue())

founddepl := &appsv1.Deployment{}
deplKey := types.NamespacedName{Name: "test-slicegw-0-0", Namespace: CONTROL_PLANE_NS}

Eventually(func() bool {
err := k8sClient.Get(ctx, deplKey, founddepl)
return err == nil
}, time.Second*40, time.Millisecond*250).Should(BeTrue())

founddepl = &appsv1.Deployment{}
deplKey = types.NamespacedName{Name: "test-slicegw-1-0", Namespace: CONTROL_PLANE_NS}

createdSliceGw.Status.Config.SliceGatewayRemoteNodePorts = []int{6080, 7090}

Eventually(func() bool {
err := k8sClient.Status().Update(ctx, createdSliceGw)
return err == nil
}, time.Second*30, time.Millisecond*250).Should(BeTrue())

Eventually(func() bool {
err := k8sClient.Get(ctx, deplKey, founddepl)
return err == nil
}, time.Second*40, time.Millisecond*250).Should(BeTrue())
time.Sleep(time.Second * 30)
var portFromDep int
Eventually(func(portFromDep *int) []int {
err := k8sClient.Get(ctx, deplKey, founddepl)
if err != nil {
return []int{}
}
cont := founddepl.Spec.Template.Spec.Containers[0]
for _, key := range cont.Env {
if key.Name == "NODE_PORT" {
*portFromDep, err = strconv.Atoi(key.Value)
if err != nil {
fmt.Println("error converting string to int")
return []int{}
}

}
}
return createdSliceGw.Status.Config.SliceGatewayRemoteNodePorts
}(&portFromDep), time.Second*120, time.Millisecond*250).Should(ContainElement(portFromDep))
})
})
Context("With SliceGw CR deleted", func() {

BeforeEach(func() {
Expand Down

0 comments on commit 82d7622

Please sign in to comment.