Merge pull request #99963 from alculquicondor/job_complete_active

Remove active pods past completions
This commit is contained in:
Kubernetes Prow Robot 2021-04-08 17:10:10 -07:00 committed by GitHub
commit 0172cbf56c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 26 deletions

View File

@ -568,11 +568,8 @@ func (jm *Controller) syncJob(key string) (bool, error) {
// success by having that number of successes. Since we do not // success by having that number of successes. Since we do not
// start more pods than there are remaining completions, there should // start more pods than there are remaining completions, there should
// not be any remaining active pods once this count is reached. // not be any remaining active pods once this count is reached.
if completions >= *job.Spec.Completions { if completions >= *job.Spec.Completions && active == 0 {
complete = true complete = true
if active > 0 {
jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
}
if completions > *job.Spec.Completions { if completions > *job.Spec.Completions {
jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached") jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
} }
@ -762,14 +759,33 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
return active, err return active, err
} }
rmAtLeast := active - parallelism wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// one success, in which case leave whatever is running, running.
if succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - succeeded
if wantActive > parallelism {
wantActive = parallelism
}
}
rmAtLeast := active - wantActive
if rmAtLeast < 0 { if rmAtLeast < 0 {
rmAtLeast = 0 rmAtLeast = 0
} }
podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast)) podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
if len(podsToDelete) > 0 { if len(podsToDelete) > 0 {
jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", rmAtLeast, "target", parallelism) klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism)
removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) removed, err := jm.deleteJobPods(job, jobKey, podsToDelete)
active -= removed active -= removed
if err != nil { if err != nil {
@ -777,25 +793,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
} }
} }
if active < parallelism { if active < wantActive {
wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
// should be equal to parallelism, unless the job has seen at least
// one success, in which case leave whatever is running, running.
if succeeded > 0 {
wantActive = active
} else {
wantActive = parallelism
}
} else {
// Job specifies a specific number of completions. Therefore, number
// active should not ever exceed number of remaining completions.
wantActive = *job.Spec.Completions - succeeded
if wantActive > parallelism {
wantActive = parallelism
}
}
diff := wantActive - active diff := wantActive - active
if diff < 0 { if diff < 0 {
utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)) utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))

View File

@ -366,7 +366,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedCondition: &jobConditionComplete, expectedCondition: &jobConditionComplete,
expectedConditionStatus: v1.ConditionTrue, expectedConditionStatus: v1.ConditionTrue,
}, },
"more active pods than completions": { "more active pods than parallelism": {
parallelism: 2, parallelism: 2,
completions: 5, completions: 5,
backoffLimit: 6, backoffLimit: 6,
@ -375,6 +375,17 @@ func TestControllerSyncJob(t *testing.T) {
expectedDeletions: 8, expectedDeletions: 8,
expectedActive: 2, expectedActive: 2,
}, },
"more active pods than remaining completions": {
parallelism: 3,
completions: 4,
backoffLimit: 6,
jobKeyForget: true,
activePods: 3,
succeededPods: 2,
expectedDeletions: 1,
expectedActive: 2,
expectedSucceeded: 2,
},
"status change": { "status change": {
parallelism: 2, parallelism: 2,
completions: 5, completions: 5,