diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 4f408044197..c7d9390ae76 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -112,8 +112,11 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) { wm[number] = w } -func (wm watchersMap) deleteWatcher(number int) { - delete(wm, number) +func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) { + if watcher, ok := wm[number]; ok { + delete(wm, number) + done(watcher) + } } func (wm watchersMap) terminateAll(done func(*cacheWatcher)) { @@ -144,14 +147,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac } } -func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) { +func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) { if supported { - i.valueWatchers[value].deleteWatcher(number) + i.valueWatchers[value].deleteWatcher(number, done) if len(i.valueWatchers[value]) == 0 { delete(i.valueWatchers, value) } } else { - i.allWatchers[scope].deleteWatcher(number) + i.allWatchers[scope].deleteWatcher(number, done) if len(i.allWatchers[scope]) == 0 { delete(i.allWatchers, scope) } @@ -1220,8 +1223,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, // It's possible that the watcher is already not in the structure (e.g. in case of // simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked() // on a watcher multiple times. - c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported) - c.stopWatcherLocked(w) + c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 86d178390a2..11220e721f3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -47,7 +47,6 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" - testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" ) @@ -1808,45 +1807,3 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) { }}, }, true) } - -func TestForgetWatcher(t *testing.T) { - backingStorage := &dummyStorage{} - cacher, _, err := newTestCacher(backingStorage) - require.NoError(t, err) - defer cacher.Stop() - - require.Equal(t, 0, len(cacher.watchers.allWatchers)) - require.Equal(t, 0, len(cacher.watchers.valueWatchers)) - - var forgetWatcherFn func(bool) - var forgetCounter int - forgetWatcherWrapped := func(drainWatcher bool) { - forgetCounter++ - forgetWatcherFn(drainWatcher) - } - w := newCacheWatcher( - 0, - func(_ string, _ labels.Set, _ fields.Set) bool { return true }, - nil, - storage.APIObjectVersioner{}, - testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute), - true, - schema.GroupResource{Resource: "pods"}, - "1", - ) - forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false) - - cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false) - require.Equal(t, 0, len(cacher.watchers.valueWatchers)) - require.Equal(t, 1, len(cacher.watchers.allWatchers)) - - forgetWatcherWrapped(false) - require.Equal(t, 0, len(cacher.watchers.allWatchers)) - require.Equal(t, 0, len(cacher.watchers.valueWatchers)) - require.Equal(t, 1, forgetCounter) - - forgetWatcherWrapped(false) - require.Equal(t, 0, len(cacher.watchers.allWatchers)) - require.Equal(t, 0, len(cacher.watchers.valueWatchers)) - require.Equal(t, 2, forgetCounter) -}