mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-12 13:31:52 +00:00
cacher: when forgeting a watcher, call stopWatcherLocked multiple times
It's possible that the watcher is already not in the structure (e.g. in case of simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked() on a watcher multiple times.
This commit is contained in:
parent
0294521985
commit
7e35823690
@ -112,11 +112,8 @@ func (wm watchersMap) addWatcher(w *cacheWatcher, number int) {
|
||||
wm[number] = w
|
||||
}
|
||||
|
||||
func (wm watchersMap) deleteWatcher(number int, done func(*cacheWatcher)) {
|
||||
if watcher, ok := wm[number]; ok {
|
||||
delete(wm, number)
|
||||
done(watcher)
|
||||
}
|
||||
func (wm watchersMap) deleteWatcher(number int) {
|
||||
delete(wm, number)
|
||||
}
|
||||
|
||||
func (wm watchersMap) terminateAll(done func(*cacheWatcher)) {
|
||||
@ -147,14 +144,14 @@ func (i *indexedWatchers) addWatcher(w *cacheWatcher, number int, scope namespac
|
||||
}
|
||||
}
|
||||
|
||||
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool, done func(*cacheWatcher)) {
|
||||
func (i *indexedWatchers) deleteWatcher(number int, scope namespacedName, value string, supported bool) {
|
||||
if supported {
|
||||
i.valueWatchers[value].deleteWatcher(number, done)
|
||||
i.valueWatchers[value].deleteWatcher(number)
|
||||
if len(i.valueWatchers[value]) == 0 {
|
||||
delete(i.valueWatchers, value)
|
||||
}
|
||||
} else {
|
||||
i.allWatchers[scope].deleteWatcher(number, done)
|
||||
i.allWatchers[scope].deleteWatcher(number)
|
||||
if len(i.allWatchers[scope]) == 0 {
|
||||
delete(i.allWatchers, scope)
|
||||
}
|
||||
@ -1223,7 +1220,8 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
|
||||
// It's possible that the watcher is already not in the structure (e.g. in case of
|
||||
// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
|
||||
// on a watcher multiple times.
|
||||
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported, c.stopWatcherLocked)
|
||||
c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
|
||||
c.stopWatcherLocked(w)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -47,6 +47,7 @@ import (
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
featuregatetesting "k8s.io/component-base/featuregate/testing"
|
||||
"k8s.io/utils/clock"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
@ -1807,3 +1808,59 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
|
||||
}},
|
||||
}, true)
|
||||
}
|
||||
|
||||
func TestForgetWatcher(t *testing.T) {
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
require.NoError(t, err)
|
||||
defer cacher.Stop()
|
||||
|
||||
// wait until cacher is initialized.
|
||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||
}
|
||||
|
||||
assertCacherInternalState := func(expectedWatchersCounter, expectedValueWatchersCounter int) {
|
||||
cacher.Lock()
|
||||
defer cacher.Unlock()
|
||||
|
||||
require.Equal(t, expectedWatchersCounter, len(cacher.watchers.allWatchers))
|
||||
require.Equal(t, expectedValueWatchersCounter, len(cacher.watchers.valueWatchers))
|
||||
}
|
||||
assertCacherInternalState(0, 0)
|
||||
|
||||
var forgetWatcherFn func(bool)
|
||||
var forgetCounter int
|
||||
forgetWatcherWrapped := func(drainWatcher bool) {
|
||||
forgetCounter++
|
||||
forgetWatcherFn(drainWatcher)
|
||||
}
|
||||
w := newCacheWatcher(
|
||||
0,
|
||||
func(_ string, _ labels.Set, _ fields.Set) bool { return true },
|
||||
nil,
|
||||
storage.APIObjectVersioner{},
|
||||
testingclock.NewFakeClock(time.Now()).Now().Add(2*time.Minute),
|
||||
true,
|
||||
schema.GroupResource{Resource: "pods"},
|
||||
"1",
|
||||
)
|
||||
forgetWatcherFn = forgetWatcher(cacher, w, 0, namespacedName{}, "", false)
|
||||
addWatcher := func(w *cacheWatcher) {
|
||||
cacher.Lock()
|
||||
defer cacher.Unlock()
|
||||
|
||||
cacher.watchers.addWatcher(w, 0, namespacedName{}, "", false)
|
||||
}
|
||||
|
||||
addWatcher(w)
|
||||
assertCacherInternalState(1, 0)
|
||||
|
||||
forgetWatcherWrapped(false)
|
||||
assertCacherInternalState(0, 0)
|
||||
require.Equal(t, 1, forgetCounter)
|
||||
|
||||
forgetWatcherWrapped(false)
|
||||
assertCacherInternalState(0, 0)
|
||||
require.Equal(t, 2, forgetCounter)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user