Merge pull request #51153 from clamoriniere1A/feature/job_failure_policy_controller

Automatic merge from submit-queue

Job failure policy controller support

**What this PR does / why we need it**:
Start implementing support for "Backoff policy and failed pod limit" in the ```JobController```, as defined in https://github.com/kubernetes/community/pull/583.
This PR depends on the previous PR #48075, which updates the K8s API types.
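
For reviewers, a minimal standalone sketch (not part of this PR's diff) of the retry schedule the controller ends up with: the first requeue waits `DefaultJobBackOff` (10s), each subsequent requeue doubles the wait, and the wait is capped at `MaxJobBackOff` (360s), mirroring the `getBackoff` helper added below. The `backoffFor` name is illustrative only.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	defaultJobBackOff = 10 * time.Second  // DefaultJobBackOff in the diff
	maxJobBackOff     = 360 * time.Second // MaxJobBackOff in the diff
)

// backoffFor mirrors getBackoff from the controller change: zero requeues
// means no delay, then the delay doubles per requeue up to the cap.
func backoffFor(requeues int) time.Duration {
	if requeues <= 0 {
		return 0
	}
	d := float64(defaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(requeues-1))
	if d > math.MaxInt64 || time.Duration(d) > maxJobBackOff {
		return maxJobBackOff
	}
	return time.Duration(d)
}

func main() {
	for i := 0; i <= 8; i++ {
		fmt.Printf("requeue %d -> wait %v\n", i, backoffFor(i))
	}
}
```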

TODO: 
* [x] Implement ```JobSpec.BackoffLimit``` support
* [x] Rebase when #48075 has been merged.
* [x] Implement end-to-end tests



implements https://github.com/kubernetes/community/pull/583

**Special notes for your reviewer**:

**Release note**:
```release-note
Add backoff policy and failed pod limit for a job
```
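
As a usage note (not part of this PR's diff), a sketch of a Job object that sets the new field via the Go API types; the object is only constructed and printed here, and the name `example-backoff`, the `busybox` image, and the `int32Ptr` helper are illustrative assumptions:

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
	// A Job whose pods always fail; with BackoffLimit=2 the controller
	// retries with exponential backoff and then marks the Job failed
	// with the BackoffLimitExceeded condition.
	job := &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "example-backoff"},
		Spec: batchv1.JobSpec{
			BackoffLimit: int32Ptr(2), // field introduced by the API change in #48075
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{{
						Name:    "main",
						Image:   "busybox",
						Command: []string{"sh", "-c", "exit 1"},
					}},
				},
			},
		},
	}
	fmt.Printf("backoffLimit=%d\n", *job.Spec.BackoffLimit)
}
```

With `RestartPolicyNever`, each failed pod counts toward the limit, so a Job like this should end up with the `BackoffLimitExceeded` condition once the retries are exhausted, as exercised by the new e2e test below.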
Kubernetes Submit Queue 2017-09-03 13:13:17 -07:00 committed by GitHub
commit b63abc9fdd
6 changed files with 244 additions and 137 deletions


@ -18,6 +18,7 @@ package job
import (
"fmt"
"math"
"reflect"
"sort"
"sync"
@ -50,6 +51,13 @@ import (
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
const (
// DefaultJobBackOff is the default backoff period, exported for the e2e test
DefaultJobBackOff = 10 * time.Second
// MaxJobBackOff is the max backoff period, exported for the e2e test
MaxJobBackOff = 360 * time.Second
)
type JobController struct {
kubeClient clientset.Interface
podControl controller.PodControlInterface
@ -96,7 +104,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
},
expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "job"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
}
@ -118,6 +126,7 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
jm.updateHandler = jm.updateJobStatus
jm.syncHandler = jm.syncJob
return jm
}
@ -312,7 +321,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
if err != nil {
return
}
jm.queue.Add(key)
jm.enqueueController(curJob)
// check if we need to add a new resync for ActiveDeadlineSeconds
if curJob.Status.StartTime != nil {
curADS := curJob.Spec.ActiveDeadlineSeconds
@ -333,20 +342,23 @@ func (jm *JobController) updateJob(old, cur interface{}) {
}
// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
func (jm *JobController) enqueueController(obj interface{}) {
key, err := controller.KeyFunc(obj)
func (jm *JobController) enqueueController(job interface{}) {
key, err := controller.KeyFunc(job)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
return
}
// Retrieves the backoff duration for this Job
backoff := getBackoff(jm.queue, key)
// TODO: Handle overlapping controllers better. Either disallow them at admission time or
// deterministically avoid syncing controllers that fight over pods. Currently, we only
// ensure that the same controller is synced for a given pod. When we periodically relist
// all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this rc overlaps, as well as all
// controllers that overlap this rc, and sorting them.
jm.queue.Add(key)
jm.queue.AddAfter(key, backoff)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@ -432,6 +444,15 @@ func (jm *JobController) syncJob(key string) error {
}
job := *sharedJob
// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
jm.queue.Forget(key)
return nil
}
// retrieve the previous number of retries
previousRetry := jm.queue.NumRequeues(key)
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist.
@ -457,34 +478,28 @@ func (jm *JobController) syncJob(key string) error {
jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
}
}
// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
return nil
}
var manageJobErr error
if pastActiveDeadline(&job) {
// TODO: below code should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately none such solution
// exists ATM. There's an open discussion in the topic in
// https://github.com/kubernetes/kubernetes/issues/14602 which might give
// some sort of solution to above problem.
// kill remaining active pods
wait := sync.WaitGroup{}
errCh := make(chan error, int(active))
wait.Add(int(active))
for i := int32(0); i < active; i++ {
go func(ix int32) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, &job); err != nil {
defer utilruntime.HandleError(err)
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", activePods[ix].Name, job.Namespace, job.Name)
errCh <- err
}
}(i)
}
wait.Wait()
jobFailed := false
var failureReason string
var failureMessage string
jobHaveNewFailure := failed > job.Status.Failed
// check if the number of failed pods increased since the last syncJob
if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
jobFailed = true
failureReason = "BackoffLimitExceeded"
failureMessage = "Job has reach the specified backoff limit"
} else if pastActiveDeadline(&job) {
jobFailed = true
failureReason = "DeadlineExceeded"
failureMessage = "Job was active longer than specified deadline"
}
if jobFailed {
errCh := make(chan error, active)
jm.deleteJobPods(&job, activePods, errCh)
select {
case manageJobErr = <-errCh:
if manageJobErr != nil {
@ -496,8 +511,8 @@ func (jm *JobController) syncJob(key string) error {
// update status values accordingly
failed += active
active = 0
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
jm.recorder.Event(&job, v1.EventTypeNormal, "DeadlineExceeded", "Job was active longer than specified deadline")
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
} else {
if jobNeedsSync && job.DeletionTimestamp == nil {
active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
@ -546,9 +561,41 @@ func (jm *JobController) syncJob(key string) error {
return err
}
}
if jobHaveNewFailure {
// re-enqueue Job after the backoff period
jm.queue.AddRateLimited(key)
} else {
// if there are no new failures, the Job backoff period can be reset
jm.queue.Forget(key)
}
return manageJobErr
}
func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
// TODO: the code below should be replaced with pod termination resulting in
// pod failures, rather than killing pods. Unfortunately no such solution
// exists at the moment. There's an open discussion on the topic in
// https://github.com/kubernetes/kubernetes/issues/14602 which might lead to
// a solution to the above problem.
// kill remaining active pods
wait := sync.WaitGroup{}
nbPods := len(pods)
wait.Add(nbPods)
for i := int32(0); i < int32(nbPods); i++ {
go func(ix int32) {
defer wait.Done()
if err := jm.podControl.DeletePod(job.Namespace, pods[ix].Name, job); err != nil {
defer utilruntime.HandleError(err)
glog.V(2).Infof("Failed to delete %v, job %q/%q deadline exceeded", pods[ix].Name, job.Namespace, job.Name)
errCh <- err
}
}(i)
}
wait.Wait()
}
// pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
func pastActiveDeadline(job *batch.Job) bool {
if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
@ -726,6 +773,26 @@ func (jm *JobController) updateJobStatus(job *batch.Job) error {
return err
}
func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
exp := queue.NumRequeues(key)
if exp <= 0 {
return time.Duration(0)
}
// The backoff is capped such that 'calculated' value never overflows.
backoff := float64(DefaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(exp-1))
if backoff > math.MaxInt64 {
return MaxJobBackOff
}
calculated := time.Duration(backoff)
if calculated > MaxJobBackOff {
return MaxJobBackOff
}
return calculated
}
// filterPods returns pods based on their phase.
func filterPods(pods []*v1.Pod, phase v1.PodPhase) int {
result := 0


@ -43,7 +43,7 @@ import (
var alwaysReady = func() bool { return true }
func newJob(parallelism, completions int32) *batch.Job {
func newJob(parallelism, completions, backoffLimit int32) *batch.Job {
j := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "foobar",
@ -80,6 +80,8 @@ func newJob(parallelism, completions int32) *batch.Job {
} else {
j.Spec.Parallelism = nil
}
j.Spec.BackoffLimit = &backoffLimit
return j
}
@ -119,12 +121,16 @@ func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod {
}
func TestControllerSyncJob(t *testing.T) {
jobConditionComplete := batch.JobComplete
jobConditionFailed := batch.JobFailed
testCases := map[string]struct {
// job setup
parallelism int32
completions int32
deleting bool
podLimit int
parallelism int32
completions int32
backoffLimit int32
deleting bool
podLimit int
// pod setup
podControllerError error
@ -134,107 +140,113 @@ func TestControllerSyncJob(t *testing.T) {
failedPods int32
// expectations
expectedCreations int32
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
expectedFailed int32
expectedComplete bool
expectedCreations int32
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
expectedFailed int32
expectedCondition *batch.JobConditionType
expectedConditionReason string
}{
"job start": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 0, 0, 0,
2, 0, 2, 0, 0, false,
2, 0, 2, 0, 0, nil, "",
},
"WQ job start": {
2, -1, false, 0,
2, -1, 6, false, 0,
nil, 0, 0, 0, 0,
2, 0, 2, 0, 0, false,
2, 0, 2, 0, 0, nil, "",
},
"pending pods": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 2, 0, 0, 0,
0, 0, 2, 0, 0, false,
0, 0, 2, 0, 0, nil, "",
},
"correct # of pods": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 2, 0, 0,
0, 0, 2, 0, 0, false,
0, 0, 2, 0, 0, nil, "",
},
"WQ job: correct # of pods": {
2, -1, false, 0,
2, -1, 6, false, 0,
nil, 0, 2, 0, 0,
0, 0, 2, 0, 0, false,
0, 0, 2, 0, 0, nil, "",
},
"too few active pods": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 1, 1, 0,
1, 0, 2, 1, 0, false,
1, 0, 2, 1, 0, nil, "",
},
"too few active pods with a dynamic job": {
2, -1, false, 0,
2, -1, 6, false, 0,
nil, 0, 1, 0, 0,
1, 0, 2, 0, 0, false,
1, 0, 2, 0, 0, nil, "",
},
"too few active pods, with controller error": {
2, 5, false, 0,
2, 5, 6, false, 0,
fmt.Errorf("Fake error"), 0, 1, 1, 0,
1, 0, 1, 1, 0, false,
1, 0, 1, 1, 0, nil, "",
},
"too many active pods": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 3, 0, 0,
0, 1, 2, 0, 0, false,
0, 1, 2, 0, 0, nil, "",
},
"too many active pods, with controller error": {
2, 5, false, 0,
2, 5, 6, false, 0,
fmt.Errorf("Fake error"), 0, 3, 0, 0,
0, 1, 3, 0, 0, false,
0, 1, 3, 0, 0, nil, "",
},
"failed pod": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 1, 1, 1,
1, 0, 2, 1, 1, false,
1, 0, 2, 1, 1, nil, "",
},
"job finish": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 0, 5, 0,
0, 0, 0, 5, 0, true,
0, 0, 0, 5, 0, nil, "",
},
"WQ job finishing": {
2, -1, false, 0,
2, -1, 6, false, 0,
nil, 0, 1, 1, 0,
0, 0, 1, 1, 0, false,
0, 0, 1, 1, 0, nil, "",
},
"WQ job all finished": {
2, -1, false, 0,
2, -1, 6, false, 0,
nil, 0, 0, 2, 0,
0, 0, 0, 2, 0, true,
0, 0, 0, 2, 0, &jobConditionComplete, "",
},
"WQ job all finished despite one failure": {
2, -1, false, 0,
2, -1, 6, false, 0,
nil, 0, 0, 1, 1,
0, 0, 0, 1, 1, true,
0, 0, 0, 1, 1, &jobConditionComplete, "",
},
"more active pods than completions": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 10, 0, 0,
0, 8, 2, 0, 0, false,
0, 8, 2, 0, 0, nil, "",
},
"status change": {
2, 5, false, 0,
2, 5, 6, false, 0,
nil, 0, 2, 2, 0,
0, 0, 2, 2, 0, false,
0, 0, 2, 2, 0, nil, "",
},
"deleting job": {
2, 5, true, 0,
2, 5, 6, true, 0,
nil, 1, 1, 1, 0,
0, 0, 2, 1, 0, false,
0, 0, 2, 1, 0, nil, "",
},
"limited pods": {
100, 200, false, 10,
100, 200, 6, false, 10,
nil, 0, 0, 0, 0,
10, 0, 10, 0, 0, false,
10, 0, 10, 0, 0, nil, "",
},
"to many job sync failure": {
2, 5, 0, true, 0,
nil, 0, 0, 0, 1,
0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
}
@ -253,7 +265,7 @@ func TestControllerSyncJob(t *testing.T) {
}
// job & pods setup
job := newJob(tc.parallelism, tc.completions)
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
if tc.deleting {
now := metav1.Now()
job.DeletionTimestamp = &now
@ -330,7 +342,7 @@ func TestControllerSyncJob(t *testing.T) {
t.Errorf("%s: .status.startTime was not set", name)
}
// validate conditions
if tc.expectedComplete && !getCondition(actual, batch.JobComplete) {
if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
t.Errorf("%s: expected completion condition. Got %#v", name, actual.Status.Conditions)
}
// validate slow start
@ -351,6 +363,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
completions int32
activeDeadlineSeconds int64
startTime int64
backoffLimit int32
// pod setup
activePods int32
@ -358,25 +371,31 @@ func TestSyncJobPastDeadline(t *testing.T) {
failedPods int32
// expectations
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
expectedFailed int32
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
expectedFailed int32
expectedConditionReason string
}{
"activeDeadlineSeconds less than single pod execution": {
1, 1, 10, 15,
1, 1, 10, 15, 6,
1, 0, 0,
1, 0, 0, 1,
1, 0, 0, 1, "DeadlineExceeded",
},
"activeDeadlineSeconds bigger than single pod execution": {
1, 2, 10, 15,
1, 2, 10, 15, 6,
1, 1, 0,
1, 0, 1, 1,
1, 0, 1, 1, "DeadlineExceeded",
},
"activeDeadlineSeconds times-out before any pod starts": {
1, 1, 10, 10,
1, 1, 10, 10, 6,
0, 0, 0,
0, 0, 0, 0,
0, 0, 0, 0, "DeadlineExceeded",
},
"activeDeadlineSeconds with backofflimit reach": {
1, 1, 1, 10, 0,
1, 0, 2,
1, 0, 0, 3, "BackoffLimitExceeded",
},
}
@ -395,7 +414,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
}
// job & pods setup
job := newJob(tc.parallelism, tc.completions)
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
job.Spec.ActiveDeadlineSeconds = &tc.activeDeadlineSeconds
start := metav1.Unix(metav1.Now().Time.Unix()-tc.startTime, 0)
job.Status.StartTime = &start
@ -438,15 +457,15 @@ func TestSyncJobPastDeadline(t *testing.T) {
t.Errorf("%s: .status.startTime was not set", name)
}
// validate conditions
if !getCondition(actual, batch.JobFailed) {
if !getCondition(actual, batch.JobFailed, tc.expectedConditionReason) {
t.Errorf("%s: expected fail condition. Got %#v", name, actual.Status.Conditions)
}
}
}
func getCondition(job *batch.Job, condition batch.JobConditionType) bool {
func getCondition(job *batch.Job, condition batch.JobConditionType, reason string) bool {
for _, v := range job.Status.Conditions {
if v.Type == condition && v.Status == v1.ConditionTrue {
if v.Type == condition && v.Status == v1.ConditionTrue && v.Reason == reason {
return true
}
}
@ -466,7 +485,7 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
return nil
}
job := newJob(1, 1)
job := newJob(1, 1, 6)
activeDeadlineSeconds := int64(10)
job.Spec.ActiveDeadlineSeconds = &activeDeadlineSeconds
start := metav1.Unix(metav1.Now().Time.Unix()-15, 0)
@ -496,7 +515,7 @@ func TestSyncJobComplete(t *testing.T) {
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
job := newJob(1, 1)
job := newJob(1, 1, 6)
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t))
@ -521,7 +540,7 @@ func TestSyncJobDeleted(t *testing.T) {
manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return nil }
job := newJob(2, 2)
job := newJob(2, 2, 6)
err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
@ -546,7 +565,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
manager.queue.AddRateLimited(getKey(job, t))
return updateError
}
job := newJob(2, 2)
job := newJob(2, 2, 6)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t))
if err == nil || err != updateError {
@ -659,9 +678,9 @@ func TestGetPodsForJob(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -700,7 +719,7 @@ func TestGetPodsForJob(t *testing.T) {
}
func TestGetPodsForJobAdopt(t *testing.T) {
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
clientset := fake.NewSimpleClientset(job1)
jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
@ -726,7 +745,7 @@ func TestGetPodsForJobAdopt(t *testing.T) {
}
func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job1.DeletionTimestamp = &metav1.Time{}
clientset := fake.NewSimpleClientset(job1)
@ -756,7 +775,7 @@ func TestGetPodsForJobNoAdoptIfBeingDeleted(t *testing.T) {
}
func TestGetPodsForJobNoAdoptIfBeingDeletedRace(t *testing.T) {
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
// The up-to-date object says it's being deleted.
job1.DeletionTimestamp = &metav1.Time{}
@ -795,7 +814,7 @@ func TestGetPodsForJobRelease(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@ -824,9 +843,9 @@ func TestAddPod(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -869,11 +888,11 @@ func TestAddPodOrphan(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
job3 := newJob(1, 1)
job3 := newJob(1, 1, 6)
job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@ -897,9 +916,9 @@ func TestUpdatePod(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -946,9 +965,9 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -973,9 +992,9 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -999,9 +1018,9 @@ func TestUpdatePodRelease(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1025,9 +1044,9 @@ func TestDeletePod(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job2)
@ -1070,11 +1089,11 @@ func TestDeletePodOrphan(t *testing.T) {
jm.podStoreSynced = alwaysReady
jm.jobStoreSynced = alwaysReady
job1 := newJob(1, 1)
job1 := newJob(1, 1, 6)
job1.Name = "job1"
job2 := newJob(1, 1)
job2 := newJob(1, 1, 6)
job2.Name = "job2"
job3 := newJob(1, 1)
job3 := newJob(1, 1, 6)
job3.Name = "job3"
job3.Spec.Selector.MatchLabels = map[string]string{"other": "labels"}
informer.Batch().V1().Jobs().Informer().GetIndexer().Add(job1)
@ -1113,7 +1132,7 @@ func TestSyncJobExpectations(t *testing.T) {
manager.jobStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return nil }
job := newJob(2, 2)
job := newJob(2, 2, 6)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
pods := newPodList(2, v1.PodPending, job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
@ -1181,7 +1200,7 @@ func TestWatchJobs(t *testing.T) {
}
func TestWatchPods(t *testing.T) {
testJob := newJob(2, 2)
testJob := newJob(2, 2, 6)
clientset := fake.NewSimpleClientset(testJob)
fakeWatch := watch.NewFake()
clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))


@ -35,11 +35,12 @@ var _ = SIGDescribe("Job", func() {
f := framework.NewDefaultFramework("job")
parallelism := int32(2)
completions := int32(4)
backoffLimit := int32(6) // default value
// Simplest case: all pods succeed promptly
It("should run a job to completion when tasks succeed", func() {
By("Creating a job")
job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil)
job := framework.NewTestJob("succeed", "all-succeed", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
@ -58,7 +59,7 @@ var _ = SIGDescribe("Job", func() {
// up to 5 minutes between restarts, making test timeouts
// due to successive failures too likely with a reasonable
// test timeout.
job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil)
job := framework.NewTestJob("failOnce", "fail-once-local", v1.RestartPolicyOnFailure, parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
@ -76,7 +77,7 @@ var _ = SIGDescribe("Job", func() {
// Worst case analysis: 15 failures, each taking 1 minute to
// run due to some slowness, 1 in 2^15 chance of happening,
// causing test flake. Should be very rare.
job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil)
job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
@ -88,7 +89,7 @@ var _ = SIGDescribe("Job", func() {
It("should exceed active deadline", func() {
By("Creating a job")
var activeDeadlineSeconds int64 = 1
job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds)
job := framework.NewTestJob("notTerminate", "exceed-active-deadline", v1.RestartPolicyNever, parallelism, completions, &activeDeadlineSeconds, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job past active deadline")
@ -98,7 +99,7 @@ var _ = SIGDescribe("Job", func() {
It("should delete a job", func() {
By("Creating a job")
job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil)
job := framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
@ -121,7 +122,7 @@ var _ = SIGDescribe("Job", func() {
It("should adopt matching orphans and release non-matching pods", func() {
By("Creating a job")
job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil)
job := framework.NewTestJob("notTerminate", "adopt-release", v1.RestartPolicyNever, parallelism, completions, nil, backoffLimit)
// Replace job with the one returned from Create() so it has the UID.
// Save Kind since it won't be populated in the returned job.
kind := job.Kind
@ -172,4 +173,22 @@ var _ = SIGDescribe("Job", func() {
},
)).To(Succeed(), "wait for pod %q to be released", pod.Name)
})
It("should exceed backoffLimit", func() {
By("Creating a job")
job := framework.NewTestJob("fail", "backofflimit", v1.RestartPolicyNever, 1, 1, nil, 0)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
By("Ensuring job exceed backofflimit")
err = framework.WaitForJobFailure(f.ClientSet, f.Namespace.Name, job.Name, time.Duration(30)*time.Second, "BackoffLimitExceeded")
Expect(err).NotTo(HaveOccurred())
By("Checking that only one pod created and status is failed")
pods, err := framework.GetJobPods(f.ClientSet, f.Namespace.Name, job.Name)
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(1))
pod := pods.Items[0]
Expect(pod.Status.Phase).To(Equal(v1.PodFailed))
})
})


@ -43,7 +43,7 @@ const (
// first time it is run and succeeds subsequently. name is the Name of the Job. RestartPolicy indicates the restart
// policy of the containers in which the Pod is running. Parallelism is the Job's parallelism, and completions is the
// Job's required number of completions.
func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64) *batch.Job {
func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, completions int32, activeDeadlineSeconds *int64, backoffLimit int32) *batch.Job {
job := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: name,
@ -55,6 +55,7 @@ func NewTestJob(behavior, name string, rPol v1.RestartPolicy, parallelism, compl
ActiveDeadlineSeconds: activeDeadlineSeconds,
Parallelism: &parallelism,
Completions: &completions,
BackoffLimit: &backoffLimit,
ManualSelector: newBool(false),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{


@ -418,9 +418,10 @@ var _ = framework.KubeDescribe("[sig-apps] Network Partition [Disruptive] [Slow]
It("should create new pods when node is partitioned", func() {
parallelism := int32(2)
completions := int32(4)
backoffLimit := int32(6) // default value
job := framework.NewTestJob("notTerminate", "network-partition", v1.RestartPolicyNever,
parallelism, completions, nil)
parallelism, completions, nil, backoffLimit)
job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job)
Expect(err).NotTo(HaveOccurred())
label := labels.SelectorFromSet(labels.Set(map[string]string{framework.JobSelectorKey: job.Name}))


@ -39,7 +39,7 @@ func (t *JobUpgradeTest) Setup(f *framework.Framework) {
t.namespace = f.Namespace.Name
By("Creating a job")
t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil)
t.job = framework.NewTestJob("notTerminate", "foo", v1.RestartPolicyOnFailure, 2, 2, nil, 6)
job, err := framework.CreateJob(f.ClientSet, t.namespace, t.job)
t.job = job
Expect(err).NotTo(HaveOccurred())