diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 6e63f84b478..e31f8394366 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -51,7 +51,7 @@ import (
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
-const (
+var (
 	// DefaultJobBackOff is the max backoff period, exported for the e2e test
 	DefaultJobBackOff = 10 * time.Second
 	// MaxJobBackOff is the max backoff period, exported for the e2e test
@@ -64,7 +64,7 @@ type JobController struct {
 
 	// To allow injection of updateJobStatus for testing.
 	updateHandler func(job *batch.Job) error
-	syncHandler   func(jobKey string) error
+	syncHandler   func(jobKey string) (bool, error)
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced cache.InformerSynced
@@ -375,9 +375,11 @@ func (jm *JobController) processNextWorkItem() bool {
 	}
 	defer jm.queue.Done(key)
 
-	err := jm.syncHandler(key.(string))
+	forget, err := jm.syncHandler(key.(string))
 	if err == nil {
-		jm.queue.Forget(key)
+		if forget {
+			jm.queue.Forget(key)
+		}
 		return true
 	}
 
@@ -420,7 +422,7 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobController) syncJob(key string) error {
+func (jm *JobController) syncJob(key string) (bool, error) {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
@@ -428,26 +430,25 @@ func (jm *JobController) syncJob(key string) error {
 
 	ns, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		return err
+		return false, err
 	}
 	if len(ns) == 0 || len(name) == 0 {
-		return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
+		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
 	}
 	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			glog.V(4).Infof("Job has been deleted: %v", key)
 			jm.expectations.DeleteExpectations(key)
-			return nil
+			return true, nil
 		}
-		return err
+		return false, err
 	}
 	job := *sharedJob
 
 	// if job was finished previously, we don't want to redo the termination
 	if IsJobFinished(&job) {
-		jm.queue.Forget(key)
-		return nil
+		return true, nil
 	}
 
 	// retrieve the previous number of retry
@@ -460,7 +461,7 @@ func (jm *JobController) syncJob(key string) error {
 
 	pods, err := jm.getPodsForJob(&job)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	activePods := controller.FilterActivePods(pods)
@@ -551,6 +552,7 @@ func (jm *JobController) syncJob(key string) error {
 		}
 	}
 
+	forget := false
 	// no need to update the job if the status hasn't changed since last time
 	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
 		job.Status.Active = active
@@ -558,19 +560,18 @@ func (jm *JobController) syncJob(key string) error {
 		job.Status.Failed = failed
 
 		if err := jm.updateHandler(&job); err != nil {
-			return err
+			return false, err
 		}
+
+		if jobHaveNewFailure && !IsJobFinished(&job) {
+			// returning an error will re-enqueue Job after the backoff period
+			return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
+		}
+
+		forget = true
 	}
 
-	if jobHaveNewFailure {
-		// re-enqueue Job after the backoff period
-		jm.queue.AddRateLimited(key)
-	} else {
-		// if no new Failure the job backoff period can be reset
-		jm.queue.Forget(key)
-	}
-
-	return manageJobErr
+	return forget, manageJobErr
 }
 
 func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 3b613426d03..eb5dffec391 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -134,6 +134,7 @@ func TestControllerSyncJob(t *testing.T) {
 
 		// pod setup
 		podControllerError error
+		jobKeyForget       bool
 		pendingPods        int32
 		activePods         int32
 		succeededPods      int32
@@ -150,102 +151,102 @@ func TestControllerSyncJob(t *testing.T) {
 	}{
 		"job start": {
 			2, 5, 6, false, 0,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			2, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job start": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			2, 0, 2, 0, 0, nil, "",
 		},
 		"pending pods": {
 			2, 5, 6, false, 0,
-			nil, 2, 0, 0, 0,
+			nil, true, 2, 0, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"correct # of pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 2, 0, 0,
+			nil, true, 0, 2, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job: correct # of pods": {
 			2, -1, 6, false, 0,
-			nil, 0, 2, 0, 0,
+			nil, true, 0, 2, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 1, 1, 0,
+			nil, true, 0, 1, 1, 0,
 			1, 0, 2, 1, 0, nil, "",
 		},
 		"too few active pods with a dynamic job": {
 			2, -1, 6, false, 0,
-			nil, 0, 1, 0, 0,
+			nil, true, 0, 1, 0, 0,
 			1, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods, with controller error": {
 			2, 5, 6, false, 0,
-			fmt.Errorf("Fake error"), 0, 1, 1, 0,
+			fmt.Errorf("Fake error"), true, 0, 1, 1, 0,
 			1, 0, 1, 1, 0, nil, "",
 		},
 		"too many active pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 3, 0, 0,
+			nil, true, 0, 3, 0, 0,
 			0, 1, 2, 0, 0, nil, "",
 		},
 		"too many active pods, with controller error": {
 			2, 5, 6, false, 0,
-			fmt.Errorf("Fake error"), 0, 3, 0, 0,
+			fmt.Errorf("Fake error"), true, 0, 3, 0, 0,
 			0, 1, 3, 0, 0, nil, "",
 		},
 		"failed pod": {
 			2, 5, 6, false, 0,
-			nil, 0, 1, 1, 1,
-			1, 0, 2, 1, 1, nil, "",
+			fmt.Errorf("Fake error"), false, 0, 1, 1, 1,
+			1, 0, 1, 1, 1, nil, "",
 		},
 		"job finish": {
 			2, 5, 6, false, 0,
-			nil, 0, 0, 5, 0,
+			nil, true, 0, 0, 5, 0,
 			0, 0, 0, 5, 0, nil, "",
 		},
 		"WQ job finishing": {
 			2, -1, 6, false, 0,
-			nil, 0, 1, 1, 0,
+			nil, true, 0, 1, 1, 0,
 			0, 0, 1, 1, 0, nil, "",
 		},
 		"WQ job all finished": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 2, 0,
+			nil, true, 0, 0, 2, 0,
 			0, 0, 0, 2, 0, &jobConditionComplete, "",
 		},
 		"WQ job all finished despite one failure": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 1, 1,
+			nil, true, 0, 0, 1, 1,
 			0, 0, 0, 1, 1, &jobConditionComplete, "",
 		},
 		"more active pods than completions": {
 			2, 5, 6, false, 0,
-			nil, 0, 10, 0, 0,
+			nil, true, 0, 10, 0, 0,
 			0, 8, 2, 0, 0, nil, "",
 		},
 		"status change": {
 			2, 5, 6, false, 0,
-			nil, 0, 2, 2, 0,
+			nil, true, 0, 2, 2, 0,
 			0, 0, 2, 2, 0, nil, "",
 		},
 		"deleting job": {
 			2, 5, 6, true, 0,
-			nil, 1, 1, 1, 0,
+			nil, true, 1, 1, 1, 0,
 			0, 0, 2, 1, 0, nil, "",
 		},
 		"limited pods": {
 			100, 200, 6, false, 10,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			10, 0, 10, 0, 0, nil, "",
 		},
 		"to many job sync failure": {
 			2, 5, 0, true, 0,
-			nil, 0, 0, 0, 1,
+			nil, true, 0, 0, 0, 1,
 			0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}
@@ -286,7 +287,7 @@ func TestControllerSyncJob(t *testing.T) {
 		}
 
 		// run
-		err := manager.syncJob(getKey(job, t))
+		forget, err := manager.syncJob(getKey(job, t))
 
 		// We need requeue syncJob task if podController error
 		if tc.podControllerError != nil {
@@ -298,7 +299,9 @@ func TestControllerSyncJob(t *testing.T) {
 				t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
 			}
 		}
-
+		if forget != tc.jobKeyForget {
+			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.jobKeyForget, forget)
+		}
 		// validate created/deleted pods
 		if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
 			t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
@@ -371,6 +374,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		failedPods    int32
 
 		// expectations
+		expectedForgetKey     bool
 		expectedDeletions     int32
 		expectedActive        int32
 		expectedSucceeded     int32
@@ -380,22 +384,22 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		"activeDeadlineSeconds less than single pod execution": {
 			1, 1, 10, 15, 6,
 			1, 0, 0,
-			1, 0, 0, 1, "DeadlineExceeded",
+			true, 1, 0, 0, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds bigger than single pod execution": {
 			1, 2, 10, 15, 6,
 			1, 1, 0,
-			1, 0, 1, 1, "DeadlineExceeded",
+			true, 1, 0, 1, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds times-out before any pod starts": {
 			1, 1, 10, 10, 6,
 			0, 0, 0,
-			0, 0, 0, 0, "DeadlineExceeded",
+			true, 0, 0, 0, 0, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds with backofflimit reach": {
 			1, 1, 1, 10, 0,
 			1, 0, 2,
-			1, 0, 0, 3, "BackoffLimitExceeded",
+			true, 1, 0, 0, 3, "BackoffLimitExceeded",
 		},
 	}
 
@@ -431,11 +435,13 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		}
 
 		// run
-		err := manager.syncJob(getKey(job, t))
+		forget, err := manager.syncJob(getKey(job, t))
 		if err != nil {
 			t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
 		}
-
+		if forget != tc.expectedForgetKey {
+			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.expectedForgetKey, forget)
+		}
 		// validate created/deleted pods
 		if int32(len(fakePodControl.Templates)) != 0 {
 			t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
@@ -492,10 +498,13 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 	job.Status.StartTime = &start
 	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	if len(fakePodControl.Templates) != 0 {
 		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
 	}
@@ -518,10 +527,13 @@ func TestSyncJobComplete(t *testing.T) {
 	job := newJob(1, 1, 6)
 	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Fatalf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
 	if err != nil {
 		t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
@@ -541,10 +553,13 @@ func TestSyncJobDeleted(t *testing.T) {
 	manager.jobStoreSynced = alwaysReady
 	manager.updateHandler = func(job *batch.Job) error { return nil }
 	job := newJob(2, 2, 6)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	if len(fakePodControl.Templates) != 0 {
 		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
 	}
@@ -555,6 +570,7 @@ func TestSyncJobDeleted(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -567,10 +583,13 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	}
 	job := newJob(2, 2, 6)
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err == nil || err != updateError {
 		t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
 	}
+	if forget != false {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
+	}
 	t.Log("Waiting for a job in the queue")
 	key, _ := manager.queue.Get()
 	expectedKey := getKey(job, t)
@@ -1168,7 +1187,7 @@ func TestWatchJobs(t *testing.T) {
 
 	// The update sent through the fakeWatcher should make its way into the workqueue,
 	// and eventually into the syncHandler.
-	manager.syncHandler = func(key string) error {
+	manager.syncHandler = func(key string) (bool, error) {
 		defer close(received)
 		ns, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
@@ -1177,12 +1196,12 @@ func TestWatchJobs(t *testing.T) {
 		job, err := manager.jobLister.Jobs(ns).Get(name)
 		if err != nil || job == nil {
 			t.Errorf("Expected to find job under key %v: %v", key, err)
-			return nil
+			return true, nil
 		}
 		if !apiequality.Semantic.DeepDerivative(*job, testJob) {
 			t.Errorf("Expected %#v, but got %#v", testJob, *job)
 		}
-		return nil
+		return true, nil
 	}
 	// Start only the job watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method.
@@ -1213,7 +1232,7 @@ func TestWatchPods(t *testing.T) {
 	received := make(chan struct{})
 	// The pod update sent through the fakeWatcher should figure out the managing job and
 	// send it into the syncHandler.
-	manager.syncHandler = func(key string) error {
+	manager.syncHandler = func(key string) (bool, error) {
 		ns, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
 			t.Errorf("Error getting namespace/name from key %v: %v", key, err)
@@ -1225,10 +1244,10 @@ func TestWatchPods(t *testing.T) {
 		if !apiequality.Semantic.DeepDerivative(job, testJob) {
 			t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
 			close(received)
-			return nil
+			return true, nil
 		}
 		close(received)
-		return nil
+		return true, nil
 	}
 	// Start only the pod watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method for the right job.
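Below is a minimal, self-contained Go sketch (not part of the patch; toyQueue and processKey are illustrative stand-ins, with the queue interface modeled loosely on workqueue.RateLimitingInterface) of the contract this change introduces: syncJob now reports through a second return value whether the work-queue key may be forgotten, and the worker loop clears the rate limiter's backoff only on a clean sync, while any returned error, including the new "failed pod(s) detected" error, keeps the key under rate-limited backoff.

package main

import "fmt"

// rateLimitingQueue is an illustrative stand-in for the controller's
// workqueue.RateLimitingInterface; only the two methods the sketch needs.
type rateLimitingQueue interface {
	Forget(key string)
	AddRateLimited(key string)
}

// toyQueue records what happened to each key instead of scheduling retries.
type toyQueue struct{ log []string }

func (q *toyQueue) Forget(key string)         { q.log = append(q.log, "forget "+key) }
func (q *toyQueue) AddRateLimited(key string) { q.log = append(q.log, "rate-limit "+key) }

// processKey mirrors the processNextWorkItem logic after this patch: the
// backoff record for the key is cleared only when the sync succeeded and
// asked for it; returning an error leaves the key under rate-limited backoff
// so the next retry is delayed.
func processKey(q rateLimitingQueue, sync func(string) (bool, error), key string) {
	forget, err := sync(key)
	if err == nil {
		if forget {
			q.Forget(key)
		}
		return
	}
	q.AddRateLimited(key)
}

func main() {
	q := &toyQueue{}
	processKey(q, func(key string) (bool, error) { return true, nil }, "default/ok")
	processKey(q, func(key string) (bool, error) {
		return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
	}, "default/failing")
	fmt.Println(q.log) // [forget default/ok rate-limit default/failing]
}

Keeping the Forget/AddRateLimited decision in the worker loop, instead of calling them from inside syncJob as the removed branch did, means the per-key backoff driven by DefaultJobBackOff and MaxJobBackOff is reset only when a sync both succeeds and sees no new pod failures.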