From 6db4cbfde7babfb34f5cd1059c769ec2d870f12a Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Wed, 19 Apr 2023 15:29:13 +0200 Subject: [PATCH] cacher: do not popExpiredWatchers when the cacher hasn't dispatched any event If the cacher hasn't seen any event (when lastProcessedResourceVersion is zero) and the bookmarkTimer has ticked then we shouldn't popExpiredWatchers. This is because the watchers wont' be re-added and will miss future bookmark events when the cacher finally receives an event via the c.incoming chan. --- .../apiserver/pkg/storage/cacher/cacher.go | 16 +++++- .../storage/cacher/cacher_whitebox_test.go | 56 +++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index aee122521af..4453b5cf1d3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -922,12 +922,24 @@ func (c *Cacher) dispatchEvents() { bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25)) // Never send a bookmark event if we did not see an event here, this is fine // because we don't provide any guarantees on sending bookmarks. + // + // Just pop closed watchers and requeue others if needed. + // + // TODO(#115478): rework the following logic + // in a way that would allow more + // efficient cleanup of closed watchers if lastProcessedResourceVersion == 0 { func() { c.Lock() defer c.Unlock() - // pop expired watchers in case there has been no update - c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() + for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() { + for _, watcher := range watchers { + if watcher.stopped { + continue + } + c.bookmarkWatchers.addWatcherThreadUnsafe(watcher) + } + } }() continue } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 24e0da48b25..86d184e136c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1973,3 +1973,59 @@ func BenchmarkCacher_GetList(b *testing.B) { }) } } + +// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that +// a bookmark event will be delivered after the cacher has seen an event. +// Previously the watchers have been removed from the "want bookmark" queue. +func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // wait until cacher is initialized. + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + pred := storage.Everything + pred.AllowWatchBookmarks = true + opts := storage.ListOptions{ + Predicate: pred, + SendInitialEvents: pointer.Bool(true), + } + w, err := cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + + // Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers. + // We do that every ~1s, so waiting 2 seconds seems enough. + time.Sleep(2 * time.Second) + + // Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value. + makePod := func(rv uint64) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", rv), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", rv), + Annotations: map[string]string{}, + }, + } + } + err = cacher.watchCache.Add(makePod(102)) + require.NoError(t, err) + + verifyEvents(t, w, []watch.Event{ + {Type: watch.Added, Object: makePod(102)}, + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, true) +}