mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 23:37:01 +00:00
Merge pull request #124754 from p0lyn0mial/upstream-cacher-dispatchevents-progress-requester
storage/cacher: dispatchEvents use progressRequester
This commit is contained in:
commit
935292b62d
@ -908,7 +908,23 @@ func (c *Cacher) dispatchEvents() {
|
||||
bookmarkTimer := c.clock.NewTimer(wait.Jitter(time.Second, 0.25))
|
||||
defer bookmarkTimer.Stop()
|
||||
|
||||
// The internal informer populates the RV as soon as it conducts
|
||||
// The first successful sync with the underlying store.
|
||||
// The cache must wait until this first sync is completed to be deemed ready.
|
||||
// Since we cannot send a bookmark when the lastProcessedResourceVersion is 0,
|
||||
// we poll aggressively for the first RV before entering the dispatch loop.
|
||||
lastProcessedResourceVersion := uint64(0)
|
||||
if err := wait.PollUntilContextCancel(wait.ContextForChannel(c.stopCh), 10*time.Millisecond, true, func(_ context.Context) (bool, error) {
|
||||
if rv := c.watchCache.getResourceVersion(); rv != 0 {
|
||||
lastProcessedResourceVersion = rv
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}); err != nil {
|
||||
// given the function above never returns error,
|
||||
// the non-empty error means that the stopCh was closed
|
||||
return
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-c.incoming:
|
||||
@ -932,29 +948,6 @@ func (c *Cacher) dispatchEvents() {
|
||||
metrics.EventsCounter.WithLabelValues(c.groupResource.String()).Inc()
|
||||
case <-bookmarkTimer.C():
|
||||
bookmarkTimer.Reset(wait.Jitter(time.Second, 0.25))
|
||||
// Never send a bookmark event if we did not see an event here, this is fine
|
||||
// because we don't provide any guarantees on sending bookmarks.
|
||||
//
|
||||
// Just pop closed watchers and requeue others if needed.
|
||||
//
|
||||
// TODO(#115478): rework the following logic
|
||||
// in a way that would allow more
|
||||
// efficient cleanup of closed watchers
|
||||
if lastProcessedResourceVersion == 0 {
|
||||
func() {
|
||||
c.Lock()
|
||||
defer c.Unlock()
|
||||
for _, watchers := range c.bookmarkWatchers.popExpiredWatchersThreadUnsafe() {
|
||||
for _, watcher := range watchers {
|
||||
if watcher.stopped {
|
||||
continue
|
||||
}
|
||||
c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
|
||||
}
|
||||
}
|
||||
}()
|
||||
continue
|
||||
}
|
||||
bookmarkEvent := &watchCacheEvent{
|
||||
Type: watch.Bookmark,
|
||||
Object: c.newFunc(),
|
||||
|
@ -1902,16 +1902,13 @@ func BenchmarkCacher_GetList(b *testing.B) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDoNotPopExpiredWatchersWhenNoEventsSeen makes sure that
|
||||
// a bookmark event will be delivered after the cacher has seen an event.
|
||||
// Previously the watchers have been removed from the "want bookmark" queue.
|
||||
func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
|
||||
// TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived makes sure that
|
||||
// a bookmark event will be delivered even if the cacher has not received an event.
|
||||
func TestWatchListIsSynchronisedWhenNoEventsFromStoreReceived(t *testing.T) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)
|
||||
backingStorage := &dummyStorage{}
|
||||
cacher, _, err := newTestCacher(backingStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
require.NoError(t, err, "failed to create cacher")
|
||||
defer cacher.Stop()
|
||||
|
||||
// wait until cacher is initialized.
|
||||
@ -1929,29 +1926,10 @@ func TestDoNotPopExpiredWatchersWhenNoEventsSeen(t *testing.T) {
|
||||
require.NoError(t, err, "failed to create watch: %v")
|
||||
defer w.Stop()
|
||||
|
||||
// Ensure that popExpiredWatchers is called to ensure that our watch isn't removed from bookmarkWatchers.
|
||||
// We do that every ~1s, so waiting 2 seconds seems enough.
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Send an event to ensure that lastProcessedResourceVersion in Cacher will change to non-zero value.
|
||||
makePod := func(rv uint64) *example.Pod {
|
||||
return &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("pod-%d", rv),
|
||||
Namespace: "ns",
|
||||
ResourceVersion: fmt.Sprintf("%d", rv),
|
||||
Annotations: map[string]string{},
|
||||
},
|
||||
}
|
||||
}
|
||||
err = cacher.watchCache.Add(makePod(102))
|
||||
require.NoError(t, err)
|
||||
|
||||
verifyEvents(t, w, []watch.Event{
|
||||
{Type: watch.Added, Object: makePod(102)},
|
||||
{Type: watch.Bookmark, Object: &example.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
ResourceVersion: "102",
|
||||
ResourceVersion: "100",
|
||||
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
|
||||
},
|
||||
}},
|
||||
|
@ -643,6 +643,12 @@ func (w *watchCache) Resync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *watchCache) getResourceVersion() uint64 {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
return w.resourceVersion
|
||||
}
|
||||
|
||||
func (w *watchCache) currentCapacity() int {
|
||||
w.RLock()
|
||||
defer w.RUnlock()
|
||||
|
Loading…
Reference in New Issue
Block a user