Merge pull request #106828 from MadhavJivrajani/watch-cache-interval-impl
cacher: Implement watchCacheInterval to reduce watchCache lock contention
This commit is contained in: commit c225bdd552
@@ -636,3 +636,9 @@ func (w *watchCache) Resync() error {
	// Nothing to do
	return nil
}

// isIndexValidLocked checks if a given index is still valid.
// This assumes that the lock is held.
func (w *watchCache) isIndexValidLocked(index int) bool {
	return index >= w.startIndex
}
@@ -0,0 +1,224 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cacher

import (
	"fmt"
	"sync"

	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)
// watchCacheInterval serves as an abstraction over a source
// of watchCacheEvents. It maintains a window of events over
// an underlying source and these events can be served using
// the exposed Next() API. The main intent for doing things
// this way is to introduce an upper bound of memory usage
// for starting a watch and reduce the maximum possible time
// interval for which the lock would be held while events are
// copied over.
//
// The source of events for the interval is typically either
// the watchCache circular buffer, if events being retrieved
// need to be for resource versions > 0, or the underlying
// implementation of Store, if resource version = 0.
//
// Furthermore, an interval can be either valid or invalid at
// any given point of time. The notion of validity makes sense
// only in cases where the window of events in the underlying
// source can change over time - i.e. for the watchCache circular
// buffer. When the circular buffer is full and an event needs
// to be popped off, watchCache::startIndex is incremented. In
// this case, an interval tracking that popped event is valid
// only if it has already been copied to its internal buffer.
// However, for efficiency we perform that lazily and we mark
// an interval as invalid iff we need to copy events from the
// watchCache and we end up needing events that have already
// been popped off. This translates to the following condition:
//
//	watchCacheInterval::startIndex >= watchCache::startIndex
//
// When this condition becomes false, the interval is no longer
// valid and should not be used to retrieve and serve elements
// from the underlying source.
type watchCacheInterval struct {
	// startIndex denotes the starting point of the interval
	// being considered. The value is the index in the actual
	// source of watchCacheEvents. If the source of events is
	// the watchCache, then this must be used modulo capacity.
	startIndex int

	// endIndex denotes the ending point of the interval being
	// considered. The value is the index in the actual source
	// of events. If the source of the events is the watchCache,
	// then this should be used modulo capacity.
	endIndex int

	// indexer is meant to inject behaviour for how an event must
	// be retrieved from the underlying source given an index.
	indexer indexerFunc

	// indexValidator is used to check if a given index is still
	// valid. If it is deemed that the index is not valid, then
	// this interval can no longer be used to serve events. Use
	// of indexValidator is warranted only in cases where the
	// window of events in the underlying source can change over
	// time. Furthermore, an interval is invalid if its startIndex
	// no longer coincides with the startIndex of the underlying
	// source.
	indexValidator indexValidator

	// buffer holds watchCacheEvents that this interval returns on
	// a call to Next(). This exists mainly to reduce acquiring the
	// lock on each invocation of Next().
	buffer *watchCacheIntervalBuffer

	// lock effectively protects access to the underlying source
	// of events through indexer and indexValidator.
	//
	// Given that indexer and indexValidator only read state, a
	// Locker obtained through RLocker() is provided if possible.
	lock sync.Locker
}
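To make the validity rule above concrete, here is a hedged, illustrative check (the numbers and the exampleValidity helper are made up for illustration and are not part of this commit):

	// Illustrative only: with capacity 3, adding events 0..4 overwrites
	// events 0 and 1, so watchCache::startIndex advances to 2. An interval
	// created with startIndex 1 that has not yet buffered event 1 now fails
	// the validity condition and must not serve further events.
	func exampleValidity() bool {
		cacheStartIndex := 2    // two events were popped off the ring
		intervalStartIndex := 1 // the interval still needs event 1
		return intervalStartIndex >= cacheStartIndex // false => invalidated
	}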
type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
type indexerFunc func(int) *watchCacheEvent
type indexValidator func(int) bool

func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval {
	return &watchCacheInterval{
		startIndex:     startIndex,
		endIndex:       endIndex,
		indexer:        indexer,
		indexValidator: indexValidator,
		buffer:         &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
		lock:           locker,
	}
}
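A hedged sketch of how a caller might wire a circular buffer to newCacheInterval (ringSource and its fields are hypothetical stand-ins for the corresponding watchCache fields; the test file below wires the real watchCache the same way):

	// ringSource is a hypothetical stand-in for the watchCache ring buffer.
	type ringSource struct {
		sync.RWMutex
		cache      []*watchCacheEvent
		capacity   int
		startIndex int // advanced whenever the oldest event is overwritten
		endIndex   int
	}

	// interval hands out a window [from, endIndex) over the ring.
	func (r *ringSource) interval(from int) *watchCacheInterval {
		indexer := func(i int) *watchCacheEvent {
			return r.cache[i%r.capacity] // indices are used modulo capacity
		}
		validator := func(i int) bool {
			return i >= r.startIndex // the same condition as isIndexValidLocked
		}
		// RLocker suffices here because indexer and validator only read state.
		return newCacheInterval(from, r.endIndex, indexer, validator, r.RLocker())
	}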
// newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events
// returned by Next() need to be events from a List() done on the underlying store of
// the watch cache.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
	buffer := &watchCacheIntervalBuffer{}
	allItems := store.List()
	buffer.buffer = make([]*watchCacheEvent, len(allItems))
	for i, item := range allItems {
		elem, ok := item.(*storeElement)
		if !ok {
			return nil, fmt.Errorf("not a storeElement: %v", item)
		}
		objLabels, objFields, err := getAttrsFunc(elem.Object)
		if err != nil {
			return nil, err
		}
		buffer.buffer[i] = &watchCacheEvent{
			Type:            watch.Added,
			Object:          elem.Object,
			ObjLabels:       objLabels,
			ObjFields:       objFields,
			Key:             elem.Key,
			ResourceVersion: resourceVersion,
		}
		buffer.endIndex++
	}
	ci := &watchCacheInterval{
		startIndex: 0,
		// Simulate that we already have all the events we're looking for.
		endIndex: 0,
		buffer:   buffer,
	}

	return ci, nil
}
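A hedged end-to-end sketch of the rv=0 path (exampleStoreInterval, its key function, and its no-op attribute extractor are stand-ins, not part of this commit): every listed object surfaces as a synthetic watch.Added event at the single resourceVersion the cache has progressed to, and draining never touches the cache lock because the events are pre-buffered.

	func exampleStoreInterval() error {
		// Stand-in key function; the cacher wires its real one.
		keyFn := func(obj interface{}) (string, error) {
			return obj.(*storeElement).Key, nil
		}
		store := cache.NewIndexer(keyFn, cache.Indexers{})
		_ = store.Add(&storeElement{Key: "pods/ns/pod0"})

		// No-op attribute extractor, for illustration only.
		getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) {
			return nil, nil, nil
		}
		ci, err := newCacheIntervalFromStore(1, store, getAttrs)
		if err != nil {
			return err
		}
		for {
			// Safe even though ci.lock is nil: startIndex == endIndex, so
			// Next() drains the buffer and never reaches the locked path.
			ev, err := ci.Next()
			if err != nil || ev == nil {
				return err
			}
			fmt.Println(ev.Key, ev.Type)
		}
	}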
// Next returns the next item in the cache interval provided the cache
// interval is still valid. An error is returned if the interval is
// invalidated.
func (wci *watchCacheInterval) Next() (*watchCacheEvent, error) {
	// if there are items in the buffer to return, return from
	// the buffer.
	if event, exists := wci.buffer.next(); exists {
		return event, nil
	}
	// check if there are still other events in this interval
	// that can be processed.
	if wci.startIndex >= wci.endIndex {
		return nil, nil
	}
	wci.lock.Lock()
	defer wci.lock.Unlock()

	if valid := wci.indexValidator(wci.startIndex); !valid {
		return nil, fmt.Errorf("cache interval invalidated, interval startIndex: %d", wci.startIndex)
	}

	wci.fillBuffer()
	if event, exists := wci.buffer.next(); exists {
		return event, nil
	}
	return nil, nil
}
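Consumers drain an interval by calling Next() until it either returns a nil event (interval exhausted) or an error (interval invalidated, so the watch has to be re-established). With bufferSize = 100, draining an interval of 250 events takes the lock three times: two full fills and one partial fill. A minimal hedged sketch (drain and process are placeholders, not part of this commit):

	func drain(wci *watchCacheInterval, process func(*watchCacheEvent)) error {
		for {
			event, err := wci.Next()
			if err != nil {
				// Events this interval still needed were popped off the
				// circular buffer; the caller would typically terminate
				// the watch and have the client relist/rewatch.
				return err
			}
			if event == nil {
				return nil // interval exhausted
			}
			process(event)
		}
	}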
func (wci *watchCacheInterval) fillBuffer() {
	wci.buffer.startIndex = 0
	wci.buffer.endIndex = 0
	for wci.startIndex < wci.endIndex && !wci.buffer.isFull() {
		event := wci.indexer(wci.startIndex)
		if event == nil {
			break
		}
		wci.buffer.buffer[wci.buffer.endIndex] = event
		wci.buffer.endIndex++
		wci.startIndex++
	}
}
const bufferSize = 100

// watchCacheIntervalBuffer is used to reduce acquiring
// the lock on each invocation of watchCacheInterval.Next().
type watchCacheIntervalBuffer struct {
	// buffer is used to hold watchCacheEvents that
	// the interval returns on a call to Next().
	buffer []*watchCacheEvent
	// The first element of buffer is defined by startIndex,
	// its last element is defined by endIndex.
	startIndex int
	endIndex   int
}

// next returns the next event present in the interval buffer provided
// it is not empty.
func (wcib *watchCacheIntervalBuffer) next() (*watchCacheEvent, bool) {
	if wcib.isEmpty() {
		return nil, false
	}
	next := wcib.buffer[wcib.startIndex]
	wcib.startIndex++
	return next, true
}

func (wcib *watchCacheIntervalBuffer) isFull() bool {
	return wcib.endIndex >= bufferSize
}

func (wcib *watchCacheIntervalBuffer) isEmpty() bool {
	return wcib.startIndex == wcib.endIndex
}
@@ -0,0 +1,426 @@
/*
Copyright 2021 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cacher

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
	"testing"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/tools/cache"
)
func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval {
	startIndex, endIndex, locker := 0, len(events), &sync.Mutex{}
	indexer := func(i int) *watchCacheEvent {
		if len(events) == 0 {
			return nil
		}
		return events[i]
	}
	indexValidator := func(_ int) bool { return true }

	return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker)
}
func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer {
	wcib := &watchCacheIntervalBuffer{
		buffer:     make([]*watchCacheEvent, bufferSize),
		startIndex: 0,
		endIndex:   len(events),
	}
	copy(wcib.buffer, events)

	return wcib
}
func generateEvents(start, end int) []*watchCacheEvent {
	n := end - start
	events := make([]*watchCacheEvent, n)
	for i := 0; i < n; i++ {
		events[i] = &watchCacheEvent{
			Type:   watch.Added,
			Object: makeTestPod(fmt.Sprintf("pod%d", start+i), uint64(start+i)),
		}
	}
	return events
}
func verifyEvent(ok bool, event, expectedEvent *watchCacheEvent) error {
	if !ok {
		return fmt.Errorf("expected event: %#v, got no event", expectedEvent)
	}

	if event == nil {
		return fmt.Errorf("unexpected nil event, expected: %#v", expectedEvent)
	}

	if !reflect.DeepEqual(event, expectedEvent) {
		return fmt.Errorf("expected %v, got %v", *expectedEvent, *event)
	}

	return nil
}
func verifyNoEvent(ok bool, event *watchCacheEvent) error {
	if ok {
		return errors.New("unexpected bool value indicating buffer is not empty")
	}
	if event != nil {
		return fmt.Errorf("unexpected event received, expected: nil, got %v", *event)
	}

	return nil
}
func TestIntervalBufferIsFull(t *testing.T) {
	cases := []struct {
		endIndex int
		expected bool
	}{
		{endIndex: bufferSize - 1, expected: false},
		{endIndex: bufferSize, expected: true},
		{endIndex: bufferSize + 1, expected: true},
	}

	for _, c := range cases {
		wcib := &watchCacheIntervalBuffer{endIndex: c.endIndex}
		actual := wcib.isFull()
		if actual != c.expected {
			t.Errorf("expected %v, got %v", c.expected, actual)
		}
	}
}
func TestIntervalBufferIsEmpty(t *testing.T) {
	cases := []struct {
		startIndex int
		endIndex   int
		expected   bool
	}{
		{startIndex: 0, endIndex: 10, expected: false},
		{startIndex: 5, endIndex: 20, expected: false},
		{startIndex: 50, endIndex: 50, expected: true},
	}

	for _, c := range cases {
		wcib := &watchCacheIntervalBuffer{
			startIndex: c.startIndex,
			endIndex:   c.endIndex,
		}
		actual := wcib.isEmpty()
		if actual != c.expected {
			t.Errorf("expected %v, got %v", c.expected, actual)
		}
	}
}
func TestIntervalBufferNext(t *testing.T) {
	cases := []struct {
		name   string
		events []*watchCacheEvent
	}{
		{
			name: "buffer has elements",
			events: []*watchCacheEvent{
				{Type: watch.Added, Object: makeTestPod("pod1", 1)},
				{Type: watch.Added, Object: makeTestPod("pod2", 2)},
				{Type: watch.Modified, Object: makeTestPod("pod3", 3)},
			},
		},
		{
			name:   "buffer is empty",
			events: []*watchCacheEvent{},
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			wcib := bufferFromEvents(c.events)
			for i := 0; i < len(c.events); i++ {
				event, ok := wcib.next()
				if err := verifyEvent(ok, event, c.events[i]); err != nil {
					t.Error(err)
				}
			}
			event, ok := wcib.next()
			if err := verifyNoEvent(ok, event); err != nil {
				t.Error(err)
			}
		})
	}
}
func TestFillBuffer(t *testing.T) {
	cases := []struct {
		name            string
		numEventsToFill int
	}{
		{
			name:            "no events to put in buffer",
			numEventsToFill: 0,
		},
		{
			name:            "less than bufferSize events to put in buffer",
			numEventsToFill: 5,
		},
		{
			name:            "equal to bufferSize events to put in buffer",
			numEventsToFill: bufferSize,
		},
		{
			name:            "greater than bufferSize events to put in buffer",
			numEventsToFill: bufferSize + 5,
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			events := generateEvents(0, c.numEventsToFill)
			wci := intervalFromEvents(events)

			for i := 0; i < len(events); i++ {
				if i%bufferSize == 0 {
					wci.fillBuffer()
				}
				event, ok := wci.buffer.next()
				if err := verifyEvent(ok, event, events[i]); err != nil {
					t.Error(err)
				}
				// If we have already received bufferSize number of events,
				// buffer should be empty and we should receive no event.
				if i%bufferSize == bufferSize-1 {
					event, ok := wci.buffer.next()
					if err := verifyNoEvent(ok, event); err != nil {
						t.Error(err)
					}
				}
			}
			// buffer should be empty and return no event.
			event, ok := wci.buffer.next()
			if err := verifyNoEvent(ok, event); err != nil {
				t.Error(err)
			}
			// Buffer should be empty now, an additional fillBuffer()
			// should make no difference.
			wci.fillBuffer()
			event, ok = wci.buffer.next()
			if err := verifyNoEvent(ok, event); err != nil {
				t.Error(err)
			}
		})
	}
}
func TestCacheIntervalNextFromWatchCache(t *testing.T) {
	// Have the capacity such that it facilitates
	// filling the interval buffer more than once
	// completely and then some more - 10 here is
	// arbitrary.
	const capacity = 2*bufferSize + 10

	cases := []struct {
		name string
		// The total number of events that the watch
		// cache will be populated with to start with.
		eventsAddedToWatchcache int
		intervalStartIndex      int
	}{
		{
			name:                    "watchCache empty, eventsAddedToWatchcache = 0",
			eventsAddedToWatchcache: 0,
			intervalStartIndex:      0,
		},
		{
			name:                    "watchCache partially propagated, eventsAddedToWatchcache < capacity",
			eventsAddedToWatchcache: bufferSize,
			intervalStartIndex:      0,
		},
		{
			name:                    "watchCache partially propagated, eventsAddedToWatchcache < capacity, intervalStartIndex at some offset",
			eventsAddedToWatchcache: bufferSize,
			intervalStartIndex:      5,
		},
		{
			name:                    "watchCache fully propagated, eventsAddedToWatchcache = capacity",
			eventsAddedToWatchcache: capacity,
			intervalStartIndex:      0,
		},
		{
			name:                    "watchCache fully propagated, eventsAddedToWatchcache = capacity, intervalStartIndex at some offset",
			eventsAddedToWatchcache: capacity,
			intervalStartIndex:      5,
		},
		{
			name:                    "watchCache over propagated, eventsAddedToWatchcache > capacity",
			eventsAddedToWatchcache: capacity + bufferSize,
			intervalStartIndex:      0,
		},
		{
			name:                    "watchCache over propagated, eventsAddedToWatchcache > capacity, intervalStartIndex at some offset",
			eventsAddedToWatchcache: capacity + bufferSize,
			intervalStartIndex:      5,
		},
	}

	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			wc := newTestWatchCache(capacity, &cache.Indexers{})
			for i := 0; i < c.eventsAddedToWatchcache; i++ {
				wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
			}
			indexerFunc := func(i int) *watchCacheEvent {
				return wc.cache[i%wc.capacity]
			}

			wci := newCacheInterval(
				c.intervalStartIndex,
				wc.endIndex,
				indexerFunc,
				wc.isIndexValidLocked,
				&wc.RWMutex,
			)

			numExpectedEvents := wc.endIndex - c.intervalStartIndex
			for i := 0; i < numExpectedEvents; i++ {
				// Simulate and test interval invalidation iff
				// the watchCache itself is not empty.
				if c.eventsAddedToWatchcache > 0 {
					// The points at which we want to artificially
					// invalidate the interval and test its behaviour
					// should be multiples of bufferSize. This is because
					// invalidation only needs to be checked when we are
					// copying over events from the underlying watch cache,
					// i.e. freshly filling in the interval buffer.
					if i%bufferSize == 0 && i != c.eventsAddedToWatchcache {
						originalCacheStartIndex := wc.startIndex
						wc.startIndex = wci.startIndex + 1
						event, err := wci.Next()
						if err == nil {
							t.Errorf("expected non-nil error")
						}
						if event != nil {
							t.Errorf("expected nil event, got %v", *event)
						}
						// Restore startIndex.
						wc.startIndex = originalCacheStartIndex
					}
				}

				// Check if the state of the interval buffer is as expected.
				// The interval buffer can be empty either when i is a
				// multiple of bufferSize (just before one complete fill)
				// or when i equals the number of expected events. The
				// latter happens when partial filling occurs and no more
				// events are left post the partial fill.
				if wci.buffer.isEmpty() != (i%bufferSize == 0 || i == numExpectedEvents) {
					t.Error("expected empty interval buffer")
					return
				}

				event, err := wci.Next()
				if err != nil {
					t.Errorf("unexpected error: %v", err)
					return
				}

				expectedIndex := (c.intervalStartIndex + i) % wc.capacity
				expectedEvent := wc.cache[expectedIndex]
				if err := verifyEvent(true, event, expectedEvent); err != nil {
					t.Error(err)
				}
			}
			event, err := wci.Next()
			ok := err != nil
			if err := verifyNoEvent(ok, event); err != nil {
				t.Error(err)
			}
		})
	}
}
func TestCacheIntervalNextFromStore(t *testing.T) {
	getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
		pod, ok := obj.(*v1.Pod)
		if !ok {
			return nil, nil, fmt.Errorf("not a pod")
		}
		return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
	}
	const numEvents = 50
	store := cache.NewIndexer(storeElementKey, storeElementIndexers(nil))
	events := make(map[string]*watchCacheEvent)
	var rv uint64 = 1 // arbitrary number; rv till which the watch cache has progressed.

	for i := 0; i < numEvents; i++ {
		elem := makeTestStoreElement(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
		objLabels, objFields, err := getAttrsFunc(elem.Object)
		if err != nil {
			t.Fatal(err)
		}
		events[elem.Key] = &watchCacheEvent{
			Type:            watch.Added,
			Object:          elem.Object,
			ObjLabels:       objLabels,
			ObjFields:       objFields,
			Key:             elem.Key,
			ResourceVersion: rv,
		}
		store.Add(elem)
	}

	wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc)
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < numEvents; i++ {
		// The interval buffer can never be empty unless
		// all elements obtained through List() have been
		// returned.
		if wci.buffer.isEmpty() && i != numEvents {
			t.Fatal("expected non-empty interval buffer")
		}
		event, err := wci.Next()
		if err != nil {
			t.Fatalf("unexpected error: %v", err)
		}
		if event == nil {
			t.Error("unexpected nil event")
			break
		}
		expectedEvent, ok := events[event.Key]
		if !ok {
			t.Fatalf("event with key %s not found", event.Key)
		}
		if !reflect.DeepEqual(event, expectedEvent) {
			t.Errorf("expected: %v, got %v", *events[event.Key], *event)
		}
	}

	// The interval's buffer should now be empty.
	if !wci.buffer.isEmpty() {
		t.Error("expected cache interval's buffer to be empty")
	}
}