From 43771262ddaefd9d92cd1489fd8bed795a5b96a1 Mon Sep 17 00:00:00 2001 From: Sanskar Bhushan Date: Thu, 20 Apr 2023 00:00:29 +0530 Subject: [PATCH 1/3] Adding functionality to delete successfully completed jobs in a FIFO fashion based on the given history limits in a cron controller Signed-off-by: Sanskar Bhushan --- controllers/apps/cron_controller.go | 46 +++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/controllers/apps/cron_controller.go b/controllers/apps/cron_controller.go index 1aef6094..e1b69a7a 100644 --- a/controllers/apps/cron_controller.go +++ b/controllers/apps/cron_controller.go @@ -461,6 +461,52 @@ func (cc *CronController) newEmptyWorkload(apiVersion, kind string) (client.Obje return nil, fmt.Errorf("workload %+v has not implemented client.Object interface", groupVersion) } +// Deletes successfully complete jobs based on given history limits. +func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron) error { + historyLimit := cron.Spec.HistoryLimit + if historyLimit == nil { + return nil + } + + completedWorkloads, err := cc.listCompletedWorkloads(cron) + if err != nil { + return err + } + + if len(completedWorkloads) <= int(*historyLimit) { + return nil + } + + // Sort completed workloads by creation timestamp. + sort.Slice(completedWorkloads, func(i, j int) bool { + return completedWorkloads[i].GetCreationTimestamp().Before(&completedWorkloads[j].GetCreationTimestamp()) + }) + + // Delete the oldest completed workloads. + for _, wl := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] { + if err := cc.deleteWorkload(cron, wl); err != nil { + return err + } + } + return nil +} + +// Helper function to list completed workloads (jobs) for a given Cron object. +func (cc *CronController) listCompletedWorkloads(cron *v1alpha1.Cron) ([]*batchv1.Job, error) { + jobList, err := cc.jobLister.Jobs(cron.Namespace).List(labels.SelectorFromSet(cron.Spec.JobLabelSelector)) + if err != nil { + return nil, err + } + + var completedWorkloads []*batchv1.Job + for _, job := range jobList { + if job.Status.Succeeded >= *job.Spec.Completions { + completedWorkloads = append(completedWorkloads, job) + } + } + return completedWorkloads, nil +} + func (cc *CronController) deleteWorkload(cron *v1alpha1.Cron, ref corev1.ObjectReference) error { wl, err := cc.newEmptyWorkload(ref.APIVersion, ref.Kind) if err != nil { From 0c306c65748a7d0c62a7d597448bfc931b1d1ba6 Mon Sep 17 00:00:00 2001 From: Sanskar Bhushan Date: Mon, 24 Apr 2023 06:05:08 +0530 Subject: [PATCH 2/3] made some ammendments to implement deleteCompletedJobsBeyondThreshold Signed-off-by: Sanskar Bhushan --- apis/apps/v1alpha1/cron_types.go | 2 +- config/crd/bases/apps.kubedl.io_crons.yaml | 13 ++-- controllers/apps/cron_controller.go | 70 +++++++++++++++++----- controllers/apps/cron_utils.go | 13 ++-- 4 files changed, 72 insertions(+), 26 deletions(-) diff --git a/apis/apps/v1alpha1/cron_types.go b/apis/apps/v1alpha1/cron_types.go index 0719c7a0..d2de403d 100644 --- a/apis/apps/v1alpha1/cron_types.go +++ b/apis/apps/v1alpha1/cron_types.go @@ -60,7 +60,7 @@ type CronStatus struct { type CronHistory struct { // Object is the reference of the historical scheduled cron job. - Object corev1.TypedLocalObjectReference `json:"object"` + Object corev1.ObjectReference `json:"object"` // Status is the final status when job finished. Status v1.JobConditionType `json:"status"` // Created is the creation timestamp of job. diff --git a/config/crd/bases/apps.kubedl.io_crons.yaml b/config/crd/bases/apps.kubedl.io_crons.yaml index 83c0e264..2def0417 100644 --- a/config/crd/bases/apps.kubedl.io_crons.yaml +++ b/config/crd/bases/apps.kubedl.io_crons.yaml @@ -86,15 +86,20 @@ spec: type: string object: properties: - apiGroup: + apiVersion: + type: string + fieldPath: type: string kind: type: string name: type: string - required: - - kind - - name + namespace: + type: string + resourceVersion: + type: string + uid: + type: string type: object status: type: string diff --git a/controllers/apps/cron_controller.go b/controllers/apps/cron_controller.go index e1b69a7a..e0d99f25 100644 --- a/controllers/apps/cron_controller.go +++ b/controllers/apps/cron_controller.go @@ -136,6 +136,11 @@ func (cc *CronController) Reconcile(_ context.Context, req ctrl.Request) (ctrl.R if nextDuration != nil { return ctrl.Result{RequeueAfter: *nextDuration}, nil } + + if err = cc.deleteCompletedJobsBeyondThreshold(&cron); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil } @@ -461,14 +466,14 @@ func (cc *CronController) newEmptyWorkload(apiVersion, kind string) (client.Obje return nil, fmt.Errorf("workload %+v has not implemented client.Object interface", groupVersion) } -// Deletes successfully complete jobs based on given history limits. +// Deletes successfully completed jobs beyond given history limits. func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron) error { historyLimit := cron.Spec.HistoryLimit if historyLimit == nil { return nil } - completedWorkloads, err := cc.listCompletedWorkloads(cron) + completedWorkloads, err := cc.listSuccessfullyCompletedWorkloads(cron) if err != nil { return err } @@ -479,32 +484,65 @@ func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron // Sort completed workloads by creation timestamp. sort.Slice(completedWorkloads, func(i, j int) bool { - return completedWorkloads[i].GetCreationTimestamp().Before(&completedWorkloads[j].GetCreationTimestamp()) + return completedWorkloads[i].GetCreationTimestamp().Time.Before(completedWorkloads[j].GetCreationTimestamp().Time) }) + // Create a new Scheme object. + s := runtime.NewScheme() + + // Register the required Kubernetes API types with the Scheme object. + corev1.AddToScheme(s) + v1alpha1.AddToScheme(s) + // Delete the oldest completed workloads. - for _, wl := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] { - if err := cc.deleteWorkload(cron, wl); err != nil { + for _, wlObj := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] { + // Convert to ObjectReference + var wlRef corev1.ObjectReference + if err := s.Convert(wlObj, &wlRef, nil); err != nil { + return err + } + + if err := cc.deleteWorkload(cron, wlRef); err != nil { return err } } + return nil } -// Helper function to list completed workloads (jobs) for a given Cron object. -func (cc *CronController) listCompletedWorkloads(cron *v1alpha1.Cron) ([]*batchv1.Job, error) { - jobList, err := cc.jobLister.Jobs(cron.Namespace).List(labels.SelectorFromSet(cron.Spec.JobLabelSelector)) - if err != nil { - return nil, err - } +// listSuccessfullyCompletedWorkloads returns a list of successfully completed workloads. +func (cc *CronController) listSuccessfullyCompletedWorkloads(cron *v1alpha1.Cron) ([]metav1.Object, error) { + workloads := make([]metav1.Object, 0) - var completedWorkloads []*batchv1.Job - for _, job := range jobList { - if job.Status.Succeeded >= *job.Spec.Completions { - completedWorkloads = append(completedWorkloads, job) + for _, history := range cron.Status.History { + if history.Status == v1.JobSucceeded { + wl, err := cc.newEmptyWorkload(cron.APIVersion, history.Object.Kind) + if err != nil { + klog.Errorf("unsupported cron workload and failed to init by scheme, kind: %s, err: %v", + history.Object.Kind, err) + continue + } + if err := cc.client.Get(context.Background(), types.NamespacedName{ + Name: history.Object.Name, + Namespace: history.Object.Namespace, + }, wl); err != nil { + if errors.IsNotFound(err) { + klog.Infof("completed workload[%s/%s] in cron[%s/%s] has been deleted.", + history.Object.Namespace, history.Object.Name, cron.Namespace, cron.Name) + continue + } + return nil, err + } + metaWl, ok := wl.(metav1.Object) + if !ok { + klog.Warningf("workload [%s/%s] cannot convert to metav1.Object", cron.Namespace, history.Object.Name) + continue + } + workloads = append(workloads, metaWl) } } - return completedWorkloads, nil + + return workloads, nil } func (cc *CronController) deleteWorkload(cron *v1alpha1.Cron, ref corev1.ObjectReference) error { diff --git a/controllers/apps/cron_utils.go b/controllers/apps/cron_utils.go index 652894f7..c294daaf 100644 --- a/controllers/apps/cron_utils.go +++ b/controllers/apps/cron_utils.go @@ -129,16 +129,19 @@ func inActiveList(active []corev1.ObjectReference, workload metav1.Object) bool return false } -func workloadToHistory(wl metav1.Object, apiGroup, kind string) v1alpha1.CronHistory { +func workloadToHistory(wl metav1.Object, apiVersion, kind string) v1alpha1.CronHistory { status, finished := IsWorkloadFinished(wl) created := wl.GetCreationTimestamp() ch := v1alpha1.CronHistory{ Created: &created, Status: status, - Object: corev1.TypedLocalObjectReference{ - APIGroup: &apiGroup, - Kind: kind, - Name: wl.GetName(), + Object: corev1.ObjectReference{ + APIVersion: apiVersion, + Kind: kind, + Name: wl.GetName(), + Namespace: wl.GetNamespace(), + ResourceVersion: wl.GetResourceVersion(), + UID: wl.GetUID(), }, } if finished { From cf295752e690d69f11d45e093d3fe47a5ededf0f Mon Sep 17 00:00:00 2001 From: Sanskar Bhushan Date: Fri, 5 May 2023 10:30:28 +0530 Subject: [PATCH 3/3] used static scheme instead of creating a new one Signed-off-by: Sanskar Bhushan --- apis/apps/v1alpha1/zz_generated.deepcopy.go | 2 +- controllers/apps/cron_controller.go | 9 +-------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 5290795b..00fd6e16 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -56,7 +56,7 @@ func (in *Cron) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CronHistory) DeepCopyInto(out *CronHistory) { *out = *in - in.Object.DeepCopyInto(&out.Object) + out.Object = in.Object if in.Created != nil { in, out := &in.Created, &out.Created *out = (*in).DeepCopy() diff --git a/controllers/apps/cron_controller.go b/controllers/apps/cron_controller.go index e0d99f25..d9fbf5bc 100644 --- a/controllers/apps/cron_controller.go +++ b/controllers/apps/cron_controller.go @@ -487,18 +487,11 @@ func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron return completedWorkloads[i].GetCreationTimestamp().Time.Before(completedWorkloads[j].GetCreationTimestamp().Time) }) - // Create a new Scheme object. - s := runtime.NewScheme() - - // Register the required Kubernetes API types with the Scheme object. - corev1.AddToScheme(s) - v1alpha1.AddToScheme(s) - // Delete the oldest completed workloads. for _, wlObj := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] { // Convert to ObjectReference var wlRef corev1.ObjectReference - if err := s.Convert(wlObj, &wlRef, nil); err != nil { + if err := cc.scheme.Convert(wlObj, &wlRef, nil); err != nil { return err }