Fix the occasional double-counting job_finished_total metric

The reason for the issue is that the metrics were bumped before the
final job status update. In case the update failed the path was
repeated by the next syncJob leading to double-counting of the metrics.

The solution is to delay recording metrics and broadcasting events
after the job status update succeeds.
This commit is contained in:
Michal Wozniak 2022-10-10 11:56:06 +02:00
parent 9b4b1c0e79
commit b64e5b2d15
2 changed files with 162 additions and 11 deletions

View File

@ -699,10 +699,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
return false, nil
}
completionMode := string(batch.NonIndexedCompletion)
if isIndexedJob(&job) {
completionMode = string(batch.IndexedCompletion)
}
completionMode := getCompletionMode(&job)
action := metrics.JobSyncActionReconciling
defer func() {
@ -906,11 +903,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
job.Status.CompletedIndexes = succeededIndexes.String()
}
job.Status.UncountedTerminatedPods = nil
jm.enactJobFinished(&job, finishedCondition)
jobFinished := jm.enactJobFinished(&job, finishedCondition)
if _, err := jm.updateStatusHandler(ctx, &job); err != nil {
return forget, err
}
if jobFinished {
jm.recordJobFinished(&job, finishedCondition)
}
if jobHasNewFailure && !IsJobFinished(&job) {
// returning an error will re-enqueue Job after the backoff period
@ -1105,13 +1105,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
return err
}
if jm.enactJobFinished(job, finishedCond) {
jobFinished := jm.enactJobFinished(job, finishedCond)
if jobFinished {
needsFlush = true
}
if needsFlush {
if _, err := jm.updateStatusHandler(ctx, job); err != nil {
return fmt.Errorf("removing uncounted pods from status: %w", err)
}
if jobFinished {
jm.recordJobFinished(job, finishedCond)
}
recordJobPodFinished(job, oldCounters)
}
return nil
@ -1244,16 +1248,20 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
return false
}
}
completionMode := string(batch.NonIndexedCompletion)
if isIndexedJob(job) {
completionMode = string(*job.Spec.CompletionMode)
}
job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message)
if finishedCond.Type == batch.JobComplete {
job.Status.CompletionTime = &finishedCond.LastTransitionTime
}
return true
}
// recordJobFinished records events and the job_finished_total metric for a finished job.
func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
completionMode := getCompletionMode(job)
if finishedCond.Type == batch.JobComplete {
if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
}
job.Status.CompletionTime = &finishedCond.LastTransitionTime
jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
} else {
@ -1613,6 +1621,14 @@ func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Str
return result
}
// getCompletionMode returns string representation of the completion mode. Used as a label value for metrics.
func getCompletionMode(job *batch.Job) string {
if isIndexedJob(job) {
return string(batch.IndexedCompletion)
}
return string(batch.NonIndexedCompletion)
}
func trackingUncountedPods(job *batch.Job) bool {
return feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && hasJobTrackingAnnotation(job)
}

View File

@ -47,12 +47,15 @@ import (
"k8s.io/client-go/restmapper"
"k8s.io/client-go/util/retry"
featuregatetesting "k8s.io/component-base/featuregate/testing"
basemetrics "k8s.io/component-base/metrics"
"k8s.io/component-base/metrics/testutil"
"k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/klog/v2"
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/garbagecollector"
jobcontroller "k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/controller/job/metrics"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/test/integration/framework"
"k8s.io/utils/pointer"
@ -60,6 +63,138 @@ import (
const waitInterval = time.Second
type metricLabelsWithValue struct {
Labels []string
Value int
}
func TestMetrics(t *testing.T) {
nonIndexedCompletion := batchv1.NonIndexedCompletion
indexedCompletion := batchv1.IndexedCompletion
wFinalizers := true
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
// setup the job controller
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
testCases := map[string]struct {
job *batchv1.Job
wantJobFinishedNumMetricDelta metricLabelsWithValue
wantJobPodsFinishedMetricDelta metricLabelsWithValue
}{
"non-indexed job": {
job: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(2),
Parallelism: pointer.Int32(2),
CompletionMode: &nonIndexedCompletion,
},
},
wantJobFinishedNumMetricDelta: metricLabelsWithValue{
Labels: []string{"NonIndexed", "succeeded"},
Value: 1,
},
wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
Labels: []string{"NonIndexed", "succeeded"},
Value: 2,
},
},
"indexed job": {
job: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: pointer.Int32(2),
Parallelism: pointer.Int32(2),
CompletionMode: &indexedCompletion,
},
},
wantJobFinishedNumMetricDelta: metricLabelsWithValue{
Labels: []string{"Indexed", "succeeded"},
Value: 1,
},
wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
Labels: []string{"Indexed", "succeeded"},
Value: 2,
},
},
}
job_index := 0 // job index to avoid collisions between job names created by different test cases
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
// record the metrics after the job is created
jobFinishedNumBefore, err := getCounterMetricValueForLabels(metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta.Labels)
if err != nil {
t.Fatalf("Failed to collect the JobFinishedNum metric before creating the job: %q", err)
}
jobPodsFinishedBefore, err := getCounterMetricValueForLabels(metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta.Labels)
if err != nil {
t.Fatalf("Failed to collect the JobPodsFinished metric before creating the job: %q", err)
}
// create a single job and wait for its completion
job := tc.job.DeepCopy()
job.Name = fmt.Sprintf("job-%v", job_index)
job_index++
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, job)
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: int(*jobObj.Spec.Parallelism),
Ready: pointer.Int32(0),
}, wFinalizers)
if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, int(*jobObj.Spec.Parallelism)); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
}
validateJobSucceeded(ctx, t, clientSet, jobObj)
// verify metric values after the job is finished
validateMetricValueDeltas(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta, jobFinishedNumBefore)
validateMetricValueDeltas(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta, jobPodsFinishedBefore)
})
}
}
func validateMetricValueDeltas(t *testing.T, counterVer *basemetrics.CounterVec, wantMetricDelta metricLabelsWithValue, metricValuesBefore metricLabelsWithValue) {
t.Helper()
var cmpErr error
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
cmpErr = nil
metricValuesAfter, err := getCounterMetricValueForLabels(counterVer, wantMetricDelta.Labels)
if err != nil {
return true, fmt.Errorf("Failed to collect the %q metric after the job is finished: %q", counterVer.Name, err)
}
wantDelta := wantMetricDelta.Value
gotDelta := metricValuesAfter.Value - metricValuesBefore.Value
if wantDelta != gotDelta {
cmpErr = fmt.Errorf("Unexepected metric delta for %q metric with labels %q. want: %v, got: %v", counterVer.Name, wantMetricDelta.Labels, wantDelta, gotDelta)
return false, nil
}
return true, nil
})
if err != nil {
t.Errorf("Failed waiting for expected metric delta: %q", err)
}
if cmpErr != nil {
t.Error(cmpErr)
}
}
func getCounterMetricValueForLabels(counterVec *basemetrics.CounterVec, labels []string) (metricLabelsWithValue, error) {
var result metricLabelsWithValue = metricLabelsWithValue{Labels: labels}
value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(labels...))
if err != nil {
return result, err
}
result.Value = int(value)
return result, nil
}
// TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed
// in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on
// the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is