From ef1e5b6d3ac25431d890bfe4f540d9aa956a856d Mon Sep 17 00:00:00 2001 From: wojtekt Date: Mon, 25 May 2020 20:46:27 +0200 Subject: [PATCH] Watch should return "too old RV" if it can't serve it. --- .../pkg/storage/cacher/watch_cache.go | 19 +++++------ .../pkg/storage/cacher/watch_cache_test.go | 32 +++++++++++++++++++ 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index ef469e15176..6fb84443d6a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -534,19 +534,16 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w size := w.endIndex - w.startIndex var oldest uint64 switch { - case size >= w.capacity: - // Once the watch event buffer is full, the oldest watch event we can deliver - // is the first one in the buffer. - oldest = w.cache[w.startIndex%w.capacity].ResourceVersion - case w.listResourceVersion > 0: - // If the watch event buffer isn't full, the oldest watch event we can deliver - // is one greater than the resource version of the last full list. + case w.listResourceVersion > 0 && w.startIndex == 0: + // If no event was removed from the buffer since last relist, the oldest watch + // event we can deliver is one greater than the resource version of the list. oldest = w.listResourceVersion + 1 case size > 0: - // If we've never completed a list, use the resourceVersion of the oldest event - // in the buffer. - // This should only happen in unit tests that populate the buffer without - // performing list/replace operations. + // If the previous condition is not satisfied: either some event was already + // removed from the buffer or we've never completed a list (the latter can + // only happen in unit tests that populate the buffer without performing + // list/replace operations), the oldest watch event we can deliver is the first + // one in the buffer. oldest = w.cache[w.startIndex%w.capacity].ResourceVersion default: return nil, fmt.Errorf("watch cache isn't correctly initialized") diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index de0437501d6..86b573f323f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -782,6 +782,38 @@ func checkCacheElements(cache *watchCache) bool { return true } +func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { + store := newTestWatchCache(2, &cache.Indexers{}) + + now := store.clock.Now() + addEvent := func(key string, rv uint64, t time.Time) { + event := &watchCacheEvent{ + Key: key, + ResourceVersion: rv, + RecordTime: t, + } + store.updateCache(event) + } + + // Initial LIST comes from the moment of RV=10. + store.Replace(nil, "10") + + addEvent("key1", 20, now) + + // Force "key1" to rotate our of cache. + later := now.Add(2 * eventFreshDuration) + addEvent("key2", 30, later) + addEvent("key3", 40, later) + + // Force cache resize. + addEvent("key4", 50, later.Add(time.Second)) + + _, err := store.GetAllEventsSince(15) + if err == nil || !strings.Contains(err.Error(), "too old resource version") { + t.Errorf("unexpected error: %v", err) + } +} + func BenchmarkWatchCache_updateCache(b *testing.B) { store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{}) store.cache = store.cache[:0]