Modify CronJob API to add job history limits, cleanup jobs in controller

This commit is contained in:
peay 2017-02-25 06:51:54 -05:00
parent 59cec9c1a6
commit 2b33de0684
9 changed files with 558 additions and 79 deletions

View File

@ -542,6 +542,14 @@ func batchFuncs(t apitesting.TestingCommon) []interface{} {
sds := int64(c.RandUint64())
sj.StartingDeadlineSeconds = &sds
sj.Schedule = c.RandString()
if hasSuccessLimit := c.RandBool(); hasSuccessLimit {
successfulJobsHistoryLimit := int32(c.Rand.Int31())
sj.SuccessfulJobsHistoryLimit = &successfulJobsHistoryLimit
}
if hasFailedLimit := c.RandBool(); hasFailedLimit {
failedJobsHistoryLimit := int32(c.Rand.Int31())
sj.FailedJobsHistoryLimit = &failedJobsHistoryLimit
}
},
func(cp *batch.ConcurrencyPolicy, c fuzz.Continue) {
policies := []batch.ConcurrencyPolicy{batch.AllowConcurrent, batch.ForbidConcurrent, batch.ReplaceConcurrent}

View File

@ -244,6 +244,16 @@ type CronJobSpec struct {
// JobTemplate is the object that describes the job that will be created when
// executing a CronJob.
JobTemplate JobTemplateSpec
// The number of successful finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
SuccessfulJobsHistoryLimit *int32
// The number of failed finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
FailedJobsHistoryLimit *int32
}
// ConcurrencyPolicy describes how the job will be handled.

View File

@ -250,6 +250,16 @@ type CronJobSpec struct {
// JobTemplate is the object that describes the job that will be created when
// executing a CronJob.
JobTemplate JobTemplateSpec `json:"jobTemplate" protobuf:"bytes,5,opt,name=jobTemplate"`
// The number of successful finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
// The number of failed finished jobs to retain.
// This is a pointer to distinguish between explicit zero and not specified.
// +optional
FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
}
// ConcurrencyPolicy describes how the job will be handled.

View File

@ -179,6 +179,15 @@ func ValidateCronJobSpec(spec *batch.CronJobSpec, fldPath *field.Path) field.Err
allErrs = append(allErrs, validateConcurrencyPolicy(&spec.ConcurrencyPolicy, fldPath.Child("concurrencyPolicy"))...)
allErrs = append(allErrs, ValidateJobTemplateSpec(&spec.JobTemplate, fldPath.Child("jobTemplate"))...)
if spec.SuccessfulJobsHistoryLimit != nil {
// zero is a valid SuccessfulJobsHistoryLimit
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.SuccessfulJobsHistoryLimit), fldPath.Child("successfulJobsHistoryLimit"))...)
}
if spec.FailedJobsHistoryLimit != nil {
// zero is a valid SuccessfulJobsHistoryLimit
allErrs = append(allErrs, apivalidation.ValidateNonnegativeField(int64(*spec.FailedJobsHistoryLimit), fldPath.Child("failedJobsHistoryLimit"))...)
}
return allErrs
}

View File

