Watch should return "too old RV" if it can't serve it.

This commit is contained in:
wojtekt 2020-05-25 20:46:27 +02:00
parent 2f38e1b130
commit ef1e5b6d3a
2 changed files with 40 additions and 11 deletions

View File

@ -534,19 +534,16 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
size := w.endIndex - w.startIndex size := w.endIndex - w.startIndex
var oldest uint64 var oldest uint64
switch { switch {
case size >= w.capacity: case w.listResourceVersion > 0 && w.startIndex == 0:
// Once the watch event buffer is full, the oldest watch event we can deliver // If no event was removed from the buffer since last relist, the oldest watch
// is the first one in the buffer. // event we can deliver is one greater than the resource version of the list.
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
case w.listResourceVersion > 0:
// If the watch event buffer isn't full, the oldest watch event we can deliver
// is one greater than the resource version of the last full list.
oldest = w.listResourceVersion + 1 oldest = w.listResourceVersion + 1
case size > 0: case size > 0:
// If we've never completed a list, use the resourceVersion of the oldest event // If the previous condition is not satisfied: either some event was already
// in the buffer. // removed from the buffer or we've never completed a list (the latter can
// This should only happen in unit tests that populate the buffer without // only happen in unit tests that populate the buffer without performing
// performing list/replace operations. // list/replace operations), the oldest watch event we can deliver is the first
// one in the buffer.
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
default: default:
return nil, fmt.Errorf("watch cache isn't correctly initialized") return nil, fmt.Errorf("watch cache isn't correctly initialized")

View File

@ -782,6 +782,38 @@ func checkCacheElements(cache *watchCache) bool {
return true return true
} }
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
store := newTestWatchCache(2, &cache.Indexers{})
now := store.clock.Now()
addEvent := func(key string, rv uint64, t time.Time) {
event := &watchCacheEvent{
Key: key,
ResourceVersion: rv,
RecordTime: t,
}
store.updateCache(event)
}
// Initial LIST comes from the moment of RV=10.
store.Replace(nil, "10")
addEvent("key1", 20, now)
// Force "key1" to rotate our of cache.
later := now.Add(2 * eventFreshDuration)
addEvent("key2", 30, later)
addEvent("key3", 40, later)
// Force cache resize.
addEvent("key4", 50, later.Add(time.Second))
_, err := store.GetAllEventsSince(15)
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
t.Errorf("unexpected error: %v", err)
}
}
func BenchmarkWatchCache_updateCache(b *testing.B) { func BenchmarkWatchCache_updateCache(b *testing.B) {
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{}) store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
store.cache = store.cache[:0] store.cache = store.cache[:0]