diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1e6cc4d021a..40fe4c0fdde 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -905,7 +905,23 @@ func (c *Cacher) dispatchEvents() { bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25)) defer bookmarkTimer.Stop() + // The internal informer populates the RV as soon as it conducts + // The first successful sync with the underlying store. + // The cache must wait until this first sync is completed to be deemed ready. + // Since we cannot send a bookmark when the lastProcessedResourceVersion is 0, + // we poll aggressively for the first RV before entering the dispatch loop. lastProcessedResourceVersion := uint64(0) + if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true, func(_ context.Context) (bool, error) { + if rv := c.watchCache.getResourceVersion(); rv != 0 { + lastProcessedResourceVersion = rv + return true, nil + } + return false, nil + }); err != nil { + // given the function above never returns error, + // the non-empty error means that the stopCh was closed + return + } for { select { case event, ok := <-c.incoming: @@ -929,29 +945,6 @@ func (c *Cacher) dispatchEvents() { metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc() case <-bookmarkTimer.C(): bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25)) - // Never send a bookmark event if we did not see an event here, this is fine - // because we don't provide any guarantees on sending bookmarks. - // - // Just pop closed watchers and requeue others if needed. - // - // TODO(#115478): rework the following logic - // in a way that would allow more - // efficient cleanup of closed watchers - if lastProcessedResourceVersion == 0 { - func() { - c.Lock() - defer c.Unlock() - for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() { - for _, watcher := range watchers { - if watcher.stopped { - continue - } - c.bookmarkWatchers.addWatcherThreadUnsafe(watcher) - } - } - }() - continue - } bookmarkEvent := &watchCacheEvent{ Type: watch.Bookmark, Object: c.newFunc(), diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 33ae17b17bd..cc34ced7fcf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -1902,16 +1902,13 @@ func BenchmarkCacher_GetList(b *testing.B) { } } -// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that -// a bookmark event will be delivered after the cacher has seen an event. -// Previously the watchers have been removed from the "want bookmark" queue. -func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) { +// TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived makes sure that +// a bookmark event will be delivered even if the cacher has not received an event. +func TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true) backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } + require.NoError(t, err, "failed to create cacher") defer cacher.Stop() // wait until cacher is initialized. @@ -1929,29 +1926,10 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) { require.NoError(t, err, "failed to create watch: %v") defer w.Stop() - // Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers. - // We do that every ~1s, so waiting 2 seconds seems enough. - time.Sleep(2 * time.Second) - - // Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value. - makePod := func(rv uint64) *example.Pod { - return &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("pod-%d", rv), - Namespace: "ns", - ResourceVersion: fmt.Sprintf("%d", rv), - Annotations: map[string]string{}, - }, - } - } - err = cacher.watchCache.Add(makePod(102)) - require.NoError(t, err) - verifyEvents(t, w, []watch.Event{ - {Type: watch.Added, Object: makePod(102)}, {Type: watch.Bookmark, Object: &example.Pod{ ObjectMeta: metav1.ObjectMeta{ - ResourceVersion: "102", + ResourceVersion: "100", Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, }, }}, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 9c4d93f176a..6a5526ca559 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -641,6 +641,12 @@ func (w *watchCache) Resync() error { return nil } +func (w *watchCache) getResourceVersion() uint64 { + w.RLock() + defer w.RUnlock() + return w.resourceVersion +} + func (w *watchCache) currentCapacity() int { w.RLock() defer w.RUnlock()