feat: add job_pods_creation_total metric

This commit is contained in:
Dejan Pejchev 2023-10-24 17:49:04 +02:00
parent d2383ce467
commit 88c0a8be1b
No known key found for this signature in database
GPG Key ID: 8A900F09C964845E
3 changed files with 110 additions and 2 deletions

View File

@ -1546,6 +1546,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
} }
podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers) podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
// Counters for pod creation status (used by the job_pods_creation_total metric)
var creationsSucceeded, creationsFailed int32 = 0, 0
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start". // and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would // This handles attempts to start large numbers of pods that would
@ -1595,7 +1598,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
jm.expectations.CreationObserved(logger, jobKey) jm.expectations.CreationObserved(logger, jobKey)
atomic.AddInt32(&active, -1) atomic.AddInt32(&active, -1)
errCh <- err errCh <- err
atomic.AddInt32(&creationsFailed, 1)
} }
atomic.AddInt32(&creationsSucceeded, 1)
}() }()
} }
wait.Wait() wait.Wait()
@ -1614,6 +1619,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
} }
diff -= batchSize diff -= batchSize
} }
recordJobPodsCreationTotal(job, creationsSucceeded, creationsFailed)
return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh) return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
} }
@ -1911,3 +1917,23 @@ func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
} }
} }
} }
func recordJobPodsCreationTotal(job *batch.Job, succeeded, failed int32) {
reason := metrics.PodCreateNew
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
podsTerminating := job.Status.Terminating != nil && *job.Status.Terminating > 0
isRecreateAction := podsTerminating || job.Status.Failed > 0
if isRecreateAction {
reason = metrics.PodRecreateTerminatingOrFailed
if *job.Spec.PodReplacementPolicy == batch.Failed {
reason = metrics.PodRecreateFailed
}
}
}
if succeeded > 0 {
metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Succeeded).Add(float64(succeeded))
}
if failed > 0 {
metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Failed).Add(float64(failed))
}
}

View File

