address comments

This commit is contained in:
Daniel Vega-Myhre 2023-02-15 18:26:46 +00:00
parent d41302312e
commit b0b0959b92

View File

@ -1036,10 +1036,12 @@ func TestElasticIndexedJob(t *testing.T) {
) )
type jobUpdate struct { type jobUpdate struct {
completions int completions int
succeededIndexes []int succeedIndexes []int
failedIndexes []int failIndexes []int
wantSucceededIndexes string wantSucceededIndexes string
wantFailed int wantFailed int
wantRemainingIndexes sets.Int
wantActivePods int
} }
cases := map[string]struct { cases := map[string]struct {
featureGate bool featureGate bool
@ -1064,7 +1066,7 @@ func TestElasticIndexedJob(t *testing.T) {
{ {
// Scale up completions 3->4 then succeed indexes 0-3 // Scale up completions 3->4 then succeed indexes 0-3
completions: 4, completions: 4,
succeededIndexes: []int{0, 1, 2, 3}, succeedIndexes: []int{0, 1, 2, 3},
wantSucceededIndexes: "0-3", wantSucceededIndexes: "0-3",
}, },
}, },
@ -1075,16 +1077,18 @@ func TestElasticIndexedJob(t *testing.T) {
// First succeed index 1 and fail index 2 while completions is still original value (3). // First succeed index 1 and fail index 2 while completions is still original value (3).
{ {
completions: noCompletionsUpdate, completions: noCompletionsUpdate,
succeededIndexes: []int{1}, succeedIndexes: []int{1},
failedIndexes: []int{2}, failIndexes: []int{2},
wantSucceededIndexes: "1", wantSucceededIndexes: "1",
wantFailed: 1, wantFailed: 1,
wantRemainingIndexes: sets.NewInt(0, 2),
wantActivePods: 2,
}, },
// Scale down completions 3->1, verify prev failure out of range still counts // Scale down completions 3->1, verify prev failure out of range still counts
// but succeeded out of range does not. // but succeeded out of range does not.
{ {
completions: 1, completions: 1,
succeededIndexes: []int{0}, succeedIndexes: []int{0},
wantSucceededIndexes: "0", wantSucceededIndexes: "0",
wantFailed: 1, wantFailed: 1,
}, },
@ -1096,17 +1100,21 @@ func TestElasticIndexedJob(t *testing.T) {
// First succeed index 2 while completions is still original value (3). // First succeed index 2 while completions is still original value (3).
{ {
completions: noCompletionsUpdate, completions: noCompletionsUpdate,
succeededIndexes: []int{2}, succeedIndexes: []int{2},
wantSucceededIndexes: "2", wantSucceededIndexes: "2",
wantRemainingIndexes: sets.NewInt(0, 1),
wantActivePods: 2,
}, },
// Scale completions down 3->2 to exclude previously succeeded index. // Scale completions down 3->2 to exclude previously succeeded index.
{ {
completions: 2, completions: 2,
wantRemainingIndexes: sets.NewInt(0, 1),
wantActivePods: 2,
}, },
// Scale completions back up to include previously succeeded index that was temporarily out of range. // Scale completions back up to include previously succeeded index that was temporarily out of range.
{ {
completions: 3, completions: 3,
succeededIndexes: []int{0, 1, 2}, succeedIndexes: []int{0, 1, 2},
wantSucceededIndexes: "0-2", wantSucceededIndexes: "0-2",
}, },
}, },
@ -1158,8 +1166,6 @@ func TestElasticIndexedJob(t *testing.T) {
t.Fatalf("Error waiting for Job pods to become active: %v", err) t.Fatalf("Error waiting for Job pods to become active: %v", err)
} }
currentCompletions := initialCompletions
for _, update := range tc.jobUpdates { for _, update := range tc.jobUpdates {
// Update Job spec if necessary. // Update Job spec if necessary.
if update.completions != noCompletionsUpdate { if update.completions != noCompletionsUpdate {
@ -1176,42 +1182,33 @@ func TestElasticIndexedJob(t *testing.T) {
} }
return return
} }
currentCompletions = update.completions
} }
// Succeed specified indexes. // Succeed specified indexes.
succeededSet := sets.NewInt() if update.succeedIndexes != nil {
if update.succeededIndexes != nil { for _, idx := range update.succeedIndexes {
for _, idx := range update.succeededIndexes {
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil { if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, idx); err != nil {
t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err) t.Fatalf("Failed trying to succeed pod with index %d: %v", idx, err)
} }
succeededSet.Insert(idx)
} }
} }
// Fail specified indexes. // Fail specified indexes.
if update.failedIndexes != nil { if update.failIndexes != nil {
for _, idx := range update.failedIndexes { for _, idx := range update.failIndexes {
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil { if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, idx); err != nil {
t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err) t.Fatalf("Failed trying to fail pod with index %d: %v", idx, err)
} }
} }
} }
remainingIndexes := remainingIndexSet(currentCompletions, succeededSet)
newActive := len(remainingIndexes)
// initialCompletions == initial parallelism, and active must be <= parallelism.
if newActive > currentCompletions && currentCompletions != noCompletionsUpdate {
newActive = currentCompletions
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: newActive, Active: update.wantActivePods,
Succeeded: len(succeededSet), Succeeded: len(update.succeedIndexes),
Failed: update.wantFailed, Failed: update.wantFailed,
Ready: pointer.Int32(0), Ready: pointer.Int32(0),
}) })
validateIndexedJobPods(ctx, t, clientSet, jobObj, remainingIndexes, update.wantSucceededIndexes) validateIndexedJobPods(ctx, t, clientSet, jobObj, update.wantRemainingIndexes, update.wantSucceededIndexes)
} }
validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobSucceeded(ctx, t, clientSet, jobObj)
@ -2077,13 +2074,3 @@ func updateJob(ctx context.Context, jobClient typedv1.JobInterface, jobName stri
}) })
return job, err return job, err
} }
func remainingIndexSet(completions int, exclude sets.Int) sets.Int {
s := sets.NewInt()
for i := 0; i < completions; i++ {
if !exclude.Has(i) {
s.Insert(i)
}
}
return s
}