Merge pull request #121615 from p0lyn0mial/upstream-cacher-forget-watcher

cacher: when forgeting a watcher, call stopWatcherLocked multiple times
This commit is contained in:
Kubernetes Prow Robot 2023-10-31 13:14:14 +01:00 committed by GitHub
commit 5bac451d1f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 50 additions and 9 deletions

View File

@ -112,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
if watcher, ok := wm[number]; ok {
delete(wm, number)
done(watcher)
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
@ -147,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac
}
}
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number, done)
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers[scope].deleteWatcher(number, done)
i.allWatchers[scope].deleteWatcher(number)
if len(i.allWatchers[scope]) == 0 {
delete(i.allWatchers, scope)
}
@ -1223,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
c.stopWatcherLocked(w)
}
}

View File

@ -47,6 +47,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)
@ -1807,3 +1808,45 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
}},
}, true)
}
func TestForgetWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
require.NoError(t, err)
defer cacher.Stop()
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
var forgetWatcherFn func(bool)
var forgetCounter int
forgetWatcherWrapped := func(drainWatcher bool) {
forgetCounter++
forgetWatcherFn(drainWatcher)
}
w := newCacheWatcher(
0,
func(_ string, _ labels.Set, _ fields.Set) bool { return true },
nil,
storage.APIObjectVersioner{},
testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute),
true,
schema.GroupResource{Resource: "pods"},
"1",
)
forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false)
cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false)
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, len(cacher.watchers.allWatchers))
forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, forgetCounter)
forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 2, forgetCounter)
}