Merge pull request #101950 from liggitt/watch-order

Make watch order conformance test reliable
This commit is contained in:
Kubernetes Prow Robot 2021-05-12 17:26:24 -07:00 committed by GitHub
commit e6e6536c1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -27,6 +27,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
cachetools "k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
"github.com/onsi/ginkgo" "github.com/onsi/ginkgo"
@ -336,6 +338,11 @@ var _ = SIGDescribe("Watchers", func() {
iterations := 100 iterations := 100
ginkgo.By("getting a starting resourceVersion")
configmaps, err := c.CoreV1().ConfigMaps(ns).List(context.TODO(), metav1.ListOptions{})
framework.ExpectNoError(err, "Failed to list configmaps in the namespace %s", ns)
resourceVersion := configmaps.ResourceVersion
ginkgo.By("starting a background goroutine to produce watch events") ginkgo.By("starting a background goroutine to produce watch events")
donec := make(chan struct{}) donec := make(chan struct{})
stopc := make(chan struct{}) stopc := make(chan struct{})
@ -345,11 +352,16 @@ var _ = SIGDescribe("Watchers", func() {
produceConfigMapEvents(f, stopc, 5*time.Millisecond) produceConfigMapEvents(f, stopc, 5*time.Millisecond)
}() }()
listWatcher := &cachetools.ListWatch{
WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
return c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), listOptions)
},
}
ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order") ginkgo.By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
wcs := []watch.Interface{} wcs := []watch.Interface{}
resourceVersion := "0"
for i := 0; i < iterations; i++ { for i := 0; i < iterations; i++ {
wc, err := c.CoreV1().ConfigMaps(ns).Watch(context.TODO(), metav1.ListOptions{ResourceVersion: resourceVersion}) wc, err := watchtools.NewRetryWatcher(resourceVersion, listWatcher)
framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns) framework.ExpectNoError(err, "Failed to watch configmaps in the namespace %s", ns)
wcs = append(wcs, wc) wcs = append(wcs, wc)
resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
@ -436,11 +448,14 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru
func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap { func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
select { select {
case event := <-watch.ResultChan(): case event, ok := <-watch.ResultChan():
if !ok {
framework.Failf("Watch closed unexpectedly")
}
if configMap, ok := event.Object.(*v1.ConfigMap); ok { if configMap, ok := event.Object.(*v1.ConfigMap); ok {
return configMap return configMap
} }
framework.Failf("expected config map") framework.Failf("expected config map, got %T", event.Object)
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
framework.Failf("timed out waiting for watch event") framework.Failf("timed out waiting for watch event")
} }