From 0204bc7e2594c297f146701132ee969721ada2ab Mon Sep 17 00:00:00 2001 From: Ted Yu Date: Tue, 30 Jul 2019 06:49:01 -0700 Subject: [PATCH] Pop expired watchers in case there is no update --- .../apiserver/pkg/storage/cacher/cacher.go | 7 ++ .../storage/cacher/cacher_whitebox_test.go | 64 +++++++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 2fa49efafff..e366906af4e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -165,6 +165,7 @@ func (i *indexedWatchers) terminateAll(objectType reflect.Type, done func(*cache // if you set fire time at X, you can get the bookmark within (X-1,X+1) period. // This is NOT thread-safe. type watcherBookmarkTimeBuckets struct { + lock sync.Mutex watchersBuckets map[int64][]*cacheWatcher startBucketID int64 clock clock.Clock @@ -186,6 +187,8 @@ func (t *watcherBookmarkTimeBuckets) addWatcher(w *cacheWatcher) bool { return false } bucketID := nextTime.Unix() + t.lock.Lock() + defer t.lock.Unlock() if bucketID < t.startBucketID { bucketID = t.startBucketID } @@ -198,6 +201,8 @@ func (t *watcherBookmarkTimeBuckets) popExpiredWatchers() [][]*cacheWatcher { currentBucketID := t.clock.Now().Unix() // There should be one or two elements in almost all cases expiredWatchers := make([][]*cacheWatcher, 0, 2) + t.lock.Lock() + defer t.lock.Unlock() for ; t.startBucketID <= currentBucketID; t.startBucketID++ { if watchers, ok := t.watchersBuckets[t.startBucketID]; ok { delete(t.watchersBuckets, t.startBucketID) @@ -784,6 +789,8 @@ func (c *Cacher) dispatchEvents() { // Never send a bookmark event if we did not see an event here, this is fine // because we don't provide any guarantees on sending bookmarks. if lastProcessedResourceVersion == 0 { + // pop expired watchers in case there has been no update + c.bookmarkWatchers.popExpiredWatchers() continue } bookmarkEvent := &watchCacheEvent{ diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 154cc524f61..ff807584029 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -587,6 +587,70 @@ func TestTimeBucketWatchersBasic(t *testing.T) { } } +func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, true)() + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage, 1000) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Wait until cacher is initialized. + cacher.ready.wait() + pred := storage.Everything + pred.AllowWatchBookmarks = true + + // run the collision test for 3 seconds to let ~2 buckets expire + stopCh := make(chan struct{}) + time.AfterFunc(3*time.Second, func() { close(stopCh) }) + + wg := &sync.WaitGroup{} + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stopCh: + return + default: + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) + w, err := cacher.Watch(ctx, "pods/ns", "0", pred) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + w.Stop() + } + } + }() + + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-stopCh: + return + default: + cacher.bookmarkWatchers.popExpiredWatchers() + } + } + }() + + // wait for adding/removing watchers to end + wg.Wait() + + // wait out the expiration period and pop expired watchers + time.Sleep(2 * time.Second) + cacher.bookmarkWatchers.popExpiredWatchers() + cacher.bookmarkWatchers.lock.Lock() + defer cacher.bookmarkWatchers.lock.Unlock() + if len(cacher.bookmarkWatchers.watchersBuckets) != 0 { + t.Errorf("unexpected bookmark watchers %v", len(cacher.bookmarkWatchers.watchersBuckets)) + } +} + func testCacherSendBookmarkEvents(t *testing.T, watchCacheEnabled, allowWatchBookmarks, expectedBookmarks bool) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchBookmark, watchCacheEnabled)() backingStorage := &dummyStorage{}