diff --git a/test/e2e/storage/BUILD b/test/e2e/storage/BUILD index 56e4bd1f96b..a4361e764ac 100644 --- a/test/e2e/storage/BUILD +++ b/test/e2e/storage/BUILD @@ -66,6 +66,8 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/tools/watch:go_default_library", "//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library", "//staging/src/k8s.io/component-base/metrics/testutil:go_default_library", "//test/e2e/framework:go_default_library", diff --git a/test/e2e/storage/csi_mock_volume.go b/test/e2e/storage/csi_mock_volume.go index 74fad8c24e0..7d706c89db2 100644 --- a/test/e2e/storage/csi_mock_volume.go +++ b/test/e2e/storage/csi_mock_volume.go @@ -39,6 +39,8 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" clientset "k8s.io/client-go/kubernetes" + cachetools "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/pkg/controller/volume/scheduling" "k8s.io/kubernetes/test/e2e/framework" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" @@ -821,7 +823,20 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { ctx, cancel := context.WithTimeout(context.Background(), csiPodRunningTimeout) defer cancel() - pvcWatch, err := f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Watch(ctx, metav1.ListOptions{}) + + // In contrast to the raw watch, RetryWatcher is expected to deliver all events even + // when the underlying raw watch gets closed prematurely + // (https://github.com/kubernetes/kubernetes/pull/93777#discussion_r467932080). + // This is important because below the test is going to make assertions about the + // PVC state changes. + initResource, err := f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + framework.ExpectNoError(err, "Failed to fetch initial PVC resource") + listWatcher := &cachetools.ListWatch{ + WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) { + return f.ClientSet.CoreV1().PersistentVolumeClaims(f.Namespace.Name).Watch(ctx, listOptions) + }, + } + pvcWatch, err := watchtools.NewRetryWatcher(initResource.GetResourceVersion(), listWatcher) framework.ExpectNoError(err, "create PVC watch") defer pvcWatch.Stop() @@ -889,12 +904,14 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { ginkgo.By("Checking PVC events") nodeAnnotationSet := false nodeAnnotationReset := false + watchFailed := false loop: for { select { case event, ok := <-pvcWatch.ResultChan(): if !ok { - framework.Failf("PVC watch ended prematurely") + watchFailed = true + break loop } framework.Logf("PVC event %s: %#v", event.Type, event.Object) @@ -913,10 +930,8 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { case watch.Deleted: break loop case watch.Error: - // Can this occur when the apiserver is under heavy load? - // If yes, then we should bail out of the test here early and - // skip further checks instead of treating it as a test failure. - framework.Failf("PVC watch failed prematurely: %v", event.Object) + watchFailed = true + break } case <-ctx.Done(): framework.Failf("Timeout while waiting to observe PVC list") @@ -932,7 +947,13 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { } } - if test.lateBinding { + switch { + case watchFailed: + // If the watch failed or stopped prematurely (which can happen at any time), then we cannot + // verify whether the annotation was set as expected. This is still considered a successful + // test. + framework.Logf("PVC watch delivered incomplete data, cannot check annotation") + case test.lateBinding: gomega.Expect(nodeAnnotationSet).To(gomega.BeTrue(), "selected-node should have been set") // Whether it gets reset depends on whether we have topology enabled. Without // it, rescheduling is unnecessary. @@ -941,7 +962,7 @@ var _ = utils.SIGDescribe("CSI mock volume", func() { } else { gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset") } - } else { + default: gomega.Expect(nodeAnnotationSet).To(gomega.BeFalse(), "selected-node should not have been set") gomega.Expect(nodeAnnotationReset).To(gomega.BeFalse(), "selected-node should not have been reset") }