From 7d9cb88fed1973759eaa71ad3484eb2cf0248f2a Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 1 Sep 2021 15:52:18 -0400 Subject: [PATCH 1/2] Limit number of Pods counted in a single Job sync This prevents big Jobs from starving smaller ones. --- pkg/controller/job/job_controller.go | 24 +++++++++++------------ pkg/controller/job/job_controller_test.go | 17 +++------------- test/integration/job/job_test.go | 20 +++++++++++-------- 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 1da83969253..1a1933541ec 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -880,8 +880,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error // or the job was removed. // 3. Increment job counters for pods that no longer have a finalizer. // 4. Add Complete condition if satisfied with current counters. -// It does this in a controlled way such that the size of .status doesn't grow -// too much. +// It does this up to a limited number of Pods so that the size of .status +// doesn't grow too much and this sync doesn't starve other Jobs. 
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod @@ -934,17 +934,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* } } if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { - if len(newSucceededIndexes) > 0 { - succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) - job.Status.Succeeded = int32(succeededIndexes.total()) - job.Status.CompletedIndexes = succeededIndexes.String() - } - var err error - if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { - return err - } - podsToRemoveFinalizer = nil - newSucceededIndexes = nil + // The controller added enough Pods already to .status.uncountedTerminatedPods. + // We stop counting pods and removing finalizers here to: + // 1. Ensure that the UID representation is under 20 KB. + // 2. Cap the number of finalizer removals so that syncing of big Jobs + // doesn't starve smaller ones. + // + // The job will be synced again because the Job status and Pod updates + // will put the Job back to the work queue. 
+ break } } if len(newSucceededIndexes) > 0 { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 273c9d9e906..a827e563948 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1461,7 +1461,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 499, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ @@ -1478,17 +1478,11 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"499"}, - Failed: []types.UID{"b"}, + Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, - { - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 500, - Failed: 2, - }, }, }, "too many indexed finished": { @@ -1505,18 +1499,13 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 500, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, CompletedIndexes: "0-499", Succeeded: 500, }, - { - CompletedIndexes: "0-500", - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 501, - }, }, }, } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 4cddc6c7669..5b2e4866b3b 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -230,8 +230,8 @@ func TestParallelJobWithCompletions(t *testing.T) { jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(4), - Completions: pointer.Int32Ptr(6), + Parallelism: pointer.Int32Ptr(504), + Completions: pointer.Int32Ptr(506), }, }) if err != nil { @@ -241,23 +241,23 @@ 
func TestParallelJobWithCompletions(t *testing.T) { t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 504, }, wFinalizers) // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 504, Failed: 2, }, wFinalizers) // Pods are created until the number of succeeded Pods equals completions. - if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 3, + Succeeded: 503, Active: 3, }, wFinalizers) // No more Pods are created after the Job completes. 
@@ -267,7 +267,7 @@ func TestParallelJobWithCompletions(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 6, + Succeeded: 506, }, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) @@ -860,7 +860,11 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) - config := restclient.Config{Host: server.URL} + config := restclient.Config{ + Host: server.URL, + QPS: 200.0, + Burst: 200, + } clientSet, err := clientset.NewForConfig(&config) if err != nil { t.Fatalf("Error creating clientset: %v", err) From a0e7a567c55e3720ee28103a361e00f99e81c262 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Thu, 2 Sep 2021 14:28:28 -0400 Subject: [PATCH 2/2] Add metric job_pod_finished To count the number of pods that the job controller successfully tracked with the JobTrackingWithFinalizers feature gate. 
--- pkg/controller/job/indexed_job_utils.go | 7 +++++ pkg/controller/job/job_controller.go | 18 ++++++++++-- pkg/controller/job/job_controller_test.go | 34 +++++++++++++++++++---- pkg/controller/job/metrics/metrics.go | 24 +++++++++++++++- 4 files changed, 74 insertions(+), 9 deletions(-) diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 7fa9bdc22b4..a6d953fa1c4 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -338,3 +338,10 @@ func (bci byCompletionIndex) Swap(i, j int) { func (bci byCompletionIndex) Len() int { return len(bci) } + +func completionModeStr(job *batch.Job) string { + if job.Spec.CompletionMode != nil { + return string(*job.Spec.CompletionMode) + } + return string(batch.NonIndexedCompletion) +} diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 1a1933541ec..5abd3dc7b8c 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -897,6 +897,8 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* uidsWithFinalizer.Insert(string(p.UID)) } } + // Shallow copy, as it will only be used to detect changes in the counters. 
+ oldCounters := job.Status if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { needsFlush = true } @@ -951,7 +953,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* job.Status.CompletedIndexes = succeededIndexes.String() } var err error - if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { + if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { return err } if jm.enactJobFinished(job, finishedCond) { @@ -961,6 +963,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* if err := jm.updateStatusHandler(job); err != nil { return fmt.Errorf("removing uncounted pods from status: %w", err) } + recordJobPodFinished(job, oldCounters) } return nil } @@ -974,11 +977,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* // 4. (if not all removals succeeded) flush Job status again. // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (bool, error) { if needsFlush { if err := jm.updateStatusHandler(job); err != nil { return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } + recordJobPodFinished(job, *oldCounters) + // Shallow copy. 
+ *oldCounters = job.Status needsFlush = false } var rmErr error @@ -1545,3 +1551,11 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio } return list, false } + +func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) { + completionMode := completionModeStr(job) + diff := job.Status.Succeeded - oldCounters.Succeeded + metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff)) + diff = job.Status.Failed - oldCounters.Failed + metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff)) +} diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index a827e563948..2cc8dca1ceb 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -46,8 +46,10 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" + metricstestutil "k8s.io/component-base/metrics/testutil" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" @@ -1514,19 +1516,20 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControlErr} + metrics.JobPodsFinished.Reset() manager.podControl = &fakePodControl var statusUpdates []batch.JobStatus manager.updateStatusHandler = func(job *batch.Job) error { statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) return tc.statusUpdateErr } - - if tc.job.Status.UncountedTerminatedPods == nil { - 
tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} + job := tc.job.DeepCopy() + if job.Status.UncountedTerminatedPods == nil { + job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } - uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) - succeededIndexes := succeededIndexesFromJob(&tc.job) - err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush) + uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) + succeededIndexes := succeededIndexesFromJob(job) + err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %w", err, tc.wantErr) } @@ -1537,6 +1540,25 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { if rmFinalizers != tc.wantRmFinalizers { t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers) } + if tc.wantErr == nil { + completionMode := completionModeStr(job) + v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded)) + if err != nil { + t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err) + } + newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded + if float64(newSucceeded) != v { + t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded) + } + v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed)) + if err != nil { + t.Fatalf("Obtaining failed job_pods_finished_total: %v", err) + } + newFailed := job.Status.Failed - tc.job.Status.Failed + if float64(newFailed) != v { + t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed) + } + } }) } } diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index b2517a09f42..b9271426661 
100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -68,10 +68,26 @@ var ( }, []string{"completion_mode", "result"}, ) + + // JobPodsFinished records the number of finished Pods that the job controller + // finished tracking. + // It only applies to Jobs that were created while the feature gate + // JobTrackingWithFinalizers was enabled. + // Possible label values: + // completion_mode: Indexed, NonIndexed + // result: failed, succeeded + JobPodsFinished = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: JobControllerSubsystem, + Name: "job_pods_finished_total", + Help: "The number of finished Pods that are fully tracked", + }, + []string{"completion_mode", "result"}) ) -// Possible values for the "action" label in the above metrics. const ( + // Possible values for the "action" label in the above metrics. + // JobSyncActionReconciling when the Job's pod creation/deletion expectations // are unsatisfied and the controller is waiting for issued Pod // creation/deletions to complete. @@ -88,6 +104,11 @@ const ( // if a Job is suspended or if the number of active Pods is more than // parallelism. JobSyncActionPodsDeleted = "pods_deleted" + + // Possible values for "result" label in the above metrics. + + Succeeded = "succeeded" + Failed = "failed" ) var registerMetrics sync.Once @@ -98,5 +119,6 @@ func Register() { legacyregistry.MustRegister(JobSyncDurationSeconds) legacyregistry.MustRegister(JobSyncNum) legacyregistry.MustRegister(JobFinishedNum) + legacyregistry.MustRegister(JobPodsFinished) }) }