mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #91417 from wojtek-t/fix_watch_race
Fix the bug of watches being accepted instead of returning "too old resource version" in watchcache
This commit is contained in:
commit
a79c711191
@ -534,19 +534,16 @@ func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*w
|
||||
size := w.endIndex - w.startIndex
|
||||
var oldest uint64
|
||||
switch {
|
||||
case size >= w.capacity:
|
||||
// Once the watch event buffer is full, the oldest watch event we can deliver
|
||||
// is the first one in the buffer.
|
||||
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
|
||||
case w.listResourceVersion > 0:
|
||||
// If the watch event buffer isn't full, the oldest watch event we can deliver
|
||||
// is one greater than the resource version of the last full list.
|
||||
case w.listResourceVersion > 0 && w.startIndex == 0:
|
||||
// If no event was removed from the buffer since last relist, the oldest watch
|
||||
// event we can deliver is one greater than the resource version of the list.
|
||||
oldest = w.listResourceVersion + 1
|
||||
case size > 0:
|
||||
// If we've never completed a list, use the resourceVersion of the oldest event
|
||||
// in the buffer.
|
||||
// This should only happen in unit tests that populate the buffer without
|
||||
// performing list/replace operations.
|
||||
// If the previous condition is not satisfied: either some event was already
|
||||
// removed from the buffer or we've never completed a list (the latter can
|
||||
// only happen in unit tests that populate the buffer without performing
|
||||
// list/replace operations), the oldest watch event we can deliver is the first
|
||||
// one in the buffer.
|
||||
oldest = w.cache[w.startIndex%w.capacity].ResourceVersion
|
||||
default:
|
||||
return nil, fmt.Errorf("watch cache isn't correctly initialized")
|
||||
|
@ -782,6 +782,38 @@ func checkCacheElements(cache *watchCache) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
|
||||
store := newTestWatchCache(2, &cache.Indexers{})
|
||||
|
||||
now := store.clock.Now()
|
||||
addEvent := func(key string, rv uint64, t time.Time) {
|
||||
event := &watchCacheEvent{
|
||||
Key: key,
|
||||
ResourceVersion: rv,
|
||||
RecordTime: t,
|
||||
}
|
||||
store.updateCache(event)
|
||||
}
|
||||
|
||||
// Initial LIST comes from the moment of RV=10.
|
||||
store.Replace(nil, "10")
|
||||
|
||||
addEvent("key1", 20, now)
|
||||
|
||||
// Force "key1" to rotate our of cache.
|
||||
later := now.Add(2 * eventFreshDuration)
|
||||
addEvent("key2", 30, later)
|
||||
addEvent("key3", 40, later)
|
||||
|
||||
// Force cache resize.
|
||||
addEvent("key4", 50, later.Add(time.Second))
|
||||
|
||||
_, err := store.GetAllEventsSince(15)
|
||||
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkWatchCache_updateCache(b *testing.B) {
|
||||
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
|
||||
store.cache = store.cache[:0]
|
||||
|
Loading…
Reference in New Issue
Block a user