Make watch order conformance test reliable

This commit is contained in:
Jordan Liggitt 2021-05-12 08:37:20 -04:00
parent 6768ac8115
commit 630573adac

View File

@ -27,6 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
cachetools "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
"github.com/onsi/ginkgo" "github.com/onsi/ginkgo"
@ -336,6 +338,11 @@ var _ = SIGDescribe("Watchers", func() {
iterations := 100 iterations := 100
ginkgo.By("getting a starting resourceVersion")
configmaps, err := c.CoreV1().ConfigMaps(ns).List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns)
resourceVersion := configmaps.ResourceVersion
ginkgo.By("starting a background goroutine to produce watch events") ginkgo.By("starting a background goroutine to produce watch events")
donec := make(chan struct{}) donec := make(chan struct{})
stopc := make(chan struct{}) stopc := make(chan struct{})
@ -345,11 +352,16 @@ var _ = SIGDescribe("Watchers", func() {
produceConfigMapEvents(f, stopc, 5*time.Millisecond) produceConfigMapEvents(f, stopc, 5*time.Millisecond)
}() }()
listWatcher := &cachetools.ListWatch{
WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
return c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), listOptions)
},
}
ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order") ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
wcs := []watch.Interface{} wcs := []watch.Interface{}
resourceVersion := "0"
for i := 0; i < iterations; i++ { for i := 0; i < iterations; i++ {
wc, err := c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), metav1.ListOptions{ResourceVersion: resourceVersion}) wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher)
framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns) framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns)
wcs = append(wcs, wc) wcs = append(wcs, wc)
resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
@ -436,11 +448,14 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru
func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap { func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
select { select {
case event := <-watch.ResultChan(): case event, ok := <-watch.ResultChan():
if !ok {
framework.Failf("Watch closed unexpectedly")
}
if configMap, ok := event.Object.(*v1.ConfigMap); ok { if configMap, ok := event.Object.(*v1.ConfigMap); ok {
return configMap return configMap
} }
framework.Failf("expected config map") framework.Failf("expected config map, got %T", event.Object)
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
framework.Failf("timed out waiting for watch event") framework.Failf("timed out waiting for watch event")
} }