diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 6fb11c99932..d964f2b8d9c 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -34,12 +34,13 @@ const ( unknownCompletionIndex = -1 ) -// calculateCompletedIndexesStr returns a string representation of the list -// of completed indexes in compressed format. -func calculateCompletedIndexesStr(pods []*v1.Pod) string { +// calculateSucceededIndexes returns a string representation of the list of +// succeeded indexes in compressed format and the number of succeeded indexes. +func calculateSucceededIndexes(pods []*v1.Pod) (string, int32) { sort.Sort(byCompletionIndex(pods)) var result strings.Builder var lastSucceeded int + var count int32 firstSucceeded := math.MinInt32 for _, p := range pods { ix := getCompletionIndex(p.Annotations) @@ -51,6 +52,7 @@ func calculateCompletedIndexesStr(pods []*v1.Pod) string { firstSucceeded = ix } else if ix > lastSucceeded+1 { addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded) + count += int32(lastSucceeded - firstSucceeded + 1) firstSucceeded = ix } lastSucceeded = ix @@ -58,8 +60,9 @@ func calculateCompletedIndexesStr(pods []*v1.Pod) string { } if firstSucceeded != math.MinInt32 { addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded) + count += int32(lastSucceeded - firstSucceeded + 1) } - return result.String() + return result.String(), count } func addSingleOrRangeStr(builder *strings.Builder, first, last int) { diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go index 47b883027e5..cdb79c03239 100644 --- a/pkg/controller/job/indexed_job_utils_test.go +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -26,53 +26,72 @@ import ( const noIndex = "-" -func TestCalculateCompletedIndexesStr(t *testing.T) { - cases := map[string][]indexPhase{ +func TestCalculateSucceededIndexes(t *testing.T) { + cases := map[string]struct { + pods []indexPhase + wantCount int32 + }{ "1": { - {"1", v1.PodSucceeded}, + pods: []indexPhase{{"1", v1.PodSucceeded}}, + wantCount: 1, }, "5,10": { - {"2", v1.PodFailed}, - {"5", v1.PodSucceeded}, - {"5", v1.PodSucceeded}, - {"10", v1.PodFailed}, - {"10", v1.PodSucceeded}, + pods: []indexPhase{ + {"2", v1.PodFailed}, + {"5", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {"10", v1.PodFailed}, + {"10", v1.PodSucceeded}, + }, + wantCount: 2, }, "2,3,5-7": { - {"0", v1.PodRunning}, - {"1", v1.PodPending}, - {"2", v1.PodSucceeded}, - {"3", v1.PodSucceeded}, - {"5", v1.PodSucceeded}, - {"6", v1.PodSucceeded}, - {"7", v1.PodSucceeded}, + pods: []indexPhase{ + {"0", v1.PodRunning}, + {"1", v1.PodPending}, + {"2", v1.PodSucceeded}, + {"3", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {"6", v1.PodSucceeded}, + {"7", v1.PodSucceeded}, + }, + wantCount: 5, }, "0-2": { - {"0", v1.PodSucceeded}, - {"1", v1.PodFailed}, - {"1", v1.PodSucceeded}, - {"2", v1.PodSucceeded}, - {"2", v1.PodSucceeded}, - {"3", v1.PodFailed}, + pods: []indexPhase{ + {"0", v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"1", v1.PodSucceeded}, + {"2", v1.PodSucceeded}, + {"2", v1.PodSucceeded}, + {"3", v1.PodFailed}, + }, + wantCount: 3, }, "0,2-5": { - {"0", v1.PodSucceeded}, - {"1", v1.PodFailed}, - {"2", v1.PodSucceeded}, - {"3", v1.PodSucceeded}, - {"4", v1.PodSucceeded}, - {"5", v1.PodSucceeded}, - {noIndex, v1.PodSucceeded}, - {"-2", v1.PodSucceeded}, + pods: []indexPhase{ + {"0", v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"2", v1.PodSucceeded}, + {"3", v1.PodSucceeded}, + {"4", v1.PodSucceeded}, + {"5", v1.PodSucceeded}, + {noIndex, v1.PodSucceeded}, + {"-2", v1.PodSucceeded}, + }, + wantCount: 5, }, } for want, tc := range cases { t.Run(want, func(t *testing.T) { - pods := hollowPodsWithIndexPhase(tc) - gotStr := calculateCompletedIndexesStr(pods) + pods := hollowPodsWithIndexPhase(tc.pods) + gotStr, gotCnt := calculateSucceededIndexes(pods) if diff := cmp.Diff(want, gotStr); diff != "" { t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) } + if gotCnt != tc.wantCount { + t.Errorf("Got number of completed indexes %d, want %d", gotCnt, tc.wantCount) + } }) } } diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index d23cd770ea0..ba583de82a0 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -524,6 +524,10 @@ func (jm *Controller) syncJob(key string) (bool, error) { failureMessage = "Job was active longer than specified deadline" } + var succeededIndexes string + if job.Spec.CompletionMode == batch.IndexedCompletion { + succeededIndexes, succeeded = calculateSucceededIndexes(pods) + } jobConditionsChanged := false manageJobCalled := false if jobFailed { @@ -622,7 +626,7 @@ func (jm *Controller) syncJob(key string) (bool, error) { job.Status.Succeeded = succeeded job.Status.Failed = failed if job.Spec.CompletionMode == batch.IndexedCompletion { - job.Status.CompletedIndexes = calculateCompletedIndexesStr(pods) + job.Status.CompletedIndexes = succeededIndexes } if err := jm.updateHandler(&job); err != nil { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 16ca21c54d8..9d1c08d9b40 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -323,12 +323,14 @@ func TestControllerSyncJob(t *testing.T) { expectedFailed: 1, }, "job finish": { - parallelism: 2, - completions: 5, - backoffLimit: 6, - jobKeyForget: true, - succeededPods: 5, - expectedSucceeded: 5, + parallelism: 2, + completions: 5, + backoffLimit: 6, + jobKeyForget: true, + succeededPods: 5, + expectedSucceeded: 5, + expectedCondition: &jobConditionComplete, + expectedConditionStatus: v1.ConditionTrue, }, "WQ job finishing": { parallelism: 2, @@ -424,6 +426,43 @@ func TestControllerSyncJob(t *testing.T) { expectedCreatedIndexes: sets.NewInt(0, 1), indexedJobEnabled: true, }, + "indexed job completed": { + parallelism: 2, + completions: 3, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + jobKeyForget: true, + podsWithIndexes: []indexPhase{ + {"0", v1.PodSucceeded}, + {"1", v1.PodFailed}, + {"1", v1.PodSucceeded}, + {"2", v1.PodSucceeded}, + }, + expectedSucceeded: 3, + expectedFailed: 1, + expectedCompletedIdxs: "0-2", + expectedCondition: &jobConditionComplete, + expectedConditionStatus: v1.ConditionTrue, + indexedJobEnabled: true, + }, + "indexed job repeated completed index": { + parallelism: 2, + completions: 3, + backoffLimit: 6, + completionMode: batch.IndexedCompletion, + jobKeyForget: true, + podsWithIndexes: []indexPhase{ + {"0", v1.PodSucceeded}, + {"1", v1.PodSucceeded}, + {"1", v1.PodSucceeded}, + }, + expectedCreations: 1, + expectedActive: 1, + expectedSucceeded: 2, + expectedCompletedIdxs: "0,1", + expectedCreatedIndexes: sets.NewInt(2), + indexedJobEnabled: true, + }, "indexed job some running and completed pods": { parallelism: 8, completions: 20, @@ -705,8 +744,14 @@ func TestControllerSyncJob(t *testing.T) { t.Error("Missing .status.startTime") } // validate conditions - if tc.expectedCondition != nil && !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) { - t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions) + if tc.expectedCondition != nil { + if !getCondition(actual, *tc.expectedCondition, tc.expectedConditionStatus, tc.expectedConditionReason) { + t.Errorf("Expected completion condition. Got %#v", actual.Status.Conditions) + } + } else { + if cond := hasTrueCondition(actual); cond != nil { + t.Errorf("Got condition %s, want none", *cond) + } } if tc.expectedCondition == nil && tc.suspend && len(actual.Status.Conditions) != 0 { t.Errorf("Unexpected conditions %v", actual.Status.Conditions) @@ -902,6 +947,15 @@ func getCondition(job *batch.Job, condition batch.JobConditionType, status v1.Co return false } +func hasTrueCondition(job *batch.Job) *batch.JobConditionType { + for _, v := range job.Status.Conditions { + if v.Status == v1.ConditionTrue { + return &v.Type + } + } + return nil +} + func TestSyncPastDeadlineJobFinished(t *testing.T) { clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)