From 41c82e69edb9c30db16885fe2d47b0f4d1fa45fb Mon Sep 17 00:00:00 2001 From: Alay Patel Date: Tue, 10 Nov 2020 22:34:08 -0500 Subject: [PATCH] convert to stardard lister, use []*batchv1.Job instead of []batchv1.Job --- .../cronjob/cronjob_controllerv2.go | 65 +++++++++++++------ .../cronjob/cronjob_controllerv2_test.go | 12 ++-- pkg/controller/cronjob/utils.go | 19 ++++++ 3 files changed, 69 insertions(+), 27 deletions(-) diff --git a/pkg/controller/cronjob/cronjob_controllerv2.go b/pkg/controller/cronjob/cronjob_controllerv2.go index 13d27a480e1..8e653844eee 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2.go +++ b/pkg/controller/cronjob/cronjob_controllerv2.go @@ -19,6 +19,7 @@ package cronjob import ( "fmt" "reflect" + "sort" "time" "github.com/robfig/cron" @@ -181,13 +182,13 @@ func (jm *ControllerV2) sync(cronJobKey string) (*time.Duration, error) { return nil, err } - cronJob, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled) + cj, requeueAfter, err := jm.syncCronJob(cronJob, jobsToBeReconciled) if err != nil { klog.V(2).InfoS("error reconciling cronjob", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "err", err) return nil, err } - err = jm.cleanupFinishedJobs(cronJob, jobsToBeReconciled) + err = jm.cleanupFinishedJobs(cj, jobsToBeReconciled) if err != nil { klog.V(2).InfoS("error cleaning up jobs", "cronjob", klog.KRef(cronJob.GetNamespace(), cronJob.GetName()), "resourceVersion", cronJob.GetResourceVersion(), "err", err) return nil, err @@ -222,7 +223,7 @@ func (jm *ControllerV2) resolveControllerRef(namespace string, controllerRef *me return cronJob } -func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]batchv1.Job, error) { +func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([]*batchv1.Job, error) { var jobSelector labels.Selector if len(cronJob.Spec.JobTemplate.Labels) == 0 { jobSelector = labels.Everything() @@ -234,13 +235,13 @@ func (jm *ControllerV2) getJobsToBeReconciled(cronJob *batchv1beta1.CronJob) ([] return nil, err } - jobsToBeReconciled := []batchv1.Job{} + jobsToBeReconciled := []*batchv1.Job{} for _, job := range jobList { // If it has a ControllerRef, that's all that matters. if controllerRef := metav1.GetControllerOf(job); controllerRef != nil && controllerRef.Name == cronJob.Name { // this job is needs to be reconciled - jobsToBeReconciled = append(jobsToBeReconciled, *job) + jobsToBeReconciled = append(jobsToBeReconciled, job) } } return jobsToBeReconciled, nil @@ -397,7 +398,7 @@ func (jm *ControllerV2) updateCronJob(old interface{}, curr interface{}) { // that mutates the object func (jm *ControllerV2) syncCronJob( cj *batchv1beta1.CronJob, - js []batchv1.Job) (*batchv1beta1.CronJob, *time.Duration, error) { + js []*batchv1.Job) (*batchv1beta1.CronJob, *time.Duration, error) { cj = cj.DeepCopy() now := jm.now() @@ -406,14 +407,14 @@ func (jm *ControllerV2) syncCronJob( for _, j := range js { childrenJobs[j.ObjectMeta.UID] = true found := inActiveList(*cj, j.ObjectMeta.UID) - if !found && !IsJobFinished(&j) { + if !found && !IsJobFinished(j) { jm.recorder.Eventf(cj, corev1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %s", j.Name) // We found an unfinished job that has us as the parent, but it is not in our Active list. // This could happen if we crashed right after creating the Job and before updating the status, // or if our jobs list is newer than our cj status after a relist, or if someone intentionally created // a job that they wanted us to adopt. - } else if found && IsJobFinished(&j) { - _, status := getFinishedStatus(&j) + } else if found && IsJobFinished(j) { + _, status := getFinishedStatus(j) deleteFromActiveList(cj, j.ObjectMeta.UID) jm.recorder.Eventf(cj, corev1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %s, status: %v", j.Name, status) } @@ -597,17 +598,17 @@ func nextScheduledTimeDuration(sched cron.Schedule, now time.Time) *time.Duratio } // cleanupFinishedJobs cleanups finished jobs created by a CronJob -func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batchv1.Job) error { +func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []*batchv1.Job) error { // If neither limits are active, there is no need to do anything. if cj.Spec.FailedJobsHistoryLimit == nil && cj.Spec.SuccessfulJobsHistoryLimit == nil { return nil } - failedJobs := []batchv1.Job{} - successfulJobs := []batchv1.Job{} + failedJobs := []*batchv1.Job{} + successfulJobs := []*batchv1.Job{} for _, job := range js { - isFinished, finishedStatus := getFinishedStatus(&job) + isFinished, finishedStatus := jm.getFinishedStatus(job) if isFinished && finishedStatus == batchv1.JobComplete { successfulJobs = append(successfulJobs, job) } else if isFinished && finishedStatus == batchv1.JobFailed { @@ -616,19 +617,15 @@ func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batch } if cj.Spec.SuccessfulJobsHistoryLimit != nil { - removeOldestJobs(cj, + jm.removeOldestJobs(cj, successfulJobs, - jm.jobControl, - *cj.Spec.SuccessfulJobsHistoryLimit, - jm.recorder) + *cj.Spec.SuccessfulJobsHistoryLimit) } if cj.Spec.FailedJobsHistoryLimit != nil { - removeOldestJobs(cj, + jm.removeOldestJobs(cj, failedJobs, - jm.jobControl, - *cj.Spec.FailedJobsHistoryLimit, - jm.recorder) + *cj.Spec.FailedJobsHistoryLimit) } // Update the CronJob, in case jobs were removed from the list. @@ -636,6 +633,32 @@ func (jm *ControllerV2) cleanupFinishedJobs(cj *batchv1beta1.CronJob, js []batch return err } +func (jm *ControllerV2) getFinishedStatus(j *batchv1.Job) (bool, batchv1.JobConditionType) { + for _, c := range j.Status.Conditions { + if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue { + return true, c.Type + } + } + return false, "" +} + +// removeOldestJobs removes the oldest jobs from a list of jobs +func (jm *ControllerV2) removeOldestJobs(cj *batchv1beta1.CronJob, js []*batchv1.Job, maxJobs int32) { + numToDelete := len(js) - int(maxJobs) + if numToDelete <= 0 { + return + } + + nameForLog := fmt.Sprintf("%s/%s", cj.Namespace, cj.Name) + klog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog) + + sort.Sort(byJobStartTimeStar(js)) + for i := 0; i < numToDelete; i++ { + klog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog) + deleteJob(cj, js[i], jm.jobControl, jm.recorder) + } +} + // isJobInActiveList take a job and checks if activeJobs has a job with the same // name and namespace. func isJobInActiveList(job *batchv1.Job, activeJobs []corev1.ObjectReference) bool { diff --git a/pkg/controller/cronjob/cronjob_controllerv2_test.go b/pkg/controller/cronjob/cronjob_controllerv2_test.go index 4ba9e68293b..54201ab3fd9 100644 --- a/pkg/controller/cronjob/cronjob_controllerv2_test.go +++ b/pkg/controller/cronjob/cronjob_controllerv2_test.go @@ -164,7 +164,7 @@ func Test_syncOne2(t *testing.T) { job *batchv1.Job err error ) - js := []batchv1.Job{} + js := []*batchv1.Job{} if tc.ranPreviously { cj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeThePriorHour()} cj.Status.LastScheduleTime = &metav1.Time{Time: justAfterThePriorHour()} @@ -181,7 +181,7 @@ func Test_syncOne2(t *testing.T) { } cj.Status.Active = []v1.ObjectReference{*ref} if !tc.jobStillNotFoundInLister { - js = append(js, *job) + js = append(js, job) } } } else { @@ -458,7 +458,7 @@ func TestControllerV2_getJobList(t *testing.T) { name string fields fields args args - want []batchv1.Job + want []*batchv1.Job wantErr bool }{ { @@ -476,7 +476,7 @@ func TestControllerV2_getJobList(t *testing.T) { }, }}}, args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, - want: []batchv1.Job{}, + want: []*batchv1.Job{}, }, { name: "test getting jobs in namespace with a controller reference", @@ -499,7 +499,7 @@ func TestControllerV2_getJobList(t *testing.T) { }, }}}, args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, - want: []batchv1.Job{{ + want: []*batchv1.Job{{ ObjectMeta: metav1.ObjectMeta{Name: "foo1", Namespace: "foo-ns", OwnerReferences: []metav1.OwnerReference{ { @@ -524,7 +524,7 @@ func TestControllerV2_getJobList(t *testing.T) { }, }}}, args: args{cronJob: &batchv1beta1.CronJob{ObjectMeta: metav1.ObjectMeta{Namespace: "foo-ns", Name: "fooer"}}}, - want: []batchv1.Job{}, + want: []*batchv1.Job{}, }, } for _, tt := range tests { diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go index 29dcab3144b..e78bc6d9a0d 100644 --- a/pkg/controller/cronjob/utils.go +++ b/pkg/controller/cronjob/utils.go @@ -298,3 +298,22 @@ func (o byJobStartTime) Less(i, j int) bool { } return o[i].Status.StartTime.Before(o[j].Status.StartTime) } + +// byJobStartTimeStar sorts a list of jobs by start timestamp, using their names as a tie breaker. +type byJobStartTimeStar []*batchv1.Job + +func (o byJobStartTimeStar) Len() int { return len(o) } +func (o byJobStartTimeStar) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o byJobStartTimeStar) Less(i, j int) bool { + if o[i].Status.StartTime == nil && o[j].Status.StartTime != nil { + return false + } + if o[i].Status.StartTime != nil && o[j].Status.StartTime == nil { + return true + } + if o[i].Status.StartTime.Equal(o[j].Status.StartTime) { + return o[i].Name < o[j].Name + } + return o[i].Status.StartTime.Before(o[j].Status.StartTime) +}