diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 9540a529fc4..217fa72588c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -516,13 +516,6 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions return newErrWatcher(err), nil } - // With some events already sent, update resourceVersion so that - // events that were buffered and not yet processed won't be delivered - // to this watcher second time causing going back in time. - if len(initEvents) > 0 { - watchRV = initEvents[len(initEvents)-1].ResourceVersion - } - func() { c.Lock() defer c.Unlock() @@ -537,7 +530,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions c.watcherIdx++ }() - go watcher.process(ctx, initEvents, watchRV) + go watcher.processEvents(ctx, initEvents, watchRV) return watcher, nil } @@ -1392,7 +1385,7 @@ func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) { } } -func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { +func (c *cacheWatcher) processEvents(ctx context.Context, initEvents []*watchCacheEvent, resourceVersion uint64) { defer utilruntime.HandleCrash() // Check how long we are processing initEvents. @@ -1413,15 +1406,25 @@ func (c *cacheWatcher) process(ctx context.Context, initEvents []*watchCacheEven for _, event := range initEvents { c.sendWatchCacheEvent(event) } + objType := c.objectType.String() if len(initEvents) > 0 { initCounter.WithLabelValues(objType).Add(float64(len(initEvents))) + // With some events already sent, update resourceVersion + // so that events that were buffered and not yet processed + // won't be delivered to this watcher second time causing + // going back in time. + resourceVersion = initEvents[len(initEvents)-1].ResourceVersion } processingTime := time.Since(startTime) if processingTime > initProcessThreshold { klog.V(2).Infof("processing %d initEvents of %s (%s) took %v", len(initEvents), objType, c.identifier, processingTime) } + c.process(ctx, resourceVersion) +} + +func (c *cacheWatcher) process(ctx context.Context, resourceVersion uint64) { // At this point we already start processing incoming watch events. // However, the init event can still be processed because their serialization // and sending to the client happens asynchrnously. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index ca65b9f6352..88819285f5a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -74,7 +74,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") - go w.process(context.Background(), initEvents, 0) + go w.processEvents(context.Background(), initEvents, 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { lock.RLock() @@ -194,7 +194,7 @@ TestCase: } w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, objectType, "") - go w.process(context.Background(), testCase.events, 0) + go w.processEvents(context.Background(), testCase.events, 0) ch := w.ResultChan() for j, event := range testCase.expected { @@ -545,7 +545,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, objectType, "") w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} ctx, _ := context.WithDeadline(context.Background(), deadline) - go w.process(ctx, nil, 0) + go w.processEvents(ctx, nil, 0) select { case <-w.ResultChan(): case <-time.After(time.Second):