diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index aee122521af..4453b5cf1d3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -922,12 +922,24 @@ func (c *Cacher) dispatchEvents() { bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25)) // Never send a bookmark event if we did not see an event here, this is fine // because we don't provide any guarantees on sending bookmarks. + // + // Just pop closed watchers and requeue others if needed. + // + // TODO(#115478): rework the following logic + // in a way that would allow more + // efficient cleanup of closed watchers if lastProcessedResourceVersion == 0 { func() { c.Lock() defer c.Unlock() - // pop expired watchers in case there has been no update - c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() + for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() { + for _, watcher := range watchers { + if watcher.stopped { + continue + } + c.bookmarkWatchers.addWatcherThreadUnsafe(watcher) + } + } }() continue } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 24e0da48b25..86d184e136c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1973,3 +1973,59 @@ func BenchmarkCacher_GetList(b *testing.B) { }) } } + +// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that +// a bookmark event will be delivered after the cacher has seen an event. +// Previously the watchers have been removed from the "want bookmark" queue. +func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // wait until cacher is initialized. + if err := cacher.ready.wait(context.Background()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + pred := storage.Everything + pred.AllowWatchBookmarks = true + opts := storage.ListOptions{ + Predicate: pred, + SendInitialEvents: pointer.Bool(true), + } + w, err := cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + + // Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers. + // We do that every ~1s, so waiting 2 seconds seems enough. + time.Sleep(2 * time.Second) + + // Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value. + makePod := func(rv uint64) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", rv), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", rv), + Annotations: map[string]string{}, + }, + } + } + err = cacher.watchCache.Add(makePod(102)) + require.NoError(t, err) + + verifyEvents(t, w, []watch.Event{ + {Type: watch.Added, Object: makePod(102)}, + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, true) +}