From a438f16741fbf064281e4c57e04e17ff4d41bc02 Mon Sep 17 00:00:00 2001
From: Aldo Culquicondor
Date: Wed, 22 Sep 2021 11:15:53 -0400
Subject: [PATCH] Revert "Revert "Add metric job_pod_finished""

This reverts commit 7868fbbe64330bf62f8e463b19e1fe18ec184c91.
---
 pkg/controller/job/indexed_job_utils.go   |  9 ++++++
 pkg/controller/job/job_controller.go      | 25 ++++++++++++++--
 pkg/controller/job/job_controller_test.go | 34 +++++++++++++++++++----
 pkg/controller/job/metrics/metrics.go     | 24 +++++++++++++++-
 4 files changed, 83 insertions(+), 9 deletions(-)

diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go
index 7fa9bdc22b4..a6d953fa1c4 100644
--- a/pkg/controller/job/indexed_job_utils.go
+++ b/pkg/controller/job/indexed_job_utils.go
@@ -338,3 +338,12 @@ func (bci byCompletionIndex) Swap(i, j int) {
 func (bci byCompletionIndex) Len() int {
 	return len(bci)
 }
+
+// completionModeStr returns the string form of the Job's completion mode,
+// falling back to NonIndexed when .spec.completionMode is unset.
+func completionModeStr(job *batch.Job) string {
+	if job.Spec.CompletionMode != nil {
+		return string(*job.Spec.CompletionMode)
+	}
+	return string(batch.NonIndexedCompletion)
+}
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 8cfad1781b1..0aa340e20ef 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -900,6 +900,8 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 			uidsWithFinalizer.Insert(string(p.UID))
 		}
 	}
+	// Shallow copy, as it will only be used to detect changes in the counters.
+	oldCounters := job.Status
 	if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) {
 		needsFlush = true
 	}
@@ -954,7 +956,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 		job.Status.CompletedIndexes = succeededIndexes.String()
 	}
 	var err error
-	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
+	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
 		return err
 	}
 	if jm.enactJobFinished(job, finishedCond) {
@@ -964,6 +966,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 		if _, err := jm.updateStatusHandler(job); err != nil {
 			return fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
+		recordJobPodFinished(job, oldCounters)
 	}
 	return nil
 }
@@ -977,12 +980,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
 // 4. (if not all removals succeeded) flush Job status again.
 // Returns whether there are pending changes in the Job status that need to be
 // flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) {
 	var err error
 	if needsFlush {
 		if job, err = jm.updateStatusHandler(job); err != nil {
 			return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
 		}
+		recordJobPodFinished(job, *oldCounters)
+		// Shallow copy.
+		*oldCounters = job.Status
 		needsFlush = false
 	}
 	var rmErr error
@@ -1536,3 +1542,18 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
 	}
 	return list, false
 }
+
+// recordJobPodFinished adds to the job_pods_finished_total metric the change
+// in the Succeeded and Failed counters between oldCounters and job.Status,
+// labeled with the Job's completion mode.
+func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) {
+	completionMode := completionModeStr(job)
+	// The underlying Prometheus counter panics when asked to add a negative
+	// value, so a counter that unexpectedly went backwards is not recorded.
+	if diff := job.Status.Succeeded - oldCounters.Succeeded; diff >= 0 {
+		metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff))
+	}
+	if diff := job.Status.Failed - oldCounters.Failed; diff >= 0 {
+		metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
+	}
+}
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index fb113e9b2bf..1ee664aab81 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -47,8 +47,10 @@ import (
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	metricstestutil "k8s.io/component-base/metrics/testutil"
 	_ "k8s.io/kubernetes/pkg/apis/core/install"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/controller/testutil"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/utils/pointer"
@@ -1515,19 +1517,20 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 			manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
+			metrics.JobPodsFinished.Reset()
 			manager.podControl = &fakePodControl
 			var statusUpdates []batch.JobStatus
 			manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) {
 				statusUpdates = append(statusUpdates, *job.Status.DeepCopy())
 				return job, tc.statusUpdateErr
 			}
-
-			if tc.job.Status.UncountedTerminatedPods == nil {
-				tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
+			job := tc.job.DeepCopy()
+			if job.Status.UncountedTerminatedPods == nil {
+				job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
 			}
-			uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods)
-			succeededIndexes := succeededIndexesFromJob(&tc.job)
-			err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush)
+			uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
+			succeededIndexes := succeededIndexesFromJob(job)
+			err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush)
 			if !errors.Is(err, tc.wantErr) {
 				t.Errorf("Got error %v, want %v", err, tc.wantErr)
 			}
@@ -1538,6 +1541,25 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 			if rmFinalizers != tc.wantRmFinalizers {
 				t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers)
 			}
+			if tc.wantErr == nil {
+				completionMode := completionModeStr(job)
+				v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded))
+				if err != nil {
+					t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err)
+				}
+				newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded
+				if float64(newSucceeded) != v {
+					t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded)
+				}
+				v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed))
+				if err != nil {
+					t.Fatalf("Obtaining failed job_pods_finished_total: %v", err)
+				}
+				newFailed := job.Status.Failed - tc.job.Status.Failed
+				if float64(newFailed) != v {
+					t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed)
+				}
+			}
 		})
 	}
 }
diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go
index b2517a09f42..b9271426661 100644
--- a/pkg/controller/job/metrics/metrics.go
+++ b/pkg/controller/job/metrics/metrics.go
@@ -68,10 +68,26 @@ var (
 		},
 		[]string{"completion_mode", "result"},
 	)
+
+	// JobPodsFinished records the number of finished Pods that the job controller
+	// finished tracking.
+	// It only applies to Jobs that were created while the feature gate
+	// JobTrackingWithFinalizers was enabled.
+	// Possible label values:
+	//   completion_mode: Indexed, NonIndexed
+	//   result:          failed, succeeded
+	JobPodsFinished = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem: JobControllerSubsystem,
+			Name:      "job_pods_finished_total",
+			Help:      "The number of finished Pods that are fully tracked",
+		},
+		[]string{"completion_mode", "result"})
 )
 
-// Possible values for the "action" label in the above metrics.
 const (
+	// Possible values for the "action" label in the above metrics.
+
 	// JobSyncActionReconciling when the Job's pod creation/deletion expectations
 	// are unsatisfied and the controller is waiting for issued Pod
 	// creation/deletions to complete.
@@ -88,6 +104,11 @@ const (
 	// if a Job is suspended or if the number of active Pods is more than
 	// parallelism.
 	JobSyncActionPodsDeleted = "pods_deleted"
+
+	// Possible values for "result" label in the above metrics.
+
+	Succeeded = "succeeded"
+	Failed    = "failed"
 )
 
 var registerMetrics sync.Once
@@ -98,5 +119,6 @@ func Register() {
 		legacyregistry.MustRegister(JobSyncDurationSeconds)
 		legacyregistry.MustRegister(JobSyncNum)
 		legacyregistry.MustRegister(JobFinishedNum)
+		legacyregistry.MustRegister(JobPodsFinished)
 	})
 }