From 4af432bab302f0b25f2e405816f6c861b75e8d0a Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Mon, 8 Mar 2021 21:52:51 +0000 Subject: [PATCH] Remove active pods past completions --- pkg/controller/job/job_controller.go | 48 +++++++++++------------ pkg/controller/job/job_controller_test.go | 13 +++++- 2 files changed, 35 insertions(+), 26 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index ba583de82a0..23d44d24b1a 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -563,11 +563,8 @@ func (jm *Controller) syncJob(key string) (bool, error) { // success by having that number of successes. Since we do not // start more pods than there are remaining completions, there should // not be any remaining active pods once this count is reached. - if completions >= *job.Spec.Completions { + if completions >= *job.Spec.Completions && active == 0 { complete = true - if active > 0 { - jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached") - } if completions > *job.Spec.Completions { jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached") } @@ -757,14 +754,33 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded return active, err } - rmAtLeast := active - parallelism + wantActive := int32(0) + if job.Spec.Completions == nil { + // Job does not specify a number of completions. Therefore, number active + // should be equal to parallelism, unless the job has seen at least + // once success, in which leave whatever is running, running. + if succeeded > 0 { + wantActive = active + } else { + wantActive = parallelism + } + } else { + // Job specifies a specific number of completions. Therefore, number + // active should not ever exceed number of remaining completions. + wantActive = *job.Spec.Completions - succeeded + if wantActive > parallelism { + wantActive = parallelism + } + } + + rmAtLeast := active - wantActive if rmAtLeast < 0 { rmAtLeast = 0 } podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast)) if len(podsToDelete) > 0 { jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) - klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", rmAtLeast, "target", parallelism) + klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism) removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) active -= removed if err != nil { @@ -772,25 +788,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded } } - if active < parallelism { - wantActive := int32(0) - if job.Spec.Completions == nil { - // Job does not specify a number of completions. Therefore, number active - // should be equal to parallelism, unless the job has seen at least - // once success, in which leave whatever is running, running. - if succeeded > 0 { - wantActive = active - } else { - wantActive = parallelism - } - } else { - // Job specifies a specific number of completions. Therefore, number - // active should not ever exceed number of remaining completions. - wantActive = *job.Spec.Completions - succeeded - if wantActive > parallelism { - wantActive = parallelism - } - } + if active < wantActive { diff := wantActive - active if diff < 0 { utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active)) diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 9d1c08d9b40..290f3121af1 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -364,7 +364,7 @@ func TestControllerSyncJob(t *testing.T) { expectedCondition: &jobConditionComplete, expectedConditionStatus: v1.ConditionTrue, }, - "more active pods than completions": { + "more active pods than parallelism": { parallelism: 2, completions: 5, backoffLimit: 6, @@ -373,6 +373,17 @@ func TestControllerSyncJob(t *testing.T) { expectedDeletions: 8, expectedActive: 2, }, + "more active pods than remaining completions": { + parallelism: 3, + completions: 4, + backoffLimit: 6, + jobKeyForget: true, + activePods: 3, + succeededPods: 2, + expectedDeletions: 1, + expectedActive: 2, + expectedSucceeded: 2, + }, "status change": { parallelism: 2, completions: 5,