From 4cf7e75841738a341f112adc444b91ba1f659193 Mon Sep 17 00:00:00 2001 From: Mengxue Zhang Date: Thu, 15 Apr 2021 22:24:22 +0000 Subject: [PATCH] indexed job: remove pods with invalid index --- pkg/controller/job/indexed_job_utils.go | 16 ++++-- pkg/controller/job/indexed_job_utils_test.go | 56 +++++++++++++++----- pkg/controller/job/job_controller.go | 11 ++-- pkg/controller/job/job_controller_test.go | 39 ++++++++++++-- 4 files changed, 98 insertions(+), 24 deletions(-) diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 9f9a09ac51b..97b4337799a 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -40,7 +40,7 @@ func isIndexedJob(job *batch.Job) bool { // calculateSucceededIndexes returns a string representation of the list of // succeeded indexes in compressed format and the number of succeeded indexes. -func calculateSucceededIndexes(pods []*v1.Pod) (string, int32) { +func calculateSucceededIndexes(pods []*v1.Pod, completions int32) (string, int32) { sort.Sort(byCompletionIndex(pods)) var result strings.Builder var lastSucceeded int @@ -51,6 +51,9 @@ func calculateSucceededIndexes(pods []*v1.Pod) (string, int32) { if ix == unknownCompletionIndex { continue } + if ix >= int(completions) { + break + } if p.Status.Phase == v1.PodSucceeded { if firstSucceeded == math.MinInt32 { firstSucceeded = ix @@ -126,19 +129,26 @@ func firstPendingIndexes(pods []*v1.Pod, count, completions int) []int { // is the number of repetitions. The pods to be removed are appended to `rm`, // while the remaining pods are appended to `left`. // All pods that don't have a completion index are appended to `rm`. -func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) { +// All pods with index not in valid range are appended to `rm`. 
+func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod, completions int) ([]*v1.Pod, []*v1.Pod) { sort.Sort(byCompletionIndex(pods)) lastIndex := unknownCompletionIndex firstRepeatPos := 0 + countLooped := 0 for i, p := range pods { ix := getCompletionIndex(p.Annotations) + if ix >= completions { + rm = append(rm, pods[i:]...) + break + } if ix != lastIndex { rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:i], lastIndex) firstRepeatPos = i lastIndex = ix } + countLooped += 1 } - return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:], lastIndex) + return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:countLooped], lastIndex) } func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) { diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go index cdb79c03239..c525d2d0df0 100644 --- a/pkg/controller/job/indexed_job_utils_test.go +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -28,12 +28,14 @@ const noIndex = "-" func TestCalculateSucceededIndexes(t *testing.T) { cases := map[string]struct { - pods []indexPhase - wantCount int32 + pods []indexPhase + wantCount int32 + completions int32 }{ "1": { - pods: []indexPhase{{"1", v1.PodSucceeded}}, - wantCount: 1, + pods: []indexPhase{{"1", v1.PodSucceeded}}, + wantCount: 1, + completions: 2, }, "5,10": { pods: []indexPhase{ @@ -43,7 +45,8 @@ func TestCalculateSucceededIndexes(t *testing.T) { {"10", v1.PodFailed}, {"10", v1.PodSucceeded}, }, - wantCount: 2, + wantCount: 2, + completions: 11, }, "2,3,5-7": { pods: []indexPhase{ @@ -55,7 +58,8 @@ func TestCalculateSucceededIndexes(t *testing.T) { {"6", v1.PodSucceeded}, {"7", v1.PodSucceeded}, }, - wantCount: 5, + wantCount: 5, + completions: 8, }, "0-2": { pods: []indexPhase{ @@ -66,7 +70,8 @@ func TestCalculateSucceededIndexes(t *testing.T) { {"2", 
v1.PodSucceeded}, {"3", v1.PodFailed}, }, - wantCount: 3, + wantCount: 3, + completions: 4, }, "0,2-5": { pods: []indexPhase{ @@ -79,13 +84,28 @@ func TestCalculateSucceededIndexes(t *testing.T) { {noIndex, v1.PodSucceeded}, {"-2", v1.PodSucceeded}, }, - wantCount: 5, + wantCount: 5, + completions: 6, + }, + "0-2,4": { + pods: []indexPhase{ + {"0", v1.PodSucceeded}, + {"1", v1.PodSucceeded}, + {"2", v1.PodSucceeded}, + {"3", v1.PodFailed}, + {"4", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {noIndex, v1.PodSucceeded}, + {"-2", v1.PodSucceeded}, + }, + wantCount: 4, + completions: 5, }, } for want, tc := range cases { t.Run(want, func(t *testing.T) { pods := hollowPodsWithIndexPhase(tc.pods) - gotStr, gotCnt := calculateSucceededIndexes(pods) + gotStr, gotCnt := calculateSucceededIndexes(pods, tc.completions) if diff := cmp.Diff(want, gotStr); diff != "" { t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) } @@ -162,23 +182,27 @@ func TestFirstPendingIndexes(t *testing.T) { func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) { cases := map[string]struct { - pods []indexPhase - wantRm []indexPhase - wantLeft []indexPhase + pods []indexPhase + wantRm []indexPhase + wantLeft []indexPhase + completions int32 }{ "all unique": { pods: []indexPhase{ {noIndex, v1.PodPending}, {"2", v1.PodPending}, {"5", v1.PodRunning}, + {"6", v1.PodRunning}, }, wantRm: []indexPhase{ {noIndex, v1.PodPending}, + {"6", v1.PodRunning}, }, wantLeft: []indexPhase{ {"2", v1.PodPending}, {"5", v1.PodRunning}, }, + completions: 6, }, "all with index": { pods: []indexPhase{ @@ -188,17 +212,22 @@ func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) { {"0", v1.PodRunning}, {"3", v1.PodRunning}, {"0", v1.PodPending}, + {"6", v1.PodRunning}, + {"6", v1.PodPending}, }, wantRm: []indexPhase{ {"0", v1.PodPending}, {"0", v1.PodRunning}, {"3", v1.PodPending}, + {"6", v1.PodRunning}, + {"6", v1.PodPending}, }, wantLeft: []indexPhase{ {"0", v1.PodRunning}, {"3", v1.PodRunning}, 
{"5", v1.PodPending}, }, + completions: 6, }, "mixed": { pods: []indexPhase{ @@ -221,12 +250,13 @@ func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) { {"0", v1.PodPending}, {"1", v1.PodRunning}, }, + completions: 6, }, } for name, tc := range cases { t.Run(name, func(t *testing.T) { pods := hollowPodsWithIndexPhase(tc.pods) - rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods) + rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods, int(tc.completions)) rmInt := toIndexPhases(rm) leftInt := toIndexPhases(left) if diff := cmp.Diff(tc.wantRm, rmInt); diff != "" { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 858f75d11a5..7ab29c83b14 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -531,7 +531,7 @@ func (jm *Controller) syncJob(key string) (bool, error) { var succeededIndexes string if isIndexedJob(&job) { - succeededIndexes, succeeded = calculateSucceededIndexes(pods) + succeededIndexes, succeeded = calculateSucceededIndexes(pods, *job.Spec.Completions) } jobConditionsChanged := false manageJobCalled := false @@ -810,7 +810,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded wait := sync.WaitGroup{} var indexesToAdd []int - if job.Spec.Completions != nil && isIndexedJob(job) { + if isIndexedJob(job) { indexesToAdd = firstPendingIndexes(allPods, int(diff), int(*job.Spec.Completions)) diff = int32(len(indexesToAdd)) } @@ -889,7 +889,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded // activePodsForRemoval returns Pods that should be removed because there // are too many pods running or, if this is an indexed job, there are repeated -// indexes or some pods don't have indexes. +// indexes or invalid indexes or some pods don't have indexes. // Sorts candidate pods in the order such that not-ready < ready, unscheduled // < scheduled, and pending < running. 
This ensures that we delete pods // in the earlier stages whenever possible. @@ -899,7 +899,7 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P if isIndexedJob(job) { rm = make([]*v1.Pod, 0, rmAtLeast) left = make([]*v1.Pod, 0, len(pods)-rmAtLeast) - rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods) + rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods, int(*job.Spec.Completions)) } else { left = pods } @@ -953,7 +953,8 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur func countPodsByPhase(job *batch.Job, pods []*v1.Pod, phase v1.PodPhase) int { result := 0 for _, p := range pods { - if phase == p.Status.Phase && (!isIndexedJob(job) || getCompletionIndex(p.Annotations) != unknownCompletionIndex) { + idx := getCompletionIndex(p.Annotations) + if phase == p.Status.Phase && (!isIndexedJob(job) || (idx != unknownCompletionIndex && idx < int(*job.Spec.Completions))) { result++ } } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 746e766b075..e153262f1cc 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -526,14 +526,17 @@ func TestControllerSyncJob(t *testing.T) { podsWithIndexes: []indexPhase{ {"invalid", v1.PodRunning}, {"invalid", v1.PodSucceeded}, + {"invalid", v1.PodFailed}, + {"invalid", v1.PodPending}, {"0", v1.PodSucceeded}, {"1", v1.PodRunning}, {"2", v1.PodRunning}, }, jobKeyForget: true, - expectedDeletions: 2, + expectedDeletions: 3, expectedActive: 2, expectedSucceeded: 1, + expectedFailed: 0, expectedCompletedIdxs: "0", indexedJobEnabled: true, }, @@ -560,6 +563,28 @@ func TestControllerSyncJob(t *testing.T) { expectedCreatedIndexes: sets.NewInt(3, 4), indexedJobEnabled: true, }, + "indexed job with indexes outside of range": { + parallelism: 2, + completions: 5, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + podsWithIndexes: []indexPhase{ 
+ {"0", v1.PodSucceeded}, + {"5", v1.PodRunning}, + {"6", v1.PodSucceeded}, + {"7", v1.PodPending}, + {"8", v1.PodFailed}, + }, + jobKeyForget: true, + expectedCreations: 2, + expectedSucceeded: 1, + expectedDeletions: 2, + expectedCompletedIdxs: "0", + expectedCreatedIndexes: sets.NewInt(1, 2), + expectedActive: 2, + expectedFailed: 0, + indexedJobEnabled: true, + }, "indexed job feature disabled": { parallelism: 2, completions: 3, @@ -691,7 +716,7 @@ func TestControllerSyncJob(t *testing.T) { if err == nil { t.Error("Syncing jobs expected to return error on podControl exception") } - } else if tc.expectedCondition == nil && (hasFailingPods(tc.podsWithIndexes) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) { + } else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) { if err == nil { t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish") } @@ -2168,8 +2193,16 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) { } } -func hasFailingPods(status []indexPhase) bool { +// hasValidFailingPods reports whether any failed pod has a numeric index in the valid range [0, completions). +func hasValidFailingPods(status []indexPhase, completions int) bool { for _, s := range status { + ix, err := strconv.Atoi(s.Index) + if err != nil { + continue + } + if ix < 0 || ix >= completions { + continue + } if s.Phase == v1.PodFailed { return true }