diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 1a00f6b7e21..1da83969253 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -61,7 +61,8 @@ const ( // maxUncountedPods is the maximum size the slices in // .status.uncountedTerminatedPods should have to keep their representation // roughly below 20 KB. - maxUncountedPods = 500 + maxUncountedPods = 500 + maxPodCreateDeletePerSync = 500 ) // controllerKind contains the schema.GroupVersionKind for this controller type. @@ -71,8 +72,7 @@ var ( // DefaultJobBackOff is the default backoff period, exported for the e2e test DefaultJobBackOff = 10 * time.Second // MaxJobBackOff is the max backoff period, exported for the e2e test - MaxJobBackOff = 360 * time.Second - maxPodCreateDeletePerSync = 500 + MaxJobBackOff = 360 * time.Second ) // Controller ensures that all Job objects have corresponding pods to @@ -888,9 +888,18 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* uncountedStatus := job.Status.UncountedTerminatedPods var newSucceededIndexes []int if isIndexed { - // Sort to introduce completed Indexes First. + // Sort to introduce completed Indexes in order. sort.Sort(byCompletionIndex(pods)) } + uidsWithFinalizer := make(sets.String, len(pods)) + for _, p := range pods { + if hasJobTrackingFinalizer(p) { + uidsWithFinalizer.Insert(string(p.UID)) + } + } + if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { + needsFlush = true + } for _, pod := range pods { if !hasJobTrackingFinalizer(pod) { continue @@ -924,14 +933,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID) } } - if len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { + if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { if len(newSucceededIndexes) > 0 { succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) job.Status.Succeeded = int32(succeededIndexes.total()) job.Status.CompletedIndexes = succeededIndexes.String() } var err error - if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil { + if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { return err } podsToRemoveFinalizer = nil @@ -944,7 +953,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* job.Status.CompletedIndexes = succeededIndexes.String() } var err error - if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil { + if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { return err } if jm.enactJobFinished(job, finishedCond) { @@ -967,44 +976,29 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* // 4. (if not all removals succeeded) flush Job status again. // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, needsFlush bool) (bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) { if needsFlush { if err := jm.updateStatusHandler(job); err != nil { return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } needsFlush = false } - var failedToRm []*v1.Pod var rmErr error if len(podsToRemoveFinalizer) > 0 { - failedToRm, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer) + var rmSucceded []bool + rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer) + for i, p := range podsToRemoveFinalizer { + if rmSucceded[i] { + uidsWithFinalizer.Delete(string(p.UID)) + } + } } - uncountedStatus := job.Status.UncountedTerminatedPods - if rmErr == nil { - needsFlush = len(uncountedStatus.Succeeded) > 0 || len(uncountedStatus.Failed) > 0 - job.Status.Succeeded += int32(len(uncountedStatus.Succeeded)) - uncountedStatus.Succeeded = nil - job.Status.Failed += int32(len(uncountedStatus.Failed)) - uncountedStatus.Failed = nil - return needsFlush, nil - } - uidsWithFinalizer := make(sets.String, len(failedToRm)) - for _, p := range failedToRm { - uidsWithFinalizer.Insert(string(p.UID)) - } - newUncounted := uncountedWithFailedFinalizerRemovals(uncountedStatus.Succeeded, uidsWithFinalizer) - if len(newUncounted) != len(uncountedStatus.Succeeded) { + // Failed to remove some finalizers. Attempt to update the status with the + // partial progress. + if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { needsFlush = true - job.Status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted)) - uncountedStatus.Succeeded = newUncounted } - newUncounted = uncountedWithFailedFinalizerRemovals(uncountedStatus.Failed, uidsWithFinalizer) - if len(newUncounted) != len(uncountedStatus.Failed) { - needsFlush = true - job.Status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted)) - uncountedStatus.Failed = newUncounted - } - if needsFlush { + if rmErr != nil && needsFlush { if err := jm.updateStatusHandler(job); err != nil { return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err) } @@ -1012,14 +1006,35 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe return needsFlush, rmErr } +// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from +// .status.uncountedTerminatedPods for which the finalizer was successfully +// removed and increments the corresponding status counters. +// Returns whether there was any status change. +func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.String) bool { + updated := false + uncountedStatus := status.UncountedTerminatedPods + newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer) + if len(newUncounted) != len(uncountedStatus.Succeeded) { + updated = true + status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted)) + uncountedStatus.Succeeded = newUncounted + } + newUncounted = filterInUncountedUIDs(uncountedStatus.Failed, uidsWithFinalizer) + if len(newUncounted) != len(uncountedStatus.Failed) { + updated = true + status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted)) + uncountedStatus.Failed = newUncounted + } + return updated +} + // removeTrackingFinalizerFromPods removes tracking finalizers from Pods and -// returns the pod for which the operation failed (if the pod was deleted when -// this function was called, it's considered as the finalizer was removed -// successfully). -func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod, error) { +// returns an array of booleans where the i-th value is true if the finalizer +// of the i-th Pod was successfully removed (if the pod was deleted when this +// function was called, it's considered as the finalizer was removed successfully). +func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, error) { errCh := make(chan error, len(pods)) - var failed []*v1.Pod - var lock sync.Mutex + succeeded := make([]bool, len(pods)) wg := sync.WaitGroup{} wg.Add(len(pods)) for i := range pods { @@ -1030,16 +1045,15 @@ func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) { errCh <- err utilruntime.HandleError(err) - lock.Lock() - failed = append(failed, pod) - lock.Unlock() return } + succeeded[i] = true } }(i) } wg.Wait() - return failed, errorFromChannel(errCh) + + return succeeded, errorFromChannel(errCh) } // enactJobFinished adds the Complete or Failed condition and records events. @@ -1072,10 +1086,10 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo return true } -func uncountedWithFailedFinalizerRemovals(uncounted []types.UID, uidsWithFinalizer sets.String) []types.UID { +func filterInUncountedUIDs(uncounted []types.UID, include sets.String) []types.UID { var newUncounted []types.UID for _, uid := range uncounted { - if uidsWithFinalizer.Has(string(uid)) { + if include.Has(string(uid)) { newUncounted = append(newUncounted, uid) } } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 527d8e42dc8..273c9d9e906 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1181,7 +1181,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, pods: []*v1.Pod{ - buildPod().phase(v1.PodSucceeded).Pod, + buildPod().uid("e").phase(v1.PodSucceeded).Pod, buildPod().phase(v1.PodFailed).Pod, buildPod().phase(v1.PodPending).Pod, buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, @@ -1193,12 +1193,12 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"a", "e", "c"}, - Failed: []types.UID{"b", "f", "d"}, + Succeeded: []types.UID{"a", "c"}, + Failed: []types.UID{"b", "d"}, }, Active: 1, - Succeeded: 2, - Failed: 3, + Succeeded: 3, + Failed: 4, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, @@ -1330,6 +1330,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, }, @@ -1337,12 +1338,6 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { wantErr: mockErr, wantRmFinalizers: 2, wantStatusUpdates: []batch.JobStatus{ - { - UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"a", "c"}, - Failed: []types.UID{"b", "d"}, - }, - }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"c"}, @@ -1454,15 +1449,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { job: batch.Job{ Status: batch.JobStatus{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Failed: []types.UID{"a"}, + Failed: []types.UID{"a", "b"}, }, }, }, pods: func() []*v1.Pod { - pods := make([]*v1.Pod, 501) + pods := make([]*v1.Pod, 500) for i := range pods { pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod } + pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), wantRmFinalizers: 501, @@ -1476,20 +1472,50 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } return uids }(), - Failed: []types.UID{"a"}, + Failed: []types.UID{"b"}, }, + Failed: 1, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"499", "500"}, + Succeeded: []types.UID{"499"}, + Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 500, + Failed: 2, + }, + }, + }, + "too many indexed finished": { + job: batch.Job{ + Spec: batch.JobSpec{ + CompletionMode: &indexedCompletion, + Completions: pointer.Int32Ptr(501), + }, + }, + pods: func() []*v1.Pod { + pods := make([]*v1.Pod, 501) + for i := range pods { + pods[i] = buildPod().uid(strconv.Itoa(i)).index(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod + } + return pods + }(), + wantRmFinalizers: 501, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + CompletedIndexes: "0-499", + Succeeded: 500, + }, + { + CompletedIndexes: "0-500", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Succeeded: 501, - Failed: 1, }, }, },