diff --git a/cmd/kube-controller-manager/app/batch.go b/cmd/kube-controller-manager/app/batch.go
index e5edb4409f2..ecee824a199 100644
--- a/cmd/kube-controller-manager/app/batch.go
+++ b/cmd/kube-controller-manager/app/batch.go
@@ -29,12 +29,16 @@ import (
 )
 
 func startJobController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
-	go job.NewController(
+	jobController, err := job.NewController(
 		ctx,
 		controllerContext.InformerFactory.Core().V1().Pods(),
 		controllerContext.InformerFactory.Batch().V1().Jobs(),
 		controllerContext.ClientBuilder.ClientOrDie("job-controller"),
-	).Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs))
+	)
+	if err != nil {
+		return nil, true, fmt.Errorf("creating Job controller: %v", err)
+	}
+	go jobController.Run(ctx, int(controllerContext.ComponentConfig.JobController.ConcurrentJobSyncs))
 	return nil, true, nil
 }
 
@@ -44,7 +48,7 @@ func startCronJobController(ctx context.Context, controllerContext ControllerCon
 		controllerContext.ClientBuilder.ClientOrDie("cronjob-controller"),
 	)
 	if err != nil {
-		return nil, true, fmt.Errorf("error creating CronJob controller V2: %v", err)
+		return nil, true, fmt.Errorf("creating CronJob controller V2: %v", err)
 	}
 	go cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs))
 
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 34aae78e93d..2dce2d773f7 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -152,11 +152,11 @@ type syncJobCtx struct {
 
 // NewController creates a new Job controller that keeps the relevant pods
 // in sync with their corresponding Job objects.
-func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) *Controller {
+func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) {
 	return newControllerWithClock(ctx, podInformer, jobInformer, kubeClient, &clock.RealClock{})
 }
 
-func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) *Controller {
+func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface, clock clock.WithTicker) (*Controller, error) {
 	eventBroadcaster := record.NewBroadcaster()
 	logger := klog.FromContext(ctx)
 
@@ -176,7 +176,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 		podBackoffStore: newBackoffStore(),
 	}
 
-	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	if _, err := jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			jm.enqueueSyncJobImmediately(logger, obj)
 		},
@@ -186,11 +186,13 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 		DeleteFunc: func(obj interface{}) {
 			jm.deleteJob(logger, obj)
 		},
-	})
+	}); err != nil {
+		return nil, fmt.Errorf("adding Job event handler: %w", err)
+	}
 	jm.jobLister = jobInformer.Lister()
 	jm.jobStoreSynced = jobInformer.Informer().HasSynced
 
-	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	if _, err := podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			jm.addPod(logger, obj)
 		},
@@ -200,7 +202,9 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 		DeleteFunc: func(obj interface{}) {
 			jm.deletePod(logger, obj, true)
 		},
-	})
+	}); err != nil {
+		return nil, fmt.Errorf("adding Pod event handler: %w", err)
+	}
 	jm.podStore = podInformer.Lister()
 	jm.podStoreSynced = podInformer.Informer().HasSynced
 
@@ -210,7 +214,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 
 	metrics.Register()
 
-	return jm
+	return jm, nil
 }
 
 // Run the main goroutine responsible for watching and syncing jobs.
diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go
index 79b637e8af6..01b9222ad31 100644
--- a/pkg/controller/job/job_controller_test.go
+++ b/pkg/controller/job/job_controller_test.go
@@ -123,13 +123,18 @@ func newJob(parallelism, completions, backoffLimit int32, completionMode batch.C
 	return newJobWithName("foobar", parallelism, completions, backoffLimit, completionMode)
 }
 
