diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index b1c00165b1b..179c01d4463 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -811,18 +811,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	if err != nil {
 		return err
 	}
-	var terminating *int32
-	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
-		terminating = ptr.To(controller.CountTerminatingPods(pods))
-	}
 	jobCtx := &syncJobCtx{
 		job:                  &job,
 		pods:                 pods,
 		activePods:           controller.FilterActivePods(logger, pods),
-		terminating:          terminating,
 		uncounted:            newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
 		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
 	}
+	if trackTerminatingPods(&job) {
+		jobCtx.terminating = ptr.To(controller.CountTerminatingPods(pods))
+	}
 	active := int32(len(jobCtx.activePods))
 	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
 	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
@@ -896,7 +894,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			jobCtx.finishedCondition = nil
 		}
 		active -= deleted
-		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+		if trackTerminatingPods(jobCtx.job) {
 			*jobCtx.terminating += deleted
 		}
 		manageJobErr = err
@@ -956,11 +954,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		}
 	}
+	var terminating *int32
+	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+		terminating = jobCtx.terminating
+	}
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
-	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
+	needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating)
 	job.Status.Active = active
 	job.Status.Ready = ready
-	job.Status.Terminating = jobCtx.terminating
+	job.Status.Terminating = terminating
 	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
@@ -1507,23 +1509,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
 		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
-		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+		if trackTerminatingPods(job) {
 			*jobCtx.terminating += removed
 		}
 		return active, metrics.JobSyncActionPodsDeleted, err
 	}
 
-	var terminating int32 = 0
-	if onlyReplaceFailedPods(jobCtx.job) {
-		// For PodFailurePolicy specified but PodReplacementPolicy disabled
-		// we still need to count terminating pods for replica counts
-		// But we will not allow updates to status.
-		if jobCtx.terminating == nil {
-			terminating = controller.CountTerminatingPods(jobCtx.pods)
-		} else {
-			terminating = *jobCtx.terminating
-		}
-	}
 	wantActive := int32(0)
 	if job.Spec.Completions == nil {
 		// Job does not specify a number of completions.  Therefore, number active
@@ -1559,7 +1550,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
 		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
-		if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+		if trackTerminatingPods(job) {
 			*jobCtx.terminating += removed
 		}
 		// While it is possible for a Job to require both pod creations and
@@ -1569,6 +1560,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		return active, metrics.JobSyncActionPodsDeleted, err
 	}
 
+	var terminating int32 = 0
+	if onlyReplaceFailedPods(jobCtx.job) {
+		// When onlyReplaceFailedPods is true, trackTerminatingPods is also true,
+		// so jobCtx.terminating is guaranteed to be set and can be used here.
+		terminating = *jobCtx.terminating
+	}
 	if diff := wantActive - terminating - active; diff > 0 {
 		var remainingTime time.Duration
 		if !hasBackoffLimitPerIndex(job) {
@@ -1954,6 +1951,17 @@ func countReadyPods(pods []*v1.Pod) int32 {
 	return cnt
 }
 
+// trackTerminatingPods checks if the count of terminating pods is tracked.
+// The count is tracked when any of the following is true:
+// - JobPodReplacementPolicy is enabled, so that the count can be returned in the status field,
+// - only failed pods are replaced, because a pod failure policy is used.
+func trackTerminatingPods(job *batch.Job) bool {
+	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+		return true
+	}
+	return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
+}
+
 // This checks if we should apply PodReplacementPolicy.
 // PodReplacementPolicy controls when we recreate pods if they are marked as terminating
 // Failed means that we recreate only once the pod has terminated.