Merge pull request #105181 from alculquicondor/revert

Revert #104739
This commit is contained in:
Kubernetes Prow Robot 2021-09-21 16:54:00 -07:00 committed by GitHub
commit 76c0573ff4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 44 additions and 100 deletions

View File

@ -338,10 +338,3 @@ func (bci byCompletionIndex) Swap(i, j int) {
func (bci byCompletionIndex) Len() int { func (bci byCompletionIndex) Len() int {
return len(bci) return len(bci)
} }
func completionModeStr(job *batch.Job) string {
if job.Spec.CompletionMode != nil {
return string(*job.Spec.CompletionMode)
}
return string(batch.NonIndexedCompletion)
}

View File

@ -880,8 +880,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error
// or the job was removed. // or the job was removed.
// 3. Increment job counters for pods that no longer have a finalizer. // 3. Increment job counters for pods that no longer have a finalizer.
// 4. Add Complete condition if satisfied with current counters. // 4. Add Complete condition if satisfied with current counters.
// It does this up to a limited number of Pods so that the size of .status // It does this in a controlled way such that the size of .status doesn't grow
// doesn't grow too much and this sync doesn't starve other Jobs. // too much.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error {
isIndexed := isIndexedJob(job) isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod var podsToRemoveFinalizer []*v1.Pod
@ -897,8 +897,6 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
uidsWithFinalizer.Insert(string(p.UID)) uidsWithFinalizer.Insert(string(p.UID))
} }
} }
// Shallow copy, as it will only be used to detect changes in the counters.
oldCounters := job.Status
if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
needsFlush = true needsFlush = true
} }
@ -936,15 +934,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
} }
} }
if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
// The controller added enough Pods already to .status.uncountedTerminatedPods if len(newSucceededIndexes) > 0 {
// We stop counting pods and removing finalizers here to: succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
// 1. Ensure that the UIDs representation are under 20 KB. job.Status.Succeeded = int32(succeededIndexes.total())
// 2. Cap the number of finalizer removals so that syncing of big Jobs job.Status.CompletedIndexes = succeededIndexes.String()
// doesn't starve smaller ones. }
// var err error
// The job will be synced again because the Job status and Pod updates if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
// will put the Job back to the work queue. return err
break }
podsToRemoveFinalizer = nil
newSucceededIndexes = nil
} }
} }
if len(newSucceededIndexes) > 0 { if len(newSucceededIndexes) > 0 {
@ -953,7 +953,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
job.Status.CompletedIndexes = succeededIndexes.String() job.Status.CompletedIndexes = succeededIndexes.String()
} }
var err error var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
return err return err
} }
if jm.enactJobFinished(job, finishedCond) { if jm.enactJobFinished(job, finishedCond) {
@ -963,7 +963,6 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
if err := jm.updateStatusHandler(job); err != nil { if err := jm.updateStatusHandler(job); err != nil {
return fmt.Errorf("removing uncounted pods from status: %w", err) return fmt.Errorf("removing uncounted pods from status: %w", err)
} }
recordJobPodFinished(job, oldCounters)
} }
return nil return nil
} }
@ -977,14 +976,11 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
// 4. (if not all removals succeeded) flush Job status again. // 4. (if not all removals succeeded) flush Job status again.
// Returns whether there are pending changes in the Job status that need to be // Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls. // flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (bool, error) { func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (bool, error) {
if needsFlush { if needsFlush {
if err := jm.updateStatusHandler(job); err != nil { if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
} }
recordJobPodFinished(job, *oldCounters)
// Shallow copy.
*oldCounters = job.Status
needsFlush = false needsFlush = false
} }
var rmErr error var rmErr error
@ -1551,11 +1547,3 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
} }
return list, false return list, false
} }
func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
completionMode := completionModeStr(job)
diff := job.Status.Succeeded - oldCounters.Succeeded
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff))
diff = job.Status.Failed - oldCounters.Failed
metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
}

View File