@ -402,6 +402,40 @@ func TestValidateCronJob(t *testing.T) {
},
},
},
"spec.successfulJobsHistoryLimit: must be greater than or equal to 0": {
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.CronJobSpec{
Schedule: "* * * * ?",
ConcurrencyPolicy: batch.AllowConcurrent,
SuccessfulJobsHistoryLimit: &negative,
JobTemplate: batch.JobTemplateSpec{
Spec: batch.JobSpec{
Template: validPodTemplateSpec,
},
},
},
},
"spec.failedJobsHistoryLimit: must be greater than or equal to 0": {
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",
Namespace: metav1.NamespaceDefault,
UID: types.UID("1a2b3c"),
},
Spec: batch.CronJobSpec{
Schedule: "* * * * ?",
ConcurrencyPolicy: batch.AllowConcurrent,
FailedJobsHistoryLimit: &negative,
JobTemplate: batch.JobTemplateSpec{
Spec: batch.JobSpec{
Template: validPodTemplateSpec,
},
},
},
},
"spec.concurrencyPolicy: Required value": {
ObjectMeta: metav1.ObjectMeta{
Name: "mycronjob",

View File

@ -30,6 +30,7 @@ Just periodically list jobs and SJs, and then reconcile them.
import (
"fmt"
"sort"
"time"
"github.com/golang/glog"
@ -92,13 +93,13 @@ func (jm *CronJobController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Infof("Starting CronJob Manager")
// Check things every 10 second.
go wait.Until(jm.SyncAll, 10*time.Second, stopCh)
go wait.Until(jm.syncAll, 10*time.Second, stopCh)
<-stopCh
glog.Infof("Shutting down CronJob Manager")
}
// SyncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *CronJobController) SyncAll() {
// syncAll lists all the CronJobs and Jobs and reconciles them.
func (jm *CronJobController) syncAll() {
sjl, err := jm.kubeClient.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{})
if err != nil {
glog.Errorf("Error listing cronjobs: %v", err)
@ -119,24 +120,86 @@ func (jm *CronJobController) SyncAll() {
glog.V(4).Infof("Found %d groups", len(jobsBySj))
for _, sj := range sjs {
SyncOne(sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
syncOne(&sj, jobsBySj[sj.UID], time.Now(), jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
cleanupFinishedJobs(&sj, jobsBySj[sj.UID], jm.jobControl, jm.sjControl, jm.podControl, jm.recorder)
}
}
// SyncOne reconciles a CronJob with a list of any Jobs that it created.
// cleanupFinishedJobs cleanups finished jobs created by a CronJob
func cleanupFinishedJobs(sj *batch.CronJob, js []batch.Job, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
// If neither limits are active, there is no need to do anything.
if sj.Spec.FailedJobsHistoryLimit == nil && sj.Spec.SuccessfulJobsHistoryLimit == nil {
return
}
failedJobs := []batch.Job{}
succesfulJobs := []batch.Job{}
for _, job := range js {
isFinished, finishedStatus := getFinishedStatus(&job)
if isFinished && finishedStatus == batch.JobComplete {
succesfulJobs = append(succesfulJobs, job)
} else if isFinished && finishedStatus == batch.JobFailed {
failedJobs = append(failedJobs, job)
}
}
if sj.Spec.SuccessfulJobsHistoryLimit != nil {
removeOldestJobs(sj,
succesfulJobs,
jc,
pc,
*sj.Spec.SuccessfulJobsHistoryLimit,
recorder)
}
if sj.Spec.FailedJobsHistoryLimit != nil {
removeOldestJobs(sj,
failedJobs,
jc,
pc,
*sj.Spec.FailedJobsHistoryLimit,
recorder)
}
// Update the CronJob, in case jobs were removed from the list.
if _, err := sjc.UpdateStatus(sj); err != nil {
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
}
}
// removeOldestJobs removes the oldest jobs from a list of jobs
func removeOldestJobs(sj *batch.CronJob, js []batch.Job, jc jobControlInterface, pc podControlInterface, maxJobs int32, recorder record.EventRecorder) {
numToDelete := len(js) - int(maxJobs)
if numToDelete <= 0 {
return
}
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
glog.V(4).Infof("Cleaning up %d/%d jobs from %s", numToDelete, len(js), nameForLog)
sort.Sort(byJobStartTime(js))
for i := 0; i < numToDelete; i++ {
glog.V(4).Infof("Removing job %s from %s", js[i].Name, nameForLog)
deleteJob(sj, &js[i], jc, pc, recorder, "history limit reached")
}
}
// syncOne reconciles a CronJob with a list of any Jobs that it created.
// All known jobs created by "sj" should be included in "js".
// The current time is passed in to facilitate testing.
// It has no receiver, to facilitate testing.
func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
func syncOne(sj *batch.CronJob, js []batch.Job, now time.Time, jc jobControlInterface, sjc sjControlInterface, pc podControlInterface, recorder record.EventRecorder) {
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
childrenJobs := make(map[types.UID]bool)
for i := range js {
j := js[i]
childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(sj, j.ObjectMeta.UID)
found := inActiveList(*sj, j.ObjectMeta.UID)
if !found && !IsJobFinished(&j) {
recorder.Eventf(&sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
recorder.Eventf(sj, v1.EventTypeWarning, "UnexpectedJob", "Saw a job that the controller did not create or forgot: %v", j.Name)
// We found an unfinished job that has us as the parent, but it is not in our Active list.
// This could happen if we crashed right after creating the Job and before updating the status,
// or if our jobs list is newer than our sj status after a relist, or if someone intentionally created
@ -148,9 +211,9 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
// in the same namespace "adopt" that job. ReplicaSets and their Pods work the same way.
// TBS: how to update sj.Status.LastScheduleTime if the adopted job is newer than any we knew about?
} else if found && IsJobFinished(&j) {
deleteFromActiveList(&sj, j.ObjectMeta.UID)
deleteFromActiveList(sj, j.ObjectMeta.UID)
// TODO: event to call out failure vs success.
recorder.Eventf(&sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
recorder.Eventf(sj, v1.EventTypeNormal, "SawCompletedJob", "Saw completed job: %v", j.Name)
}
}
@ -159,25 +222,25 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
// job running.
for _, j := range sj.Status.Active {
if found := childrenJobs[j.UID]; !found {
recorder.Eventf(&sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
deleteFromActiveList(&sj, j.UID)
recorder.Eventf(sj, v1.EventTypeNormal, "MissingJob", "Active job went missing: %v", j.Name)
deleteFromActiveList(sj, j.UID)
}
}
updatedSJ, err := sjc.UpdateStatus(&sj)
updatedSJ, err := sjc.UpdateStatus(sj)
if err != nil {
glog.Errorf("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
return
}
sj = *updatedSJ
*sj = *updatedSJ
if sj.Spec.Suspend != nil && *sj.Spec.Suspend {
glog.V(4).Infof("Not starting job for %s because it is suspended", nameForLog)
return
}
times, err := getRecentUnmetScheduleTimes(sj, now)
times, err := getRecentUnmetScheduleTimes(*sj, now)
if err != nil {
recorder.Eventf(&sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
recorder.Eventf(sj, v1.EventTypeWarning, "FailedNeedsStart", "Cannot determine if job needs to be started: %v", err)
glog.Errorf("Cannot determine if %s needs to be started: %v", nameForLog, err)
}
// TODO: handle multiple unmet start times, from oldest to newest, updating status as needed.
@ -224,73 +287,37 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
// TODO: this should be replaced with server side job deletion
// currently this mimics JobReaper from pkg/kubectl/stop.go
glog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog)
job, err := jc.GetJob(j.Namespace, j.Name)
if err != nil {
recorder.Eventf(&sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
recorder.Eventf(sj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err)
return
}
// scale job down to 0
if *job.Spec.Parallelism != 0 {
zero := int32(0)
job.Spec.Parallelism = &zero
job, err = jc.UpdateJob(job.Namespace, job)
if err != nil {
recorder.Eventf(&sj, v1.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
return
}
}
// remove all pods...
selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := pc.ListPods(job.Namespace, options)
if err != nil {
recorder.Eventf(&sj, v1.EventTypeWarning, "FailedList", "List job-pods: %v", err)
}
errList := []error{}
for _, pod := range podList.Items {
glog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name)
if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
// ignores the error when the pod isn't found
if !errors.IsNotFound(err) {
errList = append(errList, err)
}
}
}
if len(errList) != 0 {
recorder.Eventf(&sj, v1.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
if !deleteJob(sj, job, jc, pc, recorder, "") {
return
}
// ... the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(&sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
return
}
// ... and its reference from active list
deleteFromActiveList(&sj, job.ObjectMeta.UID)
recorder.Eventf(&sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", j.Name)
}
}
jobReq, err := getJobFromTemplate(&sj, scheduledTime)
jobReq, err := getJobFromTemplate(sj, scheduledTime)
if err != nil {
glog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err)
return
}
jobResp, err := jc.CreateJob(sj.Namespace, jobReq)
if err != nil {
recorder.Eventf(&sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
recorder.Eventf(sj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err)
return
}
glog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog)
recorder.Eventf(&sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name)
// ------------------------------------------------------------------ //
// If this process restarts at this point (after posting a job, but
// before updating the status), then we might try to start the job on
// the next time. Actually, if we relist the SJs and Jobs on the next
// iteration of SyncAll, we might not see our own status update, and
// iteration of syncAll, we might not see our own status update, and
// then post one again. So, we need to use the job name as a lock to
// prevent us from making the job twice (name the job with hash of its
// scheduled time).
@ -303,13 +330,64 @@ func SyncOne(sj batch.CronJob, js []batch.Job, now time.Time, jc jobControlInter
sj.Status.Active = append(sj.Status.Active, *ref)
}
sj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime}
if _, err := sjc.UpdateStatus(&sj); err != nil {
if _, err := sjc.UpdateStatus(sj); err != nil {
glog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, sj.ResourceVersion, err)
}
return
}
// deleteJob reaps a job, deleting the job, the pobs and the reference in the active list
func deleteJob(sj *batch.CronJob, job *batch.Job, jc jobControlInterface, pc podControlInterface, recorder record.EventRecorder, reason string) bool {
// TODO: this should be replaced with server side job deletion
// currencontinuetly this mimics JobReaper from pkg/kubectl/stop.go
nameForLog := fmt.Sprintf("%s/%s", sj.Namespace, sj.Name)
var err error
// scale job down to 0
if *job.Spec.Parallelism != 0 {
zero := int32(0)
job.Spec.Parallelism = &zero
job, err = jc.UpdateJob(job.Namespace, job)
if err != nil {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedUpdate", "Update job: %v", err)
return false
}
}
// remove all pods...
selector, _ := metav1.LabelSelectorAsSelector(job.Spec.Selector)
options := metav1.ListOptions{LabelSelector: selector.String()}
podList, err := pc.ListPods(job.Namespace, options)
if err != nil {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedList", "List job-pods: %v", err)
}
errList := []error{}
for _, pod := range podList.Items {
glog.V(2).Infof("CronJob controller is deleting Pod %v/%v", pod.Namespace, pod.Name)
if err := pc.DeletePod(pod.Namespace, pod.Name); err != nil {
// ignores the error when the pod isn't found
if !errors.IsNotFound(err) {
errList = append(errList, err)
}
}
}
if len(errList) != 0 {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job-pods: %v", utilerrors.NewAggregate(errList))
return false
}
// ... the job itself...
if err := jc.DeleteJob(job.Namespace, job.Name); err != nil {
recorder.Eventf(sj, v1.EventTypeWarning, "FailedDelete", "Deleted job: %v", err)
glog.Errorf("Error deleting job %s from %s: %v", job.Name, nameForLog, err)
return false
}
// ... and its reference from active list
deleteFromActiveList(sj, job.ObjectMeta.UID)
recorder.Eventf(sj, v1.EventTypeNormal, "SuccessfulDelete", "Deleted job %v", job.Name)
return true
}
func getRef(object runtime.Object) (*v1.ObjectReference, error) {
return v1.GetReference(api.Scheme, object)
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package cronjob
import (
"sort"
"strconv"
"strings"
"testing"
"time"
@ -81,6 +83,14 @@ func justAfterThePriorHour() time.Time {
return T1
}
func startTimeStringToTime(startTime string) time.Time {
T1, err := time.Parse(time.RFC3339, startTime)
if err != nil {
panic("test setup error")
}
return T1
}
// returns a cronJob with some fields filled in.
func cronJob() batch.CronJob {
return batch.CronJob{
@ -270,7 +280,7 @@ func TestSyncOne_RunOrNot(t *testing.T) {
pc := &fakePodControl{}
recorder := record.NewFakeRecorder(10)
SyncOne(sj, js, tc.now, jc, sjc, pc, recorder)
syncOne(&sj, js, tc.now, jc, sjc, pc, recorder)
expectedCreates := 0
if tc.expectCreate {
expectedCreates = 1
@ -320,10 +330,237 @@ func TestSyncOne_RunOrNot(t *testing.T) {
}
}
type CleanupJobSpec struct {
StartTime string
IsFinished bool
IsSuccessful bool
ExpectDelete bool
IsStillInActiveList bool // only when IsFinished is set
}
func TestCleanupFinishedJobs_DeleteOrNot(t *testing.T) {
limitThree := int32(3)
limitTwo := int32(2)
limitOne := int32(1)
limitZero := int32(0)
// Starting times are assumed to be sorted by increasing start time
// in all the test cases
testCases := map[string]struct {
jobSpecs []CleanupJobSpec
now time.Time
successfulJobsHistoryLimit *int32
failedJobsHistoryLimit *int32
expectActive int
}{
"success. job limit reached": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, T, F},
{"2016-05-19T05:00:00Z", T, T, T, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitTwo, &limitOne, 1},
"success. jobs not processed by Sync yet": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, T, F},
{"2016-05-19T05:00:00Z", T, T, T, T},
{"2016-05-19T06:00:00Z", T, T, F, T},
{"2016-05-19T07:00:00Z", T, T, F, T},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, T},
}, justBeforeTheHour(), &limitTwo, &limitOne, 4},
"failed job limit reached": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, F, T, F},
{"2016-05-19T05:00:00Z", T, F, T, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitTwo, &limitTwo, 0},
"success. job limit set to zero": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, T, F},
{"2016-05-19T05:00:00Z", T, F, T, F},
{"2016-05-19T06:00:00Z", T, T, T, F},
{"2016-05-19T07:00:00Z", T, T, T, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitZero, &limitOne, 1},
"failed job limit set to zero": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, T, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", T, F, T, F},
}, justBeforeTheHour(), &limitThree, &limitZero, 1},
"no limits reached": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitThree, &limitThree, 0},
// This test case should trigger the short-circuit
"limits disabled": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), nil, nil, 0},
"success limit disabled": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), nil, &limitThree, 0},
"failure limit disabled": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", T, T, F, F},
{"2016-05-19T05:00:00Z", T, F, F, F},
{"2016-05-19T06:00:00Z", T, T, F, F},
{"2016-05-19T07:00:00Z", T, T, F, F},
{"2016-05-19T08:00:00Z", T, F, F, F},
{"2016-05-19T09:00:00Z", T, F, F, F},
}, justBeforeTheHour(), &limitThree, nil, 0},
"no limits reached because still active": {
[]CleanupJobSpec{
{"2016-05-19T04:00:00Z", F, F, F, F},
{"2016-05-19T05:00:00Z", F, F, F, F},
{"2016-05-19T06:00:00Z", F, F, F, F},
{"2016-05-19T07:00:00Z", F, F, F, F},
{"2016-05-19T08:00:00Z", F, F, F, F},
{"2016-05-19T09:00:00Z", F, F, F, F},
}, justBeforeTheHour(), &limitZero, &limitZero, 6},
}
for name, tc := range testCases {
sj := cronJob()
suspend := false
sj.Spec.ConcurrencyPolicy = f
sj.Spec.Suspend = &suspend
sj.Spec.Schedule = onTheHour
sj.Spec.SuccessfulJobsHistoryLimit = tc.successfulJobsHistoryLimit
sj.Spec.FailedJobsHistoryLimit = tc.failedJobsHistoryLimit
var (
job *batch.Job
err error
)
// Set consistent timestamps for the CronJob
if len(tc.jobSpecs) != 0 {
firstTime := startTimeStringToTime(tc.jobSpecs[0].StartTime)
lastTime := startTimeStringToTime(tc.jobSpecs[len(tc.jobSpecs)-1].StartTime)
sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: firstTime}
sj.Status.LastScheduleTime = &metav1.Time{Time: lastTime}
} else {
sj.ObjectMeta.CreationTimestamp = metav1.Time{Time: justBeforeTheHour()}
}
// Create jobs
js := []batch.Job{}
jobsToDelete := []string{}
sj.Status.Active = []v1.ObjectReference{}
for i, spec := range tc.jobSpecs {
job, err = getJobFromTemplate(&sj, startTimeStringToTime(spec.StartTime))
if err != nil {
t.Fatalf("%s: unexpected error creating a job from template: %v", name, err)
}
job.UID = types.UID(strconv.Itoa(i))
job.Namespace = ""
if spec.IsFinished {
var conditionType batch.JobConditionType
if spec.IsSuccessful {
conditionType = batch.JobComplete
} else {
conditionType = batch.JobFailed
}
condition := batch.JobCondition{Type: conditionType, Status: v1.ConditionTrue}
job.Status.Conditions = append(job.Status.Conditions, condition)
if spec.IsStillInActiveList {
sj.Status.Active = append(sj.Status.Active, v1.ObjectReference{UID: job.UID})
}
} else {
if spec.IsSuccessful || spec.IsStillInActiveList {
t.Errorf("%s: test setup error: this case makes no sense", name)
}
sj.Status.Active = append(sj.Status.Active, v1.ObjectReference{UID: job.UID})
}
js = append(js, *job)
if spec.ExpectDelete {
jobsToDelete = append(jobsToDelete, job.Name)
}
}
jc := &fakeJobControl{Job: job}
pc := &fakePodControl{}
sjc := &fakeSJControl{}
recorder := record.NewFakeRecorder(10)
cleanupFinishedJobs(&sj, js, jc, sjc, pc, recorder)
// Check we have actually deleted the correct jobs
if len(jc.DeleteJobName) != len(jobsToDelete) {
t.Errorf("%s: expected %d job deleted, actually %d", name, len(jobsToDelete), len(jc.DeleteJobName))
} else {
sort.Strings(jobsToDelete)
sort.Strings(jc.DeleteJobName)
for i, expectedJobName := range jobsToDelete {
if expectedJobName != jc.DeleteJobName[i] {
t.Errorf("%s: expected job %s deleted, actually %v -- %v vs %v", name, expectedJobName, jc.DeleteJobName[i], jc.DeleteJobName, jobsToDelete)
}
}
}
// Check for events
expectedEvents := len(jobsToDelete)
if len(recorder.Events) != expectedEvents {
t.Errorf("%s: expected %d event, actually %v", name, expectedEvents, len(recorder.Events))
}
// Check for jobs still in active list
numActive := 0
if len(sjc.Updates) != 0 {
numActive = len(sjc.Updates[len(sjc.Updates)-1].Status.Active)
}
if tc.expectActive != numActive {
t.Errorf("%s: expected Active size %d, got %d", name, tc.expectActive, numActive)
}
}
}
// TODO: simulation where the controller randomly doesn't run, and randomly has errors starting jobs or deleting jobs,
// but over time, all jobs run as expected (assuming Allow and no deadline).
// TestSyncOne_Status tests sj.UpdateStatus in SyncOne
// TestSyncOne_Status tests sj.UpdateStatus in syncOne
func TestSyncOne_Status(t *testing.T) {
finishedJob := newJob("1")
finishedJob.Status.Conditions = append(finishedJob.Status.Conditions, batch.JobCondition{Type: batch.JobComplete, Status: v1.ConditionTrue})
@ -443,7 +680,7 @@ func TestSyncOne_Status(t *testing.T) {
recorder := record.NewFakeRecorder(10)
// Run the code
SyncOne(sj, jobs, tc.now, jc, sjc, pc, recorder)
syncOne(&sj, jobs, tc.now, jc, sjc, pc, recorder)
// Status update happens once when ranging through job list, and another one if create jobs.
expectUpdates := 1

View File

@ -234,11 +234,34 @@ func makeCreatedByRefJson(object runtime.Object) (string, error) {
return string(createdByRefJson), nil
}
func IsJobFinished(j *batch.Job) bool {
func getFinishedStatus(j *batch.Job) (bool, batch.JobConditionType) {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
return true
return true, c.Type
}
}
return false
return false, ""
}
func IsJobFinished(j *batch.Job) bool {
isFinished, _ := getFinishedStatus(j)
return isFinished
}
// byJobStartTime sorts a list of jobs by start timestamp, using their names as a tie breaker.
type byJobStartTime []batch.Job
func (o byJobStartTime) Len() int { return len(o) }
func (o byJobStartTime) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o byJobStartTime) Less(i, j int) bool {
if o[j].Status.StartTime == nil {
return o[i].Status.StartTime != nil
}
if (*o[i].Status.StartTime).Equal(*o[j].Status.StartTime) {
return o[i].Name < o[j].Name
}
return (*o[i].Status.StartTime).Before(*o[j].Status.StartTime)
}

