diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 0eda5d76b02..5c3da1096c6 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -67,7 +67,7 @@ func TestNonParallelJob(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer func() { cancel() }() @@ -86,7 +86,7 @@ func TestNonParallelJob(t *testing.T) { // Restarting controller. cancel() - ctx, cancel = startJobController(restConfig) + ctx, cancel = startJobControllerAndWaitForCaches(restConfig) // Failed Pod is replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { @@ -100,7 +100,7 @@ func TestNonParallelJob(t *testing.T) { // Restarting controller. cancel() - ctx, cancel = startJobController(restConfig) + ctx, cancel = startJobControllerAndWaitForCaches(restConfig) // No more Pods are created after the Pod succeeds. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { @@ -141,7 +141,7 @@ func TestParallelJob(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "parallel") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer cancel() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -229,7 +229,7 @@ func TestParallelJobParallelism(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "parallel") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer cancel() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -309,7 +309,7 @@ func TestParallelJobWithCompletions(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)() closeFn, restConfig, clientSet, ns := setup(t, "completions") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer cancel() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -389,7 +389,7 @@ func TestIndexedJob(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "indexed") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer func() { cancel() }() @@ -464,7 +464,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer func() { cancel() }() @@ -495,7 +495,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) { } // Restart controller. - ctx, cancel = startJobController(restConfig) + ctx, cancel = startJobControllerAndWaitForCaches(restConfig) // Ensure Job continues to be tracked and finalizers are removed. validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ @@ -522,7 +522,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) { } // Restart controller. - ctx, cancel = startJobController(restConfig) + ctx, cancel = startJobControllerAndWaitForCaches(restConfig) // Ensure Job continues to be tracked and finalizers are removed. validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ @@ -585,7 +585,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer func() { cancel() }() @@ -651,7 +651,7 @@ func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer func() { cancel() }() @@ -683,7 +683,7 @@ func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) { } // Restart controller. - ctx, cancel = startJobController(restConfig) + ctx, cancel = startJobControllerAndWaitForCaches(restConfig) if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) { pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) if err != nil { @@ -733,7 +733,7 @@ func TestSuspendJob(t *testing.T) { t.Run(name, func(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "suspend") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer cancel() events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{}) if err != nil { @@ -784,7 +784,7 @@ func TestSuspendJob(t *testing.T) { func TestSuspendJobControllerRestart(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "suspend") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer func() { cancel() }() @@ -815,7 +815,7 @@ func TestNodeSelectorUpdate(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "suspend") defer closeFn() - ctx, cancel := startJobController(restConfig) + ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer cancel() job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{ @@ -1182,11 +1182,17 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co return closeFn, &config, clientSet, ns } -func startJobController(restConfig *restclient.Config) (context.Context, context.CancelFunc) { +func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.Context, context.CancelFunc) { informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0) jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet) informerSet.Start(ctx.Done()) go jc.Run(ctx, 1) + + // since this method starts the controller in a separate goroutine + // and the tests don't check /readyz there is no way + // the tests can tell it is safe to call the server and requests won't be rejected + // thus we wait until caches have synced + informerSet.WaitForCacheSync(ctx.Done()) return ctx, cancel }