diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index ed2b6157f..e998cd581 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -289,6 +289,14 @@ rules: - patch - update - watch +- apiGroups: + - policy + resources: + - poddisruptionbudgets + verbs: + - create + - delete + - list - apiGroups: - rbac.authorization.k8s.io resources: diff --git a/controllers/slicegateway/pod_disruption_budget.go b/controllers/slicegateway/pod_disruption_budget.go new file mode 100644 index 000000000..b535a65a8 --- /dev/null +++ b/controllers/slicegateway/pod_disruption_budget.go @@ -0,0 +1,58 @@ +package slicegateway + +import ( + "context" + "fmt" + + "github.com/kubeslice/worker-operator/controllers" + webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod" + policyv1 "k8s.io/api/policy/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// formatPodDisruptionBudget creates the PodDisruptionBudget's manifest with labels matching the slice gateway pods. +func formatPodDisruptionBudget(slice, sliceGateway string, minAvailable intstr.IntOrString) *policyv1.PodDisruptionBudget { + return &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pdb", sliceGateway), + Namespace: controllers.ControlPlaneNamespace, + Labels: map[string]string{ + controllers.ApplicationNamespaceSelectorLabelKey: slice, + controllers.SliceGatewaySelectorLabelKey: sliceGateway, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + controllers.ApplicationNamespaceSelectorLabelKey: slice, + webhook.PodInjectLabelKey: "slicegateway", + controllers.SliceGatewaySelectorLabelKey: sliceGateway, + }, + }, + }, + } +} + +// listPodDisruptionBudgetForSliceGateway lists the PodDisruptionBudget objects that match the slice gateway pods. 
+func listPodDisruptionBudgetForSliceGateway(ctx context.Context, kubeClient client.Client,
+    sliceName, sliceGwName string) ([]policyv1.PodDisruptionBudget, error) {
+    // Options for listing the PDBs that match the slice and slice gateway
+    listOpts := []client.ListOption{
+        client.MatchingLabels(map[string]string{
+            controllers.ApplicationNamespaceSelectorLabelKey: sliceName,
+            controllers.SliceGatewaySelectorLabelKey:         sliceGwName,
+        }),
+        client.InNamespace(controllers.ControlPlaneNamespace),
+    }
+
+    // List PDBs from the cluster that match the slice and slice gateway
+    pdbList := policyv1.PodDisruptionBudgetList{}
+    if err := kubeClient.List(ctx, &pdbList, listOpts...); err != nil {
+        return nil, err
+    }
+
+    return pdbList.Items, nil
+}
diff --git a/controllers/slicegateway/reconciler.go b/controllers/slicegateway/reconciler.go
index f09e50fa0..48afa26b3 100644
--- a/controllers/slicegateway/reconciler.go
+++ b/controllers/slicegateway/reconciler.go
@@ -29,6 +29,7 @@ import (
 	webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
+	policyv1 "k8s.io/api/policy/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -78,6 +79,7 @@ type SliceGwReconciler struct {
 //+kubebuilder:rbac:groups=core,resources=pods,verbs=get;list;watch
 //+kubebuilder:rbac:groups=core,resources=endpoints,verbs=get;list;watch;create;update;patch;delete
 //+kubebuilder:rbac:groups=core,resources=nodes,verbs=get;list;watch;
+//+kubebuilder:rbac:groups=policy,resources=poddisruptionbudgets,verbs=list;create;delete
 func (r *SliceGwReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
 	var sliceGwNodePorts []int
@@ -490,6 +492,7 @@ func (r *SliceGwReconciler) SetupWithManager(mgr ctrl.Manager) error {
 		For(&kubeslicev1beta1.SliceGateway{}).
 		Owns(&appsv1.Deployment{}).
 		Owns(&corev1.Service{}).
+		Owns(&policyv1.PodDisruptionBudget{}).
 		Watches(
 			&corev1.Pod{},
 			handler.EnqueueRequestsFromMapFunc(r.findSliceGwObjectsToReconcile),
diff --git a/controllers/slicegateway/slicegateway.go b/controllers/slicegateway/slicegateway.go
index 6016d9c95..964be975d 100644
--- a/controllers/slicegateway/slicegateway.go
+++ b/controllers/slicegateway/slicegateway.go
@@ -30,6 +30,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/go-logr/logr"
 	"github.com/kubeslice/worker-operator/controllers"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
@@ -1375,6 +1376,18 @@ func (r *SliceGwReconciler) ReconcileGatewayDeployments(ctx context.Context, sli
 		}
 	}
 
+	// Create a PodDisruptionBudget for the slice gateway pods so that at least one pod
+	// remains available on each worker during a disruption.
+	//
+	// Note: the PDB creation attempt must run regardless of whether this reconciliation
+	// created any deployments, since the request could have been requeued after an earlier PDB creation failure.
+	if err = r.createPodDisruptionBudgetForSliceGatewayPods(ctx, log, r.Client, sliceName, sliceGw); err != nil {
+		log.Error(err, "Failed to create PodDisruptionBudget for SliceGW deployments",
+			"SliceName", sliceName, "SliceGwName", sliceGwName)
+
+		return ctrl.Result{}, err, true
+	}
+
 	// Reconcile deployment to node port mapping for gw client deployments
 	if isClient(sliceGw) {
 		for _, deployment := range deployments.Items {
@@ -1534,3 +1547,56 @@ func (r *SliceGwReconciler) ReconcileIntermediateGatewayDeployments(ctx context.
 	return ctrl.Result{}, nil, false
 }
+
+// createPodDisruptionBudgetForSliceGatewayPods checks for PodDisruptionBudget objects in the cluster that match the
+// slice gateway pods and, if none exists, creates a PDB with a minimum availability of 1 so that at least one pod
+// remains available during a disruption.
+func (r *SliceGwReconciler) createPodDisruptionBudgetForSliceGatewayPods(ctx context.Context, log logr.Logger,
+	kubeClient client.Client, sliceName string, sliceGateway *kubeslicev1beta1.SliceGateway) error {
+	log = log.WithValues("sliceName", sliceName, "sliceGwName", sliceGateway.Name)
+
+	// List PDBs in the cluster that match the slice gateway pods
+	pdbs, err := listPodDisruptionBudgetForSliceGateway(ctx, kubeClient, sliceName, sliceGateway.Name)
+	if err != nil && !apierrors.IsNotFound(err) {
+		log.Error(err, "failed to list PodDisruptionBudgets that match the slice gateway")
+
+		// An unexpected error occurred; return it so the request is requeued
+		return err
+	}
+
+	// Check whether a PDB matching the current slice gateway already exists
+	if len(pdbs) > 0 {
+		log.Info("PodDisruptionBudget matching the slice gateway already exists. Skipping creation.")
+
+		return nil
+	}
+
+	// Create the PDB manifest with a minimum availability of 1 pod
+	pdb := formatPodDisruptionBudget(sliceName, sliceGateway.Name, intstr.FromInt(1))
+
+	// Set the SliceGateway instance as the owner and controller of the PDB
+	if err := ctrl.SetControllerReference(sliceGateway, pdb, r.Scheme); err != nil {
+		return err
+	}
+
+	// Create the PDB so that at least one slice gateway pod remains available on each worker during a disruption
+	if err = kubeClient.Create(ctx, pdb); err != nil {
+		if apierrors.IsAlreadyExists(err) {
+			log.Info("PodDisruptionBudget matching the slice gateway already exists. Skipping creation",
+				"pdb", pdb.Name)
+
+			// The PDB already exists, so ignore the current request
+			return nil
+		}
+
+		log.Error(err, "PodDisruptionBudget creation failed", "pdb", pdb.Name)
+
+		// Any other error is unexpected, so fail the request
+		return fmt.Errorf("failed to create PodDisruptionBudget for SliceGW pods: %w", err)
+	}
+
+	// PDB created successfully
+	log.Info("PodDisruptionBudget for slice gateway pods created successfully")
+
+	return nil
+}
diff --git a/tests/spoke/slicegw_controller_test.go b/tests/spoke/slicegw_controller_test.go
index 3e7c2df83..111fa222b 100644
--- a/tests/spoke/slicegw_controller_test.go
+++ b/tests/spoke/slicegw_controller_test.go
@@ -27,13 +27,17 @@ import (
 	nsmv1 "github.com/networkservicemesh/sdk-k8s/pkg/tools/k8s/apis/networkservicemesh.io/v1"
 
 	kubeslicev1beta1 "github.com/kubeslice/worker-operator/api/v1beta1"
+	"github.com/kubeslice/worker-operator/controllers"
+	webhook "github.com/kubeslice/worker-operator/pkg/webhook/pod"
 	. "github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + policyv1 "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/util/retry" _ "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -50,6 +54,9 @@ var _ = Describe("Worker SlicegwController", func() { var createdSlice *kubeslicev1beta1.Slice var vl3ServiceEndpoint *nsmv1.NetworkServiceEndpoint var appPod *corev1.Pod + var podDisruptionBudget *policyv1.PodDisruptionBudget + var createdPodDisruptionBudget *policyv1.PodDisruptionBudget + Context("With SliceGW CR created", func() { BeforeEach(func() { @@ -132,8 +139,31 @@ var _ = Describe("Worker SlicegwController", func() { }, } + minAvailable := intstr.FromInt(1) + podDisruptionBudget = &policyv1.PodDisruptionBudget{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-pdb", sliceGw.Name), + Namespace: CONTROL_PLANE_NS, + Labels: map[string]string{ + controllers.ApplicationNamespaceSelectorLabelKey: slice.Name, + controllers.SliceGatewaySelectorLabelKey: sliceGw.Name, + }, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + MinAvailable: &minAvailable, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + controllers.ApplicationNamespaceSelectorLabelKey: slice.Name, + webhook.PodInjectLabelKey: "slicegateway", + "kubeslice.io/slicegw": sliceGw.Name, + }, + }, + }, + } + createdSlice = &kubeslicev1beta1.Slice{} createdSliceGw = &kubeslicev1beta1.SliceGateway{} + createdPodDisruptionBudget = &policyv1.PodDisruptionBudget{} founddepl := &appsv1.Deployment{} deplKey := types.NamespacedName{Name: "test-slicegw", Namespace: CONTROL_PLANE_NS} @@ -168,6 +198,19 @@ var _ = Describe("Worker SlicegwController", func() { err := k8sClient.Get(ctx, types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}, svc) return errors.IsNotFound(err) }, time.Second*30, time.Millisecond*250).Should(BeTrue()) + + Eventually(func() bool { + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: podDisruptionBudget.Name, + Namespace: podDisruptionBudget.Namespace}, + createdPodDisruptionBudget, + ) + if err != nil { + return errors.IsNotFound(err) + } + Expect(k8sClient.Delete(ctx, createdPodDisruptionBudget)).Should(Succeed()) + return true + }, time.Second*30, time.Millisecond*250).Should(BeTrue()) }) }) @@ -315,6 +358,94 @@ var _ = Describe("Worker SlicegwController", func() { }) + It("Should create a PodDisruptionBudget for gateway server's pods", func() { + ctx := context.Background() + Expect(k8sClient.Create(ctx, svc)).Should(Succeed()) + Expect(k8sClient.Create(ctx, slice)).Should(Succeed()) + Expect(k8sClient.Create(ctx, vl3ServiceEndpoint)).Should(Succeed()) + Expect(k8sClient.Create(ctx, sliceGw)).Should(Succeed()) + Expect(k8sClient.Create(ctx, appPod)).Should(Succeed()) + + sliceKey := types.NamespacedName{Name: "test-slice-4", Namespace: CONTROL_PLANE_NS} + Eventually(func() bool { + err := k8sClient.Get(ctx, sliceKey, createdSlice) + return err == nil + }, time.Second*250, time.Millisecond*250).Should(BeTrue()) + + err := retry.RetryOnConflict(retry.DefaultRetry, func() error { + err := k8sClient.Get(ctx, sliceKey, createdSlice) + if err != nil { + return err + } + // Update the minimum required values in the slice cr status field + if createdSlice.Status.SliceConfig == nil { + createdSlice.Status.SliceConfig = &kubeslicev1beta1.SliceConfig{ + SliceDisplayName: slice.Name, + SliceSubnet: "192.168.0.0/16", + } + } + 
+				if err := k8sClient.Status().Update(ctx, createdSlice); err != nil {
+					return err
+				}
+				return nil
+			})
+			Expect(err).To(BeNil())
+			Expect(createdSlice.Status.SliceConfig).NotTo(BeNil())
+
+			slicegwkey := types.NamespacedName{Name: "test-slicegw", Namespace: CONTROL_PLANE_NS}
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
+				return err == nil
+			}, time.Second*250, time.Millisecond*250).Should(BeTrue())
+
+			createdSliceGw.Status.Config.SliceGatewayHostType = "Server"
+			Eventually(func() bool {
+				err := k8sClient.Status().Update(ctx, createdSliceGw)
+				return err == nil
+			}, time.Second*10, time.Millisecond*250).Should(BeTrue())
+
+			foundsvc := &corev1.Service{}
+			svckey := types.NamespacedName{Name: "svc-test-slicegw-0-0", Namespace: CONTROL_PLANE_NS}
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, svckey, foundsvc)
+				return err == nil
+			}, time.Second*30, time.Millisecond*250).Should(BeTrue())
+
+			foundsvc = &corev1.Service{}
+			svckey = types.NamespacedName{Name: "svc-test-slicegw-1-0", Namespace: CONTROL_PLANE_NS}
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, svckey, foundsvc)
+				return err == nil
+			}, time.Second*30, time.Millisecond*250).Should(BeTrue())
+
+			founddepl := &appsv1.Deployment{}
+			deplKey := types.NamespacedName{Name: "test-slicegw-0-0", Namespace: CONTROL_PLANE_NS}
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, deplKey, founddepl)
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+
+			founddepl = &appsv1.Deployment{}
+			deplKey = types.NamespacedName{Name: "test-slicegw-1-0", Namespace: CONTROL_PLANE_NS}
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, deplKey, founddepl)
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, types.NamespacedName{
+					Name:      podDisruptionBudget.Name,
+					Namespace: podDisruptionBudget.Namespace,
+				}, createdPodDisruptionBudget)
+
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+		})
+
 		It("Should create a finalizer for the slicegw cr created", func() {
 			ctx := context.Background()
@@ -430,6 +562,88 @@ var _ = Describe("Worker SlicegwController", func() {
 
 		}, time.Second*40, time.Millisecond*250).Should(BeTrue())
 		})
+
+		It("Should create a PodDisruptionBudget for gateway client's pods", func() {
+			ctx := context.Background()
+			Expect(k8sClient.Create(ctx, svc)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, slice)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, vl3ServiceEndpoint)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, sliceGw)).Should(Succeed())
+			Expect(k8sClient.Create(ctx, appPod)).Should(Succeed())
+
+			sliceKey := types.NamespacedName{Name: "test-slice-4", Namespace: CONTROL_PLANE_NS}
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, sliceKey, createdSlice)
+				return err == nil
+			}, time.Second*250, time.Millisecond*250).Should(BeTrue())
+
+			err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
+				err := k8sClient.Get(ctx, sliceKey, createdSlice)
+				if err != nil {
+					return err
+				}
+				// Update the minimum required values in the slice cr status field
+				if createdSlice.Status.SliceConfig == nil {
+					createdSlice.Status.SliceConfig = &kubeslicev1beta1.SliceConfig{
+						SliceDisplayName: slice.Name,
+						SliceSubnet:      "192.168.0.0/16",
+					}
+				}
+				if err := k8sClient.Status().Update(ctx, createdSlice); err != nil {
+					return err
+				}
+				return nil
+			})
+			Expect(err).To(BeNil())
+			Expect(createdSlice.Status.SliceConfig).NotTo(BeNil())
+
+			slicegwkey := types.NamespacedName{Name: "test-slicegw", Namespace: CONTROL_PLANE_NS}
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
+				return err == nil
+			}, time.Second*250, time.Millisecond*250).Should(BeTrue())
+
+			createdSliceGw.Status.Config.SliceGatewayHostType = "Client"
+			createdSliceGw.Status.Config.SliceGatewayRemoteGatewayID = "remote-gateway-id"
+			createdSliceGw.Status.Config.SliceGatewayRemoteNodeIPs = []string{"192.168.1.1"}
+			createdSliceGw.Status.Config.SliceGatewayRemoteNodePorts = []int{8080, 8090}
+
+			Eventually(func() bool {
+				err := k8sClient.Status().Update(ctx, createdSliceGw)
+				return err == nil
+			}, time.Second*30, time.Millisecond*250).Should(BeTrue())
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
+				return err == nil
+			}, time.Second*10, time.Millisecond*250).Should(BeTrue())
+
+			founddepl := &appsv1.Deployment{}
+			deplKey := types.NamespacedName{Name: "test-slicegw-0-0", Namespace: CONTROL_PLANE_NS}
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, deplKey, founddepl)
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+
+			founddepl = &appsv1.Deployment{}
+			deplKey = types.NamespacedName{Name: "test-slicegw-1-0", Namespace: CONTROL_PLANE_NS}
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, deplKey, founddepl)
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+
+			Eventually(func() bool {
+				err := k8sClient.Get(ctx, types.NamespacedName{
+					Name:      podDisruptionBudget.Name,
+					Namespace: podDisruptionBudget.Namespace,
+				}, createdPodDisruptionBudget)
+
+				return err == nil
+			}, time.Second*40, time.Millisecond*250).Should(BeTrue())
+		})
+
 		It("Should create create headless service for gw client", func() {
 			ctx := context.Background()
 			Expect(k8sClient.Create(ctx, svc)).Should(Succeed())
@@ -649,6 +863,16 @@ var _ = Describe("Worker SlicegwController", func() {
 			err := k8sClient.Get(ctx, slicegwkey, createdSliceGw)
 			return errors.IsNotFound(err)
 		}, time.Second*10, time.Millisecond*250).Should(BeTrue())
+
+		createdPodDisruptionBudget = &policyv1.PodDisruptionBudget{}
+
+		Eventually(func() bool {
+			err := k8sClient.Get(ctx, types.NamespacedName{
+				Name:      podDisruptionBudget.Name,
+				Namespace: podDisruptionBudget.Namespace,
+			}, createdPodDisruptionBudget)
+			return errors.IsNotFound(err)
+		}, time.Second*10, time.Millisecond*250).Should(BeTrue())
 	})
 })
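
Reviewer note (not part of the patch): for reference, the object built by formatPodDisruptionBudget
would render roughly as the manifest below. This is an illustrative sketch: the slice name "green"
and gateway name "green-gw" are hypothetical placeholders, and the concrete label keys and namespace
are assumed values behind the Go constants (controllers.ApplicationNamespaceSelectorLabelKey,
controllers.SliceGatewaySelectorLabelKey, webhook.PodInjectLabelKey, controllers.ControlPlaneNamespace).

# Illustrative rendering only; names, namespace, and label-key strings are assumptions as noted above.
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: green-gw-pdb
  namespace: kubeslice-system
  labels:
    kubeslice.io/slice: green
    kubeslice.io/slicegw: green-gw
spec:
  # minAvailable of 1 keeps at least one gateway pod running during a voluntary disruption
  minAvailable: 1
  selector:
    matchLabels:
      kubeslice.io/slice: green
      kubeslice.io/pod-type: slicegateway
      kubeslice.io/slicegw: green-gw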