Merge pull request #121481 from dejanzele/job-pod-creation-total-metrics

feat: add job_pods_creation_total metric
This commit is contained in:
Kubernetes Prow Robot 2023-10-24 23:43:20 +02:00 committed by GitHub
commit 8cdab8a5db
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 100 additions and 2 deletions

View File

@ -1546,6 +1546,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
} }
podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers) podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
// Counters for pod creation status (used by the job_pods_creation_total metric)
var creationsSucceeded, creationsFailed int32 = 0, 0
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize // Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start". // and double with each successful iteration in a kind of "slow start".
// This handles attempts to start large numbers of pods that would // This handles attempts to start large numbers of pods that would
@ -1595,7 +1598,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
jm.expectations.CreationObserved(logger, jobKey) jm.expectations.CreationObserved(logger, jobKey)
atomic.AddInt32(&active, -1) atomic.AddInt32(&active, -1)
errCh <- err errCh <- err
atomic.AddInt32(&creationsFailed, 1)
} }
atomic.AddInt32(&creationsSucceeded, 1)
}() }()
} }
wait.Wait() wait.Wait()
@ -1614,6 +1619,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
} }
diff -= batchSize diff -= batchSize
} }
recordJobPodsCreationTotal(job, creationsSucceeded, creationsFailed)
return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh) return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
} }
@ -1911,3 +1917,23 @@ func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
} }
} }
} }
func recordJobPodsCreationTotal(job *batch.Job, succeeded, failed int32) {
reason := metrics.PodCreateNew
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
podsTerminating := job.Status.Terminating != nil && *job.Status.Terminating > 0
isRecreateAction := podsTerminating || job.Status.Failed > 0
if isRecreateAction {
reason = metrics.PodRecreateTerminatingOrFailed
if *job.Spec.PodReplacementPolicy == batch.Failed {
reason = metrics.PodRecreateFailed
}
}
}
if succeeded > 0 {
metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Succeeded).Add(float64(succeeded))
}
if failed > 0 {
metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Failed).Add(float64(failed))
}
}

View File

