diff --git a/pkg/controller/job/BUILD b/pkg/controller/job/BUILD
index 7010e6b07af..94dfcfa513d 100644
--- a/pkg/controller/job/BUILD
+++ b/pkg/controller/job/BUILD
@@ -64,6 +64,7 @@ go_test(
         "//vendor/k8s.io/client-go/rest:go_default_library",
         "//vendor/k8s.io/client-go/testing:go_default_library",
         "//vendor/k8s.io/client-go/tools/cache:go_default_library",
+        "//vendor/k8s.io/client-go/util/workqueue:go_default_library",
     ],
 )
 
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 14674a6b898..366eacc463f 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -109,9 +109,13 @@ func NewJobController(podInformer coreinformers.PodInformer, jobInformer batchin
 	}
 
 	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    jm.enqueueController,
+		AddFunc: func(obj interface{}) {
+			jm.enqueueController(obj, true)
+		},
 		UpdateFunc: jm.updateJob,
-		DeleteFunc: jm.enqueueController,
+		DeleteFunc: func(obj interface{}) {
+			jm.enqueueController(obj, true)
+		},
 	})
 	jm.jobLister = jobInformer.Lister()
 	jm.jobStoreSynced = jobInformer.Informer().HasSynced
@@ -209,7 +213,7 @@ func (jm *JobController) addPod(obj interface{}) {
 			return
 		}
 		jm.expectations.CreationObserved(jobKey)
-		jm.enqueueController(job)
+		jm.enqueueController(job, true)
 		return
 	}
 
@@ -218,7 +222,7 @@ func (jm *JobController) addPod(obj interface{}) {
 	// DO NOT observe creation because no controller should be waiting for an
 	// orphan.
 	for _, job := range jm.getPodJobs(pod) {
-		jm.enqueueController(job)
+		jm.enqueueController(job, true)
 	}
 }
 
@@ -242,7 +246,8 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 		return
 	}
 
-	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
+	// the only time we want the backoff to kick-in, is when the pod failed
+	immediate := curPod.Status.Phase != v1.PodFailed
 
 	curControllerRef := metav1.GetControllerOf(curPod)
 	oldControllerRef := metav1.GetControllerOf(oldPod)
@@ -250,7 +255,7 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 	if controllerRefChanged && oldControllerRef != nil {
 		// The ControllerRef was changed. Sync the old controller, if any.
 		if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil {
-			jm.enqueueController(job)
+			jm.enqueueController(job, immediate)
 		}
 	}
 
@@ -260,15 +265,16 @@ func (jm *JobController) updatePod(old, cur interface{}) {
 		if job == nil {
 			return
 		}
-		jm.enqueueController(job)
+		jm.enqueueController(job, immediate)
 		return
 	}
 
 	// Otherwise, it's an orphan. If anything changed, sync matching controllers
 	// to see if anyone wants to adopt it now.
+	labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
 	if labelChanged || controllerRefChanged {
 		for _, job := range jm.getPodJobs(curPod) {
-			jm.enqueueController(job)
+			jm.enqueueController(job, immediate)
 		}
 	}
 }
@@ -309,7 +315,7 @@ func (jm *JobController) deletePod(obj interface{}) {
 		return
 	}
 	jm.expectations.DeletionObserved(jobKey)
-	jm.enqueueController(job)
+	jm.enqueueController(job, true)
 }
 
 func (jm *JobController) updateJob(old, cur interface{}) {
@@ -321,7 +327,7 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 	if err != nil {
 		return
 	}
-	jm.enqueueController(curJob)
+	jm.enqueueController(curJob, true)
 	// check if need to add a new rsync for ActiveDeadlineSeconds
 	if curJob.Status.StartTime != nil {
 		curADS := curJob.Spec.ActiveDeadlineSeconds
@@ -341,15 +347,19 @@ func (jm *JobController) updateJob(old, cur interface{}) {
 	}
 }
 
-// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
-func (jm *JobController) enqueueController(job interface{}) {
-	key, err := controller.KeyFunc(job)
+// obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item,
+// immediate tells the controller to update the status right away, and should
+// happen ONLY when there was a successful pod run.
+func (jm *JobController) enqueueController(obj interface{}, immediate bool) {
+	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", job, err))
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 
-	// Retrieves the backoff duration for this Job
+	if immediate {
+		jm.queue.Forget(key)
+	}
 	backoff := getBackoff(jm.queue, key)
 
 	// TODO: Handle overlapping controllers better. Either disallow them at admission time or
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index ec9aabf3fc9..e176961c64c 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -36,6 +36,7 @@ import (
 	restclient "k8s.io/client-go/rest"
 	core "k8s.io/client-go/testing"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/api/legacyscheme"
 	_ "k8s.io/kubernetes/pkg/apis/core/install"
 	"k8s.io/kubernetes/pkg/controller"
@@ -1338,3 +1339,70 @@ func TestJobBackoffReset(t *testing.T) {
 		}
 	}
 }
+
+var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}
+
+type fakeRateLimitingQueue struct {
+	workqueue.Interface
+	requeues int
+	item     interface{}
+	duration time.Duration
+}
+
+func (f *fakeRateLimitingQueue) AddRateLimited(item interface{}) {}
+func (f *fakeRateLimitingQueue) Forget(item interface{})         {}
+func (f *fakeRateLimitingQueue) NumRequeues(item interface{}) int {
+	return f.requeues
+}
+func (f *fakeRateLimitingQueue) AddAfter(item interface{}, duration time.Duration) {
+	f.item = item
+	f.duration = duration
+}
+
+func TestJobBackoff(t *testing.T) {
+	job := newJob(1, 1, 1)
+	oldPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
+	oldPod.Status.Phase = v1.PodRunning
+	oldPod.ResourceVersion = "1"
+	newPod := oldPod.DeepCopy()
+	newPod.ResourceVersion = "2"
+
+	testCases := map[string]struct {
+		// inputs
+		requeues int
+		phase    v1.PodPhase
+
+		// expectation
+		backoff int
+	}{
+		"1st failure": {0, v1.PodFailed, 0},
+		"2nd failure": {1, v1.PodFailed, 1},
+		"3rd failure": {2, v1.PodFailed, 2},
+		"1st success": {0, v1.PodSucceeded, 0},
+		"2nd success": {1, v1.PodSucceeded, 0},
+		"1st running": {0, v1.PodSucceeded, 0},
+		"2nd running": {1, v1.PodSucceeded, 0},
+	}
+
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &legacyscheme.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+			manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
+			fakePodControl := controller.FakePodControl{}
+			manager.podControl = &fakePodControl
+			manager.podStoreSynced = alwaysReady
+			manager.jobStoreSynced = alwaysReady
+			queue := &fakeRateLimitingQueue{}
+			manager.queue = queue
+			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
+
+			queue.requeues = tc.requeues
+			newPod.Status.Phase = tc.phase
+			manager.updatePod(oldPod, newPod)
+
+			if queue.duration.Nanoseconds() != int64(tc.backoff)*DefaultJobBackOff.Nanoseconds() {
+				t.Errorf("unexpected backoff %v", queue.duration)
+			}
+		})
+	}
+}