-func newControllerFromClient(ctx context.Context, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) {
-	return newControllerFromClientWithClock(ctx, kubeClient, resyncPeriod, realClock)
+func newControllerFromClient(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) (*Controller, informers.SharedInformerFactory) {
+	t.Helper()
+	return newControllerFromClientWithClock(ctx, t, kubeClient, resyncPeriod, realClock)
 }
 
-func newControllerFromClientWithClock(ctx context.Context, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) {
+func newControllerFromClientWithClock(ctx context.Context, t *testing.T, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, clock clock.WithTicker) (*Controller, informers.SharedInformerFactory) {
+	t.Helper()
 	sharedInformers := informers.NewSharedInformerFactory(kubeClient, resyncPeriod())
-	jm := newControllerWithClock(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock)
+	jm, err := newControllerWithClock(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), kubeClient, clock)
+	if err != nil {
+		t.Fatalf("Error creating Job controller: %v", err)
+	}
 	jm.podControl = &controller.FakePodControl{}
 	return jm, sharedInformers
 }
@@ -879,7 +884,7 @@ func TestControllerSyncJob(t *testing.T) {
 				fakeClock = clocktesting.NewFakeClock(time.Now())
 			}
 
-			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientSet, controller.NoResyncPeriodFunc, fakeClock)
+			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientSet, controller.NoResyncPeriodFunc, fakeClock)
 			fakePodControl := controller.FakePodControl{Err: tc.podControllerError, CreateLimit: tc.podLimit}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -1813,7 +1818,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, tc.enableJobBackoffLimitPerIndex)()
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-			manager, _ := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc)
+			manager, _ := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{Err: tc.podControlErr}
 			metrics.JobPodsFinished.Reset()
 			manager.podControl = &fakePodControl
