Merge pull request #99865 from alculquicondor/index-job-success

Fix completed indexed job with repeated indexes
This commit is contained in:
Kubernetes Prow Robot 2021-03-09 16:08:23 -08:00 committed by GitHub
commit 0aa2e543d4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 124 additions and 44 deletions

View File

@ -34,12 +34,13 @@ const (
unknownCompletionIndex = -1 unknownCompletionIndex = -1
) )
// calculateCompletedIndexesStr returns a string representation of the list // calculateSucceededIndexes returns a string representation of the list of
// of completed indexes in compressed format. // succeeded indexes in compressed format and the number of succeeded indexes.
func calculateCompletedIndexesStr(pods []*v1.Pod) string { func calculateSucceededIndexes(pods []*v1.Pod) (string, int32) {
sort.Sort(byCompletionIndex(pods)) sort.Sort(byCompletionIndex(pods))
var result strings.Builder var result strings.Builder
var lastSucceeded int var lastSucceeded int
var count int32
firstSucceeded := math.MinInt32 firstSucceeded := math.MinInt32
for _, p := range pods { for _, p := range pods {
ix := getCompletionIndex(p.Annotations) ix := getCompletionIndex(p.Annotations)
@ -51,6 +52,7 @@ func calculateCompletedIndexesStr(pods []*v1.Pod) string {
firstSucceeded = ix firstSucceeded = ix
} else if ix > lastSucceeded+1 { } else if ix > lastSucceeded+1 {
addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded) addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded)
count += int32(lastSucceeded - firstSucceeded + 1)
firstSucceeded = ix firstSucceeded = ix
} }
lastSucceeded = ix lastSucceeded = ix
@ -58,8 +60,9 @@ func calculateCompletedIndexesStr(pods []*v1.Pod) string {
} }
if firstSucceeded != math.MinInt32 { if firstSucceeded != math.MinInt32 {
addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded) addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded)
count += int32(lastSucceeded - firstSucceeded + 1)
} }
return result.String() return result.String(), count
} }
func addSingleOrRangeStr(builder *strings.Builder, first, last int) { func addSingleOrRangeStr(builder *strings.Builder, first, last int) {

View File

@ -26,53 +26,72 @@ import (
const noIndex = "-" const noIndex = "-"
func TestCalculateCompletedIndexesStr(t *testing.T) { func TestCalculateSucceededIndexes(t *testing.T) {
cases := map[string][]indexPhase{ cases := map[string]struct {
pods []indexPhase
wantCount int32
}{
"1": { "1": {
{"1", v1.PodSucceeded}, pods: []indexPhase{{"1", v1.PodSucceeded}},
wantCount: 1,
}, },
"5,10": { "5,10": {
{"2", v1.PodFailed}, pods: []indexPhase{
{"5", v1.PodSucceeded}, {"2", v1.PodFailed},
{"5", v1.PodSucceeded}, {"5", v1.PodSucceeded},
{"10", v1.PodFailed}, {"5", v1.PodSucceeded},
{"10", v1.PodSucceeded}, {"10", v1.PodFailed},
{"10", v1.PodSucceeded},
},
wantCount: 2,
}, },
"2,3,5-7": { "2,3,5-7": {
{"0", v1.PodRunning}, pods: []indexPhase{
{"1", v1.PodPending}, {"0", v1.PodRunning},
{"2", v1.PodSucceeded}, {"1", v1.PodPending},
{"3", v1.PodSucceeded}, {"2", v1.PodSucceeded},
{"5", v1.PodSucceeded}, {"3", v1.PodSucceeded},
{"6", v1.PodSucceeded}, {"5", v1.PodSucceeded},
{"7", v1.PodSucceeded}, {"6", v1.PodSucceeded},
{"7", v1.PodSucceeded},
},
wantCount: 5,
}, },
"0-2": { "0-2": {
{"0", v1.PodSucceeded}, pods: []indexPhase{
{"1", v1.PodFailed}, {"0", v1.PodSucceeded},
{"1", v1.PodSucceeded}, {"1", v1.PodFailed},
{"2", v1.PodSucceeded}, {"1", v1.PodSucceeded},
{"2", v1.PodSucceeded}, {"2", v1.PodSucceeded},
{"3", v1.PodFailed}, {"2", v1.PodSucceeded},
{"3", v1.PodFailed},
},
wantCount: 3,
}, },
"0,2-5": { "0,2-5": {
{"0", v1.PodSucceeded}, pods: []indexPhase{
{"1", v1.PodFailed}, {"0", v1.PodSucceeded},
{"2", v1.PodSucceeded}, {"1", v1.PodFailed},
{"3", v1.PodSucceeded}, {"2", v1.PodSucceeded},
{"4", v1.PodSucceeded}, {"3", v1.PodSucceeded},
{"5", v1.PodSucceeded}, {"4", v1.PodSucceeded},
{noIndex, v1.PodSucceeded}, {"5", v1.PodSucceeded},
{"-2", v1.PodSucceeded}, {noIndex, v1.PodSucceeded},
{"-2", v1.PodSucceeded},
},
wantCount: 5,
}, },
} }
for want, tc := range cases { for want, tc := range cases {
t.Run(want, func(t *testing.T) { t.Run(want, func(t *testing.T) {
pods := hollowPodsWithIndexPhase(tc) pods := hollowPodsWithIndexPhase(tc.pods)
gotStr := calculateCompletedIndexesStr(pods) gotStr, gotCnt := calculateSucceededIndexes(pods)
if diff := cmp.Diff(want, gotStr); diff != "" { if diff := cmp.Diff(want, gotStr); diff != "" {
t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
} }
if gotCnt != tc.wantCount {
t.Errorf("Got number of completed indexes %d, want %d", gotCnt, tc.wantCount)
}
}) })
} }
} }

View File

@ -524,6 +524,10 @@ func (jm *Controller) syncJob(key string) (bool, error) {
failureMessage = "Job was active longer than specified deadline" failureMessage = "Job was active longer than specified deadline"
} }
var succeededIndexes string
if job.Spec.CompletionMode == batch.IndexedCompletion {
succeededIndexes, succeeded = calculateSucceededIndexes(pods)
}
jobConditionsChanged := false jobConditionsChanged := false
manageJobCalled := false manageJobCalled := false
if jobFailed { if jobFailed {
@ -622,7 +626,7 @@ func (jm *Controller) syncJob(key string) (bool, error) {
job.Status.Succeeded = succeeded job.Status.Succeeded = succeeded
job.Status.Failed = failed job.Status.Failed = failed
if job.Spec.CompletionMode == batch.IndexedCompletion { if job.Spec.CompletionMode == batch.IndexedCompletion {
job.Status.CompletedIndexes = calculateCompletedIndexesStr(pods) job.Status.CompletedIndexes = succeededIndexes
} }
if err := jm.updateHandler(&job); err != nil { if err := jm.updateHandler(&job); err != nil {

View File

@ -323,12 +323,14 @@ func TestControllerSyncJob(t *testing.T) {
expectedFailed: 1, expectedFailed: 1,
}, },
"job finish": { "job finish": {
parallelism: 2, parallelism: 2,
completions: 5, completions: 5,
backoffLimit: 6, backoffLimit: 6,
jobKeyForget: true, jobKeyForget: true,
succeededPods: 5, succeededPods: 5,
expectedSucceeded: 5, expectedSucceeded: 5,
expectedCondition: &jobConditionComplete,
expectedConditionStatus: v1.ConditionTrue,
}, },
"WQ job finishing": { "WQ job finishing": {
parallelism: 2, parallelism: 2,
@ -424,6 +426,43 @@ func TestControllerSyncJob(t *testing.T) {
expectedCreatedIndexes: sets.NewInt(0, 1), expectedCreatedIndexes: sets.NewInt(0, 1),
indexedJobEnabled: true, indexedJobEnabled: true,
}, },
"indexed job completed": {
parallelism: 2,
completions: 3,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
jobKeyForget: true,
podsWithIndexes: []indexPhase{
{"0", v1.PodSucceeded},
{"1", v1.PodFailed},
{"1", v1.PodSucceeded},
{"2", v1.PodSucceeded},
},
expectedSucceeded: 3,
expectedFailed: 1,
expectedCompletedIdxs: "0-2",
expectedCondition: &jobConditionComplete,
expectedConditionStatus: v1.ConditionTrue,
indexedJobEnabled: true,
},
"indexed job repeated completed index": {
parallelism: 2,
completions: 3,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
jobKeyForget: true,
podsWithIndexes: []indexPhase{
{"0", v1.PodSucceeded},
{"1", v1.PodSucceeded},
{"1", v1.PodSucceeded},
},
expectedCreations: 1,
expectedActive: 1,
expectedSucceeded: 2,
expectedCompletedIdxs: "0,1",
expectedCreatedIndexes: sets.NewInt(2),
indexedJobEnabled: true,
},
"indexed job some running and completed pods": { "indexed job some running and completed pods": {
parallelism: 8, parallelism: 8,
completions: 20, completions: 20,
@ -705,8 +744,14 @@ func TestControllerSyncJob(t *testing.T) {
t.Error("Missing .status.startTime") t.Error("Missing .status.startTime")
} }
// validate conditions // validate conditions
if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) { if tc.expectedCondition != nil {
t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions) if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) {
t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions)
}
} else {
if cond := hasTrueCondition(actual); cond != nil {
t.Errorf("Got condition %s, want none", *cond)
}
} }
if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 { if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 {
t.Errorf("Unexpected conditions %v", actual.Status.Conditions) t.Errorf("Unexpected conditions %v", actual.Status.Conditions)
@ -902,6 +947,15 @@ func getCondition(job *batch.Job, condition batch.JobConditionType, status v1.Co
return false return false
} }
func hasTrueCondition(job *batch.Job) *batch.JobConditionType {
for _, v := range job.Status.Conditions {
if v.Status == v1.ConditionTrue {
return &v.Type
}
}
return nil
}
func TestSyncPastDeadlineJobFinished(t *testing.T) { func TestSyncPastDeadlineJobFinished(t *testing.T) {
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)