diff --git a/pkg/api/testing/fuzzer.go b/pkg/api/testing/fuzzer.go
index 415a5045e2a..8ad142d8709 100644
--- a/pkg/api/testing/fuzzer.go
+++ b/pkg/api/testing/fuzzer.go
@@ -542,6 +542,14 @@ func batchFuncs(t apitesting.TestingCommon) []interface{} {
 			sds := int64(c.RandUint64())
 			sj.StartingDeadlineSeconds = &sds
 			sj.Schedule = c.RandString()
+			if hasSuccessLimit := c.RandBool(); hasSuccessLimit {
+				successfulJobsHistoryLimit := int32(c.Rand.Int31())
+				sj.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit
+			}
+			if hasFailedLimit := c.RandBool(); hasFailedLimit {
+				failedJobsHistoryLimit := int32(c.Rand.Int31())
+				sj.FailedJobsHistoryLimit = &failedJobsHistoryLimit
+			}
 		},
 		func(cp *batch.ConcurrencyPolicy, c fuzz.Continue) {
 			policies := []batch.ConcurrencyPolicy{batch.AllowConcurrent, batch.ForbidConcurrent, batch.ReplaceConcurrent}
diff --git a/pkg/apis/batch/types.go b/pkg/apis/batch/types.go
index ef0037e0d00..2113dca18a6 100644
--- a/pkg/apis/batch/types.go
+++ b/pkg/apis/batch/types.go
@@ -244,6 +244,16 @@ type CronJobSpec struct {
 	// JobTemplate is the object that describes the job that will be created when
 	// executing a CronJob.
 	JobTemplate JobTemplateSpec
+
+	// The number of successful finished jobs to retain.
+	// This is a pointer to distinguish between explicit zero and not specified.
+	// +optional
+	SuccessfulJobsHistoryLimit *int32
+
+	// The number of failed finished jobs to retain.
+	// This is a pointer to distinguish between explicit zero and not specified.
+	// +optional
+	FailedJobsHistoryLimit *int32
 }
 
 // ConcurrencyPolicy describes how the job will be handled.
diff --git a/pkg/apis/batch/v2alpha1/types.go b/pkg/apis/batch/v2alpha1/types.go
index e03f8c0561d..5d111471f2d 100644
--- a/pkg/apis/batch/v2alpha1/types.go
+++ b/pkg/apis/batch/v2alpha1/types.go
@@ -250,6 +250,16 @@ type CronJobSpec struct {
 	// JobTemplate is the object that describes the job that will be created when
 	// executing a CronJob.
 	JobTemplate JobTemplateSpec `json:"jobTemplate" protobuf:"bytes,5,opt,name=jobTemplate"`
+
+	// The number of successful finished jobs to retain.
+	// This is a pointer to distinguish between explicit zero and not specified.
+	// +optional
+	SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
+
+	// The number of failed finished jobs to retain.
+	// This is a pointer to distinguish between explicit zero and not specified.
+	// +optional
+	FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
 }
 
 // ConcurrencyPolicy describes how the job will be handled.
diff --git a/pkg/apis/batch/validation/validation.go b/pkg/apis/batch/validation/validation.go
index 51f6b250a0c..77ab204e4a7 100644
--- a/pkg/apis/batch/validation/validation.go
+++ b/pkg/apis/batch/validation/validation.go
@@ -179,6 +179,15 @@ func ValidateCronJobSpec(spec *batch.CronJobSpec, fldPath *field.Path) field.Err
 	allErrs = append(allErrs, validateConcurrencyPolicy(&spec.ConcurrencyPolicy, fldPath.Child("concurrencyPolicy"))...)
 	allErrs = append(allErrs, ValidateJobTemplateSpec(&spec.JobTemplate, fldPath.Child("jobTemplate"))...)
 
+	if spec.SuccessfulJobsHistoryLimit != nil {
+		// zero is a valid SuccessfulJobsHistoryLimit
+		allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.SuccessfulJobsHistoryLimit), fldPath.Child("successfulJobsHistoryLimit"))...)
+	}
+	if spec.FailedJobsHistoryLimit != nil {
+		// zero is a valid FailedJobsHistoryLimit
+		allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.FailedJobsHistoryLimit), fldPath.Child("failedJobsHistoryLimit"))...)
+	}
+
 	return allErrs
 }
diff --git a/pkg/apis/batch/validation/validation_test.go b/pkg/apis/batch/validation/validation_test.go
index 0e60991f423..53c81052999 100644
--- a/pkg/apis/batch/validation/validation_test.go
+++ b/pkg/apis/batch/validation/validation_test.go
@@ -402,6 +402,40 @@ func TestValidateCronJob(t *testing.T) {
 				},
 			},
 		},
+		"spec.successfulJobsHistoryLimit: must be greater than or equal to 0": {
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "mycronjob",
+				Namespace: metav1.NamespaceDefault,
+				UID:       types.UID("1a2b3c"),
+			},
+			Spec: batch.CronJobSpec{
+				Schedule:                   "* * * * ?",
+				ConcurrencyPolicy:          batch.AllowConcurrent,
+				SuccessfulJobsHistoryLimit: &negative,
+				JobTemplate: batch.JobTemplateSpec{
+					Spec: batch.JobSpec{
+						Template: validPodTemplateSpec,
+					},
+				},
+			},
+		},
+		"spec.failedJobsHistoryLimit: must be greater than or equal to 0": {
+			ObjectMeta: metav1.ObjectMeta{
+				Name:      "mycronjob",
+				Namespace: metav1.NamespaceDefault,
+				UID:       types.UID("1a2b3c"),
+			},
+			Spec: batch.CronJobSpec{
+				Schedule:               "* * * * ?",
+				ConcurrencyPolicy:      batch.AllowConcurrent,
+				FailedJobsHistoryLimit: &negative,
+				JobTemplate: batch.JobTemplateSpec{
+					Spec: batch.JobSpec{
+						Template: validPodTemplateSpec,
+					},
+				},
+			},
+		},
 		"spec.concurrencyPolicy: Required value": {
 			ObjectMeta: metav1.ObjectMeta{
 				Name:      "mycronjob",
diff --git a/pkg/controller/cronjob/cronjob_controller.go b/pkg/controller/cronjob/cronjob_controller.go
index 173e2a25d31..9271f2242b0 100644
--- a/pkg/controller/cronjob/cronjob_controller.go
+++ b/pkg/controller/cronjob/cronjob_controller.go
@@ -30,6 +30,7 @@ Just periodically list jobs and SJs, and then reconcile them.
 
 import (
 	"fmt"
+	"sort"
 	"time"
 
 	"github.com/golang/glog"
@@ -92,13 +93,13 @@ func (jm *CronJobController) Run(stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	glog.Infof("Starting CronJob Manager")
 	// Check things every 10 second.
-	go wait.Until(jm.SyncAll, 10*time.Second, stopCh)
+	go wait.Until(jm.syncAll, 10*time.Second, stopCh)
 	<-stopCh
 	glog.Infof("Shutting down CronJob Manager")
 }
 
-// SyncAll lists all the CronJobs and Jobs and reconciles them.
-func (jm *CronJobController) SyncAll() {
+// syncAll lists all the CronJobs and Jobs and reconciles them.
+func (jm *CronJobController) syncAll() {
 	sjl, err := jm.kubeClient.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
 	if err != nil {
 		glog.Errorf("Error listing cronjobs: %v", err)
@@ -119,24 +120,86 @@ func (jm *CronJobController) SyncAll() {
 	glog.V(4).Infof("Found %d groups", len(jobsBySj))
 
 	for _, sj := range sjs {
-		SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
+		syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
+		cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
 	}
 }
 
-// SyncOne reconciles a CronJob with a list of any Jobs that it created.
+// cleanupFinishedJobs cleans up finished jobs created by a CronJob
+func cleanupFinishedJobs(sj *batch.CronJob, js []batch.Job, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
+	// If neither limit is set, there is nothing to clean up.
+	if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
+		return
+	}
+
+	failedJobs := []batch.Job{}
+	successfulJobs := []batch.Job{}
+
+	for _, job := range js {
+		isFinished, finishedStatus := getFinishedStatus(&job)
+		if isFinished && finishedStatus == batch.JobComplete {
+			successfulJobs = append(successfulJobs, job)
+		} else if isFinished && finishedStatus == batch.JobFailed {
+			failedJobs = append(failedJobs, job)
+		}
+	}
+
+	if sj.Spec.SuccessfulJobsHistoryLimit != nil {
+		removeOldestJobs(sj,
+			successfulJobs,
+			jc,
+			pc,
+			*sj.Spec.SuccessfulJobsHistoryLimit,
+			recorder)
+	}
+
+	if sj.Spec.FailedJobsHistoryLimit != nil {
+		removeOldestJobs(sj,
+			failedJobs,
+			jc,
+			pc,
+			*sj.Spec.FailedJobsHistoryLimit,
+			recorder)
+	}
+
+	// Update the CronJob, in case jobs were removed from the list.
+	if _, err := sjc.UpdateStatus(sj); err != nil {
+		nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
+		glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
+	}
+}
+
+// removeOldestJobs removes the oldest jobs from a list of jobs
+func removeOldestJobs(sj *batch.CronJob, js []batch.Job, jc jobControlInterface, pc podControlInterface, maxJobs int32, recorder record.EventRecorder) {
+	numToDelete := len(js) - int(maxJobs)
+	if numToDelete <= 0 {
+		return
+	}
+
+	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
+	glog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
+
+	sort.Sort(byJobStartTime(js))
+	for i := 0; i < numToDelete; i++ {
+		glog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
+		deleteJob(sj, &js[i], jc, pc, recorder, "history limit reached")
+	}
+}
+
+// syncOne reconciles a CronJob with a list of any Jobs that it created.
 // All known jobs created by "sj" should be included in "js".
 // The current time is passed in to facilitate testing.
 // It has no receiver, to facilitate testing.
-func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
+func syncOne(sj *batch.CronJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
 	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
 
 	childrenJobs := make(map[types.UID]bool)
 	for i := range js {
 		j := js[i]
 		childrenJobs[j.ObjectMeta.UID] = true
-		found := inActiveList(sj, j.ObjectMeta.UID)
+		found := inActiveList(*sj, j.ObjectMeta.UID)
 		if !found && !IsJobFinished(&j) {
-			recorder.Eventf(&sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
+			recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
 			// We found an unfinished job that has us as the parent, but it is not in our Active list.
 			// This could happen if we crashed right after creating the Job and before updating the status,
 			// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
 			// a job that they wanted us to adopt. It is assumed that if a user has permission to create
 			// a job within a namespace, then they have permission to make any scheduledJob
 			// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
 			// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
 		} else if found && IsJobFinished(&j) {
-			deleteFromActiveList(&sj, j.ObjectMeta.UID)
+			deleteFromActiveList(sj, j.ObjectMeta.UID)
 			// TODO: event to call out failure vs success.
-			recorder.Eventf(&sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
+			recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
 		}
 	}
@@ -159,25 +222,25 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
 	// job running.
 	for _, j := range sj.Status.Active {
 		if found := childrenJobs[j.UID]; !found {
-			recorder.Eventf(&sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
-			deleteFromActiveList(&sj, j.UID)
+			recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
+			deleteFromActiveList(sj, j.UID)
 		}
 	}
 
-	updatedSJ, err := sjc.UpdateStatus(&sj)
+	updatedSJ, err := sjc.UpdateStatus(sj)
 	if err != nil {
 		glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
 		return
 	}
-	sj = *updatedSJ
+	*sj = *updatedSJ
 
 	if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
 		glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
 		return
 	}
-	times, err := getRecentUnmetScheduleTimes(sj, now)
+	times, err := getRecentUnmetScheduleTimes(*sj, now)
 	if err != nil {
-		recorder.Eventf(&sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
+		recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
 		glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
 	}
 	// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
@@ -224,73 +287,37 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
 			// TODO: this should be replaced with server side job deletion
 			// currently this mimics JobReaper from pkg/kubectl/stop.go
 			glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
+
 			job, err := jc.GetJob(j.Namespace, j.Name)
 			if err != nil {
-				recorder.Eventf(&sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
+				recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
 				return
 			}
-			// scale job down to 0
-			if *job.Spec.Parallelism != 0 {
-				zero := int32(0)
-				job.Spec.Parallelism = &zero
-				job, err = jc.UpdateJob(job.Namespace, job)
-				if err != nil {
-					recorder.Eventf(&sj, v1.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
-					return
-				}
-			}
-			// remove all pods...
-			selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
-			options := metav1.ListOptions{LabelSelector: selector.String()}
-			podList, err := pc.ListPods(job.Namespace, options)
-			if err != nil {
-				recorder.Eventf(&sj, v1.EventTypeWarning, "FailedList", "List job-pods: %v", err)
-			}
-			errList := []error{}
-			for _, pod := range podList.Items {
-				glog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name)
-				if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
-					// ignores the error when the pod isn't found
-					if !errors.IsNotFound(err) {
-						errList = append(errList, err)
-					}
-				}
-			}
-			if len(errList) != 0 {
-				recorder.Eventf(&sj, v1.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
+			if !deleteJob(sj, job, jc, pc, recorder, "") {
 				return
 			}
-			// ... the job itself...
-			if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
-				recorder.Eventf(&sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
-				glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
-				return
-			}
-			// ... and its reference from active list
-			deleteFromActiveList(&sj, job.ObjectMeta.UID)
-			recorder.Eventf(&sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
 		}
 	}
 
-	jobReq, err := getJobFromTemplate(&sj, scheduledTime)
+	jobReq, err := getJobFromTemplate(sj, scheduledTime)
 	if err != nil {
 		glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
 		return
 	}
 	jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
 	if err != nil {
-		recorder.Eventf(&sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
+		recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
 		return
 	}
 	glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
-	recorder.Eventf(&sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
+	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
 
 	// ------------------------------------------------------------------ //
 
 	// If this process restarts at this point (after posting a job, but
 	// before updating the status), then we might try to start the job on
 	// the next time.  Actually, if we relist the SJs and Jobs on the next
-	// iteration of SyncAll, we might not see our own status update, and
+	// iteration of syncAll, we might not see our own status update, and
 	// then post one again.  So, we need to use the job name as a lock to
 	// prevent us from making the job twice (name the job with hash of its
 	// scheduled time).
@@ -303,13 +330,64 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
 		sj.Status.Active = append(sj.Status.Active, *ref)
 	}
 	sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
-	if _, err := sjc.UpdateStatus(&sj); err != nil {
+	if _, err := sjc.UpdateStatus(sj); err != nil {
 		glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
 	}
 
 	return
 }
 
+// deleteJob reaps a job, deleting the job, the pods and the reference in the active list
+func deleteJob(sj *batch.CronJob, job *batch.Job, jc jobControlInterface, pc podControlInterface, recorder record.EventRecorder, reason string) bool {
+	// TODO: this should be replaced with server side job deletion
+	// currently this mimics JobReaper from pkg/kubectl/stop.go
+	nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
+	var err error
+
+	// scale job down to 0
+	if *job.Spec.Parallelism != 0 {
+		zero := int32(0)
+		job.Spec.Parallelism = &zero
+		job, err = jc.UpdateJob(job.Namespace, job)
+		if err != nil {
+			recorder.Eventf(sj, v1.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
+			return false
+		}
+	}
+	// remove all pods...
+	selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
+	options := metav1.ListOptions{LabelSelector: selector.String()}
+	podList, err := pc.ListPods(job.Namespace, options)
+	if err != nil {
+		recorder.Eventf(sj, v1.EventTypeWarning, "FailedList", "List job-pods: %v", err)
+	}
+	errList := []error{}
+	for _, pod := range podList.Items {
+		glog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name)
+		if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
+			// ignores the error when the pod isn't found
+			if !errors.IsNotFound(err) {
+				errList = append(errList, err)
+			}
+		}
+	}
+	if len(errList) != 0 {
+		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
+		return false
+	}
+	// ... the job itself...
+	if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
+		recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
+		glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
+		return false
+	}
+	// ... and its reference from active list
+	deleteFromActiveList(sj, job.ObjectMeta.UID)
+	recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
+
+	return true
+}
+
 func getRef(object runtime.Object) (*v1.ObjectReference, error) {
 	return v1.GetReference(api.Scheme, object)
 }
diff --git a/pkg/controller/cronjob/cronjob_controller_test.go b/pkg/controller/cronjob/cronjob_controller_test.go
index f3843f77b41..bcbe2b5f5aa 100644
--- a/pkg/controller/cronjob/cronjob_controller_test.go
+++ b/pkg/controller/cronjob/cronjob_controller_test.go
@@ -17,6 +17,8 @@ limitations under the License.
 package cronjob
 
 import (
+	"sort"
+	"strconv"
 	"strings"
 	"testing"
 	"time"
@@ -81,6 +83,14 @@ func justAfterThePriorHour() time.Time {
 	return T1
 }
 
+func startTimeStringToTime(startTime string) time.Time {
+	T1, err := time.Parse(time.RFC3339, startTime)
+	if err != nil {
+		panic("test setup error")
+	}
+	return T1
+}
+
 // returns a cronJob with some fields filled in.
 func cronJob() batch.CronJob {
 	return batch.CronJob{
@@ -270,7 +280,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 		pc := &fakePodControl{}
 		recorder := record.NewFakeRecorder(10)
 
-		SyncOne(sj, js, tc.now, jc, sjc, pc, recorder)
+		syncOne(&sj, js, tc.now, jc, sjc, pc, recorder)
 		expectedCreates := 0
 		if tc.expectCreate {
 			expectedCreates = 1
@@ -320,10 +330,237 @@ func TestSyncOne_RunOrNot(t *testing.T) {
 	}
 }
 
+type CleanupJobSpec struct {
+	StartTime           string
+	IsFinished          bool
+	IsSuccessful        bool
+	ExpectDelete        bool
+	IsStillInActiveList bool // only when IsFinished is set
+}
+
+func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
+	limitThree := int32(3)
+	limitTwo := int32(2)
+	limitOne := int32(1)
+	limitZero := int32(0)
+
+	// Starting times are assumed to be sorted by increasing start time
+	// in all the test cases
+	testCases := map[string]struct {
+		jobSpecs                   []CleanupJobSpec
+		now                        time.Time
+		successfulJobsHistoryLimit *int32
+		failedJobsHistoryLimit     *int32
+		expectActive               int
+	}{
+		"success. job limit reached": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, T, F},
+				{"2016-05-19T05:00:00Z", T, T, T, F},
+				{"2016-05-19T06:00:00Z", T, T, F, F},
+				{"2016-05-19T07:00:00Z", T, T, F, F},
+				{"2016-05-19T08:00:00Z", F, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, F},
+			}, justBeforeTheHour(), &limitTwo, &limitOne, 1},
+
+		"success. jobs not processed by Sync yet": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, T, F},
+				{"2016-05-19T05:00:00Z", T, T, T, T},
+				{"2016-05-19T06:00:00Z", T, T, F, T},
+				{"2016-05-19T07:00:00Z", T, T, F, T},
+				{"2016-05-19T08:00:00Z", F, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, T},
+			}, justBeforeTheHour(), &limitTwo, &limitOne, 4},
+
+		"failed job limit reached": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, F, T, F},
+				{"2016-05-19T05:00:00Z", T, F, T, F},
+				{"2016-05-19T06:00:00Z", T, T, F, F},
+				{"2016-05-19T07:00:00Z", T, T, F, F},
+				{"2016-05-19T08:00:00Z", T, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, F},
+			}, justBeforeTheHour(), &limitTwo, &limitTwo, 0},
+
+		"success. job limit set to zero": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, T, F},
+				{"2016-05-19T05:00:00Z", T, F, T, F},
+				{"2016-05-19T06:00:00Z", T, T, T, F},
+				{"2016-05-19T07:00:00Z", T, T, T, F},
+				{"2016-05-19T08:00:00Z", F, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, F},
+			}, justBeforeTheHour(), &limitZero, &limitOne, 1},
+
+		"failed job limit set to zero": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, F, F},
+				{"2016-05-19T05:00:00Z", T, F, T, F},
+				{"2016-05-19T06:00:00Z", T, T, F, F},
+				{"2016-05-19T07:00:00Z", T, T, F, F},
+				{"2016-05-19T08:00:00Z", F, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, T, F},
+			}, justBeforeTheHour(), &limitThree, &limitZero, 1},
+
+		"no limits reached": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, F, F},
+				{"2016-05-19T05:00:00Z", T, F, F, F},
+				{"2016-05-19T06:00:00Z", T, T, F, F},
+				{"2016-05-19T07:00:00Z", T, T, F, F},
+				{"2016-05-19T08:00:00Z", T, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, F},
+			}, justBeforeTheHour(), &limitThree, &limitThree, 0},
+
+		// This test case should trigger the short-circuit
+		"limits disabled": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, F, F},
+				{"2016-05-19T05:00:00Z", T, F, F, F},
+				{"2016-05-19T06:00:00Z", T, T, F, F},
+				{"2016-05-19T07:00:00Z", T, T, F, F},
+				{"2016-05-19T08:00:00Z", T, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, F},
+			}, justBeforeTheHour(), nil, nil, 0},
+
+		"success limit disabled": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, F, F},
+				{"2016-05-19T05:00:00Z", T, F, F, F},
+				{"2016-05-19T06:00:00Z", T, T, F, F},
+				{"2016-05-19T07:00:00Z", T, T, F, F},
+				{"2016-05-19T08:00:00Z", T, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, F},
+			}, justBeforeTheHour(), nil, &limitThree, 0},
+
+		"failure limit disabled": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", T, T, F, F},
+				{"2016-05-19T05:00:00Z", T, F, F, F},
+				{"2016-05-19T06:00:00Z", T, T, F, F},
+				{"2016-05-19T07:00:00Z", T, T, F, F},
+				{"2016-05-19T08:00:00Z", T, F, F, F},
+				{"2016-05-19T09:00:00Z", T, F, F, F},
+			}, justBeforeTheHour(), &limitThree, nil, 0},
+
+		"no limits reached because still active": {
+			[]CleanupJobSpec{
+				{"2016-05-19T04:00:00Z", F, F, F, F},
+				{"2016-05-19T05:00:00Z", F, F, F, F},
+				{"2016-05-19T06:00:00Z", F, F, F, F},
+				{"2016-05-19T07:00:00Z", F, F, F, F},
+				{"2016-05-19T08:00:00Z", F, F, F, F},
+				{"2016-05-19T09:00:00Z", F, F, F, F},
+			}, justBeforeTheHour(), &limitZero, &limitZero, 6},
+	}
+
+	for name, tc := range testCases {
+		sj := cronJob()
+		suspend := false
+		sj.Spec.ConcurrencyPolicy = f
+		sj.Spec.Suspend = &suspend
+		sj.Spec.Schedule = onTheHour
+
+		sj.Spec.SuccessfulJobsHistoryLimit = tc.successfulJobsHistoryLimit
+		sj.Spec.FailedJobsHistoryLimit = tc.failedJobsHistoryLimit
+
+		var (
+			job *batch.Job
+			err error
+		)
+
+		// Set consistent timestamps for the CronJob
+		if len(tc.jobSpecs) != 0 {
+			firstTime := startTimeStringToTime(tc.jobSpecs[0].StartTime)
+			lastTime := startTimeStringToTime(tc.jobSpecs[len(tc.jobSpecs)-1].StartTime)
+			sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: firstTime}
+			sj.Status.LastScheduleTime = &metav1.Time{Time: lastTime}
+		} else {
+			sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
+		}
+
+		// Create jobs
+		js := []batch.Job{}
+		jobsToDelete := []string{}
+		sj.Status.Active = []v1.ObjectReference{}
+
+		for i, spec := range tc.jobSpecs {
+			job, err = getJobFromTemplate(&sj, startTimeStringToTime(spec.StartTime))
+			if err != nil {
+				t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
+			}
+
+			job.UID = types.UID(strconv.Itoa(i))
+			job.Namespace = ""
+
+			if spec.IsFinished {
+				var conditionType batch.JobConditionType
+				if spec.IsSuccessful {
+					conditionType = batch.JobComplete
+				} else {
+					conditionType = batch.JobFailed
+				}
+				condition := batch.JobCondition{Type: conditionType, Status: v1.ConditionTrue}
+				job.Status.Conditions = append(job.Status.Conditions, condition)
+
+				if spec.IsStillInActiveList {
+					sj.Status.Active = append(sj.Status.Active, v1.ObjectReference{UID: job.UID})
+				}
+			} else {
+				if spec.IsSuccessful || spec.IsStillInActiveList {
+					t.Errorf("%s: test setup error: this case makes no sense", name)
+				}
+				sj.Status.Active = append(sj.Status.Active, v1.ObjectReference{UID: job.UID})
+			}
+
+			js = append(js, *job)
+			if spec.ExpectDelete {
+				jobsToDelete = append(jobsToDelete, job.Name)
+			}
+		}
+
+		jc := &fakeJobControl{Job: job}
+		pc := &fakePodControl{}
+		sjc := &fakeSJControl{}
+		recorder := record.NewFakeRecorder(10)
+
+		cleanupFinishedJobs(&sj, js, jc, sjc, pc, recorder)
+
+		// Check we have actually deleted the correct jobs
+		if len(jc.DeleteJobName) != len(jobsToDelete) {
+			t.Errorf("%s: expected %d jobs deleted, actually %d", name, len(jobsToDelete), len(jc.DeleteJobName))
+		} else {
+			sort.Strings(jobsToDelete)
+			sort.Strings(jc.DeleteJobName)
+			for i, expectedJobName := range jobsToDelete {
+				if expectedJobName != jc.DeleteJobName[i] {
+					t.Errorf("%s: expected job %s deleted, actually %v -- %v vs %v", name, expectedJobName, jc.DeleteJobName[i], jc.DeleteJobName, jobsToDelete)
+				}
+			}
+		}
+
+		// Check for events
+		expectedEvents := len(jobsToDelete)
+		if len(recorder.Events) != expectedEvents {
+			t.Errorf("%s: expected %d events, actually %v", name, expectedEvents, len(recorder.Events))
+		}
+
+		// Check for jobs still in active list
+		numActive := 0
+		if len(sjc.Updates) != 0 {
+			numActive = len(sjc.Updates[len(sjc.Updates)-1].Status.Active)
+		}
+		if tc.expectActive != numActive {
+			t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, numActive)
+		}
+	}
+}
+
 // TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs,
 // but over time, all jobs run as expected (assuming Allow and no deadline).
-// TestSyncOne_Status tests sj.UpdateStatus in SyncOne
+// TestSyncOne_Status tests sj.UpdateStatus in syncOne
 func TestSyncOne_Status(t *testing.T) {
 	finishedJob := newJob("1")
 	finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batch.JobCondition{Type: batch.JobComplete, Status: v1.ConditionTrue})
@@ -443,7 +680,7 @@ func TestSyncOne_Status(t *testing.T) {
 		recorder := record.NewFakeRecorder(10)
 
 		// Run the code
-		SyncOne(sj, jobs, tc.now, jc, sjc, pc, recorder)
+		syncOne(&sj, jobs, tc.now, jc, sjc, pc, recorder)
 
 		// Status update happens once when ranging through job list, and another one if create jobs.
 		expectUpdates := 1
diff --git a/pkg/controller/cronjob/utils.go b/pkg/controller/cronjob/utils.go
index a2837b2d535..2fc73a666e5 100644
--- a/pkg/controller/cronjob/utils.go
+++ b/pkg/controller/cronjob/utils.go
@@ -234,11 +234,34 @@ func makeCreatedByRefJson(object runtime.Object) (string, error) {
 	return string(createdByRefJson), nil
 }
 
-func IsJobFinished(j *batch.Job) bool {
+func getFinishedStatus(j *batch.Job) (bool, batch.JobConditionType) {
 	for _, c := range j.Status.Conditions {
 		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
-			return true
+			return true, c.Type
 		}
 	}
-	return false
+	return false, ""
+}
+
+func IsJobFinished(j *batch.Job) bool {
+	isFinished, _ := getFinishedStatus(j)
+	return isFinished
+}
+
+// byJobStartTime sorts a list of jobs by start timestamp, using their names as a tie breaker.
+type byJobStartTime []batch.Job
+
+func (o byJobStartTime) Len() int      { return len(o) }
+func (o byJobStartTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
+
+func (o byJobStartTime) Less(i, j int) bool {
+	// Jobs with a nil start time sort last, i.e. they are treated as the newest.
+	if o[j].Status.StartTime == nil {
+		return o[i].Status.StartTime != nil
+	}
+
+	if (*o[i].Status.StartTime).Equal(*o[j].Status.StartTime) {
+		return o[i].Name < o[j].Name
+	}
+
+	return (*o[i].Status.StartTime).Before(*o[j].Status.StartTime)
+}
diff --git a/test/e2e/cronjob.go b/test/e2e/cronjob.go
index bcc4304c965..1b81ab5e2a9 100644
--- a/test/e2e/cronjob.go
+++ b/test/e2e/cronjob.go
@@ -52,6 +52,11 @@ var (
 
 var _ = framework.KubeDescribe("CronJob", func() {
 	f := framework.NewDefaultGroupVersionFramework("cronjob", BatchV2Alpha1GroupVersion)
+	sleepCommand := []string{"sleep", "300"}
+
+	// Pod will complete instantly
+	successCommand := []string{"/bin/true"}
+
 	BeforeEach(func() {
 		framework.SkipIfMissingResource(f.ClientPool, CronJobGroupVersionResource, f.Namespace.Name)
 	})
@@ -59,7 +64,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
 	// multiple jobs running at once
 	It("should schedule multiple jobs concurrently", func() {
 		By("Creating a cronjob")
-		cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, true)
+		cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent,
+			sleepCommand, nil)
 		cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
 		Expect(err).NotTo(HaveOccurred())
 
@@ -70,7 +76,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
 		By("Ensuring at least two running jobs exists by listing jobs explicitly")
 		jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
 		Expect(err).NotTo(HaveOccurred())
-		activeJobs := filterActiveJobs(jobs)
+		activeJobs, _ := filterActiveJobs(jobs)
 		Expect(len(activeJobs) >= 2).To(BeTrue())
 
 		By("Removing cronjob")
@@ -81,7 +87,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
 	// suspended should not schedule jobs
 	It("should not schedule jobs when suspended [Slow]", func() {
By("Creating a suspended cronjob") - cronJob := newTestCronJob("suspended", "*/1 * * * ?", batch.AllowConcurrent, true) + cronJob := newTestCronJob("suspended", "*/1 * * * ?", batch.AllowConcurrent, + sleepCommand, nil) cronJob.Spec.Suspend = newBool(true) cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) Expect(err).NotTo(HaveOccurred()) @@ -103,7 +110,8 @@ var _ = framework.KubeDescribe("CronJob", func() { // only single active job is allowed for ForbidConcurrent It("should not schedule new jobs when ForbidConcurrent [Slow]", func() { By("Creating a ForbidConcurrent cronjob") - cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true) + cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, + sleepCommand, nil) cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) Expect(err).NotTo(HaveOccurred()) @@ -119,7 +127,7 @@ var _ = framework.KubeDescribe("CronJob", func() { By("Ensuring exaclty one running job exists by listing jobs explicitly") jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) Expect(err).NotTo(HaveOccurred()) - activeJobs := filterActiveJobs(jobs) + activeJobs, _ := filterActiveJobs(jobs) Expect(activeJobs).To(HaveLen(1)) By("Ensuring no more jobs are scheduled") @@ -134,7 +142,8 @@ var _ = framework.KubeDescribe("CronJob", func() { // only single active job is allowed for ReplaceConcurrent It("should replace jobs when ReplaceConcurrent", func() { By("Creating a ReplaceConcurrent cronjob") - cronJob := newTestCronJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent, true) + cronJob := newTestCronJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent, + sleepCommand, nil) cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) Expect(err).NotTo(HaveOccurred()) @@ -150,7 +159,7 @@ var _ = framework.KubeDescribe("CronJob", func() { By("Ensuring exaclty one running job exists by listing jobs explicitly") jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) Expect(err).NotTo(HaveOccurred()) - activeJobs := filterActiveJobs(jobs) + activeJobs, _ := filterActiveJobs(jobs) Expect(activeJobs).To(HaveLen(1)) By("Ensuring the job is replaced with a new one") @@ -165,7 +174,8 @@ var _ = framework.KubeDescribe("CronJob", func() { // shouldn't give us unexpected warnings It("should not emit unexpected warnings", func() { By("Creating a cronjob") - cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, false) + cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, + nil, nil) cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) Expect(err).NotTo(HaveOccurred()) @@ -187,7 +197,8 @@ var _ = framework.KubeDescribe("CronJob", func() { // deleted jobs should be removed from the active list It("should remove from active list jobs that have been deleted", func() { By("Creating a ForbidConcurrent cronjob") - cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true) + cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, + sleepCommand, nil) cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) Expect(err).NotTo(HaveOccurred()) @@ -225,10 +236,49 @@ var _ = framework.KubeDescribe("CronJob", func() { err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name) Expect(err).NotTo(HaveOccurred()) }) + + // cleanup of successful finished jobs, with limit of one successful job + It("should delete successful finished jobs with 
limit of one successful job", func() { + By("Creating a AllowConcurrent cronjob with custom history limits") + successLimit := int32(1) + cronJob := newTestCronJob("concurrent-limit", "*/1 * * * ?", batch.AllowConcurrent, + successCommand, &successLimit) + cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob) + Expect(err).NotTo(HaveOccurred()) + + // Job is going to complete instantly: do not check for an active job + // as we are most likely to miss it + + By("Ensuring a finished job exists") + err = waitForAnyFinishedJob(f.ClientSet, f.Namespace.Name) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring a finished job exists by listing jobs explicitly") + jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, finishedJobs := filterActiveJobs(jobs) + Expect(len(finishedJobs) == 1).To(BeTrue()) + + // Job should get deleted when the next job finishes the next minute + By("Ensuring this job does not exist anymore") + err = waitForJobNotExist(f.ClientSet, f.Namespace.Name, finishedJobs[0]) + Expect(err).NotTo(HaveOccurred()) + + By("Ensuring there is 1 finished job by listing jobs explicitly") + jobs, err = f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{}) + Expect(err).NotTo(HaveOccurred()) + _, finishedJobs = filterActiveJobs(jobs) + Expect(len(finishedJobs) == 1).To(BeTrue()) + + By("Removing cronjob") + err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name) + Expect(err).NotTo(HaveOccurred()) + }) }) // newTestCronJob returns a cronjob which does one of several testing behaviors. -func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, sleep bool) *batch.CronJob { +func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, command []string, + successfulJobsHistoryLimit *int32) *batch.CronJob { parallelism := int32(1) completions := int32(1) sj := &batch.CronJob{ @@ -271,8 +321,9 @@ func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPo }, }, } - if sleep { - sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "300"} + sj.Spec.SuccessfulJobsHistoryLimit = successfulJobsHistoryLimit + if command != nil { + sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = command } return sj } @@ -319,6 +370,23 @@ func waitForNoJobs(c clientset.Interface, ns, jobName string, failIfNonEmpty boo }) } +// Wait for a job to not exist by listing jobs explicitly. +func waitForJobNotExist(c clientset.Interface, ns string, targetJob *batchv1.Job) error { + return wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) { + jobs, err := c.Batch().Jobs(ns).List(metav1.ListOptions{}) + if err != nil { + return false, err + } + _, finishedJobs := filterActiveJobs(jobs) + for _, job := range finishedJobs { + if targetJob.Namespace == job.Namespace && targetJob.Name == job.Name { + return false, nil + } + } + return true, nil + }) +} + // Wait for a job to be replaced with a new one. 
 func waitForJobReplaced(c clientset.Interface, ns, previousJobName string) error {
 	return wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) {
@@ -383,11 +451,13 @@ func checkNoEventWithReason(c clientset.Interface, ns, cronJobName string, reaso
 	return nil
 }
 
-func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job) {
+func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) {
 	for i := range jobs.Items {
 		j := jobs.Items[i]
 		if !job.IsJobFinished(&j) {
 			active = append(active, &j)
+		} else {
+			finished = append(finished, &j)
 		}
 	}
 	return
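
Note for reviewers: a minimal sketch of how the new knobs look from the API side once this merges. It is illustrative only, not part of the patch; the int32Ptr helper and the concrete limit values are assumptions, and only the v2alpha1 fields added above are relied on.

package main

import (
	"fmt"

	batchv2alpha1 "k8s.io/kubernetes/pkg/apis/batch/v2alpha1"
)

// int32Ptr exists because both history-limit fields are *int32, so that an
// explicit 0 ("retain nothing") stays distinguishable from nil ("never clean up").
func int32Ptr(i int32) *int32 { return &i }

func main() {
	spec := batchv2alpha1.CronJobSpec{
		Schedule: "*/1 * * * ?",
		// Keep the three newest successful Jobs and the single newest failed Job;
		// leaving either pointer nil disables cleanup for that class of Jobs.
		SuccessfulJobsHistoryLimit: int32Ptr(3),
		FailedJobsHistoryLimit:     int32Ptr(1),
	}
	fmt.Println(*spec.SuccessfulJobsHistoryLimit, *spec.FailedJobsHistoryLimit)
}

Because the controller sorts with byJobStartTime before trimming, jobs are removed oldest-first, and jobs whose StartTime is still nil sort last and are therefore retained the longest.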