Merge pull request #126439 from kaisoz/improve-job-test-runtime

Improve Job integration tests runtime by reusing apiserver within each test
This commit is contained in:
Kubernetes Prow Robot 2024-09-11 19:03:13 +01:00 committed by GitHub
commit ebe3404b5a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -427,16 +427,17 @@ func TestJobPodFailurePolicy(t *testing.T) {
},
},
}
closeFn, restConfig, clientSet, ns := setup(t, "pod-failure-policy")
t.Cleanup(closeFn)
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
defer func() {
t.Cleanup(func() {
cancel()
}()
})
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
@ -761,18 +762,18 @@ func TestSuccessPolicy(t *testing.T) {
wantConditionTypes: []batchv1.JobConditionType{batchv1.JobFailed},
},
}
closeFn, restConfig, clientSet, ns := setup(t, "success-policy")
t.Cleanup(closeFn)
for name, tc := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, tc.enableJobSuccessPolicy)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableBackoffLimitPerIndex)
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
defer func() {
cancel()
}()
t.Cleanup(cancel)
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &tc.job)
if err != nil {
t.Fatalf("Error %v while creating the Job %q", err, jobObj.Name)
@ -1180,11 +1181,12 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T)
// are terminal. The fate of the Job is indicated by the interim Job conditions:
// FailureTarget, or SuccessCriteriaMet.
func TestDelayTerminalPhaseCondition(t *testing.T) {
const blockDeletionFinalizerForTest string = "fake.example.com/blockDeletion"
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
podTemplateSpec := v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
Finalizers: []string{blockDeletionFinalizerForTest},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
@ -1515,6 +1517,9 @@ func TestDelayTerminalPhaseCondition(t *testing.T) {
},
},
}
closeFn, restConfig, clientSet, ns := setup(t, "delay-terminal-condition")
t.Cleanup(closeFn)
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
@ -1523,8 +1528,6 @@ func TestDelayTerminalPhaseCondition(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, true)
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, test.enableJobSuccessPolicy)
closeFn, restConfig, clientSet, ns := setup(t, "delay-terminal-condition")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
@ -1532,7 +1535,11 @@ func TestDelayTerminalPhaseCondition(t *testing.T) {
if err != nil {
t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
}
t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
t.Cleanup(func() {
if err := cleanUp(ctx, clientSet, jobObj, []string{blockDeletionFinalizerForTest, batchv1.JobTrackingFinalizer}); err != nil {
t.Fatalf("Failed cleanup: %v", err)
}
})
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
waitForPodsToBeActive(ctx, t, jobClient, *jobObj.Spec.Parallelism, jobObj)
@ -1901,17 +1908,17 @@ func TestBackoffLimitPerIndex(t *testing.T) {
},
},
}
closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index")
t.Cleanup(closeFn)
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
defer func() {
cancel()
}()
t.Cleanup(cancel)
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
@ -1972,6 +1979,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
// reconcile or skip reconciliation of the Job depending on the Job's managedBy
// field, and the enablement of the JobManagedBy feature gate.
func TestManagedBy(t *testing.T) {
const blockDeletionFinalizerForTest string = "fake.example.com/blockDeletion"
customControllerName := "example.com/custom-job-controller"
podTemplateSpec := v1.PodTemplateSpec{
Spec: v1.PodSpec{
@ -2064,20 +2072,29 @@ func TestManagedBy(t *testing.T) {
},
},
}
closeFn, restConfig, clientSet, ns := setup(t, "managed-by")
t.Cleanup(closeFn)
for name, test := range testCases {
t.Run(name, func(t *testing.T) {
resetMetrics()
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, test.enableJobManagedBy)
closeFn, restConfig, clientSet, ns := setup(t, "managed-by")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
defer cancel()
t.Cleanup(cancel)
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
if err != nil {
t.Fatalf("Error %v while creating the job %q", err, klog.KObj(jobObj))
}
t.Cleanup(func() {
// Wait for cleanup to finish to prevent this job from affecting metrics
if err := cleanUp(ctx, clientSet, jobObj, []string{blockDeletionFinalizerForTest, batchv1.JobTrackingFinalizer}); err != nil {
t.Fatalf("Failed cleanup: %v", err)
}
})
if test.wantReconciledByBuiltInController {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: int(*jobObj.Spec.Parallelism),
@ -2817,6 +2834,7 @@ func TestIndexedJob(t *testing.T) {
}
func TestJobPodReplacementPolicy(t *testing.T) {
const blockDeletionFinalizerForTest string = "fake.example.com/blockDeletion"
t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
indexedCompletion := batchv1.IndexedCompletion
nonIndexedCompletion := batchv1.NonIndexedCompletion
@ -2847,7 +2865,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
CompletionMode: &indexedCompletion,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
Finalizers: []string{blockDeletionFinalizerForTest},
},
},
},
@ -2872,7 +2890,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
Finalizers: []string{blockDeletionFinalizerForTest},
},
},
},
@ -2900,7 +2918,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
PodReplacementPolicy: podReplacementPolicy(batchv1.TerminatingOrFailed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
Finalizers: []string{blockDeletionFinalizerForTest},
},
},
},
@ -2928,7 +2946,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
Finalizers: []string{blockDeletionFinalizerForTest},
},
},
PodFailurePolicy: &batchv1.PodFailurePolicy{
@ -2962,7 +2980,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
Finalizers: []string{blockDeletionFinalizerForTest},
},
},
},
@ -2990,7 +3008,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
PodReplacementPolicy: podReplacementPolicy(batchv1.Failed),
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Finalizers: []string{"fake.example.com/blockDeletion"},
Finalizers: []string{blockDeletionFinalizerForTest},
},
},
},
@ -3010,13 +3028,14 @@ func TestJobPodReplacementPolicy(t *testing.T) {
},
},
}
closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
t.Cleanup(closeFn)
for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, tc.podReplacementPolicyEnabled)
closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
t.Cleanup(closeFn)
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
t.Cleanup(cancel)
resetMetrics()
@ -3030,7 +3049,11 @@ func TestJobPodReplacementPolicy(t *testing.T) {
jobClient := clientSet.BatchV1().Jobs(jobObj.Namespace)
waitForPodsToBeActive(ctx, t, jobClient, 2, jobObj)
t.Cleanup(func() { removePodsFinalizer(ctx, t, clientSet, ns.Name) })
t.Cleanup(func() {
if err := cleanUp(ctx, clientSet, jobObj, []string{blockDeletionFinalizerForTest, batchv1.JobTrackingFinalizer}); err != nil {
t.Fatalf("Failed cleanup: %v", err)
}
})
deletePods(ctx, t, clientSet, ns.Name)
@ -3221,11 +3244,11 @@ func TestElasticIndexedJob(t *testing.T) {
},
}
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
t.Cleanup(closeFn)
for name, tc := range cases {
tc := tc
t.Run(name, func(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
defer cancel()
resetMetrics()
@ -3362,7 +3385,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
b.Fatalf("Failed to create Job: %v", err)
}
b.Cleanup(func() {
if err := cleanUp(ctx, clientSet, jobObj); err != nil {
if err := cleanUp(ctx, clientSet, jobObj, []string{batchv1.JobTrackingFinalizer}); err != nil {
b.Fatalf("Failed cleanup: %v", err)
}
})
@ -3450,7 +3473,7 @@ func BenchmarkLargeFailureHandling(b *testing.B) {
b.Fatalf("Failed to create Job: %v", err)
}
b.Cleanup(func() {
if err := cleanUp(ctx, clientSet, jobObj); err != nil {
if err := cleanUp(ctx, clientSet, jobObj, []string{batchv1.JobTrackingFinalizer}); err != nil {
b.Fatalf("Failed cleanup: %v", err)
}
})
@ -3484,8 +3507,11 @@ func BenchmarkLargeFailureHandling(b *testing.B) {
}
}
// cleanUp deletes all pods and the job
func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job) error {
// cleanUp removes the specified pod finalizers, then deletes all pods and the job.
func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1.Job, podFinalizersToRemove []string) error {
if err := removePodsFinalizers(ctx, clientSet, jobObj.Namespace, podFinalizersToRemove); err != nil {
return err
}
// Clean up pods in pages, because DeleteCollection might timeout.
// #90743
for {
@ -3505,14 +3531,18 @@ func cleanUp(ctx context.Context, clientSet clientset.Interface, jobObj *batchv1
return err
}
}
return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{})
// Set the propagation policy to background to ensure that the job doesn't receive any finalizer at deletion time
return clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{
PropagationPolicy: ptr.To(metav1.DeletePropagationBackground),
})
}
func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "orphan-pod-finalizers")
t.Cleanup(closeFn)
for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
t.Run(string(policy), func(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "controller-informers")), 0)
// Make the job controller significantly slower to trigger race condition.
restConfig.QPS = 1
@ -3821,11 +3851,11 @@ func TestSuspendJob(t *testing.T) {
},
}
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
t.Cleanup(closeFn)
for _, tc := range testCases {
name := fmt.Sprintf("feature=%v,create=%v,update=%v", tc.featureGate, tc.create.flag, tc.update.flag)
t.Run(name, func(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
defer cancel()
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
@ -4341,7 +4371,7 @@ func getCompletionIndex(p *v1.Pod) (int, error) {
func createJobWithDefaults(ctx context.Context, clientSet clientset.Interface, ns string, jobObj *batchv1.Job) (*batchv1.Job, error) {
if jobObj.Name == "" {
jobObj.Name = "test-job"
jobObj.GenerateName = "test-job"
}
if len(jobObj.Spec.Template.Spec.Containers) == 0 {
jobObj.Spec.Template.Spec.Containers = []v1.Container{
@ -4476,23 +4506,25 @@ func deletePods(ctx context.Context, t *testing.T, clientSet clientset.Interface
}
}
func removePodsFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {
t.Helper()
func removePodsFinalizers(ctx context.Context, clientSet clientset.Interface, namespace string, finalizersNames []string) error {
pods, err := clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list pods: %v", err)
return err
}
updatePod(ctx, t, clientSet, pods.Items, func(pod *v1.Pod) {
for i, finalizer := range pod.Finalizers {
if finalizer == "fake.example.com/blockDeletion" {
pod.Finalizers = append(pod.Finalizers[:i], pod.Finalizers[i+1:]...)
finalizersSet := sets.New(finalizersNames...)
return updatePod(ctx, clientSet, pods.Items, func(pod *v1.Pod) {
podFinalizers := []string{}
for _, podFinalizer := range pod.Finalizers {
if _, found := finalizersSet[podFinalizer]; !found {
podFinalizers = append(podFinalizers, podFinalizer)
}
}
pod.Finalizers = podFinalizers
})
}
func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) {
t.Helper()
func updatePod(ctx context.Context, clientSet clientset.Interface, pods []v1.Pod, updateFunc func(*v1.Pod)) error {
for _, val := range pods {
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
newPod, err := clientSet.CoreV1().Pods(val.Namespace).Get(ctx, val.Name, metav1.GetOptions{})
@ -4503,9 +4535,10 @@ func updatePod(ctx context.Context, t *testing.T, clientSet clientset.Interface,
_, err = clientSet.CoreV1().Pods(val.Namespace).Update(ctx, newPod, metav1.UpdateOptions{})
return err
}); err != nil {
t.Fatalf("Failed to update pod %s: %v", val.Name, err)
return err
}
}
return nil
}
func failTerminatingPods(ctx context.Context, t *testing.T, clientSet clientset.Interface, namespace string) {