apiserver/storage/cacher: cache supports pagination

This commit is contained in:
Lukasz Szaszkiewicz 2024-06-10 10:14:38 +02:00
parent bc3b8f6c6b
commit 87536f367d
2 changed files with 92 additions and 7 deletions

View File

@ -762,12 +762,26 @@ func shouldDelegateList(opts storage.ListOptions) bool {
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(pred.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
hasLimit := pred.Limit > 0 && resourceVersion != "0"
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
return consistentReadFromStorage || hasContinuation || unsupportedMatch
}
// computeListLimit determines whether the cacher should
// apply a limit to an incoming LIST request and returns its value.
//
// note that this function doesn't check RVM nor the Continuation token.
// these parameters are validated by the shouldDelegateList function.
//
// as of today, the limit is ignored for requests that set RV == 0
func computeListLimit(opts storage.ListOptions) int64 {
if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" {
return 0
}
return opts.Predicate.Limit
}
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
@ -877,13 +891,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
// so we try to delay this action as much as possible
var selectedObjects []runtime.Object
for _, obj := range objs {
var lastSelectedObjectKey string
var hasMoreListItems bool
limit := computeListLimit(opts)
for i, obj := range objs {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
if limit > 0 && int64(len(selectedObjects)) >= limit {
hasMoreListItems = i < len(objs)-1
break
}
}
if len(selectedObjects) == 0 {
@ -899,7 +921,12 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
}
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
if err != nil {
return err
}
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
return err
}
}

View File

@ -201,7 +201,6 @@ func TestGetListCacheBypass(t *testing.T) {
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
@ -214,6 +213,7 @@ func TestGetListCacheBypass(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
testCases := append(commonTestCases,
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
)
for _, tc := range testCases {
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@ -233,6 +233,7 @@ func TestGetListCacheBypass(t *testing.T) {
testCases := append(commonTestCases,
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
)
for _, tc := range testCases {
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@ -2566,6 +2567,63 @@ func TestWatchStreamSeparation(t *testing.T) {
}
}
func TestComputeListLimit(t *testing.T) {
scenarios := []struct {
name string
opts storage.ListOptions
expectedLimit int64
}{
{
name: "limit is zero",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 0,
},
},
expectedLimit: 0,
},
{
name: "limit is positive, RV is unset",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
},
ResourceVersion: "",
},
expectedLimit: 1,
},
{
name: "limit is positive, RV = 100",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
},
ResourceVersion: "100",
},
expectedLimit: 1,
},
{
name: "legacy case: limit is positive, RV = 0",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
},
ResourceVersion: "0",
},
expectedLimit: 0,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
actualLimit := computeListLimit(scenario.opts)
if actualLimit != scenario.expectedLimit {
t.Errorf("computeListLimit returned = %v, expected %v", actualLimit, scenario.expectedLimit)
}
})
}
}
func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
opts.Predicate.AllowWatchBookmarks = true