From 12021725922efc3a80c8a0673b28826a524eb0a0 Mon Sep 17 00:00:00 2001 From: wojtekt Date: Thu, 30 Aug 2018 12:28:44 +0200 Subject: [PATCH] Fix unnecessary too-old-errors from watch cache --- .../pkg/storage/cacher/watch_cache.go | 47 +++++++++++++------ .../pkg/storage/cacher/watch_cache_test.go | 36 ++++++++++++++ 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 5afbe434e74..527b81dda2b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -127,6 +127,9 @@ type watchCache struct { // ResourceVersion up to which the watchCache is propagated. resourceVersion uint64 + // ResourceVersion of the last list result (populated via Replace() method). + listResourceVersion uint64 + // This handler is run at the end of every successful Replace() method. onReplace func() @@ -147,16 +150,17 @@ func newWatchCache( getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error), versioner storage.Versioner) *watchCache { wc := &watchCache{ - capacity: capacity, - keyFunc: keyFunc, - getAttrsFunc: getAttrsFunc, - cache: make([]watchCacheElement, capacity), - startIndex: 0, - endIndex: 0, - store: cache.NewStore(storeElementKey), - resourceVersion: 0, - clock: clock.RealClock{}, - versioner: versioner, + capacity: capacity, + keyFunc: keyFunc, + getAttrsFunc: getAttrsFunc, + cache: make([]watchCacheElement, capacity), + startIndex: 0, + endIndex: 0, + store: cache.NewStore(storeElementKey), + resourceVersion: 0, + listResourceVersion: 0, + clock: clock.RealClock{}, + versioner: versioner, } wc.cond = sync.NewCond(wc.RLocker()) return wc @@ -390,6 +394,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error { if err := w.store.Replace(toReplace, resourceVersion); err != nil { return err } + w.listResourceVersion = version w.resourceVersion = version if w.onReplace != nil { w.onReplace() @@ -412,12 +417,26 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) { func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) { size := w.endIndex - w.startIndex - // if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher - // is the *next* event we'll receive, which will be at least one greater than our current resourceVersion - oldest := w.resourceVersion + 1 - if size > 0 { + var oldest uint64 + switch { + case size >= w.capacity: + // Once the watch event buffer is full, the oldest watch event we can deliver + // is the first one in the buffer. oldest = w.cache[w.startIndex%w.capacity].resourceVersion + case w.listResourceVersion > 0: + // If the watch event buffer isn't full, the oldest watch event we can deliver + // is one greater than the resource version of the last full list. + oldest = w.listResourceVersion + 1 + case size > 0: + // If we've never completed a list, use the resourceVersion of the oldest event + // in the buffer. + // This should only happen in unit tests that populate the buffer without + // performing list/replace operations. + oldest = w.cache[w.startIndex%w.capacity].resourceVersion + default: + return nil, fmt.Errorf("watch cache isn't correctly initialized") } + if resourceVersion == 0 { // resourceVersion = 0 means that we don't require any specific starting point // and we would like to start watching from ~now. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index d6a7b88b157..594105faae8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -19,6 +19,7 @@ package cacher import ( "fmt" "strconv" + "strings" "testing" "time" @@ -278,6 +279,41 @@ func TestEvents(t *testing.T) { } } +func TestMarker(t *testing.T) { + store := newTestWatchCache(3) + + // First thing that is called when propagated from storage is Replace. + store.Replace([]interface{}{ + makeTestPod("pod1", 5), + makeTestPod("pod2", 9), + }, "9") + + _, err := store.GetAllEventsSince(8) + if err == nil || !strings.Contains(err.Error(), "too old resource version") { + t.Errorf("unexpected error: %v", err) + } + // Getting events from 8 should return no events, + // even though there is a marker there. + result, err := store.GetAllEventsSince(9) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(result) != 0 { + t.Errorf("unexpected result: %#v, expected no events", result) + } + + pod := makeTestPod("pods", 12) + store.Add(pod) + // Getting events from 8 should still work and return one event. + result, err = store.GetAllEventsSince(9) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(result) != 1 || !apiequality.Semantic.DeepEqual(result[0].Object, pod) { + t.Errorf("unexpected result: %#v, expected %v", result, pod) + } +} + func TestWaitUntilFreshAndList(t *testing.T) { store := newTestWatchCache(3)