Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-22 19:31:44 +00:00)
Merge pull request #52091 from clamoriniere1A/bugfix/jobcontroller_backoff
Automatic merge from submit-queue (batch tested with PRs 52091, 52071)

Bugfix: Improve how JobController uses the queue for backoff

**What this PR does / why we need it**: In some cases, the backoff delay for a given Job is reset unnecessarily. This PR improves how the JobController uses the queue for backoff:
- Centralize the key "forget" and "re-queue" handling in a single method.
- Change the signature of the syncJob method so that it reports whether the backoff delay for the given key should be forgotten.

**Which issue this PR fixes**: links to #51153

**Special notes for your reviewer**:

**Release note**:
```release-note
```
This commit is contained in: 26d72847d0
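To make the new control flow easier to follow before reading the diff, here is a minimal sketch of the pattern this PR introduces. It is not the Kubernetes source: the `controller` struct, its field names, and the `main` function are illustrative stand-ins; only the `(bool, error)` handler signature and the client-go workqueue calls (`Get`, `Done`, `Forget`, `AddRateLimited`) mirror what the diff below does. The point is that the sync handler reports whether the backoff for a key should be forgotten, and the worker loop is the single place that touches the queue's rate limiter.

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// controller is an illustrative stand-in for the JobController: a rate-limited
// work queue plus a sync handler that returns (forget, err).
type controller struct {
	queue workqueue.RateLimitingInterface
	// forget == true means the key's accumulated backoff should be reset;
	// a non-nil err means the key should be retried with backoff.
	syncHandler func(key string) (bool, error)
}

// processNextWorkItem centralizes all Forget/AddRateLimited decisions in one
// place, driven by what the sync handler reports.
func (c *controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	forget, err := c.syncHandler(key.(string))
	if err == nil {
		if forget {
			// Only reset the backoff when the handler says the key is safe
			// to forget (e.g. the job finished or saw no new failures).
			c.queue.Forget(key)
		}
		return true
	}

	// On error, keep the accumulated backoff and re-queue the key.
	fmt.Printf("error syncing %v: %v\n", key, err)
	c.queue.AddRateLimited(key)
	return true
}

func main() {
	c := &controller{
		queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
		syncHandler: func(key string) (bool, error) {
			fmt.Println("synced", key)
			return true, nil // nothing failed, so the backoff can be forgotten
		},
	}
	c.queue.Add("default/example-job")
	c.processNextWorkItem()
	c.queue.ShutDown()
}
```

With this shape, a handler that hits a transient failure can simply return (false, err) and rely on the queue's rate limiter for the delay, instead of calling AddRateLimited itself as the old code did.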
@@ -51,7 +51,7 @@ import (
 // controllerKind contains the schema.GroupVersionKind for this controller type.
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
-const (
+var (
 	// DefaultJobBackOff is the max backoff period, exported for the e2e test
 	DefaultJobBackOff = 10 * time.Second
 	// MaxJobBackOff is the max backoff period, exported for the e2e test
@@ -64,7 +64,7 @@ type JobController struct {
 
 	// To allow injection of updateJobStatus for testing.
 	updateHandler func(job *batch.Job) error
-	syncHandler   func(jobKey string) error
+	syncHandler   func(jobKey string) (bool, error)
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced cache.InformerSynced
@@ -375,9 +375,11 @@ func (jm *JobController) processNextWorkItem() bool {
 	}
 	defer jm.queue.Done(key)
 
-	err := jm.syncHandler(key.(string))
+	forget, err := jm.syncHandler(key.(string))
 	if err == nil {
-		jm.queue.Forget(key)
+		if forget {
+			jm.queue.Forget(key)
+		}
 		return true
 	}
 
@@ -420,7 +422,7 @@ func (jm *JobController) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
 // it did not expect to see any more of its pods created or deleted. This function is not meant to be invoked
 // concurrently with the same key.
-func (jm *JobController) syncJob(key string) error {
+func (jm *JobController) syncJob(key string) (bool, error) {
 	startTime := time.Now()
 	defer func() {
 		glog.V(4).Infof("Finished syncing job %q (%v)", key, time.Now().Sub(startTime))
@@ -428,26 +430,25 @@ func (jm *JobController) syncJob(key string) error {
 
 	ns, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
-		return err
+		return false, err
 	}
 	if len(ns) == 0 || len(name) == 0 {
-		return fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
+		return false, fmt.Errorf("invalid job key %q: either namespace or name is missing", key)
 	}
 	sharedJob, err := jm.jobLister.Jobs(ns).Get(name)
 	if err != nil {
 		if errors.IsNotFound(err) {
 			glog.V(4).Infof("Job has been deleted: %v", key)
 			jm.expectations.DeleteExpectations(key)
-			return nil
+			return true, nil
 		}
-		return err
+		return false, err
 	}
 	job := *sharedJob
 
 	// if job was finished previously, we don't want to redo the termination
 	if IsJobFinished(&job) {
-		jm.queue.Forget(key)
-		return nil
+		return true, nil
 	}
 
 	// retrieve the previous number of retry
@@ -460,7 +461,7 @@ func (jm *JobController) syncJob(key string) error {
 
 	pods, err := jm.getPodsForJob(&job)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	activePods := controller.FilterActivePods(pods)
@@ -551,6 +552,7 @@ func (jm *JobController) syncJob(key string) error {
 		}
 	}
 
+	forget := false
 	// no need to update the job if the status hasn't changed since last time
 	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || len(job.Status.Conditions) != conditions {
 		job.Status.Active = active
@@ -558,19 +560,18 @@ func (jm *JobController) syncJob(key string) error {
 		job.Status.Failed = failed
 
 		if err := jm.updateHandler(&job); err != nil {
-			return err
+			return false, err
 		}
+
+		if jobHaveNewFailure && !IsJobFinished(&job) {
+			// returning an error will re-enqueue Job after the backoff period
+			return false, fmt.Errorf("failed pod(s) detected for job key %q", key)
+		}
+
+		forget = true
 	}
 
-	if jobHaveNewFailure {
-		// re-enqueue Job after the backoff period
-		jm.queue.AddRateLimited(key)
-	} else {
-		// if no new Failure the job backoff period can be reset
-		jm.queue.Forget(key)
-	}
-
-	return manageJobErr
+	return forget, manageJobErr
 }
 
 func (jm *JobController) deleteJobPods(job *batch.Job, pods []*v1.Pod, errCh chan<- error) {
@@ -134,6 +134,7 @@ func TestControllerSyncJob(t *testing.T) {
 
 		// pod setup
 		podControllerError error
+		jobKeyForget       bool
 		pendingPods        int32
 		activePods         int32
 		succeededPods      int32
@@ -150,102 +151,102 @@ func TestControllerSyncJob(t *testing.T) {
 	}{
 		"job start": {
 			2, 5, 6, false, 0,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			2, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job start": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			2, 0, 2, 0, 0, nil, "",
 		},
 		"pending pods": {
 			2, 5, 6, false, 0,
-			nil, 2, 0, 0, 0,
+			nil, true, 2, 0, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"correct # of pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 2, 0, 0,
+			nil, true, 0, 2, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"WQ job: correct # of pods": {
 			2, -1, 6, false, 0,
-			nil, 0, 2, 0, 0,
+			nil, true, 0, 2, 0, 0,
 			0, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 1, 1, 0,
+			nil, true, 0, 1, 1, 0,
 			1, 0, 2, 1, 0, nil, "",
 		},
 		"too few active pods with a dynamic job": {
 			2, -1, 6, false, 0,
-			nil, 0, 1, 0, 0,
+			nil, true, 0, 1, 0, 0,
 			1, 0, 2, 0, 0, nil, "",
 		},
 		"too few active pods, with controller error": {
 			2, 5, 6, false, 0,
-			fmt.Errorf("Fake error"), 0, 1, 1, 0,
+			fmt.Errorf("Fake error"), true, 0, 1, 1, 0,
 			1, 0, 1, 1, 0, nil, "",
 		},
 		"too many active pods": {
 			2, 5, 6, false, 0,
-			nil, 0, 3, 0, 0,
+			nil, true, 0, 3, 0, 0,
 			0, 1, 2, 0, 0, nil, "",
 		},
 		"too many active pods, with controller error": {
 			2, 5, 6, false, 0,
-			fmt.Errorf("Fake error"), 0, 3, 0, 0,
+			fmt.Errorf("Fake error"), true, 0, 3, 0, 0,
 			0, 1, 3, 0, 0, nil, "",
 		},
 		"failed pod": {
 			2, 5, 6, false, 0,
-			nil, 0, 1, 1, 1,
-			1, 0, 2, 1, 1, nil, "",
+			fmt.Errorf("Fake error"), false, 0, 1, 1, 1,
+			1, 0, 1, 1, 1, nil, "",
 		},
 		"job finish": {
 			2, 5, 6, false, 0,
-			nil, 0, 0, 5, 0,
+			nil, true, 0, 0, 5, 0,
 			0, 0, 0, 5, 0, nil, "",
 		},
 		"WQ job finishing": {
 			2, -1, 6, false, 0,
-			nil, 0, 1, 1, 0,
+			nil, true, 0, 1, 1, 0,
 			0, 0, 1, 1, 0, nil, "",
 		},
 		"WQ job all finished": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 2, 0,
+			nil, true, 0, 0, 2, 0,
 			0, 0, 0, 2, 0, &jobConditionComplete, "",
 		},
 		"WQ job all finished despite one failure": {
 			2, -1, 6, false, 0,
-			nil, 0, 0, 1, 1,
+			nil, true, 0, 0, 1, 1,
 			0, 0, 0, 1, 1, &jobConditionComplete, "",
 		},
 		"more active pods than completions": {
 			2, 5, 6, false, 0,
-			nil, 0, 10, 0, 0,
+			nil, true, 0, 10, 0, 0,
 			0, 8, 2, 0, 0, nil, "",
 		},
 		"status change": {
 			2, 5, 6, false, 0,
-			nil, 0, 2, 2, 0,
+			nil, true, 0, 2, 2, 0,
 			0, 0, 2, 2, 0, nil, "",
 		},
 		"deleting job": {
 			2, 5, 6, true, 0,
-			nil, 1, 1, 1, 0,
+			nil, true, 1, 1, 1, 0,
 			0, 0, 2, 1, 0, nil, "",
 		},
 		"limited pods": {
 			100, 200, 6, false, 10,
-			nil, 0, 0, 0, 0,
+			nil, true, 0, 0, 0, 0,
 			10, 0, 10, 0, 0, nil, "",
 		},
 		"to many job sync failure": {
 			2, 5, 0, true, 0,
-			nil, 0, 0, 0, 1,
+			nil, true, 0, 0, 0, 1,
 			0, 0, 0, 0, 1, &jobConditionFailed, "BackoffLimitExceeded",
 		},
 	}
@@ -286,7 +287,7 @@ func TestControllerSyncJob(t *testing.T) {
 		}
 
 		// run
-		err := manager.syncJob(getKey(job, t))
+		forget, err := manager.syncJob(getKey(job, t))
 
 		// We need requeue syncJob task if podController error
 		if tc.podControllerError != nil {
@@ -298,7 +299,9 @@ func TestControllerSyncJob(t *testing.T) {
 				t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
 			}
 		}
+		if forget != tc.jobKeyForget {
+			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.jobKeyForget, forget)
+		}
 		// validate created/deleted pods
 		if int32(len(fakePodControl.Templates)) != tc.expectedCreations {
 			t.Errorf("%s: unexpected number of creates. Expected %d, saw %d\n", name, tc.expectedCreations, len(fakePodControl.Templates))
@@ -371,6 +374,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		failedPods int32
 
 		// expectations
+		expectedForgetKey bool
 		expectedDeletions int32
 		expectedActive    int32
 		expectedSucceeded int32
@@ -380,22 +384,22 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		"activeDeadlineSeconds less than single pod execution": {
 			1, 1, 10, 15, 6,
 			1, 0, 0,
-			1, 0, 0, 1, "DeadlineExceeded",
+			true, 1, 0, 0, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds bigger than single pod execution": {
 			1, 2, 10, 15, 6,
 			1, 1, 0,
-			1, 0, 1, 1, "DeadlineExceeded",
+			true, 1, 0, 1, 1, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds times-out before any pod starts": {
 			1, 1, 10, 10, 6,
 			0, 0, 0,
-			0, 0, 0, 0, "DeadlineExceeded",
+			true, 0, 0, 0, 0, "DeadlineExceeded",
 		},
 		"activeDeadlineSeconds with backofflimit reach": {
 			1, 1, 1, 10, 0,
 			1, 0, 2,
-			1, 0, 0, 3, "BackoffLimitExceeded",
+			true, 1, 0, 0, 3, "BackoffLimitExceeded",
 		},
 	}
 
@@ -431,11 +435,13 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		}
 
 		// run
-		err := manager.syncJob(getKey(job, t))
+		forget, err := manager.syncJob(getKey(job, t))
 		if err != nil {
 			t.Errorf("%s: unexpected error when syncing jobs %v", name, err)
 		}
+		if forget != tc.expectedForgetKey {
+			t.Errorf("%s: unexpected forget value. Expected %v, saw %v\n", name, tc.expectedForgetKey, forget)
+		}
 		// validate created/deleted pods
 		if int32(len(fakePodControl.Templates)) != 0 {
 			t.Errorf("%s: unexpected number of creates. Expected 0, saw %d\n", name, len(fakePodControl.Templates))
@@ -492,10 +498,13 @@ func TestSyncPastDeadlineJobFinished(t *testing.T) {
 	job.Status.StartTime = &start
 	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, "DeadlineExceeded", "Job was active longer than specified deadline"))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	if len(fakePodControl.Templates) != 0 {
 		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
 	}
@@ -518,10 +527,13 @@ func TestSyncJobComplete(t *testing.T) {
 	job := newJob(1, 1, 6)
 	job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Fatalf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	actual, err := manager.jobLister.Jobs(job.Namespace).Get(job.Name)
 	if err != nil {
 		t.Fatalf("Unexpected error when trying to get job from the store: %v", err)
@@ -541,10 +553,13 @@ func TestSyncJobDeleted(t *testing.T) {
 	manager.jobStoreSynced = alwaysReady
 	manager.updateHandler = func(job *batch.Job) error { return nil }
 	job := newJob(2, 2, 6)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err != nil {
 		t.Errorf("Unexpected error when syncing jobs %v", err)
 	}
+	if !forget {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", true, forget)
+	}
 	if len(fakePodControl.Templates) != 0 {
 		t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", 0, len(fakePodControl.Templates))
 	}
@@ -555,6 +570,7 @@ func TestSyncJobDeleted(t *testing.T) {
 
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
 	manager, sharedInformerFactory := newJobControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
@@ -567,10 +583,13 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 	}
 	job := newJob(2, 2, 6)
 	sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
-	err := manager.syncJob(getKey(job, t))
+	forget, err := manager.syncJob(getKey(job, t))
 	if err == nil || err != updateError {
 		t.Errorf("Expected error %v when syncing jobs, got %v", updateError, err)
 	}
+	if forget != false {
+		t.Errorf("Unexpected forget value. Expected %v, saw %v\n", false, forget)
+	}
 	t.Log("Waiting for a job in the queue")
 	key, _ := manager.queue.Get()
 	expectedKey := getKey(job, t)
@@ -1168,7 +1187,7 @@ func TestWatchJobs(t *testing.T) {
 
 	// The update sent through the fakeWatcher should make its way into the workqueue,
 	// and eventually into the syncHandler.
-	manager.syncHandler = func(key string) error {
+	manager.syncHandler = func(key string) (bool, error) {
 		defer close(received)
 		ns, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
@@ -1177,12 +1196,12 @@ func TestWatchJobs(t *testing.T) {
 		job, err := manager.jobLister.Jobs(ns).Get(name)
 		if err != nil || job == nil {
 			t.Errorf("Expected to find job under key %v: %v", key, err)
-			return nil
+			return true, nil
 		}
 		if !apiequality.Semantic.DeepDerivative(*job, testJob) {
 			t.Errorf("Expected %#v, but got %#v", testJob, *job)
 		}
-		return nil
+		return true, nil
 	}
 	// Start only the job watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method.
@@ -1213,7 +1232,7 @@ func TestWatchPods(t *testing.T) {
 	received := make(chan struct{})
 	// The pod update sent through the fakeWatcher should figure out the managing job and
 	// send it into the syncHandler.
-	manager.syncHandler = func(key string) error {
+	manager.syncHandler = func(key string) (bool, error) {
 		ns, name, err := cache.SplitMetaNamespaceKey(key)
 		if err != nil {
 			t.Errorf("Error getting namespace/name from key %v: %v", key, err)
@@ -1225,10 +1244,10 @@ func TestWatchPods(t *testing.T) {
 		if !apiequality.Semantic.DeepDerivative(job, testJob) {
 			t.Errorf("\nExpected %#v,\nbut got %#v", testJob, job)
 			close(received)
-			return nil
+			return true, nil
 		}
 		close(received)
-		return nil
+		return true, nil
 	}
 	// Start only the pod watcher and the workqueue, send a watch event,
 	// and make sure it hits the sync method for the right job.