diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 552fdfd8fe6..c09b135594f 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -495,12 +495,14 @@ func (jm *JobController) syncJob(key string) (bool, error) {
 	var failureMessage string
 
 	jobHaveNewFailure := failed > job.Status.Failed
+	exceedsBackoffLimit := jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit)
 
-	// check if the number of failed jobs increased since the last syncJob
-	if jobHaveNewFailure && (int32(previousRetry)+1 > *job.Spec.BackoffLimit) {
+	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
+		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
+		// OR if the number of failed jobs increased since the last syncJob
 		jobFailed = true
 		failureReason = "BackoffLimitExceeded"
-		failureMessage = "Job has reach the specified backoff limit"
+		failureMessage = "Job has reached the specified backoff limit"
 	} else if pastActiveDeadline(&job) {
 		jobFailed = true
 		failureReason = "DeadlineExceeded"
@@ -614,6 +616,30 @@ func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh cha
 	wait.Wait()
 }
 
+// pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
+// this method applies only to pods with restartPolicy == OnFailure
+func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
+	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
+		return false
+	}
+	result := int32(0)
+	for i := range pods {
+		po := pods[i]
+		if po.Status.Phase != v1.PodRunning {
+			continue
+		}
+		for j := range po.Status.InitContainerStatuses {
+			stat := po.Status.InitContainerStatuses[j]
+			result += stat.RestartCount
+		}
+		for j := range po.Status.ContainerStatuses {
+			stat := po.Status.ContainerStatuses[j]
+			result += stat.RestartCount
+		}
+	}
+	return result >= *job.Spec.BackoffLimit
+}
+
 // pastActiveDeadline checks if job has ActiveDeadlineSeconds field set and if it is exceeded.
 func pastActiveDeadline(job *batch.Job) bool {
 	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 81edcc90a81..dbb89ccd924 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -269,7 +269,7 @@ func TestControllerSyncJob(t *testing.T) {
 			nil, true, 0, 0, 0, 0,
 			10, 0, 10, 0, 0, nil, "",
 		},
-		"to many job sync failure": {
+		"too many job failures": {
 			2, 5, 0, true, 0,
 			nil, true, 0, 0, 0, 1,
 			0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
@@ -1408,3 +1408,92 @@ func TestJobBackoff(t *testing.T) {
 		})
 	}
 }
+
+func TestJobBackoffForOnFailure(t *testing.T) {
+	jobConditionFailed := batch.JobFailed
+
+	testCases := map[string]struct {
+		// job setup
+		parallelism  int32
+		completions  int32
+		backoffLimit int32
+
+		// pod setup
+		jobKeyForget  bool
+		restartCounts []int32
+
+		// expectations
+		expectedActive          int32
+		expectedSucceeded       int32
+		expectedFailed          int32
+		expectedCondition       *batch.JobConditionType
+		expectedConditionReason string
+	}{
+		"too many job failures - single pod": {
+			1, 5, 2,
+			true, []int32{2},
+			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
+		},
+		"too many job failures - multiple pods": {
+			2, 5, 2,
+			true, []int32{1, 1},
+			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
+		},
+		"not enough failures": {
+			2, 5, 3,
+			true, []int32{1, 1},
+			2, 0, 0, nil, "",
+		},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			// job manager setup
+			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+			manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+			fakePodControl := controller.FakePodControl{}
+			manager.podControl = &fakePodControl
+			manager.podStoreSynced = alwaysReady
+			manager.jobStoreSynced = alwaysReady
+			var actual *batch.Job
+			manager.updateHandler = func(job *batch.Job) error {
+				actual = job
+				return nil
+			}
+
+			// job & pods setup
+			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit)
+			job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
+			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
+			podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
+			for i, pod := range newPodList(int32(len(tc.restartCounts)), v1.PodRunning, job) {
+				pod.Status.ContainerStatuses = []v1.ContainerStatus{{RestartCount: tc.restartCounts[i]}}
+				podIndexer.Add(&pod)
+			}
+
+			// run
+			forget, err := manager.syncJob(getKey(job, t))
+
+			if err != nil {
+				t.Errorf("unexpected error syncing job. Got %#v", err)
+			}
+			if forget != tc.jobKeyForget {
+				t.Errorf("unexpected forget value. Expected %v, saw %v\n", tc.jobKeyForget, forget)
+			}
+			// validate status
+			if actual.Status.Active != tc.expectedActive {
+				t.Errorf("unexpected number of active pods. Expected %d, saw %d\n", tc.expectedActive, actual.Status.Active)
+			}
+			if actual.Status.Succeeded != tc.expectedSucceeded {
+				t.Errorf("unexpected number of succeeded pods. Expected %d, saw %d\n", tc.expectedSucceeded, actual.Status.Succeeded)
+			}
+			if actual.Status.Failed != tc.expectedFailed {
+				t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
+			}
+			// validate conditions
+			if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionReason) {
+				t.Errorf("expected completion condition. Got %#v", actual.Status.Conditions)
+			}
+		})
+	}
+}
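
For context on what the patch above changes: pastBackoffLimitOnFailure sums container and init-container restart counts across running pods and compares the total against the Job's BackoffLimit, and it only applies when the pod template uses restartPolicy == OnFailure (with Never, failures show up as failed pods and are already counted by the existing exceedsBackoffLimit path). Below is a minimal, self-contained sketch of that accounting, not part of the patch; the pod and containerStatus types are simplified stand-ins for the Kubernetes API types.

// Illustrative sketch only: mirrors the restart-count accounting used by
// pastBackoffLimitOnFailure, with simplified local types instead of the
// Kubernetes API types.
package main

import "fmt"

type containerStatus struct{ restartCount int32 }

type pod struct {
	running               bool
	initContainerStatuses []containerStatus
	containerStatuses     []containerStatus
}

// sumRestarts adds up restart counts across init and app containers of
// running pods; this is the quantity compared against BackoffLimit.
func sumRestarts(pods []pod) int32 {
	var result int32
	for _, p := range pods {
		if !p.running {
			continue
		}
		for _, s := range p.initContainerStatuses {
			result += s.restartCount
		}
		for _, s := range p.containerStatuses {
			result += s.restartCount
		}
	}
	return result
}

func main() {
	backoffLimit := int32(2)
	pods := []pod{
		{running: true, containerStatuses: []containerStatus{{restartCount: 1}}},
		{running: true, containerStatuses: []containerStatus{{restartCount: 1}}},
	}
	// Mirrors the "too many job failures - multiple pods" test case: two running
	// pods with one restart each reach backoffLimit=2, so the controller would
	// mark the Job failed with reason "BackoffLimitExceeded".
	fmt.Println(sumRestarts(pods) >= backoffLimit) // prints: true
}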