Fix unnecessary too-old-errors from watch cache

This commit is contained in:
wojtekt 2018-08-30 12:28:44 +02:00
parent 8aea674681
commit 1202172592
2 changed files with 69 additions and 14 deletions

View File

@ -127,6 +127,9 @@ type watchCache struct {
// ResourceVersion up to which the watchCache is propagated. // ResourceVersion up to which the watchCache is propagated.
resourceVersion uint64 resourceVersion uint64
// ResourceVersion of the last list result (populated via Replace() method).
listResourceVersion uint64
// This handler is run at the end of every successful Replace() method. // This handler is run at the end of every successful Replace() method.
onReplace func() onReplace func()
@ -147,16 +150,17 @@ func newWatchCache(
getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error), getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
versioner storage.Versioner) *watchCache { versioner storage.Versioner) *watchCache {
wc := &watchCache{ wc := &watchCache{
capacity: capacity, capacity: capacity,
keyFunc: keyFunc, keyFunc: keyFunc,
getAttrsFunc: getAttrsFunc, getAttrsFunc: getAttrsFunc,
cache: make([]watchCacheElement, capacity), cache: make([]watchCacheElement, capacity),
startIndex: 0, startIndex: 0,
endIndex: 0, endIndex: 0,
store: cache.NewStore(storeElementKey), store: cache.NewStore(storeElementKey),
resourceVersion: 0, resourceVersion: 0,
clock: clock.RealClock{}, listResourceVersion: 0,
versioner: versioner, clock: clock.RealClock{},
versioner: versioner,
} }
wc.cond = sync.NewCond(wc.RLocker()) wc.cond = sync.NewCond(wc.RLocker())
return wc return wc
@ -390,6 +394,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
if err := w.store.Replace(toReplace, resourceVersion); err != nil { if err := w.store.Replace(toReplace, resourceVersion); err != nil {
return err return err
} }
w.listResourceVersion = version
w.resourceVersion = version w.resourceVersion = version
if w.onReplace != nil { if w.onReplace != nil {
w.onReplace() w.onReplace()
@ -412,12 +417,26 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) { func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
// if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher var oldest uint64
// is the *next* event we'll receive, which will be at least one greater than our current resourceVersion switch {
oldest := w.resourceVersion + 1 case size >= w.capacity:
if size > 0 { // Once the watch event buffer is full, the oldest watch event we can deliver
// is the first one in the buffer.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion oldest = w.cache[w.startIndex%w.capacity].resourceVersion
case w.listResourceVersion > 0:
// If the watch event buffer isn't full, the oldest watch event we can deliver
// is one greater than the resource version of the last full list.
oldest = w.listResourceVersion + 1
case size > 0:
// If we've never completed a list, use the resourceVersion of the oldest event
// in the buffer.
// This should only happen in unit tests that populate the buffer without
// performing list/replace operations.
oldest = w.cache[w.startIndex%w.capacity].resourceVersion
default:
return nil, fmt.Errorf("watch cache isn't correctly initialized")
} }
if resourceVersion == 0 { if resourceVersion == 0 {
// resourceVersion = 0 means that we don't require any specific starting point // resourceVersion = 0 means that we don't require any specific starting point
// and we would like to start watching from ~now. // and we would like to start watching from ~now.

View File

@ -19,6 +19,7 @@ package cacher
import ( import (
"fmt" "fmt"
"strconv" "strconv"
"strings"
"testing" "testing"
"time" "time"
@ -278,6 +279,41 @@ func TestEvents(t *testing.T) {
} }
} }
func TestMarker(t *testing.T) {
store := newTestWatchCache(3)
// First thing that is called when propagated from storage is Replace.
store.Replace([]interface{}{
makeTestPod("pod1", 5),
makeTestPod("pod2", 9),
}, "9")
_, err := store.GetAllEventsSince(8)
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
t.Errorf("unexpected error: %v", err)
}
// Getting events from 8 should return no events,
// even though there is a marker there.
result, err := store.GetAllEventsSince(9)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result) != 0 {
t.Errorf("unexpected result: %#v, expected no events", result)
}
pod := makeTestPod("pods", 12)
store.Add(pod)
// Getting events from 8 should still work and return one event.
result, err = store.GetAllEventsSince(9)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if len(result) != 1 || !apiequality.Semantic.DeepEqual(result[0].Object, pod) {
t.Errorf("unexpected result: %#v, expected %v", result, pod)
}
}
func TestWaitUntilFreshAndList(t *testing.T) { func TestWaitUntilFreshAndList(t *testing.T) {
store := newTestWatchCache(3) store := newTestWatchCache(3)