diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 9bcee76ef98..3dfa808427d 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -2756,6 +2756,8 @@ func getCondition(job *batch.Job, condition batch.JobConditionType, status v1.Co // reaching the active deadline, at which point it is marked as Failed. func TestPastDeadlineJobFinished(t *testing.T) { _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) clientset := fake.NewClientset() fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second)) manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock) @@ -2765,8 +2767,6 @@ func TestPastDeadlineJobFinished(t *testing.T) { controller.NewControllerExpectations(), true, func() { }, } - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() sharedInformerFactory.Start(ctx.Done()) sharedInformerFactory.WaitForCacheSync(ctx.Done()) @@ -6549,6 +6549,8 @@ func TestWatchPods(t *testing.T) { func TestWatchOrphanPods(t *testing.T) { _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + t.Cleanup(cancel) clientset := fake.NewClientset() sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) @@ -6594,19 +6596,19 @@ func TestWatchOrphanPods(t *testing.T) { sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Delete(tc.job) }) } - + _, ctx := ktesting.NewTestContext(t) podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer() if tc.job != nil { podBuilder = podBuilder.job(tc.job) } orphanPod := podBuilder.Pod - orphanPod, err := clientset.CoreV1().Pods("default").Create(context.Background(), orphanPod, metav1.CreateOptions{}) + orphanPod, err := clientset.CoreV1().Pods("default").Create(ctx, orphanPod, metav1.CreateOptions{}) if err != nil { t.Fatalf("Creating orphan pod: %v", err) } if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { - p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(context.Background(), orphanPod.Name, metav1.GetOptions{}) + p, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(ctx, orphanPod.Name, metav1.GetOptions{}) if err != nil { return false, err } @@ -7610,7 +7612,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { func TestFinalizerCleanup(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancel(ctx) - defer cancel() + t.Cleanup(cancel) clientset := fake.NewClientset() sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 480eb83d9aa..1041fbcd62f 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -50,6 +50,7 @@ import ( basemetrics "k8s.io/component-base/metrics" "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" @@ -165,15 +166,15 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi }, } closeFn, restConfig, cs, ns := setup(t, "simple") - defer closeFn() + t.Cleanup(closeFn) // Make the job controller significantly slower to trigger race condition. restConfig.QPS = 1 restConfig.Burst = 1 ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { + t.Cleanup(func() { cancel() - }() + }) resetMetrics() restConfig.QPS = restConfigQPS restConfig.Burst = restConfigBurst @@ -238,7 +239,8 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi cancel() // Delete the failed pod to make sure it is not used by the second instance of the controller - ctx, cancel = context.WithCancel(context.Background()) + _, ctx = ktesting.NewTestContext(t) + ctx, cancel = context.WithCancel(ctx) err = cs.CoreV1().Pods(failedPod.Namespace).Delete(ctx, failedPod.Name, metav1.DeleteOptions{GracePeriodSeconds: ptr.To[int64](0)}) if err != nil { t.Fatalf("Error: '%v' while deleting pod: '%v'", err, klog.KObj(failedPod)) @@ -817,9 +819,9 @@ func TestSuccessPolicy(t *testing.T) { func TestSuccessPolicy_ReEnabling(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobSuccessPolicy, true) closeFn, resetConfig, clientSet, ns := setup(t, "success-policy-re-enabling") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, resetConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -901,11 +903,9 @@ func TestBackoffLimitPerIndex_DelayedPodDeletion(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true) closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-failed") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { - cancel() - }() + t.Cleanup(cancel) jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ @@ -979,9 +979,9 @@ func TestBackoffLimitPerIndex_Reenabling(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true) closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-reenabled") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -1069,9 +1069,9 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T) t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second)) closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ @@ -2128,11 +2128,11 @@ func TestManagedBy_Reenabling(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true) closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reenabling") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { + t.Cleanup(func() { cancel() - }() + }) resetMetrics() baseJob := batchv1.Job{ @@ -2231,9 +2231,9 @@ func TestManagedBy_RecreatedJob(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true) closeFn, restConfig, clientSet, ns := setup(t, "managed-by-recreate-job") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() baseJob := batchv1.Job{ @@ -2312,9 +2312,9 @@ func TestManagedBy_UsingReservedJobFinalizers(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobManagedBy, true) closeFn, restConfig, clientSet, ns := setup(t, "managed-by-reserved-finalizers") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() jobSpec := batchv1.Job{ @@ -2441,11 +2441,11 @@ func completionModePtr(cm batchv1.CompletionMode) *batchv1.CompletionMode { func TestNonParallelJob(t *testing.T) { t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { + t.Cleanup(func() { cancel() - }() + }) resetMetrics() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{}) @@ -2506,9 +2506,9 @@ func TestNonParallelJob(t *testing.T) { func TestParallelJob(t *testing.T) { t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) closeFn, restConfig, clientSet, ns := setup(t, "parallel") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -2597,9 +2597,9 @@ func TestParallelJob(t *testing.T) { func TestParallelJobChangingParallelism(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "parallel") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ @@ -2660,9 +2660,9 @@ func TestParallelJobWithCompletions(t *testing.T) { t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10)) t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) closeFn, restConfig, clientSet, ns := setup(t, "completions") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -2740,9 +2740,9 @@ func TestParallelJobWithCompletions(t *testing.T) { func TestIndexedJob(t *testing.T) { t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff)) closeFn, restConfig, clientSet, ns := setup(t, "indexed") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() mode := batchv1.IndexedCompletion @@ -3110,11 +3110,11 @@ func TestJobPodReplacementPolicyFeatureToggling(t *testing.T) { wantTerminating := ptr.To(podCount) featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodReplacementPolicy, true) closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { + t.Cleanup(func() { cancel() - }() + }) resetMetrics() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -3250,7 +3250,7 @@ func TestElasticIndexedJob(t *testing.T) { tc := tc t.Run(name, func(t *testing.T) { ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) resetMetrics() // Set up initial Job in Indexed completion mode. @@ -3334,9 +3334,9 @@ func BenchmarkLargeIndexedJob(b *testing.B) { // the job-controller performance is fast enough in the limited QPS and Burst situations. restConfig.QPS = 100 restConfig.Burst = 100 - defer closeFn() + b.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig) - defer cancel() + b.Cleanup(cancel) backoff := wait.Backoff{ Duration: time.Second, Factor: 1.5, @@ -3419,9 +3419,9 @@ func BenchmarkLargeFailureHandling(b *testing.B) { // the job-controller performance is fast enough in the limited QPS and Burst situations. restConfig.QPS = 100 restConfig.Burst = 100 - defer closeFn() + b.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig) - defer cancel() + b.Cleanup(cancel) backoff := wait.Backoff{ Duration: time.Second, Factor: 1.5, @@ -3549,7 +3549,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { restConfig.Burst = 1 jc, ctx, cancel := createJobControllerWithSharedInformers(t, restConfig, informerSet) resetMetrics() - defer cancel() + t.Cleanup(cancel) restConfig.QPS = restConfigQPS restConfig.Burst = restConfigBurst runGC := util.CreateGCController(ctx, t, *restConfig, informerSet) @@ -3590,9 +3590,9 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { // doesn't affect the termination of pods. t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 50)) closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) // Job tracking with finalizers requires less calls in Indexed mode, // so it's more likely to process all finalizers before all the pods @@ -3633,9 +3633,9 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) { t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second)) closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{}) if err != nil { @@ -3722,11 +3722,11 @@ func validateExpotentialBackoffDelay(t *testing.T, defaultPodFailureBackoff time // succeed is marked as Failed, even if the controller fails in the middle. func TestJobFailedWithInterrupts(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { + t.Cleanup(func() { cancel() - }() + }) jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ Completions: ptr.To[int32](10), @@ -3794,11 +3794,11 @@ func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clien func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) { // Step 0: create job. closeFn, restConfig, clientSet, ns := setup(t, "simple") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer func() { + t.Cleanup(func() { cancel() - }() + }) jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ @@ -3817,7 +3817,8 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) { // Step 2: Delete the Job while the controller is stopped. cancel() - err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(context.Background(), jobObj.Name, metav1.DeleteOptions{}) + _, ctx = ktesting.NewTestContext(t) + err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{}) if err != nil { t.Fatalf("Failed to delete job: %v", err) } @@ -3857,7 +3858,7 @@ func TestSuspendJob(t *testing.T) { name := fmt.Sprintf("feature=%v,create=%v,update=%v", tc.featureGate, tc.create.flag, tc.update.flag) t.Run(name, func(t *testing.T) { ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{}) if err != nil { t.Fatal(err) @@ -3907,9 +3908,9 @@ func TestSuspendJob(t *testing.T) { func TestSuspendJobControllerRestart(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "suspend") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ @@ -3930,9 +3931,9 @@ func TestSuspendJobControllerRestart(t *testing.T) { func TestNodeSelectorUpdate(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "suspend") - defer closeFn() + t.Cleanup(closeFn) ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig) - defer cancel() + t.Cleanup(cancel) job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{ Parallelism: ptr.To[int32](1), @@ -4433,7 +4434,8 @@ func resetMetrics() { func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) { tb.Helper() clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller")) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(tb) + ctx, cancel := context.WithCancel(ctx) jc, err := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet) if err != nil { tb.Fatalf("Error creating Job controller: %v", err)