@ -46,10 +46,8 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing" featuregatetesting "k8s.io/component-base/featuregate/testing"
metricstestutil "k8s.io/component-base/metrics/testutil"
_ "k8s.io/kubernetes/pkg/apis/core/install" _ "k8s.io/kubernetes/pkg/apis/core/install"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
@ -1463,7 +1461,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
return pods return pods
}(), }(),
wantRmFinalizers: 499, wantRmFinalizers: 501,
wantStatusUpdates: []batch.JobStatus{ wantStatusUpdates: []batch.JobStatus{
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
@ -1480,11 +1478,17 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
}, },
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{ UncountedTerminatedPods: &batch.UncountedTerminatedPods{
Failed: []types.UID{"b"}, Succeeded: []types.UID{"499"},
Failed: []types.UID{"b"},
}, },
Succeeded: 499, Succeeded: 499,
Failed: 1, Failed: 1,
}, },
{
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 500,
Failed: 2,
},
}, },
}, },
"too many indexed finished": { "too many indexed finished": {
@ -1501,13 +1505,18 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
} }
return pods return pods
}(), }(),
wantRmFinalizers: 500, wantRmFinalizers: 501,
wantStatusUpdates: []batch.JobStatus{ wantStatusUpdates: []batch.JobStatus{
{ {
UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
CompletedIndexes: "0-499", CompletedIndexes: "0-499",
Succeeded: 500, Succeeded: 500,
}, },
{
CompletedIndexes: "0-500",
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
Succeeded: 501,
},
}, },
}, },
} }
@ -1516,20 +1525,19 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControlErr} fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
metrics.JobPodsFinished.Reset()
manager.podControl = &fakePodControl manager.podControl = &fakePodControl
var statusUpdates []batch.JobStatus var statusUpdates []batch.JobStatus
manager.updateStatusHandler = func(job *batch.Job) error { manager.updateStatusHandler = func(job *batch.Job) error {
statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
return tc.statusUpdateErr return tc.statusUpdateErr
} }
job := tc.job.DeepCopy()
if job.Status.UncountedTerminatedPods == nil { if tc.job.Status.UncountedTerminatedPods == nil {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
} }
uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
succeededIndexes := succeededIndexesFromJob(job) succeededIndexes := succeededIndexesFromJob(&tc.job)
err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush) err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush)
if !errors.Is(err, tc.wantErr) { if !errors.Is(err, tc.wantErr) {
t.Errorf("Got error %v, want %v", err, tc.wantErr) t.Errorf("Got error %v, want %v", err, tc.wantErr)
} }
@ -1540,25 +1548,6 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
if rmFinalizers != tc.wantRmFinalizers { if rmFinalizers != tc.wantRmFinalizers {
t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers) t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers)
} }
if tc.wantErr == nil {
completionMode := completionModeStr(job)
v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded))
if err != nil {
t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err)
}
newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded
if float64(newSucceeded) != v {
t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded)
}
v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed))
if err != nil {
t.Fatalf("Obtaining failed job_pods_finished_total: %v", err)
}
newFailed := job.Status.Failed - tc.job.Status.Failed
if float64(newFailed) != v {
t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed)
}
}
}) })
} }
} }

View File

@ -68,26 +68,10 @@ var (
}, },
[]string{"completion_mode", "result"}, []string{"completion_mode", "result"},
) )
// JobPodsFinished records the number of finished Pods that the job controller
// finished tracking.
// It only applies to Jobs that were created while the feature gate
// JobTrackingWithFinalizers was enabled.
// Possible label values:
// completion_mode: Indexed, NonIndexed
// result: failed, succeeded
JobPodsFinished = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "job_pods_finished_total",
Help: "The number of finished Pods that are fully tracked",
},
[]string{"completion_mode", "result"})
) )
// Possible values for the "action" label in the above metrics.
const ( const (
// Possible values for the "action" label in the above metrics.
// JobSyncActionReconciling when the Job's pod creation/deletion expectations // JobSyncActionReconciling when the Job's pod creation/deletion expectations
// are unsatisfied and the controller is waiting for issued Pod // are unsatisfied and the controller is waiting for issued Pod
// creation/deletions to complete. // creation/deletions to complete.
@ -104,11 +88,6 @@ const (
// if a Job is suspended or if the number of active Pods is more than // if a Job is suspended or if the number of active Pods is more than
// parallelism. // parallelism.
JobSyncActionPodsDeleted = "pods_deleted" JobSyncActionPodsDeleted = "pods_deleted"
// Possible values for "result" label in the above metrics.
Succeeded = "succeeded"
Failed = "failed"
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -119,6 +98,5 @@ func Register() {
legacyregistry.MustRegister(JobSyncDurationSeconds) legacyregistry.MustRegister(JobSyncDurationSeconds)
legacyregistry.MustRegister(JobSyncNum) legacyregistry.MustRegister(JobSyncNum)
legacyregistry.MustRegister(JobFinishedNum) legacyregistry.MustRegister(JobFinishedNum)
legacyregistry.MustRegister(JobPodsFinished)
}) })
} }

View File

@ -230,8 +230,8 @@ func TestParallelJobWithCompletions(t *testing.T) {
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{ Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(504), Parallelism: pointer.Int32Ptr(4),
Completions: pointer.Int32Ptr(506), Completions: pointer.Int32Ptr(6),
}, },
}) })
if err != nil { if err != nil {
@ -241,23 +241,23 @@ func TestParallelJobWithCompletions(t *testing.T) {
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 504, Active: 4,
}, wFinalizers) }, wFinalizers)
// Failed Pods are replaced. // Failed Pods are replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 504, Active: 4,
Failed: 2, Failed: 2,
}, wFinalizers) }, wFinalizers)
// Pods are created until the number of succeeded Pods equals completions. // Pods are created until the number of succeeded Pods equals completions.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
} }
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 2, Failed: 2,
Succeeded: 503, Succeeded: 3,
Active: 3, Active: 3,
}, wFinalizers) }, wFinalizers)
// No more Pods are created after the Job completes. // No more Pods are created after the Job completes.
@ -267,7 +267,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobSucceeded(ctx, t, clientSet, jobObj)
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 2, Failed: 2,
Succeeded: 506, Succeeded: 6,
}, false) }, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
}) })
@ -860,11 +860,7 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
_, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig)
config := restclient.Config{ config := restclient.Config{Host: server.URL}
Host: server.URL,
QPS: 200.0,
Burst: 200,
}
clientSet, err := clientset.NewForConfig(&config) clientSet, err := clientset.NewForConfig(&config)
if err != nil { if err != nil {
t.Fatalf("Error creating clientset: %v", err) t.Fatalf("Error creating clientset: %v", err)