Improve how JobController uses the queue for backoff

Centralize the key "forget" and "requeue" handling in a single method.
Change the signature of the syncJob method so that it returns whether
the backoff delay should be forgotten for a given key.
cedric lamoriniere 2017-09-07 12:00:07 +02:00
parent 1dbef2f113
commit 48116da0ec
2 changed files with 81 additions and 61 deletions
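In outline, the new flow looks like this. A minimal sketch of the pattern this commit introduces: the success path matches the diff below, while the error-path AddRateLimited call is an assumption based on the usual controller boilerplate, since it is not shown in the hunks here.

    // processNextWorkItem is now the single place that decides between
    // Forget (reset the backoff for a key) and a rate-limited requeue.
    func (jm *JobController) processNextWorkItem() bool {
        key, quit := jm.queue.Get()
        if quit {
            return false
        }
        defer jm.queue.Done(key)

        forget, err := jm.syncHandler(key.(string))
        if err == nil {
            if forget {
                // syncJob saw no new failures: drop the accumulated
                // backoff for this key.
                jm.queue.Forget(key)
            }
            return true
        }

        // Any sync error re-enqueues the key after the backoff delay
        // (assumed from surrounding code, not shown in this diff).
        jm.queue.AddRateLimited(key)
        return true
    }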

Changed file 1 of 2 (JobController implementation):

@@ -51,7 +51,7 @@ import (
// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
const (
var (
// DefaultJobBackOff is the default backoff period, exported for the e2e test
DefaultJobBackOff = 10 * time.Second
// MaxJobBackOff is the max backoff period, exported for the e2e test
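The const block becomes a var block so that tests can overwrite DefaultJobBackOff (see TestSyncJobUpdateRequeue below). These durations are what a controller's rate-limited workqueue is typically built from; a sketch assuming client-go's workqueue helpers, with the MaxJobBackOff value taken from the surrounding file rather than this hunk:

    import (
        "time"

        "k8s.io/client-go/util/workqueue"
    )

    var (
        DefaultJobBackOff = 10 * time.Second  // initial per-key delay
        MaxJobBackOff     = 360 * time.Second // cap on the per-key delay (assumed value)
    )

    // The delay for a key starts at DefaultJobBackOff and doubles on each
    // rate-limited requeue, capped at MaxJobBackOff.
    var jobQueue = workqueue.NewNamedRateLimitingQueue(
        workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff),
        "job",
    )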
@@ -64,7 +64,7 @@ type JobController struct {
// To allow injection of updateJobStatus for testing.
updateHandler func(job *batch.Job) error
syncHandler func(jobKey string) error
syncHandler func(jobKey string) (bool, error)
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced
@@ -375,9 +375,11 @@ func (jm *JobController) processNextWorkItem() bool {
}
defer jm.queue.Done(key)
err := jm.syncHandler(key.(string))
forget, err := jm.syncHandler(key.(string))
if err == nil {
jm.queue.Forget(key)
if forget {
jm.queue.Forget(key)
}
return true
}
@@ -420,7 +422,7 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
// it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
// concurrently with the same key.
func (jm *JobController) syncJob(key string) error {
func (jm *JobController) syncJob(key string) (bool, error) {
startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
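Spelled out as a contract, based on the call site in processNextWorkItem above (a descriptive sketch, not upstream doc text):

    // syncJob now returns (forget, err):
    //   forget == true  -> the caller calls queue.Forget(key), resetting the
    //                      exponential backoff accumulated for this job key.
    //   forget == false -> the backoff history for the key is kept.
    //   err != nil      -> the caller re-enqueues the key rate-limited, so
    //                      the job is retried after the backoff period.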
@@ -428,26 +430,25 @@ func (jm *JobController) syncJob(key string) error {
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
return false, err
}
if len(ns) == 0 || len(name) == 0 {
return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
}
sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
if err != nil {
if errors.IsNotFound(err) {
glog.V(4).Infof("Job has been deleted: %v", key)
jm.expectations.DeleteExpectations(key)
return nil
return true, nil
}
return err
return false, err
}
job := *sharedJob
// if job was finished previously, we don't want to redo the termination
if IsJobFinished(&job) {
jm.queue.Forget(key)
return nil
return true, nil
}
// retrieve the previous number of retries
@@ -460,7 +461,7 @@ func (jm *JobController) syncJob(key string) error {
pods, err := jm.getPodsForJob(&job)
if err != nil {
return err
return false, err
}
activePods := controller.FilterActivePods(pods)
@@ -551,6 +552,7 @@ func (jm *JobController) syncJob(key string) error {
}
}
forget := false
// no need to update the job if the status hasn't changed since last time
if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
job.Status.Active = active
@@ -558,19 +560,18 @@ func (jm *JobController) syncJob(key string) error {
job.Status.Failed = failed
if err := jm.updateHandler(&job); err != nil {
return err
return false, err
}
if jobHaveNewFailure && !IsJobFinished(&job) {
// returning an error will re-enqueue Job after the backoff period
return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
}
forget = true
}
if jobHaveNewFailure {
// re-enqueue Job after the backoff period
jm.queue.AddRateLimited(key)
} else {
// if no new Failure the job backoff period can be reset
jm.queue.Forget(key)
}
return manageJobErr
return forget, manageJobErr
}
func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
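To see what Forget buys, here is a small standalone illustration of the exponential failure rate limiter this kind of queue is built on. A runnable sketch assuming client-go's workqueue package; the key name is made up:

    package main

    import (
        "fmt"
        "time"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        rl := workqueue.NewItemExponentialFailureRateLimiter(10*time.Second, 360*time.Second)

        // Each When() call for the same key stands for one rate-limited
        // requeue: 10s, 20s, 40s, ... capped at the 360s maximum.
        for i := 0; i < 7; i++ {
            fmt.Printf("requeue %d -> delay %v\n",
                rl.NumRequeues("default/my-job"), rl.When("default/my-job"))
        }

        // Forget drops the failure history, so the next delay starts over
        // at the base value. This is what forget == true triggers upstream.
        rl.Forget("default/my-job")
        fmt.Println("after Forget:", rl.When("default/my-job"))
    }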

Changed file 2 of 2 (JobController tests):

@@ -134,6 +134,7 @@ func TestControllerSyncJob(t *testing.T) {
// pod setup
podControllerError error
jobKeyForget bool
pendingPods int32
activePods int32
succeededPods int32
@@ -150,102 +151,102 @@ func TestControllerSyncJob(t *testing.T) {
}{
"job start": {
2, 5, 6, false, 0,
nil, 0, 0, 0, 0,
nil, true, 0, 0, 0, 0,
2, 0, 2, 0, 0, nil, "",
},
"WQ job start": {
2, -1, 6, false, 0,
nil, 0, 0, 0, 0,
nil, true, 0, 0, 0, 0,
2, 0, 2, 0, 0, nil, "",
},
"pending pods": {
2, 5, 6, false, 0,
nil, 2, 0, 0, 0,
nil, true, 2, 0, 0, 0,
0, 0, 2, 0, 0, nil, "",
},
"correct # of pods": {
2, 5, 6, false, 0,
nil, 0, 2, 0, 0,
nil, true, 0, 2, 0, 0,
0, 0, 2, 0, 0, nil, "",
},
"WQ job: correct # of pods": {
2, -1, 6, false, 0,
nil, 0, 2, 0, 0,
nil, true, 0, 2, 0, 0,
0, 0, 2, 0, 0, nil, "",
},
"too few active pods": {
2, 5, 6, false, 0,
nil, 0, 1, 1, 0,
nil, true, 0, 1, 1, 0,
1, 0, 2, 1, 0, nil, "",
},
"too few active pods with a dynamic job": {
2, -1, 6, false, 0,
nil, 0, 1, 0, 0,
nil, true, 0, 1, 0, 0,
1, 0, 2, 0, 0, nil, "",
},
"too few active pods, with controller error": {
2, 5, 6, false, 0,
fmt.Errorf("Fake error"), 0, 1, 1, 0,
fmt.Errorf("Fake error"), true, 0, 1, 1, 0,
1, 0, 1, 1, 0, nil, "",
},
"too many active pods": {
2, 5, 6, false, 0,
nil, 0, 3, 0, 0,
nil, true, 0, 3, 0, 0,
0, 1, 2, 0, 0, nil, "",
},
"too many active pods, with controller error": {
2, 5, 6, false, 0,
fmt.Errorf("Fake error"), 0, 3, 0, 0,
fmt.Errorf("Fake error"), true, 0, 3, 0, 0,
0, 1, 3, 0, 0, nil, "",
},
"failed pod": {
2, 5, 6, false, 0,
nil, 0, 1, 1, 1,
1, 0, 2, 1, 1, nil, "",
fmt.Errorf("Fake error"), false, 0, 1, 1, 1,
1, 0, 1, 1, 1, nil, "",
},
"job finish": {
2, 5, 6, false, 0,
nil, 0, 0, 5, 0,
nil, true, 0, 0, 5, 0,
0, 0, 0, 5, 0, nil, "",
},
"WQ job finishing": {
2, -1, 6, false, 0,
nil, 0, 1, 1, 0,
nil, true, 0, 1, 1, 0,
0, 0, 1, 1, 0, nil, "",
},
"WQ job all finished": {
2, -1, 6, false, 0,
nil, 0, 0, 2, 0,
nil, true, 0, 0, 2, 0,
0, 0, 0, 2, 0, &jobConditionComplete, "",
},
"WQ job all finished despite one failure": {
2, -1, 6, false, 0,
nil, 0, 0, 1, 1,
nil, true, 0, 0, 1, 1,
0, 0, 0, 1, 1, &jobConditionComplete, "",
},
"more active pods than completions": {
2, 5, 6, false, 0,
nil, 0, 10, 0, 0,
nil, true, 0, 10, 0, 0,
0, 8, 2, 0, 0, nil, "",
},
"status change": {
2, 5, 6, false, 0,
nil, 0, 2, 2, 0,
nil, true, 0, 2, 2, 0,
0, 0, 2, 2, 0, nil, "",
},
"deleting job": {
2, 5, 6, true, 0,
nil, 1, 1, 1, 0,
nil, true, 1, 1, 1, 0,
0, 0, 2, 1, 0, nil, "",
},
"limited pods": {
100, 200, 6, false, 10,
nil, 0, 0, 0, 0,
nil, true, 0, 0, 0, 0,
10, 0, 10, 0, 0, nil, "",
},
"to many job sync failure": {
2, 5, 0, true, 0,
nil, 0, 0, 0, 1,
nil, true, 0, 0, 0, 1,
0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
},
}
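For orientation when reading the tuples above: each case fills the test struct positionally. Only the podControllerError through failedPods fields are visible in this hunk; the remaining names below are inferred from the assertions further down and may not match upstream exactly.

    // Rough positional layout of each test case (a sketch; fields outside
    // the visible hunk are assumptions):
    //   parallelism, completions, backoffLimit, deleting, podLimit,
    //   podControllerError, jobKeyForget (new in this commit),
    //       pendingPods, activePods, succeededPods, failedPods,
    //   expectedCreations, expectedDeletions, expectedActive,
    //       expectedSucceeded, expectedFailed, expectedCondition,
    //       expectedConditionReason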
@@ -286,7 +287,7 @@ func TestControllerSyncJob(t *testing.T) {
}
// run
err := manager.syncJob(getKey(job, t))
forget, err := manager.syncJob(getKey(job, t))
// We need to requeue the syncJob task if there is a podController error
if tc.podControllerError != nil {
@@ -298,7 +299,9 @@ func TestControllerSyncJob(t *testing.T) {
t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
}
}
if forget != tc.jobKeyForget {
t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.jobKeyForget, forget)
}
// validate created/deleted pods
if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
@@ -371,6 +374,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
failedPods int32
// expectations
expectedForgetKey bool
expectedDeletions int32
expectedActive int32
expectedSucceeded int32
@@ -380,22 +384,22 @@ func TestSyncJobPastDeadline(t *testing.T) {
"activeDeadlineSeconds less than single pod execution": {
1, 1, 10, 15, 6,
1, 0, 0,
1, 0, 0, 1, "DeadlineExceeded",
true, 1, 0, 0, 1, "DeadlineExceeded",
},
"activeDeadlineSeconds bigger than single pod execution": {
1, 2, 10, 15, 6,
1, 1, 0,
1, 0, 1, 1, "DeadlineExceeded",
true, 1, 0, 1, 1, "DeadlineExceeded",
},
"activeDeadlineSeconds times-out before any pod starts": {
1, 1, 10, 10, 6,
0, 0, 0,
0, 0, 0, 0, "DeadlineExceeded",
true, 0, 0, 0, 0, "DeadlineExceeded",
},
"activeDeadlineSeconds with backofflimit reach": {
1, 1, 1, 10, 0,
1, 0, 2,
1, 0, 0, 3, "BackoffLimitExceeded",
true, 1, 0, 0, 3, "BackoffLimitExceeded",
},
}
@@ -431,11 +435,13 @@ func TestSyncJobPastDeadline(t *testing.T) {
}
// run
err := manager.syncJob(getKey(job, t))
forget, err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
}
if forget != tc.expectedForgetKey {
t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.expectedForgetKey, forget)
}
// validate created/deleted pods
if int32(len(fakePodControl.Templates)) != 0 {
t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
@@ -492,10 +498,13 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
job.Status.StartTime = &start
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t))
forget, err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
}
if !forget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
}
if len(fakePodControl.Templates) != 0 {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
}
@@ -518,10 +527,13 @@ func TestSyncJobComplete(t *testing.T) {
job := newJob(1, 1, 6)
job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t))
forget, err := manager.syncJob(getKey(job, t))
if err != nil {
t.Fatalf("Unexpected error when syncing jobs %v", err)
}
if !forget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
}
actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
if err != nil {
t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
@@ -541,10 +553,13 @@ func TestSyncJobDeleted(t *testing.T) {
manager.jobStoreSynced = alwaysReady
manager.updateHandler = func(job *batch.Job) error { return nil }
job := newJob(2, 2, 6)
err := manager.syncJob(getKey(job, t))
forget, err := manager.syncJob(getKey(job, t))
if err != nil {
t.Errorf("Unexpected error when syncing jobs %v", err)
}
if !forget {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
}
if len(fakePodControl.Templates) != 0 {
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
}
@@ -555,6 +570,7 @@ func TestSyncJobDeleted(t *testing.T) {
func TestSyncJobUpdateRequeue(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
@@ -567,10 +583,13 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
}
job := newJob(2, 2, 6)
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
err := manager.syncJob(getKey(job, t))
forget, err := manager.syncJob(getKey(job, t))
if err == nil || err != updateError {
t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
}
if forget != false {
t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
}
t.Log("Waiting for a job in the queue")
key, _ := manager.queue.Get()
expectedKey := getKey(job, t)
@@ -1168,7 +1187,7 @@ func TestWatchJobs(t *testing.T) {
// The update sent through the fakeWatcher should make its way into the workqueue,
// and eventually into the syncHandler.
manager.syncHandler = func(key string) error {
manager.syncHandler = func(key string) (bool, error) {
defer close(received)
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
@@ -1177,12 +1196,12 @@ func TestWatchJobs(t *testing.T) {
job, err := manager.jobLister.Jobs(ns).Get(name)
if err != nil || job == nil {
t.Errorf("Expected to find job under key %v: %v", key, err)
return nil
return true, nil
}
if !apiequality.Semantic.DeepDerivative(*job, testJob) {
t.Errorf("Expected %#v, but got %#v", testJob, *job)
}
return nil
return true, nil
}
// Start only the job watcher and the workqueue, send a watch event,
// and make sure it hits the sync method.
@@ -1213,7 +1232,7 @@ func TestWatchPods(t *testing.T) {
received := make(chan struct{})
// The pod update sent through the fakeWatcher should figure out the managing job and
// send it into the syncHandler.
manager.syncHandler = func(key string) error {
manager.syncHandler = func(key string) (bool, error) {
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
t.Errorf("Error getting namespace/name from key %v: %v", key, err)
@@ -1225,10 +1244,10 @@ func TestWatchPods(t *testing.T) {
if !apiequality.Semantic.DeepDerivative(job, testJob) {
t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
close(received)
return nil
return true, nil
}
close(received)
return nil
return true, nil
}
// Start only the pod watcher and the workqueue, send a watch event,
// and make sure it hits the sync method for the right job.