Implement the PodReplacementPolicy KEP in the job controller
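For context, the KEP adds a user-facing podReplacementPolicy field to the Job spec and a terminating count to the Job status. Below is a minimal sketch (not part of this commit) of how a Job could opt into the Failed policy, assuming a batch/v1 API version that already contains the new field and the k8s.io/utils/pointer helpers used elsewhere in this diff:

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

func main() {
	// Failed: a replacement Pod is created only once the old Pod has fully
	// terminated (reached a terminal phase), not merely been marked for deletion.
	policy := batchv1.Failed

	job := batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "sample"},
		Spec: batchv1.JobSpec{
			Parallelism:          pointer.Int32(2),
			Completions:          pointer.Int32(2),
			PodReplacementPolicy: &policy,
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{Name: "worker", Image: "busybox", Command: []string{"sleep", "30"}},
					},
				},
			},
		},
	}
	fmt.Println(*job.Spec.PodReplacementPolicy) // Failed
}

With Failed, the controller waits for a deleted Pod to reach a terminal phase before creating its replacement, while TerminatingOrFailed recreates it as soon as the Pod is marked for deletion.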

kannon92 2023-07-18 17:25:15 +00:00
parent 5766947ab8
commit 74fcf3e766
7 changed files with 379 additions and 40 deletions

View File

@ -958,12 +958,37 @@ func FilterActivePods(logger klog.Logger, pods []*v1.Pod) []*v1.Pod {
return result
}
func FilterTerminatingPods(pods []*v1.Pod) []*v1.Pod {
var result []*v1.Pod
for _, p := range pods {
if IsPodTerminating(p) {
result = append(result, p)
}
}
return result
}
func CountTerminatingPods(pods []*v1.Pod) int32 {
numberOfTerminatingPods := 0
for _, p := range pods {
if IsPodTerminating(p) {
numberOfTerminatingPods += 1
}
}
return int32(numberOfTerminatingPods)
}
func IsPodActive(p *v1.Pod) bool {
return v1.PodSucceeded != p.Status.Phase &&
v1.PodFailed != p.Status.Phase &&
p.DeletionTimestamp == nil
}
func IsPodTerminating(p *v1.Pod) bool {
return !podutil.IsPodTerminal(p) &&
p.DeletionTimestamp != nil
}
// FilterActiveReplicaSets returns replica sets that have (or at least ought to have) pods.
func FilterActiveReplicaSets(replicaSets []*apps.ReplicaSet) []*apps.ReplicaSet {
activeFilter := func(rs *apps.ReplicaSet) bool {

View File

@ -380,6 +380,31 @@ func TestDeletePodsAllowsMissing(t *testing.T) {
assert.True(t, apierrors.IsNotFound(err))
}
func TestCountTerminatingPods(t *testing.T) {
now := metav1.Now()
// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.
rc := newReplicationController(0)
podList := newPodList(nil, 7, v1.PodRunning, rc)
podList.Items[0].Status.Phase = v1.PodSucceeded
podList.Items[1].Status.Phase = v1.PodFailed
podList.Items[2].Status.Phase = v1.PodPending
podList.Items[2].SetDeletionTimestamp(&now)
podList.Items[3].Status.Phase = v1.PodRunning
podList.Items[3].SetDeletionTimestamp(&now)
var podPointers []*v1.Pod
for i := range podList.Items {
podPointers = append(podPointers, &podList.Items[i])
}
terminatingPods := CountTerminatingPods(podPointers)
assert.Equal(t, terminatingPods, int32(2))
terminatingList := FilterTerminatingPods(podPointers)
assert.Equal(t, len(terminatingList), int(2))
}
func TestActivePodFiltering(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
// This rc is not needed by the test, only the newPodList to give the pods labels/a namespace.

View File

@ -243,6 +243,7 @@ func parseIndexesFromString(logger klog.Logger, indexesStr string, completions i
// firstPendingIndexes returns `count` indexes less than `completions` that are
// not covered by `activePods`, `succeededIndexes` or `failedIndexes`.
// When PodReplacementPolicy is Failed, indexes with terminating Pods are also treated as covered and excluded from the result.
func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
if count == 0 {
return nil
@ -250,6 +251,10 @@ func firstPendingIndexes(jobCtx *syncJobCtx, count, completions int) []int {
active := getIndexes(jobCtx.activePods)
result := make([]int, 0, count)
nonPending := jobCtx.succeededIndexes.withOrderedIndexes(sets.List(active))
if onlyReplaceFailedPods(jobCtx.job) {
terminating := getIndexes(controller.FilterTerminatingPods(jobCtx.pods))
nonPending = nonPending.withOrderedIndexes(sets.List(terminating))
}
if jobCtx.failedIndexes != nil {
nonPending = nonPending.merge(*jobCtx.failedIndexes)
}

View File

@ -758,6 +758,7 @@ func TestFirstPendingIndexes(t *testing.T) {
activePods: hollowPodsWithIndexPhase(tc.activePods),
succeededIndexes: tc.succeededIndexes,
failedIndexes: tc.failedIndexes,
job: newJob(1, 1, 1, batch.IndexedCompletion),
}
got := firstPendingIndexes(jobCtx, tc.cnt, tc.completions)
if diff := cmp.Diff(tc.want, got); diff != "" {

View File

@ -144,6 +144,7 @@ type syncJobCtx struct {
expectedRmFinalizers sets.Set[string]
uncounted *uncountedTerminatedPods
podsWithDelayedDeletionPerIndex map[int]*v1.Pod
terminating *int32
}
// NewController creates a new Job controller that keeps the relevant pods
@ -783,11 +784,15 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if err != nil {
return err
}
var terminating *int32
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) {
terminating = pointer.Int32(controller.CountTerminatingPods(pods))
}
jobCtx := &syncJobCtx{
job: &job,
pods: pods,
activePods: controller.FilterActivePods(logger, pods),
terminating: terminating,
uncounted: newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods),
expectedRmFinalizers: jm.finalizerExpectations.getExpectedUIDs(key),
}
@ -919,6 +924,8 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
needsStatusUpdate := suspendCondChanged || active != job.Status.Active || !pointer.Int32Equal(ready, job.Status.Ready)
job.Status.Active = active
job.Status.Ready = ready
job.Status.Terminating = jobCtx.terminating
needsStatusUpdate = needsStatusUpdate || !pointer.Int32Equal(job.Status.Terminating, jobCtx.terminating)
err = jm.trackJobStatusAndRemoveFinalizers(ctx, jobCtx, needsStatusUpdate)
if err != nil {
return fmt.Errorf("tracking status: %w", err)
@ -1453,6 +1460,17 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionPodsDeleted, err
}
var terminating int32 = 0
if onlyReplaceFailedPods(jobCtx.job) {
// When a PodFailurePolicy is specified but the JobPodReplacementPolicy
// feature gate is disabled, we still need to count terminating pods
// when computing replica counts, but we do not publish the count in the status.
if jobCtx.terminating == nil {
terminating = controller.CountTerminatingPods(jobCtx.pods)
} else {
terminating = *jobCtx.terminating
}
}
wantActive := int32(0)
if job.Spec.Completions == nil {
// Job does not specify a number of completions. Therefore, number active
@ -1475,7 +1493,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
}
}
rmAtLeast := active - wantActive
rmAtLeast := active + terminating - wantActive
if rmAtLeast < 0 {
rmAtLeast = 0
}
@ -1495,7 +1513,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
return active, metrics.JobSyncActionPodsDeleted, err
}
if active < wantActive {
if diff := wantActive - terminating - active; diff > 0 {
var remainingTime time.Duration
if !hasBackoffLimitPerIndex(job) {
// we compute the global remaining time for pod creation when backoffLimitPerIndex is not used
@ -1505,7 +1523,6 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, jobCtx *syn
jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
return 0, metrics.JobSyncActionPodsCreated, nil
}
diff := wantActive - active
if diff > int32(MaxPodCreateDeletePerSync) {
diff = int32(MaxPodCreateDeletePerSync)
}
@ -1797,6 +1814,9 @@ func isPodFailed(p *v1.Pod, job *batch.Job) bool {
if p.Status.Phase == v1.PodFailed {
return true
}
if onlyReplaceFailedPods(job) {
return p.Status.Phase == v1.PodFailed
}
// Count deleted Pods as failures to account for orphan Pods that
// never have a chance to reach the Failed phase.
return p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded
@ -1849,3 +1869,13 @@ func countReadyPods(pods []*v1.Pod) int32 {
}
return cnt
}
// onlyReplaceFailedPods reports whether replacement Pods should be created only for failed Pods.
// PodReplacementPolicy controls when to recreate Pods that are marked for deletion (terminating).
// Failed means we recreate a Pod only once it has fully terminated.
func onlyReplaceFailedPods(job *batch.Job) bool {
if feature.DefaultFeatureGate.Enabled(features.JobPodReplacementPolicy) && *job.Spec.PodReplacementPolicy == batch.Failed {
return true
}
return feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && job.Spec.PodFailurePolicy != nil
}
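To make the manageJob arithmetic above concrete, here is a small standalone sketch of the replacement-count calculation (plain integers, not the controller's actual code path): with wantActive=2, one active Pod and one terminating Pod, the Failed policy creates no replacement until the terminating Pod actually reaches a terminal phase, whereas under TerminatingOrFailed the terminating count stays zero in this calculation and a replacement is created immediately.

package main

import "fmt"

// replacementsToCreate mirrors the wantActive/terminating arithmetic used in
// manageJob above, for illustration only; terminating is only non-zero when
// onlyReplaceFailedPods returns true for the Job.
func replacementsToCreate(wantActive, active, terminating int32) int32 {
	diff := wantActive - terminating - active
	if diff < 0 {
		diff = 0
	}
	return diff
}

func main() {
	// PodReplacementPolicy=Failed: the terminating Pod still occupies a slot,
	// so no replacement is created yet.
	fmt.Println(replacementsToCreate(2, 1, 1)) // 0
	// PodReplacementPolicy=TerminatingOrFailed: terminating Pods are ignored
	// in this calculation, so a replacement is created immediately.
	fmt.Println(replacementsToCreate(2, 1, 0)) // 1
}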

View File

@ -167,7 +167,7 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
return pods
}
func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, readyPods int) {
func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, activePods, succeededPods, failedPods, terminatingPods, readyPods int) {
for _, pod := range newPodList(pendingPods, v1.PodPending, job) {
podIndexer.Add(pod)
}
@ -190,6 +190,14 @@ func setPodsStatuses(podIndexer cache.Indexer, job *batch.Job, pendingPods, acti
for _, pod := range newPodList(failedPods, v1.PodFailed, job) {
podIndexer.Add(pod)
}
terminating := newPodList(terminatingPods, v1.PodRunning, job)
for _, p := range terminating {
now := metav1.Now()
p.DeletionTimestamp = &now
}
for _, pod := range terminating {
podIndexer.Add(pod)
}
}
func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status []indexPhase) {
@ -234,17 +242,18 @@ func TestControllerSyncJob(t *testing.T) {
testCases := map[string]struct {
// job setup
parallelism int32
completions int32
backoffLimit int32
deleting bool
podLimit int
completionMode batch.CompletionMode
wasSuspended bool
suspend bool
podReplacementPolicy *batch.PodReplacementPolicy
initialStatus *jobInitialStatus
backoffRecord *backoffRecord
controllerTime *time.Time
// pod setup
@ -257,6 +266,7 @@ func TestControllerSyncJob(t *testing.T) {
readyPods int
succeededPods int
failedPods int
terminatingPods int
podsWithIndexes []indexPhase
fakeExpectationAtCreation int32 // negative: ExpectDeletions, positive: ExpectCreations
@ -268,6 +278,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedSucceeded int32
expectedCompletedIdxs string
expectedFailed int32
expectedTerminating *int32
expectedCondition *batch.JobConditionType
expectedConditionStatus v1.ConditionStatus
expectedConditionReason string
@ -275,8 +286,9 @@ func TestControllerSyncJob(t *testing.T) {
expectedPodPatches int
// features
jobReadyPodsEnabled bool
podIndexLabelDisabled bool
jobPodReplacementPolicy bool
}{
"job start": {
parallelism: 2,
@ -335,6 +347,35 @@ func TestControllerSyncJob(t *testing.T) {
expectedSucceeded: 1,
expectedPodPatches: 1,
},
"WQ job: recreate pods when failed": {
parallelism: 1,
completions: -1,
backoffLimit: 6,
activePods: 1,
failedPods: 1,
podReplacementPolicy: podReplacementPolicy(batch.Failed),
jobPodReplacementPolicy: true,
terminatingPods: 1,
expectedTerminating: pointer.Int32(1),
expectedPodPatches: 2,
expectedDeletions: 1,
expectedFailed: 1,
},
"WQ job: recreate pods when terminating or failed": {
parallelism: 1,
completions: -1,
backoffLimit: 6,
activePods: 1,
failedPods: 1,
podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed),
jobPodReplacementPolicy: true,
terminatingPods: 1,
expectedTerminating: pointer.Int32(1),
expectedActive: 1,
expectedPodPatches: 2,
expectedFailed: 2,
},
"too few active pods and active back-off": { "too few active pods and active back-off": {
parallelism: 1, parallelism: 1,
completions: 1, completions: 1,
@ -585,6 +626,33 @@ func TestControllerSyncJob(t *testing.T) {
expectedActive: 2,
expectedCreatedIndexes: sets.New(0, 1),
},
"indexed job with some pods deleted, podRecreationPolicy Failed": {
parallelism: 2,
completions: 5,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
expectedCreations: 1,
expectedActive: 1,
expectedCreatedIndexes: sets.New(0),
podReplacementPolicy: podReplacementPolicy(batch.Failed),
jobPodReplacementPolicy: true,
terminatingPods: 1,
expectedTerminating: pointer.Int32(1),
},
"indexed job with some pods deleted, podRecreationPolicy TerminatingOrFailed": {
parallelism: 2,
completions: 5,
backoffLimit: 6,
completionMode: batch.IndexedCompletion,
expectedCreations: 2,
expectedActive: 2,
expectedCreatedIndexes: sets.New(0, 1),
podReplacementPolicy: podReplacementPolicy(batch.TerminatingOrFailed),
jobPodReplacementPolicy: true,
terminatingPods: 1,
expectedTerminating: pointer.Int32(1),
expectedPodPatches: 1,
},
"indexed job completed": { "indexed job completed": {
parallelism: 2, parallelism: 2,
completions: 3, completions: 3,
@ -800,7 +868,7 @@ func TestControllerSyncJob(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy)()
// job manager setup
clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@ -820,6 +888,9 @@ func TestControllerSyncJob(t *testing.T) {
// job & pods setup // job & pods setup
job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode) job := newJob(tc.parallelism, tc.completions, tc.backoffLimit, tc.completionMode)
job.Spec.Suspend = pointer.Bool(tc.suspend) job.Spec.Suspend = pointer.Bool(tc.suspend)
if tc.jobPodReplacementPolicy {
job.Spec.PodReplacementPolicy = tc.podReplacementPolicy
}
if tc.initialStatus != nil {
startTime := metav1.Now()
job.Status.StartTime = &startTime
@ -855,7 +926,7 @@ func TestControllerSyncJob(t *testing.T) {
}
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.readyPods)
setPodsStatuses(podIndexer, job, tc.pendingPods, tc.activePods, tc.succeededPods, tc.failedPods, tc.terminatingPods, tc.readyPods)
setPodsStatusesWithIndexes(podIndexer, job, tc.podsWithIndexes)
actual := job
@ -937,6 +1008,9 @@ func TestControllerSyncJob(t *testing.T) {
if actual.Status.Failed != tc.expectedFailed {
t.Errorf("Unexpected number of failed pods. Expected %d, saw %d\n", tc.expectedFailed, actual.Status.Failed)
}
if diff := cmp.Diff(tc.expectedTerminating, actual.Status.Terminating); diff != "" {
t.Errorf("Unexpected number of terminating pods (-want,+got): %s", diff)
}
if actual.Status.StartTime != nil && tc.suspend {
t.Error("Unexpected .status.startTime not nil when suspend is true")
}
@ -1905,7 +1979,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
job.Status.StartTime = &start job.Status.StartTime = &start
sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job) sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer() podIndexer := sharedInformerFactory.Core().V1().Pods().Informer().GetIndexer()
setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0)
setPodsStatuses(podIndexer, job, 0, tc.activePods, tc.succeededPods, tc.failedPods, 0, 0)
// run
err := manager.syncJob(context.TODO(), testutil.GetKey(job, t))
@ -2175,12 +2249,14 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
testCases := map[string]struct {
enableJobPodFailurePolicy bool
enablePodDisruptionConditions bool
enableJobPodReplacementPolicy bool
job batch.Job
pods []v1.Pod
wantConditions *[]batch.JobCondition
wantStatusFailed int32
wantStatusActive int32
wantStatusSucceeded int32
wantStatusTerminating *int32
}{
"default handling for pod failure if the container matching the exit codes does not match the containerName restriction": {
enableJobPodFailurePolicy: true,
@ -3149,15 +3225,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
},
},
},
wantConditions: &[]batch.JobCondition{
{
Type: batch.JobFailed,
Status: v1.ConditionTrue,
Reason: "BackoffLimitExceeded",
Message: "Job has reached the specified backoff limit",
},
},
wantStatusFailed: 1,
},
"terminating Pod not considered failed when PodDisruptionConditions is enabled": {
enableJobPodFailurePolicy: true,
@ -3195,13 +3262,17 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
},
},
},
wantStatusActive: 1, // This is a replacement Pod: the terminating Pod is neither active nor failed.
},
}
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.enableJobPodReplacementPolicy)()
if tc.job.Spec.PodReplacementPolicy == nil {
tc.job.Spec.PodReplacementPolicy = podReplacementPolicy(batch.Failed)
}
clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
@ -3254,6 +3325,9 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
if actual.Status.Failed != tc.wantStatusFailed {
t.Errorf("unexpected number of failed pods. Expected %d, saw %d\n", tc.wantStatusFailed, actual.Status.Failed)
}
if pointer.Int32Deref(actual.Status.Terminating, 0) != pointer.Int32Deref(tc.wantStatusTerminating, 0) {
t.Errorf("unexpected number of terminating pods. Expected %d, saw %d\n", pointer.Int32Deref(tc.wantStatusTerminating, 0), pointer.Int32Deref(actual.Status.Terminating, 0))
}
})
}
}
@ -5135,6 +5209,10 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec, podIndexLabel
}
}
func podReplacementPolicy(m batch.PodReplacementPolicy) *batch.PodReplacementPolicy {
return &m
}
func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
t.Helper()
verifyEmptyQueue(ctx, t, jm)

View File

@ -1667,6 +1667,159 @@ func TestIndexedJob(t *testing.T) {
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
}
func TestJobPodReplacementPolicy(t *testing.T) {
const podCount int32 = 2
indexedCompletion := batchv1.IndexedCompletion
nonIndexedCompletion := batchv1.NonIndexedCompletion
var podReplacementPolicy = func(obj batchv1.PodReplacementPolicy) *batchv1.PodReplacementPolicy {
return &obj
}
jobSpecIndexedDefault := &batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(podCount),
Completions: pointer.Int32Ptr(podCount),
CompletionMode: &indexedCompletion,
}
cases := map[string]struct {
podReplacementPolicyEnabled bool
wantTerminating *int32
wantFailed int
wantActive int
jobSpec *batchv1.JobSpec
}{
"feature flag off, delete pods and verify no terminating status": {
jobSpec: jobSpecIndexedDefault,
wantActive: int(podCount),
wantFailed: int(podCount),
},
"feature flag true, delete pods and verify terminating status": {
podReplacementPolicyEnabled: true,
jobSpec: jobSpecIndexedDefault,
wantTerminating: pointer.Int32(podCount),
wantFailed: int(podCount),
},
"feature flag true, delete pods, verify terminating status and recreate upon terminating": {
podReplacementPolicyEnabled: true,
jobSpec: &batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(podCount),
Completions: pointer.Int32Ptr(podCount),
CompletionMode: &indexedCompletion,
PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
},
wantTerminating: pointer.Int32(podCount),
wantFailed: int(podCount),
},
"feature flag true, delete pods, verify terminating status and recreate once failed": {
podReplacementPolicyEnabled: true,
jobSpec: &batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(podCount),
Completions: pointer.Int32Ptr(podCount),
CompletionMode: &nonIndexedCompletion,
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
},
wantTerminating: pointer.Int32(podCount),
},
"feature flag true with NonIndexedJob, delete pods, verify terminating status and recreate once failed": {
podReplacementPolicyEnabled: true,
jobSpec: &batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(podCount),
Completions: pointer.Int32Ptr(podCount),
CompletionMode: &nonIndexedCompletion,
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
},
wantTerminating: pointer.Int32(podCount),
},
"feature flag false, podFailurePolicy enabled, delete pods, verify terminating status and recreate once failed": {
podReplacementPolicyEnabled: false,
jobSpec: &batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(podCount),
Completions: pointer.Int32Ptr(podCount),
CompletionMode: &nonIndexedCompletion,
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
PodFailurePolicy: &batchv1.PodFailurePolicy{
Rules: []batchv1.PodFailurePolicyRule{
{
Action: batchv1.PodFailurePolicyActionFailJob,
OnExitCodes: &batchv1.PodFailurePolicyOnExitCodesRequirement{
Operator: batchv1.PodFailurePolicyOnExitCodesOpIn,
Values: []int32{5},
},
},
},
},
},
wantActive: int(podCount),
},
}
for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.podReplacementPolicyEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.jobSpec.PodFailurePolicy != nil)()
closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
resetMetrics()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: *tc.jobSpec,
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
// Wait for pods to start up.
err = wait.PollImmediate(5*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) {
job, err := jobClient.Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if job.Status.Active == int32(podCount) {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatalf("Error waiting for Job pods to become active: %v", err)
}
pods, errList := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{})
if errList != nil {
t.Fatalf("Failed to list pods: %v", errList)
}
updatePod(t, clientSet, pods.Items, func(pod *v1.Pod) {
pod.Finalizers = append(pod.Finalizers, "fake.example.com/blockDeletion")
})
err = clientSet.CoreV1().Pods(ns.Name).DeleteCollection(ctx,
metav1.DeleteOptions{},
metav1.ListOptions{
Limit: 1000,
})
if err != nil {
t.Fatalf("Failed to cleanup Pods: %v", err)
}
podsDelete, errList2 := clientSet.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{})
if errList2 != nil {
t.Fatalf("Failed to list pods: %v", errList2)
}
for _, val := range podsDelete.Items {
if val.DeletionTimestamp == nil {
t.Fatalf("Deletion not registered.")
}
}
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, podsByStatus{
Terminating: tc.wantTerminating,
Failed: tc.wantFailed,
Active: tc.wantActive,
Ready: pointer.Int32(0),
})
})
}
}
func TestElasticIndexedJob(t *testing.T) {
const initialCompletions int32 = 3
type jobUpdate struct {
@ -2360,13 +2513,14 @@ func TestNodeSelectorUpdate(t *testing.T) {
}
type podsByStatus struct {
Active int
Ready *int32
Failed int
Succeeded int
Terminating *int32
}
func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
func validateJobsPodsStatusOnly(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
t.Helper()
var actualCounts podsByStatus
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
@ -2375,16 +2529,21 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
t.Fatalf("Failed to get updated Job: %v", err) t.Fatalf("Failed to get updated Job: %v", err)
} }
actualCounts = podsByStatus{ actualCounts = podsByStatus{
Active: int(updatedJob.Status.Active), Active: int(updatedJob.Status.Active),
Ready: updatedJob.Status.Ready, Ready: updatedJob.Status.Ready,
Succeeded: int(updatedJob.Status.Succeeded), Succeeded: int(updatedJob.Status.Succeeded),
Failed: int(updatedJob.Status.Failed), Failed: int(updatedJob.Status.Failed),
Terminating: updatedJob.Status.Terminating,
}
return cmp.Equal(actualCounts, desired), nil
}); err != nil {
diff := cmp.Diff(desired, actualCounts)
t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
}
}
func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
t.Helper()
validateJobsPodsStatusOnly(ctx, t, clientSet, jobObj, desired)
var active []*v1.Pod
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
@ -2627,6 +2786,22 @@ func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updat
return int(updated), nil
}
func updatePod(t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
for _, val := range pods {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(context.TODO(), val.Name, metav1.GetOptions{})
if err != nil {
return err
}
updateFunc(newPod)
_, err = clientSet.CoreV1().Pods(val.Namespace).Update(context.TODO(), newPod, metav1.UpdateOptions{})
return err
}); err != nil {
t.Fatalf("Failed to update pod %s: %v", val.Name, err)
}
}
}
func setJobPhaseForIndex(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, phase v1.PodPhase, ix int) error {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {