Limit number of Pods counted in a single Job sync

This prevents big Jobs from starving smaller ones.
This commit is contained in:
Aldo Culquicondor 2021-09-01 15:52:18 -04:00
parent cf535b0339
commit 7d9cb88fed
3 changed files with 26 additions and 35 deletions

View File

@ -880,8 +880,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error
// or the job was removed. // or the job was removed.
// 3. Increment job counters for pods that no longer have a finalizer. // 3. Increment job counters for pods that no longer have a finalizer.
// 4. Add Complete condition if satisfied with current counters. // 4. Add Complete condition if satisfied with current counters.
// It does this in a controlled way such that the size of .status doesn't grow // It does this up to a limited number of Pods so that the size of .status
// too much. // doesn't grow too much and this sync doesn't starve other Jobs.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error {
isIndexed := isIndexedJob(job) isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod var podsToRemoveFinalizer []*v1.Pod
@ -934,17 +934,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
} }
} }
if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
if len(newSucceededIndexes) > 0 { // The controller added enough Pods already to .status.uncountedTerminatedPods
succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) // We stop counting pods and removing finalizers here to:
job.Status.Succeeded = int32(succeededIndexes.total()) // 1. Ensure that the UIDs representation are under 20 KB.
job.Status.CompletedIndexes = succeededIndexes.String() // 2. Cap the number of finalizer removals so that syncing of big Jobs
} // doesn't starve smaller ones.
var err error //
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { // The job will be synced again because the Job status and Pod updates
return err // will put the Job back to the work queue.
} break
podsToRemoveFinalizer = nil
newSucceededIndexes = nil
} }
} }
if len(newSucceededIndexes) > 0 { if len(newSucceededIndexes) > 0 {

View File

@ -1461,7 +1461,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
return pods return pods
}(), }(),
wantRmFinalizers: 501, wantRmFinalizers: 499,
wantStatusUpdates: []batch.JobStatus{ wantStatusUpdates: []batch.JobStatus{
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
@ -1478,17 +1478,11 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
}, },
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Succeeded: []types.UID{"499"}, Failed: []types.UID{"b"},
Failed: []types.UID{"b"},
}, },
Succeeded: 499, Succeeded: 499,
Failed: 1, Failed: 1,
}, },
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 500,
Failed: 2,
},
}, },
}, },
"too many indexed finished": { "too many indexed finished": {
@ -1505,18 +1499,13 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
} }
return pods return pods
}(), }(),
wantRmFinalizers: 501, wantRmFinalizers: 500,
wantStatusUpdates: []batch.JobStatus{ wantStatusUpdates: []batch.JobStatus{
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
CompletedIndexes: "0-499", CompletedIndexes: "0-499",
Succeeded: 500, Succeeded: 500,
}, },
{
CompletedIndexes: "0-500",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 501,
},
}, },
}, },
} }

View File

@ -230,8 +230,8 @@ func TestParallelJobWithCompletions(t *testing.T) {
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{ Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(4), Parallelism: pointer.Int32Ptr(504),
Completions: pointer.Int32Ptr(6), Completions: pointer.Int32Ptr(506),
}, },
}) })
if err != nil { if err != nil {
@ -241,23 +241,23 @@ func TestParallelJobWithCompletions(t *testing.T) {
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 4, Active: 504,
}, wFinalizers) }, wFinalizers)
// Failed Pods are replaced. // Failed Pods are replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 4, Active: 504,
Failed: 2, Failed: 2,
}, wFinalizers) }, wFinalizers)
// Pods are created until the number of succeeded Pods equals completions. // Pods are created until the number of succeeded Pods equals completions.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 2, Failed: 2,
Succeeded: 3, Succeeded: 503,
Active: 3, Active: 3,
}, wFinalizers) }, wFinalizers)
// No more Pods are created after the Job completes. // No more Pods are created after the Job completes.
@ -267,7 +267,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 2, Failed: 2,
Succeeded: 6, Succeeded: 506,
}, false) }, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}) })
@ -860,7 +860,11 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
_, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig)
config := restclient.Config{Host: server.URL} config := restclient.Config{
Host: server.URL,
QPS: 200.0,
Burst: 200,
}
clientSet, err := clientset.NewForConfig(&config) clientSet, err := clientset.NewForConfig(&config)
if err != nil { if err != nil {
t.Fatalf("Error creating clientset: %v", err) t.Fatalf("Error creating clientset: %v", err)