Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-22 19:31:44 +00:00

Merge pull request #116436 from wojtek-t/fix_watch_cache_2

Fix incorrect watch events when watch is initialized simultaneously with reinitializing watchcache

Commit: 36b29b38bb
@@ -488,10 +488,19 @@ func (c *cacheWatcher) processInterval(ctx context.Context, cacheInterval *watch
 			break
 		}
 		c.sendWatchCacheEvent(event)
+
 		// With some events already sent, update resourceVersion so that
 		// events that were buffered and not yet processed won't be delivered
 		// to this watcher a second time, causing it to go back in time.
-		resourceVersion = event.ResourceVersion
+		//
+		// There is one case where events are not necessarily ordered by
+		// resourceVersion, being a case of watching from resourceVersion=0,
+		// which at the beginning returns the state of each object.
+		// For the purpose of it, we need to max it with the resource version
+		// that we have so far.
+		if event.ResourceVersion > resourceVersion {
+			resourceVersion = event.ResourceVersion
+		}
 		initEventCount++
 	}
 
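Why the plain assignment had to become a max: when watching from resourceVersion=0, the initial events describe the current state of each object and are not necessarily ordered by resourceVersion. The following standalone Go sketch (illustrative only, not code from this PR; the version numbers are made up) contrasts the two ways of tracking the watcher's position:

package main

import "fmt"

func main() {
	// Hypothetical resourceVersions of init events delivered when watching
	// from resourceVersion=0; they are not sorted by resourceVersion.
	initEventRVs := []uint64{7, 3, 9, 5}

	var naive, maxed uint64
	for _, rv := range initEventRVs {
		naive = rv // plain assignment ends at 5: "going back in time"
		if rv > maxed {
			maxed = rv // the fixed logic: the position only moves forward
		}
	}
	fmt.Println(naive, maxed) // prints: 5 9
}

With the position stuck below an already-sent event's version, a buffered duplicate of that event would pass the version check and be delivered a second time, which is exactly what the hunk above prevents.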
@@ -28,12 +28,14 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/diff"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/apiserver/pkg/storage"
 	utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
+	"k8s.io/client-go/tools/cache"
 	testingclock "k8s.io/utils/clock/testing"
 )
 
@@ -285,6 +287,63 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
 
 }
 
+func TestResourceVersionAfterInitEvents(t *testing.T) {
+	getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
+		return nil, nil, nil
+	}
+
+	const numObjects = 10
+	store := cache.NewIndexer(storeElementKey, storeElementIndexers(nil))
+
+	for i := 0; i < numObjects; i++ {
+		elem := makeTestStoreElement(makeTestPod(fmt.Sprintf("pod-%d", i), uint64(i)))
+		store.Add(elem)
+	}
+
+	wci, err := newCacheIntervalFromStore(numObjects, store, getAttrsFunc)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true }
+	forget := func(_ bool) {}
+	deadline := time.Now().Add(time.Minute)
+	w := newCacheWatcher(numObjects+1, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "")
+
+	// Simulate a situation where the last event, which was already in
+	// the state, wasn't yet processed by the cacher and will be
+	// delivered via the channel again.
+	event := &watchCacheEvent{
+		Type:            watch.Added,
+		Object:          makeTestPod(fmt.Sprintf("pod-%d", numObjects-1), uint64(numObjects-1)),
+		ResourceVersion: uint64(numObjects - 1),
+	}
+	if !w.add(event, time.NewTimer(time.Second)) {
+		t.Fatalf("failed to add event")
+	}
+	w.stopLocked()
+
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		w.processInterval(context.Background(), wci, uint64(numObjects-1))
+	}()
+
+	// We expect all init events to be delivered.
+	for i := 0; i < numObjects; i++ {
+		<-w.ResultChan()
+	}
+	// We don't expect any other event to be delivered and thus
+	// expect the ResultChan to be closed.
+	result, ok := <-w.ResultChan()
+	if ok {
+		t.Errorf("unexpected event: %#v", result)
+	}
+
+	wg.Wait()
+}
+
 func TestTimeBucketWatchersBasic(t *testing.T) {
 	filter := func(_ string, _ labels.Set, _ fields.Set) bool {
 		return true
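Assuming the new test lives in the apiserver cacher package alongside the helpers it calls (newCacheWatcher, newCacheIntervalFromStore, makeTestPod), it should be runnable in isolation from the repository root with something like:

go test ./staging/src/k8s.io/apiserver/pkg/storage/cacher/ -run TestResourceVersionAfterInitEvents -v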
@@ -156,14 +156,15 @@ type watchCache struct {
 	// getAttrsFunc is used to get labels and fields of an object.
 	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error)
 
-	// cache is used as a cyclic buffer - its first element (with the smallest
-	// resourceVersion) is defined by startIndex, its last element is defined
-	// by endIndex (if cache is full it will be startIndex + capacity).
-	// Both startIndex and endIndex can be greater than buffer capacity -
-	// you should always apply modulo capacity to get an index in cache array.
+	// cache is used as a cyclic buffer - the "current" contents of it are
+	// stored in [start_index%capacity, end_index%capacity) - so the
+	// "current" contents have exactly end_index-start_index items.
 	cache      []*watchCacheEvent
 	startIndex int
 	endIndex   int
+	// removedEventSinceRelist holds the information whether any of the events
+	// were already removed from the `cache` cyclic buffer since the last relist
+	removedEventSinceRelist bool
 
 	// store will effectively support LIST operation from the "end of cache
 	// history" i.e. from the moment just after the newest cached watched event.
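The rewritten comment states the ring-buffer invariant precisely: valid contents occupy [startIndex%capacity, endIndex%capacity), both indexes grow without bound, and the element count is always endIndex-startIndex. A self-contained toy version of the same scheme (an illustrative sketch, not the cacher's actual code; the ring type is made up):

package main

import "fmt"

// ring mimics the watchCache buffer: the "current" contents live in
// [startIndex%capacity, endIndex%capacity), there are always exactly
// endIndex-startIndex of them, and both indexes only ever grow.
type ring struct {
	buf        []int
	startIndex int
	endIndex   int
}

func (r *ring) add(v int) {
	if r.endIndex-r.startIndex == len(r.buf) {
		r.startIndex++ // buffer is full: the oldest element is dropped
	}
	r.buf[r.endIndex%len(r.buf)] = v
	r.endIndex++
}

func main() {
	r := &ring{buf: make([]int, 3)}
	for v := 1; v <= 5; v++ {
		r.add(v)
	}
	for i := r.startIndex; i < r.endIndex; i++ {
		fmt.Println(r.buf[i%len(r.buf)]) // prints 3, 4, 5; 1 and 2 were evicted
	}
}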
@@ -346,6 +347,7 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
 	if w.isCacheFullLocked() {
 		// Cache is full - remove the oldest element.
 		w.startIndex++
+		w.removedEventSinceRelist = true
 	}
 	w.cache[w.endIndex%w.capacity] = event
 	w.endIndex++
@@ -572,8 +574,15 @@ func (w *watchCache) Replace(objs []interface{}, resourceVersion string) error {
 	w.Lock()
 	defer w.Unlock()
 
-	w.startIndex = 0
-	w.endIndex = 0
+	// Ensure startIndex never decreases, so that existing watchCacheInterval
+	// instances get "invalid" errors if they try to download from the buffer
+	// using their own start/end indexes calculated from previous buffer
+	// content.
+
+	// Empty the cyclic buffer, ensuring startIndex doesn't decrease.
+	w.startIndex = w.endIndex
+	w.removedEventSinceRelist = false
+
 	if err := w.store.Replace(toReplace, resourceVersion); err != nil {
 		return err
 	}
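Why set startIndex to endIndex rather than zeroing both indexes? A watchCacheInterval created before the relist still holds absolute positions computed from the old buffer contents. A minimal sketch of the difference (a simplification under that assumption, not the PR's code; buffer and served are made-up names):

package main

import "fmt"

type buffer struct{ startIndex, endIndex int }

// served reports whether a position captured earlier still falls inside
// the buffer's valid range [startIndex, endIndex).
func served(b buffer, pos int) bool {
	return pos >= b.startIndex && pos < b.endIndex
}

func main() {
	stale := 2 // position captured from a pre-relist buffer spanning [0, 5)

	// Reset-to-zero relist: the stale position looks valid again and would
	// silently be served events from the new, unrelated buffer contents.
	fmt.Println(served(buffer{startIndex: 0, endIndex: 3}, stale)) // true

	// Monotone relist (startIndex = endIndex = 5): the stale position falls
	// below startIndex, so the old interval gets an "invalid" error instead.
	fmt.Println(served(buffer{startIndex: 5, endIndex: 5}, stale)) // false
}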
@@ -664,7 +673,7 @@ func (w *watchCache) getAllEventsSinceLocked(resourceVersion uint64) (*watchCach
 	size := w.endIndex - w.startIndex
 	var oldest uint64
 	switch {
-	case w.listResourceVersion > 0 && w.startIndex == 0:
+	case w.listResourceVersion > 0 && !w.removedEventSinceRelist:
 		// If no event was removed from the buffer since last relist, the oldest watch
 		// event we can deliver is one greater than the resource version of the list.
 		oldest = w.listResourceVersion + 1
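This last hunk pairs with the Replace change above: w.startIndex == 0 used to be a proxy for "no event has ever been evicted from the buffer", but once Replace keeps startIndex monotone that proxy would be false after every relist. The explicit removedEventSinceRelist flag, set in updateCache when an event is dropped and cleared in Replace, restores the intended meaning: nothing has been evicted since the last relist, so watches can safely be served starting from listResourceVersion + 1.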