Merge pull request #119193 from mimowo/sync-job-context

Introduce syncJobContext to limit the number of function parameters
Kubernetes Prow Robot 2023-07-11 10:33:30 -07:00 committed by GitHub
commit a6890b361d
2 changed files with 119 additions and 93 deletions
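
This commit is a mechanical application of the parameter-object pattern: values that syncJob computes once per reconcile move into a syncJobCtx struct, and helpers receive one pointer instead of a growing argument list. A minimal sketch of the pattern, using illustrative names rather than the controller's real types:

package main

import "fmt"

// syncCtx is a hypothetical parameter object: values computed once per
// sync are bundled so helpers take one pointer instead of many arguments
// (the role syncJobCtx plays in this commit).
type syncCtx struct {
	jobName   string
	succeeded int32
	failed    int32
}

// Before the refactor, a helper's signature grows with every new value:
//   trackStatus(jobName string, succeeded, failed int32, needsFlush bool) error
// After, it takes the context plus only the arguments that vary per call:
func trackStatus(c *syncCtx, needsFlush bool) error {
	if needsFlush {
		fmt.Printf("flushing %s: succeeded=%d failed=%d\n", c.jobName, c.succeeded, c.failed)
	}
	return nil
}

func main() {
	c := &syncCtx{jobName: "demo", succeeded: 3, failed: 1}
	_ = trackStatus(c, true)
}

The trade-off is typical of this pattern: call sites shrink (trackJobStatusAndRemoveFinalizers drops from nine parameters to three in the diff below), at the cost of helpers gaining access to more state than they strictly need.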


@@ -131,6 +131,19 @@ type Controller struct {
 	podBackoffStore *backoffStore
 }
 
+type syncJobCtx struct {
+	job                  *batch.Job
+	pods                 []*v1.Pod
+	finishedCondition    *batch.JobCondition
+	activePods           []*v1.Pod
+	succeeded            int32
+	prevSucceededIndexes orderedIntervals
+	succeededIndexes     orderedIntervals
+	newBackoffRecord     backoffRecord
+	expectedRmFinalizers sets.Set[string]
+	uncounted            *uncountedTerminatedPods
+}
+
 // NewController creates a new Job controller that keeps the relevant pods
 // in sync with their corresponding Job objects.
 func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
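
A side note on one field type: expectedRmFinalizers is a sets.Set[string], the generic set from k8s.io/apimachinery/pkg/util/sets. A small usage sketch, assuming a module that depends on an apimachinery version recent enough to have the generic API:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// UIDs whose finalizer removal is already in flight; pods in this set
	// are treated as already processed by the tracking code below.
	expected := sets.New[string]("uid-a")
	expected.Insert("uid-b")

	for _, uid := range []string{"uid-a", "uid-c"} {
		if expected.Has(uid) {
			fmt.Println("skip already-processed pod", uid)
			continue
		}
		fmt.Println("count pod", uid)
	}
}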
@@ -758,8 +771,6 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	if job.Status.UncountedTerminatedPods == nil {
 		job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
 	}
-	uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
-	expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key)
 
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
@@ -771,14 +782,20 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return err
 	}
 
-	activePods := controller.FilterActivePods(pods)
-	active := int32(len(activePods))
-	newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers)
-	succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
-	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
+	jobCtx := &syncJobCtx{
+		job:                  &job,
+		pods:                 pods,
+		activePods:           controller.FilterActivePods(pods),
+		uncounted:            newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
+		expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
+	}
+	active := int32(len(jobCtx.activePods))
+	newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
+	jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
+	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(jobCtx.uncounted.failed))
 	var ready *int32
 	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
-		ready = pointer.Int32(countReadyPods(activePods))
+		ready = pointer.Int32(countReadyPods(jobCtx.activePods))
 	}
 
 	// Job first start. Set StartTime only if the job is not in the suspended state.
@@ -787,28 +804,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.StartTime = &now
 	}
 
-	newBackoffRecord := jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
+	jobCtx.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
 
 	var manageJobErr error
-	var finishedCondition *batch.JobCondition
 
 	exceedsBackoffLimit := failed > *job.Spec.BackoffLimit
 
 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
-			finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
+			jobCtx.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
 		} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
 			// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
-			finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
 		}
 	}
-	if finishedCondition == nil {
+	if jobCtx.finishedCondition == nil {
 		if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
 			// check if the number of pod restart exceeds backoff (for restart OnFailure only)
 			// OR if the number of failed jobs increased since the last syncJob
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
 		} else if jm.pastActiveDeadline(&job) {
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
 		} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
 			syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
 			logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
@@ -816,26 +832,25 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		}
 	}
 
-	var prevSucceededIndexes, succeededIndexes orderedIntervals
 	if isIndexedJob(&job) {
-		prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
-		succeeded = int32(succeededIndexes.total())
+		jobCtx.prevSucceededIndexes, jobCtx.succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
+		jobCtx.succeeded = int32(jobCtx.succeededIndexes.total())
 	}
 	suspendCondChanged := false
 	// Remove active pods if Job failed.
-	if finishedCondition != nil {
-		deleted, err := jm.deleteActivePods(ctx, &job, activePods)
+	if jobCtx.finishedCondition != nil {
+		deleted, err := jm.deleteActivePods(ctx, &job, jobCtx.activePods)
 		if deleted != active || !satisfiedExpectations {
 			// Can't declare the Job as finished yet, as there might be remaining
 			// pod finalizers or pods that are not in the informer's cache yet.
-			finishedCondition = nil
+			jobCtx.finishedCondition = nil
 		}
 		active -= deleted
 		manageJobErr = err
 	} else {
 		manageJobCalled := false
 		if satisfiedExpectations && job.DeletionTimestamp == nil {
-			active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffRecord)
+			active, action, manageJobErr = jm.manageJob(ctx, &job, jobCtx)
 			manageJobCalled = true
 		}
 		complete := false
@@ -846,16 +861,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			// not expected to fail, but if they do, the failure is ignored. Once any
 			// pod succeeds, the controller waits for remaining pods to finish, and
 			// then the job is complete.
-			complete = succeeded > 0 && active == 0
+			complete = jobCtx.succeeded > 0 && active == 0
 		} else {
 			// Job specifies a number of completions. This type of job signals
 			// success by having that number of successes. Since we do not
 			// start more pods than there are remaining completions, there should
 			// not be any remaining active pods once this count is reached.
-			complete = succeeded >= *job.Spec.Completions && active == 0
+			complete = jobCtx.succeeded >= *job.Spec.Completions && active == 0
 		}
 		if complete {
-			finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
+			jobCtx.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
 		} else if manageJobCalled {
 			// Update the conditions / emit events only if manageJob was called in
 			// this syncJob. Otherwise wait for the right syncJob call to make
@@ -891,7 +906,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
 	job.Status.Active = active
 	job.Status.Ready = ready
-	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffRecord)
+	err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
@@ -970,59 +985,60 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
 //
 // It does this up to a limited number of Pods so that the size of .status
 // doesn't grow too much and this sync doesn't starve other Jobs.
-func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
+func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, needsFlush bool) error {
 	logger := klog.FromContext(ctx)
-	isIndexed := isIndexedJob(job)
+
+	isIndexed := isIndexedJob(jobCtx.job)
 	var podsToRemoveFinalizer []*v1.Pod
-	uncountedStatus := job.Status.UncountedTerminatedPods
+	uncountedStatus := jobCtx.job.Status.UncountedTerminatedPods
 	var newSucceededIndexes []int
 	if isIndexed {
 		// Sort to introduce completed Indexes in order.
-		sort.Sort(byCompletionIndex(pods))
+		sort.Sort(byCompletionIndex(jobCtx.pods))
 	}
-	uidsWithFinalizer := make(sets.Set[string], len(pods))
-	for _, p := range pods {
+	uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods))
+	for _, p := range jobCtx.pods {
 		uid := string(p.UID)
-		if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) {
+		if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) {
 			uidsWithFinalizer.Insert(uid)
 		}
 	}
 
 	// Shallow copy, as it will only be used to detect changes in the counters.
-	oldCounters := job.Status
-	if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
+	oldCounters := jobCtx.job.Status
+	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
 		needsFlush = true
 	}
 	podFailureCountByPolicyAction := map[string]int{}
 	reachedMaxUncountedPods := false
-	for _, pod := range pods {
-		if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) {
+	for _, pod := range jobCtx.pods {
+		if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) {
 			// This pod was processed in a previous sync.
 			continue
 		}
-		considerPodFailed := isPodFailed(pod, job)
-		if podutil.IsPodTerminal(pod) || considerPodFailed || finishedCond != nil || job.DeletionTimestamp != nil {
+		considerPodFailed := isPodFailed(pod, jobCtx.job)
+		if podutil.IsPodTerminal(pod) || considerPodFailed || jobCtx.finishedCondition != nil || jobCtx.job.DeletionTimestamp != nil {
 			podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
 		}
-		if pod.Status.Phase == v1.PodSucceeded && !uncounted.failed.Has(string(pod.UID)) {
+		if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) {
 			if isIndexed {
 				// The completion index is enough to avoid recounting succeeded pods.
 				// No need to track UIDs.
 				ix := getCompletionIndex(pod.Annotations)
-				if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && !succeededIndexes.has(ix) {
+				if ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions) && !jobCtx.prevSucceededIndexes.has(ix) {
 					newSucceededIndexes = append(newSucceededIndexes, ix)
 					needsFlush = true
 				}
-			} else if !uncounted.succeeded.Has(string(pod.UID)) {
+			} else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) {
 				needsFlush = true
 				uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID)
 			}
-		} else if considerPodFailed || finishedCond != nil {
+		} else if considerPodFailed || jobCtx.finishedCondition != nil {
 			// When the job is considered finished, every non-terminated pod is considered failed
 			ix := getCompletionIndex(pod.Annotations)
-			if !uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*job.Spec.Completions))) {
-				if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
-					_, countFailed, action := matchPodFailurePolicy(job.Spec.PodFailurePolicy, pod)
+			if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
+				if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
+					_, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod)
 					if action != nil {
 						podFailureCountByPolicyAction[string(*action)] += 1
 					}
@@ -1050,42 +1066,42 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 		}
 	}
 	if isIndexed {
-		succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
-		succeededIndexesStr := succeededIndexes.String()
-		if succeededIndexesStr != job.Status.CompletedIndexes {
+		jobCtx.succeededIndexes = jobCtx.succeededIndexes.withOrderedIndexes(newSucceededIndexes)
+		succeededIndexesStr := jobCtx.succeededIndexes.String()
+		if succeededIndexesStr != jobCtx.job.Status.CompletedIndexes {
 			needsFlush = true
 		}
-		job.Status.Succeeded = int32(succeededIndexes.total())
-		job.Status.CompletedIndexes = succeededIndexesStr
+		jobCtx.job.Status.Succeeded = int32(jobCtx.succeededIndexes.total())
+		jobCtx.job.Status.CompletedIndexes = succeededIndexesStr
 	}
 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
-		if finishedCond != nil && finishedCond.Type == batch.JobFailureTarget {
+		if jobCtx.finishedCondition != nil && jobCtx.finishedCondition.Type == batch.JobFailureTarget {
 			// Append the interim FailureTarget condition to update the job status with before finalizers are removed.
-			job.Status.Conditions = append(job.Status.Conditions, *finishedCond)
+			jobCtx.job.Status.Conditions = append(jobCtx.job.Status.Conditions, *jobCtx.finishedCondition)
 			needsFlush = true
 			// Prepare the final Failed condition to update the job status with after the finalizers are removed.
 			// It is also used in the enactJobFinished function for reporting.
-			finishedCond = newFailedConditionForFailureTarget(finishedCond, jm.clock.Now())
+			jobCtx.finishedCondition = newFailedConditionForFailureTarget(jobCtx.finishedCondition, jm.clock.Now())
 		}
 	}
 	var err error
-	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush, newBackoffRecord); err != nil {
+	if jobCtx.job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, jobCtx, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush); err != nil {
 		return err
 	}
-	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(job, finishedCond)
+	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(jobCtx.job, jobCtx.finishedCondition)
 	if jobFinished {
 		needsFlush = true
 	}
 	if needsFlush {
-		if _, err := jm.updateStatusHandler(ctx, job); err != nil {
+		if _, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
 			return fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
 		if jobFinished {
-			jm.recordJobFinished(job, finishedCond)
+			jm.recordJobFinished(jobCtx.job, jobCtx.finishedCondition)
 		}
-		recordJobPodFinished(logger, job, oldCounters)
+		recordJobPodFinished(logger, jobCtx.job, oldCounters)
 	}
 	return nil
 }
@@ -1101,15 +1117,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 //
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
 	logger := klog.FromContext(ctx)
 	var err error
 	if needsFlush {
-		if job, err = jm.updateStatusHandler(ctx, job); err != nil {
-			return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
+		if jobCtx.job, err = jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
+			return jobCtx.job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
 		}
 
-		err = jm.podBackoffStore.updateBackoffRecord(newBackoffRecord)
+		err = jm.podBackoffStore.updateBackoffRecord(jobCtx.newBackoffRecord)
 		if err != nil {
 			// this error might undercount the backoff.
@@ -1118,16 +1134,16 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
 			logger.Error(err, "Backoff update failed")
 		}
 
-		recordJobPodFinished(logger, job, *oldCounters)
+		recordJobPodFinished(logger, jobCtx.job, *oldCounters)
 		// Shallow copy, as it will only be used to detect changes in the counters.
-		*oldCounters = job.Status
+		*oldCounters = jobCtx.job.Status
 		needsFlush = false
 	}
-	recordJobPodFailurePolicyActions(job, podFailureCountByPolicyAction)
+	recordJobPodFailurePolicyActions(jobCtx.job, podFailureCountByPolicyAction)
 
-	jobKey, err := controller.KeyFunc(job)
+	jobKey, err := controller.KeyFunc(jobCtx.job)
 	if err != nil {
-		return job, needsFlush, fmt.Errorf("getting job key: %w", err)
+		return jobCtx.job, needsFlush, fmt.Errorf("getting job key: %w", err)
 	}
 	var rmErr error
 	if len(podsToRemoveFinalizer) > 0 {
@@ -1141,15 +1157,15 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
 	}
 	// Failed to remove some finalizers. Attempt to update the status with the
 	// partial progress.
-	if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
+	if cleanUncountedPodsWithoutFinalizers(&jobCtx.job.Status, uidsWithFinalizer) {
 		needsFlush = true
 	}
 	if rmErr != nil && needsFlush {
-		if job, err := jm.updateStatusHandler(ctx, job); err != nil {
+		if job, err := jm.updateStatusHandler(ctx, jobCtx.job); err != nil {
 			return job, needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
 	}
-	return job, needsFlush, rmErr
+	return jobCtx.job, needsFlush, rmErr
 }
 
 // cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
@@ -1337,19 +1353,19 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
 // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
 // in the job status. The list of failed pods can be affected by the podFailurePolicy.
-func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.Set[string]) (succeededPods, failedPods []*v1.Pod) {
-	succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
+func getNewFinishedPods(jobCtx *syncJobCtx) (succeededPods, failedPods []*v1.Pod) {
+	succeededPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Succeeded(), func(p *v1.Pod) bool {
 		return p.Status.Phase == v1.PodSucceeded
 	})
 
-	failedPods = getValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
-		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
-			if !isPodFailed(p, job) {
+	failedPods = getValidPodsWithFilter(jobCtx, jobCtx.uncounted.Failed(), func(p *v1.Pod) bool {
+		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && jobCtx.job.Spec.PodFailurePolicy != nil {
+			if !isPodFailed(p, jobCtx.job) {
 				return false
 			}
-			_, countFailed, _ := matchPodFailurePolicy(job.Spec.PodFailurePolicy, p)
+			_, countFailed, _ := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, p)
 			return countFailed
 		} else {
-			return isPodFailed(p, job)
+			return isPodFailed(p, jobCtx.job)
 		}
 	})
 	return succeededPods, failedPods
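
getNewFinishedPods and getValidPodsWithFilter (below) now read shared state from jobCtx and keep only the per-call predicate as a parameter. A self-contained sketch of that closure-based filtering, with illustrative types rather than the controller's:

package main

import "fmt"

type pod struct {
	name  string
	phase string
}

// filterPods returns the pods for which keep reports true, the same
// closure-based selection getValidPodsWithFilter performs over jobCtx.pods.
func filterPods(pods []*pod, keep func(*pod) bool) []*pod {
	var out []*pod
	for _, p := range pods {
		if keep(p) {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pods := []*pod{{"a", "Succeeded"}, {"b", "Failed"}, {"c", "Running"}}
	succeeded := filterPods(pods, func(p *pod) bool { return p.phase == "Succeeded" })
	failed := filterPods(pods, func(p *pod) bool { return p.phase == "Failed" })
	fmt.Println(len(succeeded), len(failed))
}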
@@ -1365,9 +1381,9 @@ func jobSuspended(job *batch.Job) bool {
 // pods according to what is specified in the job.Spec.
 // Respects back-off; does not create new pods if the back-off time has not passed
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, newBackoffRecord backoffRecord) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syncJobCtx) (int32, string, error) {
 	logger := klog.FromContext(ctx)
-	active := int32(len(activePods))
+	active := int32(len(jobCtx.activePods))
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
 	if err != nil {
@@ -1377,7 +1393,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 
 	if jobSuspended(job) {
 		logger.V(4).Info("Deleting all active pods in suspended job", "job", klog.KObj(job), "active", active)
-		podsToDelete := activePodsForRemoval(job, activePods, int(active))
+		podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(active))
 		jm.expectations.ExpectDeletions(jobKey, len(podsToDelete))
 		removed, err := jm.deleteJobPods(ctx, job, jobKey, podsToDelete)
 		active -= removed
@@ -1389,7 +1405,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 		// Job does not specify a number of completions. Therefore, number active
 		// should be equal to parallelism, unless the job has seen at least
 		// once success, in which leave whatever is running, running.
-		if succeeded > 0 {
+		if jobCtx.succeeded > 0 {
 			wantActive = active
 		} else {
 			wantActive = parallelism
@@ -1397,7 +1413,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	} else {
 		// Job specifies a specific number of completions. Therefore, number
 		// active should not ever exceed number of remaining completions.
-		wantActive = *job.Spec.Completions - succeeded
+		wantActive = *job.Spec.Completions - jobCtx.succeeded
 		if wantActive > parallelism {
 			wantActive = parallelism
 		}
@@ -1410,7 +1426,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	if rmAtLeast < 0 {
 		rmAtLeast = 0
 	}
-	podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast))
+	podsToDelete := activePodsForRemoval(job, jobCtx.activePods, int(rmAtLeast))
 	if len(podsToDelete) > MaxPodCreateDeletePerSync {
 		podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync]
 	}
@@ -1427,7 +1443,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	}
 
 	if active < wantActive {
-		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
+		remainingTime := jobCtx.newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
 		if remainingTime > 0 {
 			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
 			return 0, metrics.JobSyncActionPodsCreated, nil
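
The backoff record also rides along in jobCtx; the gating itself is unchanged: if time remains in the backoff window, the job is requeued with a delay instead of creating pods. A simplified sketch of that idea, with a hypothetical backoff type standing in for the controller's backoffRecord:

package main

import (
	"fmt"
	"time"
)

// backoff is a stand-in for the controller's backoffRecord: it remembers
// the last pod failure and how long to wait before creating replacements.
type backoff struct {
	lastFailure time.Time
	delay       time.Duration
}

// remaining reports how much of the backoff window is left at time now.
func (b backoff) remaining(now time.Time) time.Duration {
	if left := b.delay - now.Sub(b.lastFailure); left > 0 {
		return left
	}
	return 0
}

func main() {
	b := backoff{lastFailure: time.Now().Add(-2 * time.Second), delay: 5 * time.Second}
	if left := b.remaining(time.Now()); left > 0 {
		// The controller would requeue the job with this delay here.
		fmt.Println("requeue after", left.Round(time.Second))
		return
	}
	fmt.Println("backoff expired; create replacement pods")
}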
@@ -1445,7 +1461,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 
 		var indexesToAdd []int
 		if isIndexedJob(job) {
-			indexesToAdd = firstPendingIndexes(activePods, succeededIndexes, int(diff), int(*job.Spec.Completions))
+			indexesToAdd = firstPendingIndexes(jobCtx.activePods, jobCtx.succeededIndexes, int(diff), int(*job.Spec.Completions))
 			diff = int32(len(indexesToAdd))
 		}
 		active += diff
@@ -1561,19 +1577,19 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or in uncounted set
 // and, for Indexed Jobs, a valid completion index.
-func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Set[string], expectedRmFinalizers sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
 	var result []*v1.Pod
-	for _, p := range pods {
+	for _, p := range jobCtx.pods {
 		uid := string(p.UID)
 		// Pods that don't have a completion finalizer are in the uncounted set or
 		// have already been accounted for in the Job status.
-		if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid) {
+		if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) {
			continue
 		}
-		if isIndexedJob(job) {
+		if isIndexedJob(jobCtx.job) {
 			idx := getCompletionIndex(p.Annotations)
-			if idx == unknownCompletionIndex || idx >= int(*job.Spec.Completions) {
+			if idx == unknownCompletionIndex || idx >= int(*jobCtx.job.Spec.Completions) {
 				continue
 			}
 		}


@@ -1079,7 +1079,8 @@ func TestGetNewFinshedPods(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
-			succeededPods, failedPods := getNewFinishedPods(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
+			jobCtx := &syncJobCtx{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers}
+			succeededPods, failedPods := getNewFinishedPods(jobCtx)
 			succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded))
 			failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed))
 			if succeeded != tc.wantSucceeded {
@@ -1654,7 +1655,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			if isIndexedJob(job) {
 				succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
 			}
-			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{})
+			jobCtx := &syncJobCtx{
+				job:                  job,
+				pods:                 tc.pods,
+				succeededIndexes:     succeededIndexes,
+				uncounted:            uncounted,
+				expectedRmFinalizers: tc.expectedRmFinalizers,
+				finishedCondition:    tc.finishedCond,
+				newBackoffRecord:     backoffRecord{},
+			}
+			err := manager.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, tc.needsFlush)
 			if !errors.Is(err, tc.wantErr) {
 				t.Errorf("Got error %v, want %v", err, tc.wantErr)
 			}