diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index b09758d8e68..6df0fd135e0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -315,10 +315,11 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } clock := clock.RealClock{} + objType := reflect.TypeOf(obj) cacher := &Cacher{ ready: newReady(), storage: config.Storage, - objectType: reflect.TypeOf(obj), + objectType: objType, versioner: config.Versioner, newFunc: config.NewFunc, indexedTrigger: indexedTrigger, @@ -349,7 +350,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { } watchCache := newWatchCache( - config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers) + config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType) listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc) reflectorName := "storage/cacher.go:" + config.ResourcePrefix diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics.go index 0d6a0cdbf7c..19cd5da6af4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/metrics.go @@ -1,5 +1,5 @@ /* -Copyright 2019 The Kubernetes Authors. +Copyright 2020 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
@@ -33,7 +33,25 @@ var ( initCounter = metrics.NewCounterVec( &metrics.CounterOpts{ Name: "apiserver_init_events_total", - Help: "Counter of init events processed in watchcache broken by resource type", + Help: "Counter of init events processed in watchcache broken by resource type.", + StabilityLevel: metrics.ALPHA, + }, + []string{"resource"}, + ) + + watchCacheCapacityIncreaseTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Name: "watch_cache_capacity_increase_total", + Help: "Total number of watch cache capacity increase events broken by resource type.", + StabilityLevel: metrics.ALPHA, + }, + []string{"resource"}, + ) + + watchCacheCapacityDecreaseTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Name: "watch_cache_capacity_decrease_total", + Help: "Total number of watch cache capacity decrease events broken by resource type.", StabilityLevel: metrics.ALPHA, }, []string{"resource"}, @@ -42,4 +60,15 @@ var ( func init() { legacyregistry.MustRegister(initCounter) + legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal) + legacyregistry.MustRegister(watchCacheCapacityDecreaseTotal) +} + +// recordsWatchCacheCapacityChange records watchCache capacity resize (increase or decrease) operations. 
+func recordsWatchCacheCapacityChange(objType string, old, new int) { + if old < new { + watchCacheCapacityIncreaseTotal.WithLabelValues(objType).Inc() + return + } + watchCacheCapacityDecreaseTotal.WithLabelValues(objType).Inc() } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go index 63a23800f02..7943a93dcab 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/util.go @@ -44,3 +44,17 @@ func hasPathPrefix(s, pathPrefix string) bool { } return false } + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 1ad94a265c0..553a91a9e44 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -18,6 +18,7 @@ package cacher import ( "fmt" + "reflect" "sort" "sync" "time" @@ -44,6 +45,20 @@ const ( // resourceVersionTooHighRetrySeconds is the seconds before a operation should be retried by the client // after receiving a 'too high resource version' error. resourceVersionTooHighRetrySeconds = 1 + + // eventFreshDuration is time duration of events we want to keep. + eventFreshDuration = 5 * time.Minute + + // defaultLowerBoundCapacity is a default value for event cache capacity's lower bound. + // 100 is minimum in NewHeuristicWatchCacheSizes. + // TODO: Figure out, to what value we can decreased it. + defaultLowerBoundCapacity = 100 + + // defaultUpperBoundCapacity should be able to keep eventFreshDuration of history. + // With the current 102400 value though, it's not enough for leases in 5k-node cluster, + // but that is conscious decision. 
+ // TODO: Validate if the current value is high enough for large scale clusters. + defaultUpperBoundCapacity = 100 * 1024 ) // watchCacheEvent is a single "watch event" that is send to users of @@ -60,6 +75,7 @@ type watchCacheEvent struct { PrevObjFields fields.Set Key string ResourceVersion uint64 + RecordTime time.Time } // Computing a key of an object is generally non-trivial (it performs @@ -126,6 +142,12 @@ type watchCache struct { // Maximum size of history window. capacity int + // upper bound of capacity since event cache has a dynamic size. + upperBoundCapacity int + + // lower bound of capacity since event cache has a dynamic size. + lowerBoundCapacity int + // keyFunc is used to get a key in the underlying storage for a given object. keyFunc func(runtime.Object) (string, error) @@ -165,6 +187,9 @@ type watchCache struct { // An underlying storage.Versioner. versioner storage.Versioner + + // cacher's objectType. + objectType reflect.Type } func newWatchCache( @@ -173,12 +198,16 @@ func newWatchCache( eventHandler func(*watchCacheEvent), getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error), versioner storage.Versioner, - indexers *cache.Indexers) *watchCache { + indexers *cache.Indexers, + objectType reflect.Type) *watchCache { wc := &watchCache{ - capacity: capacity, - keyFunc: keyFunc, - getAttrsFunc: getAttrsFunc, - cache: make([]*watchCacheEvent, capacity), + capacity: capacity, + keyFunc: keyFunc, + getAttrsFunc: getAttrsFunc, + cache: make([]*watchCacheEvent, capacity), + // TODO get rid of them once we stop passing capacity as a parameter to watch cache. 
+ lowerBoundCapacity: min(capacity, defaultLowerBoundCapacity), + upperBoundCapacity: max(capacity, defaultUpperBoundCapacity), startIndex: 0, endIndex: 0, store: cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)), @@ -187,6 +216,7 @@ func newWatchCache( eventHandler: eventHandler, clock: clock.RealClock{}, versioner: versioner, + objectType: objectType, } wc.cond = sync.NewCond(wc.RLocker()) return wc @@ -260,6 +290,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd ObjFields: elem.Fields, Key: key, ResourceVersion: resourceVersion, + RecordTime: w.clock.Now(), } if err := func() error { @@ -301,7 +332,8 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd // Assumes that lock is already held for write. func (w *watchCache) updateCache(event *watchCacheEvent) { - if w.endIndex == w.startIndex+w.capacity { + w.resizeCacheLocked(event.RecordTime) + if w.isCacheFullLocked() { // Cache is full - remove the oldest element. w.startIndex++ } @@ -309,6 +341,48 @@ func (w *watchCache) updateCache(event *watchCacheEvent) { w.endIndex++ } +// resizeCacheLocked resizes the cache if necessary: +// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration. +// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping). 
+func (w *watchCache) resizeCacheLocked(eventTime time.Time) { + if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration { + capacity := min(w.capacity*2, w.upperBoundCapacity) + if capacity > w.capacity { + w.doCacheResizeLocked(capacity) + } + return + } + if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration { + capacity := max(w.capacity/2, w.lowerBoundCapacity) + if capacity < w.capacity { + w.doCacheResizeLocked(capacity) + } + return + } +} + +// isCacheFullLocked used to judge whether watchCacheEvent is full. +// Assumes that lock is already held for write. +func (w *watchCache) isCacheFullLocked() bool { + return w.endIndex == w.startIndex+w.capacity +} + +// doCacheResizeLocked resize watchCache's event array with different capacity. +// Assumes that lock is already held for write. +func (w *watchCache) doCacheResizeLocked(capacity int) { + newCache := make([]*watchCacheEvent, capacity) + if capacity < w.capacity { + // adjust startIndex if cache capacity shrink. + w.startIndex = w.endIndex - capacity + } + for i := w.startIndex; i < w.endIndex; i++ { + newCache[i%capacity] = w.cache[i%w.capacity] + } + w.cache = newCache + recordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity) + w.capacity = capacity +} + // List returns list of pointers to objects. 
func (w *watchCache) List() []interface{} { return w.store.List() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 5c10b7bb907..de0437501d6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -18,6 +18,7 @@ package cacher import ( "fmt" + "reflect" "strconv" "strings" "testing" @@ -33,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + "k8s.io/apiserver/pkg/apis/example" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/etcd3" "k8s.io/client-go/tools/cache" @@ -79,7 +81,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache { } versioner := etcd3.APIObjectVersioner{} mockHandler := func(*watchCacheEvent) {} - wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers) + wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{})) wc.clock = clock.NewFakeClock(time.Now()) return wc } @@ -164,6 +166,10 @@ func TestWatchCacheBasic(t *testing.T) { func TestEvents(t *testing.T) { store := newTestWatchCache(5, &cache.Indexers{}) + // no dynamic-size cache to fit old tests. + store.lowerBoundCapacity = 5 + store.upperBoundCapacity = 5 + store.Add(makeTestPod("pod", 3)) // Test for Added event. @@ -501,3 +507,292 @@ func TestReflectorForWatchCache(t *testing.T) { } } } + +func TestDynamicCache(t *testing.T) { + tests := []struct { + name string + eventCount int + cacheCapacity int + startIndex int + // interval is time duration between adjacent events. 
+ lowerBoundCapacity int + upperBoundCapacity int + interval time.Duration + expectCapacity int + expectStartIndex int + }{ + { + name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding", + eventCount: 5, + cacheCapacity: 5, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration / 6, + expectCapacity: 10, + expectStartIndex: 0, + }, + { + name: "[capacity not equals 4*n] events outside eventFreshDuration without change cache capacity", + eventCount: 5, + cacheCapacity: 5, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration / 4, + expectCapacity: 5, + expectStartIndex: 0, + }, + { + name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking", + eventCount: 5, + cacheCapacity: 5, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration + time.Second, + expectCapacity: 2, + expectStartIndex: 3, + }, + { + name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + eventCount: 5, + cacheCapacity: 5, + lowerBoundCapacity: 3, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration + time.Second, + expectCapacity: 3, + expectStartIndex: 2, + }, + { + name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + eventCount: 5, + cacheCapacity: 5, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 8, + interval: eventFreshDuration / 6, + expectCapacity: 8, + expectStartIndex: 0, + }, + { + name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding", + eventCount: 5, + cacheCapacity: 5, + startIndex: 3, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration / 6, + expectCapacity: 10, + expectStartIndex: 3, + }, + { + name: "[capacity not equals 4*n] [startIndex not 
equal 0] events outside eventFreshDuration without change cache capacity", + eventCount: 5, + cacheCapacity: 5, + startIndex: 3, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration / 4, + expectCapacity: 5, + expectStartIndex: 3, + }, + { + name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking", + eventCount: 5, + cacheCapacity: 5, + startIndex: 3, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration + time.Second, + expectCapacity: 2, + expectStartIndex: 6, + }, + { + name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + eventCount: 5, + cacheCapacity: 5, + startIndex: 3, + lowerBoundCapacity: 3, + upperBoundCapacity: 5 * 2, + interval: eventFreshDuration + time.Second, + expectCapacity: 3, + expectStartIndex: 5, + }, + { + name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + eventCount: 5, + cacheCapacity: 5, + startIndex: 3, + lowerBoundCapacity: 5 / 2, + upperBoundCapacity: 8, + interval: eventFreshDuration / 6, + expectCapacity: 8, + expectStartIndex: 3, + }, + { + name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding", + eventCount: 8, + cacheCapacity: 8, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration / 9, + expectCapacity: 16, + expectStartIndex: 0, + }, + { + name: "[capacity equals 4*n] events outside eventFreshDuration without change cache capacity", + eventCount: 8, + cacheCapacity: 8, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration / 8, + expectCapacity: 8, + expectStartIndex: 0, + }, + { + name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache 
shrinking", + eventCount: 8, + cacheCapacity: 8, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration/2 + time.Second, + expectCapacity: 4, + expectStartIndex: 4, + }, + { + name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + eventCount: 8, + cacheCapacity: 8, + lowerBoundCapacity: 7, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration/2 + time.Second, + expectCapacity: 7, + expectStartIndex: 1, + }, + { + name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + eventCount: 8, + cacheCapacity: 8, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 10, + interval: eventFreshDuration / 9, + expectCapacity: 10, + expectStartIndex: 0, + }, + { + name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding", + eventCount: 8, + cacheCapacity: 8, + startIndex: 3, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration / 9, + expectCapacity: 16, + expectStartIndex: 3, + }, + { + name: "[capacity equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity", + eventCount: 8, + cacheCapacity: 8, + startIndex: 3, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration / 8, + expectCapacity: 8, + expectStartIndex: 3, + }, + { + name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking", + eventCount: 8, + cacheCapacity: 8, + startIndex: 3, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration/2 + time.Second, + expectCapacity: 4, + expectStartIndex: 7, + }, + { + name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + eventCount: 8, 
+ cacheCapacity: 8, + startIndex: 3, + lowerBoundCapacity: 7, + upperBoundCapacity: 8 * 2, + interval: eventFreshDuration/2 + time.Second, + expectCapacity: 7, + expectStartIndex: 4, + }, + { + name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + eventCount: 8, + cacheCapacity: 8, + startIndex: 3, + lowerBoundCapacity: 8 / 2, + upperBoundCapacity: 10, + interval: eventFreshDuration / 9, + expectCapacity: 10, + expectStartIndex: 3, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{}) + store.cache = make([]*watchCacheEvent, test.cacheCapacity) + store.startIndex = test.startIndex + store.lowerBoundCapacity = test.lowerBoundCapacity + store.upperBoundCapacity = test.upperBoundCapacity + loadEventWithDuration(store, test.eventCount, test.interval) + nextInterval := store.clock.Now().Add(time.Duration(test.interval.Nanoseconds() * int64(test.eventCount))) + store.resizeCacheLocked(nextInterval) + if store.capacity != test.expectCapacity { + t.Errorf("expect capacity %d, but get %d", test.expectCapacity, store.capacity) + } + + // check cache's startIndex, endIndex and all elements. 
+ if store.startIndex != test.expectStartIndex { + t.Errorf("expect startIndex %d, but get %d", test.expectStartIndex, store.startIndex) + } + if store.endIndex != test.startIndex+test.eventCount { + t.Errorf("expect endIndex %d get %d", test.startIndex+test.eventCount, store.endIndex) + } + if !checkCacheElements(store) { + t.Errorf("some elements locations in cache is wrong") + } + }) + } +} + +func loadEventWithDuration(cache *watchCache, count int, interval time.Duration) { + for i := 0; i < count; i++ { + event := &watchCacheEvent{ + Key: fmt.Sprintf("event-%d", i+cache.startIndex), + RecordTime: cache.clock.Now().Add(time.Duration(interval.Nanoseconds() * int64(i))), + } + cache.cache[(i+cache.startIndex)%cache.capacity] = event + } + cache.endIndex = cache.startIndex + count +} + +func checkCacheElements(cache *watchCache) bool { + for i := cache.startIndex; i < cache.endIndex; i++ { + location := i % cache.capacity + if cache.cache[location].Key != fmt.Sprintf("event-%d", i) { + return false + } + } + return true +} + +func BenchmarkWatchCache_updateCache(b *testing.B) { + store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{}) + store.cache = store.cache[:0] + store.upperBoundCapacity = defaultUpperBoundCapacity + loadEventWithDuration(store, defaultUpperBoundCapacity, 0) + add := &watchCacheEvent{ + Key: fmt.Sprintf("event-%d", defaultUpperBoundCapacity), + RecordTime: store.clock.Now(), + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + store.updateCache(add) + } +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 4b66b960240..040f621d440 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -443,9 +443,8 @@ func TestWatch(t *testing.T) { t.Fatalf("Expected no direct error, got %v", err) } defer tooOldWatcher.Stop() - // Ensure we get a "Gone" error - 
expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus - verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) + // Events happen within eventFreshDuration, so the cache expands and no "Gone" event is sent. + verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo) initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything) if err != nil {