diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 6c7fd03ed24..cac4da12f37 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -312,8 +312,10 @@ func (jm *Controller) updatePod(old, cur interface{}) {
 		return
 	}
 
-	// the only time we want the backoff to kick-in, is when the pod failed
-	immediate := curPod.Status.Phase != v1.PodFailed
+	// the only time we want the backoff to kick-in, is when the pod failed for the first time.
+	// we don't want to re-calculate backoff for an update event when the tracking finalizer
+	// for a failed pod is removed.
+	immediate := !(curPod.Status.Phase == v1.PodFailed && oldPod.Status.Phase != v1.PodFailed)
 
 	// Don't check if oldPod has the finalizer, as during ownership transfer
 	// finalizers might be re-added and removed again in behalf of the new owner.
@@ -504,7 +506,9 @@ func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool,
 	backoff := delay
 	if !immediate {
-		backoff = getBackoff(jm.queue, key)
+		if calculatedBackoff := getBackoff(jm.queue, key); calculatedBackoff > 0 {
+			backoff = calculatedBackoff
+		}
 	}
 
 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
@@ -859,6 +863,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 	job.Status.Ready = ready
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
 	if err != nil {
+		if apierrors.IsConflict(err) {
+			// we probably have a stale informer cache
+			// so don't return an error to avoid backoff
+			jm.enqueueController(&job, false)
+			return false, nil
+		}
 		return false, fmt.Errorf("tracking status: %w", err)
 	}
 	jobFinished := IsJobFinished(&job)
@@ -866,7 +876,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 		// returning an error will re-enqueue Job after the backoff period
 		return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
 	}
-	forget = true
+	if suspendCondChanged {
+		forget = true
+	}
 	return forget, manageJobErr
 }
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 686a2979f35..67fa43c785a 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -187,6 +187,12 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
 	}
 }
 
+type jobInitialStatus struct {
+	active  int
+	succeed int
+	failed  int
+}
+
 func TestControllerSyncJob(t *testing.T) {
 	jobConditionComplete := batch.JobComplete
 	jobConditionFailed := batch.JobFailed
@@ -202,6 +208,7 @@ func TestControllerSyncJob(t *testing.T) {
 		completionMode batch.CompletionMode
 		wasSuspended   bool
 		suspend        bool
+		initialStatus  *jobInitialStatus
 
 		// pod setup
 		podControllerError error
@@ -237,7 +244,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 2,
 			expectedActive: 2,
 		},
@@ -245,7 +252,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: -1,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 2,
 			expectedActive: 2,
 		},
@@ -253,7 +260,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			pendingPods: 2,
 			expectedActive: 2,
 		},
@@ -261,7 +268,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 3,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			readyPods: 2,
 			expectedActive: 3,
@@ -270,7 +277,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 3,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			readyPods: 2,
 			expectedActive: 3,
@@ -281,7 +288,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: -1,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 2,
 			expectedActive: 2,
 		},
@@ -301,7 +308,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: -1,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 1,
 			expectedCreations: 1,
 			expectedActive: 2,
@@ -322,7 +329,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			expectedDeletions: 1,
 			expectedActive: 2,
@@ -333,7 +340,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions: 5,
 			backoffLimit: 6,
 			podControllerError: fmt.Errorf("fake error"),
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			expectedDeletions: 1,
 			expectedActive: 3,
@@ -363,6 +370,18 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedFailed: 1,
 			expectedPodPatches: 1,
 		},
+		"no new pod; possible finalizer update of failed pod": {
+			parallelism: 1,
+			completions: 1,
+			backoffLimit: 6,
+			initialStatus: &jobInitialStatus{1, 0, 1},
+			activePods: 1,
+			failedPods: 0,
+			expectedCreations: 0,
+			expectedActive: 1,
+			expectedFailed: 1,
+			expectedPodPatches: 0,
+		},
 		"only new failed pod with controller error": {
 			parallelism: 2,
 			completions: 5,
@@ -424,7 +443,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 10,
 			expectedDeletions: 8,
 			expectedActive: 2,
@@ -471,7 +490,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions: 200,
 			backoffLimit: 6,
 			podLimit: 10,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 10,
 			expectedActive: 10,
 		},
@@ -479,7 +498,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			deleting: true,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			failedPods: 1,
 			expectedFailed: 1,
 			expectedCondition: &jobConditionFailed,
@@ -501,7 +520,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions: 5,
 			backoffLimit: 6,
 			completionMode: batch.IndexedCompletion,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 2,
 			expectedActive: 2,
 			expectedCreatedIndexes: sets.NewInt(0, 1),
@@ -678,7 +697,7 @@ func TestControllerSyncJob(t *testing.T) {
 			fakeExpectationAtCreation: -1, // the controller is expecting a deletion
 			completions: 4,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 0,
 			expectedDeletions: 0,
 			expectedActive: 3,
@@ -708,7 +727,7 @@ func TestControllerSyncJob(t *testing.T) {
 			activePods: 2, // parallelism == active, expectations satisfied
 			completions: 4,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 0,
 			expectedDeletions: 0,
 			expectedActive: 2,
@@ -734,6 +753,13 @@ func TestControllerSyncJob(t *testing.T) {
 			// job & pods setup
 			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
 			job.Spec.Suspend = pointer.Bool(tc.suspend)
+			if tc.initialStatus != nil {
+				startTime := metav1.Now()
+				job.Status.StartTime = &startTime
+				job.Status.Active = int32(tc.initialStatus.active)
+				job.Status.Succeeded = int32(tc.initialStatus.succeed)
+				job.Status.Failed = int32(tc.initialStatus.failed)
+			}
 			key, err := controller.KeyFunc(job)
 			if err != nil {
 				t.Errorf("Unexpected error getting job key: %v", err)
 			}
@@ -1555,7 +1581,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			startTime: 15,
 			backoffLimit: 6,
 			activePods: 1,
-			expectedForGetKey: true,
+			expectedForGetKey: false,
 			expectedDeletions: 1,
 			expectedFailed: 1,
 			expectedCondition: batch.JobFailed,
@@ -1582,7 +1608,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds: 10,
 			startTime: 10,
 			backoffLimit: 6,
-			expectedForGetKey: true,
+			expectedForGetKey: false,
 			expectedCondition: batch.JobFailed,
 			expectedConditionReason: "DeadlineExceeded",
 		},
@@ -1592,7 +1618,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds: 1,
 			startTime: 10,
 			failedPods: 1,
-			expectedForGetKey: true,
+			expectedForGetKey: false,
 			expectedFailed: 1,
 			expectedCondition: batch.JobFailed,
 			expectedConditionReason: "BackoffLimitExceeded",
@@ -1804,8 +1830,8 @@ func TestSingleJobFailedCondition(t *testing.T) {
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
-	if !forget {
-		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	if forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
 	}
 	if len(fakePodControl.DeletePodName) != 0 {
 		t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
 	}
@@ -2946,6 +2972,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
+	defer func() { DefaultJobBackOff = 10 * time.Second }()
 	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 	cases := map[string]struct {
 		updateErr error
@@ -3711,6 +3738,7 @@ func TestJobBackoffReset(t *testing.T) {
 	for name, tc := range testCases {
 		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
+		defer func() { DefaultJobBackOff = 10 * time.Second }()
 		DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 		manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 		fakePodControl := controller.FakePodControl{}
@@ -3775,13 +3803,13 @@ func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duratio
 func TestJobBackoff(t *testing.T) {
 	job := newJob(1, 1, 1, batch.NonIndexedCompletion)
 	oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
-	oldPod.Status.Phase = v1.PodRunning
 	oldPod.ResourceVersion = "1"
 	newPod := oldPod.DeepCopy()
 	newPod.ResourceVersion = "2"
 
 	testCases := map[string]struct {
 		requeues            int
+		oldPodPhase         v1.PodPhase
 		phase               v1.PodPhase
 		jobReadyPodsEnabled bool
 		wantBackoff         time.Duration
@@ -3822,14 +3850,22 @@ func TestJobBackoff(t *testing.T) {
 			wantBackoff: 0,
 		},
 		"1st failure with pod updates batching": {
-			requeues: 0,
-			phase: v1.PodFailed,
-			wantBackoff: podUpdateBatchPeriod,
+			requeues: 0,
+			phase: v1.PodFailed,
+			jobReadyPodsEnabled: true,
+			wantBackoff: podUpdateBatchPeriod,
 		},
 		"2nd failure with pod updates batching": {
+			requeues: 1,
+			phase: v1.PodFailed,
+			jobReadyPodsEnabled: true,
+			wantBackoff: DefaultJobBackOff,
+		},
+		"Failed pod observed again": {
 			requeues: 1,
+			oldPodPhase: v1.PodFailed,
 			phase: v1.PodFailed,
-			wantBackoff: DefaultJobBackOff,
+			wantBackoff: 0,
 		},
 	}
@@ -3848,23 +3884,29 @@ func TestJobBackoff(t *testing.T) {
 			queue.requeues = tc.requeues
 			newPod.Status.Phase = tc.phase
+			oldPod.Status.Phase = v1.PodRunning
+			if tc.oldPodPhase != "" {
+				oldPod.Status.Phase = tc.oldPodPhase
+			}
 			manager.updatePod(oldPod, newPod)
-
-			if queue.duration.Nanoseconds() != int64(tc.wantBackoff)*DefaultJobBackOff.Nanoseconds() {
-				t.Errorf("unexpected backoff %v", queue.duration)
+			if queue.duration != tc.wantBackoff {
+				t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff)
 			}
 		})
 	}
 }
 
 func TestJobBackoffForOnFailure(t *testing.T) {
+	jobConditionComplete := batch.JobComplete
 	jobConditionFailed := batch.JobFailed
+	jobConditionSuspended := batch.JobSuspended
 
 	testCases := map[string]struct {
 		// job setup
 		parallelism  int32
 		completions  int32
 		backoffLimit int32
+		suspend      bool
 
 		// pod setup
 		jobKeyForget bool
@@ -3879,50 +3921,60 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 		expectedConditionReason string
 	}{
 		"backoffLimit 0 should have 1 pod active": {
-			1, 1, 0,
-			true, []int32{0}, v1.PodRunning,
+			1, 1, 0, false,
+			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 0 should have 1 pod active": {
-			1, 1, 1,
-			true, []int32{0}, v1.PodRunning,
+			1, 1, 1, false,
+			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
-			1, 1, 1,
-			true, []int32{1}, v1.PodRunning,
+			1, 1, 1, false,
+			false, []int32{1}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
-			1, 1, 1,
-			true, []int32{1}, v1.PodPending,
+			1, 1, 1, false,
+			false, []int32{1}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - single pod": {
-			1, 5, 2,
-			true, []int32{2}, v1.PodRunning,
+			1, 5, 2, false,
+			false, []int32{2}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - single pod": {
-			1, 5, 2,
-			true, []int32{2}, v1.PodPending,
+			1, 5, 2, false,
+			false, []int32{2}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - multiple pods": {
-			2, 5, 2,
-			true, []int32{1, 1}, v1.PodRunning,
+			2, 5, 2, false,
+			false, []int32{1, 1}, v1.PodRunning,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - multiple pods": {
-			2, 5, 2,
-			true, []int32{1, 1}, v1.PodPending,
+			2, 5, 2, false,
+			false, []int32{1, 1}, v1.PodPending,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures": {
-			2, 5, 3,
-			true, []int32{1, 1}, v1.PodRunning,
+			2, 5, 3, false,
+			false, []int32{1, 1}, v1.PodRunning,
 			2, 0, 0, nil, "",
 		},
+		"suspending a job": {
+			2, 4, 6, true,
+			true, []int32{1, 1}, v1.PodRunning,
+			0, 0, 0, &jobConditionSuspended, "JobSuspended",
+		},
+		"finshed job": {
+			2, 4, 6, true,
+			true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
+			0, 4, 0, &jobConditionComplete, "",
+		},
 	}
 
 	for name, tc := range testCases {
@@ -3943,6 +3995,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 			// job & pods setup
 			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
 			job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
+			job.Spec.Suspend = pointer.Bool(tc.suspend)
 			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 			podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
 			for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) {
@@ -4003,7 +4056,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"not enough failures with backoffLimit 0 - single pod": {
 			1, 1, 0,
 			v1.PodRunning, 1, 0,
-			false, true, 1, 0, 0, nil, "",
+			false, false, 1, 0, 0, nil, "",
 		},
 		"not enough failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
@@ -4013,7 +4066,7 @@
 		"too many failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
 			"", 0, 2,
-			false, true, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
+			false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
@@ -4023,7 +4076,7 @@
 		"too many failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
 			"", 0, 7,
-			false, true, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
+			false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}
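
Reviewer note (not part of the patch): the behavioral core of the `updatePod` change is the new `immediate` condition. The sketch below restates it as a standalone, runnable Go program; `PodPhase` and `wantImmediate` are hypothetical stand-ins for `v1.PodPhase` and the inline expression in the controller.

```go
package main

import "fmt"

// PodPhase stands in for v1.PodPhase in this self-contained sketch.
type PodPhase string

const (
	PodRunning PodPhase = "Running"
	PodFailed  PodPhase = "Failed"
)

// wantImmediate mirrors the patched condition: enqueue with backoff only on
// the update that first moves a pod into the Failed phase. Later updates to
// an already-failed pod (e.g. removal of the tracking finalizer) enqueue
// immediately, so they no longer re-trigger the backoff.
func wantImmediate(oldPhase, curPhase PodPhase) bool {
	return !(curPhase == PodFailed && oldPhase != PodFailed)
}

func main() {
	fmt.Println(wantImmediate(PodRunning, PodFailed))  // false: first failure, back off
	fmt.Println(wantImmediate(PodFailed, PodFailed))   // true: finalizer-only update, no backoff
	fmt.Println(wantImmediate(PodRunning, PodRunning)) // true: no failure involved
}
```

This is exactly what the new "Failed pod observed again" case in `TestJobBackoff` exercises: `oldPodPhase: v1.PodFailed` together with `phase: v1.PodFailed` now expects a zero backoff.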
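A second sketch, under the same caveat, for the `enqueueControllerDelayed` change: the backoff read from the rate limiter now only replaces the caller-supplied delay when it is positive, so a zero stored backoff (a fresh or reset rate limiter) no longer collapses a deliberate delay such as `podUpdateBatchPeriod` down to zero. `pickBackoff` and `storedBackoff` are hypothetical names; `storedBackoff` plays the role of `getBackoff(jm.queue, key)`.

```go
package main

import (
	"fmt"
	"time"
)

// pickBackoff mirrors the patched branch: keep the caller's delay unless the
// rate limiter has accumulated a real (non-zero) backoff for this key.
func pickBackoff(delay, storedBackoff time.Duration, immediate bool) time.Duration {
	backoff := delay
	if !immediate {
		if storedBackoff > 0 {
			backoff = storedBackoff
		}
	}
	return backoff
}

func main() {
	batch := 500 * time.Millisecond
	fmt.Println(pickBackoff(batch, 0, false))              // 500ms: zero stored backoff keeps the delay
	fmt.Println(pickBackoff(batch, 10*time.Second, false)) // 10s: accumulated backoff wins
	fmt.Println(pickBackoff(batch, 10*time.Second, true))  // 500ms: immediate path ignores stored backoff
}
```

This pairs with the reworked expectations in `TestJobBackoff`, where `queue.duration` is now compared directly against `tc.wantBackoff` instead of a multiple of `DefaultJobBackOff`.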