diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index bc96785e8ed..f0203dbcaff 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -699,10 +699,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 		return false, nil
 	}
 
-	completionMode := string(batch.NonIndexedCompletion)
-	if isIndexedJob(&job) {
-		completionMode = string(batch.IndexedCompletion)
-	}
+	completionMode := getCompletionMode(&job)
 
 	action := metrics.JobSyncActionReconciling
 	defer func() {
@@ -906,11 +903,14 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 			job.Status.CompletedIndexes = succeededIndexes.String()
 		}
 		job.Status.UncountedTerminatedPods = nil
-		jm.enactJobFinished(&job, finishedCondition)
+		jobFinished := jm.enactJobFinished(&job, finishedCondition)
 
 		if _, err := jm.updateStatusHandler(ctx, &job); err != nil {
 			return forget, err
 		}
+		if jobFinished {
+			jm.recordJobFinished(&job, finishedCondition) // record events and metrics only after the new status is persisted
+		}
 
 		if jobHasNewFailure && !IsJobFinished(&job) {
 			// returning an error will re-enqueue Job after the backoff period
@@ -1105,13 +1105,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
 	if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(ctx, job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil {
 		return err
 	}
-	if jm.enactJobFinished(job, finishedCond) {
+	jobFinished := jm.enactJobFinished(job, finishedCond)
+	if jobFinished {
 		needsFlush = true
 	}
 	if needsFlush {
 		if _, err := jm.updateStatusHandler(ctx, job); err != nil {
 			return fmt.Errorf("removing uncounted pods from status: %w", err)
 		}
+		if jobFinished {
+			jm.recordJobFinished(job, finishedCond)
+		}
 		recordJobPodFinished(job, oldCounters)
 	}
 	return nil
@@ -1244,16 +1248,20 @@ func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCo
 			return false
 		}
 	}
-	completionMode := string(batch.NonIndexedCompletion)
-	if isIndexedJob(job) {
-		completionMode = string(*job.Spec.CompletionMode)
-	}
 	job.Status.Conditions, _ = ensureJobConditionStatus(job.Status.Conditions, finishedCond.Type, finishedCond.Status, finishedCond.Reason, finishedCond.Message)
+	if finishedCond.Type == batch.JobComplete {
+		job.Status.CompletionTime = &finishedCond.LastTransitionTime
+	}
+	return true
+}
+
+// recordJobFinished records events and the job_finished_total metric for a finished job; it is called only after the job's status update has succeeded.
+func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
+	completionMode := getCompletionMode(job)
 	if finishedCond.Type == batch.JobComplete {
 		if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
 			jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
 		}
-		job.Status.CompletionTime = &finishedCond.LastTransitionTime
 		jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
 		metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
 	} else {
@@ -1613,6 +1621,14 @@ func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Str
 	return result
 }
 
+// getCompletionMode returns a string representation of the completion mode. It is used as a label value for metrics.
+func getCompletionMode(job *batch.Job) string {
+	if isIndexedJob(job) {
+		return string(batch.IndexedCompletion)
+	}
+	return string(batch.NonIndexedCompletion)
+}
+
 func trackingUncountedPods(job *batch.Job) bool {
 	return feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && hasJobTrackingAnnotation(job)
 }
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index c3c1d0a178d..5fccb195cf9 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -47,12 +47,15 @@ import (
 	"k8s.io/client-go/restmapper"
 	"k8s.io/client-go/util/retry"
 	featuregatetesting "k8s.io/component-base/featuregate/testing"
+	basemetrics "k8s.io/component-base/metrics"
+	"k8s.io/component-base/metrics/testutil"
 	"k8s.io/controller-manager/pkg/informerfactory"
 	"k8s.io/klog/v2"
 	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller/garbagecollector"
 	jobcontroller "k8s.io/kubernetes/pkg/controller/job"
+	"k8s.io/kubernetes/pkg/controller/job/metrics"
 	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/test/integration/framework"
 	"k8s.io/utils/pointer"
 )
