Merge pull request #104666 from alculquicondor/tracking-beta

Fix Job tracking with finalizers for more than 500 pods
This commit is contained in:
Kubernetes Prow Robot 2021-09-09 09:26:11 -07:00 committed by GitHub
commit 669de4b957
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 102 additions and 62 deletions

View File

@ -61,7 +61,8 @@ const (
// maxUncountedPods is the maximum size the slices in
// .status.uncountedTerminatedPods should have to keep their representation
// roughly below 20 KB.
maxUncountedPods = 500
maxUncountedPods = 500
maxPodCreateDeletePerSync = 500
)
// controllerKind contains the schema.GroupVersionKind for this controller type.
@ -71,8 +72,7 @@ var (
// DefaultJobBackOff is the default backoff period, exported for the e2e test
DefaultJobBackOff = 10 * time.Second
// MaxJobBackOff is the max backoff period, exported for the e2e test
MaxJobBackOff = 360 * time.Second
maxPodCreateDeletePerSync = 500
MaxJobBackOff = 360 * time.Second
)
// Controller ensures that all Job objects have corresponding pods to
@ -888,9 +888,18 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
uncountedStatus := job.Status.UncountedTerminatedPods
var newSucceededIndexes []int
if isIndexed {
// Sort to introduce completed Indexes First.
// Sort to introduce completed Indexes in order.
sort.Sort(byCompletionIndex(pods))
}
uidsWithFinalizer := make(sets.String, len(pods))
for _, p := range pods {
if hasJobTrackingFinalizer(p) {
uidsWithFinalizer.Insert(string(p.UID))
}
}
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
needsFlush = true
}
for _, pod := range pods {
if !hasJobTrackingFinalizer(pod) {
continue
@ -924,14 +933,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
}
}
if len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
if len(newSucceededIndexes) > 0 {
succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
job.Status.Succeeded = int32(succeededIndexes.total())
job.Status.CompletedIndexes = succeededIndexes.String()
}
var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil {
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
return err
}
podsToRemoveFinalizer = nil
@ -944,7 +953,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
job.Status.CompletedIndexes = succeededIndexes.String()
}
var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil {
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
return err
}
if jm.enactJobFinished(job, finishedCond) {
@ -967,44 +976,29 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
// 4. (if not all removals succeeded) flush Job status again.
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, needsFlush bool) (bool, error) {
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) {
if needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
}
needsFlush = false
}
var failedToRm []*v1.Pod
var rmErr error
if len(podsToRemoveFinalizer) > 0 {
failedToRm, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer)
var rmSucceded []bool
rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer)
for i, p := range podsToRemoveFinalizer {
if rmSucceded[i] {
uidsWithFinalizer.Delete(string(p.UID))
}
}
}
uncountedStatus := job.Status.UncountedTerminatedPods
if rmErr == nil {
needsFlush = len(uncountedStatus.Succeeded) > 0 || len(uncountedStatus.Failed) > 0
job.Status.Succeeded += int32(len(uncountedStatus.Succeeded))
uncountedStatus.Succeeded = nil
job.Status.Failed += int32(len(uncountedStatus.Failed))
uncountedStatus.Failed = nil
return needsFlush, nil
}
uidsWithFinalizer := make(sets.String, len(failedToRm))
for _, p := range failedToRm {
uidsWithFinalizer.Insert(string(p.UID))
}
newUncounted := uncountedWithFailedFinalizerRemovals(uncountedStatus.Succeeded, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Succeeded) {
// Failed to remove some finalizers. Attempt to update the status with the
// partial progress.
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
needsFlush = true
job.Status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted))
uncountedStatus.Succeeded = newUncounted
}
newUncounted = uncountedWithFailedFinalizerRemovals(uncountedStatus.Failed, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Failed) {
needsFlush = true
job.Status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted))
uncountedStatus.Failed = newUncounted
}
if needsFlush {
if rmErr != nil && needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
}
@ -1012,14 +1006,35 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe
return needsFlush, rmErr
}
// cleanUncountedPodsWithoutFinalizers removes the Pod UIDs from
// .status.uncountedTerminatedPods for which the finalizer was successfully
// removed and increments the corresponding status counters.
// Returns whether there was any status change.
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.String) bool {
updated := false
uncountedStatus := status.UncountedTerminatedPods
newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Succeeded) {
updated = true
status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted))
uncountedStatus.Succeeded = newUncounted
}
newUncounted = filterInUncountedUIDs(uncountedStatus.Failed, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Failed) {
updated = true
status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted))
uncountedStatus.Failed = newUncounted
}
return updated
}
// removeTrackingFinalizerFromPods removes tracking finalizers from Pods and
// returns the pod for which the operation failed (if the pod was deleted when
// this function was called, it's considered as the finalizer was removed
// successfully).
func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod, error) {
// returns an array of booleans where the i-th value is true if the finalizer
// of the i-th Pod was successfully removed (if the pod was deleted when this
// function was called, it's considered as the finalizer was removed successfully).
func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, error) {
errCh := make(chan error, len(pods))
var failed []*v1.Pod
var lock sync.Mutex
succeeded := make([]bool, len(pods))
wg := sync.WaitGroup{}
wg.Add(len(pods))
for i := range pods {
@ -1030,16 +1045,15 @@ func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod
if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) {
errCh <- err
utilruntime.HandleError(err)
lock.Lock()
failed = append(failed, pod)
lock.Unlock()
return
}
succeeded[i] = true
}
}(i)
}
wg.Wait()
return failed, errorFromChannel(errCh)
return succeeded, errorFromChannel(errCh)
}
// enactJobFinished adds the Complete or Failed condition and records events.
@ -1072,10 +1086,10 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
return true
}
func uncountedWithFailedFinalizerRemovals(uncounted []types.UID, uidsWithFinalizer sets.String) []types.UID {
func filterInUncountedUIDs(uncounted []types.UID, include sets.String) []types.UID {
var newUncounted []types.UID
for _, uid := range uncounted {
if uidsWithFinalizer.Has(string(uid)) {
if include.Has(string(uid)) {
newUncounted = append(newUncounted, uid)
}
}

View File

@ -1181,7 +1181,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
},
pods: []*v1.Pod{
buildPod().phase(v1.PodSucceeded).Pod,
buildPod().uid("e").phase(v1.PodSucceeded).Pod,
buildPod().phase(v1.PodFailed).Pod,
buildPod().phase(v1.PodPending).Pod,
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
@ -1193,12 +1193,12 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "e", "c"},
Failed: []types.UID{"b", "f", "d"},
Succeeded: []types.UID{"a", "c"},
Failed: []types.UID{"b", "d"},
},
Active: 1,
Succeeded: 2,
Failed: 3,
Succeeded: 3,
Failed: 4,
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
@ -1330,6 +1330,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
},
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod,
},
@ -1337,12 +1338,6 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
wantErr: mockErr,
wantRmFinalizers: 2,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"a", "c"},
Failed: []types.UID{"b", "d"},
},
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"c"},
@ -1454,15 +1449,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
job: batch.Job{
Status: batch.JobStatus{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"a"},
Failed: []types.UID{"a", "b"},
},
},
},
pods: func() []*v1.Pod {
pods := make([]*v1.Pod, 501)
pods := make([]*v1.Pod, 500)
for i := range pods {
pods[i] = buildPod().uid(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod
}
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
return pods
}(),
wantRmFinalizers: 501,
@ -1476,20 +1472,50 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
}
return uids
}(),
Failed: []types.UID{"a"},
Failed: []types.UID{"b"},
},
Failed: 1,
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"499", "500"},
Succeeded: []types.UID{"499"},
Failed: []types.UID{"b"},
},
Succeeded: 499,
Failed: 1,
},
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 500,
Failed: 2,
},
},
},
"too many indexed finished": {
job: batch.Job{
Spec: batch.JobSpec{
CompletionMode: &indexedCompletion,
Completions: pointer.Int32Ptr(501),
},
},
pods: func() []*v1.Pod {
pods := make([]*v1.Pod, 501)
for i := range pods {
pods[i] = buildPod().uid(strconv.Itoa(i)).index(strconv.Itoa(i)).phase(v1.PodSucceeded).trackingFinalizer().Pod
}
return pods
}(),
wantRmFinalizers: 501,
wantStatusUpdates: []batch.JobStatus{
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
CompletedIndexes: "0-499",
Succeeded: 500,
},
{
CompletedIndexes: "0-500",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 501,
Failed: 1,
},
},
},