diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go index 76b1dfd195c..d58ecfdfe4e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go @@ -359,30 +359,30 @@ func TestTimeBucketWatchersBasic(t *testing.T) { clock := testingclock.NewFakeClock(time.Now()) watchers := newTimeBucketWatchers(clock, defaultBookmarkFrequency) now := clock.Now() - watchers.addWatcher(newWatcher(now.Add(10 * time.Second))) - watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) - watchers.addWatcher(newWatcher(now.Add(20 * time.Second))) + watchers.addWatcherThreadUnsafe(newWatcher(now.Add(10 * time.Second))) + watchers.addWatcherThreadUnsafe(newWatcher(now.Add(20 * time.Second))) + watchers.addWatcherThreadUnsafe(newWatcher(now.Add(20 * time.Second))) if len(watchers.watchersBuckets) != 2 { t.Errorf("unexpected bucket size: %#v", watchers.watchersBuckets) } - watchers0 := watchers.popExpiredWatchers() + watchers0 := watchers.popExpiredWatchersThreadUnsafe() if len(watchers0) != 0 { t.Errorf("unexpected bucket size: %#v", watchers0) } clock.Step(10 * time.Second) - watchers1 := watchers.popExpiredWatchers() + watchers1 := watchers.popExpiredWatchersThreadUnsafe() if len(watchers1) != 1 || len(watchers1[0]) != 1 { t.Errorf("unexpected bucket size: %v", watchers1) } - watchers1 = watchers.popExpiredWatchers() + watchers1 = watchers.popExpiredWatchersThreadUnsafe() if len(watchers1) != 0 { t.Errorf("unexpected bucket size: %#v", watchers1) } clock.Step(12 * time.Second) - watchers2 := watchers.popExpiredWatchers() + watchers2 := watchers.popExpiredWatchersThreadUnsafe() if len(watchers2) != 1 || len(watchers2[0]) != 2 { t.Errorf("unexpected bucket size: %#v", watchers2) } @@ -603,49 +603,49 @@ func TestBookmarkAfterResourceVersionWatchers(t *testing.T) { clock := testingclock.NewFakeClock(time.Now()) target := newTimeBucketWatchers(clock, defaultBookmarkFrequency) - if !target.addWatcher(newWatcher("1", clock.Now().Add(2*time.Minute))) { + if !target.addWatcherThreadUnsafe(newWatcher("1", clock.Now().Add(2*time.Minute))) { t.Fatal("failed adding an even to the watcher") } // the watcher is immediately expired (it's waiting for bookmark, so it is scheduled immediately) - ret := target.popExpiredWatchers() + ret := target.popExpiredWatchersThreadUnsafe() if len(ret) != 1 || len(ret[0]) != 1 { t.Fatalf("expected only one watcher to be expired") } - if !target.addWatcher(ret[0][0]) { + if !target.addWatcherThreadUnsafe(ret[0][0]) { t.Fatal("failed adding an even to the watcher") } // after one second time the watcher is still expired clock.Step(1 * time.Second) - ret = target.popExpiredWatchers() + ret = target.popExpiredWatchersThreadUnsafe() if len(ret) != 1 || len(ret[0]) != 1 { t.Fatalf("expected only one watcher to be expired") } - if !target.addWatcher(ret[0][0]) { + if !target.addWatcherThreadUnsafe(ret[0][0]) { t.Fatal("failed adding an even to the watcher") } // after 29 seconds the watcher is still expired clock.Step(29 * time.Second) - ret = target.popExpiredWatchers() + ret = target.popExpiredWatchersThreadUnsafe() if len(ret) != 1 || len(ret[0]) != 1 { t.Fatalf("expected only one watcher to be expired") } // after confirming the watcher is not expired immediately ret[0][0].markBookmarkAfterRvAsReceived(&watchCacheEvent{Type: watch.Bookmark, ResourceVersion: 10, Object: &v1.Pod{}}) - if !target.addWatcher(ret[0][0]) { + if !target.addWatcherThreadUnsafe(ret[0][0]) { t.Fatal("failed adding an even to the watcher") } clock.Step(30 * time.Second) - ret = target.popExpiredWatchers() + ret = target.popExpiredWatchersThreadUnsafe() if len(ret) != 0 { t.Fatalf("didn't expect any watchers to be expired") } clock.Step(30 * time.Second) - ret = target.popExpiredWatchers() + ret = target.popExpiredWatchersThreadUnsafe() if len(ret) != 1 || len(ret[0]) != 1 { t.Fatalf("expected only one watcher to be expired") } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index c5f27b19ced..aee122521af 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -184,7 +184,6 @@ func (i *indexedWatchers) terminateAll(groupResource schema.GroupResource, done // second in a bucket, and pop up them once at the timeout. To be more specific, // if you set fire time at X, you can get the bookmark within (X-1,X+1) period. type watcherBookmarkTimeBuckets struct { - lock sync.Mutex // the key of watcherBuckets is the number of seconds since createTime watchersBuckets map[int64][]*cacheWatcher createTime time.Time @@ -205,7 +204,7 @@ func newTimeBucketWatchers(clock clock.Clock, bookmarkFrequency time.Duration) * // adds a watcher to the bucket, if the deadline is before the start, it will be // added to the first one. -func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool { +func (t *watcherBookmarkTimeBuckets) addWatcherThreadUnsafe(w *cacheWatcher) bool { // note that the returned time can be before t.createTime, // especially in cases when the nextBookmarkTime method // give us the zero value of type Time @@ -215,8 +214,6 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool { return false } bucketID := int64(nextTime.Sub(t.createTime) / time.Second) - t.lock.Lock() - defer t.lock.Unlock() if bucketID < t.startBucketID { bucketID = t.startBucketID } @@ -225,12 +222,10 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool { return true } -func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher { +func (t *watcherBookmarkTimeBuckets) popExpiredWatchersThreadUnsafe() [][]*cacheWatcher { currentBucketID := int64(t.clock.Since(t.createTime) / time.Second) // There should be one or two elements in almost all cases expiredWatchers := make([][]*cacheWatcher, 0, 2) - t.lock.Lock() - defer t.lock.Unlock() for ; t.startBucketID <= currentBucketID; t.startBucketID++ { if watchers, ok := t.watchersBuckets[t.startBucketID]; ok { delete(t.watchersBuckets, t.startBucketID) @@ -328,6 +323,7 @@ type Cacher struct { // dispatching that event to avoid race with closing channels in watchers. watchersToStop []*cacheWatcher // Maintain a timeout queue to send the bookmark event before the watcher times out. + // Note that this field when accessed MUST be protected by the Cacher.lock. bookmarkWatchers *watcherBookmarkTimeBuckets // expiredBookmarkWatchers is a list of watchers that were expired and need to be schedule for a next bookmark event expiredBookmarkWatchers []*cacheWatcher @@ -647,7 +643,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions // Add it to the queue only when the client support watch bookmarks. if watcher.allowWatchBookmarks { - c.bookmarkWatchers.addWatcher(watcher) + c.bookmarkWatchers.addWatcherThreadUnsafe(watcher) } c.watcherIdx++ }() @@ -927,8 +923,12 @@ func (c *Cacher) dispatchEvents() { // Never send a bookmark event if we did not see an event here, this is fine // because we don't provide any guarantees on sending bookmarks. if lastProcessedResourceVersion == 0 { - // pop expired watchers in case there has been no update - c.bookmarkWatchers.popExpiredWatchers() + func() { + c.Lock() + defer c.Unlock() + // pop expired watchers in case there has been no update + c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() + }() continue } bookmarkEvent := &watchCacheEvent{ @@ -1050,7 +1050,7 @@ func (c *Cacher) dispatchEvent(event *watchCacheEvent) { func (c *Cacher) startDispatchingBookmarkEventsLocked() { // Pop already expired watchers. However, explicitly ignore stopped ones, // as we don't delete watcher from bookmarkWatchers when it is stopped. - for _, watchers := range c.bookmarkWatchers.popExpiredWatchers() { + for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() { for _, watcher := range watchers { // c.Lock() is held here. // watcher.stopThreadUnsafe() is protected by c.Lock() @@ -1155,7 +1155,7 @@ func (c *Cacher) finishDispatching() { continue } // requeue the watcher for the next bookmark if needed. - c.bookmarkWatchers.addWatcher(watcher) + c.bookmarkWatchers.addWatcherThreadUnsafe(watcher) } c.expiredBookmarkWatchers = c.expiredBookmarkWatchers[:0] } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 0624d5e78e9..24e0da48b25 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -686,7 +686,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { case <-stopCh: return default: - cacher.bookmarkWatchers.popExpiredWatchers() + cacher.Lock() + cacher.bookmarkWatchers.popExpiredWatchersThreadUnsafe() + cacher.Unlock() } } }() @@ -700,9 +702,9 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { // wait out the expiration period and pop expired watchers time.Sleep(2 * time.Second) - cacher.bookmarkWatchers.popExpiredWatchers() - cacher.bookmarkWatchers.lock.Lock() - defer cacher.bookmarkWatchers.lock.Unlock() + cacher.Lock() + defer cacher.Unlock() + cacher.bookmarkWatchers.popExpiredWatchersThreadUnsafe() if len(cacher.bookmarkWatchers.watchersBuckets) != 0 { numWatchers := 0 for bucketID, v := range cacher.bookmarkWatchers.watchersBuckets {