diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go index 69cc9f7e852..478d2151d9c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go @@ -488,10 +488,19 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch break } c.sendWatchCacheEvent(event) + // With some events already sent, update resourceVersion so that // events that were buffered and not yet processed won't be delivered // to this watcher second time causing going back in time. - resourceVersion = event.ResourceVersion + // + // There is one case where events are not necessarily ordered by + // resourceVersion, being a case of watching from resourceVersion=0, + // which at the beginning returns the state of each object. + // For the purpose of it, we need to max it with the resource version + // that we have so far. 
+ if event.ResourceVersion > resourceVersion { + resourceVersion = event.ResourceVersion + } initEventCount++ } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go index c183259519e..d408c7ae865 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go @@ -28,12 +28,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/storage" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + "k8s.io/client-go/tools/cache" testingclock "k8s.io/utils/clock/testing" ) @@ -285,6 +287,63 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) { } +func TestResourceVersionAfterInitEvents(t *testing.T) { + getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) { + return nil, nil, nil + } + + const numObjects = 10 + store := cache.NewIndexer(storeElementKey, storeElementIndexers(nil)) + + for i := 0; i < numObjects; i++ { + elem := makeTestStoreElement(makeTestPod(fmt.Sprintf("pod-%d", i), uint64(i))) + store.Add(elem) + } + + wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc) + if err != nil { + t.Fatal(err) + } + + filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true } + forget := func(_ bool) {} + deadline := time.Now().Add(time.Minute) + w := newCacheWatcher(numObjects+1, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") + + // Simulate a situation when the last event that was already in + // the state, wasn't yet processed by cacher and will be delivered + // via channel again. 
+ event := &watchCacheEvent{ + Type: watch.Added, + Object: makeTestPod(fmt.Sprintf("pod-%d", numObjects-1), uint64(numObjects-1)), + ResourceVersion: uint64(numObjects - 1), + } + if !w.add(event, time.NewTimer(time.Second)) { + t.Fatalf("failed to add event") + } + w.stopLocked() + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + w.processInterval(context.Background(), wci, uint64(numObjects-1)) + }() + + // We expect all init events to be delivered. + for i := 0; i < numObjects; i++ { + <-w.ResultChan() + } + // We don't expect any other event to be delivered and thus + // the ResultChan to be closed. + result, ok := <-w.ResultChan() + if ok { + t.Errorf("unexpected event: %#v", result) + } + + wg.Wait() +} + func TestTimeBucketWatchersBasic(t *testing.T) { filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 783cf4524c3..4d86018e520 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -156,14 +156,15 @@ type watchCache struct { // getAttrsFunc is used to get labels and fields of an object. getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error) - // cache is used a cyclic buffer - its first element (with the smallest - // resourceVersion) is defined by startIndex, its last element is defined - // by endIndex (if cache is full it will be startIndex + capacity). - // Both startIndex and endIndex can be greater than buffer capacity - - // you should always apply modulo capacity to get an index in cache array. + // cache is used as a cyclic buffer - the "current" contents of it are + // stored in [start_index%capacity, end_index%capacity) - so the + // "current" contents have exactly end_index-start_index items. 
cache []*watchCacheEvent startIndex int endIndex int + // removedEventSinceRelist holds the information whether any of the events + // were already removed from the `cache` cyclic buffer since the last relist + removedEventSinceRelist bool // store will effectively support LIST operation from the "end of cache // history" i.e. from the moment just after the newest cached watched event. @@ -346,6 +347,7 @@ func (w *watchCache) updateCache(event *watchCacheEvent) { if w.isCacheFullLocked() { // Cache is full - remove the oldest element. w.startIndex++ + w.removedEventSinceRelist = true } w.cache[w.endIndex%w.capacity] = event w.endIndex++ @@ -572,8 +574,15 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { w.Lock() defer w.Unlock() - w.startIndex = 0 - w.endIndex = 0 + // Ensure startIndex never decreases, so that existing watchCacheInterval + // instances get "invalid" errors if they try to download from the buffer + // using their own start/end indexes calculated from previous buffer + // content. + + // Empty the cyclic buffer, ensuring startIndex doesn't decrease. + w.startIndex = w.endIndex + w.removedEventSinceRelist = false + if err := w.store.Replace(toReplace, resourceVersion); err != nil { return err } @@ -664,7 +673,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach size := w.endIndex - w.startIndex var oldest uint64 switch { - case w.listResourceVersion > 0 && w.startIndex == 0: + case w.listResourceVersion > 0 && !w.removedEventSinceRelist: // If no event was removed from the buffer since last relist, the oldest watch // event we can deliver is one greater than the resource version of the list. oldest = w.listResourceVersion + 1