Merge pull request #129205 from tosi3k/wc-configurable-retention

Configure watch cache history window based on request timeout
This commit is contained in:
Kubernetes Prow Robot 2024-12-17 20:50:58 +01:00 committed by GitHub
commit 107be8fecc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 279 additions and 198 deletions

View File

@ -175,6 +175,7 @@ func TestAddFlags(t *testing.T) {
CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute,
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
EventsHistoryWindow: storagebackend.DefaultEventsHistoryWindow,
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
ReadycheckTimeout: storagebackend.DefaultReadinessTimeout,
LeaseManagerConfig: etcd3.LeaseManagerConfig{

View File

@ -225,6 +225,16 @@ func (o *Options) Complete(ctx context.Context, fss cliflag.NamedFlagSets, alter
return CompletedOptions{}, fmt.Errorf("error creating self-signed certificates: %v", err)
}
if o.GenericServerRunOptions.RequestTimeout > 0 {
// Setting the EventsHistoryWindow as a maximum of the value set in the
// watchcache-specific options and the value of the request timeout plus
// some epsilon.
// This is done to make sure that the list+watch pattern can still be
// usable in large clusters with the elevated request timeout where the
// initial list can take a considerable amount of time.
completed.Etcd.StorageConfig.EventsHistoryWindow = max(completed.Etcd.StorageConfig.EventsHistoryWindow, completed.GenericServerRunOptions.RequestTimeout+15*time.Second)
}
if len(completed.GenericServerRunOptions.ExternalHost) == 0 {
if len(completed.GenericServerRunOptions.AdvertiseAddress) > 0 {
completed.GenericServerRunOptions.ExternalHost = completed.GenericServerRunOptions.AdvertiseAddress.String()

View File

@ -167,6 +167,7 @@ func TestAddFlags(t *testing.T) {
CompactionInterval: storagebackend.DefaultCompactInterval,
CountMetricPollPeriod: time.Minute,
DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval,
EventsHistoryWindow: storagebackend.DefaultEventsHistoryWindow,
HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout,
ReadycheckTimeout: storagebackend.DefaultReadinessTimeout,
LeaseManagerConfig: etcd3.LeaseManagerConfig{

View File

@ -58,6 +58,7 @@ import (
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/options"
"k8s.io/apiserver/pkg/storage/cacher"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/util/webhook"
"k8s.io/client-go/tools/cache"
@ -484,6 +485,10 @@ func testHandlerConversion(t *testing.T, enableWatchCache bool) {
t.Fatal(err)
}
if enableWatchCache {
storageConfig.EventsHistoryWindow = cacher.DefaultEventFreshDuration
}
etcdOptions := options.NewEtcdOptions(storageConfig)
etcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme
restOptionsGetter := generic.RESTOptions{

View File

@ -54,17 +54,18 @@ func StorageWithCacher() generic.StorageDecorator {
}
cacherConfig := cacherstorage.Config{
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: storageConfig.GroupResource,
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewFunc: newFunc,
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
IndexerFuncs: triggerFuncs,
Indexers: indexers,
Codec: storageConfig.Codec,
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: storageConfig.GroupResource,
EventsHistoryWindow: storageConfig.EventsHistoryWindow,
ResourcePrefix: resourcePrefix,
KeyFunc: keyFunc,
NewFunc: newFunc,
NewListFunc: newListFunc,
GetAttrsFunc: getAttrsFunc,
IndexerFuncs: triggerFuncs,
Indexers: indexers,
Codec: storageConfig.Codec,
}
cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig)
if err != nil {

View File

@ -2435,15 +2435,16 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE
}
if hasCacheEnabled {
config := cacherstorage.Config{
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs,
NewFunc: newFunc,
NewListFunc: newListFunc,
Codec: sc.Codec,
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: cacherstorage.DefaultEventFreshDuration,
ResourcePrefix: podPrefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) },
GetAttrsFunc: getPodAttrs,
NewFunc: newFunc,
NewListFunc: newListFunc,
Codec: sc.Codec,
}
cacher, err := cacherstorage.NewCacherFromConfig(config)
if err != nil {

View File

@ -61,10 +61,16 @@ const (
// storageWatchListPageSize is the cacher's request chunk size of
// initial and resync watch lists to storage.
storageWatchListPageSize = int64(10000)
// DefaultEventFreshDuration is the default time duration of events
// we want to keep.
// We set it to defaultBookmarkFrequency plus epsilon to maximize
// chances that last bookmark was sent within kept history, at the
// same time, minimizing the needed memory usage.
DefaultEventFreshDuration = defaultBookmarkFrequency + 15*time.Second
// defaultBookmarkFrequency defines how frequently watch bookmarks should be send
// in addition to sending a bookmark right before watch deadline.
//
// NOTE: Update `eventFreshDuration` when changing this value.
defaultBookmarkFrequency = time.Minute
)
@ -80,6 +86,10 @@ type Config struct {
// and metrics.
GroupResource schema.GroupResource
// EventsHistoryWindow specifies minimum history duration that storage is keeping.
// If lower than DefaultEventFreshDuration, the cache creation will fail.
EventsHistoryWindow time.Duration
// The Cache will be caching objects of a given Type and assumes that they
// are all stored under ResourcePrefix directory in the underlying database.
ResourcePrefix string
@ -409,9 +419,15 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
contextMetadata = metadata.New(map[string]string{"source": "cache"})
}
eventFreshDuration := config.EventsHistoryWindow
if eventFreshDuration < DefaultEventFreshDuration {
return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration)
}
progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata)
watchCache := newWatchCache(
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester)
config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers,
config.Clock, eventFreshDuration, config.GroupResource, progressRequester)
listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix

View File

@ -470,18 +470,19 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context
}
config := Config{
Storage: wrappedStorage,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: setupOpts.resourcePrefix,
KeyFunc: setupOpts.keyFunc,
GetAttrsFunc: GetPodAttrs,
NewFunc: newPod,
NewListFunc: newPodList,
IndexerFuncs: setupOpts.indexerFuncs,
Indexers: &setupOpts.indexers,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: setupOpts.clock,
Storage: wrappedStorage,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: DefaultEventFreshDuration,
ResourcePrefix: setupOpts.resourcePrefix,
KeyFunc: setupOpts.keyFunc,
GetAttrsFunc: GetPodAttrs,
NewFunc: newPod,
NewListFunc: newPodList,
IndexerFuncs: setupOpts.indexerFuncs,
Indexers: &setupOpts.indexers,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: setupOpts.clock,
}
cacher, err := NewCacherFromConfig(config)
if err != nil {

View File

@ -62,11 +62,12 @@ import (
func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versioner, error) {
prefix := "pods"
config := Config{
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
Storage: s,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: DefaultEventFreshDuration,
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*example.Pod)
if !ok {
@ -2722,17 +2723,18 @@ func TestWatchStreamSeparation(t *testing.T) {
setupOpts := &setupOptions{}
withDefaults(setupOpts)
config := Config{
Storage: etcdStorage,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
ResourcePrefix: setupOpts.resourcePrefix,
KeyFunc: setupOpts.keyFunc,
GetAttrsFunc: GetPodAttrs,
NewFunc: newPod,
NewListFunc: newPodList,
IndexerFuncs: setupOpts.indexerFuncs,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: setupOpts.clock,
Storage: etcdStorage,
Versioner: storage.APIObjectVersioner{},
GroupResource: schema.GroupResource{Resource: "pods"},
EventsHistoryWindow: DefaultEventFreshDuration,
ResourcePrefix: setupOpts.resourcePrefix,
KeyFunc: setupOpts.keyFunc,
GetAttrsFunc: GetPodAttrs,
NewFunc: newPod,
NewListFunc: newPodList,
IndexerFuncs: setupOpts.indexerFuncs,
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
Clock: setupOpts.clock,
}
tcs := []struct {
name string

View File

@ -52,17 +52,11 @@ const (
// after receiving a 'too high resource version' error.
resourceVersionTooHighRetrySeconds = 1
// eventFreshDuration is time duration of events we want to keep.
// We set it to `defaultBookmarkFrequency` plus epsilon to maximize
// chances that last bookmark was sent within kept history, at the
// same time, minimizing the needed memory usage.
eventFreshDuration = 75 * time.Second
// defaultLowerBoundCapacity is a default value for event cache capacity's lower bound.
// TODO: Figure out, to what value we can decreased it.
defaultLowerBoundCapacity = 100
// defaultUpperBoundCapacity should be able to keep eventFreshDuration of history.
// defaultUpperBoundCapacity should be able to keep the required history.
defaultUpperBoundCapacity = 100 * 1024
)
@ -142,6 +136,9 @@ type watchCache struct {
// for testing timeouts.
clock clock.Clock
// eventFreshDuration defines the minimum watch history watchcache will store.
eventFreshDuration time.Duration
// An underlying storage.Versioner.
versioner storage.Versioner
@ -163,6 +160,7 @@ func newWatchCache(
versioner storage.Versioner,
indexers *cache.Indexers,
clock clock.WithTicker,
eventFreshDuration time.Duration,
groupResource schema.GroupResource,
progressRequester *conditionalProgressRequester) *watchCache {
wc := &watchCache{
@ -179,6 +177,7 @@ func newWatchCache(
listResourceVersion: 0,
eventHandler: eventHandler,
clock: clock,
eventFreshDuration: eventFreshDuration,
versioner: versioner,
groupResource: groupResource,
waitingUntilFresh: progressRequester,
@ -319,14 +318,14 @@ func (w *watchCache) updateCache(event *watchCacheEvent) {
// - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration.
// - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping).
func (w *watchCache) resizeCacheLocked(eventTime time.Time) {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < w.eventFreshDuration {
capacity := min(w.capacity*2, w.upperBoundCapacity)
if capacity > w.capacity {
w.doCacheResizeLocked(capacity)
}
return
}
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration {
if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > w.eventFreshDuration {
capacity := max(w.capacity/2, w.lowerBoundCapacity)
if capacity < w.capacity {
w.doCacheResizeLocked(capacity)
@ -660,7 +659,7 @@ func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) in
// We don't have an exact data, but given we store updates from
// the last <eventFreshDuration>, we approach it by dividing the
// capacity by the length of the history window.
chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds()))
chanSize := int(math.Ceil(float64(w.currentCapacity()) / w.eventFreshDuration.Seconds()))
// Finally we adjust the size to avoid ending with too low or
// to large values.

View File

@ -286,7 +286,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
wc := newTestWatchCache(capacity, &cache.Indexers{})
wc := newTestWatchCache(capacity, DefaultEventFreshDuration, &cache.Indexers{})
defer wc.Stop()
for i := 0; i < c.eventsAddedToWatchcache; i++ {
wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i)))

View File

@ -109,7 +109,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts
}
// newTestWatchCache just adds a fake clock.
func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers *cache.Indexers) *testWatchCache {
keyFunc := func(obj runtime.Object) (string, error) {
return storage.NamespaceKeyFunc("prefix", obj)
}
@ -127,7 +127,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache {
wc.stopCh = make(chan struct{})
pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil)
go pr.Run(wc.stopCh)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr)
wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr)
// To preserve behavior of tests that assume a given capacity,
// resize it to th expected size.
wc.capacity = capacity
@ -194,7 +194,7 @@ func (w *testWatchCache) Stop() {
}
func TestWatchCacheBasic(t *testing.T) {
store := newTestWatchCache(2, &cache.Indexers{})
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// Test Add/Update/Delete.
@ -272,7 +272,7 @@ func TestWatchCacheBasic(t *testing.T) {
}
func TestEvents(t *testing.T) {
store := newTestWatchCache(5, &cache.Indexers{})
store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// no dynamic-size cache to fit old tests.
@ -397,7 +397,7 @@ func TestEvents(t *testing.T) {
}
func TestMarker(t *testing.T) {
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// First thing that is called when propagated from storage is Replace.
@ -434,7 +434,7 @@ func TestMarker(t *testing.T) {
func TestWaitUntilFreshAndList(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{
"l:label": func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
@ -537,7 +537,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)
forceRequestWatchProgressSupport(t)
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// In background, update the store.
go func() {
@ -563,7 +563,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
func TestWaitUntilFreshAndGet(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// In background, update the store.
@ -606,7 +606,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache)
ctx := context.Background()
store := newTestWatchCache(3, &cache.Indexers{})
store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
fc := store.clock.(*testingclock.FakeClock)
@ -651,7 +651,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) {
func TestReflectorForWatchCache(t *testing.T) {
ctx := context.Background()
store := newTestWatchCache(5, &cache.Indexers{})
store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
{
@ -702,212 +702,212 @@ func TestDynamicCache(t *testing.T) {
expectStartIndex int
}{
{
name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding",
name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 10,
expectStartIndex: 0,
},
{
name: "[capacity not equals 4*n] events outside eventFreshDuration without change cache capacity",
name: "[capacity not equals 4*n] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 4,
interval: DefaultEventFreshDuration / 4,
expectCapacity: 5,
expectStartIndex: 0,
},
{
name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 2,
expectStartIndex: 3,
},
{
name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 3,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 3,
expectStartIndex: 2,
},
{
name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 8,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 8,
expectStartIndex: 0,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 10,
expectStartIndex: 3,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
name: "[capacity not equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration / 4,
interval: DefaultEventFreshDuration / 4,
expectCapacity: 5,
expectStartIndex: 3,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 2,
expectStartIndex: 6,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 3,
upperBoundCapacity: 5 * 2,
interval: eventFreshDuration + time.Second,
interval: DefaultEventFreshDuration + time.Second,
expectCapacity: 3,
expectStartIndex: 5,
},
{
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 5,
cacheCapacity: 5,
startIndex: 3,
lowerBoundCapacity: 5 / 2,
upperBoundCapacity: 8,
interval: eventFreshDuration / 6,
interval: DefaultEventFreshDuration / 6,
expectCapacity: 8,
expectStartIndex: 3,
},
{
name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding",
name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 16,
expectStartIndex: 0,
},
{
name: "[capacity equals 4*n] events outside eventFreshDuration without change cache capacity",
name: "[capacity equals 4*n] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 8,
interval: DefaultEventFreshDuration / 8,
expectCapacity: 8,
expectStartIndex: 0,
},
{
name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 4,
expectStartIndex: 4,
},
{
name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 7,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 7,
expectStartIndex: 1,
},
{
name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 10,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 10,
expectStartIndex: 0,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding",
name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 16,
expectStartIndex: 3,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity",
name: "[capacity equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration / 8,
interval: DefaultEventFreshDuration / 8,
expectCapacity: 8,
expectStartIndex: 3,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking",
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 4,
expectStartIndex: 7,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity",
name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 7,
upperBoundCapacity: 8 * 2,
interval: eventFreshDuration/2 + time.Second,
interval: DefaultEventFreshDuration/2 + time.Second,
expectCapacity: 7,
expectStartIndex: 4,
},
{
name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity",
name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity",
eventCount: 8,
cacheCapacity: 8,
startIndex: 3,
lowerBoundCapacity: 8 / 2,
upperBoundCapacity: 10,
interval: eventFreshDuration / 9,
interval: DefaultEventFreshDuration / 9,
expectCapacity: 10,
expectStartIndex: 3,
},
@ -915,7 +915,7 @@ func TestDynamicCache(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{})
store := newTestWatchCache(test.cacheCapacity, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
store.cache = make([]*watchCacheEvent, test.cacheCapacity)
store.startIndex = test.startIndex
@ -964,7 +964,7 @@ func checkCacheElements(cache *testWatchCache) bool {
}
func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
store := newTestWatchCache(2, &cache.Indexers{})
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
now := store.clock.Now()
@ -983,7 +983,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
addEvent("key1", 20, now)
// Force "key1" to rotate our of cache.
later := now.Add(2 * eventFreshDuration)
later := now.Add(2 * DefaultEventFreshDuration)
addEvent("key2", 30, later)
addEvent("key3", 40, later)
@ -998,122 +998,162 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
func TestSuggestedWatchChannelSize(t *testing.T) {
testCases := []struct {
name string
capacity int
indexExists bool
triggerUsed bool
expected int
name string
capacity int
indexExists bool
triggerUsed bool
eventsFreshDuration time.Duration
expected int
}{
{
name: "capacity=100, indexExists, triggerUsed",
capacity: 100,
indexExists: true,
triggerUsed: true,
expected: 10,
name: "capacity=100, indexExists, triggerUsed",
capacity: 100,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=100, indexExists, !triggerUsed",
capacity: 100,
indexExists: true,
triggerUsed: false,
expected: 10,
name: "capacity=100, indexExists, !triggerUsed",
capacity: 100,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=100, !indexExists",
capacity: 100,
indexExists: false,
triggerUsed: false,
expected: 10,
name: "capacity=100, !indexExists",
capacity: 100,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=750, indexExists, triggerUsed",
capacity: 750,
indexExists: true,
triggerUsed: true,
expected: 10,
name: "capacity=750, indexExists, triggerUsed",
capacity: 750,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=750, indexExists, !triggerUsed",
capacity: 750,
indexExists: true,
triggerUsed: false,
expected: 10,
name: "capacity=750, indexExists, !triggerUsed",
capacity: 750,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=750, !indexExists",
capacity: 750,
indexExists: false,
triggerUsed: false,
expected: 10,
name: "capacity=750, !indexExists",
capacity: 750,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=7500, indexExists, triggerUsed",
capacity: 7500,
indexExists: true,
triggerUsed: true,
expected: 10,
name: "capacity=7500, indexExists, triggerUsed",
capacity: 7500,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=7500, indexExists, !triggerUsed",
capacity: 7500,
indexExists: true,
triggerUsed: false,
expected: 100,
name: "capacity=7500, indexExists, !triggerUsed",
capacity: 7500,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
{
name: "capacity=7500, !indexExists",
capacity: 7500,
indexExists: false,
triggerUsed: false,
expected: 100,
name: "capacity=7500, indexExists, !triggerUsed, eventsFreshDuration=2m30s",
capacity: 7500,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: 2 * DefaultEventFreshDuration,
expected: 50,
},
{
name: "capacity=75000, indexExists, triggerUsed",
capacity: 75000,
indexExists: true,
triggerUsed: true,
expected: 10,
name: "capacity=7500, !indexExists",
capacity: 7500,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
{
name: "capacity=75000, indexExists, !triggerUsed",
capacity: 75000,
indexExists: true,
triggerUsed: false,
expected: 1000,
name: "capacity=7500, !indexExists, eventsFreshDuration=2m30s",
capacity: 7500,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: 2 * DefaultEventFreshDuration,
expected: 50,
},
{
name: "capacity=75000, !indexExists",
capacity: 75000,
indexExists: false,
triggerUsed: false,
expected: 100,
name: "capacity=75000, indexExists, triggerUsed",
capacity: 75000,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=750000, indexExists, triggerUsed",
capacity: 750000,
indexExists: true,
triggerUsed: true,
expected: 10,
name: "capacity=75000, indexExists, !triggerUsed",
capacity: 75000,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 1000,
},
{
name: "capacity=750000, indexExists, !triggerUsed",
capacity: 750000,
indexExists: true,
triggerUsed: false,
expected: 1000,
name: "capacity=75000, indexExists, !triggerUsed, eventsFreshDuration=2m30s",
capacity: 75000,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: 2 * DefaultEventFreshDuration,
expected: 500,
},
{
name: "capacity=750000, !indexExists",
capacity: 750000,
indexExists: false,
triggerUsed: false,
expected: 100,
name: "capacity=75000, !indexExists",
capacity: 75000,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
{
name: "capacity=750000, indexExists, triggerUsed",
capacity: 750000,
indexExists: true,
triggerUsed: true,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 10,
},
{
name: "capacity=750000, indexExists, !triggerUsed",
capacity: 750000,
indexExists: true,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 1000,
},
{
name: "capacity=750000, !indexExists",
capacity: 750000,
indexExists: false,
triggerUsed: false,
eventsFreshDuration: DefaultEventFreshDuration,
expected: 100,
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
store := newTestWatchCache(test.capacity, &cache.Indexers{})
store := newTestWatchCache(test.capacity, test.eventsFreshDuration, &cache.Indexers{})
defer store.Stop()
got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed)
if got != test.expected {
@ -1124,7 +1164,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) {
}
func BenchmarkWatchCache_updateCache(b *testing.B) {
store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{})
store := newTestWatchCache(defaultUpperBoundCapacity, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
store.cache = store.cache[:0]
store.upperBoundCapacity = defaultUpperBoundCapacity
@ -1146,7 +1186,7 @@ func TestHistogramCacheReadWait(t *testing.T) {
}
ctx := context.Background()
testedMetrics := "apiserver_watch_cache_read_wait_seconds"
store := newTestWatchCache(2, &cache.Indexers{})
store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{})
defer store.Stop()
// In background, update the store.

View File

@ -37,6 +37,7 @@ const (
DefaultCompactInterval = 5 * time.Minute
DefaultDBMetricPollInterval = 30 * time.Second
DefaultEventsHistoryWindow = 75 * time.Second
DefaultHealthcheckTimeout = 2 * time.Second
DefaultReadinessTimeout = 2 * time.Second
)
@ -80,6 +81,8 @@ type Config struct {
CountMetricPollPeriod time.Duration
// DBMetricPollInterval specifies how often should storage backend metric be updated.
DBMetricPollInterval time.Duration
// EventsHistoryWindow specifies minimum history duration that storage is keeping.
EventsHistoryWindow time.Duration
// HealthcheckTimeout specifies the timeout used when checking health
HealthcheckTimeout time.Duration
// ReadycheckTimeout specifies the timeout used when checking readiness
@ -115,6 +118,7 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
Codec: codec,
CompactionInterval: DefaultCompactInterval,
DBMetricPollInterval: DefaultDBMetricPollInterval,
EventsHistoryWindow: DefaultEventsHistoryWindow,
HealthcheckTimeout: DefaultHealthcheckTimeout,
ReadycheckTimeout: DefaultReadinessTimeout,
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),