Increase accuracy of the pods_creation_total metric and improve test exec time

This commit is contained in:
Michal Wozniak 2024-02-12 17:25:11 +01:00
parent b202d053fa
commit 115dc90633
3 changed files with 22 additions and 13 deletions

View File

@ -130,6 +130,7 @@ type syncJobCtx struct {
finishedCondition *batch.JobCondition finishedCondition *batch.JobCondition
activePods []*v1.Pod activePods []*v1.Pod
succeeded int32 succeeded int32
failed int32
prevSucceededIndexes orderedIntervals prevSucceededIndexes orderedIntervals
succeededIndexes orderedIntervals succeededIndexes orderedIntervals
failedIndexes *orderedIntervals failedIndexes *orderedIntervals
@ -790,7 +791,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
active := int32(len(jobCtx.activePods)) active := int32(len(jobCtx.activePods))
newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx) newSucceededPods, newFailedPods := getNewFinishedPods(jobCtx)
jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded)) jobCtx.succeeded = job.Status.Succeeded + int32(len(newSucceededPods)) + int32(len(jobCtx.uncounted.succeeded))
failed := job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed)) jobCtx.failed = job.Status.Failed + int32(nonIgnoredFailedPodsCount(jobCtx, newFailedPods)) + int32(len(jobCtx.uncounted.failed))
var ready *int32 var ready *int32
if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) { if feature.DefaultFeatureGate.Enabled(features.JobReadyPods) {
ready = ptr.To(countReadyPods(jobCtx.activePods)) ready = ptr.To(countReadyPods(jobCtx.activePods))
@ -806,7 +807,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
var manageJobErr error var manageJobErr error
exceedsBackoffLimit := failed > *job.Spec.BackoffLimit exceedsBackoffLimit := jobCtx.failed > *job.Spec.BackoffLimit
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil { if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
@ -1618,7 +1619,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
} }
diff -= batchSize diff -= batchSize
} }
recordJobPodsCreationTotal(job, creationsSucceeded, creationsFailed) recordJobPodsCreationTotal(job, jobCtx, creationsSucceeded, creationsFailed)
return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh) return active, metrics.JobSyncActionPodsCreated, errorFromChannel(errCh)
} }
@ -1917,16 +1918,13 @@ func (jm *Controller) cleanupPodFinalizers(job *batch.Job) {
} }
} }
func recordJobPodsCreationTotal(job *batch.Job, succeeded, failed int32) { func recordJobPodsCreationTotal(job *batch.Job, jobCtx *syncJobCtx, succeeded, failed int32) {
reason := metrics.PodCreateNew reason := metrics.PodCreateNew
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) { if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
podsTerminating := job.Status.Terminating != nil && *job.Status.Terminating > 0 if *job.Spec.PodReplacementPolicy == batch.Failed && jobCtx.failed > 0 {
isRecreateAction := podsTerminating || job.Status.Failed > 0 reason = metrics.PodRecreateFailed
if isRecreateAction { } else if jobCtx.failed > 0 || ptr.Deref(jobCtx.terminating, 0) > 0 {
reason = metrics.PodRecreateTerminatingOrFailed reason = metrics.PodRecreateTerminatingOrFailed
if *job.Spec.PodReplacementPolicy == batch.Failed {
reason = metrics.PodRecreateFailed
}
} }
} }
if succeeded > 0 { if succeeded > 0 {

View File

@ -115,7 +115,7 @@ func newJobWithName(name string, parallelism, completions, backoffLimit int32, c
j.Spec.Parallelism = nil j.Spec.Parallelism = nil
} }
j.Spec.BackoffLimit = &backoffLimit j.Spec.BackoffLimit = &backoffLimit
defaultPodReplacementPolicy(j)
return j return j
} }
@ -3880,6 +3880,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
manager.podStoreSynced = alwaysReady manager.podStoreSynced = alwaysReady
manager.jobStoreSynced = alwaysReady manager.jobStoreSynced = alwaysReady
job := &tc.job job := &tc.job
defaultPodReplacementPolicy(job)
actual := job actual := job
manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) { manager.updateStatusHandler = func(ctx context.Context, job *batch.Job) (*batch.Job, error) {
@ -5531,3 +5532,11 @@ func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
*val = origVal *val = origVal
} }
} }
// defaultPodReplacementPolicy simulates the kube-apiserver's defaulting of the
// Job's Spec.PodReplacementPolicy field for unit tests, because the job
// controller code assumes the field has already been populated by the
// kube-apiserver before the Job reaches the controller.
func defaultPodReplacementPolicy(job *batch.Job) {
// Default only when the JobPodReplacementPolicy feature gate is enabled and
// the field is still unset, setting TerminatingOrFailed as the default value.
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && job.Spec.PodReplacementPolicy == nil {
job.Spec.PodReplacementPolicy = ptr.To(batch.TerminatingOrFailed)
}
}

View File

@ -1587,6 +1587,7 @@ func TestIndexedJob(t *testing.T) {
} }
func TestJobPodReplacementPolicy(t *testing.T) { func TestJobPodReplacementPolicy(t *testing.T) {
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
indexedCompletion := batchv1.IndexedCompletion indexedCompletion := batchv1.IndexedCompletion
nonIndexedCompletion := batchv1.NonIndexedCompletion nonIndexedCompletion := batchv1.NonIndexedCompletion
var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy { var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
@ -1632,7 +1633,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
new: 4, new: 4,
}, },
}, },
"feature flag true, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": { "feature flag true with IndexedJob, TerminatingOrFailed policy, delete & fail pods, recreate terminating pods, and verify job status counters": {
podReplacementPolicyEnabled: true, podReplacementPolicyEnabled: true,
jobSpec: &batchv1.JobSpec{ jobSpec: &batchv1.JobSpec{
Parallelism: ptr.To[int32](2), Parallelism: ptr.To[int32](2),
@ -1665,7 +1666,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
jobSpec: &batchv1.JobSpec{ jobSpec: &batchv1.JobSpec{
Parallelism: ptr.To[int32](2), Parallelism: ptr.To[int32](2),
Completions: ptr.To[int32](2), Completions: ptr.To[int32](2),
CompletionMode: &indexedCompletion, CompletionMode: &nonIndexedCompletion,
PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed), PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
Template: v1.PodTemplateSpec{ Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -1846,6 +1847,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
// Disable reverts to previous behavior. // Disable reverts to previous behavior.
// Enabling will then match the original failed case. // Enabling will then match the original failed case.
func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) { func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) {
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
const podCount int32 = 2 const podCount int32 = 2
jobSpec := batchv1.JobSpec{ jobSpec := batchv1.JobSpec{
Parallelism: ptr.To(podCount), Parallelism: ptr.To(podCount),