diff --git a/test/e2e/apimachinery/watch.go b/test/e2e/apimachinery/watch.go index 2ca292bda2c..4709f9c0418 100644 --- a/test/e2e/apimachinery/watch.go +++ b/test/e2e/apimachinery/watch.go @@ -17,6 +17,8 @@ limitations under the License. package apimachinery import ( + "fmt" + "math/rand" "time" "k8s.io/api/core/v1" @@ -314,6 +316,49 @@ var _ = SIGDescribe("Watchers", func() { expectEvent(testWatch, watch.Modified, testConfigMapThirdUpdate) expectEvent(testWatch, watch.Deleted, nil) }) + + /* + Testname: watch-consistency + Description: Ensure that concurrent watches are consistent with each other by initiating an additional watch + for events received from the first watch, initiated at the resource version of the event, and checking that all + resource versions of all events match. Events are produced from writes on a background goroutine. + */ + It("should receive events on concurrent watches in same order", func() { + c := f.ClientSet + ns := f.Namespace.Name + + iterations := 100 + + By("starting a background goroutine to produce watch events") + donec := make(chan struct{}) + stopc := make(chan struct{}) + go func() { + defer GinkgoRecover() + defer close(donec) + produceConfigMapEvents(f, stopc, 5*time.Millisecond) + }() + + By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order") + wcs := []watch.Interface{} + resourceVersion := "0" + for i := 0; i < iterations; i++ { + wc, err := c.CoreV1().ConfigMaps(ns).Watch(metav1.ListOptions{ResourceVersion: resourceVersion}) + Expect(err).NotTo(HaveOccurred()) + wcs = append(wcs, wc) + resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion + for _, wc := range wcs[1:] { + e := waitForNextConfigMapEvent(wc) + if resourceVersion != e.ResourceVersion { + framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion) + } + } + } + close(stopc) + for _, wc := range wcs { + wc.Stop() + } + <-donec + }) }) func watchConfigMaps(f *framework.Framework, resourceVersion string, labels ...string) (watch.Interface, error) { @@ -381,3 +426,70 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru } } } + +func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap { + select { + case event := <-watch.ResultChan(): + if configMap, ok := event.Object.(*v1.ConfigMap); ok { + return configMap + } else { + framework.Failf("expected config map") + } + case <-time.After(10 * time.Second): + framework.Failf("timed out waiting for watch event") + } + return nil // should never happen +} + +const ( + createEvent = iota + updateEvent + deleteEvent +) + +func produceConfigMapEvents(f *framework.Framework, stopc <-chan struct{}, minWaitBetweenEvents time.Duration) { + c := f.ClientSet + ns := f.Namespace.Name + + name := func(i int) string { + return fmt.Sprintf("cm-%d", i) + } + + existing := []int{} + tc := time.NewTicker(minWaitBetweenEvents) + defer tc.Stop() + i := 0 + for range tc.C { + op := rand.Intn(3) + if len(existing) == 0 { + op = createEvent + } + + cm := &v1.ConfigMap{} + switch op { + case createEvent: + cm.Name = name(i) + _, err := c.CoreV1().ConfigMaps(ns).Create(cm) + Expect(err).NotTo(HaveOccurred()) + existing = append(existing, i) + i += 1 + case updateEvent: + idx := rand.Intn(len(existing)) + cm.Name = name(existing[idx]) + _, err := c.CoreV1().ConfigMaps(ns).Update(cm) + Expect(err).NotTo(HaveOccurred()) + case deleteEvent: + idx := rand.Intn(len(existing)) + err := c.CoreV1().ConfigMaps(ns).Delete(name(existing[idx]), &metav1.DeleteOptions{}) + Expect(err).NotTo(HaveOccurred()) + existing = append(existing[:idx], existing[idx+1:]...) + default: + framework.Failf("Unsupported event operation: %d", op) + } + select { + case <-stopc: + return + default: + } + } +}