Skip to content

Commit

Permalink
fix(): Fixed tunnel status reporting in the slicegw CR (#406)
Browse files Browse the repository at this point in the history
Signed-off-by: Bharath Horatti <[email protected]>
  • Loading branch information
bharath-avesha authored Nov 4, 2024
1 parent 0bb59aa commit 7015e35
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 35 deletions.
14 changes: 11 additions & 3 deletions api/v1beta1/slicegateway_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,14 @@ type GwPodInfo struct {
PeerPodName string `json:"peerPodName,omitempty"`
PodIP string `json:"podIP,omitempty"`
LocalNsmIP string `json:"localNsmIP,omitempty"`
TunnelStatus TunnelStatus `json:"tunnelStatus,omitempty"`
RouteRemoved int32 `json:"routeRemoved,omitempty"`
// TunnelStatus is the status of the tunnel between this gw pod and its peer
TunnelStatus TunnelStatus `json:"tunnelStatus,omitempty"`
RouteRemoved int32 `json:"routeRemoved,omitempty"`
// RemotePort is the port number this gw pod is connected to on the remote cluster.
// Applicable only for gw clients. Would be set to 0 for gw servers.
RemotePort int32 `json:"remotePort,omitempty"`
}

type TunnelStatus struct {
IntfName string `json:"IntfName,omitempty"`
LocalIP string `json:"LocalIP,omitempty"`
Expand All @@ -143,7 +148,10 @@ type TunnelStatus struct {
TxRate uint64 `json:"TxRate,omitempty"`
RxRate uint64 `json:"RxRate,omitempty"`
PacketLoss uint64 `json:"PacketLoss,omitempty"`
Status int32 `json:"Status,omitempty"`
// Status is the status of the tunnel. 0: DOWN, 1: UP
Status int32 `json:"Status,omitempty"`
// TunnelState is the state of the tunnel in string format: UP, DOWN, UNKNOWN
TunnelState string `json:"TunnelState,omitempty"`
}

func init() {
Expand Down
14 changes: 14 additions & 0 deletions config/crd/bases/networking.kubeslice.io_slicegateways.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,18 @@ spec:
type: string
podName:
type: string
remotePort:
description: |-
RemotePort is the port number this gw pod is connected to on the remote cluster.
Applicable only for gw clients. Would be set to 0 for gw servers.
format: int32
type: integer
routeRemoved:
format: int32
type: integer
tunnelStatus:
description: TunnelStatus is the status of the tunnel between
this gw pod and its peer
properties:
IntfName:
type: string
Expand All @@ -190,8 +198,14 @@ spec:
format: int64
type: integer
Status:
description: 'Status is the status of the tunnel. 0: DOWN,
1: UP'
format: int32
type: integer
TunnelState:
description: 'TunnelState is the state of the tunnel in
string format: UP, DOWN, UNKNOWN'
type: string
TxRate:
format: int64
type: integer
Expand Down
55 changes: 43 additions & 12 deletions controllers/slicegateway/slicegateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func labelsForSliceGwDeployment(name, slice, depName string) map[string]string {
}
}

func labelsForSliceGwService(name, svcName, depName string) map[string]string {
func labelsForSliceGwService(name, depName string) map[string]string {
return map[string]string{
controllers.SliceGatewaySelectorLabelKey: name,
"kubeslice.io/slice-gw-dep": depName,
Expand Down Expand Up @@ -360,7 +360,7 @@ func (r *SliceGwReconciler) serviceForGateway(g *kubeslicev1beta1.SliceGateway,
},
Spec: corev1.ServiceSpec{
Type: "NodePort",
Selector: labelsForSliceGwService(g.Name, svcName, depName),
Selector: labelsForSliceGwService(g.Name, depName),
Ports: []corev1.ServicePort{{
Port: 11194,
Protocol: proto,
Expand Down Expand Up @@ -661,8 +661,25 @@ func (r *SliceGwReconciler) ReconcileGwPodStatus(ctx context.Context, slicegatew
return ctrl.Result{}, err, true
}
gwPod.LocalNsmIP = status.NsmStatus.LocalIP
gwPod.TunnelStatus = kubeslicev1beta1.TunnelStatus(status.TunnelStatus)
// this grpc call fails untill the openvpn tunnel connection is not established, so its better to do not reconcile in case of errors, hence the reconciler does not proceedes further
gwPod.TunnelStatus = kubeslicev1beta1.TunnelStatus{
IntfName: status.TunnelStatus.IntfName,
LocalIP: status.TunnelStatus.LocalIP,
RemoteIP: status.TunnelStatus.RemoteIP,
Latency: status.TunnelStatus.Latency,
TxRate: status.TunnelStatus.TxRate,
RxRate: status.TunnelStatus.RxRate,
PacketLoss: status.TunnelStatus.PacketLoss,
Status: int32(status.TunnelStatus.Status),
TunnelState: status.TunnelStatus.TunnelState,
}

if isClient(slicegateway) {
// get the remote port number this gw pod is connected to on the remote cluster
_, remotePortInUse := getClientGwRemotePortInUse(ctx, r.Client, slicegateway, GetDepNameFromPodName(slicegateway.Status.Config.SliceGatewayID, gwPod.PodName))
gwPod.RemotePort = int32(remotePortInUse)
}

// This gRPC call fails until the OpenVPN tunnel connection is established, so it is better not to reconcile on error; the reconciler does not proceed any further.
gwPod.PeerPodName, err = r.getRemoteGwPodName(ctx, slicegateway.Status.Config.SliceGatewayRemoteVpnIP, gwPod.PodIP)
if err != nil {
log.Error(err, "Error getting peer pod name", "PodName", gwPod.PodName, "PodIP", gwPod.PodIP)
Expand All @@ -671,10 +688,11 @@ func (r *SliceGwReconciler) ReconcileGwPodStatus(ctx context.Context, slicegatew
if isGatewayStatusChanged(slicegateway, gwPod) {
toUpdate = true
}
if len(slicegateway.Status.GatewayPodStatus) != len(gwPodsInfo) {
toUpdate = true
}
}
if len(slicegateway.Status.GatewayPodStatus) != len(gwPodsInfo) {
toUpdate = true
}

if toUpdate {
log.Info("gwPodsInfo", "gwPodsInfo", gwPodsInfo)
slicegateway.Status.GatewayPodStatus = gwPodsInfo
Expand Down Expand Up @@ -725,6 +743,10 @@ func (r *SliceGwReconciler) SendConnectionContextAndQosToGwPod(ctx context.Conte

err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
err := r.Get(ctx, req.NamespacedName, slicegateway)
if err != nil {
log.Error(err, "Failed to get SliceGateway")
return err
}
slicegateway.Status.ConnectionContextUpdatedOn = time.Now().Unix()
err = r.Status().Update(ctx, slicegateway)
if err != nil {
Expand Down Expand Up @@ -1094,6 +1116,15 @@ func (r *SliceGwReconciler) gwPodPlacementIsSkewed(ctx context.Context, sliceGw

func (r *SliceGwReconciler) ReconcileGwPodPlacement(ctx context.Context, sliceGw *kubeslicev1beta1.SliceGateway) error {
log := r.Log

// if the env variable is set, do not perform any gw pod rebalancing. This is useful in clusters where
// the k8s scheduler does not honor the pod anti-affinity rule and places the gw pods on the same node. Such scenarios
// could occur if the node with the kubeslice gateway label is cordoned off or if the node has insufficient resources or
// if the node has some taints that the gw pods cannot tolerate.
if os.Getenv("DISABLE_GW_POD_REBALANCING") == "true" {
return nil
}

// The gw pod rebalancing is always performed on a deployment. We expect the gw pods belonging to a slicegateway
// object between any two clusters to be placed on different nodes marked as kubeslice gateway nodes. If they are
// initially placed on the same node due to lack of kubeslice-gateway nodes, the rebalancing algorithm is expected
Expand Down Expand Up @@ -1170,7 +1201,7 @@ func (r *SliceGwReconciler) handleSliceGwSvcCreation(ctx context.Context, sliceG
return ctrl.Result{Requeue: true}, nil, true
}

func (r *SliceGwReconciler) handleSliceGwSvcDeletion(ctx context.Context, sliceGw *kubeslicev1beta1.SliceGateway, svcName, depName string) error {
func (r *SliceGwReconciler) handleSliceGwSvcDeletion(ctx context.Context, sliceGw *kubeslicev1beta1.SliceGateway, svcName string) error {
log := logger.FromContext(ctx).WithName("slicegw")
serviceFound := corev1.Service{}
err := r.Get(ctx, types.NamespacedName{Namespace: sliceGw.Namespace, Name: svcName}, &serviceFound)
Expand Down Expand Up @@ -1385,7 +1416,7 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
}
// Update the port map
gwClientToRemotePortMap.Store(deployment.Name, portNumToUpdate)
err = r.updateGatewayDeploymentNodePort(ctx, r.Client, sliceGw, &deployment, portNumToUpdate)
err = r.updateGatewayDeploymentNodePort(ctx, sliceGw, &deployment, portNumToUpdate)
if err != nil {
return ctrl.Result{}, err, true
}
Expand All @@ -1399,7 +1430,7 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
if foundInMap {
if portInMap != nodePortInUse {
// Update the deployment since the port numbers do not match
err := r.updateGatewayDeploymentNodePort(ctx, r.Client, sliceGw, &deployment, portInMap.(int))
err := r.updateGatewayDeploymentNodePort(ctx, sliceGw, &deployment, portInMap.(int))
if err != nil {
return ctrl.Result{}, err, true
}
Expand All @@ -1425,7 +1456,7 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
if deploymentsToDelete != nil {
for _, depToDelete := range deploymentsToDelete.Items {
// Delete the gw svc associated with the deployment
err := r.handleSliceGwSvcDeletion(ctx, sliceGw, getGwSvcNameFromDepName(depToDelete.Name), depToDelete.Name)
err := r.handleSliceGwSvcDeletion(ctx, sliceGw, getGwSvcNameFromDepName(depToDelete.Name))
if err != nil {
log.Error(err, "Failed to delete gw svc", "svcName", depToDelete.Name)
return ctrl.Result{}, err, true
Expand Down Expand Up @@ -1615,7 +1646,7 @@ func (r *SliceGwReconciler) createPodDisruptionBudgetForSliceGatewayPods(ctx con

// updateGatewayDeploymentNodePort updates the gateway client deployments with the relevant updated ports
// from the workersliceconfig
func (r *SliceGwReconciler) updateGatewayDeploymentNodePort(ctx context.Context, c client.Client, g *kubeslicev1beta1.SliceGateway, deployment *appsv1.Deployment, nodePort int) error {
func (r *SliceGwReconciler) updateGatewayDeploymentNodePort(ctx context.Context, g *kubeslicev1beta1.SliceGateway, deployment *appsv1.Deployment, nodePort int) error {
containers := deployment.Spec.Template.Spec.Containers
for contIndex, cont := range containers {
if cont.Name == "kubeslice-sidecar" {
Expand Down
19 changes: 12 additions & 7 deletions controllers/slicegateway/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ import (
"context"
"errors"
"fmt"
"os"
"strconv"
"strings"
"sync"

gwsidecarpb "github.com/kubeslice/gateway-sidecar/pkg/sidecar/sidecarpb"
kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
"github.com/kubeslice/worker-operator/controllers"
ossEvents "github.com/kubeslice/worker-operator/events"
"github.com/kubeslice/worker-operator/pkg/utils"
webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
"os"
"strconv"
"strings"
"sync"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -112,6 +113,10 @@ func getPodNames(slicegateway *kubeslicev1beta1.SliceGateway) []string {
}

func GetDepNameFromPodName(sliceGwID, podName string) string {
if sliceGwID == "" || podName == "" {
return ""
}

after, found := strings.CutPrefix(podName, sliceGwID)
if !found {
return ""
Expand Down Expand Up @@ -204,13 +209,13 @@ func getPodPairToRebalance(podsOnNode []corev1.Pod, sliceGw *kubeslicev1beta1.Sl
func GetPeerGwPodName(gwPodName string, sliceGw *kubeslicev1beta1.SliceGateway) (string, error) {
podInfo := findGwPodInfo(sliceGw.Status.GatewayPodStatus, gwPodName)
if podInfo == nil {
return "", errors.New("Gw pod not found")
return "", errors.New("gw pod not found")
}
if podInfo.TunnelStatus.Status != int32(gwsidecarpb.TunnelStatusType_GW_TUNNEL_STATE_UP) {
return "", errors.New("Gw tunnel is down")
return "", errors.New("gw tunnel is down")
}
if podInfo.PeerPodName == "" {
return "", errors.New("Gw peer pod info unavailable")
return "", errors.New("gw peer pod info unavailable")
}

return podInfo.PeerPodName, nil
Expand Down
45 changes: 32 additions & 13 deletions pkg/gwsidecar/gwsidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@ type NsmStatus struct {
IntfName string
LocalIP string
}

type TunnelStatus struct {
IntfName string
LocalIP string
RemoteIP string
Latency uint64
TxRate uint64
RxRate uint64
PacketLoss uint64
Status int32
IntfName string
LocalIP string
RemoteIP string
Latency uint64
TxRate uint64
RxRate uint64
PacketLoss uint64
Status int32
TunnelState string
}

type GwStatus struct {
NsmStatus
TunnelStatus
Expand Down Expand Up @@ -79,6 +82,17 @@ func (worker gwSidecarClient) GetSliceGwRemotePodName(ctx context.Context, gwRem
return res.GatewayPodName, nil
}

// getTunnelState converts a sidecar tunnel status code into the string form
// stored alongside the numeric status: "UP" for GW_TUNNEL_STATE_UP, "DOWN" for
// GW_TUNNEL_STATE_DOWN, and "UNKNOWN" for every other value.
func getTunnelState(tunnelState sidecar.TunnelStatusType) string {
	if tunnelState == sidecar.TunnelStatusType_GW_TUNNEL_STATE_UP {
		return "UP"
	}
	if tunnelState == sidecar.TunnelStatusType_GW_TUNNEL_STATE_DOWN {
		return "DOWN"
	}
	// Any status the sidecar reports that is neither UP nor DOWN.
	return "UNKNOWN"
}

// GetStatus retrieves sidecar status
func (worker gwSidecarClient) GetStatus(ctx context.Context, serverAddr string) (*GwStatus, error) {
conn, err := grpc.Dial(serverAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
Expand All @@ -103,14 +117,19 @@ func (worker gwSidecarClient) GetStatus(ctx context.Context, serverAddr string)
}
if res.TunnelStatus != nil {
gwStatus.TunnelStatus = TunnelStatus{
IntfName: res.TunnelStatus.NetInterface,
LocalIP: res.TunnelStatus.LocalIP,
RemoteIP: res.TunnelStatus.PeerIP,
PacketLoss: res.TunnelStatus.PacketLoss,
Status: int32(res.TunnelStatus.Status),
IntfName: res.TunnelStatus.NetInterface,
LocalIP: res.TunnelStatus.LocalIP,
RemoteIP: res.TunnelStatus.PeerIP,
Latency: res.TunnelStatus.Latency,
TxRate: res.TunnelStatus.TxRate,
RxRate: res.TunnelStatus.RxRate,
PacketLoss: res.TunnelStatus.PacketLoss,
Status: int32(res.TunnelStatus.Status),
TunnelState: getTunnelState(res.TunnelStatus.Status),
}
} else {
gwStatus.TunnelStatus.Status = int32(sidecar.TunnelStatusType_GW_TUNNEL_STATE_DOWN)
gwStatus.TunnelStatus.TunnelState = getTunnelState(sidecar.TunnelStatusType_GW_TUNNEL_STATE_DOWN)
}

return gwStatus, err
Expand Down

0 comments on commit 7015e35

Please sign in to comment.