@@ -60,6 +63,138 @@ import (
 
 const waitInterval = time.Second
 
+type metricLabelsWithValue struct {
+	Labels []string
+	Value  int
+}
+
+func TestMetrics(t *testing.T) {
+	nonIndexedCompletion := batchv1.NonIndexedCompletion
+	indexedCompletion := batchv1.IndexedCompletion
+	wFinalizers := true
+	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
+
+	// set up the job controller
+	closeFn, restConfig, clientSet, ns := setup(t, "simple")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer func() {
+		cancel()
+	}()
+
+	testCases := map[string]struct {
+		job                            *batchv1.Job
+		wantJobFinishedNumMetricDelta  metricLabelsWithValue
+		wantJobPodsFinishedMetricDelta metricLabelsWithValue
+	}{
+		"non-indexed job": {
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Completions:    pointer.Int32(2),
+					Parallelism:    pointer.Int32(2),
+					CompletionMode: &nonIndexedCompletion,
+				},
+			},
+			wantJobFinishedNumMetricDelta: metricLabelsWithValue{
+				Labels: []string{"NonIndexed", "succeeded"},
+				Value:  1,
+			},
+			wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
+				Labels: []string{"NonIndexed", "succeeded"},
+				Value:  2,
+			},
+		},
+		"indexed job": {
+			job: &batchv1.Job{
+				Spec: batchv1.JobSpec{
+					Completions:    pointer.Int32(2),
+					Parallelism:    pointer.Int32(2),
+					CompletionMode: &indexedCompletion,
+				},
+			},
+			wantJobFinishedNumMetricDelta: metricLabelsWithValue{
+				Labels: []string{"Indexed", "succeeded"},
+				Value:  1,
+			},
+			wantJobPodsFinishedMetricDelta: metricLabelsWithValue{
+				Labels: []string{"Indexed", "succeeded"},
+				Value:  2,
+			},
+		},
+	}
+	jobIndex := 0 // suffix for job names, to avoid name collisions between test cases
+	for name, tc := range testCases {
+		t.Run(name, func(t *testing.T) {
+
+			// record the metric values before the job is created
+			jobFinishedNumBefore, err := getCounterMetricValueForLabels(metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta.Labels)
+			if err != nil {
+				t.Fatalf("Failed to collect the JobFinishedNum metric before creating the job: %q", err)
+			}
+			jobPodsFinishedBefore, err := getCounterMetricValueForLabels(metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta.Labels)
+			if err != nil {
+				t.Fatalf("Failed to collect the JobPodsFinished metric before creating the job: %q", err)
+			}
+
+			// create a single job and wait for its completion
+			job := tc.job.DeepCopy()
+			job.Name = fmt.Sprintf("job-%v", jobIndex)
+			jobIndex++
+			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, job)
+			if err != nil {
+				t.Fatalf("Failed to create Job: %v", err)
+			}
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+				Active: int(*jobObj.Spec.Parallelism),
+				Ready:  pointer.Int32(0),
+			}, wFinalizers)
+			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, int(*jobObj.Spec.Parallelism)); err != nil {
+				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
+			}
+			validateJobSucceeded(ctx, t, clientSet, jobObj)
+
+			// verify the metric value deltas after the job is finished
+			validateMetricValueDeltas(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetricDelta, jobFinishedNumBefore)
+			validateMetricValueDeltas(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetricDelta, jobPodsFinishedBefore)
+		})
+	}
+}
+
+func validateMetricValueDeltas(t *testing.T, counterVec *basemetrics.CounterVec, wantMetricDelta metricLabelsWithValue, metricValuesBefore metricLabelsWithValue) {
+	t.Helper()
+	var cmpErr error
+	err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
+		cmpErr = nil
+		metricValuesAfter, err := getCounterMetricValueForLabels(counterVec, wantMetricDelta.Labels)
+		if err != nil {
+			return true, fmt.Errorf("failed to collect the %q metric after the job is finished: %w", counterVec.Name, err)
+		}
+		wantDelta := wantMetricDelta.Value
+		gotDelta := metricValuesAfter.Value - metricValuesBefore.Value
+		if wantDelta != gotDelta {
+			cmpErr = fmt.Errorf("unexpected delta for the %q metric with labels %q; want: %v, got: %v", counterVec.Name, wantMetricDelta.Labels, wantDelta, gotDelta)
+			return false, nil
+		}
+		return true, nil
+	})
+	if err != nil {
+		t.Errorf("Failed waiting for expected metric delta: %q", err)
+	}
+	if cmpErr != nil {
+		t.Error(cmpErr)
+	}
+}
+
+func getCounterMetricValueForLabels(counterVec *basemetrics.CounterVec, labels []string) (metricLabelsWithValue, error) {
+	result := metricLabelsWithValue{Labels: labels}
+	value, err := testutil.GetCounterMetricValue(counterVec.WithLabelValues(labels...))
+	if err != nil {
+		return result, err
+	}
+	result.Value = int(value)
+	return result, nil
+}
+
 // TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed
 // in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on
 // the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is