hardens integration job tests

the job controller used by the tests must wait for the caches to sync
since the tests don't check /readyz there is no way
the tests can tell it is safe to call the server and requests won't be rejected
This commit is contained in:
Lukasz Szaszkiewicz 2022-05-02 11:57:16 +02:00
parent fdb2d54475
commit 59a5c1a6ea

View File

@ -67,7 +67,7 @@ func TestNonParallelJob(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple") closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() { defer func() {
cancel() cancel()
}() }()
@ -86,7 +86,7 @@ func TestNonParallelJob(t *testing.T) {
// Restarting controller. // Restarting controller.
cancel() cancel()
ctx, cancel = startJobController(restConfig) ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
// Failed Pod is replaced. // Failed Pod is replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
@ -100,7 +100,7 @@ func TestNonParallelJob(t *testing.T) {
// Restarting controller. // Restarting controller.
cancel() cancel()
ctx, cancel = startJobController(restConfig) ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
// No more Pods are created after the Pod succeeds. // No more Pods are created after the Pod succeeds.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil { if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
@ -141,7 +141,7 @@ func TestParallelJob(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "parallel") closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel() defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@ -229,7 +229,7 @@ func TestParallelJobParallelism(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "parallel") closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel() defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@ -309,7 +309,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)() defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
closeFn, restConfig, clientSet, ns := setup(t, "completions") closeFn, restConfig, clientSet, ns := setup(t, "completions")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel() defer cancel()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@ -389,7 +389,7 @@ func TestIndexedJob(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "indexed") closeFn, restConfig, clientSet, ns := setup(t, "indexed")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() { defer func() {
cancel() cancel()
}() }()
@ -464,7 +464,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple") closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() { defer func() {
cancel() cancel()
}() }()
@ -495,7 +495,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
} }
// Restart controller. // Restart controller.
ctx, cancel = startJobController(restConfig) ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
// Ensure Job continues to be tracked and finalizers are removed. // Ensure Job continues to be tracked and finalizers are removed.
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@ -522,7 +522,7 @@ func TestDisableJobTrackingWithFinalizers(t *testing.T) {
} }
// Restart controller. // Restart controller.
ctx, cancel = startJobController(restConfig) ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
// Ensure Job continues to be tracked and finalizers are removed. // Ensure Job continues to be tracked and finalizers are removed.
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
@ -585,7 +585,7 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple") closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() { defer func() {
cancel() cancel()
}() }()
@ -651,7 +651,7 @@ func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple") closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() { defer func() {
cancel() cancel()
}() }()
@ -683,7 +683,7 @@ func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
} }
// Restart controller. // Restart controller.
ctx, cancel = startJobController(restConfig) ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) { if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{}) pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
@ -733,7 +733,7 @@ func TestSuspendJob(t *testing.T) {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend") closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel() defer cancel()
events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{}) events, err := clientSet.EventsV1().Events(ns.Name).Watch(ctx, metav1.ListOptions{})
if err != nil { if err != nil {
@ -784,7 +784,7 @@ func TestSuspendJob(t *testing.T) {
func TestSuspendJobControllerRestart(t *testing.T) { func TestSuspendJobControllerRestart(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend") closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() { defer func() {
cancel() cancel()
}() }()
@ -815,7 +815,7 @@ func TestNodeSelectorUpdate(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "suspend") closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn() defer closeFn()
ctx, cancel := startJobController(restConfig) ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel() defer cancel()
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{ job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
@ -1182,11 +1182,17 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
return closeFn, &config, clientSet, ns return closeFn, &config, clientSet, ns
} }
func startJobController(restConfig *restclient.Config) (context.Context, context.CancelFunc) { func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.Context, context.CancelFunc) {
informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0) informerSet := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(restConfig, "job-informers")), 0)
jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet) jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
informerSet.Start(ctx.Done()) informerSet.Start(ctx.Done())
go jc.Run(ctx, 1) go jc.Run(ctx, 1)
// since this method starts the controller in a separate goroutine
// and the tests don't check /readyz there is no way
// the tests can tell it is safe to call the server and requests won't be rejected
// thus we wait until caches have synced
informerSet.WaitForCacheSync(ctx.Done())
return ctx, cancel return ctx, cancel
} }