diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 925ef0d2c69..e589a97356f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -862,7 +862,8 @@ func (c *Cacher) startDispatchingBookmarkEvents() { // as we don't delete watcher from bookmarkWatchers when it is stopped. for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() { for _, watcher := range watchers { - // watcher.stop() is protected by c.Lock() + // c.Lock() is held here. + // watcher.stopThreadUnsafe() is protected by c.Lock() if watcher.stopped { continue } @@ -932,7 +933,7 @@ func (c *Cacher) finishDispatching() { defer c.Unlock() c.dispatching = false for _, watcher := range c.watchersToStop { - watcher.stop() + watcher.stopThreadUnsafe() } c.watchersToStop = c.watchersToStop[:0] } @@ -947,7 +948,7 @@ func (c *Cacher) stopWatcherThreadUnsafe(watcher *cacheWatcher) { if c.dispatching { c.watchersToStop = append(c.watchersToStop, watcher) } else { - watcher.stop() + watcher.stopThreadUnsafe() } } @@ -977,7 +978,7 @@ func forgetWatcher(c *Cacher, index int, triggerValue string, triggerSupported b defer c.Unlock() // It's possible that the watcher is already not in the structure (e.g. in case of - // simultaneous Stop() and terminateAllWatchers(), but it is safe to call stop() + // simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopThreadUnsafe() // on a watcher multiple times. c.watchers.deleteWatcher(index, triggerValue, triggerSupported, c.stopWatcherThreadUnsafe) } @@ -1079,8 +1080,8 @@ func (c *errWatcher) Stop() { } // cacheWatcher implements watch.Interface +// this is not thread-safe type cacheWatcher struct { - sync.Mutex input chan *watchCacheEvent result chan watch.Event done chan struct{} @@ -1121,12 +1122,8 @@ func (c *cacheWatcher) Stop() { c.forget() } -// TODO(#73958) -// stop() is protected by Cacher.Lock(), rename it to -// stopThreadUnsafe and remove the sync.Mutex. -func (c *cacheWatcher) stop() { - c.Lock() - defer c.Unlock() +// we rely on the fact that stopThredUnsafe is actually protected by Cacher.Lock() +func (c *cacheWatcher) stopThreadUnsafe() { if !c.stopped { c.stopped = true close(c.done) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index fa2c921920b..8663e8ccc0e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -63,7 +63,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { // forget() has to stop the watcher, as only stopping the watcher // triggers stopping the process() goroutine which we are in the // end waiting for in this test. - w.stop() + w.stopThreadUnsafe() } initEvents := []*watchCacheEvent{ {Object: &v1.Pod{}}, @@ -472,7 +472,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { done := make(chan struct{}) filter := func(string, labels.Set, fields.Set) bool { return true } forget := func() { - w.stop() + w.stopThreadUnsafe() done <- struct{}{} }