View File

@ -52,6 +52,11 @@ var (
var _ = framework.KubeDescribe("CronJob", func() {
f := framework.NewDefaultGroupVersionFramework("cronjob", BatchV2Alpha1GroupVersion)
sleepCommand := []string{"sleep", "300"}
// Pod will complete instantly
successCommand := []string{"/bin/true"}
BeforeEach(func() {
framework.SkipIfMissingResource(f.ClientPool, CronJobGroupVersionResource, f.Namespace.Name)
})
@ -59,7 +64,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// multiple jobs running at once
It("should schedule multiple jobs concurrently", func() {
By("Creating a cronjob")
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, true)
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@ -70,7 +76,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
By("Ensuring at least two running jobs exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs := filterActiveJobs(jobs)
activeJobs, _ := filterActiveJobs(jobs)
Expect(len(activeJobs) >= 2).To(BeTrue())
By("Removing cronjob")
@ -81,7 +87,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// suspended should not schedule jobs
It("should not schedule jobs when suspended [Slow]", func() {
By("Creating a suspended cronjob")
cronJob := newTestCronJob("suspended", "*/1 * * * ?", batch.AllowConcurrent, true)
cronJob := newTestCronJob("suspended", "*/1 * * * ?", batch.AllowConcurrent,
sleepCommand, nil)
cronJob.Spec.Suspend = newBool(true)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@ -103,7 +110,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// only single active job is allowed for ForbidConcurrent
It("should not schedule new jobs when ForbidConcurrent [Slow]", func() {
By("Creating a ForbidConcurrent cronjob")
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true)
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@ -119,7 +127,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
By("Ensuring exaclty one running job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs := filterActiveJobs(jobs)
activeJobs, _ := filterActiveJobs(jobs)
Expect(activeJobs).To(HaveLen(1))
By("Ensuring no more jobs are scheduled")
@ -134,7 +142,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// only single active job is allowed for ReplaceConcurrent
It("should replace jobs when ReplaceConcurrent", func() {
By("Creating a ReplaceConcurrent cronjob")
cronJob := newTestCronJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent, true)
cronJob := newTestCronJob("replace", "*/1 * * * ?", batch.ReplaceConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@ -150,7 +159,7 @@ var _ = framework.KubeDescribe("CronJob", func() {
By("Ensuring exaclty one running job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
activeJobs := filterActiveJobs(jobs)
activeJobs, _ := filterActiveJobs(jobs)
Expect(activeJobs).To(HaveLen(1))
By("Ensuring the job is replaced with a new one")
@ -165,7 +174,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// shouldn't give us unexpected warnings
It("should not emit unexpected warnings", func() {
By("Creating a cronjob")
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent, false)
cronJob := newTestCronJob("concurrent", "*/1 * * * ?", batch.AllowConcurrent,
nil, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@ -187,7 +197,8 @@ var _ = framework.KubeDescribe("CronJob", func() {
// deleted jobs should be removed from the active list
It("should remove from active list jobs that have been deleted", func() {
By("Creating a ForbidConcurrent cronjob")
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent, true)
cronJob := newTestCronJob("forbid", "*/1 * * * ?", batch.ForbidConcurrent,
sleepCommand, nil)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
@ -225,10 +236,49 @@ var _ = framework.KubeDescribe("CronJob", func() {
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
// cleanup of successful finished jobs, with limit of one successful job
It("should delete successful finished jobs with limit of one successful job", func() {
By("Creating a AllowConcurrent cronjob with custom history limits")
successLimit := int32(1)
cronJob := newTestCronJob("concurrent-limit", "*/1 * * * ?", batch.AllowConcurrent,
successCommand, &successLimit)
cronJob, err := createCronJob(f.ClientSet, f.Namespace.Name, cronJob)
Expect(err).NotTo(HaveOccurred())
// Job is going to complete instantly: do not check for an active job
// as we are most likely to miss it
By("Ensuring a finished job exists")
err = waitForAnyFinishedJob(f.ClientSet, f.Namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Ensuring a finished job exists by listing jobs explicitly")
jobs, err := f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
_, finishedJobs := filterActiveJobs(jobs)
Expect(len(finishedJobs) == 1).To(BeTrue())
// Job should get deleted when the next job finishes the next minute
By("Ensuring this job does not exist anymore")
err = waitForJobNotExist(f.ClientSet, f.Namespace.Name, finishedJobs[0])
Expect(err).NotTo(HaveOccurred())
By("Ensuring there is 1 finished job by listing jobs explicitly")
jobs, err = f.ClientSet.Batch().Jobs(f.Namespace.Name).List(metav1.ListOptions{})
Expect(err).NotTo(HaveOccurred())
_, finishedJobs = filterActiveJobs(jobs)
Expect(len(finishedJobs) == 1).To(BeTrue())
By("Removing cronjob")
err = deleteCronJob(f.ClientSet, f.Namespace.Name, cronJob.Name)
Expect(err).NotTo(HaveOccurred())
})
})
// newTestCronJob returns a cronjob which does one of several testing behaviors.
func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, sleep bool) *batch.CronJob {
func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPolicy, command []string,
successfulJobsHistoryLimit *int32) *batch.CronJob {
parallelism := int32(1)
completions := int32(1)
sj := &batch.CronJob{
@ -271,8 +321,9 @@ func newTestCronJob(name, schedule string, concurrencyPolicy batch.ConcurrencyPo
},
},
}
if sleep {
sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "300"}
sj.Spec.SuccessfulJobsHistoryLimit = successfulJobsHistoryLimit
if command != nil {
sj.Spec.JobTemplate.Spec.Template.Spec.Containers[0].Command = command
}
return sj
}
@ -319,6 +370,23 @@ func waitForNoJobs(c clientset.Interface, ns, jobName string, failIfNonEmpty boo
})
}
// Wait for a job to not exist by listing jobs explicitly.
func waitForJobNotExist(c clientset.Interface, ns string, targetJob *batchv1.Job) error {
return wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) {
jobs, err := c.Batch().Jobs(ns).List(metav1.ListOptions{})
if err != nil {
return false, err
}
_, finishedJobs := filterActiveJobs(jobs)
for _, job := range finishedJobs {
if targetJob.Namespace == job.Namespace && targetJob.Name == job.Name {
return false, nil
}
}
return true, nil
})
}
// Wait for a job to be replaced with a new one.
func waitForJobReplaced(c clientset.Interface, ns, previousJobName string) error {
return wait.Poll(framework.Poll, cronJobTimeout, func() (bool, error) {
@ -383,11 +451,13 @@ func checkNoEventWithReason(c clientset.Interface, ns, cronJobName string, reaso
return nil
}
func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job) {
func filterActiveJobs(jobs *batchv1.JobList) (active []*batchv1.Job, finished []*batchv1.Job) {
for i := range jobs.Items {
j := jobs.Items[i]
if !job.IsJobFinished(&j) {
active = append(active, &j)
} else {
finished = append(finished, &j)
}
}
return