diff --git a/test/e2e/apimachinery/watch.go b/test/e2e/apimachinery/watch.go index 25911bfe734..ec92fc0cfa6 100644 --- a/test/e2e/apimachinery/watch.go +++ b/test/e2e/apimachinery/watch.go @@ -27,6 +27,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + cachetools "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" "k8s.io/kubernetes/test/e2e/framework" "github.com/onsi/ginkgo" @@ -336,6 +338,11 @@ var _ = SIGDescribe("Watchers", func() { iterations := 100 + ginkgo.By("getting a starting resourceVersion") + configmaps, err := c.CoreV1().ConfigMaps(ns).List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns) + resourceVersion := configmaps.ResourceVersion + ginkgo.By("starting a background goroutine to produce watch events") donec := make(chan struct{}) stopc := make(chan struct{}) @@ -345,11 +352,16 @@ var _ = SIGDescribe("Watchers", func() { produceConfigMapEvents(f, stopc, 5*time.Millisecond) }() + listWatcher := &cachetools.ListWatch{ + WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) { + return c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), listOptions) + }, + } + ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order") wcs := []watch.Interface{} - resourceVersion := "0" for i := 0; i < iterations; i++ { - wc, err := c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), metav1.ListOptions{ResourceVersion: resourceVersion}) + wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher) framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns) wcs = append(wcs, wc) resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion @@ -436,11 +448,14 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap { select { - case event := <-watch.ResultChan(): + case event, ok := <-watch.ResultChan(): + if !ok { + framework.Failf("Watch closed unexpectedly") + } if configMap, ok := event.Object.(*v1.ConfigMap); ok { return configMap } - framework.Failf("expected config map") + framework.Failf("expected config map, got %T", event.Object) case <-time.After(10 * time.Second): framework.Failf("timed out waiting for watch event") }