From 4a2b7ee5699331df31b7483be082c201a1e7f51f Mon Sep 17 00:00:00 2001 From: Antoni Zawodny Date: Fri, 13 Dec 2024 15:40:03 +0100 Subject: [PATCH] Configure watch cache history window based on request timeout --- .../app/options/options_test.go | 1 + pkg/controlplane/apiserver/options/options.go | 10 + .../apiserver/options/options_test.go | 1 + .../apiserver/customresource_handler_test.go | 5 + .../generic/registry/storage_factory.go | 23 +- .../registry/generic/registry/store_test.go | 19 +- .../apiserver/pkg/storage/cacher/cacher.go | 22 +- .../pkg/storage/cacher/cacher_test.go | 25 +- .../storage/cacher/cacher_whitebox_test.go | 34 +- .../pkg/storage/cacher/watch_cache.go | 19 +- .../cacher/watch_cache_interval_test.go | 2 +- .../pkg/storage/cacher/watch_cache_test.go | 312 ++++++++++-------- .../pkg/storage/storagebackend/config.go | 4 + 13 files changed, 279 insertions(+), 198 deletions(-) diff --git a/cmd/kube-apiserver/app/options/options_test.go b/cmd/kube-apiserver/app/options/options_test.go index 4da997c84bd..ad148e10434 100644 --- a/cmd/kube-apiserver/app/options/options_test.go +++ b/cmd/kube-apiserver/app/options/options_test.go @@ -175,6 +175,7 @@ func TestAddFlags(t *testing.T) { CompactionInterval: storagebackend.DefaultCompactInterval, CountMetricPollPeriod: time.Minute, DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, + EventsHistoryWindow: storagebackend.DefaultEventsHistoryWindow, HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout, ReadycheckTimeout: storagebackend.DefaultReadinessTimeout, LeaseManagerConfig: etcd3.LeaseManagerConfig{ diff --git a/pkg/controlplane/apiserver/options/options.go b/pkg/controlplane/apiserver/options/options.go index 2266be0cbed..026ffc59caf 100644 --- a/pkg/controlplane/apiserver/options/options.go +++ b/pkg/controlplane/apiserver/options/options.go @@ -225,6 +225,16 @@ func (o *Options) Complete(ctx context.Context, fss cliflag.NamedFlagSets, alter return CompletedOptions{}, fmt.Errorf("error creating self-signed certificates: %v", err) } + if o.GenericServerRunOptions.RequestTimeout > 0 { + // Setting the EventsHistoryWindow as a maximum of the value set in the + // watchcache-specific options and the value of the request timeout plus + // some epsilon. + // This is done to make sure that the list+watch pattern can still be + // usable in large clusters with the elevated request timeout where the + // initial list can take a considerable amount of time. + completed.Etcd.StorageConfig.EventsHistoryWindow = max(completed.Etcd.StorageConfig.EventsHistoryWindow, completed.GenericServerRunOptions.RequestTimeout+15*time.Second) + } + if len(completed.GenericServerRunOptions.ExternalHost) == 0 { if len(completed.GenericServerRunOptions.AdvertiseAddress) > 0 { completed.GenericServerRunOptions.ExternalHost = completed.GenericServerRunOptions.AdvertiseAddress.String() diff --git a/pkg/controlplane/apiserver/options/options_test.go b/pkg/controlplane/apiserver/options/options_test.go index 68b13d5bc79..9fa3f0abfa3 100644 --- a/pkg/controlplane/apiserver/options/options_test.go +++ b/pkg/controlplane/apiserver/options/options_test.go @@ -167,6 +167,7 @@ func TestAddFlags(t *testing.T) { CompactionInterval: storagebackend.DefaultCompactInterval, CountMetricPollPeriod: time.Minute, DBMetricPollInterval: storagebackend.DefaultDBMetricPollInterval, + EventsHistoryWindow: storagebackend.DefaultEventsHistoryWindow, HealthcheckTimeout: storagebackend.DefaultHealthcheckTimeout, ReadycheckTimeout: storagebackend.DefaultReadinessTimeout, LeaseManagerConfig: etcd3.LeaseManagerConfig{ diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go index 63919e60e77..3e77fe7197a 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_handler_test.go @@ -58,6 +58,7 @@ import ( genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/server/options" + "k8s.io/apiserver/pkg/storage/cacher" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/util/webhook" "k8s.io/client-go/tools/cache" @@ -484,6 +485,10 @@ func testHandlerConversion(t *testing.T, enableWatchCache bool) { t.Fatal(err) } + if enableWatchCache { + storageConfig.EventsHistoryWindow = cacher.DefaultEventFreshDuration + } + etcdOptions := options.NewEtcdOptions(storageConfig) etcdOptions.StorageConfig.Codec = unstructured.UnstructuredJSONScheme restOptionsGetter := generic.RESTOptions{ diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go index 3c974f3981e..63fb20bcb86 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go @@ -54,17 +54,18 @@ func StorageWithCacher() generic.StorageDecorator { } cacherConfig := cacherstorage.Config{ - Storage: s, - Versioner: storage.APIObjectVersioner{}, - GroupResource: storageConfig.GroupResource, - ResourcePrefix: resourcePrefix, - KeyFunc: keyFunc, - NewFunc: newFunc, - NewListFunc: newListFunc, - GetAttrsFunc: getAttrsFunc, - IndexerFuncs: triggerFuncs, - Indexers: indexers, - Codec: storageConfig.Codec, + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: storageConfig.GroupResource, + EventsHistoryWindow: storageConfig.EventsHistoryWindow, + ResourcePrefix: resourcePrefix, + KeyFunc: keyFunc, + NewFunc: newFunc, + NewListFunc: newListFunc, + GetAttrsFunc: getAttrsFunc, + IndexerFuncs: triggerFuncs, + Indexers: indexers, + Codec: storageConfig.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(cacherConfig) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go index e11cf8d3de7..4127342e3d0 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store_test.go @@ -2435,15 +2435,16 @@ func newTestGenericStoreRegistry(t *testing.T, scheme *runtime.Scheme, hasCacheE } if hasCacheEnabled { config := cacherstorage.Config{ - Storage: s, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: podPrefix, - KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, - GetAttrsFunc: getPodAttrs, - NewFunc: newFunc, - NewListFunc: newListFunc, - Codec: sc.Codec, + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: cacherstorage.DefaultEventFreshDuration, + ResourcePrefix: podPrefix, + KeyFunc: func(obj runtime.Object) (string, error) { return storage.NoNamespaceKeyFunc(podPrefix, obj) }, + GetAttrsFunc: getPodAttrs, + NewFunc: newFunc, + NewListFunc: newListFunc, + Codec: sc.Codec, } cacher, err := cacherstorage.NewCacherFromConfig(config) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index a5b5506dcf2..e6f01384c9c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -61,10 +61,16 @@ const ( // storageWatchListPageSize is the cacher's request chunk size of // initial and resync watch lists to storage. storageWatchListPageSize = int64(10000) + + // DefaultEventFreshDuration is the default time duration of events + // we want to keep. + // We set it to defaultBookmarkFrequency plus epsilon to maximize + // chances that last bookmark was sent within kept history, at the + // same time, minimizing the needed memory usage. + DefaultEventFreshDuration = defaultBookmarkFrequency + 15*time.Second + // defaultBookmarkFrequency defines how frequently watch bookmarks should be send // in addition to sending a bookmark right before watch deadline. - // - // NOTE: Update `eventFreshDuration` when changing this value. defaultBookmarkFrequency = time.Minute ) @@ -80,6 +86,10 @@ type Config struct { // and metrics. GroupResource schema.GroupResource + // EventsHistoryWindow specifies minimum history duration that storage is keeping. + // If lower than DefaultEventFreshDuration, the cache creation will fail. + EventsHistoryWindow time.Duration + // The Cache will be caching objects of a given Type and assumes that they // are all stored under ResourcePrefix directory in the underlying database. ResourcePrefix string @@ -409,9 +419,15 @@ func NewCacherFromConfig(config Config) (*Cacher, error) { contextMetadata = metadata.New(map[string]string{"source": "cache"}) } + eventFreshDuration := config.EventsHistoryWindow + if eventFreshDuration < DefaultEventFreshDuration { + return nil, fmt.Errorf("config.EventsHistoryWindow (%v) must not be below %v", eventFreshDuration, DefaultEventFreshDuration) + } + progressRequester := newConditionalProgressRequester(config.Storage.RequestWatchProgress, config.Clock, contextMetadata) watchCache := newWatchCache( - config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, config.Clock, config.GroupResource, progressRequester) + config.KeyFunc, cacher.processEvent, config.GetAttrsFunc, config.Versioner, config.Indexers, + config.Clock, eventFreshDuration, config.GroupResource, progressRequester) listerWatcher := NewListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc, contextMetadata) reflectorName := "storage/cacher.go:" + config.ResourcePrefix diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 4168a1b4ff7..811d3ea247c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -470,18 +470,19 @@ func testSetupWithEtcdServer(t testing.TB, opts ...setupOption) (context.Context } config := Config{ - Storage: wrappedStorage, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: setupOpts.resourcePrefix, - KeyFunc: setupOpts.keyFunc, - GetAttrsFunc: GetPodAttrs, - NewFunc: newPod, - NewListFunc: newPodList, - IndexerFuncs: setupOpts.indexerFuncs, - Indexers: &setupOpts.indexers, - Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), - Clock: setupOpts.clock, + Storage: wrappedStorage, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: DefaultEventFreshDuration, + ResourcePrefix: setupOpts.resourcePrefix, + KeyFunc: setupOpts.keyFunc, + GetAttrsFunc: GetPodAttrs, + NewFunc: newPod, + NewListFunc: newPodList, + IndexerFuncs: setupOpts.indexerFuncs, + Indexers: &setupOpts.indexers, + Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: setupOpts.clock, } cacher, err := NewCacherFromConfig(config) if err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index f01c2bcc926..14c22015f4e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -62,11 +62,12 @@ import ( func newTestCacherWithoutSyncing(s storage.Interface) (*Cacher, storage.Versioner, error) { prefix := "pods" config := Config{ - Storage: s, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: prefix, - KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, + Storage: s, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: DefaultEventFreshDuration, + ResourcePrefix: prefix, + KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) { pod, ok := obj.(*example.Pod) if !ok { @@ -2722,17 +2723,18 @@ func TestWatchStreamSeparation(t *testing.T) { setupOpts := &setupOptions{} withDefaults(setupOpts) config := Config{ - Storage: etcdStorage, - Versioner: storage.APIObjectVersioner{}, - GroupResource: schema.GroupResource{Resource: "pods"}, - ResourcePrefix: setupOpts.resourcePrefix, - KeyFunc: setupOpts.keyFunc, - GetAttrsFunc: GetPodAttrs, - NewFunc: newPod, - NewListFunc: newPodList, - IndexerFuncs: setupOpts.indexerFuncs, - Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), - Clock: setupOpts.clock, + Storage: etcdStorage, + Versioner: storage.APIObjectVersioner{}, + GroupResource: schema.GroupResource{Resource: "pods"}, + EventsHistoryWindow: DefaultEventFreshDuration, + ResourcePrefix: setupOpts.resourcePrefix, + KeyFunc: setupOpts.keyFunc, + GetAttrsFunc: GetPodAttrs, + NewFunc: newPod, + NewListFunc: newPodList, + IndexerFuncs: setupOpts.indexerFuncs, + Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion), + Clock: setupOpts.clock, } tcs := []struct { name string diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 541988b31fd..ae1506d033b 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -52,17 +52,11 @@ const ( // after receiving a 'too high resource version' error. resourceVersionTooHighRetrySeconds = 1 - // eventFreshDuration is time duration of events we want to keep. - // We set it to `defaultBookmarkFrequency` plus epsilon to maximize - // chances that last bookmark was sent within kept history, at the - // same time, minimizing the needed memory usage. - eventFreshDuration = 75 * time.Second - // defaultLowerBoundCapacity is a default value for event cache capacity's lower bound. // TODO: Figure out, to what value we can decreased it. defaultLowerBoundCapacity = 100 - // defaultUpperBoundCapacity should be able to keep eventFreshDuration of history. + // defaultUpperBoundCapacity should be able to keep the required history. defaultUpperBoundCapacity = 100 * 1024 ) @@ -142,6 +136,9 @@ type watchCache struct { // for testing timeouts. clock clock.Clock + // eventFreshDuration defines the minimum watch history watchcache will store. + eventFreshDuration time.Duration + // An underlying storage.Versioner. versioner storage.Versioner @@ -163,6 +160,7 @@ func newWatchCache( versioner storage.Versioner, indexers *cache.Indexers, clock clock.WithTicker, + eventFreshDuration time.Duration, groupResource schema.GroupResource, progressRequester *conditionalProgressRequester) *watchCache { wc := &watchCache{ @@ -179,6 +177,7 @@ func newWatchCache( listResourceVersion: 0, eventHandler: eventHandler, clock: clock, + eventFreshDuration: eventFreshDuration, versioner: versioner, groupResource: groupResource, waitingUntilFresh: progressRequester, @@ -319,14 +318,14 @@ func (w *watchCache) updateCache(event *watchCacheEvent) { // - increases capacity by 2x if cache is full and all cached events occurred within last eventFreshDuration. // - decreases capacity by 2x when recent quarter of events occurred outside of eventFreshDuration(protect watchCache from flapping). func (w *watchCache) resizeCacheLocked(eventTime time.Time) { - if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < eventFreshDuration { + if w.isCacheFullLocked() && eventTime.Sub(w.cache[w.startIndex%w.capacity].RecordTime) < w.eventFreshDuration { capacity := min(w.capacity*2, w.upperBoundCapacity) if capacity > w.capacity { w.doCacheResizeLocked(capacity) } return } - if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > eventFreshDuration { + if w.isCacheFullLocked() && eventTime.Sub(w.cache[(w.endIndex-w.capacity/4)%w.capacity].RecordTime) > w.eventFreshDuration { capacity := max(w.capacity/2, w.lowerBoundCapacity) if capacity < w.capacity { w.doCacheResizeLocked(capacity) @@ -660,7 +659,7 @@ func (w *watchCache) suggestedWatchChannelSize(indexExists, triggerUsed bool) in // We don't have an exact data, but given we store updates from // the last , we approach it by dividing the // capacity by the length of the history window. - chanSize := int(math.Ceil(float64(w.currentCapacity()) / eventFreshDuration.Seconds())) + chanSize := int(math.Ceil(float64(w.currentCapacity()) / w.eventFreshDuration.Seconds())) // Finally we adjust the size to avoid ending with too low or // to large values. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go index 487a5ac1e8d..3a873053323 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval_test.go @@ -286,7 +286,7 @@ func TestCacheIntervalNextFromWatchCache(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - wc := newTestWatchCache(capacity, &cache.Indexers{}) + wc := newTestWatchCache(capacity, DefaultEventFreshDuration, &cache.Indexers{}) defer wc.Stop() for i := 0; i < c.eventsAddedToWatchcache; i++ { wc.Add(makeTestPod(fmt.Sprintf("pod%d", i), uint64(i))) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 27f695014ca..221767e63a8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -109,7 +109,7 @@ func (w *testWatchCache) getCacheIntervalForEvents(resourceVersion uint64, opts } // newTestWatchCache just adds a fake clock. -func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { +func newTestWatchCache(capacity int, eventFreshDuration time.Duration, indexers *cache.Indexers) *testWatchCache { keyFunc := func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc("prefix", obj) } @@ -127,7 +127,7 @@ func newTestWatchCache(capacity int, indexers *cache.Indexers) *testWatchCache { wc.stopCh = make(chan struct{}) pr := newConditionalProgressRequester(wc.RequestWatchProgress, &immediateTickerFactory{}, nil) go pr.Run(wc.stopCh) - wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), schema.GroupResource{Resource: "pods"}, pr) + wc.watchCache = newWatchCache(keyFunc, mockHandler, getAttrsFunc, versioner, indexers, testingclock.NewFakeClock(time.Now()), eventFreshDuration, schema.GroupResource{Resource: "pods"}, pr) // To preserve behavior of tests that assume a given capacity, // resize it to th expected size. wc.capacity = capacity @@ -194,7 +194,7 @@ func (w *testWatchCache) Stop() { } func TestWatchCacheBasic(t *testing.T) { - store := newTestWatchCache(2, &cache.Indexers{}) + store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // Test Add/Update/Delete. @@ -272,7 +272,7 @@ func TestWatchCacheBasic(t *testing.T) { } func TestEvents(t *testing.T) { - store := newTestWatchCache(5, &cache.Indexers{}) + store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // no dynamic-size cache to fit old tests. @@ -397,7 +397,7 @@ func TestEvents(t *testing.T) { } func TestMarker(t *testing.T) { - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // First thing that is called when propagated from storage is Replace. @@ -434,7 +434,7 @@ func TestMarker(t *testing.T) { func TestWaitUntilFreshAndList(t *testing.T) { ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{ + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{ "l:label": func(obj interface{}) ([]string, error) { pod, ok := obj.(*v1.Pod) if !ok { @@ -537,7 +537,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true) forceRequestWatchProgressSupport(t) ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. go func() { @@ -563,7 +563,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { func TestWaitUntilFreshAndGet(t *testing.T) { ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. @@ -606,7 +606,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { t.Run(tc.name, func(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, tc.ConsistentListFromCache) ctx := context.Background() - store := newTestWatchCache(3, &cache.Indexers{}) + store := newTestWatchCache(3, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() fc := store.clock.(*testingclock.FakeClock) @@ -651,7 +651,7 @@ func (t *testLW) Watch(options metav1.ListOptions) (watch.Interface, error) { func TestReflectorForWatchCache(t *testing.T) { ctx := context.Background() - store := newTestWatchCache(5, &cache.Indexers{}) + store := newTestWatchCache(5, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() { @@ -702,212 +702,212 @@ func TestDynamicCache(t *testing.T) { expectStartIndex int }{ { - name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding", + name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 10, expectStartIndex: 0, }, { - name: "[capacity not equals 4*n] events outside eventFreshDuration without change cache capacity", + name: "[capacity not equals 4*n] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 4, + interval: DefaultEventFreshDuration / 4, expectCapacity: 5, expectStartIndex: 0, }, { - name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 2, expectStartIndex: 3, }, { - name: "[capacity not equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity not equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 3, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 3, expectStartIndex: 2, }, { - name: "[capacity not equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity not equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 5, cacheCapacity: 5, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 8, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 8, expectStartIndex: 0, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding", + name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 10, expectStartIndex: 3, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity", + name: "[capacity not equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration / 4, + interval: DefaultEventFreshDuration / 4, expectCapacity: 5, expectStartIndex: 3, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 2, expectStartIndex: 6, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity not equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 3, upperBoundCapacity: 5 * 2, - interval: eventFreshDuration + time.Second, + interval: DefaultEventFreshDuration + time.Second, expectCapacity: 3, expectStartIndex: 5, }, { - name: "[capacity not equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity not equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 5, cacheCapacity: 5, startIndex: 3, lowerBoundCapacity: 5 / 2, upperBoundCapacity: 8, - interval: eventFreshDuration / 6, + interval: DefaultEventFreshDuration / 6, expectCapacity: 8, expectStartIndex: 3, }, { - name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding", + name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 16, expectStartIndex: 0, }, { - name: "[capacity equals 4*n] events outside eventFreshDuration without change cache capacity", + name: "[capacity equals 4*n] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 8, + interval: DefaultEventFreshDuration / 8, expectCapacity: 8, expectStartIndex: 0, }, { - name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 4, expectStartIndex: 4, }, { - name: "[capacity equals 4*n] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity equals 4*n] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 7, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 7, expectStartIndex: 1, }, { - name: "[capacity equals 4*n] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity equals 4*n] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 8, cacheCapacity: 8, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 10, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 10, expectStartIndex: 0, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding", + name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 16, expectStartIndex: 3, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] events outside eventFreshDuration without change cache capacity", + name: "[capacity equals 4*n] [startIndex not equal 0] events outside DefaultEventFreshDuration without change cache capacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration / 8, + interval: DefaultEventFreshDuration / 8, expectCapacity: 8, expectStartIndex: 3, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking", + name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 4, expectStartIndex: 7, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside eventFreshDuration cause cache shrinking with given lowerBoundCapacity", + name: "[capacity equals 4*n] [startIndex not equal 0] quarter of recent events outside DefaultEventFreshDuration cause cache shrinking with given lowerBoundCapacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 7, upperBoundCapacity: 8 * 2, - interval: eventFreshDuration/2 + time.Second, + interval: DefaultEventFreshDuration/2 + time.Second, expectCapacity: 7, expectStartIndex: 4, }, { - name: "[capacity equals 4*n] [startIndex not equal 0] events inside eventFreshDuration cause cache expanding with given upperBoundCapacity", + name: "[capacity equals 4*n] [startIndex not equal 0] events inside DefaultEventFreshDuration cause cache expanding with given upperBoundCapacity", eventCount: 8, cacheCapacity: 8, startIndex: 3, lowerBoundCapacity: 8 / 2, upperBoundCapacity: 10, - interval: eventFreshDuration / 9, + interval: DefaultEventFreshDuration / 9, expectCapacity: 10, expectStartIndex: 3, }, @@ -915,7 +915,7 @@ func TestDynamicCache(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - store := newTestWatchCache(test.cacheCapacity, &cache.Indexers{}) + store := newTestWatchCache(test.cacheCapacity, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() store.cache = make([]*watchCacheEvent, test.cacheCapacity) store.startIndex = test.startIndex @@ -964,7 +964,7 @@ func checkCacheElements(cache *testWatchCache) bool { } func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { - store := newTestWatchCache(2, &cache.Indexers{}) + store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() now := store.clock.Now() @@ -983,7 +983,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { addEvent("key1", 20, now) // Force "key1" to rotate our of cache. - later := now.Add(2 * eventFreshDuration) + later := now.Add(2 * DefaultEventFreshDuration) addEvent("key2", 30, later) addEvent("key3", 40, later) @@ -998,122 +998,162 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { func TestSuggestedWatchChannelSize(t *testing.T) { testCases := []struct { - name string - capacity int - indexExists bool - triggerUsed bool - expected int + name string + capacity int + indexExists bool + triggerUsed bool + eventsFreshDuration time.Duration + expected int }{ { - name: "capacity=100, indexExists, triggerUsed", - capacity: 100, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=100, indexExists, triggerUsed", + capacity: 100, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=100, indexExists, !triggerUsed", - capacity: 100, - indexExists: true, - triggerUsed: false, - expected: 10, + name: "capacity=100, indexExists, !triggerUsed", + capacity: 100, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=100, !indexExists", - capacity: 100, - indexExists: false, - triggerUsed: false, - expected: 10, + name: "capacity=100, !indexExists", + capacity: 100, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750, indexExists, triggerUsed", - capacity: 750, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=750, indexExists, triggerUsed", + capacity: 750, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750, indexExists, !triggerUsed", - capacity: 750, - indexExists: true, - triggerUsed: false, - expected: 10, + name: "capacity=750, indexExists, !triggerUsed", + capacity: 750, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750, !indexExists", - capacity: 750, - indexExists: false, - triggerUsed: false, - expected: 10, + name: "capacity=750, !indexExists", + capacity: 750, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=7500, indexExists, triggerUsed", - capacity: 7500, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=7500, indexExists, triggerUsed", + capacity: 7500, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=7500, indexExists, !triggerUsed", - capacity: 7500, - indexExists: true, - triggerUsed: false, - expected: 100, + name: "capacity=7500, indexExists, !triggerUsed", + capacity: 7500, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, }, { - name: "capacity=7500, !indexExists", - capacity: 7500, - indexExists: false, - triggerUsed: false, - expected: 100, + name: "capacity=7500, indexExists, !triggerUsed, eventsFreshDuration=2m30s", + capacity: 7500, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: 2 * DefaultEventFreshDuration, + expected: 50, }, { - name: "capacity=75000, indexExists, triggerUsed", - capacity: 75000, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=7500, !indexExists", + capacity: 7500, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, }, { - name: "capacity=75000, indexExists, !triggerUsed", - capacity: 75000, - indexExists: true, - triggerUsed: false, - expected: 1000, + name: "capacity=7500, !indexExists, eventsFreshDuration=2m30s", + capacity: 7500, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: 2 * DefaultEventFreshDuration, + expected: 50, }, { - name: "capacity=75000, !indexExists", - capacity: 75000, - indexExists: false, - triggerUsed: false, - expected: 100, + name: "capacity=75000, indexExists, triggerUsed", + capacity: 75000, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, }, { - name: "capacity=750000, indexExists, triggerUsed", - capacity: 750000, - indexExists: true, - triggerUsed: true, - expected: 10, + name: "capacity=75000, indexExists, !triggerUsed", + capacity: 75000, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 1000, }, { - name: "capacity=750000, indexExists, !triggerUsed", - capacity: 750000, - indexExists: true, - triggerUsed: false, - expected: 1000, + name: "capacity=75000, indexExists, !triggerUsed, eventsFreshDuration=2m30s", + capacity: 75000, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: 2 * DefaultEventFreshDuration, + expected: 500, }, { - name: "capacity=750000, !indexExists", - capacity: 750000, - indexExists: false, - triggerUsed: false, - expected: 100, + name: "capacity=75000, !indexExists", + capacity: 75000, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, + }, + { + name: "capacity=750000, indexExists, triggerUsed", + capacity: 750000, + indexExists: true, + triggerUsed: true, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 10, + }, + { + name: "capacity=750000, indexExists, !triggerUsed", + capacity: 750000, + indexExists: true, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 1000, + }, + { + name: "capacity=750000, !indexExists", + capacity: 750000, + indexExists: false, + triggerUsed: false, + eventsFreshDuration: DefaultEventFreshDuration, + expected: 100, }, } for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - store := newTestWatchCache(test.capacity, &cache.Indexers{}) + store := newTestWatchCache(test.capacity, test.eventsFreshDuration, &cache.Indexers{}) defer store.Stop() got := store.suggestedWatchChannelSize(test.indexExists, test.triggerUsed) if got != test.expected { @@ -1124,7 +1164,7 @@ func TestSuggestedWatchChannelSize(t *testing.T) { } func BenchmarkWatchCache_updateCache(b *testing.B) { - store := newTestWatchCache(defaultUpperBoundCapacity, &cache.Indexers{}) + store := newTestWatchCache(defaultUpperBoundCapacity, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() store.cache = store.cache[:0] store.upperBoundCapacity = defaultUpperBoundCapacity @@ -1146,7 +1186,7 @@ func TestHistogramCacheReadWait(t *testing.T) { } ctx := context.Background() testedMetrics := "apiserver_watch_cache_read_wait_seconds" - store := newTestWatchCache(2, &cache.Indexers{}) + store := newTestWatchCache(2, DefaultEventFreshDuration, &cache.Indexers{}) defer store.Stop() // In background, update the store. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go index 822470778df..c948d641164 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/config.go @@ -37,6 +37,7 @@ const ( DefaultCompactInterval = 5 * time.Minute DefaultDBMetricPollInterval = 30 * time.Second + DefaultEventsHistoryWindow = 75 * time.Second DefaultHealthcheckTimeout = 2 * time.Second DefaultReadinessTimeout = 2 * time.Second ) @@ -80,6 +81,8 @@ type Config struct { CountMetricPollPeriod time.Duration // DBMetricPollInterval specifies how often should storage backend metric be updated. DBMetricPollInterval time.Duration + // EventsHistoryWindow specifies minimum history duration that storage is keeping. + EventsHistoryWindow time.Duration // HealthcheckTimeout specifies the timeout used when checking health HealthcheckTimeout time.Duration // ReadycheckTimeout specifies the timeout used when checking readiness @@ -115,6 +118,7 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config { Codec: codec, CompactionInterval: DefaultCompactInterval, DBMetricPollInterval: DefaultDBMetricPollInterval, + EventsHistoryWindow: DefaultEventsHistoryWindow, HealthcheckTimeout: DefaultHealthcheckTimeout, ReadycheckTimeout: DefaultReadinessTimeout, LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),