Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-25 04:33:26 +00:00)

Merge pull request #90091 from gongguan/dynamic-cache

Use dynamic size watch-cache.

Commit 875d3b534e
@@ -315,10 +315,11 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 	}
 
 	clock := clock.RealClock{}
+	objType := reflect.TypeOf(obj)
 	cacher := &Cacher{
 		ready:          newReady(),
 		storage:        config.Storage,
-		objectType:     reflect.TypeOf(obj),
+		objectType:     objType,
 		versioner:      config.Versioner,
 		newFunc:        config.NewFunc,
 		indexedTrigger: indexedTrigger,
@@ -349,7 +350,7 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 	}
 
 	watchCache := newWatchCache(
-		config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers)
+		config.CacheCapacity, config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, objType)
 	listerWatcher := NewCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
 	reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 
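NewCacherFromConfig now computes the object's reflect.Type once and threads it through to the watch cache, where its String() form later becomes the "resource" label on the capacity metrics. A minimal standalone sketch of what that label looks like (the Pod type below is a stand-in for a real API object, not part of this change):

    package main

    import (
    	"fmt"
    	"reflect"
    )

    // Pod stands in for an API object such as *example.Pod.
    type Pod struct{}

    func main() {
    	// The cacher stores this reflect.Type; objType.String() is what
    	// recordsWatchCacheCapacityChange later receives as the metric label.
    	objType := reflect.TypeOf(&Pod{})
    	fmt.Println(objType.String()) // prints "*main.Pod"
    }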
@@ -1,5 +1,5 @@
 /*
-Copyright 2019 The Kubernetes Authors.
+Copyright 2020 The Kubernetes Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
@@ -33,7 +33,25 @@ var (
 	initCounter = metrics.NewCounterVec(
 		&metrics.CounterOpts{
 			Name:           "apiserver_init_events_total",
-			Help:           "Counter of init events processed in watchcache broken by resource type",
+			Help:           "Counter of init events processed in watchcache broken by resource type.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"resource"},
+	)
+
+	watchCacheCapacityIncreaseTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Name:           "watch_cache_capacity_increase_total",
+			Help:           "Total number of watch cache capacity increase events broken by resource type.",
+			StabilityLevel: metrics.ALPHA,
+		},
+		[]string{"resource"},
+	)
+
+	watchCacheCapacityDecreaseTotal = metrics.NewCounterVec(
+		&metrics.CounterOpts{
+			Name:           "watch_cache_capacity_decrease_total",
+			Help:           "Total number of watch cache capacity decrease events broken by resource type.",
 			StabilityLevel: metrics.ALPHA,
 		},
 		[]string{"resource"},
@@ -42,4 +60,15 @@ var (
 
 func init() {
 	legacyregistry.MustRegister(initCounter)
+	legacyregistry.MustRegister(watchCacheCapacityIncreaseTotal)
+	legacyregistry.MustRegister(watchCacheCapacityDecreaseTotal)
+}
+
+// recordsWatchCacheCapacityChange records watchCache capacity resize (increase or decrease) operations.
+func recordsWatchCacheCapacityChange(objType string, old, new int) {
+	if old < new {
+		watchCacheCapacityIncreaseTotal.WithLabelValues(objType).Inc()
+		return
+	}
+	watchCacheCapacityDecreaseTotal.WithLabelValues(objType).Inc()
 }
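The registration above wires both new counters into the legacy registry, and recordsWatchCacheCapacityChange picks the direction from a single comparison. A self-contained sketch of that branch using plain map counters instead of the metrics plumbing (all names here are illustrative):

    package main

    import "fmt"

    var (
    	increases = map[string]int{} // mirrors watchCacheCapacityIncreaseTotal
    	decreases = map[string]int{} // mirrors watchCacheCapacityDecreaseTotal
    )

    // recordCapacityChange mirrors recordsWatchCacheCapacityChange: a grow
    // increments one counter, a shrink the other, keyed by resource type.
    func recordCapacityChange(objType string, old, new int) {
    	if old < new {
    		increases[objType]++
    		return
    	}
    	decreases[objType]++
    }

    func main() {
    	recordCapacityChange("*example.Pod", 100, 200) // capacity grew
    	recordCapacityChange("*example.Pod", 200, 100) // capacity shrank
    	fmt.Println(increases["*example.Pod"], decreases["*example.Pod"]) // 1 1
    }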
@@ -44,3 +44,17 @@ func hasPathPrefix(s, pathPrefix string) bool {
 	}
 	return false
 }
+
+func max(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
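These helpers exist because Go had no built-in integer min/max at the time (the generic min and max built-ins only arrived in Go 1.21); the dynamic-size code below uses them to clamp capacities to the configured bounds.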
@@ -18,6 +18,7 @@ package cacher
 
 import (
 	"fmt"
+	"reflect"
 	"sort"
 	"sync"
 	"time"
@@ -44,6 +45,20 @@ const (
 	// resourceVersionTooHighRetrySeconds is the seconds before an operation should be retried by the client
 	// after receiving a 'too high resource version' error.
 	resourceVersionTooHighRetrySeconds = 1
+
+	// eventFreshDuration is the time duration of events we want to keep.
+	eventFreshDuration = 5 * time.Minute
+
+	// defaultLowerBoundCapacity is a default value for event cache capacity's lower bound.
+	// 100 is the minimum in NewHeuristicWatchCacheSizes.
+	// TODO: Figure out to what value we can decrease it.
+	defaultLowerBoundCapacity = 100
+
+	// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
+	// With the current 102400 value though, it's not enough for leases in a 5k-node cluster,
+	// but that is a conscious decision.
+	// TODO: Validate if the current value is high enough for large-scale clusters.
+	defaultUpperBoundCapacity = 100 * 1024
 )
 
 // watchCacheEvent is a single "watch event" that is sent to users of
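For scale context: with eventFreshDuration at 5 minutes, the 102400 upper bound covers a sustained churn of roughly 102400 / 300 s ≈ 341 events per second per resource type before a full eventFreshDuration of history stops fitting, which is why the comment calls out leases in 5k-node clusters.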
@@ -60,6 +75,7 @@ type watchCacheEvent struct {
 	PrevObjFields   fields.Set
 	Key             string
 	ResourceVersion uint64
+	RecordTime      time.Time
 }
 
 // Computing a key of an object is generally non-trivial (it performs
@@ -126,6 +142,12 @@ type watchCache struct {
 	// Maximum size of history window.
 	capacity int
+
+	// upper bound of capacity since event cache has a dynamic size.
+	upperBoundCapacity int
+
+	// lower bound of capacity since event cache has a dynamic size.
+	lowerBoundCapacity int
 
 	// keyFunc is used to get a key in the underlying storage for a given object.
 	keyFunc func(runtime.Object) (string, error)
 
@@ -165,6 +187,9 @@ type watchCache struct {
 
 	// An underlying storage.Versioner.
 	versioner storage.Versioner
+
+	// cacher's objectType.
+	objectType reflect.Type
 }
 
 func newWatchCache(
@@ -173,12 +198,16 @@ func newWatchCache(
 	eventHandler func(*watchCacheEvent),
 	getAttrsFunc func(runtime.Object) (labels.Set, fields.Set, error),
 	versioner storage.Versioner,
-	indexers *cache.Indexers) *watchCache {
+	indexers *cache.Indexers,
+	objectType reflect.Type) *watchCache {
 	wc := &watchCache{
 		capacity:           capacity,
 		keyFunc:            keyFunc,
 		getAttrsFunc:       getAttrsFunc,
 		cache:              make([]*watchCacheEvent, capacity),
+		// TODO: get rid of them once we stop passing capacity as a parameter to watch cache.
+		lowerBoundCapacity: min(capacity, defaultLowerBoundCapacity),
+		upperBoundCapacity: max(capacity, defaultUpperBoundCapacity),
 		startIndex:         0,
 		endIndex:           0,
 		store:              cache.NewIndexer(storeElementKey, storeElementIndexers(indexers)),
@@ -187,6 +216,7 @@ func newWatchCache(
 		eventHandler: eventHandler,
 		clock:        clock.RealClock{},
 		versioner:    versioner,
+		objectType:   objectType,
 	}
 	wc.cond = sync.NewCond(wc.RLocker())
 	return wc
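newWatchCache clamps the dynamic-size window around the configured capacity with the min/max helpers, so a capacity outside the default bounds simply becomes its own bound. A runnable sketch of the resulting bounds (the helpers are reproduced so the snippet stands alone):

    package main

    import "fmt"

    func min(a, b int) int {
    	if a < b {
    		return a
    	}
    	return b
    }

    func max(a, b int) int {
    	if a > b {
    		return a
    	}
    	return b
    }

    func main() {
    	const defaultLowerBoundCapacity = 100
    	const defaultUpperBoundCapacity = 100 * 1024

    	for _, capacity := range []int{10, 1000, 200000} {
    		lower := min(capacity, defaultLowerBoundCapacity)
    		upper := max(capacity, defaultUpperBoundCapacity)
    		fmt.Printf("capacity=%d -> bounds [%d, %d]\n", capacity, lower, upper)
    	}
    	// capacity=10 -> bounds [10, 102400]
    	// capacity=1000 -> bounds [100, 102400]
    	// capacity=200000 -> bounds [100, 200000]
    }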
@@ -260,6 +290,7 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
 		ObjFields:       elem.Fields,
 		Key:             key,
 		ResourceVersion: resourceVersion,
+		RecordTime:      w.clock.Now(),
 	}
 
 	if err := func() error {
@@ -301,7 +332,8 @@ func (w *watchCache) processEvent(event watch.Event, resourceVersion uint64, upd
 
 // Assumes that lock is already held for write.
 func (w *watchCache) updateCache(event *watchCacheEvent) {
-	if w.endIndex == w.startIndex+w.capacity {
+	w.resizeCacheLocked(event.RecordTime)
+	if w.isCacheFullLocked() {
 		// Cache is full - remove the oldest element.
 		w.startIndex++
 	}
@@ -309,6 +341,48 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
 	w.endIndex++
 }
 
+// resizeCacheLocked resizes the cache if necessary:
+// - increases capacity by 2x if the cache is full and all cached events occurred within the last eventFreshDuration.
+// - decreases capacity by 2x when the recent quarter of events occurred outside of eventFreshDuration (protects watchCache from flapping).
+func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
+	if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
+		capacity := min(w.capacity*2, w.upperBoundCapacity)
+		if capacity > w.capacity {
+			w.doCacheResizeLocked(capacity)
+		}
+		return
+	}
+	if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
+		capacity := max(w.capacity/2, w.lowerBoundCapacity)
+		if capacity < w.capacity {
+			w.doCacheResizeLocked(capacity)
+		}
+		return
+	}
+}
+
+// isCacheFullLocked is used to judge whether the cache of watchCacheEvents is full.
+// Assumes that lock is already held for write.
+func (w *watchCache) isCacheFullLocked() bool {
+	return w.endIndex == w.startIndex+w.capacity
+}
+
+// doCacheResizeLocked resizes watchCache's event array to a different capacity.
+// Assumes that lock is already held for write.
+func (w *watchCache) doCacheResizeLocked(capacity int) {
+	newCache := make([]*watchCacheEvent, capacity)
+	if capacity < w.capacity {
+		// adjust startIndex if cache capacity shrank.
+		w.startIndex = w.endIndex - capacity
+	}
+	for i := w.startIndex; i < w.endIndex; i++ {
+		newCache[i%capacity] = w.cache[i%w.capacity]
+	}
+	w.cache = newCache
+	recordsWatchCacheCapacityChange(w.objectType.String(), w.capacity, capacity)
+	w.capacity = capacity
+}
+
 // List returns list of pointers to <storeElement> objects.
 func (w *watchCache) List() []interface{} {
 	return w.store.List()
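Events live at index i%capacity, so doCacheResizeLocked only has to copy them into the new backing array and the same modular rule still finds them; when shrinking, startIndex jumps forward so only the newest events survive. A standalone sketch of that copy, with strings standing in for *watchCacheEvent:

    package main

    import "fmt"

    // resize mirrors doCacheResizeLocked: copy a modular ring buffer into a
    // new backing array, advancing startIndex if the buffer shrinks.
    func resize(cache []string, startIndex, endIndex, newCapacity int) ([]string, int) {
    	newCache := make([]string, newCapacity)
    	if newCapacity < endIndex-startIndex {
    		// Shrinking below the current length drops the oldest events.
    		startIndex = endIndex - newCapacity
    	}
    	for i := startIndex; i < endIndex; i++ {
    		newCache[i%newCapacity] = cache[i%len(cache)]
    	}
    	return newCache, startIndex
    }

    func main() {
    	// Capacity 5 holding events 1..5 at positions i%5.
    	cache := []string{"event-5", "event-1", "event-2", "event-3", "event-4"}
    	newCache, start := resize(cache, 1, 6, 3)
    	fmt.Println(newCache, start) // [event-3 event-4 event-5] 3
    }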
@@ -18,6 +18,7 @@ package cacher
 
 import (
 	"fmt"
+	"reflect"
 	"strconv"
 	"strings"
 	"testing"
@@ -33,6 +34,7 @@ import (
 	"k8s.io/apimachinery/pkg/util/clock"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apiserver/pkg/apis/example"
 	"k8s.io/apiserver/pkg/storage"
 	"k8s.io/apiserver/pkg/storage/etcd3"
 	"k8s.io/client-go/tools/cache"
@@ -79,7 +81,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *watchCache {
 	}
 	versioner := etcd3.APIObjectVersioner{}
 	mockHandler := func(*watchCacheEvent) {}
-	wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers)
+	wc := newWatchCache(capacity, keyFunc, mockHandler, getAttrsFunc, versioner, indexers, reflect.TypeOf(&example.Pod{}))
 	wc.clock = clock.NewFakeClock(time.Now())
 	return wc
 }
@@ -164,6 +166,10 @@ func TestWatchCacheBasic(t *testing.T) {
 func TestEvents(t *testing.T) {
 	store := newTestWatchCache(5, &cache.Indexers{})
 
+	// Disable dynamic cache sizing so the old tests still fit.
+	store.lowerBoundCapacity = 5
+	store.upperBoundCapacity = 5
+
 	store.Add(makeTestPod("pod", 3))
 
 	// Test for Added event.
@@ -501,3 +507,292 @@ func TestReflectorForWatchCache(t *testing.T) {
 		}
 	}
 }
+
+func TestDynamicCache(t *testing.T) {
+	tests := []struct {
+		name               string
+		eventCount         int
+		cacheCapacity      int
+		startIndex         int
+		lowerBoundCapacity int
+		upperBoundCapacity int
+		// interval is the time duration between adjacent events.
+		interval         time.Duration
+		expectCapacity   int
+		expectStartIndex int
+	}{
+		{
+			name:               "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding",
+			eventCount:         5,
+			cacheCapacity:      5,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration / 6,
+			expectCapacity:     10,
+			expectStartIndex:   0,
+		},
+		{
+			name:               "[capacity not equals 4*n] events outside eventFreshDuration without change cache capacity",
+			eventCount:         5,
+			cacheCapacity:      5,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration / 4,
+			expectCapacity:     5,
+			expectStartIndex:   0,
+		},
+		{
+			name:               "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
+			eventCount:         5,
+			cacheCapacity:      5,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration + time.Second,
+			expectCapacity:     2,
+			expectStartIndex:   3,
+		},
+		{
+			name:               "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
+			eventCount:         5,
+			cacheCapacity:      5,
+			lowerBoundCapacity: 3,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration + time.Second,
+			expectCapacity:     3,
+			expectStartIndex:   2,
+		},
+		{
+			name:               "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
+			eventCount:         5,
+			cacheCapacity:      5,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 8,
+			interval:           eventFreshDuration / 6,
+			expectCapacity:     8,
+			expectStartIndex:   0,
+		},
+		{
+			name:               "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
+			eventCount:         5,
+			cacheCapacity:      5,
+			startIndex:         3,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration / 6,
+			expectCapacity:     10,
+			expectStartIndex:   3,
+		},
+		{
+			name:               "[capacity not equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
+			eventCount:         5,
+			cacheCapacity:      5,
+			startIndex:         3,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration / 4,
+			expectCapacity:     5,
+			expectStartIndex:   3,
+		},
+		{
+			name:               "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
+			eventCount:         5,
+			cacheCapacity:      5,
+			startIndex:         3,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration + time.Second,
+			expectCapacity:     2,
+			expectStartIndex:   6,
+		},
+		{
+			name:               "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
+			eventCount:         5,
+			cacheCapacity:      5,
+			startIndex:         3,
+			lowerBoundCapacity: 3,
+			upperBoundCapacity: 5 * 2,
+			interval:           eventFreshDuration + time.Second,
+			expectCapacity:     3,
+			expectStartIndex:   5,
+		},
+		{
+			name:               "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
+			eventCount:         5,
+			cacheCapacity:      5,
+			startIndex:         3,
+			lowerBoundCapacity: 5 / 2,
+			upperBoundCapacity: 8,
+			interval:           eventFreshDuration / 6,
+			expectCapacity:     8,
+			expectStartIndex:   3,
+		},
+		{
+			name:               "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding",
+			eventCount:         8,
+			cacheCapacity:      8,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration / 9,
+			expectCapacity:     16,
+			expectStartIndex:   0,
+		},
+		{
+			name:               "[capacity equals 4*n] events outside eventFreshDuration without change cache capacity",
+			eventCount:         8,
+			cacheCapacity:      8,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration / 8,
+			expectCapacity:     8,
+			expectStartIndex:   0,
+		},
+		{
+			name:               "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
+			eventCount:         8,
+			cacheCapacity:      8,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration/2 + time.Second,
+			expectCapacity:     4,
+			expectStartIndex:   4,
+		},
+		{
+			name:               "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
+			eventCount:         8,
+			cacheCapacity:      8,
+			lowerBoundCapacity: 7,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration/2 + time.Second,
+			expectCapacity:     7,
+			expectStartIndex:   1,
+		},
+		{
+			name:               "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
+			eventCount:         8,
+			cacheCapacity:      8,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 10,
+			interval:           eventFreshDuration / 9,
+			expectCapacity:     10,
+			expectStartIndex:   0,
+		},
+		{
+			name:               "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
+			eventCount:         8,
+			cacheCapacity:      8,
+			startIndex:         3,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration / 9,
+			expectCapacity:     16,
+			expectStartIndex:   3,
+		},
+		{
+			name:               "[capacity equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
+			eventCount:         8,
+			cacheCapacity:      8,
+			startIndex:         3,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration / 8,
+			expectCapacity:     8,
+			expectStartIndex:   3,
+		},
+		{
+			name:               "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
+			eventCount:         8,
+			cacheCapacity:      8,
+			startIndex:         3,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration/2 + time.Second,
+			expectCapacity:     4,
+			expectStartIndex:   7,
+		},
+		{
+			name:               "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
+			eventCount:         8,
+			cacheCapacity:      8,
+			startIndex:         3,
+			lowerBoundCapacity: 7,
+			upperBoundCapacity: 8 * 2,
+			interval:           eventFreshDuration/2 + time.Second,
+			expectCapacity:     7,
+			expectStartIndex:   4,
+		},
+		{
+			name:               "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
+			eventCount:         8,
+			cacheCapacity:      8,
+			startIndex:         3,
+			lowerBoundCapacity: 8 / 2,
+			upperBoundCapacity: 10,
+			interval:           eventFreshDuration / 9,
+			expectCapacity:     10,
+			expectStartIndex:   3,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
+			store.cache = make([]*watchCacheEvent, test.cacheCapacity)
+			store.startIndex = test.startIndex
+			store.lowerBoundCapacity = test.lowerBoundCapacity
+			store.upperBoundCapacity = test.upperBoundCapacity
+			loadEventWithDuration(store, test.eventCount, test.interval)
+			nextInterval := store.clock.Now().Add(time.Duration(test.interval.Nanoseconds() * int64(test.eventCount)))
+			store.resizeCacheLocked(nextInterval)
+			if store.capacity != test.expectCapacity {
+				t.Errorf("expect capacity %d, but get %d", test.expectCapacity, store.capacity)
+			}
+
+			// Check the cache's startIndex, endIndex and all elements.
+			if store.startIndex != test.expectStartIndex {
+				t.Errorf("expect startIndex %d, but get %d", test.expectStartIndex, store.startIndex)
+			}
+			if store.endIndex != test.startIndex+test.eventCount {
+				t.Errorf("expect endIndex %d, but get %d", test.startIndex+test.eventCount, store.endIndex)
+			}
+			if !checkCacheElements(store) {
+				t.Errorf("some elements in the cache are in the wrong location")
+			}
+		})
+	}
+}
+
+func loadEventWithDuration(cache *watchCache, count int, interval time.Duration) {
+	for i := 0; i < count; i++ {
+		event := &watchCacheEvent{
+			Key:        fmt.Sprintf("event-%d", i+cache.startIndex),
+			RecordTime: cache.clock.Now().Add(time.Duration(interval.Nanoseconds() * int64(i))),
+		}
+		cache.cache[(i+cache.startIndex)%cache.capacity] = event
+	}
+	cache.endIndex = cache.startIndex + count
+}
+
+func checkCacheElements(cache *watchCache) bool {
+	for i := cache.startIndex; i < cache.endIndex; i++ {
+		location := i % cache.capacity
+		if cache.cache[location].Key != fmt.Sprintf("event-%d", i) {
+			return false
+		}
+	}
+	return true
+}
+
+func BenchmarkWatchCache_updateCache(b *testing.B) {
+	store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
+	store.cache = store.cache[:0]
+	store.upperBoundCapacity = defaultUpperBoundCapacity
+	loadEventWithDuration(store, defaultUpperBoundCapacity, 0)
+	add := &watchCacheEvent{
+		Key:        fmt.Sprintf("event-%d", defaultUpperBoundCapacity),
+		RecordTime: store.clock.Now(),
+	}
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		store.updateCache(add)
+	}
+}
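Running just the new test and benchmark should look roughly like this; the package path is inferred from the imports above, so adjust it to your checkout:

    go test k8s.io/apiserver/pkg/storage/cacher -run TestDynamicCache
    go test k8s.io/apiserver/pkg/storage/cacher -run '^$' -bench BenchmarkWatchCache_updateCache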
@@ -443,9 +443,8 @@ func TestWatch(t *testing.T) {
 		t.Fatalf("Expected no direct error, got %v", err)
 	}
 	defer tooOldWatcher.Stop()
-	// Ensure we get a "Gone" error
-	expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus
-	verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError)
+	// Events happen within eventFreshDuration, so the cache expands instead of emitting a "Gone" event.
+	verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo)
 
 	initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
 	if err != nil {