cacher: when forgeting a watcher, call stopWatcherLocked multiple times

It's possible that the watcher is already not in the structure (e.g. in case of
simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
on a watcher multiple times.
This commit is contained in:
Lukasz Szaszkiewicz 2023-10-30 14:24:39 +01:00
parent 38ed3ef7b7
commit bbca4a4b9a
2 changed files with 50 additions and 9 deletions

View File

@ -112,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
wm[number] = w
}
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
if watcher, ok := wm[number]; ok {
delete(wm, number)
done(watcher)
}
func (wm watchersMap) deleteWatcher(number int) {
delete(wm, number)
}
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
@ -147,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac
}
}
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) {
if supported {
i.valueWatchers[value].deleteWatcher(number, done)
i.valueWatchers[value].deleteWatcher(number)
if len(i.valueWatchers[value]) == 0 {
delete(i.valueWatchers, value)
}
} else {
i.allWatchers[scope].deleteWatcher(number, done)
i.allWatchers[scope].deleteWatcher(number)
if len(i.allWatchers[scope]) == 0 {
delete(i.allWatchers, scope)
}
@ -1223,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
// It's possible that the watcher is already not in the structure (e.g. in case of
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
// on a watcher multiple times.
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
c.stopWatcherLocked(w)
}
}

View File

@ -47,6 +47,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/pointer"
)
@ -1807,3 +1808,45 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
}},
}, true)
}
func TestForgetWatcher(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
require.NoError(t, err)
defer cacher.Stop()
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
var forgetWatcherFn func(bool)
var forgetCounter int
forgetWatcherWrapped := func(drainWatcher bool) {
forgetCounter++
forgetWatcherFn(drainWatcher)
}
w := newCacheWatcher(
0,
func(_ string, _ labels.Set, _ fields.Set) bool { return true },
nil,
storage.APIObjectVersioner{},
testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute),
true,
schema.GroupResource{Resource: "pods"},
"1",
)
forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false)
cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false)
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, len(cacher.watchers.allWatchers))
forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 1, forgetCounter)
forgetWatcherWrapped(false)
require.Equal(t, 0, len(cacher.watchers.allWatchers))
require.Equal(t, 0, len(cacher.watchers.valueWatchers))
require.Equal(t, 2, forgetCounter)
}