diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 3adb3bdf4c9..0ed26946dac 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -1546,6 +1546,9 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 		}
 		podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
 
+		// Counters for pod creation status (used by the job_pods_creation_total metric)
+		var creationsSucceeded, creationsFailed int32 = 0, 0
+
 		// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
 		// and double with each successful iteration in a kind of "slow start".
 		// This handles attempts to start large numbers of pods that would
@@ -1595,7 +1598,11 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 					jm.expectations.CreationObserved(logger, jobKey)
 					atomic.AddInt32(&active, -1)
 					errCh <- err
+					atomic.AddInt32(&creationsFailed, 1)
+					// This create failed; do not fall through to the success counter.
+					return
 				}
+				atomic.AddInt32(&creationsSucceeded, 1)
 			}()
 		}
 		wait.Wait()
@@ -1614,6 +1621,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
 			}
 			diff -= batchSize
 		}
+		recordJobPodsCreationTotal(job, creationsSucceeded, creationsFailed)
 		return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
 	}
 
@@ -1911,3 +1918,27 @@ func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
 		}
 	}
 }
+
+// recordJobPodsCreationTotal bumps the job_pods_creation_total metric with the
+// reason the pods were created (new vs. recreated after termination/failure)
+// and how many of the API creation calls succeeded or failed.
+func recordJobPodsCreationTotal(job *batch.Job, succeeded, failed int32) {
+	reason := metrics.PodCreateNew
+	if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
+		podsTerminating := job.Status.Terminating != nil && *job.Status.Terminating > 0
+		isRecreateAction := podsTerminating || job.Status.Failed > 0
+		if isRecreateAction {
+			reason = metrics.PodRecreateTerminatingOrFailed
+			// Guard against a nil policy; the field is optional in the API.
+			if policy := job.Spec.PodReplacementPolicy; policy != nil && *policy == batch.Failed {
+				reason = metrics.PodRecreateFailed
+			}
+		}
+	}
+	if succeeded > 0 {
+		metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Succeeded).Add(float64(succeeded))
+	}
+	if failed > 0 {
+		metrics.JobPodsCreationTotal.WithLabelValues(reason, metrics.Failed).Add(float64(failed))
+	}
+}
diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go
index d70acc43d88..39a82f53f9b 100644
--- a/pkg/controller/job/metrics/metrics.go
+++ b/pkg/controller/job/metrics/metrics.go
@@ -125,6 +125,24 @@ The event label can be "add" or "delete".`,
 status label are: "succeeded", "failed". Possible values for the
 backoffLimit label are: "perIndex" and "global"`,
 		}, []string{"status", "backoffLimit"})
+
+	// JobPodsCreationTotal records the number of pods created by the job controller
+	// based on the reason for their creation (i.e. if PodReplacementPolicy was specified)
+	// and the status of the creation (i.e. if the Pod creation succeeded or failed).
+	// Possible label values:
+	//   reason: new, recreate_terminating_or_failed, recreate_failed
+	//   status: succeeded, failed
+	JobPodsCreationTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Subsystem: JobControllerSubsystem,
+			Name:      "job_pods_creation_total",
+			Help: `The number of Pods created by the Job controller labelled with a reason for the Pod creation.
+This metric also distinguishes between Pods created using different PodReplacementPolicy settings.
+Possible values of the "reason" label are:
+"new", "recreate_terminating_or_failed", "recreate_failed".
+Possible values of the "status" label are:
+"succeeded", "failed".`,
+		}, []string{"reason", "status"})
 )
 
 const (
@@ -147,7 +165,7 @@ const (
 	// parallelism.
 	JobSyncActionPodsDeleted = "pods_deleted"
 
-	// Possible values for "result" label in the above metrics.
+	// Possible values for "result" and "status" (job_pods_creation_total) labels in the above metrics.
 	Succeeded = "succeeded"
 	Failed    = "failed"
 
@@ -156,6 +174,12 @@ const (
 	// metric.
 	Add    = "add"
 	Delete = "delete"
+
+	// Possible values for "reason" label in the job_pods_creation_total metric.
+
+	PodCreateNew                   = "new"
+	PodRecreateTerminatingOrFailed = "recreate_terminating_or_failed"
+	PodRecreateFailed              = "recreate_failed"
 )
 
 var registerMetrics sync.Once
@@ -170,5 +194,6 @@ func Register() {
 	legacyregistry.MustRegister(PodFailuresHandledByFailurePolicy)
 	legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
 	legacyregistry.MustRegister(JobFinishedIndexesTotal)
+	legacyregistry.MustRegister(JobPodsCreationTotal)
 	})
 }
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index cce3cec7905..25c4ee00fc1 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -1700,11 +1700,17 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 		failed      int
 		terminating *int32
 	}
+	type jobPodsCreationMetrics struct {
+		new                         int
+		recreateTerminatingOrFailed int
+		recreateFailed              int
+	}
 	cases := map[string]struct {
 		podReplacementPolicyEnabled bool
 		jobSpec                     *batchv1.JobSpec
 		wantStatusAfterDeletion     jobStatus
 		wantStatusAfterFailure      jobStatus
+		wantMetrics                 jobPodsCreationMetrics
 	}{
 		"feature flag off, delete & fail pods, recreate terminating pods, and verify job status counters": {
 			jobSpec: &batchv1.JobSpec{
@@ -1725,6 +1731,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 				active: 2,
 				failed: 2,
 			},
+			wantMetrics: jobPodsCreationMetrics{
+				new: 4,
+			},
 		},
 		"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
 			podReplacementPolicyEnabled: true,
@@ -1749,6 +1758,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 				failed:      2,
 				terminating: ptr.To[int32](0),
 			},
+			wantMetrics: jobPodsCreationMetrics{
+				new:                         2,
+				recreateTerminatingOrFailed: 2,
+			},
 		},
 		"feature flag true with NonIndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
 			podReplacementPolicyEnabled: true,
@@ -1773,6 +1786,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 				failed:      2,
 				terminating: ptr.To[int32](0),
 			},
+			wantMetrics: jobPodsCreationMetrics{
+				new:                         2,
+				recreateTerminatingOrFailed: 2,
+			},
 		},
 		"feature flag false, podFailurePolicy enabled, delete & fail pods, recreate failed pods, and verify job status counters": {
 			podReplacementPolicyEnabled: false,
@@ -1804,6 +1821,9 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			wantStatusAfterFailure: jobStatus{
 				active: 2,
 			},
+			wantMetrics: jobPodsCreationMetrics{
+				new: 2,
+			},
 		},
 		"feature flag true, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
 			podReplacementPolicyEnabled: true,
@@ -1828,6 +1848,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 				failed:      2,
 				terminating: ptr.To[int32](0),
 			},
+			wantMetrics: jobPodsCreationMetrics{
+				new:            2,
+				recreateFailed: 2,
+			},
 		},
 		"feature flag true with NonIndexedJob, Failed policy, delete & fail pods, recreate failed pods, and verify job status counters": {
 			podReplacementPolicyEnabled: true,
@@ -1852,6 +1876,10 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 				failed:      2,
 				terminating: ptr.To[int32](0),
 			},
+			wantMetrics: jobPodsCreationMetrics{
+				new:            2,
+				recreateFailed: 2,
+			},
 		},
 	}
 	for name, tc := range cases {
@@ -1887,13 +1915,31 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			})
 
 			failTerminatingPods(ctx, t, clientSet, ns.Name)
-
 			validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
 				Terminating: tc.wantStatusAfterFailure.terminating,
 				Failed:      tc.wantStatusAfterFailure.failed,
 				Active:      tc.wantStatusAfterFailure.active,
 				Ready:       ptr.To[int32](0),
 			})
+
+			validateCounterMetric(
+				ctx,
+				t,
+				metrics.JobPodsCreationTotal,
+				metricLabelsWithValue{Labels: []string{"new", "succeeded"}, Value: tc.wantMetrics.new},
+			)
+			validateCounterMetric(
+				ctx,
+				t,
+				metrics.JobPodsCreationTotal,
+				metricLabelsWithValue{Labels: []string{"recreate_terminating_or_failed", "succeeded"}, Value: tc.wantMetrics.recreateTerminatingOrFailed},
+			)
+			validateCounterMetric(
+				ctx,
+				t,
+				metrics.JobPodsCreationTotal,
+				metricLabelsWithValue{Labels: []string{"recreate_failed", "succeeded"}, Value: tc.wantMetrics.recreateFailed},
+			)
 		})
 	}
 }
@@ -3002,6 +3048,7 @@ func resetMetrics() {
 	metrics.JobPodsFinished.Reset()
 	metrics.PodFailuresHandledByFailurePolicy.Reset()
 	metrics.JobFinishedIndexesTotal.Reset()
+	metrics.JobPodsCreationTotal.Reset()
 }
 
 func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {