From 47a957d163213edb4716a56ca27a756900a4eea6 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 22 Sep 2021 11:15:50 -0400 Subject: [PATCH] Revert "Revert "Limit number of Pods counted in a single Job sync"" This reverts commit 8bcb780808865f93e4a8fc20be5fe2de28baa854. --- pkg/controller/job/job_controller.go | 24 +++++++++++------------ pkg/controller/job/job_controller_test.go | 17 +++------------- test/integration/job/job_test.go | 20 +++++++++++-------- 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 06a7ca0d1a1..8cfad1781b1 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -883,8 +883,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error // or the job was removed. // 3. Increment job counters for pods that no longer have a finalizer. // 4. Add Complete condition if satisfied with current counters. -// It does this in a controlled way such that the size of .status doesn't grow -// too much. +// It does this up to a limited number of Pods so that the size of .status +// doesn't grow too much and this sync doesn't starve other Jobs. 
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod @@ -937,17 +937,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* } } if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { - if len(newSucceededIndexes) > 0 { - succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) - job.Status.Succeeded = int32(succeededIndexes.total()) - job.Status.CompletedIndexes = succeededIndexes.String() - } - var err error - if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { - return err - } - podsToRemoveFinalizer = nil - newSucceededIndexes = nil + // The controller added enough Pods already to .status.uncountedTerminatedPods. + // We stop counting pods and removing finalizers here to: + // 1. Ensure that the representation of the UIDs stays under 20 KB. + // 2. Cap the number of finalizer removals so that syncing of big Jobs + // doesn't starve smaller ones. + // + // The job will be synced again because the Job status and Pod updates + // will put the Job back to the work queue. 
+ break } } if len(newSucceededIndexes) > 0 { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index de9da1548b3..fb113e9b2bf 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1462,7 +1462,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 499, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ @@ -1479,17 +1479,11 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"499"}, - Failed: []types.UID{"b"}, + Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, - { - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 500, - Failed: 2, - }, }, }, "too many indexed finished": { @@ -1506,18 +1500,13 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 500, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, CompletedIndexes: "0-499", Succeeded: 500, }, - { - CompletedIndexes: "0-500", - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 501, - }, }, }, } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 4cddc6c7669..5b2e4866b3b 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -230,8 +230,8 @@ func TestParallelJobWithCompletions(t *testing.T) { jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(4), - Completions: pointer.Int32Ptr(6), + Parallelism: pointer.Int32Ptr(504), + Completions: pointer.Int32Ptr(506), }, }) if err != nil { @@ -241,23 +241,23 @@ 
func TestParallelJobWithCompletions(t *testing.T) { t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 504, }, wFinalizers) // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 504, Failed: 2, }, wFinalizers) // Pods are created until the number of succeeded Pods equals completions. - if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 3, + Succeeded: 503, Active: 3, }, wFinalizers) // No more Pods are created after the Job completes. @@ -267,7 +267,7 @@ func TestParallelJobWithCompletions(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 6, + Succeeded: 506, }, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) @@ -860,7 +860,11 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) - config := restclient.Config{Host: server.URL} + config := restclient.Config{ + Host: server.URL, + QPS: 200.0, + Burst: 200, + } clientSet, err := clientset.NewForConfig(&config) if err != nil { t.Fatalf("Error creating clientset: %v", err)