mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-02 00:07:50 +00:00
Add api-machinery 'watch-consistency' e2e test
This commit is contained in:
parent
ef195428a0
commit
f87d2c61f3
@ -17,6 +17,8 @@ limitations under the License.
|
||||
package apimachinery
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
@ -314,6 +316,49 @@ var _ = SIGDescribe("Watchers", func() {
|
||||
expectEvent(testWatch, watch.Modified, testConfigMapThirdUpdate)
|
||||
expectEvent(testWatch, watch.Deleted, nil)
|
||||
})
|
||||
|
||||
/*
|
||||
Testname: watch-consistency
|
||||
Description: Ensure that concurrent watches are consistent with each other by initiating an additional watch
|
||||
for events received from the first watch, initiated at the resource version of the event, and checking that all
|
||||
resource versions of all events match. Events are produced from writes on a background goroutine.
|
||||
*/
|
||||
It("should receive events on concurrent watches in same order", func() {
|
||||
c := f.ClientSet
|
||||
ns := f.Namespace.Name
|
||||
|
||||
iterations := 100
|
||||
|
||||
By("starting a background goroutine to produce watch events")
|
||||
donec := make(chan struct{})
|
||||
stopc := make(chan struct{})
|
||||
go func() {
|
||||
defer GinkgoRecover()
|
||||
defer close(donec)
|
||||
produceConfigMapEvents(f, stopc, 5*time.Millisecond)
|
||||
}()
|
||||
|
||||
By("creating watches starting from each resource version of the events produced and verifying they all receive resource versions in the same order")
|
||||
wcs := []watch.Interface{}
|
||||
resourceVersion := "0"
|
||||
for i := 0; i < iterations; i++ {
|
||||
wc, err := c.CoreV1().ConfigMaps(ns).Watch(metav1.ListOptions{ResourceVersion: resourceVersion})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
wcs = append(wcs, wc)
|
||||
resourceVersion = waitForNextConfigMapEvent(wcs[0]).ResourceVersion
|
||||
for _, wc := range wcs[1:] {
|
||||
e := waitForNextConfigMapEvent(wc)
|
||||
if resourceVersion != e.ResourceVersion {
|
||||
framework.Failf("resource version mismatch, expected %s but got %s", resourceVersion, e.ResourceVersion)
|
||||
}
|
||||
}
|
||||
}
|
||||
close(stopc)
|
||||
for _, wc := range wcs {
|
||||
wc.Stop()
|
||||
}
|
||||
<-donec
|
||||
})
|
||||
})
|
||||
|
||||
func watchConfigMaps(f *framework.Framework, resourceVersion string, labels ...string) (watch.Interface, error) {
|
||||
@ -381,3 +426,70 @@ func waitForEvent(w watch.Interface, expectType watch.EventType, expectObject ru
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func waitForNextConfigMapEvent(watch watch.Interface) *v1.ConfigMap {
|
||||
select {
|
||||
case event := <-watch.ResultChan():
|
||||
if configMap, ok := event.Object.(*v1.ConfigMap); ok {
|
||||
return configMap
|
||||
} else {
|
||||
framework.Failf("expected config map")
|
||||
}
|
||||
case <-time.After(10 * time.Second):
|
||||
framework.Failf("timed out waiting for watch event")
|
||||
}
|
||||
return nil // should never happen
|
||||
}
|
||||
|
||||
const (
|
||||
createEvent = iota
|
||||
updateEvent
|
||||
deleteEvent
|
||||
)
|
||||
|
||||
func produceConfigMapEvents(f *framework.Framework, stopc <-chan struct{}, minWaitBetweenEvents time.Duration) {
|
||||
c := f.ClientSet
|
||||
ns := f.Namespace.Name
|
||||
|
||||
name := func(i int) string {
|
||||
return fmt.Sprintf("cm-%d", i)
|
||||
}
|
||||
|
||||
existing := []int{}
|
||||
tc := time.NewTicker(minWaitBetweenEvents)
|
||||
defer tc.Stop()
|
||||
i := 0
|
||||
for range tc.C {
|
||||
op := rand.Intn(3)
|
||||
if len(existing) == 0 {
|
||||
op = createEvent
|
||||
}
|
||||
|
||||
cm := &v1.ConfigMap{}
|
||||
switch op {
|
||||
case createEvent:
|
||||
cm.Name = name(i)
|
||||
_, err := c.CoreV1().ConfigMaps(ns).Create(cm)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
existing = append(existing, i)
|
||||
i += 1
|
||||
case updateEvent:
|
||||
idx := rand.Intn(len(existing))
|
||||
cm.Name = name(existing[idx])
|
||||
_, err := c.CoreV1().ConfigMaps(ns).Update(cm)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
case deleteEvent:
|
||||
idx := rand.Intn(len(existing))
|
||||
err := c.CoreV1().ConfigMaps(ns).Delete(name(existing[idx]), &metav1.DeleteOptions{})
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
existing = append(existing[:idx], existing[idx+1:]...)
|
||||
default:
|
||||
framework.Failf("Unsupported event operation: %d", op)
|
||||
}
|
||||
select {
|
||||
case <-stopc:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user