From 2a81337e7cfb8a233c278f86d02496f1dfff0d31 Mon Sep 17 00:00:00 2001 From: Daniel Vega-Myhre Date: Fri, 27 Jan 2023 00:02:18 +0000 Subject: [PATCH] update prev succeeded indexes for indexed jobs unconditionally --- pkg/controller/job/indexed_job_utils.go | 13 ++- pkg/controller/job/job_controller.go | 30 +++++- pkg/controller/job/job_controller_test.go | 112 ++++++++++++++++++---- 3 files changed, 126 insertions(+), 29 deletions(-) diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 11ee7e08745..e2a78e557b5 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -52,7 +52,7 @@ type orderedIntervals []interval // empty list if this Job is not tracked with finalizers. The new list includes // the indexes that succeeded since the last sync. func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) { - prevIntervals := succeededIndexesFromJob(job) + prevIntervals := succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)) newSucceeded := sets.NewInt() for _, p := range pods { ix := getCompletionIndex(p.Annotations) @@ -148,20 +148,19 @@ func (oi orderedIntervals) has(ix int) bool { return oi[hi].First <= ix } -func succeededIndexesFromJob(job *batch.Job) orderedIntervals { - if job.Status.CompletedIndexes == "" { +func succeededIndexesFromString(completedIndexes string, completions int) orderedIntervals { + if completedIndexes == "" { return nil } var result orderedIntervals var lastInterval *interval - completions := int(*job.Spec.Completions) - for _, intervalStr := range strings.Split(job.Status.CompletedIndexes, ",") { + for _, intervalStr := range strings.Split(completedIndexes, ",") { limitsStr := strings.Split(intervalStr, "-") var inter interval var err error inter.First, err = strconv.Atoi(limitsStr[0]) if err != nil { - klog.InfoS("Corrupted completed indexes interval, ignoring", "job", klog.KObj(job), "interval", intervalStr, "err", err) + klog.InfoS("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.First >= completions { @@ -170,7 +169,7 @@ func succeededIndexesFromJob(job *batch.Job) orderedIntervals { if len(limitsStr) > 1 { inter.Last, err = strconv.Atoi(limitsStr[1]) if err != nil { - klog.InfoS("Corrupted completed indexes interval, ignoring", "job", klog.KObj(job), "interval", intervalStr, "err", err) + klog.InfoS("Corrupted completed indexes interval, ignoring", "interval", intervalStr, "err", err) continue } if inter.Last >= completions { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index cac4da12f37..3eebbd262e6 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -968,6 +968,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job uidsWithFinalizer.Insert(uid) } } + // Shallow copy, as it will only be used to detect changes in the counters. oldCounters := job.Status if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { @@ -1041,10 +1042,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job break } } - if len(newSucceededIndexes) > 0 { + if isIndexed { succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) + succeededIndexesStr := succeededIndexes.String() + if succeededIndexesStr != job.Status.CompletedIndexes { + needsFlush = true + } job.Status.Succeeded = int32(succeededIndexes.total()) - job.Status.CompletedIndexes = succeededIndexes.String() + job.Status.CompletedIndexes = succeededIndexesStr } if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { if finishedCond != nil && finishedCond.Type == batch.JobFailureTarget { @@ -1680,7 +1685,7 @@ func isPodFailed(p *v1.Pod, job *batch.Job) bool { return true } // Count deleted Pods as failures to account for orphan Pods that - // never have a chance to reach the Failed phase. + // never have a chance to reach the Failed phase. return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded } @@ -1695,9 +1700,24 @@ func findConditionByType(list []batch.JobCondition, cType batch.JobConditionType func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) { completionMode := completionModeStr(job) - diff := job.Status.Succeeded - oldCounters.Succeeded + var diff int + + // Updating succeeded metric must be handled differently + // for Indexed Jobs to handle the case where the job has + // been scaled down by reducing completions & parallelism + // in tandem, and now a previously completed index is + // now out of range (i.e. index >= spec.Completions). + if isIndexedJob(job) { + if job.Status.CompletedIndexes != oldCounters.CompletedIndexes { + diff = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)).total() - succeededIndexesFromString(oldCounters.CompletedIndexes, int(*job.Spec.Completions)).total() + } + } else { + diff = int(job.Status.Succeeded) - int(oldCounters.Succeeded) + } metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff)) - diff = job.Status.Failed - oldCounters.Failed + + // Update failed metric. + diff = int(job.Status.Failed - oldCounters.Failed) metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff)) } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 038ae52e5f9..9f5b3d7b1f2 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1043,16 +1043,18 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { indexedCompletion := batch.IndexedCompletion mockErr := errors.New("mock error") cases := map[string]struct { - job batch.Job - pods []*v1.Pod - finishedCond *batch.JobCondition - expectedRmFinalizers sets.String - needsFlush bool - statusUpdateErr error - podControlErr error - wantErr error - wantRmFinalizers int - wantStatusUpdates []batch.JobStatus + job batch.Job + pods []*v1.Pod + finishedCond *batch.JobCondition + expectedRmFinalizers sets.String + needsFlush bool + statusUpdateErr error + podControlErr error + wantErr error + wantRmFinalizers int + wantStatusUpdates []batch.JobStatus + wantSucceededPodsMetric int + wantFailedPodsMetric int }{ "no updates": {}, "new active": { @@ -1093,6 +1095,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Failed: 3, }, }, + wantSucceededPodsMetric: 2, + wantFailedPodsMetric: 3, }, "past and new finished pods": { job: batch.Job{ @@ -1133,6 +1137,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Failed: 6, }, }, + wantSucceededPodsMetric: 3, + wantFailedPodsMetric: 3, }, "expecting removed finalizers": { job: batch.Job{ @@ -1172,6 +1178,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Failed: 6, }, }, + wantSucceededPodsMetric: 3, + wantFailedPodsMetric: 3, }, "succeeding job": { pods: []*v1.Pod{ @@ -1195,6 +1203,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { CompletionTime: &succeededCond.LastTransitionTime, }, }, + wantSucceededPodsMetric: 1, + wantFailedPodsMetric: 1, }, "failing job": { pods: []*v1.Pod{ @@ -1219,6 +1229,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Conditions: []batch.JobCondition{*failedCond}, }, }, + wantSucceededPodsMetric: 1, + wantFailedPodsMetric: 2, }, "deleted job": { job: batch.Job{ @@ -1251,6 +1263,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Failed: 1, }, }, + wantSucceededPodsMetric: 1, + wantFailedPodsMetric: 1, }, "status update error": { pods: []*v1.Pod{ @@ -1339,6 +1353,62 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, + wantSucceededPodsMetric: 2, + }, + "indexed job prev successful pods outside current completions index range with no new succeeded pods": { + job: batch.Job{ + Spec: batch.JobSpec{ + CompletionMode: &indexedCompletion, + Completions: pointer.Int32(2), + Parallelism: pointer.Int32(2), + }, + Status: batch.JobStatus{ + Active: 2, + Succeeded: 1, + CompletedIndexes: "3", + }, + }, + pods: []*v1.Pod{ + buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod, + buildPod().phase(v1.PodRunning).trackingFinalizer().index("1").Pod, + }, + wantRmFinalizers: 0, + wantStatusUpdates: []batch.JobStatus{ + { + Active: 2, + Succeeded: 0, + CompletedIndexes: "", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + }, + "indexed job prev successful pods outside current completions index range with new succeeded pods in range": { + job: batch.Job{ + Spec: batch.JobSpec{ + CompletionMode: &indexedCompletion, + Completions: pointer.Int32(2), + Parallelism: pointer.Int32(2), + }, + Status: batch.JobStatus{ + Active: 2, + Succeeded: 1, + CompletedIndexes: "3", + }, + }, + pods: []*v1.Pod{ + buildPod().phase(v1.PodRunning).trackingFinalizer().index("0").Pod, + buildPod().phase(v1.PodSucceeded).trackingFinalizer().index("1").Pod, + }, + wantRmFinalizers: 1, + wantStatusUpdates: []batch.JobStatus{ + { + Active: 2, + Succeeded: 1, + CompletedIndexes: "1", + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + }, + }, + wantSucceededPodsMetric: 1, }, "indexed job new failed pods": { job: batch.Job{ @@ -1371,6 +1441,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, + wantFailedPodsMetric: 3, }, "indexed job past and new pods": { job: batch.Job{ @@ -1409,6 +1480,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, }, }, + wantSucceededPodsMetric: 1, + wantFailedPodsMetric: 2, }, "too many finished": { job: batch.Job{ @@ -1449,6 +1522,8 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Failed: 1, }, }, + wantSucceededPodsMetric: 499, + wantFailedPodsMetric: 1, }, "too many indexed finished": { job: batch.Job{ @@ -1472,6 +1547,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Succeeded: 500, }, }, + wantSucceededPodsMetric: 500, }, "pod flips from failed to succeeded": { job: batch.Job{ @@ -1498,6 +1574,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { Conditions: []batch.JobCondition{*failedCond}, }, }, + wantFailedPodsMetric: 2, }, } for name, tc := range cases { @@ -1517,7 +1594,10 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) - succeededIndexes := succeededIndexesFromJob(job) + var succeededIndexes orderedIntervals + if isIndexedJob(job) { + succeededIndexes = succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)) + } err := manager.trackJobStatusAndRemoveFinalizers(context.TODO(), job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) @@ -1535,17 +1615,15 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { if err != nil { t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err) } - newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded - if float64(newSucceeded) != v { - t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded) + if float64(tc.wantSucceededPodsMetric) != v { + t.Errorf("Metric reports %.0f succeeded pods, want %d", v, tc.wantSucceededPodsMetric) } v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed)) if err != nil { t.Fatalf("Obtaining failed job_pods_finished_total: %v", err) } - newFailed := job.Status.Failed - tc.job.Status.Failed - if float64(newFailed) != v { - t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed) + if float64(tc.wantFailedPodsMetric) != v { + t.Errorf("Metric reports %.0f failed pods, want %d", v, tc.wantFailedPodsMetric) } } })