mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-25 12:17:52 +00:00
Track Job Pods completion in status
Through Job.status.uncountedPodUIDs and a Pod finalizer An annotation marks if a job should be tracked with new behavior A separate work queue is used to remove finalizers from orphan pods. Change-Id: I1862e930257a9d1f7f1b2b0a526ed15bc8c248ad
This commit is contained in:
@@ -44,10 +44,277 @@ import (
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
const waitInterval = 500 * time.Millisecond
|
||||
|
||||
// TestNonParallelJob tests that a Job that only executes one Pod. The test
|
||||
// recreates the Job controller at some points to make sure a new controller
|
||||
// is able to pickup.
|
||||
func TestNonParallelJob(t *testing.T) {
|
||||
for _, wFinalizers := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
|
||||
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 1,
|
||||
}, wFinalizers)
|
||||
|
||||
// Restarting controller.
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
|
||||
// Failed Pod is replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 1,
|
||||
Failed: 1,
|
||||
}, wFinalizers)
|
||||
|
||||
// Restarting controller.
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
|
||||
// No more Pods are created after the Pod succeeds.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 1,
|
||||
Succeeded: 1,
|
||||
}, false)
|
||||
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallelJob(t *testing.T) {
|
||||
for _, wFinalizers := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
defer cancel()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(5),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 5,
|
||||
}, wFinalizers)
|
||||
// Failed Pods are replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 5,
|
||||
Failed: 2,
|
||||
}, wFinalizers)
|
||||
// Once one Pod succeeds, no more Pods are created, even if some fail.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 2,
|
||||
Succeeded: 1,
|
||||
Active: 4,
|
||||
}, wFinalizers)
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 4,
|
||||
Succeeded: 1,
|
||||
Active: 2,
|
||||
}, wFinalizers)
|
||||
// No more Pods are created after remaining Pods succeed.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 4,
|
||||
Succeeded: 3,
|
||||
}, false)
|
||||
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParallelJobWithCompletions(t *testing.T) {
|
||||
for _, wFinalizers := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "completions")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
defer cancel()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(4),
|
||||
Completions: pointer.Int32Ptr(6),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
|
||||
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 4,
|
||||
}, wFinalizers)
|
||||
// Failed Pods are replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 4,
|
||||
Failed: 2,
|
||||
}, wFinalizers)
|
||||
// Pods are created until the number of succeeded Pods equals completions.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 2,
|
||||
Succeeded: 3,
|
||||
Active: 3,
|
||||
}, wFinalizers)
|
||||
// No more Pods are created after the Job completes.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 2,
|
||||
Succeeded: 6,
|
||||
}, false)
|
||||
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestIndexedJob(t *testing.T) {
|
||||
for _, wFinalizers := range []bool{false, true} {
|
||||
t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
mode := batchv1.IndexedCompletion
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(3),
|
||||
Completions: pointer.Int32Ptr(4),
|
||||
CompletionMode: &mode,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
|
||||
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 3,
|
||||
}, wFinalizers)
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
|
||||
|
||||
// One Pod succeeds.
|
||||
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatal("Failed trying to succeed pod with index 1")
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 3,
|
||||
Succeeded: 1,
|
||||
}, wFinalizers)
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
|
||||
|
||||
// Disable feature gate and restart controller.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer events.Stop()
|
||||
|
||||
// One Pod fails, but no recreations happen because feature is disabled.
|
||||
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatal("Failed trying to succeed pod with index 2")
|
||||
}
|
||||
if err := waitForEvent(events, jobObj.UID, "IndexedJobDisabled"); err != nil {
|
||||
t.Errorf("Waiting for an event for IndexedJobDisabled: %v", err)
|
||||
}
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1")
|
||||
|
||||
// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 3,
|
||||
Failed: 1,
|
||||
Succeeded: 1,
|
||||
}, wFinalizers)
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
|
||||
|
||||
// Remaining Pods succeed.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatal("Failed trying to succeed remaining pods")
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 0,
|
||||
Failed: 1,
|
||||
Succeeded: 4,
|
||||
}, false)
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestDisableJobTrackingWithFinalizers ensures that when the
|
||||
// JobTrackingWithFinalizers feature is disabled, tracking finalizers are
|
||||
// removed from all pods, but Job continues to be tracked.
|
||||
// This test can be removed once the feature graduates to GA.
|
||||
func TestDisableJobTrackingWithFinalizers(t *testing.T) {
|
||||
// Step 1: job created while feature is enabled.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
@@ -55,219 +322,123 @@ func TestNonParallelJob(t *testing.T) {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(2),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
if !hasJobTrackingAnnotation(jobObj) {
|
||||
t.Error("apiserver didn't add the tracking annotation")
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 1,
|
||||
})
|
||||
Active: 2,
|
||||
}, true)
|
||||
|
||||
// Restarting controller.
|
||||
// Step 2: Disable tracking with finalizers.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
|
||||
// Failed Pod is replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
// Fail a pod while Job controller is stopped.
|
||||
if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 1,
|
||||
Failed: 1,
|
||||
})
|
||||
|
||||
// Restarting controller.
|
||||
cancel()
|
||||
// Restart controller.
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
|
||||
// No more Pods are created after the Pod succeeds.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
// Ensure Job continues to be tracked and finalizers are removed.
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 2,
|
||||
Failed: 1,
|
||||
}, false)
|
||||
|
||||
jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Obtaining updated Job object: %v", err)
|
||||
}
|
||||
if hasJobTrackingAnnotation(jobObj) {
|
||||
t.Error("controller didn't remove the tracking annotation")
|
||||
}
|
||||
|
||||
// Step 3: Reenable tracking with finalizers.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
||||
cancel()
|
||||
|
||||
// Succeed a pod while Job controller is stopped.
|
||||
if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
|
||||
}
|
||||
|
||||
// Restart controller.
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
|
||||
// Ensure Job continues to be tracked and finalizers are removed.
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 1,
|
||||
Failed: 1,
|
||||
Succeeded: 1,
|
||||
})
|
||||
}, false)
|
||||
}
|
||||
|
||||
func TestParallelJob(t *testing.T) {
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
defer cancel()
|
||||
func TestOrphanPodsFinalizersCleared(t *testing.T) {
|
||||
// Step 0: job created while feature is enabled.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(5),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 5,
|
||||
})
|
||||
// Failed Pods are replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 5,
|
||||
Failed: 2,
|
||||
})
|
||||
// Once one Pod succeeds, no more Pods are created, even if some fail.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 2,
|
||||
Succeeded: 1,
|
||||
Active: 4,
|
||||
})
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 4,
|
||||
Succeeded: 1,
|
||||
Active: 2,
|
||||
})
|
||||
// No more Pods are created after remaining Pods succeed.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 4,
|
||||
Succeeded: 3,
|
||||
})
|
||||
}
|
||||
|
||||
func TestParallelJobWithCompletions(t *testing.T) {
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "completions")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
defer cancel()
|
||||
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(4),
|
||||
Completions: pointer.Int32Ptr(6),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 4,
|
||||
})
|
||||
// Failed Pods are replaced.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 4,
|
||||
Failed: 2,
|
||||
})
|
||||
// Pods are created until the number of succeeded Pods equals completions.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 2,
|
||||
Succeeded: 3,
|
||||
Active: 3,
|
||||
})
|
||||
// No more Pods are created after the Job completes.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
|
||||
}
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Failed: 2,
|
||||
Succeeded: 6,
|
||||
})
|
||||
}
|
||||
|
||||
func TestIndexedJob(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
|
||||
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
|
||||
closeFn, restConfig, clientSet, ns := setup(t, "simple")
|
||||
defer closeFn()
|
||||
ctx, cancel := startJobController(restConfig, clientSet)
|
||||
defer func() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
mode := batchv1.IndexedCompletion
|
||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||
Spec: batchv1.JobSpec{
|
||||
Parallelism: pointer.Int32Ptr(3),
|
||||
Completions: pointer.Int32Ptr(4),
|
||||
CompletionMode: &mode,
|
||||
Parallelism: pointer.Int32Ptr(1),
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create Job: %v", err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 3,
|
||||
})
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
|
||||
|
||||
// One Pod succeeds.
|
||||
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
|
||||
t.Fatal("Failed trying to succeed pod with index 1")
|
||||
if !hasJobTrackingAnnotation(jobObj) {
|
||||
t.Error("apiserver didn't add the tracking annotation")
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 3,
|
||||
Succeeded: 1,
|
||||
})
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
|
||||
Active: 1,
|
||||
}, true)
|
||||
|
||||
// Disable feature gate and restart controller.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, false)()
|
||||
// Step 2: Disable tracking with finalizers.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
|
||||
cancel()
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
|
||||
|
||||
// Delete the Job while controller is stopped.
|
||||
err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(context.Background(), jobObj.Name, metav1.DeleteOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
t.Fatalf("Failed to delete job: %v", err)
|
||||
}
|
||||
defer events.Stop()
|
||||
|
||||
// One Pod fails, but no recreations happen because feature is disabled.
|
||||
if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||
t.Fatal("Failed trying to succeed pod with index 2")
|
||||
}
|
||||
if err := waitForEvent(events, jobObj.UID, "IndexedJobDisabled"); err != nil {
|
||||
t.Errorf("Waiting for an event for IndexedJobDisabled: %v", err)
|
||||
}
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 3), "1")
|
||||
|
||||
// Re-enable feature gate and restart controller. Failed Pod should be recreated now.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
|
||||
cancel()
|
||||
// Restart controller.
|
||||
ctx, cancel = startJobController(restConfig, clientSet)
|
||||
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 3,
|
||||
Failed: 1,
|
||||
Succeeded: 1,
|
||||
})
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
|
||||
|
||||
// Remaining Pods succeed.
|
||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||
t.Fatal("Failed trying to succeed remaining pods")
|
||||
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Falied to list Job Pods: %v", err)
|
||||
}
|
||||
sawPods := false
|
||||
for _, pod := range pods.Items {
|
||||
if isPodOwnedByJob(&pod, jobObj) {
|
||||
if hasJobTrackingFinalizer(&pod) {
|
||||
return false, nil
|
||||
}
|
||||
sawPods = true
|
||||
}
|
||||
}
|
||||
return sawPods, nil
|
||||
}); err != nil {
|
||||
t.Errorf("Waiting for finalizers to be removed: %v", err)
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||
Active: 0,
|
||||
Failed: 1,
|
||||
Succeeded: 4,
|
||||
})
|
||||
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
|
||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||
}
|
||||
|
||||
func TestSuspendJob(t *testing.T) {
|
||||
@@ -336,7 +507,7 @@ func TestSuspendJob(t *testing.T) {
|
||||
validate := func(s string, active int, status v1.ConditionStatus, reason string) {
|
||||
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
|
||||
Active: active,
|
||||
})
|
||||
}, false)
|
||||
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get Job after %s: %v", s, err)
|
||||
@@ -382,7 +553,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
|
||||
Active: 0,
|
||||
})
|
||||
}, false)
|
||||
|
||||
// Disable feature gate and restart controller to test that pods get created.
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, false)()
|
||||
@@ -394,7 +565,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
|
||||
}
|
||||
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
|
||||
Active: 2,
|
||||
})
|
||||
}, false)
|
||||
}
|
||||
|
||||
type podsByStatus struct {
|
||||
@@ -403,10 +574,10 @@ type podsByStatus struct {
|
||||
Succeeded int
|
||||
}
|
||||
|
||||
func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
|
||||
func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, wFinalizer bool) {
|
||||
t.Helper()
|
||||
var actualCounts podsByStatus
|
||||
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get updated Job: %v", err)
|
||||
@@ -419,24 +590,47 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
|
||||
return cmp.Equal(actualCounts, desired), nil
|
||||
}); err != nil {
|
||||
diff := cmp.Diff(desired, actualCounts)
|
||||
t.Errorf("Waiting for Job Pods: %v\nPods (-want,+got):\n%s", err, diff)
|
||||
t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
|
||||
}
|
||||
// Verify active Pods. No need for another wait.Poll.
|
||||
var active []*v1.Pod
|
||||
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to list Job Pods: %v", err)
|
||||
}
|
||||
active = nil
|
||||
for _, pod := range pods.Items {
|
||||
phase := pod.Status.Phase
|
||||
if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
|
||||
p := pod
|
||||
active = append(active, &p)
|
||||
}
|
||||
}
|
||||
return len(active) == desired.Active, nil
|
||||
}); err != nil {
|
||||
if len(active) != desired.Active {
|
||||
t.Errorf("Found %d active Pods, want %d", len(active), desired.Active)
|
||||
}
|
||||
}
|
||||
for _, p := range active {
|
||||
if got := hasJobTrackingFinalizer(p); got != wFinalizer {
|
||||
t.Errorf("Pod %s has tracking finalizer %t, want %t", p.Name, got, wFinalizer)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
||||
t.Helper()
|
||||
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to list Job Pods: %v", err)
|
||||
}
|
||||
active := 0
|
||||
for _, pod := range pods.Items {
|
||||
if isPodOwnedByJob(&pod, jobObj) {
|
||||
if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
|
||||
active++
|
||||
}
|
||||
phase := pod.Status.Phase
|
||||
if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
|
||||
t.Errorf("Finished pod %s still has a tracking finalizer", pod.Name)
|
||||
}
|
||||
}
|
||||
if active != desired.Active {
|
||||
t.Errorf("Found %d active Pods, want %d", active, desired.Active)
|
||||
}
|
||||
}
|
||||
|
||||
// validateIndexedJobPods validates indexes and hostname of
|
||||
@@ -484,7 +678,7 @@ func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
|
||||
if reason == "" {
|
||||
return nil
|
||||
}
|
||||
return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
return wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
for {
|
||||
var ev watch.Event
|
||||
select {
|
||||
@@ -515,7 +709,7 @@ func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1.
|
||||
|
||||
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
|
||||
t.Helper()
|
||||
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to obtain updated Job: %v", err)
|
||||
@@ -632,3 +826,20 @@ func startJobController(restConfig *restclient.Config, clientSet clientset.Inter
|
||||
go jc.Run(1, ctx.Done())
|
||||
return ctx, cancel
|
||||
}
|
||||
|
||||
func hasJobTrackingFinalizer(obj metav1.Object) bool {
|
||||
for _, fin := range obj.GetFinalizers() {
|
||||
if fin == batchv1.JobTrackingFinalizer {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func hasJobTrackingAnnotation(job *batchv1.Job) bool {
|
||||
if job.Annotations == nil {
|
||||
return false
|
||||
}
|
||||
_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
|
||||
return ok
|
||||
}
|
||||
|
Reference in New Issue
Block a user