From 115dc90633f536571eacb6e43862b35d0bb7e9ce Mon Sep 17 00:00:00 2001 From: Michal Wozniak Date: Mon, 12 Feb 2024 17:25:11 +0100 Subject: [PATCH] Increase accuracy of the pods_creation_total metric and improve test exec time --- pkg/controller/job/job_controller.go | 18 ++++++++---------- pkg/controller/job/job_controller_test.go | 11 ++++++++++- test/integration/job/job_test.go | 6 ++++-- 3 files changed, 22 insertions(+), 13 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 1b0da9511cc..2f1fe144772 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -130,6 +130,7 @@ type syncJobCtx struct { finishedCondition *batch.JobCondition activePods []*v1.Pod succeeded int32 + failed int32 prevSucceededIndexes orderedIntervals succeededIndexes orderedIntervals failedIndexes *orderedIntervals @@ -790,7 +791,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { active := int32(len(jobCtx.activePods)) newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx) jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded)) - failed := job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed)) + jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed)) var ready *int32 if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { ready = ptr.To(countReadyPods(jobCtx.activePods)) @@ -806,7 +807,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { var manageJobErr error - exceedsBackoffLimit := failed > *job.Spec.BackoffLimit + exceedsBackoffLimit := jobCtx.failed > *job.Spec.BackoffLimit if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil { @@ -1618,7 +1619,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn } diff -= batchSize } - recordJobPodsCreationTotal(job, creationsSucceeded, creationsFailed) + recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed) return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh) } @@ -1917,16 +1918,13 @@ func (jm *Controller) cleanupPodFinalizers(job *batch.Job) { } } -func recordJobPodsCreationTotal(job *batch.Job, succeeded, failed int32) { +func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) { reason := metrics.PodCreateNew if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { - podsTerminating := job.Status.Terminating != nil && *job.Status.Terminating > 0 - isRecreateAction := podsTerminating || job.Status.Failed > 0 - if isRecreateAction { + if *job.Spec.PodReplacementPolicy == batch.Failed && jobCtx.failed > 0 { + reason = metrics.PodRecreateFailed + } else if jobCtx.failed > 0 || ptr.Deref(jobCtx.terminating, 0) > 0 { reason = metrics.PodRecreateTerminatingOrFailed - if *job.Spec.PodReplacementPolicy == batch.Failed { - reason = metrics.PodRecreateFailed - } } } if succeeded > 0 { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 7f33daa9b62..07d6f71448a 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -115,7 +115,7 @@ func newJobWithName(name string, parallelism, completions, backoffLimit int32, c j.Spec.Parallelism = nil } j.Spec.BackoffLimit = &backoffLimit - + defaultPodReplacementPolicy(j) return j } @@ -3880,6 +3880,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) { manager.podStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady job := &tc.job + defaultPodReplacementPolicy(job) actual := job manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { @@ -5531,3 +5532,11 @@ func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() { *val = origVal } } + +// Helper to simulate defaulting of the PodReplacementPolicy field in unit tests +// as the job controller code assumes it is set by the kube-apiserver. +func defaultPodReplacementPolicy(job *batch.Job) { + if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && job.Spec.PodReplacementPolicy == nil { + job.Spec.PodReplacementPolicy = ptr.To(batch.TerminatingOrFailed) + } +} diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 4067c82b994..da6392d24f3 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -1587,6 +1587,7 @@ func TestIndexedJob(t *testing.T) { } func TestJobPodReplacementPolicy(t *testing.T) { + t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) indexedCompletion := batchv1.IndexedCompletion nonIndexedCompletion := batchv1.NonIndexedCompletion var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy { @@ -1632,7 +1633,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { new: 4, }, }, - "feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": { + "feature flag true with IndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": { podReplacementPolicyEnabled: true, jobSpec: &batchv1.JobSpec{ Parallelism: ptr.To[int32](2), @@ -1665,7 +1666,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { jobSpec: &batchv1.JobSpec{ Parallelism: ptr.To[int32](2), Completions: ptr.To[int32](2), - CompletionMode: &indexedCompletion, + CompletionMode: &nonIndexedCompletion, PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed), Template: v1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{ @@ -1846,6 +1847,7 @@ func TestJobPodReplacementPolicy(t *testing.T) { // Disable reverts to previous behavior. // Enabling will then match the original failed case. func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) { + t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) const podCount int32 = 2 jobSpec := batchv1.JobSpec{ Parallelism: ptr.To(podCount),