Merge pull request #125730 from p0lyn0mial/upstream-bring-back-consistent-read-from-cache-supports-pagination

apiserver/storage/cacher: consistent read from cache supports limit
This commit is contained in:
Kubernetes Prow Robot
2024-06-27 11:57:20 -07:00
committed by GitHub
5 changed files with 131 additions and 26 deletions

View File

@@ -615,7 +615,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
// to compute watcher.forget function (which has to happen under lock). // to compute watcher.forget function (which has to happen under lock).
watcher := newCacheWatcher( watcher := newCacheWatcher(
chanSize, chanSize,
filterWithAttrsFunction(key, pred), filterWithAttrsAndPrefixFunction(key, pred),
emptyFunc, emptyFunc,
c.versioner, c.versioner,
deadline, deadline,
@@ -768,12 +768,26 @@ func shouldDelegateList(opts storage.ListOptions) bool {
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd. // Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(pred.Continue) > 0 hasContinuation := len(pred.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
hasLimit := pred.Limit > 0 && resourceVersion != "0"
// Watch cache only supports ResourceVersionMatchNotOlderThan (default). // Watch cache only supports ResourceVersionMatchNotOlderThan (default).
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch return consistentReadFromStorage || hasContinuation || unsupportedMatch
}
// computeListLimit determines whether the cacher should
// apply a limit to an incoming LIST request and returns its value.
//
// note that this function doesn't check RVM nor the Continuation token.
// these parameters are validated by the shouldDelegateList function.
//
// as of today, the limit is ignored for requests that set RV == 0
func computeListLimit(opts storage.ListOptions) int64 {
if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" {
return 0
}
return opts.Predicate.Limit
} }
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
@@ -795,7 +809,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
} }
return nil, readResourceVersion, "", nil return nil, readResourceVersion, "", nil
} }
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, pred.MatcherIndex(ctx)) return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
} }
// GetList implements storage.Interface // GetList implements storage.Interface
@@ -871,7 +885,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
if listVal.Kind() != reflect.Slice { if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
} }
filter := filterWithAttrsFunction(preparedKey, pred)
objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
if err != nil { if err != nil {
@@ -883,13 +896,21 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption. // the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
// so we try to delay this action as much as possible // so we try to delay this action as much as possible
var selectedObjects []runtime.Object var selectedObjects []runtime.Object
for _, obj := range objs { var lastSelectedObjectKey string
var hasMoreListItems bool
limit := computeListLimit(opts)
for i, obj := range objs {
elem, ok := obj.(*storeElement) elem, ok := obj.(*storeElement)
if !ok { if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj) return fmt.Errorf("non *storeElement returned from storage: %v", obj)
} }
if filter(elem.Key, elem.Labels, elem.Fields) { if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
selectedObjects = append(selectedObjects, elem.Object) selectedObjects = append(selectedObjects, elem.Object)
lastSelectedObjectKey = elem.Key
}
if limit > 0 && int64(len(selectedObjects)) >= limit {
hasMoreListItems = i < len(objs)-1
break
} }
} }
if len(selectedObjects) == 0 { if len(selectedObjects) == 0 {
@@ -905,7 +926,12 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
} }
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len())) span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
if c.versioner != nil { if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
if err != nil {
return err
}
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
return err return err
} }
} }
@@ -1293,7 +1319,7 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
} }
} }
func filterWithAttrsFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc { func filterWithAttrsAndPrefixFunction(key string, p storage.SelectionPredicate) filterWithAttrsFunc {
filterFunc := func(objKey string, label labels.Set, field fields.Set) bool { filterFunc := func(objKey string, label labels.Set, field fields.Set) bool {
if !hasPathPrefix(objKey, key) { if !hasPathPrefix(objKey, key) {
return false return false

View File

@@ -201,7 +201,6 @@ func TestGetListCacheBypass(t *testing.T) {
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true}, {opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true}, {opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false}, {opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true}, {opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
@@ -214,6 +213,7 @@ func TestGetListCacheBypass(t *testing.T) {
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
testCases := append(commonTestCases, testCases := append(commonTestCases,
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true}, testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
) )
for _, tc := range testCases { for _, tc := range testCases {
testGetListCacheBypass(t, tc.opts, tc.expectBypass) testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@@ -233,6 +233,7 @@ func TestGetListCacheBypass(t *testing.T) {
testCases := append(commonTestCases, testCases := append(commonTestCases,
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false}, testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
) )
for _, tc := range testCases { for _, tc := range testCases {
testGetListCacheBypass(t, tc.opts, tc.expectBypass) testGetListCacheBypass(t, tc.opts, tc.expectBypass)
@@ -2591,6 +2592,63 @@ func TestWatchStreamSeparation(t *testing.T) {
} }
} }
func TestComputeListLimit(t *testing.T) {
scenarios := []struct {
name string
opts storage.ListOptions
expectedLimit int64
}{
{
name: "limit is zero",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 0,
},
},
expectedLimit: 0,
},
{
name: "limit is positive, RV is unset",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
},
ResourceVersion: "",
},
expectedLimit: 1,
},
{
name: "limit is positive, RV = 100",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
},
ResourceVersion: "100",
},
expectedLimit: 1,
},
{
name: "legacy case: limit is positive, RV = 0",
opts: storage.ListOptions{
Predicate: storage.SelectionPredicate{
Limit: 1,
},
ResourceVersion: "0",
},
expectedLimit: 0,
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
actualLimit := computeListLimit(scenario.opts)
if actualLimit != scenario.expectedLimit {
t.Errorf("computeListLimit returned = %v, expected %v", actualLimit, scenario.expectedLimit)
}
})
}
}
func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) { func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true} opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
opts.Predicate.AllowWatchBookmarks = true opts.Predicate.AllowWatchBookmarks = true

View File

@@ -501,7 +501,29 @@ func (s sortableStoreElements) Swap(i, j int) {
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
// with their ResourceVersion and the name of the index, if any, that was used. // with their ResourceVersion and the name of the index, if any, that was used.
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) { func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
items, rv, index, err := w.waitUntilFreshAndListItems(ctx, resourceVersion, key, matchValues)
if err != nil {
return nil, 0, "", err
}
var result []interface{}
for _, item := range items {
elem, ok := item.(*storeElement)
if !ok {
return nil, 0, "", fmt.Errorf("non *storeElement returned from storage: %v", item)
}
if !hasPathPrefix(elem.Key, key) {
continue
}
result = append(result, item)
}
sort.Sort(sortableStoreElements(result))
return result, rv, index, nil
}
func (w *watchCache) waitUntilFreshAndListItems(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) {
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
w.waitingUntilFresh.Add() w.waitingUntilFresh.Add()
@@ -511,7 +533,6 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
err = w.waitUntilFreshAndBlock(ctx, resourceVersion) err = w.waitUntilFreshAndBlock(ctx, resourceVersion)
} }
defer func() { sort.Sort(sortableStoreElements(result)) }()
defer w.RUnlock() defer w.RUnlock()
if err != nil { if err != nil {
return result, rv, index, err return result, rv, index, err

View File

@@ -462,7 +462,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
}() }()
// list by empty MatchValues. // list by empty MatchValues.
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, nil) list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -481,7 +481,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
{IndexName: "l:label", Value: "value1"}, {IndexName: "l:label", Value: "value1"},
{IndexName: "f:spec.nodeName", Value: "node2"}, {IndexName: "f:spec.nodeName", Value: "node2"},
} }
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues) list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -500,7 +500,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
{IndexName: "l:not-exist-label", Value: "whatever"}, {IndexName: "l:not-exist-label", Value: "whatever"},
{IndexName: "f:spec.nodeName", Value: "node2"}, {IndexName: "f:spec.nodeName", Value: "node2"},
} }
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues) list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -518,7 +518,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
matchValues = []storage.MatchValue{ matchValues = []storage.MatchValue{
{IndexName: "l:not-exist-label", Value: "whatever"}, {IndexName: "l:not-exist-label", Value: "whatever"},
} }
list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, matchValues) list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -546,7 +546,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
}() }()
// list from future revision. Requires watch cache to request bookmark to get it. // list from future revision. Requires watch cache to request bookmark to get it.
list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, nil) list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
store.Add(makeTestPod("bar", 4)) store.Add(makeTestPod("bar", 4))
}() }()
_, _, _, err := store.WaitUntilFreshAndList(ctx, 4, nil) _, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
if !errors.IsTimeout(err) { if !errors.IsTimeout(err) {
t.Errorf("expected timeout error but got: %v", err) t.Errorf("expected timeout error but got: %v", err)
} }
@@ -655,7 +655,7 @@ func TestReflectorForWatchCache(t *testing.T) {
defer store.Stop() defer store.Stop()
{ {
_, version, _, err := store.WaitUntilFreshAndList(ctx, 0, nil) _, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }
@@ -678,7 +678,7 @@ func TestReflectorForWatchCache(t *testing.T) {
r.ListAndWatch(wait.NeverStop) r.ListAndWatch(wait.NeverStop)
{ {
_, version, _, err := store.WaitUntilFreshAndList(ctx, 10, nil) _, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
if err != nil { if err != nil {
t.Fatalf("unexpected error: %v", err) t.Fatalf("unexpected error: %v", err)
} }

View File

@@ -173,10 +173,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported) consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
// Watch cache doesn't support continuations, so serve them from etcd. // Watch cache doesn't support continuations, so serve them from etcd.
hasContinuation := len(opts.Continue) > 0 hasContinuation := len(opts.Continue) > 0
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
hasLimit := opts.Limit > 0 && resourceVersion != "0"
// Watch cache only supports ResourceVersionMatchNotOlderThan (default). // Watch cache only supports ResourceVersionMatchNotOlderThan (default).
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan // see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
isLegacyExactMatch := opts.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch return consistentReadFromStorage || hasContinuation || unsupportedMatch
} }