Merge pull request #125515 from mimowo/refactor-terminating-counter

Refactor tracking of terminating pods in Job controller
This commit is contained in:
Kubernetes Prow Robot 2024-06-20 13:01:41 -07:00 committed by GitHub
commit cc2946e5d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -811,18 +811,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if err != nil {
return err
}
var terminating *int32
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
terminating = ptr.To(controller.CountTerminatingPods(pods))
}
jobCtx := &syncJobCtx{
job: &job,
pods: pods,
activePods: controller.FilterActivePods(logger, pods),
terminating: terminating,
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
}
if trackTerminatingPods(&job) {
jobCtx.terminating = ptr.To(controller.CountTerminatingPods(pods))
}
active := int32(len(jobCtx.activePods))
newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
@ -896,7 +894,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
jobCtx.finishedCondition = nil
}
active -= deleted
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
if trackTerminatingPods(jobCtx.job) {
*jobCtx.terminating += deleted
}
manageJobErr = err
@ -956,11 +954,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
}
}
var terminating *int32
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
terminating = jobCtx.terminating
}
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !ptr.Equal(ready, job.Status.Ready)
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, jobCtx.terminating)
needsStatusUpdate = needsStatusUpdate || !ptr.Equal(job.Status.Terminating, terminating)
job.Status.Active = active
job.Status.Ready = ready
job.Status.Terminating = jobCtx.terminating
job.Status.Terminating = terminating
err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
if err != nil {
return fmt.Errorf("tracking status: %w", err)
@ -1507,23 +1509,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
jm.expectations.ExpectDeletions(logger, jobKey, len(podsToDelete))
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
if trackTerminatingPods(job) {
*jobCtx.terminating += removed
}
return active, metrics.JobSyncActionPodsDeleted, err
}
var terminating int32 = 0
if onlyReplaceFailedPods(jobCtx.job) {
// For PodFailurePolicy specified but PodReplacementPolicy disabled
// we still need to count terminating pods for replica counts
// But we will not allow updates to status.
if jobCtx.terminating == nil {
terminating = controller.CountTerminatingPods(jobCtx.pods)
} else {
terminating = *jobCtx.terminating
}
}
wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
@ -1559,7 +1550,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
logger.V(4).Info("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive)
removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
active -= removed
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
if trackTerminatingPods(job) {
*jobCtx.terminating += removed
}
// While it is possible for a Job to require both pod creations and
@ -1569,6 +1560,12 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionPodsDeleted, err
}
var terminating int32 = 0
if onlyReplaceFailedPods(jobCtx.job) {
// When onlyReplaceFailedPods=true, then also trackTerminatingPods=true,
// and so we can use the value.
terminating = *jobCtx.terminating
}
if diff := wantActive - terminating - active; diff > 0 {
var remainingTime time.Duration
if !hasBackoffLimitPerIndex(job) {
@ -1954,6 +1951,17 @@ func countReadyPods(pods []*v1.Pod) int32 {
return cnt
}
// trackTerminatingPods reports whether the number of terminating pods is
// tracked for the given Job. The count is tracked when any of the following
// is true:
//   - the JobPodReplacementPolicy feature gate is enabled, so that the count
//     can be returned in the status field;
//   - the JobPodFailurePolicy feature gate is enabled and the Job specifies a
//     pod failure policy, in which case only failed pods are replaced.
func trackTerminatingPods(job *batch.Job) bool {
	gates := feature.DefaultFeatureGate
	switch {
	case gates.Enabled(features.JobPodReplacementPolicy):
		// Tracking is unconditional here; the Job spec is not consulted.
		return true
	case gates.Enabled(features.JobPodFailurePolicy):
		// Tracking is only needed when this Job actually sets a failure policy.
		return job.Spec.PodFailurePolicy != nil
	default:
		return false
	}
}
// This checks if we should apply PodReplacementPolicy.
// PodReplacementPolicy controls when we recreate pods if they are marked as terminating
// Failed means that we recreate only once the pod has terminated.