Merge pull request #121645 from p0lyn0mial/upstream-fix-race-in-test-forget-watcher

bring back: cacher: when forgeting a watcher, call stopWatcherLocked multiple times
This commit is contained in:
Kubernetes Prow Robot 2023-10-31 22:57:08 +01:00 committed by GitHub
commit 715cd17c0d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 64 additions and 9 deletions

View File

@ -112,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
if watcher, ok := wm[number]; ok {
delete(wm, number)
done(watcher)
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
@ -147,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac
}
}
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number, done)
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers[scope].deleteWatcher(number, done)
i.allWatchers[scope].deleteWatcher(number)
if len(i.allWatchers[scope]) == 0 {
delete(i.allWatchers, scope)
}
@ -1223,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
c.stopWatcherLocked(w)
}
}

View File

@ -47,6 +47,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)
@ -1807,3 +1808,59 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
}},
}, true)
}
func TestForgetWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
require.NoError(t, err)
defer cacher.Stop()
// wait until cacher is initialized.
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
assertCacherInternalState := func(expectedWatchersCounter, expectedValueWatchersCounter int) {
cacher.Lock()
defer cacher.Unlock()
require.Equal(t, expectedWatchersCounter, len(cacher.watchers.allWatchers))
require.Equal(t, expectedValueWatchersCounter, len(cacher.watchers.valueWatchers))
}
assertCacherInternalState(0, 0)
var forgetWatcherFn func(bool)
var forgetCounter int
forgetWatcherWrapped := func(drainWatcher bool) {
forgetCounter++
forgetWatcherFn(drainWatcher)
}
w := newCacheWatcher(
0,
func(_ string, _ labels.Set, _ fields.Set) bool { return true },
nil,
storage.APIObjectVersioner{},
testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute),
true,
schema.GroupResource{Resource: "pods"},
"1",
)
forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false)
addWatcher := func(w *cacheWatcher) {
cacher.Lock()
defer cacher.Unlock()
cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false)
}
addWatcher(w)
assertCacherInternalState(1, 0)
forgetWatcherWrapped(false)
assertCacherInternalState(0, 0)
require.Equal(t, 1, forgetCounter)
forgetWatcherWrapped(false)
assertCacherInternalState(0, 0)
require.Equal(t, 2, forgetCounter)
}