diff --git a/apis/apps/v1alpha1/cron_types.go b/apis/apps/v1alpha1/cron_types.go index 0719c7a0..d2de403d 100644 --- a/apis/apps/v1alpha1/cron_types.go +++ b/apis/apps/v1alpha1/cron_types.go @@ -60,7 +60,7 @@ type CronStatus struct { type CronHistory struct { // Object is the reference of the historical scheduled cron job. - Object corev1.TypedLocalObjectReference `json:"object"` + Object corev1.ObjectReference `json:"object"` // Status is the final status when job finished. Status v1.JobConditionType `json:"status"` // Created is the creation timestamp of job. diff --git a/apis/apps/v1alpha1/zz_generated.deepcopy.go b/apis/apps/v1alpha1/zz_generated.deepcopy.go index 5290795b..00fd6e16 100644 --- a/apis/apps/v1alpha1/zz_generated.deepcopy.go +++ b/apis/apps/v1alpha1/zz_generated.deepcopy.go @@ -56,7 +56,7 @@ func (in *Cron) DeepCopyObject() runtime.Object { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CronHistory) DeepCopyInto(out *CronHistory) { *out = *in - in.Object.DeepCopyInto(&out.Object) + out.Object = in.Object if in.Created != nil { in, out := &in.Created, &out.Created *out = (*in).DeepCopy() diff --git a/config/crd/bases/apps.kubedl.io_crons.yaml b/config/crd/bases/apps.kubedl.io_crons.yaml index 83c0e264..2def0417 100644 --- a/config/crd/bases/apps.kubedl.io_crons.yaml +++ b/config/crd/bases/apps.kubedl.io_crons.yaml @@ -86,15 +86,20 @@ spec: type: string object: properties: - apiGroup: + apiVersion: + type: string + fieldPath: type: string kind: type: string name: type: string - required: - - kind - - name + namespace: + type: string + resourceVersion: + type: string + uid: + type: string type: object status: type: string diff --git a/controllers/apps/cron_controller.go b/controllers/apps/cron_controller.go index 1aef6094..d9fbf5bc 100644 --- a/controllers/apps/cron_controller.go +++ b/controllers/apps/cron_controller.go @@ -136,6 +136,11 @@ func (cc *CronController) Reconcile(_ context.Context, req ctrl.Request) (ctrl.R if nextDuration != nil { return ctrl.Result{RequeueAfter: *nextDuration}, nil } + + if err = cc.deleteCompletedJobsBeyondThreshold(&cron); err != nil { + return ctrl.Result{}, err + } + return ctrl.Result{}, nil } @@ -461,6 +466,78 @@ func (cc *CronController) newEmptyWorkload(apiVersion, kind string) (client.Obje return nil, fmt.Errorf("workload %+v has not implemented client.Object interface", groupVersion) } +// Deletes successfully completed jobs beyond given history limits. +func (cc *CronController) deleteCompletedJobsBeyondThreshold(cron *v1alpha1.Cron) error { + historyLimit := cron.Spec.HistoryLimit + if historyLimit == nil { + return nil + } + + completedWorkloads, err := cc.listSuccessfullyCompletedWorkloads(cron) + if err != nil { + return err + } + + if len(completedWorkloads) <= int(*historyLimit) { + return nil + } + + // Sort completed workloads by creation timestamp. + sort.Slice(completedWorkloads, func(i, j int) bool { + return completedWorkloads[i].GetCreationTimestamp().Time.Before(completedWorkloads[j].GetCreationTimestamp().Time) + }) + + // Delete the oldest completed workloads. + for _, wlObj := range completedWorkloads[:len(completedWorkloads)-int(*historyLimit)] { + // Convert to ObjectReference + var wlRef corev1.ObjectReference + if err := cc.scheme.Convert(wlObj, &wlRef, nil); err != nil { + return err + } + + if err := cc.deleteWorkload(cron, wlRef); err != nil { + return err + } + } + + return nil +} + +// listSuccessfullyCompletedWorkloads returns a list of successfully completed workloads. +func (cc *CronController) listSuccessfullyCompletedWorkloads(cron *v1alpha1.Cron) ([]metav1.Object, error) { + workloads := make([]metav1.Object, 0) + + for _, history := range cron.Status.History { + if history.Status == v1.JobSucceeded { + wl, err := cc.newEmptyWorkload(cron.APIVersion, history.Object.Kind) + if err != nil { + klog.Errorf("unsupported cron workload and failed to init by scheme, kind: %s, err: %v", + history.Object.Kind, err) + continue + } + if err := cc.client.Get(context.Background(), types.NamespacedName{ + Name: history.Object.Name, + Namespace: history.Object.Namespace, + }, wl); err != nil { + if errors.IsNotFound(err) { + klog.Infof("completed workload[%s/%s] in cron[%s/%s] has been deleted.", + history.Object.Namespace, history.Object.Name, cron.Namespace, cron.Name) + continue + } + return nil, err + } + metaWl, ok := wl.(metav1.Object) + if !ok { + klog.Warningf("workload [%s/%s] cannot convert to metav1.Object", cron.Namespace, history.Object.Name) + continue + } + workloads = append(workloads, metaWl) + } + } + + return workloads, nil +} + func (cc *CronController) deleteWorkload(cron *v1alpha1.Cron, ref corev1.ObjectReference) error { wl, err := cc.newEmptyWorkload(ref.APIVersion, ref.Kind) if err != nil { diff --git a/controllers/apps/cron_utils.go b/controllers/apps/cron_utils.go index 652894f7..c294daaf 100644 --- a/controllers/apps/cron_utils.go +++ b/controllers/apps/cron_utils.go @@ -129,16 +129,19 @@ func inActiveList(active []corev1.ObjectReference, workload metav1.Object) bool return false } -func workloadToHistory(wl metav1.Object, apiGroup, kind string) v1alpha1.CronHistory { +func workloadToHistory(wl metav1.Object, apiVersion, kind string) v1alpha1.CronHistory { status, finished := IsWorkloadFinished(wl) created := wl.GetCreationTimestamp() ch := v1alpha1.CronHistory{ Created: &created, Status: status, - Object: corev1.TypedLocalObjectReference{ - APIGroup: &apiGroup, - Kind: kind, - Name: wl.GetName(), + Object: corev1.ObjectReference{ + APIVersion: apiVersion, + Kind: kind, + Name: wl.GetName(), + Namespace: wl.GetNamespace(), + ResourceVersion: wl.GetResourceVersion(), + UID: wl.GetUID(), }, } if finished {