@ -125,6 +125,24 @@ The event label can be "add" or "delete".`,
backoffLimit label are: "perIndex" and "global"`, backoffLimit label are: "perIndex" and "global"`,
}, },
[]string{"status", "backoffLimit"}) []string{"status", "backoffLimit"})
// JobPodsCreationTotal records the number of pods created by the job controller
// based on the reason for their creation (i.e. if PodReplacementPolicy was specified)
// and the status of the creation (i.e. if the Pod creation succeeded or failed).
// Possible label values:
// reason: new, recreate_terminating_or_failed, recreate_failed
// status: succeeded, failed
JobPodsCreationTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "job_pods_creation_total",
Help: `The number of Pods created by the Job controller labelled with a reason for the Pod creation.
This metric also distinguishes between Pods created using different PodReplacementPolicy settings.
Possible values of the "reason" label are:
"new", "recreate_terminating_or_failed", "recreate_failed".
Possible values of the "status" label are:
"succeeded", "failed".`,
}, []string{"reason", "status"})
) )
const ( const (
@ -147,7 +165,7 @@ const (
// parallelism. // parallelism.
JobSyncActionPodsDeleted = "pods_deleted" JobSyncActionPodsDeleted = "pods_deleted"
// Possible values for "result" label in the above metrics. // Possible values for "result" and "status" (job_pods_creation_total) labels in the above metrics.
Succeeded = "succeeded" Succeeded = "succeeded"
Failed = "failed" Failed = "failed"
@ -156,6 +174,12 @@ const (
// metric. // metric.
Add = "add" Add = "add"
Delete = "delete" Delete = "delete"
// Possible values for "reason" label in the job_pods_creation_total metric.
PodCreateNew = "new"
PodRecreateTerminatingOrFailed = "recreate_terminating_or_failed"
PodRecreateFailed = "recreate_failed"
) )
var registerMetrics sync.Once var registerMetrics sync.Once
@ -170,5 +194,6 @@ func Register() {
legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy) legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy)
legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal) legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
legacyregistry.MustRegister(JobFinishedIndexesTotal) legacyregistry.MustRegister(JobFinishedIndexesTotal)
legacyregistry.MustRegister(JobPodsCreationTotal)
}) })
} }

View File

@ -1700,11 +1700,17 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed int failed int
terminating *int32 terminating *int32
} }
type jobPodsCreationMetrics struct {
new int
recreateTerminatingOrFailed int
recreateFailed int
}
cases := map[string]struct { cases := map[string]struct {
podReplacementPolicyEnabled bool podReplacementPolicyEnabled bool
jobSpec *batchv1.JobSpec jobSpec *batchv1.JobSpec
wantStatusAfterDeletion jobStatus wantStatusAfterDeletion jobStatus
wantStatusAfterFailure jobStatus wantStatusAfterFailure jobStatus
wantMetrics jobPodsCreationMetrics
}{ }{
"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": { "feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
jobSpec: &batchv1.JobSpec{ jobSpec: &batchv1.JobSpec{
@ -1725,6 +1731,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
active: 2, active: 2,
failed: 2, failed: 2,
}, },
wantMetrics: jobPodsCreationMetrics{
new: 4,
},
}, },
"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": { "feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1749,6 +1758,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateTerminatingOrFailed: 2,
},
}, },
"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": { "feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1773,6 +1786,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateTerminatingOrFailed: 2,
},
}, },
"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": { "feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
podReplacementPolicyEnabled: false, podReplacementPolicyEnabled: false,
@ -1804,6 +1821,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
wantStatusAfterFailure: jobStatus{ wantStatusAfterFailure: jobStatus{
active: 2, active: 2,
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
},
}, },
"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": { "feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1828,6 +1848,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateFailed: 2,
},
}, },
"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": { "feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
@ -1852,6 +1876,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
failed: 2, failed: 2,
terminating: ptr.To[int32](0), terminating: ptr.To[int32](0),
}, },
wantMetrics: jobPodsCreationMetrics{
new: 2,
recreateFailed: 2,
},
}, },
} }
for name, tc := range cases { for name, tc := range cases {
@ -1887,13 +1915,31 @@ func TestJobPodReplacementPolicy(t *testing.T) {
}) })
failTerminatingPods(ctx, t, clientSet, ns.Name) failTerminatingPods(ctx, t, clientSet, ns.Name)
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{ validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
Terminating: tc.wantStatusAfterFailure.terminating, Terminating: tc.wantStatusAfterFailure.terminating,
Failed: tc.wantStatusAfterFailure.failed, Failed: tc.wantStatusAfterFailure.failed,
Active: tc.wantStatusAfterFailure.active, Active: tc.wantStatusAfterFailure.active,
Ready: ptr.To[int32](0), Ready: ptr.To[int32](0),
}) })
validateCounterMetric(
ctx,
t,
metrics.JobPodsCreationTotal,
metricLabelsWithValue{Labels: []string{"new", "succeeded"}, Value: tc.wantMetrics.new},
)
validateCounterMetric(
ctx,
t,
metrics.JobPodsCreationTotal,
metricLabelsWithValue{Labels: []string{"recreate_terminating_or_failed", "succeeded"}, Value: tc.wantMetrics.recreateTerminatingOrFailed},
)
validateCounterMetric(
ctx,
t,
metrics.JobPodsCreationTotal,
metricLabelsWithValue{Labels: []string{"recreate_failed", "succeeded"}, Value: tc.wantMetrics.recreateFailed},
)
}) })
} }
} }
@ -3002,6 +3048,7 @@ func resetMetrics() {
metrics.JobPodsFinished.Reset() metrics.JobPodsFinished.Reset()
metrics.PodFailuresHandledByFailurePolicy.Reset() metrics.PodFailuresHandledByFailurePolicy.Reset()
metrics.JobFinishedIndexesTotal.Reset() metrics.JobFinishedIndexesTotal.Reset()
metrics.JobPodsCreationTotal.Reset()
} }
func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) { func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {