From 48116da0ec1fd6b0a57b95bb9947770cb0251c79 Mon Sep 17 00:00:00 2001
From: cedric lamoriniere
Date: Thu, 7 Sep 2017 12:00:07 +0200
Subject: [PATCH] Improve how JobController uses the queue for backoff

Centralize the key "forget" and "requeue" handling in a single method.
Change the signature of the syncJob method so that it returns whether
the backoff delay for a given key should be forgotten.
---
 pkg/controller/job/job_controller.go      | 45 ++++++-----
 pkg/controller/job/job_controller_test.go | 97 ++++++++++++++---------
 2 files changed, 81 insertions(+), 61 deletions(-)

diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 6e63f84b478..e31f8394366 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -51,7 +51,7 @@ import (
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
-const (
+var (
 	// DefaultJobBackOff is the max backoff period, exported for the e2e test
 	DefaultJobBackOff = 10 * time.Second
 	// MaxJobBackOff is the max backoff period, exported for the e2e test
@@ -64,7 +64,7 @@ type JobController struct {
 
 	// To allow injection of updateJobStatus for testing.
 	updateHandler func(job *batch.Job) error
-	syncHandler   func(jobKey string) error
+	syncHandler   func(jobKey string) (bool, error)
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced cache.InformerSynced
@@ -375,9 +375,11 @@ func (jm *JobController) processNextWorkItem() bool {
 	}
 	defer jm.queue.Done(key)
 
-	err := jm.syncHandler(key.(string))
+	forget, err := jm.syncHandler(key.(string))
 	if err == nil {
-		jm.queue.Forget(key)
+		if forget {
+			jm.queue.Forget(key)
+		}
 		return true
 	}
 
@@ -420,7 +422,7 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobController) syncJob(key string) error {
+func (jm *JobController) syncJob(key string) (bool, error) {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
@@ -428,26 +430,25 @@ func (jm *JobController) syncJob(key string) error {
 
 	ns, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		return err
+		return false, err
 	}
 	if len(ns) == 0 || len(name) == 0 {
-		return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
+		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
 	}
 	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			glog.V(4).Infof("Job has been deleted: %v", key)
 			jm.expectations.DeleteExpectations(key)
-			return nil
+			return true, nil
 		}
-		return err
+		return false, err
 	}
 	job := *sharedJob
 
 	// if job was finished previously, we don't want to redo the termination
 	if IsJobFinished(&job) {
-		jm.queue.Forget(key)
-		return nil
+		return true, nil
 	}
 
 	// retrieve the previous number of retry
@@ -460,7 +461,7 @@ func (jm *JobController) syncJob(key string) error {
 
 	pods, err := jm.getPodsForJob(&job)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	activePods := controller.FilterActivePods(pods)
@@ -551,6 +552,7 @@ func (jm *JobController) syncJob(key string) error {
 		}
 	}
 
+	forget := false
 	// no need to update the job if the status hasn't changed since last time
 	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
 		job.Status.Active = active
@@ -558,19 +560,18 @@ func (jm *JobController) syncJob(key string) error {
 		job.Status.Failed = failed
 
 		if err := jm.updateHandler(&job); err != nil {
-			return err
+			return false, err
 		}
+
+		if jobHaveNewFailure && !IsJobFinished(&job) {
+			// returning an error will re-enqueue Job after the backoff period
+			return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
+		}
+
+		forget = true
 	}
 
-	if jobHaveNewFailure {
-		// re-enqueue Job after the backoff period
-		jm.queue.AddRateLimited(key)
-	} else {
-		// if no new Failure the job backoff period can be reset
-		jm.queue.Forget(key)
-	}
-
-	return manageJobErr
+	return forget, manageJobErr
 }
 
 func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 3b613426d03..eb5dffec391 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -134,6 +134,7 @@ func TestControllerSyncJob(t *testing.T) {
 
 		// pod setup
 		podControllerError error
+		jobKeyForget       bool
 		pendingPods        int32
 		activePods         int32
 		succeededPods      int32
@@ -150,102 +151,102 @@ func TestControllerSyncJob(t *testing.T) {
 	}{
 		"job start": {
 			2, 5, 6, false, 0,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			2, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job start": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			2, 0, 2, 0, 0, nil, "",
 		},
 		"pending pods": {
 			2, 5, 6, false, 0,
-			nil, 2, 0, 0, 0,
+			nil, true, 2, 0, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"correct # of pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 2, 0, 0,
+			nil, true, 0, 2, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job: correct # of pods": {
 			2, -1, 6, false, 0,
-			nil, 0, 2, 0, 0,
+			nil, true, 0, 2, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 1, 1, 0,
+			nil, true, 0, 1, 1, 0,
 			1, 0, 2, 1, 0, nil, "",
 		},
 		"too few active pods with a dynamic job": {
 			2, -1, 6, false, 0,
-			nil, 0, 1, 0, 0,
+			nil, true, 0, 1, 0, 0,
 			1, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods, with controller error": {
 			2, 5, 6, false, 0,
-			fmt.Errorf("Fake error"), 0, 1, 1, 0,
+			fmt.Errorf("Fake error"), true, 0, 1, 1, 0,
 			1, 0, 1, 1, 0, nil, "",
 		},
 		"too many active pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 3, 0, 0,
+			nil, true, 0, 3, 0, 0,
 			0, 1, 2, 0, 0, nil, "",
 		},
 		"too many active pods, with controller error": {
 			2, 5, 6, false, 0,
-			fmt.Errorf("Fake error"), 0, 3, 0, 0,
+			fmt.Errorf("Fake error"), true, 0, 3, 0, 0,
 			0, 1, 3, 0, 0, nil, "",
 		},
 		"failed pod": {
 			2, 5, 6, false, 0,
-			nil, 0, 1, 1, 1,
-			1, 0, 2, 1, 1, nil, "",
+			fmt.Errorf("Fake error"), false, 0, 1, 1, 1,
+			1, 0, 1, 1, 1, nil, "",
 		},
 		"job finish": {
 			2, 5, 6, false, 0,
-			nil, 0, 0, 5, 0,
+			nil, true, 0, 0, 5, 0,
 			0, 0, 0, 5, 0, nil, "",
 		},
 		"WQ job finishing": {
 			2, -1, 6, false, 0,
-			nil, 0, 1, 1, 0,
+			nil, true, 0, 1, 1, 0,
 			0, 0, 1, 1, 0, nil, "",
 		},
 		"WQ job all finished": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 2, 0,
+			nil, true, 0, 0, 2, 0,
 			0, 0, 0, 2, 0, &jobConditionComplete, "",
 		},
 		"WQ job all finished despite one failure": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 1, 1,
+			nil, true, 0, 0, 1, 1,
 			0, 0, 0, 1, 1, &jobConditionComplete, "",
 		},
 		"more active pods than completions": {
 			2, 5, 6, false, 0,
-			nil, 0, 10, 0, 0,
+			nil, true, 0, 10, 0, 0,
 			0, 8, 2, 0, 0, nil, "",
 		},
 		"status change": {
 			2, 5, 6, false, 0,
-			nil, 0, 2, 2, 0,
+			nil, true, 0, 2, 2, 0,
 			0, 0, 2, 2, 0, nil, "",
 		},
 		"deleting job": {
 			2, 5, 6, true, 0,
-			nil, 1, 1, 1, 0,
+			nil, true, 1, 1, 1, 0,
 			0, 0, 2, 1, 0, nil, "",
 		},
 		"limited pods": {
 			100, 200, 6, false, 10,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			10, 0, 10, 0, 0, nil, "",
 		},
 		"to many job sync failure": {
 			2, 5, 0, true, 0,
-			nil, 0, 0, 0, 1,
+			nil, true, 0, 0, 0, 1,
 			0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}
@@ -286,7 +287,7 @@ func TestControllerSyncJob(t *testing.T) {
 		}
 
 		// run
-		err := manager.syncJob(getKey(job, t))
+		forget, err := manager.syncJob(getKey(job, t))
 
 		// We need requeue syncJob task if podController error
 		if tc.podControllerError != nil {
@@ -298,7 +299,9 @@ func TestControllerSyncJob(t *testing.T) {
 				t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
 			}
 		}
-
+		if forget != tc.jobKeyForget {
+			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.jobKeyForget, forget)
+		}
 		// validate created/deleted pods
 		if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
 			t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
@@ -371,6 +374,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		failedPods    int32
 
 		// expectations
+		expectedForgetKey      bool
 		expectedDeletions      int32
 		expectedActive         int32
 		expectedSucceeded      int32
@@ -380,22 +384,22 @@
 		"activeDeadlineSeconds less than single pod execution": {
 			1, 1, 10, 15, 6,
 			1, 0, 0,
-			1, 0, 0, 1, "DeadlineExceeded",
+			true, 1, 0, 0, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds bigger than single pod execution": {
 			1, 2, 10, 15, 6,
 			1, 1, 0,
-			1, 0, 1, 1, "DeadlineExceeded",
+			true, 1, 0, 1, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds times-out before any pod starts": {
 			1, 1, 10, 10, 6,
 			0, 0, 0,
-			0, 0, 0, 0, "DeadlineExceeded",
+			true, 0, 0, 0, 0, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds with backofflimit reach": {
 			1, 1, 1, 10, 0,
 			1, 0, 2,
-			1, 0, 0, 3, "BackoffLimitExceeded",
+			true, 1, 0, 0, 3, "BackoffLimitExceeded",
 		},
 	}
@@ -431,11 +435,13 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		}
 
 		// run
-		err := manager.syncJob(getKey(job, t))
+		forget, err := manager.syncJob(getKey(job, t))
 		if err != nil {
 			t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
 		}
