mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Revert "apiserver/storage/cacher: consistent read from cache supports limit"
This commit is contained in:
parent
c6fd466fb4
commit
df17ea2e37
@ -768,26 +768,12 @@ func shouldDelegateList(opts storage.ListOptions) bool {
|
|||||||
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
|
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
|
||||||
// Watch cache doesn't support continuations, so serve them from etcd.
|
// Watch cache doesn't support continuations, so serve them from etcd.
|
||||||
hasContinuation := len(pred.Continue) > 0
|
hasContinuation := len(pred.Continue) > 0
|
||||||
|
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
|
||||||
|
hasLimit := pred.Limit > 0 && resourceVersion != "0"
|
||||||
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
|
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
|
||||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
|
||||||
isLegacyExactMatch := opts.Predicate.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
|
|
||||||
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
|
|
||||||
|
|
||||||
return consistentReadFromStorage || hasContinuation || unsupportedMatch
|
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
|
||||||
}
|
|
||||||
|
|
||||||
// computeListLimit determines whether the cacher should
|
|
||||||
// apply a limit to an incoming LIST request and returns its value.
|
|
||||||
//
|
|
||||||
// note that this function doesn't check RVM nor the Continuation token.
|
|
||||||
// these parameters are validated by the shouldDelegateList function.
|
|
||||||
//
|
|
||||||
// as of today, the limit is ignored for requests that set RV == 0
|
|
||||||
func computeListLimit(opts storage.ListOptions) int64 {
|
|
||||||
if opts.Predicate.Limit <= 0 || opts.ResourceVersion == "0" {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
return opts.Predicate.Limit
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
|
func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool {
|
||||||
@ -897,21 +883,13 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
|
// the elements in ListObject are Struct type, making slice will bring excessive memory consumption.
|
||||||
// so we try to delay this action as much as possible
|
// so we try to delay this action as much as possible
|
||||||
var selectedObjects []runtime.Object
|
var selectedObjects []runtime.Object
|
||||||
var lastSelectedObjectKey string
|
for _, obj := range objs {
|
||||||
var hasMoreListItems bool
|
|
||||||
limit := computeListLimit(opts)
|
|
||||||
for i, obj := range objs {
|
|
||||||
elem, ok := obj.(*storeElement)
|
elem, ok := obj.(*storeElement)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||||
}
|
}
|
||||||
if filter(elem.Key, elem.Labels, elem.Fields) {
|
if filter(elem.Key, elem.Labels, elem.Fields) {
|
||||||
selectedObjects = append(selectedObjects, elem.Object)
|
selectedObjects = append(selectedObjects, elem.Object)
|
||||||
lastSelectedObjectKey = elem.Key
|
|
||||||
}
|
|
||||||
if limit > 0 && int64(len(selectedObjects)) >= limit {
|
|
||||||
hasMoreListItems = i < len(objs)-1
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(selectedObjects) == 0 {
|
if len(selectedObjects) == 0 {
|
||||||
@ -927,12 +905,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
}
|
}
|
||||||
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
|
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
|
||||||
if c.versioner != nil {
|
if c.versioner != nil {
|
||||||
continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts)
|
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -201,6 +201,7 @@ func TestGetListCacheBypass(t *testing.T) {
|
|||||||
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
|
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
|
||||||
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
|
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Continue: "a"}}, expectBypass: true},
|
||||||
|
|
||||||
|
{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
|
||||||
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
|
{opts: storage.ListOptions{ResourceVersion: "0", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
|
||||||
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
|
{opts: storage.ListOptions{ResourceVersion: "1", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
|
||||||
|
|
||||||
@ -213,7 +214,6 @@ func TestGetListCacheBypass(t *testing.T) {
|
|||||||
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
|
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)
|
||||||
testCases := append(commonTestCases,
|
testCases := append(commonTestCases,
|
||||||
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
|
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: true},
|
||||||
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: true},
|
|
||||||
)
|
)
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
|
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
|
||||||
@ -233,7 +233,6 @@ func TestGetListCacheBypass(t *testing.T) {
|
|||||||
|
|
||||||
testCases := append(commonTestCases,
|
testCases := append(commonTestCases,
|
||||||
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
|
testCase{opts: storage.ListOptions{ResourceVersion: ""}, expectBypass: false},
|
||||||
testCase{opts: storage.ListOptions{ResourceVersion: "", Predicate: storage.SelectionPredicate{Limit: 500}}, expectBypass: false},
|
|
||||||
)
|
)
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
|
testGetListCacheBypass(t, tc.opts, tc.expectBypass)
|
||||||
@ -2567,63 +2566,6 @@ func TestWatchStreamSeparation(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestComputeListLimit(t *testing.T) {
|
|
||||||
scenarios := []struct {
|
|
||||||
name string
|
|
||||||
opts storage.ListOptions
|
|
||||||
expectedLimit int64
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "limit is zero",
|
|
||||||
opts: storage.ListOptions{
|
|
||||||
Predicate: storage.SelectionPredicate{
|
|
||||||
Limit: 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
expectedLimit: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "limit is positive, RV is unset",
|
|
||||||
opts: storage.ListOptions{
|
|
||||||
Predicate: storage.SelectionPredicate{
|
|
||||||
Limit: 1,
|
|
||||||
},
|
|
||||||
ResourceVersion: "",
|
|
||||||
},
|
|
||||||
expectedLimit: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "limit is positive, RV = 100",
|
|
||||||
opts: storage.ListOptions{
|
|
||||||
Predicate: storage.SelectionPredicate{
|
|
||||||
Limit: 1,
|
|
||||||
},
|
|
||||||
ResourceVersion: "100",
|
|
||||||
},
|
|
||||||
expectedLimit: 1,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "legacy case: limit is positive, RV = 0",
|
|
||||||
opts: storage.ListOptions{
|
|
||||||
Predicate: storage.SelectionPredicate{
|
|
||||||
Limit: 1,
|
|
||||||
},
|
|
||||||
ResourceVersion: "0",
|
|
||||||
},
|
|
||||||
expectedLimit: 0,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, scenario := range scenarios {
|
|
||||||
t.Run(scenario.name, func(t *testing.T) {
|
|
||||||
actualLimit := computeListLimit(scenario.opts)
|
|
||||||
if actualLimit != scenario.expectedLimit {
|
|
||||||
t.Errorf("computeListLimit returned = %v, expected %v", actualLimit, scenario.expectedLimit)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
|
func watchAndWaitForBookmark(t *testing.T, ctx context.Context, etcdStorage storage.Interface) func() (resourceVersion uint64) {
|
||||||
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
|
opts := storage.ListOptions{ResourceVersion: "", Predicate: storage.Everything, Recursive: true}
|
||||||
opts.Predicate.AllowWatchBookmarks = true
|
opts.Predicate.AllowWatchBookmarks = true
|
||||||
|
@ -173,10 +173,10 @@ func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
|
|||||||
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
|
consistentReadFromStorage := resourceVersion == "" && !(consistentListFromCacheEnabled && requestWatchProgressSupported)
|
||||||
// Watch cache doesn't support continuations, so serve them from etcd.
|
// Watch cache doesn't support continuations, so serve them from etcd.
|
||||||
hasContinuation := len(opts.Continue) > 0
|
hasContinuation := len(opts.Continue) > 0
|
||||||
|
// Serve paginated requests about revision "0" from watch cache to avoid overwhelming etcd.
|
||||||
|
hasLimit := opts.Limit > 0 && resourceVersion != "0"
|
||||||
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
|
// Watch cache only supports ResourceVersionMatchNotOlderThan (default).
|
||||||
// see https://kubernetes.io/docs/reference/using-api/api-concepts/#semantics-for-get-and-list
|
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
|
||||||
isLegacyExactMatch := opts.Limit > 0 && match == "" && len(resourceVersion) > 0 && resourceVersion != "0"
|
|
||||||
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan || isLegacyExactMatch
|
|
||||||
|
|
||||||
return consistentReadFromStorage || hasContinuation || unsupportedMatch
|
return consistentReadFromStorage || hasContinuation || hasLimit || unsupportedMatch
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user