Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-24 12:15:52 +00:00
Merge pull request #68065 from wojtek-t/fix_unnecessary_too_old_rv_errors
Automatic merge from submit-queue (batch tested with PRs 68051, 68130, 67211, 68065, 68117). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

Fix unnecessary too-old-errors from watch cache

When initializing the watch cache via LIST, we set its resource version to the RV of the list request. However, before this PR, the first incoming watch event (updating the watch cache) moved the "smallest oldest known version" to the RV of that watch event. As a result, watch requests passing an RV equal to the RV returned from the initial list failed with "too old resource version". That is unnecessary, because we know there were no other watch events in the meantime. This PR addresses that issue.

/assign @liggitt
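To make the failure mode concrete, here is a minimal sketch (not code from this PR; the tooOld helper and the variable names are invented for illustration) of how the "oldest deliverable resource version" behaves before and after the fix:

package main

import "fmt"

// tooOld reports whether a watch starting at watchRV would be rejected, given
// the oldest event the cache believes it can deliver. This only mirrors the
// "too old resource version" check conceptually; the helper itself is made up.
func tooOld(watchRV, oldest uint64) bool { return watchRV < oldest-1 }

func main() {
	listRV := uint64(9)        // RV returned by the initial LIST
	firstEventRV := uint64(12) // RV of the first watch event after the list

	// A client lists at RV 9 and immediately starts watching from RV 9.
	watchRV := listRV

	// Before the fix: once the first event landed in the buffer, the oldest
	// deliverable RV jumped to that event's RV (12), so the watch failed.
	fmt.Println("before:", tooOld(watchRV, firstEventRV)) // true

	// After the fix: with a non-full buffer, the oldest deliverable RV is
	// anchored to listResourceVersion+1 (10), so the same watch succeeds.
	fmt.Println("after:", tooOld(watchRV, listRV+1)) // false
}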
Commit 3966b8bbcc
@@ -127,6 +127,9 @@ type watchCache struct {
 	// ResourceVersion up to which the watchCache is propagated.
 	resourceVersion uint64
 
+	// ResourceVersion of the last list result (populated via Replace() method).
+	listResourceVersion uint64
+
 	// This handler is run at the end of every successful Replace() method.
 	onReplace func()
 
@@ -147,16 +150,17 @@ func newWatchCache(
 	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, bool, error),
 	versioner storage.Versioner) *watchCache {
 	wc := &watchCache{
-		capacity:        capacity,
-		keyFunc:         keyFunc,
-		getAttrsFunc:    getAttrsFunc,
-		cache:           make([]watchCacheElement, capacity),
-		startIndex:      0,
-		endIndex:        0,
-		store:           cache.NewStore(storeElementKey),
-		resourceVersion: 0,
-		clock:           clock.RealClock{},
-		versioner:       versioner,
+		capacity:            capacity,
+		keyFunc:             keyFunc,
+		getAttrsFunc:        getAttrsFunc,
+		cache:               make([]watchCacheElement, capacity),
+		startIndex:          0,
+		endIndex:            0,
+		store:               cache.NewStore(storeElementKey),
+		resourceVersion:     0,
+		listResourceVersion: 0,
+		clock:               clock.RealClock{},
+		versioner:           versioner,
 	}
 	wc.cond = sync.NewCond(wc.RLocker())
 	return wc
@@ -390,6 +394,7 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
 	if err := w.store.Replace(toReplace, resourceVersion); err != nil {
 		return err
 	}
+	w.listResourceVersion = version
 	w.resourceVersion = version
 	if w.onReplace != nil {
 		w.onReplace()
@@ -412,12 +417,26 @@ func (w *watchCache) SetOnEvent(onEvent func(*watchCacheEvent)) {
 
 func (w *watchCache) GetAllEventsSinceThreadUnsafe(resourceVersion uint64) ([]*watchCacheEvent, error) {
 	size := w.endIndex - w.startIndex
-	// if we have no watch events in our cache, the oldest one we can successfully deliver to a watcher
-	// is the *next* event we'll receive, which will be at least one greater than our current resourceVersion
-	oldest := w.resourceVersion + 1
-	if size > 0 {
-		oldest = w.cache[w.startIndex%w.capacity].resourceVersion
-	}
+	var oldest uint64
+	switch {
+	case size >= w.capacity:
+		// Once the watch event buffer is full, the oldest watch event we can deliver
+		// is the first one in the buffer.
+		oldest = w.cache[w.startIndex%w.capacity].resourceVersion
+	case w.listResourceVersion > 0:
+		// If the watch event buffer isn't full, the oldest watch event we can deliver
+		// is one greater than the resource version of the last full list.
+		oldest = w.listResourceVersion + 1
+	case size > 0:
+		// If we've never completed a list, use the resourceVersion of the oldest event
+		// in the buffer.
+		// This should only happen in unit tests that populate the buffer without
+		// performing list/replace operations.
+		oldest = w.cache[w.startIndex%w.capacity].resourceVersion
+	default:
+		return nil, fmt.Errorf("watch cache isn't correctly initialized")
+	}
 
 	if resourceVersion == 0 {
 		// resourceVersion = 0 means that we don't require any specific starting point
 		// and we would like to start watching from ~now.
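As a rough guide to the new logic (an illustrative sketch, not code from this patch; the cacheState type, its fields, and oldestDeliverable are invented for the example), the selection of the oldest deliverable resource version can be read like this, using the numbers from the TestMarker test added further down:

package main

import (
	"errors"
	"fmt"
)

// cacheState is a simplified, hypothetical stand-in for the fields the real
// watchCache consults when computing the oldest deliverable resource version.
type cacheState struct {
	size, capacity      int
	bufferOldestRV      uint64 // RV of the oldest event still in the ring buffer
	listResourceVersion uint64 // RV of the last full list (new in this PR)
}

// oldestDeliverable mirrors the case order of the switch added above.
func oldestDeliverable(s cacheState) (uint64, error) {
	switch {
	case s.size >= s.capacity:
		return s.bufferOldestRV, nil
	case s.listResourceVersion > 0:
		return s.listResourceVersion + 1, nil
	case s.size > 0:
		return s.bufferOldestRV, nil
	default:
		return 0, errors.New("watch cache isn't correctly initialized")
	}
}

func main() {
	// TestMarker's setup: Replace([...], "9") with an empty 3-slot event buffer.
	afterList := cacheState{size: 0, capacity: 3, listResourceVersion: 9}
	fmt.Println(oldestDeliverable(afterList)) // 10 <nil>: a watch from RV 9 is accepted, RV 8 is too old

	// Once the buffer is full (3 events in a 3-slot buffer), the oldest event
	// in the ring buffer wins regardless of listResourceVersion.
	full := cacheState{size: 3, capacity: 3, bufferOldestRV: 15, listResourceVersion: 9}
	fmt.Println(oldestDeliverable(full)) // 15 <nil>
}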
@@ -19,6 +19,7 @@ package cacher
 import (
 	"fmt"
 	"strconv"
+	"strings"
 	"testing"
 	"time"
 
@@ -278,6 +279,41 @@ func TestEvents(t *testing.T) {
 	}
 }
 
+func TestMarker(t *testing.T) {
+	store := newTestWatchCache(3)
+
+	// First thing that is called when propagated from storage is Replace.
+	store.Replace([]interface{}{
+		makeTestPod("pod1", 5),
+		makeTestPod("pod2", 9),
+	}, "9")
+
+	_, err := store.GetAllEventsSince(8)
+	if err == nil || !strings.Contains(err.Error(), "too old resource version") {
+		t.Errorf("unexpected error: %v", err)
+	}
+	// Getting events from 8 should return no events,
+	// even though there is a marker there.
+	result, err := store.GetAllEventsSince(9)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(result) != 0 {
+		t.Errorf("unexpected result: %#v, expected no events", result)
+	}
+
+	pod := makeTestPod("pods", 12)
+	store.Add(pod)
+	// Getting events from 8 should still work and return one event.
+	result, err = store.GetAllEventsSince(9)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+	if len(result) != 1 || !apiequality.Semantic.DeepEqual(result[0].Object, pod) {
+		t.Errorf("unexpected result: %#v, expected %v", result, pod)
+	}
+}
+
 func TestWaitUntilFreshAndList(t *testing.T) {
 	store := newTestWatchCache(3)
 
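To exercise the new behavior locally, the added test can be run on its own with the standard Go test runner; the directory shown is where these files live in recent kubernetes trees and is an assumption, not something stated on this page:

cd staging/src/k8s.io/apiserver/pkg/storage/cacher   # assumed location of watch_cache_test.go
go test -run TestMarker .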