-
+		if forget != tc.expectedForgetKey {
+			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.expectedForgetKey, forget)
+		}
 		// validate created/deleted pods
 		if int32(len(fakePodControl.Templates)) != 0 {
 			t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
@@ -492,10 +498,13 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 	job.Status.StartTime = &start
 	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	if len(fakePodControl.Templates) != 0 {
 		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
 	}
@@ -518,10 +527,13 @@ func TestSyncJobComplete(t *testing.T) {
 	job := newJob(1, 1, 6)
 	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Fatalf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
 	if err != nil {
 		t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
@@ -541,10 +553,13 @@ func TestSyncJobDeleted(t *testing.T) {
 	manager.jobStoreSynced = alwaysReady
 	manager.updateHandler = func(job *batch.Job) error { return nil }
 	job := newJob(2, 2, 6)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	if len(fakePodControl.Templates) != 0 {
 		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
 	}
@@ -555,6 +570,7 @@
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -567,10 +583,13 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	}
 	job := newJob(2, 2, 6)
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err == nil || err != updateError {
 		t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
 	}
+	if forget != false {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
+	}
 	t.Log("Waiting for a job in the queue")
 	key, _ := manager.queue.Get()
 	expectedKey := getKey(job, t)
@@ -1168,7 +1187,7 @@ func TestWatchJobs(t *testing.T) {
 
 	// The update sent through the fakeWatcher should make its way into the workqueue,
 	// and eventually into the syncHandler.
-	manager.syncHandler = func(key string) error {
+	manager.syncHandler = func(key string) (bool, error) {
 		defer close(received)
 		ns, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
@@ -1177,12 +1196,12 @@ func TestWatchJobs(t *testing.T) {
 		job, err := manager.jobLister.Jobs(ns).Get(name)
 		if err != nil || job == nil {
 			t.Errorf("Expected to find job under key %v: %v", key, err)
-			return nil
+			return true, nil
 		}
 		if !apiequality.Semantic.DeepDerivative(*job, testJob) {
 			t.Errorf("Expected %#v, but got %#v", testJob, *job)
 		}
-		return nil
+		return true, nil
 	}
 	// Start only the job watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method.
@@ -1213,7 +1232,7 @@ func TestWatchPods(t *testing.T) {
 	received := make(chan struct{})
 	// The pod update sent through the fakeWatcher should figure out the managing job and
 	// send it into the syncHandler.
-	manager.syncHandler = func(key string) error {
+	manager.syncHandler = func(key string) (bool, error) {
 		ns, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
 			t.Errorf("Error getting namespace/name from key %v: %v", key, err)
@@ -1225,10 +1244,10 @@
 		if !apiequality.Semantic.DeepDerivative(job, testJob) {
 			t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
 			close(received)
-			return nil
+			return true, nil
 		}
 		close(received)
-		return nil
+		return true, nil
 	}
 	// Start only the pod watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method for the right job.