mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-06 02:34:03 +00:00
Merge pull request #117014 from p0lyn0mial/upstream-cacher-init-expired-watchers
cacher: do not simply popExpiredWatchers when the cacher hasn't dispatched any event
This commit is contained in:
commit
f0e01d2d9e
@ -922,12 +922,24 @@ func (c *Cacher) dispatchEvents() {
|
||||
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
|
||||
// Never send a bookmark event if we did not see an event here, this is fine
|
||||
// because we don't provide any guarantees on sending bookmarks.
|
||||
//
|
||||
// Just pop closed watchers and requeue others if needed.
|
||||
//
|
||||
// TODO(#115478): rework the following logic
|
||||
// in a way that would allow more
|
||||
// efficient cleanup of closed watchers
|
||||
if lastProcessedResourceVersion == 0 {
|
||||
func() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
// pop expired watchers in case there has been no update
|
||||
c.bookmarkWatchers.popExpiredWatchersThreadUnsafe()
|
||||
for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
|
||||
for _, watcher := range watchers {
|
||||
if watcher.stopped {
|
||||
continue
|
||||
}
|
||||
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
|
||||
}
|
||||
}
|
||||
}()
|
||||
continue
|
||||
}
|
||||
|
@ -1973,3 +1973,59 @@ func BenchmarkCacher_GetList(b *testing.B) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that
|
||||
// a bookmark event will be delivered after the cacher has seen an event.
|
||||
// Previously the watchers have been removed from the "want bookmark" queue.
|
||||
func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
// wait until cacher is initialized.
|
||||
if err := cacher.ready.wait(context.Background()); err != nil {
|
||||
t.Fatalf("unexpected error waiting for the cache to be ready")
|
||||
}
|
||||
|
||||
pred := storage.Everything
|
||||
pred.AllowWatchBookmarks = true
|
||||
opts := storage.ListOptions{
|
||||
Predicate: pred,
|
||||
SendInitialEvents: pointer.Bool(true),
|
||||
}
|
||||
w, err := cacher.Watch(context.Background(), "pods/ns", opts)
|
||||
require.NoError(t, err, "failed to create watch: %v")
|
||||
defer w.Stop()
|
||||
|
||||
// Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers.
|
||||
// We do that every ~1s, so waiting 2 seconds seems enough.
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value.
|
||||
makePod := func(rv uint64) *example.Pod {
|
||||
return &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("pod-%d", rv),
|
||||
Namespace: "ns",
|
||||
ResourceVersion: fmt.Sprintf("%d", rv),
|
||||
Annotations: map[string]string{},
|
||||
},
|
||||
}
|
||||
}
|
||||
err = cacher.watchCache.Add(makePod(102))
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyEvents(t, w, []watch.Event{
|
||||
{Type: watch.Added, Object: makePod(102)},
|
||||
{Type: watch.Bookmark, Object: &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
ResourceVersion: "102",
|
||||
Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
|
||||
},
|
||||
}},
|
||||
}, true)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user