Lock the JobReadyPods feature gate for ready pods in Job status

Anton Stuchinskii 2023-10-17 18:11:41 +02:00
parent 06092ecf4b
commit 34294cd67f
3 changed files with 176 additions and 196 deletions


@@ -291,7 +291,6 @@ func TestControllerSyncJob(t *testing.T) {
 expectedPodPatches int
 // features
-jobReadyPodsEnabled bool
 podIndexLabelDisabled bool
 jobPodReplacementPolicy bool
 }{
@@ -301,6 +300,7 @@ func TestControllerSyncJob(t *testing.T) {
 backoffLimit: 6,
 expectedCreations: 2,
 expectedActive: 2,
+expectedReady: ptr.To[int32](0),
 },
 "WQ job start": {
 parallelism: 2,
@@ -308,6 +308,7 @@ func TestControllerSyncJob(t *testing.T) {
 backoffLimit: 6,
 expectedCreations: 2,
 expectedActive: 2,
+expectedReady: ptr.To[int32](0),
 },
 "pending pods": {
 parallelism: 2,
@@ -315,6 +316,7 @@ func TestControllerSyncJob(t *testing.T) {
 backoffLimit: 6,
 pendingPods: 2,
 expectedActive: 2,
+expectedReady: ptr.To[int32](0),
 },
 "correct # of pods": {
 parallelism: 3,
@@ -323,16 +325,7 @@ func TestControllerSyncJob(t *testing.T) {
 activePods: 3,
 readyPods: 2,
 expectedActive: 3,
-},
-"correct # of pods, ready enabled": {
-parallelism: 3,
-completions: 5,
-backoffLimit: 6,
-activePods: 3,
-readyPods: 2,
-expectedActive: 3,
-expectedReady: ptr.To[int32](2),
-jobReadyPodsEnabled: true,
+expectedReady: ptr.To[int32](2),
 },
 "WQ job: correct # of pods": {
 parallelism: 2,
@@ -340,6 +333,7 @@ func TestControllerSyncJob(t *testing.T) {
 backoffLimit: 6,
 activePods: 2,
 expectedActive: 2,
+expectedReady: ptr.To[int32](0),
 },
 "too few active pods": {
 parallelism: 2,
@@ -351,6 +345,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 2,
 expectedSucceeded: 1,
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "WQ job: recreate pods when failed": {
 parallelism: 1,
@@ -362,6 +357,7 @@ func TestControllerSyncJob(t *testing.T) {
 jobPodReplacementPolicy: true,
 terminatingPods: 1,
 expectedTerminating: ptr.To[int32](1),
+expectedReady: ptr.To[int32](0),
 // Removes finalizer and deletes one failed pod
 expectedPodPatches: 1,
 expectedFailed: 1,
@@ -375,6 +371,7 @@ func TestControllerSyncJob(t *testing.T) {
 failedPods: 1,
 jobPodReplacementPolicy: true,
 expectedTerminating: ptr.To[int32](1),
+expectedReady: ptr.To[int32](0),
 terminatingPods: 1,
 expectedActive: 1,
 expectedPodPatches: 2,
@@ -390,6 +387,7 @@ func TestControllerSyncJob(t *testing.T) {
 jobPodReplacementPolicy: true,
 terminatingPods: 1,
 expectedTerminating: ptr.To[int32](1),
+expectedReady: ptr.To[int32](0),
 expectedActive: 1,
 expectedPodPatches: 2,
 expectedFailed: 2,
@@ -404,6 +402,7 @@ func TestControllerSyncJob(t *testing.T) {
 podReplacementPolicy: podReplacementPolicy(batch.Failed),
 jobPodReplacementPolicy: true,
 expectedTerminating: ptr.To[int32](4),
+expectedReady: ptr.To[int32](0),
 expectedActive: 1,
 expectedDeletions: 1,
 expectedPodPatches: 1,
@@ -428,6 +427,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 0,
 expectedSucceeded: 0,
 expectedPodPatches: 0,
+expectedReady: ptr.To[int32](0),
 controllerTime: &referenceTime,
 },
 "too few active pods and no back-offs": {
@@ -444,6 +444,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 1,
 expectedSucceeded: 0,
 expectedPodPatches: 0,
+expectedReady: ptr.To[int32](0),
 controllerTime: &referenceTime,
 },
 "too few active pods with a dynamic job": {
@@ -453,6 +454,7 @@ func TestControllerSyncJob(t *testing.T) {
 activePods: 1,
 expectedCreations: 1,
 expectedActive: 2,
+expectedReady: ptr.To[int32](0),
 },
 "too few active pods, with controller error": {
 parallelism: 2,
@@ -465,6 +467,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 1,
 expectedSucceeded: 0,
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "too many active pods": {
 parallelism: 2,
@@ -474,6 +477,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedDeletions: 1,
 expectedActive: 2,
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "too many active pods, with controller error": {
 parallelism: 2,
@@ -484,6 +488,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedDeletions: 0,
 expectedPodPatches: 1,
 expectedActive: 3,
+expectedReady: ptr.To[int32](0),
 },
 "failed + succeed pods: reset backoff delay": {
 parallelism: 2,
@@ -497,6 +502,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedSucceeded: 1,
 expectedFailed: 1,
 expectedPodPatches: 2,
+expectedReady: ptr.To[int32](0),
 },
 "new failed pod": {
 parallelism: 2,
@@ -508,6 +514,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 2,
 expectedFailed: 1,
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "no new pod; possible finalizer update of failed pod": {
 parallelism: 1,
@@ -524,6 +531,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 1,
 expectedFailed: 1,
 expectedPodPatches: 0,
+expectedReady: ptr.To[int32](0),
 },
 "only new failed pod with controller error": {
 parallelism: 2,
@@ -536,6 +544,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 1,
 expectedFailed: 0,
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "job finish": {
 parallelism: 2,
@@ -546,6 +555,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCondition: &jobConditionComplete,
 expectedConditionStatus: v1.ConditionTrue,
 expectedPodPatches: 5,
+expectedReady: ptr.To[int32](0),
 },
 "WQ job finishing": {
 parallelism: 2,
@@ -556,6 +566,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 1,
 expectedSucceeded: 1,
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "WQ job all finished": {
 parallelism: 2,
@@ -566,6 +577,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCondition: &jobConditionComplete,
 expectedConditionStatus: v1.ConditionTrue,
 expectedPodPatches: 2,
+expectedReady: ptr.To[int32](0),
 },
 "WQ job all finished despite one failure": {
 parallelism: 2,
@@ -578,6 +590,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCondition: &jobConditionComplete,
 expectedConditionStatus: v1.ConditionTrue,
 expectedPodPatches: 2,
+expectedReady: ptr.To[int32](0),
 },
 "more active pods than parallelism": {
 parallelism: 2,
@@ -587,6 +600,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedDeletions: 8,
 expectedActive: 2,
 expectedPodPatches: 8,
+expectedReady: ptr.To[int32](0),
 },
 "more active pods than remaining completions": {
 parallelism: 3,
@@ -598,6 +612,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 2,
 expectedSucceeded: 2,
 expectedPodPatches: 3,
+expectedReady: ptr.To[int32](0),
 },
 "status change": {
 parallelism: 2,
@@ -608,6 +623,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 2,
 expectedSucceeded: 2,
 expectedPodPatches: 2,
+expectedReady: ptr.To[int32](0),
 },
 "deleting job": {
 parallelism: 2,
@@ -620,6 +636,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 2,
 expectedSucceeded: 1,
 expectedPodPatches: 3,
+expectedReady: ptr.To[int32](0),
 },
 "limited pods": {
 parallelism: 100,
@@ -628,6 +645,7 @@ func TestControllerSyncJob(t *testing.T) {
 podLimit: 10,
 expectedCreations: 10,
 expectedActive: 10,
+expectedReady: ptr.To[int32](0),
 },
 "too many job failures": {
 parallelism: 2,
@@ -639,6 +657,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedConditionStatus: v1.ConditionTrue,
 expectedConditionReason: "BackoffLimitExceeded",
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "job failures, unsatisfied expectations": {
 parallelism: 2,
@@ -648,6 +667,7 @@ func TestControllerSyncJob(t *testing.T) {
 fakeExpectationAtCreation: 1,
 expectedFailed: 1,
 expectedPodPatches: 1,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job start": {
 parallelism: 2,
@@ -657,6 +677,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCreations: 2,
 expectedActive: 2,
 expectedCreatedIndexes: sets.New(0, 1),
+expectedReady: ptr.To[int32](0),
 },
 "indexed job with some pods deleted, podReplacementPolicy Failed": {
 parallelism: 2,
@@ -670,6 +691,7 @@ func TestControllerSyncJob(t *testing.T) {
 jobPodReplacementPolicy: true,
 terminatingPods: 1,
 expectedTerminating: ptr.To[int32](1),
+expectedReady: ptr.To[int32](0),
 },
 "indexed job with some pods deleted, podReplacementPolicy TerminatingOrFailed": {
 parallelism: 2,
@@ -683,6 +705,7 @@ func TestControllerSyncJob(t *testing.T) {
 jobPodReplacementPolicy: true,
 terminatingPods: 1,
 expectedTerminating: ptr.To[int32](1),
+expectedReady: ptr.To[int32](0),
 expectedPodPatches: 1,
 },
 "indexed job completed": {
@@ -702,6 +725,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCondition: &jobConditionComplete,
 expectedConditionStatus: v1.ConditionTrue,
 expectedPodPatches: 4,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job repeated completed index": {
 parallelism: 2,
@@ -719,6 +743,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCompletedIdxs: "0,1",
 expectedCreatedIndexes: sets.New(2),
 expectedPodPatches: 3,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job some running and completed pods": {
 parallelism: 8,
@@ -741,6 +766,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCompletedIdxs: "2,4,5,7-9",
 expectedCreatedIndexes: sets.New(1, 6, 10, 11, 12, 13),
 expectedPodPatches: 6,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job some failed pods": {
 parallelism: 3,
@@ -757,6 +783,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedFailed: 2,
 expectedCreatedIndexes: sets.New(0, 2),
 expectedPodPatches: 2,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job some pods without index": {
 parallelism: 2,
@@ -781,6 +808,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedFailed: 0,
 expectedCompletedIdxs: "0",
 expectedPodPatches: 8,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job repeated indexes": {
 parallelism: 5,
@@ -802,6 +830,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedSucceeded: 1,
 expectedCompletedIdxs: "0",
 expectedPodPatches: 5,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job with indexes outside of range": {
 parallelism: 2,
@@ -822,6 +851,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 0,
 expectedFailed: 0,
 expectedPodPatches: 5,
+expectedReady: ptr.To[int32](0),
 },
 "suspending a job with satisfied expectations": {
 // Suspended Job should delete active pods when expectations are
@@ -838,6 +868,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedConditionStatus: v1.ConditionTrue,
 expectedConditionReason: "JobSuspended",
 expectedPodPatches: 2,
+expectedReady: ptr.To[int32](0),
 },
 "suspending a job with unsatisfied expectations": {
 // Unlike the previous test, we expect the controller to NOT suspend the
@@ -853,6 +884,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCreations: 0,
 expectedDeletions: 0,
 expectedActive: 3,
+expectedReady: ptr.To[int32](0),
 },
 "resuming a suspended job": {
 wasSuspended: true,
@@ -866,6 +898,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedCondition: &jobConditionSuspended,
 expectedConditionStatus: v1.ConditionFalse,
 expectedConditionReason: "JobResumed",
+expectedReady: ptr.To[int32](0),
 },
 "suspending a deleted job": {
 // We would normally expect the active pods to be deleted (see a few test
@@ -882,6 +915,7 @@ func TestControllerSyncJob(t *testing.T) {
 expectedDeletions: 0,
 expectedActive: 2,
 expectedPodPatches: 2,
+expectedReady: ptr.To[int32](0),
 },
 "indexed job with podIndexLabel feature disabled": {
 parallelism: 2,
@@ -892,13 +926,13 @@ func TestControllerSyncJob(t *testing.T) {
 expectedActive: 2,
 expectedCreatedIndexes: sets.New(0, 1),
 podIndexLabelDisabled: true,
+expectedReady: ptr.To[int32](0),
 },
 }
 for name, tc := range testCases {
 t.Run(name, func(t *testing.T) {
 logger, _ := ktesting.NewTestContext(t)
-defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodIndexLabel, !tc.podIndexLabelDisabled)()
 defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.jobPodReplacementPolicy)()
 // job manager setup
@@ -4751,28 +4785,20 @@ func TestJobBackoff(t *testing.T) {
 newPod.ResourceVersion = "2"
 testCases := map[string]struct {
 requeues int
 oldPodPhase v1.PodPhase
 phase v1.PodPhase
-jobReadyPodsEnabled bool
 wantBackoff time.Duration
 }{
-"failure": {
+"failure with pod updates batching": {
 requeues: 0,
 phase: v1.PodFailed,
 wantBackoff: syncJobBatchPeriod,
 },
-"failure with pod updates batching": {
-requeues: 0,
-phase: v1.PodFailed,
-jobReadyPodsEnabled: true,
-wantBackoff: syncJobBatchPeriod,
-},
 }
 for name, tc := range testCases {
 t.Run(name, func(t *testing.T) {
-defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
 clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 fakePodControl := controller.FakePodControl{}
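Every test case above now sets expectedReady explicitly instead of gating it behind the removed jobReadyPodsEnabled flag. Those expectations use the generic ptr.To helper from k8s.io/utils/ptr, which returns a pointer to a copy of its argument so a literal can populate an optional *int32 status field. A minimal, self-contained sketch of that idiom (illustration only, not code from this commit):

    package main

    import (
    	"fmt"

    	"k8s.io/utils/ptr"
    )

    func main() {
    	// ptr.To returns a pointer to a copy of its argument, so numeric
    	// literals can be assigned to optional (*int32) fields such as
    	// the expectedReady value in the table above.
    	expectedReady := ptr.To[int32](0)
    	fmt.Println(*expectedReady) // 0

    	// ptr.Deref reads a possibly-nil pointer with a fallback default.
    	var unset *int32
    	fmt.Println(ptr.Deref(unset, -1)) // -1
    }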


@@ -975,7 +975,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 JobPodReplacementPolicy: {Default: false, PreRelease: featuregate.Alpha},
-JobReadyPods: {Default: true, PreRelease: featuregate.Beta},
+JobReadyPods: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.31
 KubeletCgroupDriverFromCRI: {Default: false, PreRelease: featuregate.Alpha},
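The changed line promotes JobReadyPods to GA with LockToDefault: true, so the gate can no longer be switched off; that is why the per-test SetFeatureGateDuringTest overrides for it are deleted above. A rough, self-contained illustration of how a locked gate behaves, using the k8s.io/component-base/featuregate package (the registration below mirrors the changed line but is an illustrative sketch, not the actual kube_features.go code):

    package main

    import (
    	"fmt"

    	"k8s.io/component-base/featuregate"
    )

    func main() {
    	gate := featuregate.NewFeatureGate()
    	const jobReadyPods featuregate.Feature = "JobReadyPods"
    	// Register the gate the way the diff above does: GA, enabled, and locked.
    	if err := gate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
    		jobReadyPods: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
    	}); err != nil {
    		panic(err)
    	}
    	// Turning a locked gate off is rejected, so test code can no longer
    	// toggle it and the tc.jobReadyPodsEnabled plumbing becomes dead code.
    	if err := gate.Set("JobReadyPods=false"); err != nil {
    		fmt.Println("override rejected:", err)
    	}
    	fmt.Println("enabled:", gate.Enabled(jobReadyPods)) // true
    }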


@@ -1363,101 +1363,77 @@ func TestNonParallelJob(t *testing.T) {
 func TestParallelJob(t *testing.T) {
 t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
-cases := map[string]struct {
-enableReadyPods bool
-}{
-"none": {},
-"ready pods": {
-enableReadyPods: true,
-},
-}
-for name, tc := range cases {
-t.Run(name, func(t *testing.T) {
-defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
-closeFn, restConfig, clientSet, ns := setup(t, "parallel")
-defer closeFn()
-ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
-defer cancel()
-resetMetrics()
-jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
-Spec: batchv1.JobSpec{
-Parallelism: ptr.To[int32](5),
-},
-})
-if err != nil {
-t.Fatalf("Failed to create Job: %v", err)
-}
-want := podsByStatus{Active: 5}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// Tracks ready pods, if enabled.
-if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
-t.Fatalf("Failed Marking Pods as ready: %v", err)
-}
-if tc.enableReadyPods {
-*want.Ready = 2
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// Failed Pods are replaced.
-if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
-t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
-}
-want = podsByStatus{
-Active: 5,
-Failed: 2,
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// Once one Pod succeeds, no more Pods are created, even if some fail.
-if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
-t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
-}
-want = podsByStatus{
-Failed: 2,
-Succeeded: 1,
-Active: 4,
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
-t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
-}
-want = podsByStatus{
-Failed: 4,
-Succeeded: 1,
-Active: 2,
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// No more Pods are created after remaining Pods succeed.
-if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
-t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
-}
-validateJobSucceeded(ctx, t, clientSet, jobObj)
-want = podsByStatus{
-Failed: 4,
-Succeeded: 3,
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
-validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 7)
-})
-}
+closeFn, restConfig, clientSet, ns := setup(t, "parallel")
+defer closeFn()
+ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
+defer cancel()
+resetMetrics()
+jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+Spec: batchv1.JobSpec{
+Parallelism: ptr.To[int32](5),
+},
+})
+if err != nil {
+t.Fatalf("Failed to create Job: %v", err)
+}
+want := podsByStatus{
+Active: 5,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// Tracks ready pods, if enabled.
+if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
+t.Fatalf("Failed Marking Pods as ready: %v", err)
+}
+want.Ready = ptr.To[int32](2)
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// Failed Pods are replaced.
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
+}
+want = podsByStatus{
+Active: 5,
+Failed: 2,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// Once one Pod succeeds, no more Pods are created, even if some fail.
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
+}
+want = podsByStatus{
+Failed: 2,
+Succeeded: 1,
+Active: 4,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
+}
+want = podsByStatus{
+Failed: 4,
+Succeeded: 1,
+Active: 2,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// No more Pods are created after remaining Pods succeed.
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
+t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
+}
+validateJobSucceeded(ctx, t, clientSet, jobObj)
+want = podsByStatus{
+Failed: 4,
+Succeeded: 3,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
+validateTerminatedPodsTrackingFinalizerMetric(ctx, t, 7)
 }
 func TestParallelJobParallelism(t *testing.T) {
@@ -1520,87 +1496,65 @@ func TestParallelJobWithCompletions(t *testing.T) {
 t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
 t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
 t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
-cases := map[string]struct {
-enableReadyPods bool
-}{
-"none": {},
-"ready pods": {
-enableReadyPods: true,
-},
-}
-for name, tc := range cases {
-t.Run(name, func(t *testing.T) {
-defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
-closeFn, restConfig, clientSet, ns := setup(t, "completions")
-defer closeFn()
-ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
-defer cancel()
-jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
-Spec: batchv1.JobSpec{
-Parallelism: ptr.To[int32](54),
-Completions: ptr.To[int32](56),
-},
-})
-if err != nil {
-t.Fatalf("Failed to create Job: %v", err)
-}
-want := podsByStatus{Active: 54}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// Tracks ready pods, if enabled.
-if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
-t.Fatalf("Failed Marking Pods as ready: %v", err)
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](52)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// Failed Pods are replaced.
-if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
-t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
-}
-want = podsByStatus{
-Active: 54,
-Failed: 2,
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](50)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// Pods are created until the number of succeeded Pods equals completions.
-if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
-t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
-}
-want = podsByStatus{
-Failed: 2,
-Succeeded: 53,
-Active: 3,
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-// No more Pods are created after the Job completes.
-if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
-t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
-}
-validateJobSucceeded(ctx, t, clientSet, jobObj)
-want = podsByStatus{
-Failed: 2,
-Succeeded: 56,
-}
-if tc.enableReadyPods {
-want.Ready = ptr.To[int32](0)
-}
-validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
-validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
-})
-}
+closeFn, restConfig, clientSet, ns := setup(t, "completions")
+defer closeFn()
+ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
+defer cancel()
+jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+Spec: batchv1.JobSpec{
+Parallelism: ptr.To[int32](54),
+Completions: ptr.To[int32](56),
+},
+})
+if err != nil {
+t.Fatalf("Failed to create Job: %v", err)
+}
+want := podsByStatus{
+Active: 54,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// Tracks ready pods, if enabled.
+if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
+t.Fatalf("Failed Marking Pods as ready: %v", err)
+}
+want.Ready = ptr.To[int32](52)
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// Failed Pods are replaced.
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
+}
+want = podsByStatus{
+Active: 54,
+Failed: 2,
+Ready: ptr.To[int32](50),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// Pods are created until the number of succeeded Pods equals completions.
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
+t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
+}
+want = podsByStatus{
+Failed: 2,
+Succeeded: 53,
+Active: 3,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+// No more Pods are created after the Job completes.
+if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
+t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
+}
+validateJobSucceeded(ctx, t, clientSet, jobObj)
+want = podsByStatus{
+Failed: 2,
+Succeeded: 56,
+Ready: ptr.To[int32](0),
+}
+validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
+validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 }
 func TestIndexedJob(t *testing.T) {
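With the gate locked on, every Job reports status.ready, which is why the integration expectations above now assert the Ready count unconditionally instead of only when a test case opted in. A hedged client-go sketch of reading that field outside the test suite (the kubeconfig path, namespace, and Job name below are placeholders, not values from this commit):

    package main

    import (
    	"context"
    	"fmt"

    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    	"k8s.io/client-go/tools/clientcmd"
    	"k8s.io/utils/ptr"
    )

    func main() {
    	// Load the default kubeconfig; adjust as needed for your environment.
    	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    	if err != nil {
    		panic(err)
    	}
    	client := kubernetes.NewForConfigOrDie(config)
    	// "default"/"example-job" are illustrative names.
    	job, err := client.BatchV1().Jobs("default").Get(context.TODO(), "example-job", metav1.GetOptions{})
    	if err != nil {
    		panic(err)
    	}
    	// status.ready is a *int32; with JobReadyPods locked to GA it is populated
    	// by the Job controller rather than depending on a feature-gate check.
    	fmt.Println("ready pods:", ptr.Deref(job.Status.Ready, 0))
    }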