From 23ea5d80d6f370262b3da4b762731349ee0bac75 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Mon, 30 Aug 2021 15:27:45 -0400 Subject: [PATCH] Fix Job tracking with finalizers for more than 500 pods When doing partial updates for uncountedTerminatedPods, the controller might have removed UIDs for Pods which still had finalizers. Also make more space by removing UIDs that don't have finalizers at the beginning of the sync. --- pkg/controller/job/job_controller.go | 106 ++++++++++++---------- pkg/controller/job/job_controller_test.go | 58 ++++++++---- 2 files changed, 102 insertions(+), 62 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 1a00f6b7e21..1da83969253 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -61,7 +61,8 @@ const ( // maxUncountedPods is the maximum size the slices in // .status.uncountedTerminatedPods should have to keep their representation // roughly below 20 KB. - maxUncountedPods = 500 + maxUncountedPods = 500 + maxPodCreateDeletePerSync = 500 ) // controllerKind contains the schema.GroupVersionKind for this controller type. @@ -71,8 +72,7 @@ var ( // DefaultJobBackOff is the default backoff period, exported for the e2e test DefaultJobBackOff = 10 * time.Second // MaxJobBackOff is the max backoff period, exported for the e2e test - MaxJobBackOff = 360 * time.Second - maxPodCreateDeletePerSync = 500 + MaxJobBackOff = 360 * time.Second ) // Controller ensures that all Job objects have corresponding pods to @@ -888,9 +888,18 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* uncountedStatus := job.Status.UncountedTerminatedPods var newSucceededIndexes []int if isIndexed { - // Sort to introduce completed Indexes First. + // Sort to introduce completed Indexes in order. sort.Sort(byCompletionIndex(pods)) } + uidsWithFinalizer := make(sets.String, len(pods)) + for _, p := range pods { + if hasJobTrackingFinalizer(p) { + uidsWithFinalizer.Insert(string(p.UID)) + } + } + if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { + needsFlush = true + } for _, pod := range pods { if !hasJobTrackingFinalizer(pod) { continue @@ -924,14 +933,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID) } } - if len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { + if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { if len(newSucceededIndexes) > 0 { succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) job.Status.Succeeded = int32(succeededIndexes.total()) job.Status.CompletedIndexes = succeededIndexes.String() } var err error - if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil { + if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { return err } podsToRemoveFinalizer = nil @@ -944,7 +953,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* job.Status.CompletedIndexes = succeededIndexes.String() } var err error - if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil { + if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { return err } if jm.enactJobFinished(job, finishedCond) { @@ -967,44 +976,29 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* // 4. (if not all removals succeeded) flush Job status again. // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, needsFlush bool) (bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) { if needsFlush { if err := jm.updateStatusHandler(job); err != nil { return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } needsFlush = false } - var failedToRm []*v1.Pod var rmErr error if len(podsToRemoveFinalizer) > 0 { - failedToRm, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer) + var rmSucceded []bool + rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer) + for i, p := range podsToRemoveFinalizer { + if rmSucceded[i] { + uidsWithFinalizer.Delete(string(p.UID)) + } + } } - uncountedStatus := job.Status.UncountedTerminatedPods - if rmErr == nil { - needsFlush = len(uncountedStatus.Succeeded) > 0 || len(uncountedStatus.Failed) > 0 - job.Status.Succeeded += int32(len(uncountedStatus.Succeeded)) - uncountedStatus.Succeeded = nil - job.Status.Failed += int32(len(uncountedStatus.Failed)) - uncountedStatus.Failed = nil - return needsFlush, nil - } - uidsWithFinalizer := make(sets.String, len(failedToRm)) - for _, p := range failedToRm { - uidsWithFinalizer.Insert(string(p.UID)) - } - newUncounted := uncountedWithFailedFinalizerRemovals(uncountedStatus.Succeeded, uidsWithFinalizer) - if len(newUncounted) != len(uncountedStatus.Succeeded) { + // Failed to remove some finalizers. Attempt to update the status with the + // partial progress. + if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { needsFlush = true - job.Status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted)) - uncountedStatus.Succeeded = newUncounted } - newUncounted = uncountedWithFailedFinalizerRemovals(uncountedStatus.Failed, uidsWithFinalizer) - if len(newUncounted) != len(uncountedStatus.Failed) { - needsFlush = true - job.Status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted)) - uncountedStatus.Failed = newUncounted - } - if needsFlush { + if rmErr != nil && needsFlush { if err := jm.updateStatusHandler(job); err != nil { return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err) } @@ -1012,14 +1006,35 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe return needsFlush, rmErr } +// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from +// .status.uncountedTerminatedPods for which the finalizer was successfully +// removed and increments the corresponding status counters. +// Returns whether there was any status change. +func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.String) bool { + updated := false + uncountedStatus := status.UncountedTerminatedPods + newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer) + if len(newUncounted) != len(uncountedStatus.Succeeded) { + updated = true + status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted)) + uncountedStatus.Succeeded = newUncounted + } + newUncounted = filterInUncountedUIDs(uncountedStatus.Failed, uidsWithFinalizer) + if len(newUncounted) != len(uncountedStatus.Failed) { + updated = true + status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted)) + uncountedStatus.Failed = newUncounted + } + return updated +} + // removeTrackingFinalizerFromPods removes tracking finalizers from Pods and -// returns the pod for which the operation failed (if the pod was deleted when -// this function was called, it's considered as the finalizer was removed -// successfully). -func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod, error) { +// returns an array of booleans where the i-th value is true if the finalizer +// of the i-th Pod was successfully removed (if the pod was deleted when this +// function was called, it's considered as the finalizer was removed successfully). +func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, error) { errCh := make(chan error, len(pods)) - var failed []*v1.Pod - var lock sync.Mutex + succeeded := make([]bool, len(pods)) wg := sync.WaitGroup{} wg.Add(len(pods)) for i := range pods { @@ -1030,16 +1045,15 @@ func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) { errCh <- err utilruntime.HandleError(err) - lock.Lock() - failed = append(failed, pod) - lock.Unlock() return } + succeeded[i] = true } }(i) } wg.Wait() - return failed, errorFromChannel(errCh) + + return succeeded, errorFromChannel(errCh) } // enactJobFinished adds the Complete or Failed condition and records events. @@ -1072,10 +1086,10 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo return true } -func uncountedWithFailedFinalizerRemovals(uncounted []types.UID, uidsWithFinalizer sets.String) []types.UID { +func filterInUncountedUIDs(uncounted []types.UID, include sets.String) []types.UID { var newUncounted []types.UID for _, uid := range uncounted { - if uidsWithFinalizer.Has(string(uid)) { + if include.Has(string(uid)) { newUncounted = append(newUncounted, uid) } } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 527d8e42dc8..273c9d9e906 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1181,7 +1181,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, pods: []*v1.Pod{ - buildPod().phase(v1.PodSucceeded).Pod, + buildPod().uid("e").phase(v1.PodSucceeded).Pod, buildPod().phase(v1.PodFailed).Pod, buildPod().phase(v1.PodPending).Pod, buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, @@ -1193,12 +1193,12 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"a", "e", "c"}, - Failed: []types.UID{"b", "f", "d"}, + Succeeded: []types.UID{"a", "c"}, + Failed: []types.UID{"b", "d"}, }, Active: 1, - Succeeded: 2, - Failed: 3, + Succeeded: 3, + Failed: 4, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, @@ -1330,6 +1330,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, }, @@ -1337,12 +1338,6 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { wantErr: mockErr, wantRmFinalizers: 2, wantStatusUpdates: []batch.JobStatus{ - { - UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"a", "c"}, - Failed: []types.UID{"b", "d"}, - }, - }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ Succeeded: []types.UID{"c"}, @@ -1454,15 +1449,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { job: batch.Job{ Status: batch.JobStatus{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Failed: []types.UID{"a"}, + Failed: []types.UID{"a", "b"}, }, }, }, pods: func() []*v1.Pod { - pods := make([]*v1.Pod, 501) + pods := make([]*v1.Pod, 500) for i := range pods { pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod } + pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), wantRmFinalizers: 501, @@ -1476,20 +1472,50 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } return uids }(), - Failed: []types.UID{"a"}, + Failed: []types.UID{"b"}, }, + Failed: 1, }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"499", "500"}, + Succeeded: []types.UID{"499"}, + Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 500, + Failed: 2, + }, + }, + }, + "too many indexed finished": { + job: batch.Job{ + Spec: batch.JobSpec{ + CompletionMode: &indexedCompletion, + Completions: pointer.Int32Ptr(501), + }, + }, + pods: func() []*v1.Pod { + pods := make([]*v1.Pod, 501) + for i := range pods { + pods[i] = buildPod().uid(strconv.Itoa(i)).index(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod + } + return pods + }(), + wantRmFinalizers: 501, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + CompletedIndexes: "0-499", + Succeeded: 500, + }, + { + CompletedIndexes: "0-500", UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, Succeeded: 501, - Failed: 1, }, }, },