Fix draining cacher tests

This commit is contained in:
Wojciech Tyczyński 2022-07-28 10:20:23 +02:00
parent debace151c
commit a530a6898a

View File

@ -1586,6 +1586,19 @@ func TestCacheIntervalInvalidationStopsWatch(t *testing.T) {
} }
} }
func makeWatchCacheEvent(rv uint64) *watchCacheEvent {
return &watchCacheEvent{
Type: watch.Added,
Object: &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
ResourceVersion: fmt.Sprintf("%d", rv),
},
},
ResourceVersion: rv,
}
}
// TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested // TestCacheWatcherDraining verifies the cacheWatcher.process goroutine is properly cleaned up when draining was requested
func TestCacheWatcherDraining(t *testing.T) { func TestCacheWatcherDraining(t *testing.T) {
var lock sync.RWMutex var lock sync.RWMutex
@ -1600,18 +1613,23 @@ func TestCacheWatcherDraining(t *testing.T) {
w.stopLocked() w.stopLocked()
} }
initEvents := []*watchCacheEvent{ initEvents := []*watchCacheEvent{
{Object: &v1.Pod{}}, makeWatchCacheEvent(5),
{Object: &v1.Pod{}}, makeWatchCacheEvent(6),
} }
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
if !w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) { if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an even to the watcher") t.Fatal("failed adding an even to the watcher")
} }
forget(true) // drain the watcher forget(true) // drain the watcher
<-w.ResultChan()
<-w.ResultChan() eventCount := 0
<-w.ResultChan() for range w.ResultChan() {
eventCount++
}
if eventCount != 3 {
t.Errorf("Unexpected number of objects received: %d, expected: 3", eventCount)
}
if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) {
lock.RLock() lock.RLock()
defer lock.RUnlock() defer lock.RUnlock()
@ -1636,12 +1654,12 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) {
w.stopLocked() w.stopLocked()
} }
initEvents := []*watchCacheEvent{ initEvents := []*watchCacheEvent{
{Object: &v1.Pod{}}, makeWatchCacheEvent(5),
{Object: &v1.Pod{}}, makeWatchCacheEvent(6),
} }
w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "") w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, objectType, "")
go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1)
if !w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) { if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) {
t.Fatal("failed adding an even to the watcher") t.Fatal("failed adding an even to the watcher")
} }
forget(true) // drain the watcher forget(true) // drain the watcher