Merge pull request #101163 from AliceZhang2016/indexed_job_pod_adoption

indexed job: remove pods with invalid index
This commit is contained in:
Kubernetes Prow Robot 2021-05-04 18:21:14 -07:00 committed by GitHub
commit 4f1bfe314e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 98 additions and 24 deletions

View File

@ -40,7 +40,7 @@ func isIndexedJob(job *batch.Job) bool {
// calculateSucceededIndexes returns a string representation of the list of // calculateSucceededIndexes returns a string representation of the list of
// succeeded indexes in compressed format and the number of succeeded indexes. // succeeded indexes in compressed format and the number of succeeded indexes.
func calculateSucceededIndexes(pods []*v1.Pod) (string, int32) { func calculateSucceededIndexes(pods []*v1.Pod, completions int32) (string, int32) {
sort.Sort(byCompletionIndex(pods)) sort.Sort(byCompletionIndex(pods))
var result strings.Builder var result strings.Builder
var lastSucceeded int var lastSucceeded int
@ -51,6 +51,9 @@ func calculateSucceededIndexes(pods []*v1.Pod) (string, int32) {
if ix == unknownCompletionIndex { if ix == unknownCompletionIndex {
continue continue
} }
if ix >= int(completions) {
break
}
if p.Status.Phase == v1.PodSucceeded { if p.Status.Phase == v1.PodSucceeded {
if firstSucceeded == math.MinInt32 { if firstSucceeded == math.MinInt32 {
firstSucceeded = ix firstSucceeded = ix
@ -126,19 +129,26 @@ func firstPendingIndexes(pods []*v1.Pod, count, completions int) []int {
// is the number of repetitions. The pods to be removed are appended to `rm`, // is the number of repetitions. The pods to be removed are appended to `rm`,
// while the remaining pods are appended to `left`. // while the remaining pods are appended to `left`.
// All pods that don't have a completion index are appended to `rm`. // All pods that don't have a completion index are appended to `rm`.
func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) { // All pods with index not in valid range are appended to `rm`.
func appendDuplicatedIndexPodsForRemoval(rm, left, pods []*v1.Pod, completions int) ([]*v1.Pod, []*v1.Pod) {
sort.Sort(byCompletionIndex(pods)) sort.Sort(byCompletionIndex(pods))
lastIndex := unknownCompletionIndex lastIndex := unknownCompletionIndex
firstRepeatPos := 0 firstRepeatPos := 0
countLooped := 0
for i, p := range pods { for i, p := range pods {
ix := getCompletionIndex(p.Annotations) ix := getCompletionIndex(p.Annotations)
if ix >= completions {
rm = append(rm, pods[i:]...)
break
}
if ix != lastIndex { if ix != lastIndex {
rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:i], lastIndex) rm, left = appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:i], lastIndex)
firstRepeatPos = i firstRepeatPos = i
lastIndex = ix lastIndex = ix
} }
countLooped += 1
} }
return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:], lastIndex) return appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods[firstRepeatPos:countLooped], lastIndex)
} }
func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) { func appendPodsWithSameIndexForRemovalAndRemaining(rm, left, pods []*v1.Pod, ix int) ([]*v1.Pod, []*v1.Pod) {

View File

@ -28,12 +28,14 @@ const noIndex = "-"
func TestCalculateSucceededIndexes(t *testing.T) { func TestCalculateSucceededIndexes(t *testing.T) {
cases := map[string]struct { cases := map[string]struct {
pods []indexPhase pods []indexPhase
wantCount int32 wantCount int32
completions int32
}{ }{
"1": { "1": {
pods: []indexPhase{{"1", v1.PodSucceeded}}, pods: []indexPhase{{"1", v1.PodSucceeded}},
wantCount: 1, wantCount: 1,
completions: 2,
}, },
"5,10": { "5,10": {
pods: []indexPhase{ pods: []indexPhase{
@ -43,7 +45,8 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"10", v1.PodFailed}, {"10", v1.PodFailed},
{"10", v1.PodSucceeded}, {"10", v1.PodSucceeded},
}, },
wantCount: 2, wantCount: 2,
completions: 11,
}, },
"2,3,5-7": { "2,3,5-7": {
pods: []indexPhase{ pods: []indexPhase{
@ -55,7 +58,8 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"6", v1.PodSucceeded}, {"6", v1.PodSucceeded},
{"7", v1.PodSucceeded}, {"7", v1.PodSucceeded},
}, },
wantCount: 5, wantCount: 5,
completions: 8,
}, },
"0-2": { "0-2": {
pods: []indexPhase{ pods: []indexPhase{
@ -66,7 +70,8 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"2", v1.PodSucceeded}, {"2", v1.PodSucceeded},
{"3", v1.PodFailed}, {"3", v1.PodFailed},
}, },
wantCount: 3, wantCount: 3,
completions: 4,
}, },
"0,2-5": { "0,2-5": {
pods: []indexPhase{ pods: []indexPhase{
@ -79,13 +84,28 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{noIndex, v1.PodSucceeded}, {noIndex, v1.PodSucceeded},
{"-2", v1.PodSucceeded}, {"-2", v1.PodSucceeded},
}, },
wantCount: 5, wantCount: 5,
completions: 6,
},
"0-2,4": {
pods: []indexPhase{
{"0", v1.PodSucceeded},
{"1", v1.PodSucceeded},
{"2", v1.PodSucceeded},
{"3", v1.PodFailed},
{"4", v1.PodSucceeded},
{"5", v1.PodSucceeded},
{noIndex, v1.PodSucceeded},
{"-2", v1.PodSucceeded},
},
wantCount: 4,
completions: 5,
}, },
} }
for want, tc := range cases { for want, tc := range cases {
t.Run(want, func(t *testing.T) { t.Run(want, func(t *testing.T) {
pods := hollowPodsWithIndexPhase(tc.pods) pods := hollowPodsWithIndexPhase(tc.pods)
gotStr, gotCnt := calculateSucceededIndexes(pods) gotStr, gotCnt := calculateSucceededIndexes(pods, tc.completions)
if diff := cmp.Diff(want, gotStr); diff != "" { if diff := cmp.Diff(want, gotStr); diff != "" {
t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
} }
@ -162,23 +182,27 @@ func TestFirstPendingIndexes(t *testing.T) {
func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) { func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) {
cases := map[string]struct { cases := map[string]struct {
pods []indexPhase pods []indexPhase
wantRm []indexPhase wantRm []indexPhase
wantLeft []indexPhase wantLeft []indexPhase
completions int32
}{ }{
"all unique": { "all unique": {
pods: []indexPhase{ pods: []indexPhase{
{noIndex, v1.PodPending}, {noIndex, v1.PodPending},
{"2", v1.PodPending}, {"2", v1.PodPending},
{"5", v1.PodRunning}, {"5", v1.PodRunning},
{"6", v1.PodRunning},
}, },
wantRm: []indexPhase{ wantRm: []indexPhase{
{noIndex, v1.PodPending}, {noIndex, v1.PodPending},
{"6", v1.PodRunning},
}, },
wantLeft: []indexPhase{ wantLeft: []indexPhase{
{"2", v1.PodPending}, {"2", v1.PodPending},
{"5", v1.PodRunning}, {"5", v1.PodRunning},
}, },
completions: 6,
}, },
"all with index": { "all with index": {
pods: []indexPhase{ pods: []indexPhase{
@ -188,17 +212,22 @@ func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) {
{"0", v1.PodRunning}, {"0", v1.PodRunning},
{"3", v1.PodRunning}, {"3", v1.PodRunning},
{"0", v1.PodPending}, {"0", v1.PodPending},
{"6", v1.PodRunning},
{"6", v1.PodPending},
}, },
wantRm: []indexPhase{ wantRm: []indexPhase{
{"0", v1.PodPending}, {"0", v1.PodPending},
{"0", v1.PodRunning}, {"0", v1.PodRunning},
{"3", v1.PodPending}, {"3", v1.PodPending},
{"6", v1.PodRunning},
{"6", v1.PodPending},
}, },
wantLeft: []indexPhase{ wantLeft: []indexPhase{
{"0", v1.PodRunning}, {"0", v1.PodRunning},
{"3", v1.PodRunning}, {"3", v1.PodRunning},
{"5", v1.PodPending}, {"5", v1.PodPending},
}, },
completions: 6,
}, },
"mixed": { "mixed": {
pods: []indexPhase{ pods: []indexPhase{
@ -221,12 +250,13 @@ func TestAppendDuplicatedIndexPodsForRemoval(t *testing.T) {
{"0", v1.PodPending}, {"0", v1.PodPending},
{"1", v1.PodRunning}, {"1", v1.PodRunning},
}, },
completions: 6,
}, },
} }
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
pods := hollowPodsWithIndexPhase(tc.pods) pods := hollowPodsWithIndexPhase(tc.pods)
rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods) rm, left := appendDuplicatedIndexPodsForRemoval(nil, nil, pods, int(tc.completions))
rmInt := toIndexPhases(rm) rmInt := toIndexPhases(rm)
leftInt := toIndexPhases(left) leftInt := toIndexPhases(left)
if diff := cmp.Diff(tc.wantRm, rmInt); diff != "" { if diff := cmp.Diff(tc.wantRm, rmInt); diff != "" {

View File

@ -531,7 +531,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
var succeededIndexes string var succeededIndexes string
if isIndexedJob(&job) { if isIndexedJob(&job) {
succeededIndexes, succeeded = calculateSucceededIndexes(pods) succeededIndexes, succeeded = calculateSucceededIndexes(pods, *job.Spec.Completions)
} }
jobConditionsChanged := false jobConditionsChanged := false
manageJobCalled := false manageJobCalled := false
@ -810,7 +810,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
wait := sync.WaitGroup{} wait := sync.WaitGroup{}
var indexesToAdd []int var indexesToAdd []int
if job.Spec.Completions != nil && isIndexedJob(job) { if isIndexedJob(job) {
indexesToAdd = firstPendingIndexes(allPods, int(diff), int(*job.Spec.Completions)) indexesToAdd = firstPendingIndexes(allPods, int(diff), int(*job.Spec.Completions))
diff = int32(len(indexesToAdd)) diff = int32(len(indexesToAdd))
} }
@ -889,7 +889,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
// activePodsForRemoval returns Pods that should be removed because there // activePodsForRemoval returns Pods that should be removed because there
// are too many pods running or, if this is an indexed job, there are repeated // are too many pods running or, if this is an indexed job, there are repeated
// indexes or some pods don't have indexes. // indexes or invalid indexes or some pods don't have indexes.
// Sorts candidate pods in the order such that not-ready < ready, unscheduled // Sorts candidate pods in the order such that not-ready < ready, unscheduled
// < scheduled, and pending < running. This ensures that we delete pods // < scheduled, and pending < running. This ensures that we delete pods
// in the earlier stages whenever possible. // in the earlier stages whenever possible.
@ -899,7 +899,7 @@ func activePodsForRemoval(job *batch.Job, pods []*v1.Pod, rmAtLeast int) []*v1.P
if isIndexedJob(job) { if isIndexedJob(job) {
rm = make([]*v1.Pod, 0, rmAtLeast) rm = make([]*v1.Pod, 0, rmAtLeast)
left = make([]*v1.Pod, 0, len(pods)-rmAtLeast) left = make([]*v1.Pod, 0, len(pods)-rmAtLeast)
rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods) rm, left = appendDuplicatedIndexPodsForRemoval(rm, left, pods, int(*job.Spec.Completions))
} else { } else {
left = pods left = pods
} }
@ -953,7 +953,8 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur
func countPodsByPhase(job *batch.Job, pods []*v1.Pod, phase v1.PodPhase) int { func countPodsByPhase(job *batch.Job, pods []*v1.Pod, phase v1.PodPhase) int {
result := 0 result := 0
for _, p := range pods { for _, p := range pods {
if phase == p.Status.Phase && (!isIndexedJob(job) || getCompletionIndex(p.Annotations) != unknownCompletionIndex) { idx := getCompletionIndex(p.Annotations)
if phase == p.Status.Phase && (!isIndexedJob(job) || (idx != unknownCompletionIndex && idx < int(*job.Spec.Completions))) {
result++ result++
} }
} }

View File

@ -526,14 +526,17 @@ func TestControllerSyncJob(t *testing.T) {
podsWithIndexes: []indexPhase{ podsWithIndexes: []indexPhase{
{"invalid", v1.PodRunning}, {"invalid", v1.PodRunning},
{"invalid", v1.PodSucceeded}, {"invalid", v1.PodSucceeded},
{"invalid", v1.PodFailed},
{"invalid", v1.PodPending},
{"0", v1.PodSucceeded}, {"0", v1.PodSucceeded},
{"1", v1.PodRunning}, {"1", v1.PodRunning},
{"2", v1.PodRunning}, {"2", v1.PodRunning},
}, },
jobKeyForget: true, jobKeyForget: true,
expectedDeletions: 2, expectedDeletions: 3,
expectedActive: 2, expectedActive: 2,
expectedSucceeded: 1, expectedSucceeded: 1,
expectedFailed: 0,
expectedCompletedIdxs: "0", expectedCompletedIdxs: "0",
indexedJobEnabled: true, indexedJobEnabled: true,
}, },
@ -560,6 +563,28 @@ func TestControllerSyncJob(t *testing.T) {
expectedCreatedIndexes: sets.NewInt(3, 4), expectedCreatedIndexes: sets.NewInt(3, 4),
indexedJobEnabled: true, indexedJobEnabled: true,
}, },
"indexed job with indexes outside of range": {
parallelism: 2,
completions: 5,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
podsWithIndexes: []indexPhase{
{"0", v1.PodSucceeded},
{"5", v1.PodRunning},
{"6", v1.PodSucceeded},
{"7", v1.PodPending},
{"8", v1.PodFailed},
},
jobKeyForget: true,
expectedCreations: 2,
expectedSucceeded: 1,
expectedDeletions: 2,
expectedCompletedIdxs: "0",
expectedCreatedIndexes: sets.NewInt(1, 2),
expectedActive: 2,
expectedFailed: 0,
indexedJobEnabled: true,
},
"indexed job feature disabled": { "indexed job feature disabled": {
parallelism: 2, parallelism: 2,
completions: 3, completions: 3,
@ -691,7 +716,7 @@ func TestControllerSyncJob(t *testing.T) {
if err == nil { if err == nil {
t.Error("Syncing jobs expected to return error on podControl exception") t.Error("Syncing jobs expected to return error on podControl exception")
} }
} else if tc.expectedCondition == nil && (hasFailingPods(tc.podsWithIndexes) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) { } else if tc.expectedCondition == nil && (hasValidFailingPods(tc.podsWithIndexes, int(tc.completions)) || (tc.completionMode != batch.IndexedCompletion && tc.failedPods > 0)) {
if err == nil { if err == nil {
t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish") t.Error("Syncing jobs expected to return error when there are new failed pods and Job didn't finish")
} }
@ -2168,8 +2193,16 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
} }
} }
func hasFailingPods(status []indexPhase) bool { // hasValidFailingPods checks if there exists failed pods with valid index.
func hasValidFailingPods(status []indexPhase, completions int) bool {
for _, s := range status { for _, s := range status {
ix, err := strconv.Atoi(s.Index)
if err != nil {
continue
}
if ix < 0 || ix >= completions {
continue
}
if s.Phase == v1.PodFailed { if s.Phase == v1.PodFailed {
return true return true
} }