From bf48165232f6c1f044a8a4273b38bc8127b8fecc Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Tue, 11 Jul 2023 07:48:06 +0200 Subject: [PATCH] Remarks to syncJobCtx --- pkg/controller/job/job_controller.go | 234 ++++++++++------------ pkg/controller/job/job_controller_test.go | 8 +- 2 files changed, 108 insertions(+), 134 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 58e0fa05db9..78d2b13408f 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -131,18 +131,17 @@ type Controller struct { podBackoffStore *backoffStore } -type syncJobContext struct { - job *batch.Job - pods []*v1.Pod - finishedCondition *batch.JobCondition - activePods []*v1.Pod - succeeded int32 - prevSucceededIndexes orderedIntervals - succeededIndexes orderedIntervals - newBackoffRecord backoffRecord - expectedRmFinalizers sets.Set[string] - uncounted *uncountedTerminatedPods - podFailureCountByPolicyAction map[string]int +type syncJobCtx struct { + job *batch.Job + pods []*v1.Pod + finishedCondition *batch.JobCondition + activePods []*v1.Pod + succeeded int32 + prevSucceededIndexes orderedIntervals + succeededIndexes orderedIntervals + newBackoffRecord backoffRecord + expectedRmFinalizers sets.Set[string] + uncounted *uncountedTerminatedPods } // NewController creates a new Job controller that keeps the relevant pods @@ -756,9 +755,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { return nil } - syncJobContext := &syncJobContext{} - syncJobContext.job = &job - completionMode := getCompletionMode(&job) action := metrics.JobSyncActionReconciling @@ -775,9 +771,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if job.Status.UncountedTerminatedPods == nil { job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } - uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) - syncJobContext.uncounted = uncounted - syncJobContext.expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key) // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // and update the expectations after we've retrieved active pods from the store. If a new pod enters @@ -789,16 +782,20 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { return err } - syncJobContext.pods = pods - activePods := controller.FilterActivePods(pods) - syncJobContext.activePods = activePods - active := int32(len(activePods)) - newSucceededPods, newFailedPods := getNewFinishedPods(syncJobContext) - syncJobContext.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded)) - failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed)) + jobCtx := &syncJobCtx{ + job: &job, + pods: pods, + activePods: controller.FilterActivePods(pods), + uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods), + expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key), + } + active := int32(len(jobCtx.activePods)) + newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx) + jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded)) + failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(jobCtx.uncounted.failed)) var ready *int32 if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { - ready = pointer.Int32(countReadyPods(activePods)) + ready = pointer.Int32(countReadyPods(jobCtx.activePods)) } // Job first start. Set StartTime only if the job is not in the suspended state. @@ -807,7 +804,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { job.Status.StartTime = &now } - syncJobContext.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods) + jobCtx.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods) var manageJobErr error @@ -815,19 +812,19 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil { - syncJobContext.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now()) + jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now()) } else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil { // Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed. - syncJobContext.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now()) + jobCtx.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now()) } } - if syncJobContext.finishedCondition == nil { + if jobCtx.finishedCondition == nil { if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) { // check if the number of pod restart exceeds backoff (for restart OnFailure only) // OR if the number of failed jobs increased since the last syncJob - syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now()) + jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now()) } else if jm.pastActiveDeadline(&job) { - syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now()) + jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now()) } else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) { syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time) logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration) @@ -835,28 +832,25 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { } } - var prevSucceededIndexes, succeededIndexes orderedIntervals if isIndexedJob(&job) { - prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods) - syncJobContext.prevSucceededIndexes = prevSucceededIndexes - syncJobContext.succeededIndexes = succeededIndexes - syncJobContext.succeeded = int32(succeededIndexes.total()) + jobCtx.prevSucceededIndexes, jobCtx.succeededIndexes = calculateSucceededIndexes(logger, &job, pods) + jobCtx.succeeded = int32(jobCtx.succeededIndexes.total()) } suspendCondChanged := false // Remove active pods if Job failed. - if syncJobContext.finishedCondition != nil { - deleted, err := jm.deleteActivePods(ctx, &job, activePods) + if jobCtx.finishedCondition != nil { + deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods) if deleted != active || !satisfiedExpectations { // Can't declare the Job as finished yet, as there might be remaining // pod finalizers or pods that are not in the informer's cache yet. - syncJobContext.finishedCondition = nil + jobCtx.finishedCondition = nil } active -= deleted manageJobErr = err } else { manageJobCalled := false if satisfiedExpectations && job.DeletionTimestamp == nil { - active, action, manageJobErr = jm.manageJob(ctx, syncJobContext) + active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx) manageJobCalled = true } complete := false @@ -867,16 +861,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { // not expected to fail, but if they do, the failure is ignored. Once any // pod succeeds, the controller waits for remaining pods to finish, and // then the job is complete. - complete = syncJobContext.succeeded > 0 && active == 0 + complete = jobCtx.succeeded > 0 && active == 0 } else { // Job specifies a number of completions. This type of job signals // success by having that number of successes. Since we do not // start more pods than there are remaining completions, there should // not be any remaining active pods once this count is reached. - complete = syncJobContext.succeeded >= *job.Spec.Completions && active == 0 + complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0 } if complete { - syncJobContext.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now()) + jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now()) } else if manageJobCalled { // Update the conditions / emit events only if manageJob was called in // this syncJob. Otherwise wait for the right syncJob call to make @@ -912,7 +906,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready) job.Status.Active = active job.Status.Ready = ready - err = jm.trackJobStatusAndRemoveFinalizers(ctx, syncJobContext, needsStatusUpdate) + err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate) if err != nil { return fmt.Errorf("tracking status: %w", err) } @@ -991,66 +985,60 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey // // It does this up to a limited number of Pods so that the size of .status // doesn't grow too much and this sync doesn't starve other Jobs. -func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, needsFlush bool) error { +func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, needsFlush bool) error { logger := klog.FromContext(ctx) - job := syncJobContext.job - pods := syncJobContext.pods - finishedCond := syncJobContext.finishedCondition - expectedRmFinalizers := syncJobContext.expectedRmFinalizers - succeededIndexes := syncJobContext.succeededIndexes - uncounted := syncJobContext.uncounted - isIndexed := isIndexedJob(job) + isIndexed := isIndexedJob(jobCtx.job) var podsToRemoveFinalizer []*v1.Pod - uncountedStatus := job.Status.UncountedTerminatedPods + uncountedStatus := jobCtx.job.Status.UncountedTerminatedPods var newSucceededIndexes []int if isIndexed { // Sort to introduce completed Indexes in order. - sort.Sort(byCompletionIndex(pods)) + sort.Sort(byCompletionIndex(jobCtx.pods)) } - uidsWithFinalizer := make(sets.Set[string], len(pods)) - for _, p := range pods { + uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods)) + for _, p := range jobCtx.pods { uid := string(p.UID) - if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) { + if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) { uidsWithFinalizer.Insert(uid) } } // Shallow copy, as it will only be used to detect changes in the counters. - oldCounters := job.Status - if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { + oldCounters := jobCtx.job.Status + if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) { needsFlush = true } podFailureCountByPolicyAction := map[string]int{} reachedMaxUncountedPods := false - for _, pod := range pods { - if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) { + for _, pod := range jobCtx.pods { + if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) { // This pod was processed in a previous sync. continue } - considerPodFailed := isPodFailed(pod, job) - if podutil.IsPodTerminal(pod) || considerPodFailed || finishedCond != nil || job.DeletionTimestamp != nil { + considerPodFailed := isPodFailed(pod, jobCtx.job) + if podutil.IsPodTerminal(pod) || considerPodFailed || jobCtx.finishedCondition != nil || jobCtx.job.DeletionTimestamp != nil { podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod) } - if pod.Status.Phase == v1.PodSucceeded && !uncounted.failed.Has(string(pod.UID)) { + if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) { if isIndexed { // The completion index is enough to avoid recounting succeeded pods. // No need to track UIDs. ix := getCompletionIndex(pod.Annotations) - if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && !succeededIndexes.has(ix) { + if ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) && !jobCtx.prevSucceededIndexes.has(ix) { newSucceededIndexes = append(newSucceededIndexes, ix) needsFlush = true } - } else if !uncounted.succeeded.Has(string(pod.UID)) { + } else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) { needsFlush = true uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID) } - } else if considerPodFailed || finishedCond != nil { + } else if considerPodFailed || jobCtx.finishedCondition != nil { // When the job is considered finished, every non-terminated pod is considered failed ix := getCompletionIndex(pod.Annotations) - if !uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*job.Spec.Completions))) { - if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { - _, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod) + if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) { + if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil { + _, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod) if action != nil { podFailureCountByPolicyAction[string(*action)] += 1 } @@ -1078,43 +1066,42 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syn } } if isIndexed { - succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) - succeededIndexesStr := succeededIndexes.String() - if succeededIndexesStr != job.Status.CompletedIndexes { + jobCtx.succeededIndexes = jobCtx.succeededIndexes.withOrderedIndexes(newSucceededIndexes) + succeededIndexesStr := jobCtx.succeededIndexes.String() + if succeededIndexesStr != jobCtx.job.Status.CompletedIndexes { needsFlush = true } - job.Status.Succeeded = int32(succeededIndexes.total()) - job.Status.CompletedIndexes = succeededIndexesStr + jobCtx.job.Status.Succeeded = int32(jobCtx.succeededIndexes.total()) + jobCtx.job.Status.CompletedIndexes = succeededIndexesStr } if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { - if finishedCond != nil && finishedCond.Type == batch.JobFailureTarget { + if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget { // Append the interim FailureTarget condition to update the job status with before finalizers are removed. - job.Status.Conditions = append(job.Status.Conditions, *finishedCond) + jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition) needsFlush = true // Prepare the final Failed condition to update the job status with after the finalizers are removed. // It is also used in the enactJobFinished function for reporting. - finishedCond = newFailedConditionForFailureTarget(finishedCond, jm.clock.Now()) + jobCtx.finishedCondition = newFailedConditionForFailureTarget(jobCtx.finishedCondition, jm.clock.Now()) } } - syncJobContext.podFailureCountByPolicyAction = podFailureCountByPolicyAction var err error - if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, syncJobContext, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { + if jobCtx.job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, jobCtx, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil { return err } - jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(job, finishedCond) + jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(jobCtx.job, jobCtx.finishedCondition) if jobFinished { needsFlush = true } if needsFlush { - if _, err := jm.updateStatusHandler(ctx, job); err != nil { + if _, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil { return fmt.Errorf("removing uncounted pods from status: %w", err) } if jobFinished { - jm.recordJobFinished(job, finishedCond) + jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition) } - recordJobPodFinished(logger, job, oldCounters) + recordJobPodFinished(logger, jobCtx.job, oldCounters) } return nil } @@ -1130,18 +1117,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syn // // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) { logger := klog.FromContext(ctx) - job := syncJobContext.job - newBackoffRecord := syncJobContext.newBackoffRecord - podFailureCountByPolicyAction := syncJobContext.podFailureCountByPolicyAction var err error if needsFlush { - if job, err = jm.updateStatusHandler(ctx, job); err != nil { - return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) + if jobCtx.job, err = jm.updateStatusHandler(ctx, jobCtx.job); err != nil { + return jobCtx.job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } - err = jm.podBackoffStore.updateBackoffRecord(newBackoffRecord) + err = jm.podBackoffStore.updateBackoffRecord(jobCtx.newBackoffRecord) if err != nil { // this error might undercount the backoff. @@ -1150,16 +1134,16 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syn logger.Error(err, "Backoff update failed") } - recordJobPodFinished(logger, job, *oldCounters) + recordJobPodFinished(logger, jobCtx.job, *oldCounters) // Shallow copy, as it will only be used to detect changes in the counters. - *oldCounters = job.Status + *oldCounters = jobCtx.job.Status needsFlush = false } - recordJobPodFailurePolicyActions(job, podFailureCountByPolicyAction) + recordJobPodFailurePolicyActions(jobCtx.job, podFailureCountByPolicyAction) - jobKey, err := controller.KeyFunc(job) + jobKey, err := controller.KeyFunc(jobCtx.job) if err != nil { - return job, needsFlush, fmt.Errorf("getting job key: %w", err) + return jobCtx.job, needsFlush, fmt.Errorf("getting job key: %w", err) } var rmErr error if len(podsToRemoveFinalizer) > 0 { @@ -1173,15 +1157,15 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syn } // Failed to remove some finalizers. Attempt to update the status with the // partial progress. - if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { + if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) { needsFlush = true } if rmErr != nil && needsFlush { - if job, err := jm.updateStatusHandler(ctx, job); err != nil { + if job, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil { return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err) } } - return job, needsFlush, rmErr + return jobCtx.job, needsFlush, rmErr } // cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from @@ -1369,20 +1353,19 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string { // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted // in the job status. The list of failed pods can be affected by the podFailurePolicy. -func getNewFinishedPods(syncJobContext *syncJobContext) (succeededPods, failedPods []*v1.Pod) { - job := syncJobContext.job - succeededPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Succeeded(), func(p *v1.Pod) bool { +func getNewFinishedPods(jobCtx *syncJobCtx) (succeededPods, failedPods []*v1.Pod) { + succeededPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Succeeded(), func(p *v1.Pod) bool { return p.Status.Phase == v1.PodSucceeded }) - failedPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Failed(), func(p *v1.Pod) bool { - if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil { - if !isPodFailed(p, job) { + failedPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Failed(), func(p *v1.Pod) bool { + if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil { + if !isPodFailed(p, jobCtx.job) { return false } - _, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p) + _, countFailed, _ := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, p) return countFailed } else { - return isPodFailed(p, job) + return isPodFailed(p, jobCtx.job) } }) return succeededPods, failedPods @@ -1398,15 +1381,9 @@ func jobSuspended(job *batch.Job) bool { // pods according to what is specified in the job.Spec. // Respects back-off; does not create new pods if the back-off time has not passed // Does NOT modify . -func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobContext) (int32, string, error) { +func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) { logger := klog.FromContext(ctx) - job := syncJobContext.job - activePods := syncJobContext.activePods - succeeded := syncJobContext.succeeded - succeededIndexes := syncJobContext.succeededIndexes - newBackoffRecord := syncJobContext.newBackoffRecord - - active := int32(len(activePods)) + active := int32(len(jobCtx.activePods)) parallelism := *job.Spec.Parallelism jobKey, err := controller.KeyFunc(job) if err != nil { @@ -1416,7 +1393,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont if jobSuspended(job) { logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active) - podsToDelete := activePodsForRemoval(job, activePods, int(active)) + podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active)) jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete) active -= removed @@ -1428,7 +1405,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont // Job does not specify a number of completions. Therefore, number active // should be equal to parallelism, unless the job has seen at least // once success, in which leave whatever is running, running. - if succeeded > 0 { + if jobCtx.succeeded > 0 { wantActive = active } else { wantActive = parallelism @@ -1436,7 +1413,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont } else { // Job specifies a specific number of completions. Therefore, number // active should not ever exceed number of remaining completions. - wantActive = *job.Spec.Completions - succeeded + wantActive = *job.Spec.Completions - jobCtx.succeeded if wantActive > parallelism { wantActive = parallelism } @@ -1449,7 +1426,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont if rmAtLeast < 0 { rmAtLeast = 0 } - podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast)) + podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast)) if len(podsToDelete) > MaxPodCreateDeletePerSync { podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync] } @@ -1466,7 +1443,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont } if active < wantActive { - remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff) + remainingTime := jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff) if remainingTime > 0 { jm.enqueueSyncJobWithDelay(logger, job, remainingTime) return 0, metrics.JobSyncActionPodsCreated, nil @@ -1484,7 +1461,7 @@ func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobCont var indexesToAdd []int if isIndexedJob(job) { - indexesToAdd = firstPendingIndexes(activePods, succeededIndexes, int(diff), int(*job.Spec.Completions)) + indexesToAdd = firstPendingIndexes(jobCtx.activePods, jobCtx.succeededIndexes, int(diff), int(*job.Spec.Completions)) diff = int32(len(indexesToAdd)) } active += diff @@ -1600,22 +1577,19 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) // getValidPodsWithFilter returns the valid pods that pass the filter. // Pods are valid if they have a finalizer or in uncounted set // and, for Indexed Jobs, a valid completion index. -func getValidPodsWithFilter(synJobContext *syncJobContext, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod { - job := synJobContext.job - pods := synJobContext.pods - expectedRmFinalizers := synJobContext.expectedRmFinalizers +func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod { var result []*v1.Pod - for _, p := range pods { + for _, p := range jobCtx.pods { uid := string(p.UID) // Pods that don't have a completion finalizer are in the uncounted set or // have already been accounted for in the Job status. - if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid) { + if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) { continue } - if isIndexedJob(job) { + if isIndexedJob(jobCtx.job) { idx := getCompletionIndex(p.Annotations) - if idx == unknownCompletionIndex || idx >= int(*job.Spec.Completions) { + if idx == unknownCompletionIndex || idx >= int(*jobCtx.job.Spec.Completions) { continue } } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 26b62ef22dd..99ac30eae60 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1079,8 +1079,8 @@ func TestGetNewFinshedPods(t *testing.T) { for name, tc := range cases { t.Run(name, func(t *testing.T) { uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) - syncJobContext := &syncJobContext{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers} - succeededPods, failedPods := getNewFinishedPods(syncJobContext) + jobCtx := &syncJobCtx{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers} + succeededPods, failedPods := getNewFinishedPods(jobCtx) succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded)) failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed)) if succeeded != tc.wantSucceeded { @@ -1655,7 +1655,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { if isIndexedJob(job) { succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)) } - syncJobContext := &syncJobContext{ + jobCtx := &syncJobCtx{ job: job, pods: tc.pods, succeededIndexes: succeededIndexes, @@ -1664,7 +1664,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { finishedCondition: tc.finishedCond, newBackoffRecord: backoffRecord{}, } - err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), syncJobContext, tc.needsFlush) + err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) }