diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 46b41aec307..e7b6ece74a7 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -905,12 +905,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { return fmt.Errorf("tracking status: %w", err) } - jobFinished := IsJobFinished(&job) - if jobHasNewFailure && !jobFinished { - // returning an error will re-enqueue Job after the backoff period - return fmt.Errorf("failed pod(s) detected for job key %q", key) - } - return manageJobErr } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 17c738122d6..cd023a31b04 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -854,10 +854,6 @@ func TestControllerSyncJob(t *testing.T) { if err == nil { t.Error("Syncing jobs expected to return error on podControl exception") } - } else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) { - if err == nil { - t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish") - } } else if tc.podLimit != 0 && fakePodControl.CreateCallCount > tc.podLimit { if err == nil { t.Error("Syncing jobs expected to return error when reached the podControl limit") @@ -1704,7 +1700,6 @@ func TestSyncJobPastDeadline(t *testing.T) { failedPods int // expectations - expectedForGetKey bool expectedDeletions int32 expectedActive int32 expectedSucceeded int32 @@ -1719,7 +1714,6 @@ func TestSyncJobPastDeadline(t *testing.T) { startTime: 15, backoffLimit: 6, activePods: 1, - expectedForGetKey: false, expectedDeletions: 1, expectedFailed: 1, expectedCondition: batch.JobFailed, @@ -1733,7 +1727,6 @@ func TestSyncJobPastDeadline(t *testing.T) { backoffLimit: 6, activePods: 1, succeededPods: 1, - expectedForGetKey: true, expectedDeletions: 1, expectedSucceeded: 1, expectedFailed: 1, @@ -1746,7 +1739,6 @@ func TestSyncJobPastDeadline(t *testing.T) { activeDeadlineSeconds: 10, startTime: 10, backoffLimit: 6, - expectedForGetKey: false, expectedCondition: batch.JobFailed, expectedConditionReason: "DeadlineExceeded", }, @@ -1756,7 +1748,6 @@ func TestSyncJobPastDeadline(t *testing.T) { activeDeadlineSeconds: 1, startTime: 10, failedPods: 1, - expectedForGetKey: false, expectedFailed: 1, expectedCondition: batch.JobFailed, expectedConditionReason: "BackoffLimitExceeded", @@ -1768,7 +1759,6 @@ func TestSyncJobPastDeadline(t *testing.T) { activeDeadlineSeconds: 10, startTime: 15, backoffLimit: 6, - expectedForGetKey: true, expectedCondition: batch.JobSuspended, expectedConditionReason: "JobSuspended", }, @@ -3898,80 +3888,38 @@ func bumpResourceVersion(obj metav1.Object) { obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) } -type pods struct { - pending int - active int - succeed int - failed int -} - -func TestJobBackoffReset(t *testing.T) { +func TestJobApiBackoffReset(t *testing.T) { _, ctx := ktesting.NewTestContext(t) - testCases := map[string]struct { - // job setup - parallelism int32 - completions int32 - backoffLimit int32 - // pod setup - each row is additive! - pods []pods - }{ - "parallelism=1": { - 1, 2, 1, - []pods{ - {0, 1, 0, 1}, - {0, 0, 1, 0}, - }, - }, - "parallelism=2 (just failure)": { - 2, 2, 1, - []pods{ - {0, 2, 0, 1}, - {0, 0, 1, 0}, - }, - }, + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) + manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { + return job, nil } - for name, tc := range testCases { - clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) - defer func() { DefaultJobApiBackOff = 1 * time.Second }() - DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing - manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc) - fakePodControl := controller.FakePodControl{} - manager.podControl = &fakePodControl - manager.podStoreSynced = alwaysReady - manager.jobStoreSynced = alwaysReady - var actual *batch.Job - manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { - actual = job - return job, nil - } + job := newJob(1, 1, 2, batch.NonIndexedCompletion) + key := testutil.GetKey(job, t) + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - // job & pods setup - job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion) - key := testutil.GetKey(job, t) - sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) - podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() + // error returned make the key requeued + fakePodControl.Err = errors.New("Controller error") + manager.queue.Add(key) + manager.processNextWorkItem(context.TODO()) + retries := manager.queue.NumRequeues(key) + if retries != 1 { + t.Fatalf("%s: expected exactly 1 retry, got %d", job.Name, retries) + } - setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed, 0) - manager.queue.Add(key) - manager.processNextWorkItem(context.TODO()) - retries := manager.queue.NumRequeues(key) - if retries != 1 { - t.Errorf("%s: expected exactly 1 retry, got %d", name, retries) - } - - job = actual - sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion) - setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed, 0) - manager.processNextWorkItem(context.TODO()) - retries = manager.queue.NumRequeues(key) - if retries != 0 { - t.Errorf("%s: expected exactly 0 retries, got %d", name, retries) - } - if getCondition(actual, batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded") { - t.Errorf("%s: unexpected job failure", name) - } + // the queue is emptied on success + fakePodControl.Err = nil + manager.processNextWorkItem(context.TODO()) + retries = manager.queue.NumRequeues(key) + if retries != 0 { + t.Fatalf("%s: expected exactly 0 retries, got %d", job.Name, retries) } } @@ -4066,7 +4014,6 @@ func TestJobBackoffForOnFailure(t *testing.T) { suspend bool // pod setup - jobKeyForget bool restartCounts []int32 podPhase v1.PodPhase @@ -4078,57 +4025,57 @@ func TestJobBackoffForOnFailure(t *testing.T) { expectedConditionReason string }{ "backoffLimit 0 should have 1 pod active": { - 1, 1, 0, false, + 1, 1, 0, false, []int32{0}, v1.PodRunning, 1, 0, 0, nil, "", }, "backoffLimit 1 with restartCount 0 should have 1 pod active": { - 1, 1, 1, false, + 1, 1, 1, false, []int32{0}, v1.PodRunning, 1, 0, 0, nil, "", }, "backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": { - 1, 1, 1, false, + 1, 1, 1, false, []int32{1}, v1.PodRunning, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": { - 1, 1, 1, false, + 1, 1, 1, false, []int32{1}, v1.PodPending, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podRunning - single pod": { - 1, 5, 2, false, + 1, 5, 2, false, []int32{2}, v1.PodRunning, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podPending - single pod": { - 1, 5, 2, false, + 1, 5, 2, false, []int32{2}, v1.PodPending, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podRunning - multiple pods": { - 2, 5, 2, false, + 2, 5, 2, false, []int32{1, 1}, v1.PodRunning, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", }, "too many job failures with podPending - multiple pods": { - 2, 5, 2, false, + 2, 5, 2, false, []int32{1, 1}, v1.PodPending, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", }, "not enough failures": { - 2, 5, 3, false, + 2, 5, 3, false, []int32{1, 1}, v1.PodRunning, 2, 0, 0, nil, "", }, "suspending a job": { - 2, 4, 6, true, + 2, 4, 6, true, []int32{1, 1}, v1.PodRunning, 0, 0, 0, &jobConditionSuspended, "JobSuspended", }, "finshed job": { - 2, 4, 6, true, + 2, 4, 6, true, []int32{1, 1, 2, 0}, v1.PodSucceeded, 0, 4, 0, &jobConditionComplete, "", }, @@ -4200,8 +4147,6 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { failedPods int // expectations - isExpectingAnError bool - jobKeyForget bool expectedActive int32 expectedSucceeded int32 expectedFailed int32 @@ -4211,27 +4156,27 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { "not enough failures with backoffLimit 0 - single pod": { 1, 1, 0, v1.PodRunning, 1, 0, - false, false, 1, 0, 0, nil, "", + 1, 0, 0, nil, "", }, "not enough failures with backoffLimit 1 - single pod": { 1, 1, 1, "", 0, 1, - true, false, 1, 0, 1, nil, "", + 1, 0, 1, nil, "", }, "too many failures with backoffLimit 1 - single pod": { 1, 1, 1, "", 0, 2, - false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", + 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded", }, "not enough failures with backoffLimit 6 - multiple pods": { 2, 2, 6, v1.PodRunning, 1, 6, - true, false, 2, 0, 6, nil, "", + 2, 0, 6, nil, "", }, "too many failures with backoffLimit 6 - multiple pods": { 2, 2, 6, "", 0, 7, - false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded", + 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded", }, } @@ -4267,9 +4212,8 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) { // run err := manager.syncJob(context.TODO(), testutil.GetKey(job, t)) - - if (err != nil) != tc.isExpectingAnError { - t.Errorf("unexpected error syncing job. Got %#v, isExpectingAnError: %v\n", err, tc.isExpectingAnError) + if err != nil { + t.Fatalf("unexpected error syncing job: %#v\n", err) } // validate status if actual.Status.Active != tc.expectedActive { @@ -4490,23 +4434,6 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) { } } -// hasValidFailingPods checks if there exists failed pods with valid index. -func hasValidFailingPods(status []indexPhase, completions int) bool { - for _, s := range status { - ix, err := strconv.Atoi(s.Index) - if err != nil { - continue - } - if ix < 0 || ix >= completions { - continue - } - if s.Phase == v1.PodFailed { - return true - } - } - return false -} - type podBuilder struct { *v1.Pod }