diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 4a9ddac1f88..21a2b8f8d25 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -102,24 +102,43 @@ func newJobControllerFromClient(kubeClient clientset.Interface, resyncPeriod con return jm, sharedInformers } +func newPod(name string, job *batch.Job) *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: job.Spec.Selector.MatchLabels, + Namespace: job.Namespace, + OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)}, + }, + } +} + // create count pods with the given phase for the given job func newPodList(count int32, status v1.PodPhase, job *batch.Job) []v1.Pod { pods := []v1.Pod{} for i := int32(0); i < count; i++ { - newPod := v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%v", rand.String(10)), - Labels: job.Spec.Selector.MatchLabels, - Namespace: job.Namespace, - OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)}, - }, - Status: v1.PodStatus{Phase: status}, - } - pods = append(pods, newPod) + newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job) + newPod.Status = v1.PodStatus{Phase: status} + pods = append(pods, *newPod) } return pods } +func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods int32) { + for _, pod := range newPodList(pendingPods, v1.PodPending, job) { + podIndexer.Add(&pod) + } + for _, pod := range newPodList(activePods, v1.PodRunning, job) { + podIndexer.Add(&pod) + } + for _, pod := range newPodList(succeededPods, v1.PodSucceeded, job) { + podIndexer.Add(&pod) + } + for _, pod := range newPodList(failedPods, v1.PodFailed, job) { + podIndexer.Add(&pod) + } +} + func TestControllerSyncJob(t *testing.T) { jobConditionComplete := batch.JobComplete jobConditionFailed := batch.JobFailed @@ -273,18 +292,7 @@ 
func TestControllerSyncJob(t *testing.T) { } sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - for _, pod := range newPodList(tc.pendingPods, v1.PodPending, job) { - podIndexer.Add(&pod) - } - for _, pod := range newPodList(tc.activePods, v1.PodRunning, job) { - podIndexer.Add(&pod) - } - for _, pod := range newPodList(tc.succeededPods, v1.PodSucceeded, job) { - podIndexer.Add(&pod) - } - for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) { - podIndexer.Add(&pod) - } + setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods) // run forget, err := manager.syncJob(getKey(job, t)) @@ -424,15 +432,7 @@ func TestSyncJobPastDeadline(t *testing.T) { job.Status.StartTime = &start sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() - for _, pod := range newPodList(tc.activePods, v1.PodRunning, job) { - podIndexer.Add(&pod) - } - for _, pod := range newPodList(tc.succeededPods, v1.PodSucceeded, job) { - podIndexer.Add(&pod) - } - for _, pod := range newPodList(tc.failedPods, v1.PodFailed, job) { - podIndexer.Add(&pod) - } + setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods) // run forget, err := manager.syncJob(getKey(job, t)) @@ -680,17 +680,6 @@ func TestJobPodLookup(t *testing.T) { } } -func newPod(name string, job *batch.Job) *v1.Pod { - return &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: job.Spec.Selector.MatchLabels, - Namespace: job.Namespace, - OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)}, - }, - } -} - func TestGetPodsForJob(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: 
&legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) jm, informer := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) @@ -1269,3 +1258,78 @@ func bumpResourceVersion(obj metav1.Object) { ver, _ := strconv.ParseInt(obj.GetResourceVersion(), 10, 32) obj.SetResourceVersion(strconv.FormatInt(ver+1, 10)) } + +type pods struct { + pending int32 + active int32 + succeed int32 + failed int32 +} + +func TestJobBackoffReset(t *testing.T) { + testCases := map[string]struct { + // job setup + parallelism int32 + completions int32 + backoffLimit int32 + + // pod setup - each row is additive! + pods []pods + }{ + "parallelism=1": { + 1, 2, 1, + []pods{ + {0, 1, 0, 1}, + {0, 0, 1, 0}, + }, + }, + "parallelism=2 (just failure)": { + 2, 2, 1, + []pods{ + {0, 2, 0, 1}, + {0, 0, 1, 0}, + }, + }, + } + + for name, tc := range testCases { + clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) + DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing + manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc) + fakePodControl := controller.FakePodControl{} + manager.podControl = &fakePodControl + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + var actual *batch.Job + manager.updateHandler = func(job *batch.Job) error { + actual = job + return nil + } + + // job & pods setup + job := newJob(tc.parallelism, tc.completions, tc.backoffLimit) + key := getKey(job, t) + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() + + setPodsStatuses(podIndexer, job, tc.pods[0].pending, tc.pods[0].active, tc.pods[0].succeed, tc.pods[0].failed) + manager.queue.Add(key) + manager.processNextWorkItem() + retries := 
manager.queue.NumRequeues(key) + if retries != 1 { + t.Errorf("%s: expected exactly 1 retry, got %d", name, retries) + } + + job = actual + sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Replace([]interface{}{actual}, actual.ResourceVersion) + setPodsStatuses(podIndexer, job, tc.pods[1].pending, tc.pods[1].active, tc.pods[1].succeed, tc.pods[1].failed) + manager.processNextWorkItem() + retries = manager.queue.NumRequeues(key) + if retries != 0 { + t.Errorf("%s: expected exactly 0 retries, got %d", name, retries) + } + if getCondition(actual, batch.JobFailed, "BackoffLimitExceeded") { + t.Errorf("%s: unexpected job failure", name) + } + } +} diff --git a/test/e2e/apps/job.go b/test/e2e/apps/job.go index 4f65be150c8..8a4b8c0caa7 100644 --- a/test/e2e/apps/job.go +++ b/test/e2e/apps/job.go @@ -77,12 +77,15 @@ var _ = SIGDescribe("Job", func() { // Worst case analysis: 15 failures, each taking 1 minute to // run due to some slowness, 1 in 2^15 chance of happening, // causing test flake. Should be very rare. - job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, completions, nil, 999) + // With the introduction of backoff limit and high failure rate this + // is hitting its timeout, the 3 is a reasonable value that should make this + // test less flaky, for now. + job := framework.NewTestJob("randomlySucceedOrFail", "rand-non-local", v1.RestartPolicyNever, parallelism, 3, nil, 999) job, err := framework.CreateJob(f.ClientSet, f.Namespace.Name, job) Expect(err).NotTo(HaveOccurred()) By("Ensuring job reaches completions") - err = framework.WaitForJobFinish(f.ClientSet, f.Namespace.Name, job.Name, completions) + err = framework.WaitForJobFinish(f.ClientSet, f.Namespace.Name, job.Name, *job.Spec.Completions) Expect(err).NotTo(HaveOccurred()) })