From 7e35823690df01bd019a88d3346bd3ac820afaca Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Mon, 30 Oct 2023 14:24:39 +0100 Subject: [PATCH] cacher: when forgeting a watcher, call stopWatcherLocked multiple times It's possible that the watcher is already not in the structure (e.g. in case of simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked() on a watcher multiple times. --- .../apiserver/pkg/storage/cacher/cacher.go | 16 +++--- .../storage/cacher/cacher_whitebox_test.go | 57 +++++++++++++++++++ 2 files changed, 64 insertions(+), 9 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index c7d9390ae76..4f408044197 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -112,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) { wm[number] = w } -func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) { - if watcher, ok := wm[number]; ok { - delete(wm, number) - done(watcher) - } +func (wm watchersMap) deleteWatcher(number int) { + delete(wm, number) } func (wm watchersMap) terminateAll(done func(*cacheWatcher)) { @@ -147,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac } } -func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) { +func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) { if supported { - i.valueWatchers[value].deleteWatcher(number, done) + i.valueWatchers[value].deleteWatcher(number) if len(i.valueWatchers[value]) == 0 { delete(i.valueWatchers, value) } } else { - i.allWatchers[scope].deleteWatcher(number, done) + i.allWatchers[scope].deleteWatcher(number) if len(i.allWatchers[scope]) == 0 { delete(i.allWatchers, scope) } @@ -1223,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName, // It's possible that the watcher is already not in the structure (e.g. in case of // simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked() // on a watcher multiple times. - c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked) + c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported) + c.stopWatcherLocked(w) } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 11220e721f3..b6d59af079b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -47,6 +47,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/clock" + testingclock "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" ) @@ -1807,3 +1808,59 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) { }}, }, true) } + +func TestForgetWatcher(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + require.NoError(t, err) + defer cacher.Stop() + + // wait until cacher is initialized. + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + assertCacherInternalState := func(expectedWatchersCounter, expectedValueWatchersCounter int) { + cacher.Lock() + defer cacher.Unlock() + + require.Equal(t, expectedWatchersCounter, len(cacher.watchers.allWatchers)) + require.Equal(t, expectedValueWatchersCounter, len(cacher.watchers.valueWatchers)) + } + assertCacherInternalState(0, 0) + + var forgetWatcherFn func(bool) + var forgetCounter int + forgetWatcherWrapped := func(drainWatcher bool) { + forgetCounter++ + forgetWatcherFn(drainWatcher) + } + w := newCacheWatcher( + 0, + func(_ string, _ labels.Set, _ fields.Set) bool { return true }, + nil, + storage.APIObjectVersioner{}, + testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute), + true, + schema.GroupResource{Resource: "pods"}, + "1", + ) + forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false) + addWatcher := func(w *cacheWatcher) { + cacher.Lock() + defer cacher.Unlock() + + cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false) + } + + addWatcher(w) + assertCacherInternalState(1, 0) + + forgetWatcherWrapped(false) + assertCacherInternalState(0, 0) + require.Equal(t, 1, forgetCounter) + + forgetWatcherWrapped(false) + assertCacherInternalState(0, 0) + require.Equal(t, 2, forgetCounter) +}