Fix Job tracking with finalizers for more than 500 pods

When doing partial updates for uncountedTerminatedPods, the controller might have removed UIDs for Pods which still had finalizers.

Also make more space by removing UIDs that don't have finalizers at the beginning of the sync.
This commit is contained in:
Aldo Culquicondor 2021-08-30 15:27:45 -04:00
parent 7282c2002e
commit 23ea5d80d6
2 changed files with 102 additions and 62 deletions

View File

@ -61,7 +61,8 @@ const (
// maxUncountedPods is the maximum size the slices in // maxUncountedPods is the maximum size the slices in
// .status.uncountedTerminatedPods should have to keep their representation // .status.uncountedTerminatedPods should have to keep their representation
// roughly below 20 KB. // roughly below 20 KB.
maxUncountedPods = 500 maxUncountedPods = 500
maxPodCreateDeletePerSync = 500
) )
// controllerKind contains the schema.GroupVersionKind for this controller type. // controllerKind contains the schema.GroupVersionKind for this controller type.
@ -71,8 +72,7 @@ var (
// DefaultJobBackOff is the default backoff period, exported for the e2e test // DefaultJobBackOff is the default backoff period, exported for the e2e test
DefaultJobBackOff = 10 * time.Second DefaultJobBackOff = 10 * time.Second
// MaxJobBackOff is the max backoff period, exported for the e2e test // MaxJobBackOff is the max backoff period, exported for the e2e test
MaxJobBackOff = 360 * time.Second MaxJobBackOff = 360 * time.Second
maxPodCreateDeletePerSync = 500
) )
// Controller ensures that all Job objects have corresponding pods to // Controller ensures that all Job objects have corresponding pods to
@ -888,9 +888,18 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
uncountedStatus := job.Status.UncountedTerminatedPods uncountedStatus := job.Status.UncountedTerminatedPods
var newSucceededIndexes []int var newSucceededIndexes []int
if isIndexed { if isIndexed {
// Sort to introduce completed Indexes First. // Sort to introduce completed Indexes in order.
sort.Sort(byCompletionIndex(pods)) sort.Sort(byCompletionIndex(pods))
} }
uidsWithFinalizer := make(sets.String, len(pods))
for _, p := range pods {
if hasJobTrackingFinalizer(p) {
uidsWithFinalizer.Insert(string(p.UID))
}
}
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
needsFlush = true
}
for _, pod := range pods { for _, pod := range pods {
if !hasJobTrackingFinalizer(pod) { if !hasJobTrackingFinalizer(pod) {
continue continue
@ -924,14 +933,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID) uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
} }
} }
if len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
if len(newSucceededIndexes) > 0 { if len(newSucceededIndexes) > 0 {
succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
job.Status.Succeeded = int32(succeededIndexes.total()) job.Status.Succeeded = int32(succeededIndexes.total())
job.Status.CompletedIndexes = succeededIndexes.String() job.Status.CompletedIndexes = succeededIndexes.String()
} }
var err error var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil { if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
return err return err
} }
podsToRemoveFinalizer = nil podsToRemoveFinalizer = nil
@ -944,7 +953,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
job.Status.CompletedIndexes = succeededIndexes.String() job.Status.CompletedIndexes = succeededIndexes.String()
} }
var err error var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil { if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
return err return err
} }
if jm.enactJobFinished(job, finishedCond) { if jm.enactJobFinished(job, finishedCond) {
@ -967,44 +976,29 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
// 4. (if not all removals succeeded) flush Job status again. // 4. (if not all removals succeeded) flush Job status again.
// Returns whether there are pending changes in the Job status that need to be // Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls. // flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, needsFlush bool) (bool, error) { func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) {
if needsFlush { if needsFlush {
if err := jm.updateStatusHandler(job); err != nil { if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
} }
needsFlush = false needsFlush = false
} }
var failedToRm []*v1.Pod
var rmErr error var rmErr error
if len(podsToRemoveFinalizer) > 0 { if len(podsToRemoveFinalizer) > 0 {
failedToRm, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer) var rmSucceded []bool
rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer)
for i, p := range podsToRemoveFinalizer {
if rmSucceded[i] {
uidsWithFinalizer.Delete(string(p.UID))
}
}
} }
uncountedStatus := job.Status.UncountedTerminatedPods // Failed to remove some finalizers. Attempt to update the status with the
if rmErr == nil { // partial progress.
needsFlush = len(uncountedStatus.Succeeded) > 0 || len(uncountedStatus.Failed) > 0 if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
job.Status.Succeeded += int32(len(uncountedStatus.Succeeded))
uncountedStatus.Succeeded = nil
job.Status.Failed += int32(len(uncountedStatus.Failed))
uncountedStatus.Failed = nil
return needsFlush, nil
}
uidsWithFinalizer := make(sets.String, len(failedToRm))
for _, p := range failedToRm {
uidsWithFinalizer.Insert(string(p.UID))
}
newUncounted := uncountedWithFailedFinalizerRemovals(uncountedStatus.Succeeded, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Succeeded) {
needsFlush = true needsFlush = true
job.Status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted))
uncountedStatus.Succeeded = newUncounted
} }
newUncounted = uncountedWithFailedFinalizerRemovals(uncountedStatus.Failed, uidsWithFinalizer) if rmErr != nil && needsFlush {
if len(newUncounted) != len(uncountedStatus.Failed) {
needsFlush = true
job.Status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted))
uncountedStatus.Failed = newUncounted
}
if needsFlush {
if err := jm.updateStatusHandler(job); err != nil { if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err) return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
} }
@ -1012,14 +1006,35 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe
return needsFlush, rmErr return needsFlush, rmErr
} }
// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
// .status.uncountedTerminatedPods for which the finalizer was successfully
// removed and increments the corresponding status counters.
// Returns whether there was any status change.
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.String) bool {
updated := false
uncountedStatus := status.UncountedTerminatedPods
newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Succeeded) {
updated = true
status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted))
uncountedStatus.Succeeded = newUncounted
}
newUncounted = filterInUncountedUIDs(uncountedStatus.Failed, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Failed) {
updated = true
status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted))
uncountedStatus.Failed = newUncounted
}
return updated
}
// removeTrackingFinalizerFromPods removes tracking finalizers from Pods and // removeTrackingFinalizerFromPods removes tracking finalizers from Pods and
// returns the pod for which the operation failed (if the pod was deleted when // returns an array of booleans where the i-th value is true if the finalizer
// this function was called, it's considered as the finalizer was removed // of the i-th Pod was successfully removed (if the pod was deleted when this
// successfully). // function was called, it's considered as the finalizer was removed successfully).
func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod, error) { func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, error) {
errCh := make(chan error, len(pods)) errCh := make(chan error, len(pods))
var failed []*v1.Pod succeeded := make([]bool, len(pods))
var lock sync.Mutex
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(pods)) wg.Add(len(pods))
for i := range pods { for i := range pods {
@ -1030,16 +1045,15 @@ func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod
if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) { if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) {
errCh <- err errCh <- err
utilruntime.HandleError(err) utilruntime.HandleError(err)
lock.Lock()
failed = append(failed, pod)
lock.Unlock()
return return
} }
succeeded[i] = true
} }
}(i) }(i)
} }
wg.Wait() wg.Wait()
return failed, errorFromChannel(errCh)
return succeeded, errorFromChannel(errCh)
} }
// enactJobFinished adds the Complete or Failed condition and records events. // enactJobFinished adds the Complete or Failed condition and records events.
@ -1072,10 +1086,10 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
return true return true
} }
func uncountedWithFailedFinalizerRemovals(uncounted []types.UID, uidsWithFinalizer sets.String) []types.UID { func filterInUncountedUIDs(uncounted []types.UID, include sets.String) []types.UID {
var newUncounted []types.UID var newUncounted []types.UID
for _, uid := range uncounted { for _, uid := range uncounted {
if uidsWithFinalizer.Has(string(uid)) { if include.Has(string(uid)) {
newUncounted = append(newUncounted, uid) newUncounted = append(newUncounted, uid)
} }
} }

View File

@ -1181,7 +1181,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
}, },
}, },
pods: []*v1.Pod{ pods: []*v1.Pod{
buildPod().phase(v1.PodSucceeded).Pod, buildPod().uid("e").phase(v1.PodSucceeded).Pod,
buildPod().phase(v1.PodFailed).Pod, buildPod().phase(v1.PodFailed).Pod,
buildPod().phase(v1.PodPending).Pod, buildPod().phase(v1.PodPending).Pod,
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
@ -1193,12 +1193,12 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
wantStatusUpdates: []batch.JobStatus{ wantStatusUpdates: []batch.JobStatus{
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "e", "c"}, Succeeded: []types.UID{"a", "c"},
Failed: []types.UID{"b", "f", "d"}, Failed: []types.UID{"b", "d"},
}, },
Active: 1, Active: 1,
Succeeded: 2, Succeeded: 3,
Failed: 3, Failed: 4,
}, },
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
@ -1330,6 +1330,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
}, },
}, },
pods: []*v1.Pod{ pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod,
}, },
@ -1337,12 +1338,6 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
wantErr: mockErr, wantErr: mockErr,
wantRmFinalizers: 2, wantRmFinalizers: 2,
wantStatusUpdates: []batch.JobStatus{ wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "c"},
Failed: []types.UID{"b", "d"},
},
},
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"c"}, Succeeded: []types.UID{"c"},
@ -1454,15 +1449,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
job: batch.Job{ job: batch.Job{
Status: batch.JobStatus{ Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"}, Failed: []types.UID{"a", "b"},
}, },
}, },
}, },
pods: func() []*v1.Pod { pods: func() []*v1.Pod {
pods := make([]*v1.Pod, 501) pods := make([]*v1.Pod, 500)
for i := range pods { for i := range pods {
pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod
} }
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
return pods return pods
}(), }(),
wantRmFinalizers: 501, wantRmFinalizers: 501,
@ -1476,20 +1472,50 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
} }
return uids return uids
}(), }(),
Failed: []types.UID{"a"}, Failed: []types.UID{"b"},
}, },
Failed: 1,
}, },
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"499", "500"}, Succeeded: []types.UID{"499"},
Failed: []types.UID{"b"},
}, },
Succeeded: 499, Succeeded: 499,
Failed: 1, Failed: 1,
}, },
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 500,
Failed: 2,
},
},
},
"too many indexed finished": {
job: batch.Job{
Spec: batch.JobSpec{
CompletionMode: &indexedCompletion,
Completions: pointer.Int32Ptr(501),
},
},
pods: func() []*v1.Pod {
pods := make([]*v1.Pod, 501)
for i := range pods {
pods[i] = buildPod().uid(strconv.Itoa(i)).index(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod
}
return pods
}(),
wantRmFinalizers: 501,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
CompletedIndexes: "0-499",
Succeeded: 500,
},
{
CompletedIndexes: "0-500",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 501, Succeeded: 501,
Failed: 1,
}, },
}, },
}, },