@ -125,6 +125,24 @@ The event label can be "add" or "delete".`,
backoffLimit label are: "perIndex" and "global"`, backoffLimit label are: "perIndex" and "global"`,
}, },
[]string{"status", "backoffLimit"}) []string{"status", "backoffLimit"})
// JobPodsCreationTotal records the number of pods created by the job controller
// based on the reason for their creation (i.e. if PodReplacementPolicy was specified)
// and the status of the creation (i.e. if the Pod creation succeeded or failed).
// Possible label values:
// reason: new, recreate_terminating_or_failed, recreate_failed
// status: succeeded, failed
JobPodsCreationTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "job_pods_creation_total",
Help: `The number of Pods created by the Job controller labelled with a reason for the Pod creation.
This metric also distinguishes between Pods created using different PodReplacementPolicy settings.
Possible values of the "reason" label are:
"new", "recreate_terminating_or_failed", "recreate_failed".
Possible values of the "status" label are:
"succeeded", "failed".`,
}, []string{"reason", "status"})
) )
const ( const (
@ -147,7 +165,7 @@ const (
// parallelism. // parallelism.
JobSyncActionPodsDeleted = "pods_deleted" JobSyncActionPodsDeleted = "pods_deleted"
// Possible values for "result" label in the above metrics. // Possible values for "result" and "status" (job_pods_creation_total) labels in the above metrics.
Succeeded = "succeeded" Succeeded = "succeeded"
Failed = "failed" Failed = "failed"
@ -156,6 +174,12 @@ const (
// metric. // metric.
Add = "add" Add = "add"
Delete = "delete" Delete = "delete"
// Possible values for "reason" label in the job_pods_creation_total metric.
PodCreateNew = "new"
PodRecreateTerminatingOrFailed = "recreate_terminating_or_failed"
PodRecreateFailed = "recreate_failed"
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -170,5 +194,6 @@ func Register() {
legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy) legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy)
legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal) legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
legacyregistry.MustRegister(JobFinishedIndexesTotal) legacyregistry.MustRegister(JobFinishedIndexesTotal)
legacyregistry.MustRegister(JobPodsCreationTotal)
}) })
} }

View File

@ -82,6 +82,7 @@ func TestMetricsOnSuccesses(t *testing.T) {
job *batchv1.Job job *batchv1.Job
wantJobFinishedNumMetric metricLabelsWithValue wantJobFinishedNumMetric metricLabelsWithValue
wantJobPodsFinishedMetric metricLabelsWithValue wantJobPodsFinishedMetric metricLabelsWithValue
wantJobPodsCreatedMetric metricLabelsWithValue
}{ }{
"non-indexed job": { "non-indexed job": {
job: &batchv1.Job{ job: &batchv1.Job{
@ -99,6 +100,10 @@ func TestMetricsOnSuccesses(t *testing.T) {
Labels: []string{"NonIndexed", "succeeded"}, Labels: []string{"NonIndexed", "succeeded"},
Value: 2, Value: 2,
}, },
wantJobPodsCreatedMetric: metricLabelsWithValue{
Labels: []string{"new", "succeeded"},
Value: 2,
},
}, },
"indexed job": { "indexed job": {
job: &batchv1.Job{ job: &batchv1.Job{
@ -116,6 +121,10 @@ func TestMetricsOnSuccesses(t *testing.T) {
Labels: []string{"Indexed", "succeeded"}, Labels: []string{"Indexed", "succeeded"},
Value: 2, Value: 2,
}, },
wantJobPodsCreatedMetric: metricLabelsWithValue{
Labels: []string{"new", "succeeded"},
Value: 2,
},
}, },
} }
job_index := 0 // job index to avoid collisions between job names created by different test cases job_index := 0 // job index to avoid collisions between job names created by different test cases
@ -142,6 +151,7 @@ func TestMetricsOnSuccesses(t *testing.T) {
// verify metric values after the job is finished // verify metric values after the job is finished
validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric) validateCounterMetric(ctx, t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
validateCounterMetric(ctx, t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric) validateCounterMetric(ctx, t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric)
validateCounterMetric(ctx, t, metrics.JobPodsCreationTotal, tc.wantJobPodsCreatedMetric)
validateTerminatedPodsTrackingFinalizerMetric(ctx, t, int(*jobObj.Spec.Parallelism)) validateTerminatedPodsTrackingFinalizerMetric(ctx, t, int(*jobObj.Spec.Parallelism))
}) })
} }
@ -1700,11 +1710,17 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed int failed int
terminating *int32 terminating *int32
} }
type jobPodsCreationMetrics struct {
new int
recreateTerminatingOrFailed int
recreateFailed int
}
cases := map[string]struct { cases := map[string]struct {
podReplacementPolicyEnabled bool podReplacementPolicyEnabled bool
jobSpec *batchv1.JobSpec jobSpec *batchv1.JobSpec
wantStatusAfterDeletion jobStatus wantStatusAfterDeletion jobStatus
wantStatusAfterFailure jobStatus wantStatusAfterFailure jobStatus
wantMetrics jobPodsCreationMetrics
}{ }{
"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": { "feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
jobSpec: &batchv1.JobSpec{ jobSpec: &batchv1.JobSpec{
@ -1725,6 +1741,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
active: 2, active: 2,
failed: 2, failed: 2,
}, },
wantMetrics: jobPodsCreationMetrics{
new: 4,
},
}, },
"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": { "feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1749,6 +1768,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateTerminatingOrFailed: 2,
},
}, },
"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": { "feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1773,6 +1796,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateTerminatingOrFailed: 2,
},
}, },
"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": { "feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
podReplacementPolicyEnabled: false, podReplacementPolicyEnabled: false,
@ -1804,6 +1831,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
wantStatusAfterFailure: jobStatus{ wantStatusAfterFailure: jobStatus{
active: 2, active: 2,
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
},
}, },
"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": { "feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1828,6 +1858,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateFailed: 2,
},
}, },
"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": { "feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1852,6 +1886,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateFailed: 2,
},
}, },
} }
for name, tc := range cases { for name, tc := range cases {
@ -1887,13 +1925,31 @@ func TestJobPodReplacementPolicy(t *testing.T) {
}) })
failTerminatingPods(ctx, t, clientSet, ns.Name) failTerminatingPods(ctx, t, clientSet, ns.Name)
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{ validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
Terminating: tc.wantStatusAfterFailure.terminating, Terminating: tc.wantStatusAfterFailure.terminating,
Failed: tc.wantStatusAfterFailure.failed, Failed: tc.wantStatusAfterFailure.failed,
Active: tc.wantStatusAfterFailure.active, Active: tc.wantStatusAfterFailure.active,
Ready: ptr.To[int32](0), Ready: ptr.To[int32](0),
}) })
validateCounterMetric(
ctx,
t,
metrics.JobPodsCreationTotal,
metricLabelsWithValue{Labels: []string{"new", "succeeded"}, Value: tc.wantMetrics.new},
)
validateCounterMetric(
ctx,
t,
metrics.JobPodsCreationTotal,
metricLabelsWithValue{Labels: []string{"recreate_terminating_or_failed", "succeeded"}, Value: tc.wantMetrics.recreateTerminatingOrFailed},
)
validateCounterMetric(
ctx,
t,
metrics.JobPodsCreationTotal,
metricLabelsWithValue{Labels: []string{"recreate_failed", "succeeded"}, Value: tc.wantMetrics.recreateFailed},
)
}) })
} }
} }
@ -3002,6 +3058,7 @@ func resetMetrics() {
metrics.JobPodsFinished.Reset() metrics.JobPodsFinished.Reset()
metrics.PodFailuresHandledByFailurePolicy.Reset() metrics.PodFailuresHandledByFailurePolicy.Reset()
metrics.JobFinishedIndexesTotal.Reset() metrics.JobFinishedIndexesTotal.Reset()
metrics.JobPodsCreationTotal.Reset()
} }
func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) { func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {