Skip to content

Commit

Permalink
feat(SliceGwReconciler): Add PodDisruptionBudget logic
Browse files Browse the repository at this point in the history
The SliceGwReconciler should handle the lifecycle of PodDisruptionBudget
objects that match the slice gateway deployments.

Signed-off-by: Bhargav Ravuri <[email protected]>
  • Loading branch information
Bhargav-InfraCloud committed Feb 23, 2024
1 parent 051dbde commit 210cff2
Show file tree
Hide file tree
Showing 5 changed files with 356 additions and 0 deletions.
8 changes: 8 additions & 0 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ rules:
- patch
- update
- watch
- apiGroups:
- policy
resources:
- poddisruptionbudgets
verbs:
- create
- delete
- list
- apiGroups:
- rbac.authorization.k8s.io
resources:
Expand Down
58 changes: 58 additions & 0 deletions controllers/slicegateway/pod_disruption_budget.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package slicegateway

import (
"context"
"fmt"

"github.com/kubeslice/worker-operator/controllers"
webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
policyv1 "k8s.io/api/policy/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// formatPodDisruptionBudget creates the PodDisruptionBudget's manifest with labels matching the slice gateway pods.
func formatPodDisruptionBudget(slice, sliceGateway string, minAvailable intstr.IntOrString) *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%s-pdb", sliceGateway),
Namespace: controllers.ControlPlaneNamespace,
Labels: map[string]string{
controllers.ApplicationNamespaceSelectorLabelKey: slice,
controllers.SliceGatewaySelectorLabelKey: sliceGateway,
},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: &minAvailable,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
controllers.ApplicationNamespaceSelectorLabelKey: slice,
webhook.PodInjectLabelKey: "slicegateway",
controllers.SliceGatewaySelectorLabelKey: sliceGateway,
},
},
},
}
}

// listPodDisruptionBudgetForSliceGateway lists the PodDisruptionBudget objects that match the slice gateway pods.
func listPodDisruptionBudgetForSliceGateway(ctx context.Context, kubeClient client.Client,
sliceName, sliceGwName string) ([]policyv1.PodDisruptionBudget, error) {
// Options for listing the PDBs that match the slice and slice gateway
listOpts := []client.ListOption{
client.MatchingLabels(map[string]string{
controllers.ApplicationNamespaceSelectorLabelKey: sliceName,
controllers.SliceGatewaySelectorLabelKey: sliceGwName,
}),
client.InNamespace(controllers.ControlPlaneNamespace),
}

// List PDBs from cluster that match the slice and slice gateway
pdbList := policyv1.PodDisruptionBudgetList{}
if err := kubeClient.List(ctx, &pdbList, listOpts...); err != nil {
return nil, err
}

return pdbList.Items, nil
}
3 changes: 3 additions & 0 deletions controllers/slicegateway/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -79,6 +80,7 @@ type SliceGwReconciler struct {
//+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
//+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;
//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=list;create;delete

func (r *SliceGwReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
var sliceGwNodePorts []int
Expand Down Expand Up @@ -491,6 +493,7 @@ func (r *SliceGwReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&kubeslicev1beta1.SliceGateway{}).
Owns(&appsv1.Deployment{}).
Owns(&corev1.Service{}).
Owns(&policyv1.PodDisruptionBudget{}).
Watches(&source.Kind{Type: &corev1.Pod{}},
handler.EnqueueRequestsFromMapFunc(r.findSliceGwObjectsToReconcile),
builder.WithPredicates(sliceGwUpdPredicate),
Expand Down
64 changes: 64 additions & 0 deletions controllers/slicegateway/slicegateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
"github.com/kubeslice/worker-operator/controllers"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -1375,6 +1376,18 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
}
}

// Create PodDisruptionBudget for slice gateway's pod to at least have 1 instance of pods on each worker
// when disruption has occurred.
//
// Note: This should run an attempt to create PDB regardless of whether current reconciliation creating deployments
// as the request could've been requeued due to failure at the creation of PDB.
if err = r.createPodDisruptionBudgetForSliceGatewayPods(ctx, log, r.Client, sliceName, sliceGw); err != nil {
log.Error(err, "Failed to create PodDisruptionBudget for SliceGW deployments",
"SliceName", sliceName, "SliceGwName", sliceGwName)

return ctrl.Result{}, err, true
}

// Reconcile deployment to node port mapping for gw client deployments
if isClient(sliceGw) {
for _, deployment := range deployments.Items {
Expand Down Expand Up @@ -1534,3 +1547,54 @@ func (r *SliceGwReconciler) ReconcileIntermediateGatewayDeployments(ctx context.

return ctrl.Result{}, nil, false
}

// createPodDisruptionBudgetForSliceGatewayPods checks for PodDisruptionBudget objects in the cluster that match the
// slice gateway pods, and if missing, it creates a PDB with minimum availability of 1 so at least one pod remains in
// case of a disruption.
func (r *SliceGwReconciler) createPodDisruptionBudgetForSliceGatewayPods(ctx context.Context, log logr.Logger,
client client.Client, sliceName string, sliceGateway *kubeslicev1beta1.SliceGateway) error {
log = log.WithValues("sliceName", sliceName, "sliceGwName", sliceGateway.Name)

// List PDBs in cluster that match the slice gateway pods
pdbs, err := listPodDisruptionBudgetForSliceGateway(ctx, client, sliceName, sliceGateway.Name)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "failed to list PodDisruptionBudgets that match the slice gateway")

// When some unexpected error occurred, return the error for requeuing the request
return err
}

// Check if PDB already exists that matches the current slice gateway
if len(pdbs) > 0 {
log.Info("PodDisruptionBudget matching the slice gateway already exists. Skipping creation.")

return nil
}

// Create PDB manifest with minimum availability of 1 pod
pdb := formatPodDisruptionBudget(sliceName, sliceGateway.Name, intstr.FromInt(1))

// Set SliceGateway instance as the owner and controller for PDB
ctrl.SetControllerReference(sliceGateway, pdb, r.Scheme)

// Create PDB for slice gateway's pod to have at least 1 pod on each worker when disruption occurs
if err = client.Create(ctx, pdb); err != nil {
if apierrors.IsAlreadyExists(err) {
log.Info("PodDisruptionBudget matching the slice gateway already exists. Skipping creation",
"pdb", pdb.Name)

// PDB is already exists. So, ignoring the current request.
return nil
}

log.Error(err, "PodDisruptionBudget creation failed", "pdb", pdb.Name)

// When any other unexpected error occurred when attempting to create PDB, fail the request
return fmt.Errorf("failed to create PodDisruptionBudget for SliceGW pods: %v", err)
}

// PDB created successfully
log.Info("PodDisruptionBudget for slice gateway pods created successfully")

return nil
}
Loading

0 comments on commit 210cff2

Please sign in to comment.