diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index c6c6514a0cd..3e253517d3a 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -1842,8 +1842,16 @@ func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.
 		// in tandem, and now a previously completed index is
 		// now out of range (i.e. index >= spec.Completions).
 		if isIndexedJob(job) {
+			completions := int(*job.Spec.Completions)
 			if job.Status.CompletedIndexes != oldCounters.CompletedIndexes {
-				diff = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - parseIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total()
+				diff = indexesCount(logger, &job.Status.CompletedIndexes, completions) - indexesCount(logger, &oldCounters.CompletedIndexes, completions)
+			}
+			backoffLimitLabel := backoffLimitMetricsLabel(job)
+			metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Succeeded, backoffLimitLabel).Add(float64(diff))
+			if hasBackoffLimitPerIndex(job) && job.Status.FailedIndexes != oldCounters.FailedIndexes {
+				if failedDiff := indexesCount(logger, job.Status.FailedIndexes, completions) - indexesCount(logger, oldCounters.FailedIndexes, completions); failedDiff > 0 {
+					metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Failed, backoffLimitLabel).Add(float64(failedDiff))
+				}
 			}
 		} else {
 			diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded)
@@ -1855,6 +1863,20 @@ func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.
 	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
 }
 
+func indexesCount(logger klog.Logger, indexesStr *string, completions int) int {
+	if indexesStr == nil {
+		return 0
+	}
+	return parseIndexesFromString(logger, *indexesStr, completions).total()
+}
+
+func backoffLimitMetricsLabel(job *batch.Job) string {
+	if hasBackoffLimitPerIndex(job) {
+		return "perIndex"
+	}
+	return "global"
+}
+
 func recordJobPodFailurePolicyActions(job *batch.Job, podFailureCountByPolicyAction map[string]int) {
 	for action, count := range podFailureCountByPolicyAction {
 		metrics.PodFailuresHandledByFailurePolicy.WithLabelValues(action).Add(float64(count))
diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go
index 073568e84d1..d70acc43d88 100644
--- a/pkg/controller/job/metrics/metrics.go
+++ b/pkg/controller/job/metrics/metrics.go
@@ -114,6 +114,17 @@ var (
 	that have the finalizer batch.kubernetes.io/job-tracking
 	The event label can be "add" or "delete".`,
 		}, []string{"event"})
+
+	// JobFinishedIndexesTotal records the number of finished indexes.
+	JobFinishedIndexesTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem: JobControllerSubsystem,
+			Name:      "job_finished_indexes_total",
+			Help: `The number of finished indexes. Possible values for the
+				status label are: "succeeded", "failed". Possible values for the
+				backoffLimit label are: "perIndex" and "global"`,
+		},
+		[]string{"status", "backoffLimit"})
 )
 
 const (
@@ -158,5 +169,6 @@ func Register() {
 		legacyregistry.MustRegister(JobPodsFinished)
 		legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy)
 		legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
+		legacyregistry.MustRegister(JobFinishedIndexesTotal)
 	})
 }
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 8222f0f5d1f..66576bda30d 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -982,9 +982,10 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 		},
 	}
 	testCases := map[string]struct {
-		job                  batchv1.Job
-		podTerminations      []podTerminationWithExpectations
-		wantJobConditionType batchv1.JobConditionType
+		job                               batchv1.Job
+		podTerminations                   []podTerminationWithExpectations
+		wantJobConditionType              batchv1.JobConditionType
+		wantJobFinishedIndexesTotalMetric []metricLabelsWithValue
 	}{
 		"job succeeded": {
 			job: batchv1.Job{
@@ -1009,6 +1010,12 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobComplete,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  2,
+				},
+			},
 		},
 		"job index fails due to exceeding backoff limit per index": {
 			job: batchv1.Job{
@@ -1052,6 +1059,16 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  1,
+				},
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  1,
+				},
+			},
 		},
 		"job index fails due to exceeding the global backoff limit first": {
 			job: batchv1.Job{
@@ -1095,6 +1112,16 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  0,
+				},
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  0,
+				},
+			},
 		},
 		"job continues execution after a failed index, the job is marked Failed due to the failed index": {
 			job: batchv1.Job{
@@ -1129,6 +1156,16 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  1,
+				},
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  1,
+				},
+			},
 		},
 		"job execution terminated early due to exceeding max failed indexes": {
 			job: batchv1.Job{
@@ -1163,6 +1200,12 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  2,
+				},
+			},
 		},
 		"pod failure matching pod failure policy rule with FailIndex action": {
 			job: batchv1.Job{
@@ -1230,6 +1273,12 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  2,
+				},
+			},
 		},
 	}
 	for name, test := range testCases {
@@ -1290,6 +1339,9 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				}
 			}
 			validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
+			for _, wantMetricValue := range test.wantJobFinishedIndexesTotalMetric {
+				validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, wantMetricValue)
+			}
 			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 		})
 	}
@@ -1628,6 +1680,10 @@ func TestIndexedJob(t *testing.T) {
 		Ready:  pointer.Int32(0),
 	})
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  0,
+	})
 
 	// One Pod succeeds.
 	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@@ -1639,6 +1695,10 @@ func TestIndexedJob(t *testing.T) {
 		Ready:  pointer.Int32(0),
 	})
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  1,
+	})
 
 	// One Pod fails, which should be recreated.
 	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
@@ -1651,6 +1711,10 @@ func TestIndexedJob(t *testing.T) {
 		Ready:  pointer.Int32(0),
 	})
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  1,
+	})
 
 	// Remaining Pods succeed.
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
@@ -1666,6 +1730,10 @@ func TestIndexedJob(t *testing.T) {
 	validateJobSucceeded(ctx, t, clientSet, jobObj)
 	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 	validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 5)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  4,
+	})
 }
 
 func TestJobPodReplacementPolicy(t *testing.T) {
@@ -2949,6 +3017,7 @@ func resetMetrics() {
 	metrics.JobFinishedNum.Reset()
 	metrics.JobPodsFinished.Reset()
 	metrics.PodFailuresHandledByFailurePolicy.Reset()
+	metrics.JobFinishedIndexesTotal.Reset()
 }
 
 func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
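
Not part of the patch: a minimal sketch of how the new counter could be read directly in a Go test, assuming the GetCounterMetricValue helper from k8s.io/component-base/metrics/testutil; the package name, test name, and expected value below are illustrative only.

// Sketch only, not part of this change. It registers the job controller
// metrics, bumps one label combination of the new counter, and reads it back.
package metrics_test

import (
	"testing"

	"k8s.io/component-base/metrics/testutil"
	"k8s.io/kubernetes/pkg/controller/job/metrics"
)

func TestJobFinishedIndexesTotalSketch(t *testing.T) {
	// Register the job controller metrics with the legacy registry before use.
	metrics.Register()
	metrics.JobFinishedIndexesTotal.Reset()

	// Simulate one succeeded index for a job using backoff limit per index.
	metrics.JobFinishedIndexesTotal.WithLabelValues("succeeded", "perIndex").Inc()

	got, err := testutil.GetCounterMetricValue(
		metrics.JobFinishedIndexesTotal.WithLabelValues("succeeded", "perIndex"))
	if err != nil {
		t.Fatalf("reading job_controller_job_finished_indexes_total: %v", err)
	}
	if got != 1 {
		t.Errorf("job_finished_indexes_total{status=succeeded,backoffLimit=perIndex} = %v, want 1", got)
	}
}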