@@ -1960,7 +1965,7 @@ func TestSyncJobPastDeadline(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			// job manager setup
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-			manager, sharedInformerFactory := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc)
+			manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -2038,7 +2043,7 @@ func TestPastDeadlineJobFinished(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := fake.NewSimpleClientset()
 	fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
-	manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 	manager.expectations = FakeJobExpectations{
@@ -2117,7 +2122,7 @@ func TestPastDeadlineJobFinished(t *testing.T) {
 func TestSingleJobFailedCondition(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -2157,7 +2162,7 @@ func TestSingleJobFailedCondition(t *testing.T) {
 func TestSyncJobComplete(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -2183,7 +2188,7 @@ func TestSyncJobComplete(t *testing.T) {
 func TestSyncJobDeleted(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	manager, _ := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	manager, _ := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -3274,7 +3279,7 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 				tc.job.Spec.PodReplacementPolicy = podReplacementPolicy(batch.Failed)
 			}
 			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-			manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+			manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -3775,7 +3780,7 @@ func TestSyncJobWithJobBackoffLimitPerIndex(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
 			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 			fakeClock := clocktesting.NewFakeClock(now)
-			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -3833,7 +3838,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff))
 			fakeClient := clocktesting.NewFakeClock(time.Now())
-			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClient)
+			manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClient)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -3886,7 +3891,7 @@ func TestUpdateJobRequeue(t *testing.T) {
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+			manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 			manager.podStoreSynced = alwaysReady
 			manager.jobStoreSynced = alwaysReady
 
@@ -3955,7 +3960,7 @@ func TestGetPodCreationInfoForIndependentIndexes(t *testing.T) {
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
 			fakeClock := clocktesting.NewFakeClock(now)
-			manager, _ := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+			manager, _ := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 			gotIndexesToAdd, gotRemainingTime := manager.getPodCreationInfoForIndependentIndexes(logger, tc.indexesToAdd, tc.podsWithDelayedDeletionPerIndex)
 			if diff := cmp.Diff(tc.wantIndexesToAdd, gotIndexesToAdd); diff != "" {
 				t.Fatalf("Unexpected indexes to add: %s", diff)
@@ -3970,7 +3975,7 @@ func TestGetPodCreationInfoForIndependentIndexes(t *testing.T) {
 func TestJobPodLookup(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 	testCases := []struct {
@@ -4114,7 +4119,7 @@ func TestGetPodsForJob(t *testing.T) {
 				job.DeletionTimestamp = &metav1.Time{}
 			}
 			clientSet := fake.NewSimpleClientset(job, otherJob)
-			jm, informer := newControllerFromClient(ctx, clientSet, controller.NoResyncPeriodFunc)
+			jm, informer := newControllerFromClient(ctx, t, clientSet, controller.NoResyncPeriodFunc)
 			jm.podStoreSynced = alwaysReady
 			jm.jobStoreSynced = alwaysReady
 			cachedJob := job.DeepCopy()
@@ -4158,7 +4163,7 @@ func TestAddPod(t *testing.T) {
 
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4202,7 +4207,7 @@ func TestAddPodOrphan(t *testing.T) {
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4232,7 +4237,7 @@ func TestUpdatePod(t *testing.T) {
 	logger := klog.FromContext(ctx)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4280,7 +4285,7 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4309,7 +4314,7 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
 	logger := klog.FromContext(ctx)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4337,7 +4342,7 @@ func TestUpdatePodRelease(t *testing.T) {
 	logger := klog.FromContext(ctx)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4364,7 +4369,7 @@ func TestDeletePod(t *testing.T) {
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	jm, informer := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4408,7 +4413,7 @@ func TestDeletePodOrphan(t *testing.T) {
 	t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0))
 	logger, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	jm, informer := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	jm.podStoreSynced = alwaysReady
 	jm.jobStoreSynced = alwaysReady
 
@@ -4449,7 +4454,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(logger klog.Logger, controll
 func TestSyncJobExpectations(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -4486,7 +4491,7 @@ func TestWatchJobs(t *testing.T) {
 	clientset := fake.NewSimpleClientset()
 	fakeWatch := watch.NewFake()
 	clientset.PrependWatchReactor("jobs", core.DefaultWatchReactor(fakeWatch, nil))
-	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 
@@ -4532,7 +4537,7 @@ func TestWatchPods(t *testing.T) {
 	clientset := fake.NewSimpleClientset(testJob)
 	fakeWatch := watch.NewFake()
 	clientset.PrependWatchReactor("pods", core.DefaultWatchReactor(fakeWatch, nil))
-	manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+	manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 
@@ -4578,7 +4583,10 @@ func TestWatchOrphanPods(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := fake.NewSimpleClientset()
 	sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
-	manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	if err != nil {
+		t.Fatalf("Error creating Job controller: %v", err)
+	}
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 
@@ -4655,7 +4663,7 @@ func TestJobApiBackoffReset(t *testing.T) {
 
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 	fakeClock := clocktesting.NewFakeClock(time.Now())
-	manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
+	manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, t, clientset, controller.NoResyncPeriodFunc, fakeClock)
 	fakePodControl := controller.FakePodControl{}
 	manager.podControl = &fakePodControl
 	manager.podStoreSynced = alwaysReady
@@ -4739,7 +4747,7 @@ func TestJobBackoff(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
 			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-			manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+			manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -4847,7 +4855,7 @@ func TestJobBackoffForOnFailure(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			// job manager setup
 			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-			manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+			manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -4946,7 +4954,7 @@ func TestJobBackoffOnRestartPolicyNever(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			// job manager setup
 			clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-			manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+			manager, sharedInformerFactory := newControllerFromClient(ctx, t, clientset, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
 			manager.podStoreSynced = alwaysReady
@@ -5080,7 +5088,10 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := fake.NewSimpleClientset()
 	sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
-	manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	if err != nil {
+		t.Fatalf("Error creating Job controller: %v", err)
+	}
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 	manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")}
@@ -5133,7 +5144,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
 	update := pods[0].DeepCopy()
 	update.Finalizers = nil
 	update.ResourceVersion = "1"
-	err := clientset.Tracker().Update(podsResource, update, update.Namespace)
+	err = clientset.Tracker().Update(podsResource, update, update.Namespace)
 	if err != nil {
 		t.Errorf("Removing finalizer: %v", err)
 	}
@@ -5179,7 +5190,10 @@ func TestFinalizerCleanup(t *testing.T) {
 	clientset := fake.NewSimpleClientset()
 	sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
 
-	manager := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	manager, err := NewController(ctx, sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
+	if err != nil {
+		t.Fatalf("Error creating Job controller: %v", err)
+	}
 	manager.podStoreSynced = alwaysReady
 	manager.jobStoreSynced = alwaysReady
 
@@ -5193,7 +5207,7 @@ func TestFinalizerCleanup(t *testing.T) {
 
 	// Create a simple Job
 	job := newJob(1, 1, 1, batch.NonIndexedCompletion)
-	job, err := clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
+	job, err = clientset.BatchV1().Jobs(job.GetNamespace()).Create(ctx, job, metav1.CreateOptions{})
 	if err != nil {
 		t.Fatalf("Creating job: %v", err)
 	}
diff --git a/test/integration/cronjob/cronjob_test.go b/test/integration/cronjob/cronjob_test.go
index 90f20902f07..9a259565dd9 100644
--- a/test/integration/cronjob/cronjob_test.go
+++ b/test/integration/cronjob/cronjob_test.go
@@ -52,7 +52,10 @@ func setup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc
 	if err != nil {
 		t.Fatalf("Error creating CronJob controller: %v", err)
 	}
-	jc := job.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+	jc, err := job.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+	if err != nil {
+		t.Fatalf("Error creating Job controller: %v", err)
+	}
 
 	return server.TearDownFn, cjc, jc, informerSet, clientSet
 }
diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go
index 49338637aeb..4046a7e055c 100644
--- a/test/integration/job/job_test.go
+++ b/test/integration/job/job_test.go
@@ -75,7 +75,7 @@ func TestMetricsOnSuccesses(t *testing.T) {
 	// setup the job controller
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	testCases := map[string]struct {
@@ -151,7 +151,7 @@ func TestJobFinishedNumReasonMetric(t *testing.T) {
 	// setup the job controller
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	testCases := map[string]struct {
@@ -378,7 +378,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
 	// Make the job controller significantly slower to trigger race condition.
 	restConfig.QPS = 1
 	restConfig.Burst = 1
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -454,7 +454,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
 	cancel()
 
 	// start the second controller to promote the interim FailureTarget job condition as Failed
-	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 	// verify the job is correctly marked as Failed
 	validateJobFailed(ctx, t, cs, jobObj)
 	validateNoOrphanPodsWithFinalizers(ctx, t, cs, jobObj)
@@ -634,7 +634,7 @@ func TestJobPodFailurePolicy(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -659,7 +659,7 @@ func TestJobPodFailurePolicy(t *testing.T) {
 
 			if test.restartController {
 				cancel()
-				ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+				ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 			}
 
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@@ -691,7 +691,7 @@ func TestBackoffLimitPerIndex_DelayedPodDeletion(t *testing.T) {
 	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
 	closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-failed")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -766,7 +766,7 @@ func TestBackoffLimitPerIndex_Reenabling(t *testing.T) {
 	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobBackoffLimitPerIndex, true)()
 	closeFn, restConfig, clientSet, ns := setup(t, "backoff-limit-per-index-reenabled")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 	resetMetrics()
 
@@ -852,7 +852,7 @@ func TestBackoffLimitPerIndex_JobPodsCreatedWithExponentialBackoff(t *testing.T)
 
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -1239,7 +1239,7 @@ func TestBackoffLimitPerIndex(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -1316,7 +1316,7 @@ func TestNonParallelJob(t *testing.T) {
 	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -1332,7 +1332,7 @@ func TestNonParallelJob(t *testing.T) {
 
 	// Restarting controller.
 	cancel()
-	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 
 	// Failed Pod is replaced.
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
@@ -1346,7 +1346,7 @@ func TestNonParallelJob(t *testing.T) {
 
 	// Restarting controller.
 	cancel()
-	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 
 	// No more Pods are created after the Pod succeeds.
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@@ -1377,7 +1377,7 @@ func TestParallelJob(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
 	defer closeFn()
 
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 	resetMetrics()
 
@@ -1463,7 +1463,7 @@ func TestParallelJob(t *testing.T) {
 func TestParallelJobParallelism(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -1533,7 +1533,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
 			closeFn, restConfig, clientSet, ns := setup(t, "completions")
 			defer closeFn()
-			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 			defer cancel()
 
 			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -1607,7 +1607,7 @@ func TestIndexedJob(t *testing.T) {
 	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, fastPodFailureBackoff))
 	closeFn, restConfig, clientSet, ns := setup(t, "indexed")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 	resetMetrics()
 
@@ -1758,7 +1758,7 @@ func TestJobPodReplacementPolicy(t *testing.T) {
 			closeFn, restConfig, clientSet, ns := setup(t, "pod-replacement-policy")
 			defer closeFn()
 
-			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 			defer cancel()
 			resetMetrics()
 
@@ -1921,7 +1921,7 @@ func TestElasticIndexedJob(t *testing.T) {
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.ElasticIndexedJob, tc.featureGate)()
 			closeFn, restConfig, clientSet, ns := setup(t, "indexed")
 			defer closeFn()
-			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 			defer cancel()
 			resetMetrics()
 
@@ -2004,7 +2004,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 	restConfig.QPS = 100
 	restConfig.Burst = 100
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(b, restConfig)
 	defer cancel()
 	backoff := wait.Backoff{
 		Duration: time.Second,
@@ -2083,7 +2083,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 	// Make the job controller significantly slower to trigger race condition.
 	restConfig.QPS = 1
 	restConfig.Burst = 1
-	jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
+	jc, ctx, cancel := createJobControllerWithSharedInformers(t, restConfig, informerSet)
 	resetMetrics()
 	defer cancel()
 	restConfig.QPS = 200
@@ -2126,7 +2126,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
 	t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 50))
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	// Job tracking with finalizers requires less calls in Indexed mode,
@@ -2165,7 +2165,7 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
 	t.Cleanup(setDurationDuringTest(&jobcontroller.DefaultJobPodFailureBackOff, 2*time.Second))
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
@@ -2251,7 +2251,7 @@ func validateExpotentialBackoffDelay(t *testing.T, defaultPodFailureBackoff time
 func TestJobFailedWithInterrupts(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -2291,7 +2291,7 @@ func TestJobFailedWithInterrupts(t *testing.T) {
 	}
 	t.Log("Recreating job controller")
 	cancel()
-	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 	validateJobCondition(ctx, t, clientSet, jobObj, batchv1.JobFailed)
 }
 
@@ -2322,7 +2322,7 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
 	// Step 0: create job.
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer func() {
 		cancel()
 	}()
@@ -2349,7 +2349,7 @@ func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
 	}
 
 	// Step 3: Restart controller.
-	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel = startJobControllerAndWaitForCaches(t, restConfig)
 	validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
 }
 
@@ -2382,7 +2382,7 @@ func TestSuspendJob(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 			defer closeFn()
-			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+			ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 			defer cancel()
 			events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
 			if err != nil {
@@ -2433,7 +2433,7 @@ func TestSuspendJob(t *testing.T) {
 func TestSuspendJobControllerRestart(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@@ -2455,7 +2455,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
 func TestNodeSelectorUpdate(t *testing.T) {
 	closeFn, restConfig, clientSet, ns := setup(t, "suspend")
 	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	ctx, cancel := startJobControllerAndWaitForCaches(t, restConfig)
 	defer cancel()
 
 	job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
@@ -2919,9 +2919,10 @@ func setup(t testing.TB, nsBaseName string) (framework.TearDownFunc, *restclient
 	return closeFn, config, clientSet, ns
 }
 
-func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.Context, context.CancelFunc) {
+func startJobControllerAndWaitForCaches(tb testing.TB, restConfig *restclient.Config) (context.Context, context.CancelFunc) {
+	tb.Helper()
 	informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
-	jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
+	jc, ctx, cancel := createJobControllerWithSharedInformers(tb, restConfig, informerSet)
 	informerSet.Start(ctx.Done())
 	go jc.Run(ctx, 1)
 
@@ -2940,10 +2941,14 @@ func resetMetrics() {
 	metrics.PodFailuresHandledByFailurePolicy.Reset()
 }
 
-func createJobControllerWithSharedInformers(restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
+func createJobControllerWithSharedInformers(tb testing.TB, restConfig *restclient.Config, informerSet informers.SharedInformerFactory) (*jobcontroller.Controller, context.Context, context.CancelFunc) {
+	tb.Helper()
 	clientSet := clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-controller"))
 	ctx, cancel := context.WithCancel(context.Background())
-	jc := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+	jc, err := jobcontroller.NewController(ctx, informerSet.Core().V1().Pods(), informerSet.Batch().V1().Jobs(), clientSet)
+	if err != nil {
+		tb.Fatalf("Error creating Job controller: %v", err)
+	}
 	return jc, ctx, cancel
 }