Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-21 19:01:49 +00:00
Merge pull request #114516 from nikhita/job-backoff-fix

pkg/controller/job: re-honor exponential backoff delay

Commit c0c386b9c9
@@ -312,8 +312,10 @@ func (jm *Controller) updatePod(old, cur interface{}) {
 		return
 	}
 
-	// the only time we want the backoff to kick-in, is when the pod failed
-	immediate := curPod.Status.Phase != v1.PodFailed
+	// the only time we want the backoff to kick-in, is when the pod failed for the first time.
+	// we don't want to re-calculate backoff for an update event when the tracking finalizer
+	// for a failed pod is removed.
+	immediate := !(curPod.Status.Phase == v1.PodFailed && oldPod.Status.Phase != v1.PodFailed)
 
 	// Don't check if oldPod has the finalizer, as during ownership transfer
 	// finalizers might be re-added and removed again in behalf of the new owner.
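The predicate change above is easy to misread, so here is a minimal, self-contained sketch of the old and new behaviour side by side. A local phase type stands in for v1.PodPhase; this is an illustration, not the controller's code.

// Sketch only: compares the old and new "immediate" predicate from updatePod.
package main

import "fmt"

type podPhase string

const (
	podRunning podPhase = "Running"
	podFailed  podPhase = "Failed"
)

// old behaviour: any update of a pod that is currently failed defers the sync (backoff applies).
func immediateOld(oldPhase, curPhase podPhase) bool {
	return curPhase != podFailed
}

// new behaviour: only the transition into Failed defers the sync; later updates of an
// already-failed pod (for example a finalizer removal) sync immediately.
func immediateNew(oldPhase, curPhase podPhase) bool {
	return !(curPhase == podFailed && oldPhase != podFailed)
}

func main() {
	// first observed failure: both versions defer, so the backoff delay is honored.
	fmt.Println(immediateOld(podRunning, podFailed), immediateNew(podRunning, podFailed)) // false false
	// an update of a pod that had already failed: the old code kept re-applying backoff,
	// the new code treats it as immediate.
	fmt.Println(immediateOld(podFailed, podFailed), immediateNew(podFailed, podFailed)) // false true
}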
@@ -504,7 +506,9 @@ func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool,
 
 	backoff := delay
 	if !immediate {
-		backoff = getBackoff(jm.queue, key)
+		if calculatedBackoff := getBackoff(jm.queue, key); calculatedBackoff > 0 {
+			backoff = calculatedBackoff
+		}
 	}
 
 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
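The enqueue change keeps the caller-supplied delay (for example the pod-update batching period) and only overrides it once a non-zero backoff has accumulated for the key; the old code unconditionally replaced the delay, resetting it to zero before the first requeue. Below is a self-contained sketch of that selection logic. The exponential getBackoff stand-in (DefaultJobBackOff doubling per requeue, with an assumed cap) illustrates the "exponential backoff delay" named in the commit message and is not code taken from this diff.

// Standalone sketch of the new delay selection in enqueueControllerDelayed.
package main

import (
	"fmt"
	"math"
	"time"
)

const (
	defaultJobBackOff = 10 * time.Second  // matches the value restored in the tests below
	maxJobBackOff     = 360 * time.Second // assumed cap, for illustration only
)

// assumed exponential backoff: 0 before the first requeue, then 10s, 20s, 40s, ... capped.
func getBackoff(requeues int) time.Duration {
	if requeues <= 0 {
		return 0
	}
	d := float64(defaultJobBackOff) * math.Pow(2, float64(requeues-1))
	if d > float64(maxJobBackOff) {
		return maxJobBackOff
	}
	return time.Duration(d)
}

// mirrors the patched logic: keep the caller's delay (such as the pod-update batch
// period) unless a non-zero exponential backoff has already accumulated.
func enqueueDelay(delay time.Duration, immediate bool, requeues int) time.Duration {
	backoff := delay
	if !immediate {
		if calculated := getBackoff(requeues); calculated > 0 {
			backoff = calculated
		}
	}
	return backoff
}

func main() {
	fmt.Println(enqueueDelay(time.Second, false, 0)) // 1s: batching delay survives (old code replaced it with 0)
	fmt.Println(enqueueDelay(time.Second, false, 1)) // 10s: first backoff step
	fmt.Println(enqueueDelay(time.Second, false, 2)) // 20s: exponential growth
}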
@@ -859,6 +863,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 	job.Status.Ready = ready
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
 	if err != nil {
+		if apierrors.IsConflict(err) {
+			// we probably have a stale informer cache
+			// so don't return an error to avoid backoff
+			jm.enqueueController(&job, false)
+			return false, nil
+		}
 		return false, fmt.Errorf("tracking status: %w", err)
 	}
 	jobFinished := IsJobFinished(&job)
@@ -866,7 +876,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 		// returning an error will re-enqueue Job after the backoff period
 		return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
 	}
 
-	forget = true
+	if suspendCondChanged {
+		forget = true
+	}
 	return forget, manageJobErr
 }
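The test expectations below flip from true to false (jobKeyForget, expectedForGetKey) because syncJob now returns forget=false on most paths. Why that matters is easiest to see in the worker pattern that consumes the result: only an explicit forget resets the rate limiter's requeue count, so returning false preserves the accumulated exponential backoff. The sketch below uses an illustrative stand-in queue rather than the controller's client-go workqueue plumbing.

// Hedged sketch of a worker consuming the (forget, err) pair returned by syncJob.
package main

import "fmt"

// fakeQueue stands in for a rate-limiting workqueue: the requeue count drives the
// exponential backoff, and Forget resets it.
type fakeQueue struct{ requeues map[string]int }

func (q *fakeQueue) AddRateLimited(key string) { q.requeues[key]++ }
func (q *fakeQueue) Forget(key string)         { delete(q.requeues, key) }

func handleSyncResult(q *fakeQueue, key string, forget bool, err error) {
	if err != nil {
		// an error re-enqueues with rate limiting, so the backoff keeps growing
		q.AddRateLimited(key)
		return
	}
	if forget {
		// only an explicit forget clears the accumulated backoff; with this commit
		// syncJob returns forget=false on most paths
		q.Forget(key)
	}
}

func main() {
	q := &fakeQueue{requeues: map[string]int{}}
	handleSyncResult(q, "ns/job", false, fmt.Errorf("failed pod(s) detected"))
	handleSyncResult(q, "ns/job", false, nil) // forget=false: the requeue count survives
	fmt.Println(q.requeues["ns/job"])         // 1
}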
@@ -187,6 +187,12 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
 	}
 }
 
+type jobInitialStatus struct {
+	active  int
+	succeed int
+	failed  int
+}
+
 func TestControllerSyncJob(t *testing.T) {
 	jobConditionComplete := batch.JobComplete
 	jobConditionFailed := batch.JobFailed
@@ -202,6 +208,7 @@ func TestControllerSyncJob(t *testing.T) {
 		completionMode batch.CompletionMode
 		wasSuspended   bool
 		suspend        bool
+		initialStatus  *jobInitialStatus
 
 		// pod setup
 		podControllerError error
@@ -237,7 +244,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 2,
 			expectedActive: 2,
 		},
@@ -245,7 +252,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: -1,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 2,
 			expectedActive: 2,
 		},
@@ -253,7 +260,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			pendingPods: 2,
 			expectedActive: 2,
 		},
@@ -261,7 +268,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 3,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			readyPods: 2,
 			expectedActive: 3,
@@ -270,7 +277,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 3,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			readyPods: 2,
 			expectedActive: 3,
@@ -281,7 +288,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: -1,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 2,
 			expectedActive: 2,
 		},
@@ -301,7 +308,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: -1,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 1,
 			expectedCreations: 1,
 			expectedActive: 2,
@@ -322,7 +329,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			expectedDeletions: 1,
 			expectedActive: 2,
@@ -333,7 +340,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions: 5,
 			backoffLimit: 6,
 			podControllerError: fmt.Errorf("fake error"),
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 3,
 			expectedDeletions: 1,
 			expectedActive: 3,
@@ -363,6 +370,18 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedFailed: 1,
 			expectedPodPatches: 1,
 		},
+		"no new pod; possible finalizer update of failed pod": {
+			parallelism: 1,
+			completions: 1,
+			backoffLimit: 6,
+			initialStatus: &jobInitialStatus{1, 0, 1},
+			activePods: 1,
+			failedPods: 0,
+			expectedCreations: 0,
+			expectedActive: 1,
+			expectedFailed: 1,
+			expectedPodPatches: 0,
+		},
 		"only new failed pod with controller error": {
 			parallelism: 2,
 			completions: 5,
@@ -424,7 +443,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			activePods: 10,
 			expectedDeletions: 8,
 			expectedActive: 2,
@@ -471,7 +490,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions: 200,
 			backoffLimit: 6,
 			podLimit: 10,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 10,
 			expectedActive: 10,
 		},
@@ -479,7 +498,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism: 2,
 			completions: 5,
 			deleting: true,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			failedPods: 1,
 			expectedFailed: 1,
 			expectedCondition: &jobConditionFailed,
@@ -501,7 +520,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions: 5,
 			backoffLimit: 6,
 			completionMode: batch.IndexedCompletion,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 2,
 			expectedActive: 2,
 			expectedCreatedIndexes: sets.NewInt(0, 1),
@@ -678,7 +697,7 @@ func TestControllerSyncJob(t *testing.T) {
 			fakeExpectationAtCreation: -1, // the controller is expecting a deletion
 			completions: 4,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 0,
 			expectedDeletions: 0,
 			expectedActive: 3,
@@ -708,7 +727,7 @@ func TestControllerSyncJob(t *testing.T) {
 			activePods: 2, // parallelism == active, expectations satisfied
 			completions: 4,
 			backoffLimit: 6,
-			jobKeyForget: true,
+			jobKeyForget: false,
 			expectedCreations: 0,
 			expectedDeletions: 0,
 			expectedActive: 2,
@@ -734,6 +753,13 @@ func TestControllerSyncJob(t *testing.T) {
 			// job & pods setup
 			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
 			job.Spec.Suspend = pointer.Bool(tc.suspend)
+			if tc.initialStatus != nil {
+				startTime := metav1.Now()
+				job.Status.StartTime = &startTime
+				job.Status.Active = int32(tc.initialStatus.active)
+				job.Status.Succeeded = int32(tc.initialStatus.succeed)
+				job.Status.Failed = int32(tc.initialStatus.failed)
+			}
 			key, err := controller.KeyFunc(job)
 			if err != nil {
 				t.Errorf("Unexpected error getting job key: %v", err)
@@ -1555,7 +1581,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			startTime: 15,
 			backoffLimit: 6,
 			activePods: 1,
-			expectedForGetKey: true,
+			expectedForGetKey: false,
 			expectedDeletions: 1,
 			expectedFailed: 1,
 			expectedCondition: batch.JobFailed,
@@ -1582,7 +1608,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds: 10,
 			startTime: 10,
 			backoffLimit: 6,
-			expectedForGetKey: true,
+			expectedForGetKey: false,
 			expectedCondition: batch.JobFailed,
 			expectedConditionReason: "DeadlineExceeded",
 		},
@@ -1592,7 +1618,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds: 1,
 			startTime: 10,
 			failedPods: 1,
-			expectedForGetKey: true,
+			expectedForGetKey: false,
 			expectedFailed: 1,
 			expectedCondition: batch.JobFailed,
 			expectedConditionReason: "BackoffLimitExceeded",
@@ -1804,8 +1830,8 @@ func TestSingleJobFailedCondition(t *testing.T) {
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
-	if !forget {
-		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	if forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
 	}
 	if len(fakePodControl.DeletePodName) != 0 {
 		t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
@@ -2946,6 +2972,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
+	defer func() { DefaultJobBackOff = 10 * time.Second }()
 	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 	cases := map[string]struct {
 		updateErr error
@@ -3711,6 +3738,7 @@ func TestJobBackoffReset(t *testing.T) {
 
 	for name, tc := range testCases {
 		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
+		defer func() { DefaultJobBackOff = 10 * time.Second }()
 		DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 		manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 		fakePodControl := controller.FakePodControl{}
@@ -3775,13 +3803,13 @@ func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duratio
 func TestJobBackoff(t *testing.T) {
 	job := newJob(1, 1, 1, batch.NonIndexedCompletion)
 	oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
-	oldPod.Status.Phase = v1.PodRunning
 	oldPod.ResourceVersion = "1"
 	newPod := oldPod.DeepCopy()
 	newPod.ResourceVersion = "2"
 
 	testCases := map[string]struct {
 		requeues int
+		oldPodPhase v1.PodPhase
 		phase v1.PodPhase
 		jobReadyPodsEnabled bool
 		wantBackoff time.Duration
@@ -3824,13 +3852,21 @@ func TestJobBackoff(t *testing.T) {
 		"1st failure with pod updates batching": {
 			requeues: 0,
 			phase: v1.PodFailed,
 			jobReadyPodsEnabled: true,
 			wantBackoff: podUpdateBatchPeriod,
 		},
 		"2nd failure with pod updates batching": {
 			requeues: 1,
 			phase: v1.PodFailed,
 			jobReadyPodsEnabled: true,
 			wantBackoff: DefaultJobBackOff,
 		},
+		"Failed pod observed again": {
+			requeues: 1,
+			oldPodPhase: v1.PodFailed,
+			phase: v1.PodFailed,
+			wantBackoff: 0,
+		},
 	}
 
 	for name, tc := range testCases {
@@ -3848,23 +3884,29 @@ func TestJobBackoff(t *testing.T) {
 
 			queue.requeues = tc.requeues
 			newPod.Status.Phase = tc.phase
+			oldPod.Status.Phase = v1.PodRunning
+			if tc.oldPodPhase != "" {
+				oldPod.Status.Phase = tc.oldPodPhase
+			}
 			manager.updatePod(oldPod, newPod)
 
-			if queue.duration.Nanoseconds() != int64(tc.wantBackoff)*DefaultJobBackOff.Nanoseconds() {
-				t.Errorf("unexpected backoff %v", queue.duration)
+			if queue.duration != tc.wantBackoff {
+				t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff)
 			}
 		})
 	}
 }
 
 func TestJobBackoffForOnFailure(t *testing.T) {
 	jobConditionComplete := batch.JobComplete
 	jobConditionFailed := batch.JobFailed
+	jobConditionSuspended := batch.JobSuspended
 
 	testCases := map[string]struct {
 		// job setup
 		parallelism int32
 		completions int32
 		backoffLimit int32
+		suspend bool
 
 		// pod setup
 		jobKeyForget bool
@@ -3879,50 +3921,60 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 		expectedConditionReason string
 	}{
 		"backoffLimit 0 should have 1 pod active": {
-			1, 1, 0,
-			true, []int32{0}, v1.PodRunning,
+			1, 1, 0, false,
+			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 0 should have 1 pod active": {
-			1, 1, 1,
-			true, []int32{0}, v1.PodRunning,
+			1, 1, 1, false,
+			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
-			1, 1, 1,
-			true, []int32{1}, v1.PodRunning,
+			1, 1, 1, false,
+			false, []int32{1}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
-			1, 1, 1,
-			true, []int32{1}, v1.PodPending,
+			1, 1, 1, false,
+			false, []int32{1}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - single pod": {
-			1, 5, 2,
-			true, []int32{2}, v1.PodRunning,
+			1, 5, 2, false,
+			false, []int32{2}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - single pod": {
-			1, 5, 2,
-			true, []int32{2}, v1.PodPending,
+			1, 5, 2, false,
+			false, []int32{2}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - multiple pods": {
-			2, 5, 2,
-			true, []int32{1, 1}, v1.PodRunning,
+			2, 5, 2, false,
+			false, []int32{1, 1}, v1.PodRunning,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - multiple pods": {
-			2, 5, 2,
-			true, []int32{1, 1}, v1.PodPending,
+			2, 5, 2, false,
+			false, []int32{1, 1}, v1.PodPending,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures": {
-			2, 5, 3,
-			true, []int32{1, 1}, v1.PodRunning,
+			2, 5, 3, false,
+			false, []int32{1, 1}, v1.PodRunning,
 			2, 0, 0, nil, "",
 		},
+		"suspending a job": {
+			2, 4, 6, true,
+			true, []int32{1, 1}, v1.PodRunning,
+			0, 0, 0, &jobConditionSuspended, "JobSuspended",
+		},
+		"finshed job": {
+			2, 4, 6, true,
+			true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
+			0, 4, 0, &jobConditionComplete, "",
+		},
 	}
 
 	for name, tc := range testCases {
@@ -3943,6 +3995,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 			// job & pods setup
 			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
 			job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
+			job.Spec.Suspend = pointer.Bool(tc.suspend)
 			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 			podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
 			for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) {
@@ -4003,7 +4056,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"not enough failures with backoffLimit 0 - single pod": {
 			1, 1, 0,
 			v1.PodRunning, 1, 0,
-			false, true, 1, 0, 0, nil, "",
+			false, false, 1, 0, 0, nil, "",
 		},
 		"not enough failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
@@ -4013,7 +4066,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"too many failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
 			"", 0, 2,
-			false, true, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
+			false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
@@ -4023,7 +4076,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"too many failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
 			"", 0, 7,
-			false, true, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
+			false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}