cacher: do not popExpiredWatchers when the cacher hasn't dispatched any event

If the cacher hasn't seen any event (when lastProcessedResourceVersion is zero) and
the bookmarkTimer has ticked then we shouldn't popExpiredWatchers. This is
because the watchers wont' be re-added and will miss future bookmark events when
the cacher finally receives an event via the c.incoming chan.
This commit is contained in:
Lukasz Szaszkiewicz 2023-04-19 15:29:13 +02:00
parent eab66a687b
commit 6db4cbfde7
2 changed files with 70 additions and 2 deletions

View File

@ -922,12 +922,24 @@ func (c *Cacher) dispatchEvents() {
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
// Never send a bookmark event if we did not see an event here, this is fine
// because we don't provide any guarantees on sending bookmarks.
//
// Just pop closed watchers and requeue others if needed.
//
// TODO(#115478): rework the following logic
// in a way that would allow more
// efficient cleanup of closed watchers
if lastProcessedResourceVersion == 0 {
func() {
c.Lock()
defer c.Unlock()
// pop expired watchers in case there has been no update
c.bookmarkWatchers.popExpiredWatchersThreadUnsafe()
for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
for _, watcher := range watchers {
if watcher.stopped {
continue
}
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
}
}
}()
continue
}

View File

@ -1973,3 +1973,59 @@ func BenchmarkCacher_GetList(b *testing.B) {
})
}
}
// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that
// a bookmark event will be delivered after the cacher has seen an event.
// Previously the watchers have been removed from the "want bookmark" queue.
func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
// wait until cacher is initialized.
if err := cacher.ready.wait(context.Background()); err != nil {
t.Fatalf("unexpected error waiting for the cache to be ready")
}
pred := storage.Everything
pred.AllowWatchBookmarks = true
opts := storage.ListOptions{
Predicate: pred,
SendInitialEvents: pointer.Bool(true),
}
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
require.NoError(t, err, "failed to create watch: %v")
defer w.Stop()
// Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers.
// We do that every ~1s, so waiting 2 seconds seems enough.
time.Sleep(2 * time.Second)
// Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value.
makePod := func(rv uint64) *example.Pod {
return &example.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("pod-%d", rv),
Namespace: "ns",
ResourceVersion: fmt.Sprintf("%d", rv),
Annotations: map[string]string{},
},
}
}
err = cacher.watchCache.Add(makePod(102))
require.NoError(t, err)
verifyEvents(t, w, []watch.Event{
{Type: watch.Added, Object: makePod(102)},
{Type: watch.Bookmark, Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "102",
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
},
}},
}, true)
}