From 990339d4c3e61e8acd3838cfae3066fa38a3f050 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Mon, 10 Jul 2023 09:39:13 +0200
Subject: [PATCH] Introduce syncJobContext to limit the number of function parameters

---
 pkg/controller/job/job_controller.go      | 94 ++++++++++++++++-------
 pkg/controller/job/job_controller_test.go | 14 +++-
 2 files changed, 80 insertions(+), 28 deletions(-)

diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 4205ae4c16b..58e0fa05db9 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -131,6 +131,20 @@ type Controller struct {
 	podBackoffStore *backoffStore
 }
 
+type syncJobContext struct {
+	job                           *batch.Job
+	pods                          []*v1.Pod
+	finishedCondition             *batch.JobCondition
+	activePods                    []*v1.Pod
+	succeeded                     int32
+	prevSucceededIndexes          orderedIntervals
+	succeededIndexes              orderedIntervals
+	newBackoffRecord              backoffRecord
+	expectedRmFinalizers          sets.Set[string]
+	uncounted                     *uncountedTerminatedPods
+	podFailureCountByPolicyAction map[string]int
+}
+
 // NewController creates a new Job controller that keeps the relevant pods
 // in sync with their corresponding Job objects.
 func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
@@ -742,6 +756,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return nil
 	}
 
+	syncJobContext := &syncJobContext{}
+	syncJobContext.job = &job
+
 	completionMode := getCompletionMode(&job)
 	action := metrics.JobSyncActionReconciling
 
@@ -759,7 +776,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
 	}
 	uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
-	expectedRmFinalizers := jm.finalizerExpectations.getExpectedUIDs(key)
+	syncJobContext.uncounted = uncounted
+	syncJobContext.expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
 
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
 	// and update the expectations after we've retrieved active pods from the store. If a new pod enters
@@ -771,10 +789,12 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		return err
 	}
 
+	syncJobContext.pods = pods
 	activePods := controller.FilterActivePods(pods)
+	syncJobContext.activePods = activePods
 	active := int32(len(activePods))
-	newSucceededPods, newFailedPods := getNewFinishedPods(&job, pods, uncounted, expectedRmFinalizers)
-	succeeded := job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
+	newSucceededPods, newFailedPods := getNewFinishedPods(syncJobContext)
+	syncJobContext.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(uncounted.succeeded))
 	failed := job.Status.Failed + int32(len(newFailedPods)) + int32(len(uncounted.failed))
 	var ready *int32
 	if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
@@ -787,28 +807,27 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.StartTime = &now
 	}
 
-	newBackoffRecord := jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
+	syncJobContext.newBackoffRecord = jm.podBackoffStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
 
 	var manageJobErr error
-	var finishedCondition *batch.JobCondition
 
 	exceedsBackoffLimit := failed > *job.Spec.BackoffLimit
 
 	if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 		if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
-			finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
+			syncJobContext.finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
 		} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
 			// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
-			finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
 		}
 	}
-	if finishedCondition == nil {
+	if syncJobContext.finishedCondition == nil {
 		if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
 			// check if the number of pod restart exceeds backoff (for restart OnFailure only)
 			// OR if the number of failed jobs increased since the last syncJob
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit", jm.clock.Now())
 		} else if jm.pastActiveDeadline(&job) {
-			finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline", jm.clock.Now())
 		} else if job.Spec.ActiveDeadlineSeconds != nil && !jobSuspended(&job) {
 			syncDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second - jm.clock.Since(job.Status.StartTime.Time)
 			logger.V(2).Info("Job has activeDeadlineSeconds configuration. Will sync this job again", "key", key, "nextSyncIn", syncDuration)
@@ -819,23 +838,25 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	var prevSucceededIndexes, succeededIndexes orderedIntervals
 	if isIndexedJob(&job) {
 		prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(logger, &job, pods)
-		succeeded = int32(succeededIndexes.total())
+		syncJobContext.prevSucceededIndexes = prevSucceededIndexes
+		syncJobContext.succeededIndexes = succeededIndexes
+		syncJobContext.succeeded = int32(succeededIndexes.total())
 	}
 	suspendCondChanged := false
 	// Remove active pods if Job failed.
-	if finishedCondition != nil {
+	if syncJobContext.finishedCondition != nil {
 		deleted, err := jm.deleteActivePods(ctx, &job, activePods)
 		if deleted != active || !satisfiedExpectations {
 			// Can't declare the Job as finished yet, as there might be remaining
 			// pod finalizers or pods that are not in the informer's cache yet.
-			finishedCondition = nil
+			syncJobContext.finishedCondition = nil
 		}
 		active -= deleted
 		manageJobErr = err
 	} else {
 		manageJobCalled := false
 		if satisfiedExpectations && job.DeletionTimestamp == nil {
-			active, action, manageJobErr = jm.manageJob(ctx, &job, activePods, succeeded, succeededIndexes, newBackoffRecord)
+			active, action, manageJobErr = jm.manageJob(ctx, syncJobContext)
 			manageJobCalled = true
 		}
 		complete := false
@@ -846,16 +867,16 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 			// not expected to fail, but if they do, the failure is ignored. Once any
 			// pod succeeds, the controller waits for remaining pods to finish, and
 			// then the job is complete.
-			complete = succeeded > 0 && active == 0
+			complete = syncJobContext.succeeded > 0 && active == 0
 		} else {
 			// Job specifies a number of completions. This type of job signals
 			// success by having that number of successes. Since we do not
 			// start more pods than there are remaining completions, there should
 			// not be any remaining active pods once this count is reached.
-			complete = succeeded >= *job.Spec.Completions && active == 0
+			complete = syncJobContext.succeeded >= *job.Spec.Completions && active == 0
 		}
 		if complete {
-			finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
+			syncJobContext.finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "", jm.clock.Now())
 		} else if manageJobCalled {
 			// Update the conditions / emit events only if manageJob was called in
 			// this syncJob. Otherwise wait for the right syncJob call to make
@@ -891,7 +912,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 	needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
 	job.Status.Active = active
 	job.Status.Ready = ready
-	err = jm.trackJobStatusAndRemoveFinalizers(ctx, &job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate, newBackoffRecord)
+	err = jm.trackJobStatusAndRemoveFinalizers(ctx, syncJobContext, needsStatusUpdate)
 	if err != nil {
 		return fmt.Errorf("tracking status: %w", err)
 	}
@@ -970,8 +991,15 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
 //
 // It does this up to a limited number of Pods so that the size of .status
 // doesn't grow too much and this sync doesn't starve other Jobs.
-func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
+func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, needsFlush bool) error {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	pods := syncJobContext.pods
+	finishedCond := syncJobContext.finishedCondition
+	expectedRmFinalizers := syncJobContext.expectedRmFinalizers
+	succeededIndexes := syncJobContext.succeededIndexes
+	uncounted := syncJobContext.uncounted
+
 	isIndexed := isIndexedJob(job)
 	var podsToRemoveFinalizer []*v1.Pod
 	uncountedStatus := job.Status.UncountedTerminatedPods
@@ -1070,8 +1098,9 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 			finishedCond = newFailedConditionForFailureTarget(finishedCond, jm.clock.Now())
 		}
 	}
+	syncJobContext.podFailureCountByPolicyAction = podFailureCountByPolicyAction
 	var err error
-	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, podFailureCountByPolicyAction, needsFlush, newBackoffRecord); err != nil {
+	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, syncJobContext, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
 		return err
 	}
 	jobFinished := !reachedMaxUncountedPods && jm.enactJobFinished(job, finishedCond)
@@ -1101,8 +1130,11 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 //
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, syncJobContext *syncJobContext, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	newBackoffRecord := syncJobContext.newBackoffRecord
+	podFailureCountByPolicyAction := syncJobContext.podFailureCountByPolicyAction
 	var err error
 	if needsFlush {
 		if job, err = jm.updateStatusHandler(ctx, job); err != nil {
@@ -1337,11 +1369,12 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
 
 // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
 // in the job status. The list of failed pods can be affected by the podFailurePolicy.
-func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.Set[string]) (succeededPods, failedPods []*v1.Pod) {
-	succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
+func getNewFinishedPods(syncJobContext *syncJobContext) (succeededPods, failedPods []*v1.Pod) {
+	job := syncJobContext.job
+	succeededPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Succeeded(), func(p *v1.Pod) bool {
 		return p.Status.Phase == v1.PodSucceeded
 	})
-	failedPods = getValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool {
+	failedPods = getValidPodsWithFilter(syncJobContext, syncJobContext.uncounted.Failed(), func(p *v1.Pod) bool {
 		if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil {
 			if !isPodFailed(p, job) {
 				return false
@@ -1365,8 +1398,14 @@ func jobSuspended(job *batch.Job) bool {
 // pods according to what is specified in the job.Spec.
 // Respects back-off; does not create new pods if the back-off time has not passed
 // Does NOT modify <activePods>.
-func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval, newBackoffRecord backoffRecord) (int32, string, error) {
+func (jm *Controller) manageJob(ctx context.Context, syncJobContext *syncJobContext) (int32, string, error) {
 	logger := klog.FromContext(ctx)
+	job := syncJobContext.job
+	activePods := syncJobContext.activePods
+	succeeded := syncJobContext.succeeded
+	succeededIndexes := syncJobContext.succeededIndexes
+	newBackoffRecord := syncJobContext.newBackoffRecord
+
 	active := int32(len(activePods))
 	parallelism := *job.Spec.Parallelism
 	jobKey, err := controller.KeyFunc(job)
@@ -1561,7 +1600,10 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
 // getValidPodsWithFilter returns the valid pods that pass the filter.
 // Pods are valid if they have a finalizer or in uncounted set
 // and, for Indexed Jobs, a valid completion index.
-func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Set[string], expectedRmFinalizers sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+func getValidPodsWithFilter(syncJobContext *syncJobContext, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+	job := syncJobContext.job
+	pods := syncJobContext.pods
+	expectedRmFinalizers := syncJobContext.expectedRmFinalizers
 	var result []*v1.Pod
 	for _, p := range pods {
 		uid := string(p.UID)
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index e583e466563..26b62ef22dd 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -1079,7 +1079,8 @@ func TestGetNewFinshedPods(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
-			succeededPods, failedPods := getNewFinishedPods(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers)
+			syncJobContext := &syncJobContext{job: &tc.job, pods: tc.pods, uncounted: uncounted, expectedRmFinalizers: tc.expectedRmFinalizers}
+			succeededPods, failedPods := getNewFinishedPods(syncJobContext)
 			succeeded := int32(len(succeededPods)) + tc.job.Status.Succeeded + int32(len(uncounted.succeeded))
 			failed := int32(len(failedPods)) + tc.job.Status.Failed + int32(len(uncounted.failed))
 			if succeeded != tc.wantSucceeded {
@@ -1654,7 +1655,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			if isIndexedJob(job) {
 				succeededIndexes = succeededIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions))
 			}
-			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush, backoffRecord{})
+			syncJobContext := &syncJobContext{
+				job:                  job,
+				pods:                 tc.pods,
+				succeededIndexes:     succeededIndexes,
+				uncounted:            uncounted,
+				expectedRmFinalizers: tc.expectedRmFinalizers,
+				finishedCondition:    tc.finishedCond,
+				newBackoffRecord:     backoffRecord{},
+			}
+			err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), syncJobContext, tc.needsFlush)
 			if !errors.Is(err, tc.wantErr) {
 				t.Errorf("Got error %v, want %v", err, tc.wantErr)
 			}
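
The patch is an instance of the parameter-object pattern: values that previously traveled as separate arguments (job, pods, succeeded, finishedCondition, newBackoffRecord, and so on) are grouped into a single syncJobContext built once per sync and passed as one pointer. Below is a minimal, self-contained Go sketch of that pattern, not part of the Kubernetes code base; the names reconcileContext, reconcileBefore, and reconcileAfter are hypothetical and used only for illustration.

// Illustrative sketch of the parameter-object refactoring; all names are hypothetical.
package main

import "fmt"

// Before: related values travel as separate parameters, and every new piece of
// state widens each signature along the call chain.
func reconcileBefore(name string, active, succeeded, failed int, suspended bool) string {
	return fmt.Sprintf("%s: active=%d succeeded=%d failed=%d suspended=%t",
		name, active, succeeded, failed, suspended)
}

// After: the values are grouped in a context struct that is built once per sync
// and passed as a single pointer, mirroring how syncJobContext is threaded
// through manageJob and trackJobStatusAndRemoveFinalizers in the patch.
type reconcileContext struct {
	name      string
	active    int
	succeeded int
	failed    int
	suspended bool
}

func reconcileAfter(rc *reconcileContext) string {
	return fmt.Sprintf("%s: active=%d succeeded=%d failed=%d suspended=%t",
		rc.name, rc.active, rc.succeeded, rc.failed, rc.suspended)
}

func main() {
	rc := &reconcileContext{name: "demo-job", active: 2, succeeded: 1}
	fmt.Println(reconcileBefore(rc.name, rc.active, rc.succeeded, rc.failed, rc.suspended))
	fmt.Println(reconcileAfter(rc))
}

The trade-off mirrors the patch itself: call sites shrink and new per-sync state can be added without touching every signature, at the cost of making it less obvious from a signature which fields a given callee actually reads.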