diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 5abd3dc7b8c..738755aabd3 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -880,8 +880,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error // or the job was removed. // 3. Increment job counters for pods that no longer have a finalizer. // 4. Add Complete condition if satisfied with current counters. -// It does this up to a limited number of Pods so that the size of .status -// doesn't grow too much and this sync doesn't starve other Jobs. +// It does this in a controlled way such that the size of .status doesn't grow +// too much. func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod @@ -936,15 +936,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* } } if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { - // The controller added enough Pods already to .status.uncountedTerminatedPods - // We stop counting pods and removing finalizers here to: - // 1. Ensure that the UIDs representation are under 20 KB. - // 2. Cap the number of finalizer removals so that syncing of big Jobs - // doesn't starve smaller ones. - // - // The job will be synced again because the Job status and Pod updates - // will put the Job back to the work queue. - break + if len(newSucceededIndexes) > 0 { + succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) + job.Status.Succeeded = int32(succeededIndexes.total()) + job.Status.CompletedIndexes = succeededIndexes.String() + } + var err error + if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { + return err + } + podsToRemoveFinalizer = nil + newSucceededIndexes = nil } } if len(newSucceededIndexes) > 0 { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 2cc8dca1ceb..a7bbf15bead 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1463,7 +1463,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), - wantRmFinalizers: 499, + wantRmFinalizers: 501, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ @@ -1480,11 +1480,17 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Failed: []types.UID{"b"}, + Succeeded: []types.UID{"499"}, + Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 500, + Failed: 2, + }, }, }, "too many indexed finished": { @@ -1501,13 +1507,18 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } return pods }(), - wantRmFinalizers: 500, + wantRmFinalizers: 501, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, CompletedIndexes: "0-499", Succeeded: 500, }, + { + CompletedIndexes: "0-500", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 501, + }, }, }, } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 5b2e4866b3b..4cddc6c7669 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -230,8 +230,8 @@ func TestParallelJobWithCompletions(t *testing.T) { jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(504), - Completions: pointer.Int32Ptr(506), + Parallelism: pointer.Int32Ptr(4), + Completions: pointer.Int32Ptr(6), }, }) if err != nil { @@ -241,23 +241,23 @@ func TestParallelJobWithCompletions(t *testing.T) { t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 504, + Active: 4, }, wFinalizers) // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 504, + Active: 4, Failed: 2, }, wFinalizers) // Pods are created until the number of succeeded Pods equals completions. - if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 503, + Succeeded: 3, Active: 3, }, wFinalizers) // No more Pods are created after the Job completes. @@ -267,7 +267,7 @@ func TestParallelJobWithCompletions(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 506, + Succeeded: 6, }, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) @@ -860,11 +860,7 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) - config := restclient.Config{ - Host: server.URL, - QPS: 200.0, - Burst: 200, - } + config := restclient.Config{Host: server.URL} clientSet, err := clientset.NewForConfig(&config) if err != nil { t.Fatalf("Error creating clientset: %v", err)