From b64e5b2d15405d321b61463ba63147e61dd423fb Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 10 Oct 2022 11:56:06 +0200 Subject: [PATCH] Fix the occasional double-counting job_finished_total metric The reason for the issue is that the metrics were bumped before the final job status update. In case the update failed the path was repeated by the next syncJob leading to double-counting of the metrics. The solution is to delay recording metrics and broadcasting events after the job status update succeeds. --- pkg/controller/job/job_controller.go | 38 +++++--- test/integration/job/job_test.go | 135 +++++++++++++++++++++++++++ 2 files changed, 162 insertions(+), 11 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index bc96785e8ed..f0203dbcaff 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -699,10 +699,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr return false, nil } - completionMode := string(batch.NonIndexedCompletion) - if isIndexedJob(&job) { - completionMode = string(batch.IndexedCompletion) - } + completionMode := getCompletionMode(&job) action := metrics.JobSyncActionReconciling defer func() { @@ -906,11 +903,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr job.Status.CompletedIndexes = succeededIndexes.String() } job.Status.UncountedTerminatedPods = nil - jm.enactJobFinished(&job, finishedCondition) + jobFinished := jm.enactJobFinished(&job, finishedCondition) if _, err := jm.updateStatusHandler(ctx, &job); err != nil { return forget, err } + if jobFinished { + jm.recordJobFinished(&job, finishedCondition) + } if jobHasNewFailure && !IsJobFinished(&job) { // returning an error will re-enqueue Job after the backoff period @@ -1105,13 +1105,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { return err } - if jm.enactJobFinished(job, finishedCond) { + jobFinished := jm.enactJobFinished(job, finishedCond) + if jobFinished { needsFlush = true } if needsFlush { if _, err := jm.updateStatusHandler(ctx, job); err != nil { return fmt.Errorf("removing uncounted pods from status: %w", err) } + if jobFinished { + jm.recordJobFinished(job, finishedCond) + } recordJobPodFinished(job, oldCounters) } return nil @@ -1244,16 +1248,20 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo return false } } - completionMode := string(batch.NonIndexedCompletion) - if isIndexedJob(job) { - completionMode = string(*job.Spec.CompletionMode) - } job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message) + if finishedCond.Type == batch.JobComplete { + job.Status.CompletionTime = &finishedCond.LastTransitionTime + } + return true +} + +// recordJobFinished records events and the job_finished_total metric for a finished job. +func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool { + completionMode := getCompletionMode(job) if finishedCond.Type == batch.JobComplete { if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions { jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached") } - job.Status.CompletionTime = &finishedCond.LastTransitionTime jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed") metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc() } else { @@ -1613,6 +1621,14 @@ func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Str return result } +// getCompletionMode returns string representation of the completion mode. Used as a label value for metrics. +func getCompletionMode(job *batch.Job) string { + if isIndexedJob(job) { + return string(batch.IndexedCompletion) + } + return string(batch.NonIndexedCompletion) +} + func trackingUncountedPods(job *batch.Job) bool { return feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && hasJobTrackingAnnotation(job) } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index c3c1d0a178d..5fccb195cf9 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -47,12 +47,15 @@ import ( "k8s.io/client-go/restmapper" "k8s.io/client-go/util/retry" featuregatetesting "k8s.io/component-base/featuregate/testing" + basemetrics "k8s.io/component-base/metrics" + "k8s.io/component-base/metrics/testutil" "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/klog/v2" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/garbagecollector" jobcontroller "k8s.io/kubernetes/pkg/controller/job" + "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" "k8s.io/utils/pointer" @@ -60,6 +63,138 @@ import ( const waitInterval = time.Second +type metricLabelsWithValue struct { + Labels []string + Value int +} + +func TestMetrics(t *testing.T) { + nonIndexedCompletion := batchv1.NonIndexedCompletion + indexedCompletion := batchv1.IndexedCompletion + wFinalizers := true + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() + + // setup the job controller + closeFn, restConfig, clientSet, ns := setup(t, "simple") + defer closeFn() + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) + defer func() { + cancel() + }() + + testCases := map[string]struct { + job *batchv1.Job + wantJobFinishedNumMetricDelta metricLabelsWithValue + wantJobPodsFinishedMetricDelta metricLabelsWithValue + }{ + "non-indexed job": { + job: &batchv1.Job{ + Spec: batchv1.JobSpec{ + Completions: pointer.Int32(2), + Parallelism: pointer.Int32(2), + CompletionMode: &nonIndexedCompletion, + }, + }, + wantJobFinishedNumMetricDelta: metricLabelsWithValue{ + Labels: []string{"NonIndexed", "succeeded"}, + Value: 1, + }, + wantJobPodsFinishedMetricDelta: metricLabelsWithValue{ + Labels: []string{"NonIndexed", "succeeded"}, + Value: 2, + }, + }, + "indexed job": { + job: &batchv1.Job{ + Spec: batchv1.JobSpec{ + Completions: pointer.Int32(2), + Parallelism: pointer.Int32(2), + CompletionMode: &indexedCompletion, + }, + }, + wantJobFinishedNumMetricDelta: metricLabelsWithValue{ + Labels: []string{"Indexed", "succeeded"}, + Value: 1, + }, + wantJobPodsFinishedMetricDelta: metricLabelsWithValue{ + Labels: []string{"Indexed", "succeeded"}, + Value: 2, + }, + }, + } + job_index := 0 // job index to avoid collisions between job names created by different test cases + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + + // record the metrics after the job is created + jobFinishedNumBefore, err := getCounterMetricValueForLabels(metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta.Labels) + if err != nil { + t.Fatalf("Failed to collect the JobFinishedNum metric before creating the job: %q", err) + } + jobPodsFinishedBefore, err := getCounterMetricValueForLabels(metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta.Labels) + if err != nil { + t.Fatalf("Failed to collect the JobPodsFinished metric before creating the job: %q", err) + } + + // create a single job and wait for its completion + job := tc.job.DeepCopy() + job.Name = fmt.Sprintf("job-%v", job_index) + job_index++ + jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, job) + if err != nil { + t.Fatalf("Failed to create Job: %v", err) + } + validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ + Active: int(*jobObj.Spec.Parallelism), + Ready: pointer.Int32(0), + }, wFinalizers) + if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, int(*jobObj.Spec.Parallelism)); err != nil { + t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) + } + validateJobSucceeded(ctx, t, clientSet, jobObj) + + // verify metric values after the job is finished + validateMetricValueDeltas(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta, jobFinishedNumBefore) + validateMetricValueDeltas(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta, jobPodsFinishedBefore) + }) + } +} + +func validateMetricValueDeltas(t *testing.T, counterVer *basemetrics.CounterVec, wantMetricDelta metricLabelsWithValue, metricValuesBefore metricLabelsWithValue) { + t.Helper() + var cmpErr error + err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + cmpErr = nil + metricValuesAfter, err := getCounterMetricValueForLabels(counterVer, wantMetricDelta.Labels) + if err != nil { + return true, fmt.Errorf("Failed to collect the %q metric after the job is finished: %q", counterVer.Name, err) + } + wantDelta := wantMetricDelta.Value + gotDelta := metricValuesAfter.Value - metricValuesBefore.Value + if wantDelta != gotDelta { + cmpErr = fmt.Errorf("Unexepected metric delta for %q metric with labels %q. want: %v, got: %v", counterVer.Name, wantMetricDelta.Labels, wantDelta, gotDelta) + return false, nil + } + return true, nil + }) + if err != nil { + t.Errorf("Failed waiting for expected metric delta: %q", err) + } + if cmpErr != nil { + t.Error(cmpErr) + } +} + +func getCounterMetricValueForLabels(counterVec *basemetrics.CounterVec, labels []string) (metricLabelsWithValue, error) { + var result metricLabelsWithValue = metricLabelsWithValue{Labels: labels} + value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(labels...)) + if err != nil { + return result, err + } + result.Value = int(value) + return result, nil +} + // TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed // in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on // the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is