Merge pull request #58972 from soltysh/issue54870

Automatic merge from submit-queue (batch tested with PRs 61962, 58972, 62509, 62606). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Fix job's backoff limit for restart policy OnFailure

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes #54870

**Release note**:
```release-note
NONE
```
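
For context: with `restartPolicy: OnFailure`, a failing container is restarted in place inside the same pod, so the Job's `status.failed` counter never grows and, before this fix, `spec.backoffLimit` was never enforced. Below is a minimal sketch of a Job spec that hits this path, built with the client-go API types; the name, image, and command are placeholders, not taken from this PR:

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func int32Ptr(i int32) *int32 { return &i }

func main() {
	// A Job whose pods restart failed containers in place. Before this fix,
	// those restarts never incremented status.failed, so backoffLimit was
	// effectively ignored for OnFailure pods.
	job := batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "flaky-task"}, // placeholder name
		Spec: batchv1.JobSpec{
			BackoffLimit: int32Ptr(2),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyOnFailure,
					Containers: []corev1.Container{
						// placeholder workload that always fails
						{Name: "task", Image: "busybox", Command: []string{"sh", "-c", "exit 1"}},
					},
				},
			},
		},
	}
	fmt.Printf("backoffLimit=%d restartPolicy=%s\n",
		*job.Spec.BackoffLimit, job.Spec.Template.Spec.RestartPolicy)
}
```

The change below makes the controller also sum container restart counts for such Jobs when deciding whether the backoff limit has been passed.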

/assign janetkuo
Commit 139309f798 by Kubernetes Submit Queue, 2018-04-19 16:47:18 -07:00, committed by GitHub.
2 changed files with 119 additions and 4 deletions


```diff
@@ -495,12 +495,14 @@ func (jm *JobController) syncJob(key string) (bool, error) {
 	var failureMessage string
 
 	jobHaveNewFailure := failed > job.Status.Failed
+	exceedsBackoffLimit := jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit)
 
-	// check if the number of failed jobs increased since the last syncJob
-	if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
+	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
+		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
+		// OR if the number of failed jobs increased since the last syncJob
 		jobFailed = true
 		failureReason = "BackoffLimitExceeded"
-		failureMessage = "Job has reach the specified backoff limit"
+		failureMessage = "Job has reached the specified backoff limit"
 	} else if pastActiveDeadline(&job) {
 		jobFailed = true
 		failureReason = "DeadlineExceeded"
```
```diff
@@ -614,6 +616,30 @@ func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh cha
 	wait.Wait()
 }
 
+// pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
+// this method applies only to pods with restartPolicy == OnFailure
+func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
+	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
+		return false
+	}
+	result := int32(0)
+	for i := range pods {
+		po := pods[i]
+		if po.Status.Phase != v1.PodRunning {
+			continue
+		}
+		for j := range po.Status.InitContainerStatuses {
+			stat := po.Status.InitContainerStatuses[j]
+			result += stat.RestartCount
+		}
+		for j := range po.Status.ContainerStatuses {
+			stat := po.Status.ContainerStatuses[j]
+			result += stat.RestartCount
+		}
+	}
+	return result >= *job.Spec.BackoffLimit
+}
+
 // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
 func pastActiveDeadline(job *batch.Job) bool {
 	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
```
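
To make the counting rule in `pastBackoffLimitOnFailure` concrete, here is a standalone sketch over the client-go pod types: only pods in phase `Running` are inspected, and init-container restarts count toward the total alongside regular container restarts. The pod data is invented for illustration:

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// sumRestarts mirrors the counting loop above: non-Running pods are skipped,
// and restarts from init containers and app containers both accumulate.
func sumRestarts(pods []*corev1.Pod) int32 {
	var total int32
	for _, po := range pods {
		if po.Status.Phase != corev1.PodRunning {
			continue
		}
		for _, st := range po.Status.InitContainerStatuses {
			total += st.RestartCount
		}
		for _, st := range po.Status.ContainerStatuses {
			total += st.RestartCount
		}
	}
	return total
}

func main() {
	pods := []*corev1.Pod{
		{Status: corev1.PodStatus{ // running pod: its 2 restarts count
			Phase:             corev1.PodRunning,
			ContainerStatuses: []corev1.ContainerStatus{{RestartCount: 2}},
		}},
		{Status: corev1.PodStatus{ // failed pod: skipped by the phase check
			Phase:             corev1.PodFailed,
			ContainerStatuses: []corev1.ContainerStatus{{RestartCount: 5}},
		}},
	}
	backoffLimit := int32(2)
	fmt.Println("past backoff limit:", sumRestarts(pods) >= backoffLimit) // true
}
```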


```diff
@@ -269,7 +269,7 @@ func TestControllerSyncJob(t *testing.T) {
 			nil, true, 0, 0, 0, 0,
 			10, 0, 10, 0, 0, nil, "",
 		},
-		"to many job sync failure": {
+		"too many job failures": {
 			2, 5, 0, true, 0,
 			nil, true, 0, 0, 0, 1,
 			0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
@@ -1408,3 +1408,92 @@ func TestJobBackoff(t *testing.T) {
 		})
 	}
 }
+
+func TestJobBackoffForOnFailure(t *testing.T) {
+	jobConditionFailed := batch.JobFailed
+
+	testCases := map[string]struct {
+		// job setup
+		parallelism  int32
+		completions  int32
+		backoffLimit int32
+
+		// pod setup
+		jobKeyForget  bool
+		restartCounts []int32
+
+		// expectations
+		expectedActive          int32
+		expectedSucceeded       int32
+		expectedFailed          int32
+		expectedCondition       *batch.JobConditionType
+		expectedConditionReason string
+	}{
+		"too many job failures - single pod": {
+			1, 5, 2,
+			true, []int32{2},
+			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
+		},
+		"too many job failures - multiple pods": {
+			2, 5, 2,
+			true, []int32{1, 1},
+			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
+		},
+		"not enough failures": {
+			2, 5, 3,
+			true, []int32{1, 1},
+			2, 0, 0, nil, "",
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			// job manager setup
+			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+			manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+			fakePodControl := controller.FakePodControl{}
+			manager.podControl = &fakePodControl
+			manager.podStoreSynced = alwaysReady
+			manager.jobStoreSynced = alwaysReady
+			var actual *batch.Job
+			manager.updateHandler = func(job *batch.Job) error {
+				actual = job
+				return nil
+			}
+
+			// job & pods setup
+			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
+			job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
+			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
+			podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
+			for i, pod := range newPodList(int32(len(tc.restartCounts)), v1.PodRunning, job) {
+				pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}}
+				podIndexer.Add(&pod)
+			}
+
+			// run
+			forget, err := manager.syncJob(getKey(job, t))
+
+			if err != nil {
+				t.Errorf("unexpected error syncing job. Got %#v", err)
+			}
+			if forget != tc.jobKeyForget {
+				t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
+			}
+			// validate status
+			if actual.Status.Active != tc.expectedActive {
+				t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
+			}
+			if actual.Status.Succeeded != tc.expectedSucceeded {
+				t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
+			}
+			if actual.Status.Failed != tc.expectedFailed {
+				t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
+			}
+			// validate conditions
+			if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
+				t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions)
+			}
+		})
+	}
+}
```
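
To exercise just the new test, something like the following should work from the repository root (assuming the job controller package still sits at `pkg/controller/job`, as it did in the tree at the time):

```sh
go test ./pkg/controller/job/ -run TestJobBackoffForOnFailure
```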