diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index c7d9390ae76..4f408044197 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -112,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) { wm[number] = w } -func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) { - if watcher, ok := wm[number]; ok { - delete(wm, number) - done(watcher) - } +func (wm watchersMap) deleteWatcher(number int) { + delete(wm, number) } func (wm watchersMap) terminateAll(done func(*cacheWatcher)) { @@ -147,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac } } -func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) { +func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) { if supported { - i.valueWatchers[value].deleteWatcher(number, done) + i.valueWatchers[value].deleteWatcher(number) if len(i.valueWatchers[value]) == 0 { delete(i.valueWatchers, value) } } else { - i.allWatchers[scope].deleteWatcher(number, done) + i.allWatchers[scope].deleteWatcher(number) if len(i.allWatchers[scope]) == 0 { delete(i.allWatchers, scope) } @@ -1223,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, // It's possible that the watcher is already not in the structure (e.g. in case of // simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked() // on a watcher multiple times. - c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked) + c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported) + c.stopWatcherLocked(w) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 11220e721f3..86d178390a2 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -47,6 +47,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" + testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" ) @@ -1807,3 +1808,45 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) { }}, }, true) } + +func TestForgetWatcher(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + require.NoError(t, err) + defer cacher.Stop() + + require.Equal(t, 0, len(cacher.watchers.allWatchers)) + require.Equal(t, 0, len(cacher.watchers.valueWatchers)) + + var forgetWatcherFn func(bool) + var forgetCounter int + forgetWatcherWrapped := func(drainWatcher bool) { + forgetCounter++ + forgetWatcherFn(drainWatcher) + } + w := newCacheWatcher( + 0, + func(_ string, _ labels.Set, _ fields.Set) bool { return true }, + nil, + storage.APIObjectVersioner{}, + testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute), + true, + schema.GroupResource{Resource: "pods"}, + "1", + ) + forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false) + + cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false) + require.Equal(t, 0, len(cacher.watchers.valueWatchers)) + require.Equal(t, 1, len(cacher.watchers.allWatchers)) + + forgetWatcherWrapped(false) + require.Equal(t, 0, len(cacher.watchers.allWatchers)) + require.Equal(t, 0, len(cacher.watchers.valueWatchers)) + require.Equal(t, 1, forgetCounter) + + forgetWatcherWrapped(false) + require.Equal(t, 0, len(cacher.watchers.allWatchers)) + require.Equal(t, 0, len(cacher.watchers.valueWatchers)) + require.Equal(t, 2, forgetCounter) +}