Introduce the job_finished_indexes_total metric

Michal Wozniak 2023-10-17 15:31:32 +02:00
parent 7b9d244efd
commit b0d04d933b
3 changed files with 107 additions and 4 deletions


@@ -1842,8 +1842,16 @@ func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.
 	// in tandem, and now a previously completed index is
 	// out of range (i.e. index >= spec.Completions).
 	if isIndexedJob(job) {
+		completions := int(*job.Spec.Completions)
 		if job.Status.CompletedIndexes != oldCounters.CompletedIndexes {
-			diff = parseIndexesFromString(logger, job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - parseIndexesFromString(logger, oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total()
+			diff = indexesCount(logger, &job.Status.CompletedIndexes, completions) - indexesCount(logger, &oldCounters.CompletedIndexes, completions)
 		}
+		backoffLimitLabel := backoffLimitMetricsLabel(job)
+		metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Succeeded, backoffLimitLabel).Add(float64(diff))
+		if hasBackoffLimitPerIndex(job) && job.Status.FailedIndexes != oldCounters.FailedIndexes {
+			if failedDiff := indexesCount(logger, job.Status.FailedIndexes, completions) - indexesCount(logger, oldCounters.FailedIndexes, completions); failedDiff > 0 {
+				metrics.JobFinishedIndexesTotal.WithLabelValues(metrics.Failed, backoffLimitLabel).Add(float64(failedDiff))
+			}
+		}
 	} else {
 		diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded)
@@ -1855,6 +1863,20 @@ func recordJobPodFinished(logger klog.Logger, job *batch.Job, oldCounters batch.
 	metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff))
 }
 
+func indexesCount(logger klog.Logger, indexesStr *string, completions int) int {
+	if indexesStr == nil {
+		return 0
+	}
+	return parseIndexesFromString(logger, *indexesStr, completions).total()
+}
+
+func backoffLimitMetricsLabel(job *batch.Job) string {
+	if hasBackoffLimitPerIndex(job) {
+		return "perIndex"
+	}
+	return "global"
+}
+
 func recordJobPodFailurePolicyActions(job *batch.Job, podFailureCountByPolicyAction map[string]int) {
 	for action, count := range podFailureCountByPolicyAction {
 		metrics.PodFailuresHandledByFailurePolicy.WithLabelValues(action).Add(float64(count))

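A note on the counting above: indexesCount delegates to parseIndexesFromString(...).total(), which counts how many completion indexes an interval string such as "1-3,5" covers, and the metric only ever receives the per-sync delta between the new and old status strings. The following is a minimal, self-contained sketch of that counting logic; countIndexes is a hypothetical stand-in, and unlike the controller's real parser it ignores the completions upper bound and does no logging.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// countIndexes returns how many indexes an interval string covers,
// e.g. "1-3,5" -> 4. Malformed entries are silently skipped here.
func countIndexes(indexesStr string) int {
	if indexesStr == "" {
		return 0
	}
	total := 0
	for _, part := range strings.Split(indexesStr, ",") {
		if lo, hi, ok := strings.Cut(part, "-"); ok {
			l, err1 := strconv.Atoi(lo)
			h, err2 := strconv.Atoi(hi)
			if err1 == nil && err2 == nil && h >= l {
				total += h - l + 1
			}
		} else if _, err := strconv.Atoi(part); err == nil {
			total++
		}
	}
	return total
}

func main() {
	// The controller records the per-sync delta, new minus old:
	fmt.Println(countIndexes("0-2,5") - countIndexes("0-1")) // 2
}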

@@ -114,6 +114,17 @@ var (
 	that have the finalizer batch.kubernetes.io/job-tracking
 	The event label can be "add" or "delete".`,
 		}, []string{"event"})
+
+	// JobFinishedIndexesTotal records the number of finished indexes.
+	JobFinishedIndexesTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem: JobControllerSubsystem,
+			Name:      "job_finished_indexes_total",
+			Help: `The number of finished indexes. Possible values for the
+			status label are: "succeeded", "failed". Possible values for the
+			backoffLimit label are: "perIndex" and "global"`,
+		},
+		[]string{"status", "backoffLimit"})
 )
 
 const (
@@ -158,5 +169,6 @@ func Register() {
 		legacyregistry.MustRegister(JobPodsFinished)
 		legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy)
 		legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
+		legacyregistry.MustRegister(JobFinishedIndexesTotal)
 	})
 }

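For illustration, the label scheme defined above can be exercised with a plain prometheus/client_golang CounterVec. This is an analogue only: the real metric is built on k8s.io/component-base/metrics and registered through legacyregistry as shown in Register above, and the "job_controller" subsystem string below is an assumed value of JobControllerSubsystem.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

func main() {
	// Same name and labels as the new metric, but built with the
	// plain Prometheus client rather than the component-base wrapper.
	finishedIndexes := prometheus.NewCounterVec(prometheus.CounterOpts{
		Subsystem: "job_controller", // assumed JobControllerSubsystem value
		Name:      "job_finished_indexes_total",
		Help:      "The number of finished indexes.",
	}, []string{"status", "backoffLimit"})

	// Two indexes succeed and one fails under a per-index backoff limit.
	finishedIndexes.WithLabelValues("succeeded", "perIndex").Add(2)
	finishedIndexes.WithLabelValues("failed", "perIndex").Add(1)

	// Read one child counter back, roughly what a test assertion does.
	m := &dto.Metric{}
	if err := finishedIndexes.WithLabelValues("succeeded", "perIndex").Write(m); err != nil {
		panic(err)
	}
	fmt.Println(m.GetCounter().GetValue()) // 2
}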

@@ -982,9 +982,10 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 		},
 	}
 	testCases := map[string]struct {
-		job                  batchv1.Job
-		podTerminations      []podTerminationWithExpectations
-		wantJobConditionType batchv1.JobConditionType
+		job                               batchv1.Job
+		podTerminations                   []podTerminationWithExpectations
+		wantJobConditionType              batchv1.JobConditionType
+		wantJobFinishedIndexesTotalMetric []metricLabelsWithValue
 	}{
 		"job succeeded": {
 			job: batchv1.Job{
@@ -1009,6 +1010,12 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobComplete,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  2,
+				},
+			},
 		},
 		"job index fails due to exceeding backoff limit per index": {
 			job: batchv1.Job{
@@ -1052,6 +1059,16 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  1,
+				},
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  1,
+				},
+			},
 		},
 		"job index fails due to exceeding the global backoff limit first": {
 			job: batchv1.Job{
@@ -1095,6 +1112,16 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  0,
+				},
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  0,
+				},
+			},
 		},
 		"job continues execution after a failed index, the job is marked Failed due to the failed index": {
 			job: batchv1.Job{
@@ -1129,6 +1156,16 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"succeeded", "perIndex"},
+					Value:  1,
+				},
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  1,
+				},
+			},
 		},
 		"job execution terminated early due to exceeding max failed indexes": {
 			job: batchv1.Job{
@@ -1163,6 +1200,12 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  2,
+				},
+			},
 		},
 		"pod failure matching pod failure policy rule with FailIndex action": {
 			job: batchv1.Job{
@@ -1230,6 +1273,12 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				},
 			},
 			wantJobConditionType: batchv1.JobFailed,
+			wantJobFinishedIndexesTotalMetric: []metricLabelsWithValue{
+				{
+					Labels: []string{"failed", "perIndex"},
+					Value:  2,
+				},
+			},
 		},
 	}
 	for name, test := range testCases {
@@ -1290,6 +1339,9 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 				}
 			}
 			validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
+			for _, wantMetricValue := range test.wantJobFinishedIndexesTotalMetric {
+				validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, wantMetricValue)
+			}
 			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 		})
 	}
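The job fixtures in the test cases above are truncated in this view. For context, here is a minimal sketch of the kind of Indexed Job with a per-index backoff limit that these cases construct; the field values are illustrative, not the exact test fixtures, and running such a Job requires the JobBackoffLimitPerIndex feature gate of this era.

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	"k8s.io/utils/pointer"
)

func main() {
	mode := batchv1.IndexedCompletion
	job := batchv1.Job{
		Spec: batchv1.JobSpec{
			CompletionMode: &mode,
			Parallelism:    pointer.Int32(2),
			Completions:    pointer.Int32(2),
			// Each index may retry once before it is marked failed.
			BackoffLimitPerIndex: pointer.Int32(1),
		},
	}
	fmt.Println(*job.Spec.BackoffLimitPerIndex)
}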
@@ -1628,6 +1680,10 @@ func TestIndexedJob(t *testing.T) {
 		Ready: pointer.Int32(0),
 	})
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 1, 2), "", nil)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  0,
+	})
 
 	// One Pod succeeds.
 	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@@ -1639,6 +1695,10 @@ func TestIndexedJob(t *testing.T) {
 		Ready: pointer.Int32(0),
 	})
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  1,
+	})
 
 	// One Pod fails, which should be recreated.
 	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
@@ -1651,6 +1711,10 @@ func TestIndexedJob(t *testing.T) {
 		Ready: pointer.Int32(0),
 	})
 	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.New(0, 2, 3), "1", nil)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  1,
+	})
 
 	// Remaining Pods succeed.
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
@@ -1666,6 +1730,10 @@ func TestIndexedJob(t *testing.T) {
 	validateJobSucceeded(ctx, t, clientSet, jobObj)
 	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 	validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 5)
+	validateCounterMetric(ctx, t, metrics.JobFinishedIndexesTotal, metricLabelsWithValue{
+		Labels: []string{"succeeded", "global"},
+		Value:  4,
+	})
 }
 
 func TestJobPodReplacementPolicy(t *testing.T) {
@@ -2949,6 +3017,7 @@ func resetMetrics() {
 	metrics.JobFinishedNum.Reset()
 	metrics.JobPodsFinished.Reset()
 	metrics.PodFailuresHandledByFailurePolicy.Reset()
+	metrics.JobFinishedIndexesTotal.Reset()
 }
 
 func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
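The assertions above use metricLabelsWithValue and validateCounterMetric, both defined elsewhere in the integration test file and not part of this diff. Below is a rough, hypothetical analogue of that helper, assuming a plain *prometheus.CounterVec rather than the component-base CounterVec the real tests pass in; the real helper also presumably polls until the value converges, since the controller updates metrics asynchronously.

package job

import (
	"context"
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// metricLabelsWithValue pairs a label set with the expected counter
// value (hypothetical shape, inferred from its use in the tests above).
type metricLabelsWithValue struct {
	Labels []string
	Value  int
}

// validateCounterMetric reads back one child of the counter vector and
// fails the test if its value differs from the expectation. The ctx
// parameter only mirrors the call sites above; a robust helper would
// use it to poll until the expected value is observed.
func validateCounterMetric(ctx context.Context, t *testing.T, vec *prometheus.CounterVec, want metricLabelsWithValue) {
	t.Helper()
	got := testutil.ToFloat64(vec.WithLabelValues(want.Labels...))
	if int(got) != want.Value {
		t.Errorf("metric with labels %v: got %v, want %d", want.Labels, got, want.Value)
	}
}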