diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
index f52bdd053e9..42d84653324 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go
@@ -636,3 +636,9 @@ func (w *watchCache) Resync() error {
 	// Nothing to do
 	return nil
 }
+
+// isIndexValidLocked checks if a given index is still valid.
+// This assumes that the lock is held.
+func (w *watchCache) isIndexValidLocked(index int) bool {
+	return index >= w.startIndex
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go
new file mode 100644
index 00000000000..d25fe1f49f5
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go
@@ -0,0 +1,224 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cacher
+
+import (
+	"fmt"
+	"sync"
+
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+)
+
+// watchCacheInterval serves as an abstraction over a source
+// of watchCacheEvents. It maintains a window of events over
+// an underlying source, and these events can be served using
+// the exposed Next() API. The main intent of this design is
+// to introduce an upper bound on the memory used when starting
+// a watch, and to reduce the maximum time interval for which
+// the lock is held while events are copied over.
+
+// The source of events for the interval is typically either
+// the watchCache circular buffer, if events being retrieved
+// need to be for resource versions > 0, or the underlying
+// implementation of Store, if resource version = 0.
+
+// Furthermore, an interval can be either valid or invalid at
+// any given point in time. The notion of validity makes sense
+// only in cases where the window of events in the underlying
+// source can change over time - i.e., for the watchCache
+// circular buffer. When the circular buffer is full and an
+// event needs to be popped off, watchCache::startIndex is
+// incremented. In this case, an interval tracking that popped
+// event is valid only if it has already been copied to its
+// internal buffer. However, for efficiency we perform that
+// copy lazily and we mark an interval as invalid iff we need
+// to copy events from the watchCache and we end up needing
+// events that have already been popped off. This translates
+// to the following condition:
+// watchCacheInterval::startIndex >= watchCache::startIndex.
+// When this condition becomes false, the interval is no longer
+// valid and should not be used to retrieve and serve elements
+// from the underlying source.
+type watchCacheInterval struct {
+	// startIndex denotes the starting point of the interval
+	// being considered. The value is the index in the actual
+	// source of watchCacheEvents. If the source of events is
+	// the watchCache, then this must be used modulo capacity.
+	startIndex int
+
+	// endIndex denotes the ending point of the interval being
+	// considered. The value is the index in the actual source
+	// of events. If the source of the events is the watchCache,
+	// then this should be used modulo capacity.
+	endIndex int
+
+	// indexer is meant to inject behaviour for how an event must
+	// be retrieved from the underlying source given an index.
+	indexer indexerFunc
+
+	// indexValidator is used to check if a given index is still
+	// valid from the perspective of the underlying source. If it
+	// is deemed that the index is no longer valid, then this
+	// interval can no longer be used to serve events. Use of
+	// indexValidator is warranted only in cases where the window
+	// of events in the underlying source can change over time.
+	// Furthermore, an interval is invalid if its startIndex no
+	// longer coincides with the startIndex of the underlying
+	// source.
+	indexValidator indexValidator
+
+	// buffer holds watchCacheEvents that this interval returns on
+	// a call to Next(). This exists mainly to reduce acquiring the
+	// lock on each invocation of Next().
+	buffer *watchCacheIntervalBuffer
+
+	// lock effectively protects access to the underlying source
+	// of events through indexer and indexValidator.
+	//
+	// Given that indexer and indexValidator only read state, a
+	// Locker obtained through RLocker() is provided if possible.
+	lock sync.Locker
+}
+
+type attrFunc func(runtime.Object) (labels.Set, fields.Set, error)
+type indexerFunc func(int) *watchCacheEvent
+type indexValidator func(int) bool
+
+func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValidator indexValidator, locker sync.Locker) *watchCacheInterval {
+	return &watchCacheInterval{
+		startIndex:     startIndex,
+		endIndex:       endIndex,
+		indexer:        indexer,
+		indexValidator: indexValidator,
+		buffer:         &watchCacheIntervalBuffer{buffer: make([]*watchCacheEvent, bufferSize)},
+		lock:           locker,
+	}
+}
+
+// newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events
+// returned by Next() need to be events from a List() done on the underlying store of
+// the watch cache.
+func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
+	buffer := &watchCacheIntervalBuffer{}
+	allItems := store.List()
+	buffer.buffer = make([]*watchCacheEvent, len(allItems))
+	for i, item := range allItems {
+		elem, ok := item.(*storeElement)
+		if !ok {
+			return nil, fmt.Errorf("not a storeElement: %v", item)
+		}
+		objLabels, objFields, err := getAttrsFunc(elem.Object)
+		if err != nil {
+			return nil, err
+		}
+		buffer.buffer[i] = &watchCacheEvent{
+			Type:            watch.Added,
+			Object:          elem.Object,
+			ObjLabels:       objLabels,
+			ObjFields:       objFields,
+			Key:             elem.Key,
+			ResourceVersion: resourceVersion,
+		}
+		buffer.endIndex++
+	}
+	ci := &watchCacheInterval{
+		startIndex: 0,
+		// Simulate that we already have all the events we're looking for.
+		endIndex: 0,
+		buffer:   buffer,
+	}
+
+	return ci, nil
+}
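+
+// To illustrate how the pieces above fit together (an editorial sketch, not
+// part of the API contract of this change): a watchCache is expected to
+// construct an interval over its circular buffer using an indexer closure
+// that applies the modulo-capacity translation, its isIndexValidLocked
+// method as the validator, and a read-locker over its RWMutex - this mirrors
+// the wiring exercised by the tests in this PR:
+//
+//	indexer := func(i int) *watchCacheEvent { return w.cache[i%w.capacity] }
+//	interval := newCacheInterval(startIndex, w.endIndex, indexer,
+//		w.isIndexValidLocked, w.RWMutex.RLocker())
+//
+// The caller then repeatedly invokes interval.Next() to drain events.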
+
+// Next returns the next item in the cache interval provided the cache
+// interval is still valid. An error is returned if the interval is
+// invalidated.
+func (wci *watchCacheInterval) Next() (*watchCacheEvent, error) {
+	// If there are items in the buffer to return, return from
+	// the buffer.
+	if event, exists := wci.buffer.next(); exists {
+		return event, nil
+	}
+	// Check if there are still other events in this interval
+	// that can be processed.
+	if wci.startIndex >= wci.endIndex {
+		return nil, nil
+	}
+	wci.lock.Lock()
+	defer wci.lock.Unlock()
+
+	if valid := wci.indexValidator(wci.startIndex); !valid {
+		return nil, fmt.Errorf("cache interval invalidated, interval startIndex: %d", wci.startIndex)
+	}
+
+	wci.fillBuffer()
+	if event, exists := wci.buffer.next(); exists {
+		return event, nil
+	}
+	return nil, nil
+}
+
+// fillBuffer copies events from the underlying source into the
+// interval's internal buffer, advancing wci.startIndex past the
+// copied events. It must be called with wci.lock held.
+func (wci *watchCacheInterval) fillBuffer() {
+	wci.buffer.startIndex = 0
+	wci.buffer.endIndex = 0
+	for wci.startIndex < wci.endIndex && !wci.buffer.isFull() {
+		event := wci.indexer(wci.startIndex)
+		if event == nil {
+			break
+		}
+		wci.buffer.buffer[wci.buffer.endIndex] = event
+		wci.buffer.endIndex++
+		wci.startIndex++
+	}
+}
+
+const bufferSize = 100
+
+// watchCacheIntervalBuffer is used to reduce the number of lock
+// acquisitions needed when serving events through
+// watchCacheInterval.Next().
+type watchCacheIntervalBuffer struct {
+	// buffer is used to hold watchCacheEvents that
+	// the interval returns on a call to Next().
+	buffer []*watchCacheEvent
+	// The first element of the buffer is defined by startIndex;
+	// endIndex marks the position one past its last element.
+	startIndex int
+	endIndex   int
+}
+
+// next returns the next event present in the interval buffer provided
+// it is not empty.
+func (wcib *watchCacheIntervalBuffer) next() (*watchCacheEvent, bool) {
+	if wcib.isEmpty() {
+		return nil, false
+	}
+	next := wcib.buffer[wcib.startIndex]
+	wcib.startIndex++
+	return next, true
+}
+
+func (wcib *watchCacheIntervalBuffer) isFull() bool {
+	return wcib.endIndex >= bufferSize
+}
+
+func (wcib *watchCacheIntervalBuffer) isEmpty() bool {
+	return wcib.startIndex == wcib.endIndex
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go
new file mode 100644
index 00000000000..14562da108e
--- /dev/null
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go
@@ -0,0 +1,426 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cacher
+
+import (
+	"errors"
+	"fmt"
+	"reflect"
+	"sync"
+	"testing"
+
+	v1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+)
+
+func intervalFromEvents(events []*watchCacheEvent) *watchCacheInterval {
+	startIndex, endIndex, locker := 0, len(events), &sync.Mutex{}
+	indexer := func(i int) *watchCacheEvent {
+		if len(events) == 0 {
+			return nil
+		}
+		return events[i]
+	}
+	indexValidator := func(_ int) bool { return true }
+
+	return newCacheInterval(startIndex, endIndex, indexer, indexValidator, locker)
+}
+
+func bufferFromEvents(events []*watchCacheEvent) *watchCacheIntervalBuffer {
+	wcib := &watchCacheIntervalBuffer{
+		buffer:     make([]*watchCacheEvent, bufferSize),
+		startIndex: 0,
+		endIndex:   len(events),
+	}
+	copy(wcib.buffer, events)
+
+	return wcib
+}
+
+func generateEvents(start, end int) []*watchCacheEvent {
+	n := end - start
+	events := make([]*watchCacheEvent, n)
+	for i := 0; i < n; i++ {
+		events[i] = &watchCacheEvent{
+			Type:   watch.Added,
+			Object: makeTestPod(fmt.Sprintf("pod%d", start+i), uint64(start+i)),
+		}
+	}
+	return events
+}
+
+func verifyEvent(ok bool, event, expectedEvent *watchCacheEvent) error {
+	if !ok {
+		return fmt.Errorf("expected event: %#v, got no event", expectedEvent)
+	}
+
+	if event == nil {
+		return fmt.Errorf("unexpected nil event, expected: %#v", expectedEvent)
+	}
+
+	if !reflect.DeepEqual(event, expectedEvent) {
+		return fmt.Errorf("expected %v, got %v", *expectedEvent, *event)
+	}
+
+	return nil
+}
+
+func verifyNoEvent(ok bool, event *watchCacheEvent) error {
+	if ok {
+		return errors.New("next() unexpectedly indicated that the buffer is not empty")
+	}
+	if event != nil {
+		return fmt.Errorf("unexpected event received, expected: nil, got %v", *event)
+	}
+
+	return nil
+}
+
+func TestIntervalBufferIsFull(t *testing.T) {
+	cases := []struct {
+		endIndex int
+		expected bool
+	}{
+		{endIndex: bufferSize - 1, expected: false},
+		{endIndex: bufferSize, expected: true},
+		{endIndex: bufferSize + 1, expected: true},
+	}
+
+	for _, c := range cases {
+		wcib := &watchCacheIntervalBuffer{endIndex: c.endIndex}
+		actual := wcib.isFull()
+		if actual != c.expected {
+			t.Errorf("expected %v, got %v", c.expected, actual)
+		}
+	}
+}
+
+func TestIntervalBufferIsEmpty(t *testing.T) {
+	cases := []struct {
+		startIndex int
+		endIndex   int
+		expected   bool
+	}{
+		{startIndex: 0, endIndex: 10, expected: false},
+		{startIndex: 5, endIndex: 20, expected: false},
+		{startIndex: 50, endIndex: 50, expected: true},
+	}
+
+	for _, c := range cases {
+		wcib := &watchCacheIntervalBuffer{
+			startIndex: c.startIndex,
+			endIndex:   c.endIndex,
+		}
+		actual := wcib.isEmpty()
+		if actual != c.expected {
+			t.Errorf("expected %v, got %v", c.expected, actual)
+		}
+	}
+}
+
+func TestIntervalBufferNext(t *testing.T) {
+	cases := []struct {
+		name   string
+		events []*watchCacheEvent
+	}{
+		{
+			name: "buffer has elements",
+			events: []*watchCacheEvent{
+				{Type: watch.Added, Object: makeTestPod("pod1", 1)},
+				{Type: watch.Added, Object: makeTestPod("pod2", 2)},
+				{Type: watch.Modified, Object: makeTestPod("pod3", 3)},
+			},
+		},
+		{
+			name:   "buffer is empty",
+			events: []*watchCacheEvent{},
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			wcib := bufferFromEvents(c.events)
+			for i := 0; i < len(c.events); i++ {
+				event, ok := wcib.next()
+				if err := verifyEvent(ok, event, c.events[i]); err != nil {
+					t.Error(err)
+				}
+			}
+			event, ok := wcib.next()
+			if err := verifyNoEvent(ok, event); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
+func TestFillBuffer(t *testing.T) {
+	cases := []struct {
+		name            string
+		numEventsToFill int
+	}{
+		{
+			name:            "no events to put in buffer",
+			numEventsToFill: 0,
+		},
+		{
+			name:            "less than bufferSize events to put in buffer",
+			numEventsToFill: 5,
+		},
+		{
+			name:            "equal to bufferSize events to put in buffer",
+			numEventsToFill: bufferSize,
+		},
+		{
+			name:            "greater than bufferSize events to put in buffer",
+			numEventsToFill: bufferSize + 5,
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			events := generateEvents(0, c.numEventsToFill)
+			wci := intervalFromEvents(events)
+
+			for i := 0; i < len(events); i++ {
+				if i%bufferSize == 0 {
+					wci.fillBuffer()
+				}
+				event, ok := wci.buffer.next()
+				if err := verifyEvent(ok, event, events[i]); err != nil {
+					t.Error(err)
+				}
+				// If we have already received bufferSize number of events,
+				// the buffer should be empty and we should receive no event.
+				if i%bufferSize == bufferSize-1 {
+					event, ok := wci.buffer.next()
+					if err := verifyNoEvent(ok, event); err != nil {
+						t.Error(err)
+					}
+				}
+			}
+			// The buffer should be empty and return no event.
+			event, ok := wci.buffer.next()
+			if err := verifyNoEvent(ok, event); err != nil {
+				t.Error(err)
+			}
+			// The buffer should be empty now; an additional fillBuffer()
+			// should make no difference.
+			wci.fillBuffer()
+			event, ok = wci.buffer.next()
+			if err := verifyNoEvent(ok, event); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
+func TestCacheIntervalNextFromWatchCache(t *testing.T) {
+	// Pick a capacity that lets us fill the interval buffer more
+	// than once completely, and then some more - 10 here is
+	// arbitrary.
+	const capacity = 2*bufferSize + 10
+
+	cases := []struct {
+		name string
+		// eventsAddedToWatchcache is the total number of events
+		// the watch cache is initially populated with.
+		eventsAddedToWatchcache int
+		intervalStartIndex      int
+	}{
+		{
+			name:                    "watchCache empty, eventsAddedToWatchcache = 0",
+			eventsAddedToWatchcache: 0,
+			intervalStartIndex:      0,
+		},
+		{
+			name:                    "watchCache partially propagated, eventsAddedToWatchcache < capacity",
+			eventsAddedToWatchcache: bufferSize,
+			intervalStartIndex:      0,
+		},
+		{
+			name:                    "watchCache partially propagated, eventsAddedToWatchcache < capacity, intervalStartIndex at some offset",
+			eventsAddedToWatchcache: bufferSize,
+			intervalStartIndex:      5,
+		},
+		{
+			name:                    "watchCache fully propagated, eventsAddedToWatchcache = capacity",
+			eventsAddedToWatchcache: capacity,
+			intervalStartIndex:      0,
+		},
+		{
+			name:                    "watchCache fully propagated, eventsAddedToWatchcache = capacity, intervalStartIndex at some offset",
+			eventsAddedToWatchcache: capacity,
+			intervalStartIndex:      5,
+		},
+		{
+			name:                    "watchCache over propagated, eventsAddedToWatchcache > capacity",
+			eventsAddedToWatchcache: capacity + bufferSize,
+			// Once the watch cache has overflowed, indices smaller than
+			// wc.startIndex (= eventsAddedToWatchcache - capacity =
+			// bufferSize) are no longer valid, so the interval must
+			// start at or after bufferSize.
+			intervalStartIndex: bufferSize,
+		},
+		{
+			name:                    "watchCache over propagated, eventsAddedToWatchcache > capacity, intervalStartIndex at some offset",
+			eventsAddedToWatchcache: capacity + bufferSize,
+			intervalStartIndex:      bufferSize + 5,
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			wc := newTestWatchCache(capacity, &cache.Indexers{})
+			for i := 0; i < c.eventsAddedToWatchcache; i++ {
+				wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
+			}
+			indexerFunc := func(i int) *watchCacheEvent {
+				return wc.cache[i%wc.capacity]
+			}
+
+			wci := newCacheInterval(
+				c.intervalStartIndex,
+				wc.endIndex,
+				indexerFunc,
+				wc.isIndexValidLocked,
+				&wc.RWMutex,
+			)
+
+			numExpectedEvents := wc.endIndex - c.intervalStartIndex
+			for i := 0; i < numExpectedEvents; i++ {
+				// Simulate and test interval invalidation iff
+				// the watchCache itself is not empty.
+				if c.eventsAddedToWatchcache > 0 {
+					// The points at which we artificially invalidate
+					// the interval and test its behaviour should be
+					// multiples of bufferSize. This is because
+					// invalidation only needs to be checked when we are
+					// copying over events from the underlying watch cache,
+					// i.e. freshly filling in the interval buffer.
+					if i%bufferSize == 0 && i != c.eventsAddedToWatchcache {
+						originalCacheStartIndex := wc.startIndex
+						wc.startIndex = wci.startIndex + 1
+						event, err := wci.Next()
+						if err == nil {
+							t.Errorf("expected non-nil error")
+						}
+						if event != nil {
+							t.Errorf("expected nil event, got %v", *event)
+						}
+						// Restore startIndex.
+						wc.startIndex = originalCacheStartIndex
+					}
+				}
+
+				// Check that the state of the interval buffer is as
+				// expected. The interval buffer is empty exactly when
+				// i is a multiple of bufferSize (right after a complete
+				// fill has been consumed) or when i equals the number of
+				// expected events. The latter happens when a partial fill
+				// occurs and no more events are left after the partial fill.
+				if wci.buffer.isEmpty() != (i%bufferSize == 0 || i == numExpectedEvents) {
+					t.Errorf("unexpected interval buffer state: isEmpty() = %v at i = %d", wci.buffer.isEmpty(), i)
+					return
+				}
+
+				event, err := wci.Next()
+				if err != nil {
+					t.Errorf("unexpected error: %v", err)
+					return
+				}
+
+				expectedIndex := (c.intervalStartIndex + i) % wc.capacity
+				expectedEvent := wc.cache[expectedIndex]
+				if err := verifyEvent(true, event, expectedEvent); err != nil {
+					t.Error(err)
+				}
+			}
+			// The interval should now be drained: Next() must return
+			// neither an event nor an error.
+			event, err := wci.Next()
+			ok := err != nil
+			if err := verifyNoEvent(ok, event); err != nil {
+				t.Error(err)
+			}
+		})
+	}
+}
+
+func TestCacheIntervalNextFromStore(t *testing.T) {
+	getAttrsFunc := func(obj runtime.Object) (labels.Set, fields.Set, error) {
+		pod, ok := obj.(*v1.Pod)
+		if !ok {
+			return nil, nil, fmt.Errorf("not a pod")
+		}
+		return labels.Set(pod.Labels), fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil
+	}
+	const numEvents = 50
+	store := cache.NewIndexer(storeElementKey, storeElementIndexers(nil))
+	events := make(map[string]*watchCacheEvent)
+	var rv uint64 = 1 // arbitrary number; the rv up to which the watch cache has progressed.
+
+	for i := 0; i < numEvents; i++ {
+		elem := makeTestStoreElement(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))
+		objLabels, objFields, err := getAttrsFunc(elem.Object)
+		if err != nil {
+			t.Fatal(err)
+		}
+		events[elem.Key] = &watchCacheEvent{
+			Type:            watch.Added,
+			Object:          elem.Object,
+			ObjLabels:       objLabels,
+			ObjFields:       objFields,
+			Key:             elem.Key,
+			ResourceVersion: rv,
+		}
+		store.Add(elem)
+	}
+
+	wci, err := newCacheIntervalFromStore(rv, store, getAttrsFunc)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < numEvents; i++ {
+		// Until all elements obtained through List() have been
+		// returned, the interval buffer can never be empty.
+		if wci.buffer.isEmpty() {
+			t.Fatal("expected non-empty interval buffer")
+		}
+		event, err := wci.Next()
+		if err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+		if event == nil {
+			t.Error("unexpected nil event")
+			break
+		}
+		expectedEvent, ok := events[event.Key]
+		if !ok {
+			t.Fatalf("event with key %s not found", event.Key)
+		}
+		if !reflect.DeepEqual(event, expectedEvent) {
+			t.Errorf("expected: %v, got %v", *expectedEvent, *event)
+		}
+	}
+
+	// The interval's buffer should now be empty.
+	if !wci.buffer.isEmpty() {
+		t.Error("expected cache interval's buffer to be empty")
+	}
+}
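+
+// drainInterval is an editorial sketch (a hypothetical helper, not part of
+// the original change): it spells out the consumption pattern the tests above
+// exercise, draining a watchCacheInterval through Next() until the interval
+// is either exhausted (nil event, nil error) or invalidated (non-nil error).
+func drainInterval(wci *watchCacheInterval) ([]*watchCacheEvent, error) {
+	var drained []*watchCacheEvent
+	for {
+		event, err := wci.Next()
+		if err != nil {
+			// The interval was invalidated, e.g. because the circular
+			// buffer advanced past the interval's startIndex.
+			return nil, err
+		}
+		if event == nil {
+			// The interval is exhausted.
+			return drained, nil
+		}
+		drained = append(drained, event)
+	}
+}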