From a530a6898a36d43eef766f041eab4e4b6d5adedb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Thu, 28 Jul 2022 10:20:23 +0200 Subject: [PATCH] Fix draining cacher tests --- .../storage/cacher/cacher_whitebox_test.go | 40 ++++++++++++++----- 1 file changed, 29 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 7d57855a213..883571e37d6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1586,6 +1586,19 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) { } } +func makeWatchCacheEvent(rv uint64) *watchCacheEvent { + return &watchCacheEvent{ + Type: watch.Added, + Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", rv), + ResourceVersion: fmt.Sprintf("%d", rv), + }, + }, + ResourceVersion: rv, + } +} + // TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested func TestCacheWatcherDraining(t *testing.T) { var lock sync.RWMutex @@ -1600,18 +1613,23 @@ func TestCacheWatcherDraining(t *testing.T) { w.stopLocked() } initEvents := []*watchCacheEvent{ - {Object: &v1.Pod{}}, - {Object: &v1.Pod{}}, + makeWatchCacheEvent(5), + makeWatchCacheEvent(6), } w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") - go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) - if !w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) { + go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) + if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") } forget(true) // drain the watcher - <-w.ResultChan() - <-w.ResultChan() - <-w.ResultChan() + + eventCount := 0 + for range w.ResultChan() { + eventCount++ + } + if eventCount != 3 { + t.Errorf("Unexpected number of objects received: %d, expected: 3", eventCount) + } if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() defer lock.RUnlock() @@ -1636,12 +1654,12 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { w.stopLocked() } initEvents := []*watchCacheEvent{ - {Object: &v1.Pod{}}, - {Object: &v1.Pod{}}, + makeWatchCacheEvent(5), + makeWatchCacheEvent(6), } w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") - go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) - if !w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) { + go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) + if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") } forget(true) // drain the watcher