From df17ea2e37d1b36dcda10fe20de5484c41c615b5 Mon Sep 17 00:00:00 2001 From: Paco Xu Date: Wed, 26 Jun 2024 11:51:12 +0800 Subject: [PATCH] Revert "apiserver/storage/cacher: consistent read from cache supports limit" --- .../apiserver/pkg/storage/cacher/cacher.go | 39 ++---------- .../storage/cacher/cacher_whitebox_test.go | 60 +------------------ .../request/list_work_estimator.go | 8 +-- 3 files changed, 11 insertions(+), 96 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1f0058237dc..70ccbad200d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -768,26 +768,12 @@ func shouldDelegateList(opts storage.ListOptions) bool { consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := len(pred.Continue) > 0 + // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. + hasLimit := pred.Limit > 0 && resourceVersion != "0" // Watch cache only supports ResourceVersionMatchNotOlderThan (default). - // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0" - unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch + unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan - return consistentReadFromStorage || hasContinuation || unsupportedMatch -} - -// computeListLimit determines whether the cacher should -// apply a limit to an incoming LIST request and returns its value. -// -// note that this function doesn't check RVM nor the Continuation token. -// these parameters are validated by the shouldDelegateList function. -// -// as of today, the limit is ignored for requests that set RV == 0 -func computeListLimit(opts storage.ListOptions) int64 { - if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" { - return 0 - } - return opts.Predicate.Limit + return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch } func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { @@ -897,21 +883,13 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio // the elements in ListObject are Struct type, making slice will bring excessive memory consumption. // so we try to delay this action as much as possible var selectedObjects []runtime.Object - var lastSelectedObjectKey string - var hasMoreListItems bool - limit := computeListLimit(opts) - for i, obj := range objs { + for _, obj := range objs { elem, ok := obj.(*storeElement) if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } if filter(elem.Key, elem.Labels, elem.Fields) { selectedObjects = append(selectedObjects, elem.Object) - lastSelectedObjectKey = elem.Key - } - if limit > 0 && int64(len(selectedObjects)) >= limit { - hasMoreListItems = i < len(objs)-1 - break } } if len(selectedObjects) == 0 { @@ -927,12 +905,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio } span.AddEvent("Filtered items", attribute.Int("count", listVal.Len())) if c.versioner != nil { - continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts) - if err != nil { - return err - } - - if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil { + if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { return err } } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index f668a8eff82..0cfed001038 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -201,6 +201,7 @@ func TestGetListCacheBypass(t *testing.T) { {opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true}, {opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true}, + {opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true}, {opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false}, {opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true}, @@ -213,7 +214,6 @@ func TestGetListCacheBypass(t *testing.T) { featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) testCases := append(commonTestCases, testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true}, - testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true}, ) for _, tc := range testCases { testGetListCacheBypass(t, tc.opts, tc.expectBypass) @@ -233,7 +233,6 @@ func TestGetListCacheBypass(t *testing.T) { testCases := append(commonTestCases, testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false}, - testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false}, ) for _, tc := range testCases { testGetListCacheBypass(t, tc.opts, tc.expectBypass) @@ -2567,63 +2566,6 @@ func TestWatchStreamSeparation(t *testing.T) { } } -func TestComputeListLimit(t *testing.T) { - scenarios := []struct { - name string - opts storage.ListOptions - expectedLimit int64 - }{ - { - name: "limit is zero", - opts: storage.ListOptions{ - Predicate: storage.SelectionPredicate{ - Limit: 0, - }, - }, - expectedLimit: 0, - }, - { - name: "limit is positive, RV is unset", - opts: storage.ListOptions{ - Predicate: storage.SelectionPredicate{ - Limit: 1, - }, - ResourceVersion: "", - }, - expectedLimit: 1, - }, - { - name: "limit is positive, RV = 100", - opts: storage.ListOptions{ - Predicate: storage.SelectionPredicate{ - Limit: 1, - }, - ResourceVersion: "100", - }, - expectedLimit: 1, - }, - { - name: "legacy case: limit is positive, RV = 0", - opts: storage.ListOptions{ - Predicate: storage.SelectionPredicate{ - Limit: 1, - }, - ResourceVersion: "0", - }, - expectedLimit: 0, - }, - } - - for _, scenario := range scenarios { - t.Run(scenario.name, func(t *testing.T) { - actualLimit := computeListLimit(scenario.opts) - if actualLimit != scenario.expectedLimit { - t.Errorf("computeListLimit returned = %v, expected %v", actualLimit, scenario.expectedLimit) - } - }) - } -} - func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) { opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true} opts.Predicate.AllowWatchBookmarks = true diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index a9e76141e93..d880469659c 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -173,10 +173,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool { consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) // Watch cache doesn't support continuations, so serve them from etcd. hasContinuation := len(opts.Continue) > 0 + // Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd. + hasLimit := opts.Limit > 0 && resourceVersion != "0" // Watch cache only supports ResourceVersionMatchNotOlderThan (default). - // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list - isLegacyExactMatch := opts.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0" - unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch + unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan - return consistentReadFromStorage || hasContinuation || unsupportedMatch + return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch }