Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-21 10:51:29 +00:00
pkg/controller/job: re-honor exponential backoff
This commit makes the job controller re-honor exponential backoff for failed pods. Before this commit, the controller created replacement pods without any backoff. This is a regression, because the controller used to create pods with an exponential backoff delay (10s, 20s, 40s, ...).

The issue occurs only when the JobTrackingWithFinalizers feature is enabled (which it is by default right now). With this feature, we get an extra pod update event when the finalizer of a failed pod is removed. Note that pod failure detection and new pod creation happen in the same reconcile loop, so the 2nd pod is created immediately after the 1st pod fails. The backoff is only applied on the 2nd pod failure, which means the 3rd pod is created 10s after the 2nd pod, the 4th pod 20s after the 3rd pod, and so on.

This commit fixes a few bugs:

1. Right now, each time `uncounted != nil` and the job does not see a _new_ failure, `forget` is set to true and the job is removed from the queue. This condition is therefore also triggered each time the finalizer of a failed pod is removed, so `NumRequeues` is reset, which results in a backoff of 0s.
2. Updates `updatePod` to only apply backoff when it sees a particular pod fail for the first time. This is necessary to ensure that the controller does not apply backoff when it sees a pod update event for the finalizer removal of a failed pod.
3. If the `JobReadyPods` feature is enabled and the backoff is 0s, the job is now enqueued after `podUpdateBatchPeriod` seconds instead of 0s.

The unit test for this check also had a few bugs:

- `DefaultJobBackOff` is overwritten to 0 in certain unit tests, which meant that `DefaultJobBackOff` was considered to be 0, effectively not running any meaningful checks.
- `JobReadyPods` was not enabled for test cases that required the feature gate to be enabled.
- The check for expected versus actual backoff used incorrect calculations.
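For reference, the exponential delay described above is derived from the work queue's requeue count for the job key. The snippet below is a minimal, illustrative sketch of that calculation, not the controller's actual `getBackoff` code; the constant names, the cap value, and the `numRequeues` argument are stand-ins for `DefaultJobBackOff`, the controller's maximum backoff, and `queue.NumRequeues(key)`.

    package main

    import (
    	"fmt"
    	"math"
    	"time"
    )

    const (
    	defaultJobBackOff = 10 * time.Second  // base delay; matches the 10s used in the tests above
    	maxJobBackOff     = 360 * time.Second // assumed cap for illustration
    )

    // jobBackoff sketches the per-key exponential delay:
    // 0 requeues -> 0s, 1 -> 10s, 2 -> 20s, 3 -> 40s, ... capped at maxJobBackOff.
    func jobBackoff(numRequeues int) time.Duration {
    	if numRequeues <= 0 {
    		return 0
    	}
    	backoff := float64(defaultJobBackOff.Nanoseconds()) * math.Pow(2, float64(numRequeues-1))
    	if backoff > float64(maxJobBackOff.Nanoseconds()) {
    		return maxJobBackOff
    	}
    	return time.Duration(backoff)
    }

    func main() {
    	for i := 0; i <= 4; i++ {
    		fmt.Printf("requeues=%d backoff=%v\n", i, jobBackoff(i))
    	}
    }

Bug 1 above matters because resetting `NumRequeues` on the finalizer-removal event drives this kind of calculation back to 0s on every retry.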
This commit is contained in: parent 0ff0d0b94e, commit fd8d92a29d
@@ -312,8 +312,10 @@ func (jm *Controller) updatePod(old, cur interface{}) {
 		return
 	}
 
-	// the only time we want the backoff to kick-in, is when the pod failed
-	immediate := curPod.Status.Phase != v1.PodFailed
+	// the only time we want the backoff to kick-in, is when the pod failed for the first time.
+	// we don't want to re-calculate backoff for an update event when the tracking finalizer
+	// for a failed pod is removed.
+	immediate := !(curPod.Status.Phase == v1.PodFailed && oldPod.Status.Phase != v1.PodFailed)
 
 	// Don't check if oldPod has the finalizer, as during ownership transfer
 	// finalizers might be re-added and removed again in behalf of the new owner.
@@ -504,7 +506,9 @@ func (jm *Controller) enqueueControllerDelayed(obj interface{}, immediate bool,
 
 	backoff := delay
 	if !immediate {
-		backoff = getBackoff(jm.queue, key)
+		if calculatedBackoff := getBackoff(jm.queue, key); calculatedBackoff > 0 {
+			backoff = calculatedBackoff
+		}
 	}
 
 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
@@ -859,6 +863,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 	job.Status.Ready = ready
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate)
 	if err != nil {
+		if apierrors.IsConflict(err) {
+			// we probably have a stale informer cache
+			// so don't return an error to avoid backoff
+			jm.enqueueController(&job, false)
+			return false, nil
+		}
 		return false, fmt.Errorf("tracking status: %w", err)
 	}
 	jobFinished := IsJobFinished(&job)
@@ -866,7 +876,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 		// returning an error will re-enqueue Job after the backoff period
 		return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
 	}
+	if suspendCondChanged {
 		forget = true
+	}
 	return forget, manageJobErr
 }
 
@@ -187,6 +187,12 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
 	}
 }
 
+type jobInitialStatus struct {
+	active  int
+	succeed int
+	failed  int
+}
+
 func TestControllerSyncJob(t *testing.T) {
 	jobConditionComplete := batch.JobComplete
 	jobConditionFailed := batch.JobFailed
@@ -202,6 +208,7 @@ func TestControllerSyncJob(t *testing.T) {
 		completionMode batch.CompletionMode
 		wasSuspended   bool
 		suspend        bool
+		initialStatus  *jobInitialStatus
 
 		// pod setup
 		podControllerError error
@@ -237,7 +244,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:       2,
 			completions:       5,
 			backoffLimit:      6,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			expectedCreations: 2,
 			expectedActive:    2,
 		},
@@ -245,7 +252,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:       2,
 			completions:       -1,
 			backoffLimit:      6,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			expectedCreations: 2,
 			expectedActive:    2,
 		},
@@ -253,7 +260,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:    2,
 			completions:    5,
 			backoffLimit:   6,
-			jobKeyForget:   true,
+			jobKeyForget:   false,
 			pendingPods:    2,
 			expectedActive: 2,
 		},
@@ -261,7 +268,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:    3,
 			completions:    5,
 			backoffLimit:   6,
-			jobKeyForget:   true,
+			jobKeyForget:   false,
 			activePods:     3,
 			readyPods:      2,
 			expectedActive: 3,
@@ -270,7 +277,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:    3,
 			completions:    5,
 			backoffLimit:   6,
-			jobKeyForget:   true,
+			jobKeyForget:   false,
 			activePods:     3,
 			readyPods:      2,
 			expectedActive: 3,
@@ -281,7 +288,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:    2,
 			completions:    -1,
 			backoffLimit:   6,
-			jobKeyForget:   true,
+			jobKeyForget:   false,
 			activePods:     2,
 			expectedActive: 2,
 		},
@@ -301,7 +308,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:       2,
 			completions:       -1,
 			backoffLimit:      6,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			activePods:        1,
 			expectedCreations: 1,
 			expectedActive:    2,
@@ -322,7 +329,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:       2,
 			completions:       5,
 			backoffLimit:      6,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			activePods:        3,
 			expectedDeletions: 1,
 			expectedActive:    2,
@@ -333,7 +340,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions:        5,
 			backoffLimit:       6,
 			podControllerError: fmt.Errorf("fake error"),
-			jobKeyForget:       true,
+			jobKeyForget:       false,
 			activePods:         3,
 			expectedDeletions:  1,
 			expectedActive:     3,
@@ -363,6 +370,18 @@ func TestControllerSyncJob(t *testing.T) {
 			expectedFailed:     1,
 			expectedPodPatches: 1,
 		},
+		"no new pod; possible finalizer update of failed pod": {
+			parallelism:        1,
+			completions:        1,
+			backoffLimit:       6,
+			initialStatus:      &jobInitialStatus{1, 0, 1},
+			activePods:         1,
+			failedPods:         0,
+			expectedCreations:  0,
+			expectedActive:     1,
+			expectedFailed:     1,
+			expectedPodPatches: 0,
+		},
 		"only new failed pod with controller error": {
 			parallelism: 2,
 			completions: 5,
@@ -424,7 +443,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:       2,
 			completions:       5,
 			backoffLimit:      6,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			activePods:        10,
 			expectedDeletions: 8,
 			expectedActive:    2,
@@ -471,7 +490,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions:       200,
 			backoffLimit:      6,
 			podLimit:          10,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			expectedCreations: 10,
 			expectedActive:    10,
 		},
@@ -479,7 +498,7 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:       2,
 			completions:       5,
 			deleting:          true,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			failedPods:        1,
 			expectedFailed:    1,
 			expectedCondition: &jobConditionFailed,
@@ -501,7 +520,7 @@ func TestControllerSyncJob(t *testing.T) {
 			completions:            5,
 			backoffLimit:           6,
 			completionMode:         batch.IndexedCompletion,
-			jobKeyForget:           true,
+			jobKeyForget:           false,
 			expectedCreations:      2,
 			expectedActive:         2,
 			expectedCreatedIndexes: sets.NewInt(0, 1),
@@ -678,7 +697,7 @@ func TestControllerSyncJob(t *testing.T) {
 			fakeExpectationAtCreation: -1, // the controller is expecting a deletion
 			completions:               4,
 			backoffLimit:              6,
-			jobKeyForget:              true,
+			jobKeyForget:              false,
 			expectedCreations:         0,
 			expectedDeletions:         0,
 			expectedActive:            3,
@@ -708,7 +727,7 @@ func TestControllerSyncJob(t *testing.T) {
 			activePods:        2, // parallelism == active, expectations satisfied
 			completions:       4,
 			backoffLimit:      6,
-			jobKeyForget:      true,
+			jobKeyForget:      false,
 			expectedCreations: 0,
 			expectedDeletions: 0,
 			expectedActive:    2,
@@ -734,6 +753,13 @@ func TestControllerSyncJob(t *testing.T) {
 			// job & pods setup
 			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
 			job.Spec.Suspend = pointer.Bool(tc.suspend)
+			if tc.initialStatus != nil {
+				startTime := metav1.Now()
+				job.Status.StartTime = &startTime
+				job.Status.Active = int32(tc.initialStatus.active)
+				job.Status.Succeeded = int32(tc.initialStatus.succeed)
+				job.Status.Failed = int32(tc.initialStatus.failed)
+			}
 			key, err := controller.KeyFunc(job)
 			if err != nil {
 				t.Errorf("Unexpected error getting job key: %v", err)
@@ -1555,7 +1581,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			startTime:               15,
 			backoffLimit:            6,
 			activePods:              1,
-			expectedForGetKey:       true,
+			expectedForGetKey:       false,
 			expectedDeletions:       1,
 			expectedFailed:          1,
 			expectedCondition:       batch.JobFailed,
@@ -1582,7 +1608,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds:   10,
 			startTime:               10,
 			backoffLimit:            6,
-			expectedForGetKey:       true,
+			expectedForGetKey:       false,
 			expectedCondition:       batch.JobFailed,
 			expectedConditionReason: "DeadlineExceeded",
 		},
@@ -1592,7 +1618,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 			activeDeadlineSeconds:   1,
 			startTime:               10,
 			failedPods:              1,
-			expectedForGetKey:       true,
+			expectedForGetKey:       false,
 			expectedFailed:          1,
 			expectedCondition:       batch.JobFailed,
 			expectedConditionReason: "BackoffLimitExceeded",
@@ -1804,8 +1830,8 @@ func TestSingleJobFailedCondition(t *testing.T) {
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
-	if !forget {
-		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	if forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
 	}
 	if len(fakePodControl.DeletePodName) != 0 {
 		t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", 0, len(fakePodControl.DeletePodName))
@@ -2946,6 +2972,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
+	defer func() { DefaultJobBackOff = 10 * time.Second }()
 	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 	cases := map[string]struct {
 		updateErr error
@@ -3711,6 +3738,7 @@ func TestJobBackoffReset(t *testing.T) {
 
 	for name, tc := range testCases {
 		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
+		defer func() { DefaultJobBackOff = 10 * time.Second }()
 		DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 		manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 		fakePodControl := controller.FakePodControl{}
@@ -3775,13 +3803,13 @@ func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duratio
 func TestJobBackoff(t *testing.T) {
 	job := newJob(1, 1, 1, batch.NonIndexedCompletion)
 	oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
-	oldPod.Status.Phase = v1.PodRunning
 	oldPod.ResourceVersion = "1"
 	newPod := oldPod.DeepCopy()
 	newPod.ResourceVersion = "2"
 
 	testCases := map[string]struct {
 		requeues            int
+		oldPodPhase         v1.PodPhase
 		phase               v1.PodPhase
 		jobReadyPodsEnabled bool
 		wantBackoff         time.Duration
@@ -3824,13 +3852,21 @@ func TestJobBackoff(t *testing.T) {
 		"1st failure with pod updates batching": {
 			requeues:            0,
 			phase:               v1.PodFailed,
+			jobReadyPodsEnabled: true,
 			wantBackoff:         podUpdateBatchPeriod,
 		},
 		"2nd failure with pod updates batching": {
 			requeues:            1,
 			phase:               v1.PodFailed,
+			jobReadyPodsEnabled: true,
 			wantBackoff:         DefaultJobBackOff,
 		},
+		"Failed pod observed again": {
+			requeues:    1,
+			oldPodPhase: v1.PodFailed,
+			phase:       v1.PodFailed,
+			wantBackoff: 0,
+		},
 	}
 
 	for name, tc := range testCases {
@@ -3848,23 +3884,29 @@ func TestJobBackoff(t *testing.T) {
 
 			queue.requeues = tc.requeues
 			newPod.Status.Phase = tc.phase
+			oldPod.Status.Phase = v1.PodRunning
+			if tc.oldPodPhase != "" {
+				oldPod.Status.Phase = tc.oldPodPhase
+			}
 			manager.updatePod(oldPod, newPod)
 
-			if queue.duration.Nanoseconds() != int64(tc.wantBackoff)*DefaultJobBackOff.Nanoseconds() {
-				t.Errorf("unexpected backoff %v", queue.duration)
+			if queue.duration != tc.wantBackoff {
+				t.Errorf("unexpected backoff %v, expected %v", queue.duration, tc.wantBackoff)
 			}
 		})
 	}
 }
 
 func TestJobBackoffForOnFailure(t *testing.T) {
+	jobConditionComplete := batch.JobComplete
 	jobConditionFailed := batch.JobFailed
+	jobConditionSuspended := batch.JobSuspended
 
 	testCases := map[string]struct {
 		// job setup
 		parallelism  int32
 		completions  int32
 		backoffLimit int32
+		suspend      bool
 
 		// pod setup
 		jobKeyForget bool
@@ -3879,50 +3921,60 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 		expectedConditionReason string
 	}{
 		"backoffLimit 0 should have 1 pod active": {
-			1, 1, 0,
-			true, []int32{0}, v1.PodRunning,
+			1, 1, 0, false,
+			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 0 should have 1 pod active": {
-			1, 1, 1,
-			true, []int32{0}, v1.PodRunning,
+			1, 1, 1, false,
+			false, []int32{0}, v1.PodRunning,
 			1, 0, 0, nil, "",
 		},
 		"backoffLimit 1 with restartCount 1 and podRunning should have 0 pod active": {
-			1, 1, 1,
-			true, []int32{1}, v1.PodRunning,
+			1, 1, 1, false,
+			false, []int32{1}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"backoffLimit 1 with restartCount 1 and podPending should have 0 pod active": {
-			1, 1, 1,
-			true, []int32{1}, v1.PodPending,
+			1, 1, 1, false,
+			false, []int32{1}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - single pod": {
-			1, 5, 2,
-			true, []int32{2}, v1.PodRunning,
+			1, 5, 2, false,
+			false, []int32{2}, v1.PodRunning,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - single pod": {
-			1, 5, 2,
-			true, []int32{2}, v1.PodPending,
+			1, 5, 2, false,
+			false, []int32{2}, v1.PodPending,
 			0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podRunning - multiple pods": {
-			2, 5, 2,
-			true, []int32{1, 1}, v1.PodRunning,
+			2, 5, 2, false,
+			false, []int32{1, 1}, v1.PodRunning,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"too many job failures with podPending - multiple pods": {
-			2, 5, 2,
-			true, []int32{1, 1}, v1.PodPending,
+			2, 5, 2, false,
+			false, []int32{1, 1}, v1.PodPending,
 			0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures": {
-			2, 5, 3,
-			true, []int32{1, 1}, v1.PodRunning,
+			2, 5, 3, false,
+			false, []int32{1, 1}, v1.PodRunning,
 			2, 0, 0, nil, "",
 		},
+		"suspending a job": {
+			2, 4, 6, true,
+			true, []int32{1, 1}, v1.PodRunning,
+			0, 0, 0, &jobConditionSuspended, "JobSuspended",
+		},
+		"finshed job": {
+			2, 4, 6, true,
+			true, []int32{1, 1, 2, 0}, v1.PodSucceeded,
+			0, 4, 0, &jobConditionComplete, "",
+		},
 	}
 
 	for name, tc := range testCases {
@@ -3943,6 +3995,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 			// job & pods setup
 			job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, batch.NonIndexedCompletion)
 			job.Spec.Template.Spec.RestartPolicy = v1.RestartPolicyOnFailure
+			job.Spec.Suspend = pointer.Bool(tc.suspend)
 			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 			podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
 			for i, pod := range newPodList(len(tc.restartCounts), tc.podPhase, job) {
@@ -4003,7 +4056,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"not enough failures with backoffLimit 0 - single pod": {
 			1, 1, 0,
 			v1.PodRunning, 1, 0,
-			false, true, 1, 0, 0, nil, "",
+			false, false, 1, 0, 0, nil, "",
 		},
 		"not enough failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
@@ -4013,7 +4066,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"too many failures with backoffLimit 1 - single pod": {
 			1, 1, 1,
 			"", 0, 2,
-			false, true, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
+			false, false, 0, 0, 2, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 		"not enough failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
@@ -4023,7 +4076,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		"too many failures with backoffLimit 6 - multiple pods": {
 			2, 2, 6,
 			"", 0, 7,
-			false, true, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
+			false, false, 0, 0, 7, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}
 