mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #113730 from wojtek-t/generalize_cacher_tests_3
Reuse generic GetList test for watchcache and fix inconsistency issues for both etcd3 and watchcache
This commit is contained in:
commit
2bb77a13b1
@ -621,9 +621,11 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o
|
||||
func shouldDelegateList(opts storage.ListOptions) bool {
|
||||
resourceVersion := opts.ResourceVersion
|
||||
pred := opts.Predicate
|
||||
match := opts.ResourceVersionMatch
|
||||
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
|
||||
hasContinuation := pagingEnabled && len(pred.Continue) > 0
|
||||
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
|
||||
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
|
||||
|
||||
// If resourceVersion is not specified, serve it from underlying
|
||||
// storage (for backward compatibility). If a continuation is
|
||||
@ -631,7 +633,7 @@ func shouldDelegateList(opts storage.ListOptions) bool {
|
||||
// Limits are only sent to storage when resourceVersion is non-zero
|
||||
// since the watch cache isn't able to perform continuations, and
|
||||
// limits are ignored when resource version is zero
|
||||
return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
|
||||
return resourceVersion == "" || hasContinuation || hasLimit || unsupportedMatch
|
||||
}
|
||||
|
||||
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) {
|
||||
@ -657,6 +659,11 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
||||
return c.storage.GetList(ctx, key, opts, listObj)
|
||||
}
|
||||
|
||||
match := opts.ResourceVersionMatch
|
||||
if match != metav1.ResourceVersionMatchNotOlderThan && match != "" {
|
||||
return fmt.Errorf("unknown ResourceVersionMatch value: %v", match)
|
||||
}
|
||||
|
||||
// If resourceVersion is specified, serve it from cache.
|
||||
// It's guaranteed that the returned value is at least that
|
||||
// fresh as the given resourceVersion.
|
||||
@ -715,6 +722,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
||||
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
|
||||
}
|
||||
}
|
||||
if listVal.IsNil() {
|
||||
// Ensure that we never return a nil Items pointer in the result for consistency.
|
||||
listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
|
||||
}
|
||||
span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
|
||||
if c.versioner != nil {
|
||||
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
|
||||
|
@ -463,6 +463,20 @@ func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion
|
||||
return nil
|
||||
}
|
||||
|
||||
type sortableStoreElements []interface{}
|
||||
|
||||
func (s sortableStoreElements) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s sortableStoreElements) Less(i, j int) bool {
|
||||
return s[i].(*storeElement).Key < s[j].(*storeElement).Key
|
||||
}
|
||||
|
||||
func (s sortableStoreElements) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
||||
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
|
||||
// with their ResourceVersion and the name of the index, if any, that was used.
|
||||
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) {
|
||||
@ -472,16 +486,21 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
|
||||
return nil, 0, "", err
|
||||
}
|
||||
|
||||
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
|
||||
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
|
||||
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
|
||||
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
|
||||
for _, matchValue := range matchValues {
|
||||
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
|
||||
return result, w.resourceVersion, matchValue.IndexName, nil
|
||||
result, rv, index, err := func() ([]interface{}, uint64, string, error) {
|
||||
// This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only
|
||||
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
|
||||
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
|
||||
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
|
||||
for _, matchValue := range matchValues {
|
||||
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
|
||||
return result, w.resourceVersion, matchValue.IndexName, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return w.store.List(), w.resourceVersion, "", nil
|
||||
return w.store.List(), w.resourceVersion, "", nil
|
||||
}()
|
||||
|
||||
sort.Sort(sortableStoreElements(result))
|
||||
return result, rv, index, err
|
||||
}
|
||||
|
||||
// WaitUntilFreshAndGet returns a pointers to <storeElement> object.
|
||||
|
@ -782,6 +782,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
||||
options = append(options, clientv3.WithRev(withRev))
|
||||
}
|
||||
}
|
||||
if v.IsNil() {
|
||||
// Ensure that we never return a nil Items pointer in the result for consistency.
|
||||
v.Set(reflect.MakeSlice(v.Type(), 0, 0))
|
||||
}
|
||||
|
||||
// instruct the client to begin querying from immediately after the last key we returned
|
||||
// we never return a key that the client wouldn't be allowed to see
|
||||
|
@ -189,7 +189,7 @@ func TestTransformationFailure(t *testing.T) {
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
ctx, store, _ := testSetup(t)
|
||||
storagetesting.RunTestList(ctx, t, store)
|
||||
storagetesting.RunTestList(ctx, t, store, false)
|
||||
}
|
||||
|
||||
func TestListWithoutPaging(t *testing.T) {
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -188,6 +189,13 @@ func RunTestGet(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// For some asynchronous implementations of storage interface (in particular watchcache),
|
||||
// certain requests may impact result of further requests. As an example, if we first
|
||||
// ensure that watchcache is synchronized up to ResourceVersion X (using Get/List requests
|
||||
// with NotOlderThan semantic), the further requests (even specifying earlier resource
|
||||
// version) will also return the result synchronized to at least ResourceVersion X.
|
||||
// By parallelizing test cases we ensure that the order in which test cases are defined
|
||||
// doesn't automatically preclude some scenarios from happening.
|
||||
t.Parallel()
|
||||
|
||||
out := &example.Pod{}
|
||||
@ -468,7 +476,7 @@ func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T
|
||||
}
|
||||
}
|
||||
|
||||
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ignoreWatchCacheTests bool) {
|
||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)()
|
||||
|
||||
initialRV, preset, err := seedMultiLevelData(ctx, store)
|
||||
@ -478,7 +486,8 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
|
||||
list := &example.PodList{}
|
||||
storageOpts := storage.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
// Ensure we're listing from "now".
|
||||
ResourceVersion: "",
|
||||
Predicate: storage.Everything,
|
||||
Recursive: true,
|
||||
}
|
||||
@ -502,7 +511,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
rvMatch metav1.ResourceVersionMatch
|
||||
prefix string
|
||||
pred storage.SelectionPredicate
|
||||
expectedOut []*example.Pod
|
||||
ignoreForWatchCache bool
|
||||
expectedOut []example.Pod
|
||||
expectedAlternatives [][]example.Pod
|
||||
expectContinue bool
|
||||
expectedRemainingItemCount *int64
|
||||
expectError bool
|
||||
@ -539,31 +550,31 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
name: "test List on existing key",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0]},
|
||||
expectedOut: []example.Pod{*preset[0]},
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to 0",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0]},
|
||||
rv: "0",
|
||||
name: "test List on existing key with resource version set to 0",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedAlternatives: [][]example.Pod{{}, {*preset[0]}},
|
||||
rv: "0",
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set before first write, match=Exact",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{},
|
||||
expectedOut: []example.Pod{},
|
||||
rv: initialRV,
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: initialRV,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to 0, match=NotOlderThan",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0]},
|
||||
rv: "0",
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
name: "test List on existing key with resource version set to 0, match=NotOlderThan",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedAlternatives: [][]example.Pod{{}, {*preset[0]}},
|
||||
rv: "0",
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to 0, match=Invalid",
|
||||
@ -574,12 +585,12 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
expectError: true,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set before first write, match=NotOlderThan",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0]},
|
||||
rv: initialRV,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
name: "test List on existing key with resource version set before first write, match=NotOlderThan",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedAlternatives: [][]example.Pod{{}, {*preset[0]}},
|
||||
rv: initialRV,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set before first write, match=Invalid",
|
||||
@ -593,14 +604,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
name: "test List on existing key with resource version set to current resource version",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0]},
|
||||
expectedOut: []example.Pod{*preset[0]},
|
||||
rv: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List on existing key with resource version set to current resource version, match=Exact",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0]},
|
||||
expectedOut: []example.Pod{*preset[0]},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: list.ResourceVersion,
|
||||
@ -609,7 +620,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
name: "test List on existing key with resource version set to current resource version, match=NotOlderThan",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[0]},
|
||||
expectedOut: []example.Pod{*preset[0]},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
},
|
||||
@ -617,7 +628,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
name: "test List on non-existing key",
|
||||
prefix: "/pods/non-existing/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: nil,
|
||||
expectedOut: []example.Pod{},
|
||||
},
|
||||
{
|
||||
name: "test List with pod name matching",
|
||||
@ -626,7 +637,18 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
||||
},
|
||||
expectedOut: nil,
|
||||
expectedOut: []example.Pod{},
|
||||
},
|
||||
{
|
||||
name: "test List with pod name matching with resource version set to current resource version, match=NotOlderThan",
|
||||
prefix: "/pods/first/",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.ParseSelectorOrDie("metadata.name!=bar"),
|
||||
},
|
||||
expectedOut: []example.Pod{},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
},
|
||||
{
|
||||
name: "test List with limit",
|
||||
@ -636,7 +658,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[1]},
|
||||
expectedOut: []example.Pod{*preset[1]},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
},
|
||||
@ -648,7 +670,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[1]},
|
||||
expectedOut: []example.Pod{*preset[1]},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
rv: list.ResourceVersion,
|
||||
@ -662,13 +684,28 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[1]},
|
||||
expectedOut: []example.Pod{*preset[1]},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
expectRV: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List with limit at current resource version and match=NotOlderThan",
|
||||
prefix: "/pods/second/",
|
||||
pred: storage.SelectionPredicate{
|
||||
Label: labels.Everything(),
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []example.Pod{*preset[1]},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectRV: list.ResourceVersion,
|
||||
},
|
||||
{
|
||||
name: "test List with limit at resource version 0",
|
||||
prefix: "/pods/second/",
|
||||
@ -677,7 +714,12 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[1]},
|
||||
// TODO(#108003): As of now, watchcache is deliberately ignoring
|
||||
// limit if RV=0 is specified, returning whole list of objects.
|
||||
// While this should eventually get fixed, for now we're explicitly
|
||||
// ignoring this testcase for watchcache.
|
||||
ignoreForWatchCache: true,
|
||||
expectedOut: []example.Pod{*preset[1]},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
rv: "0",
|
||||
@ -691,7 +733,12 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[1]},
|
||||
// TODO(#108003): As of now, watchcache is deliberately ignoring
|
||||
// limit if RV=0 is specified, returning whole list of objects.
|
||||
// While this should eventually get fixed, for now we're explicitly
|
||||
// ignoring this testcase for watchcache.
|
||||
ignoreForWatchCache: true,
|
||||
expectedOut: []example.Pod{*preset[1]},
|
||||
expectContinue: true,
|
||||
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
|
||||
rv: "0",
|
||||
@ -706,7 +753,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Field: fields.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{},
|
||||
expectedOut: []example.Pod{},
|
||||
expectContinue: false,
|
||||
rv: initialRV,
|
||||
rvMatch: metav1.ResourceVersionMatchExact,
|
||||
@ -721,7 +768,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Limit: 1,
|
||||
Continue: secondContinuation,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[2]},
|
||||
expectedOut: []example.Pod{*preset[2]},
|
||||
},
|
||||
{
|
||||
name: "ignores resource version 0 for List with pregenerated continue token",
|
||||
@ -733,13 +780,21 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Continue: secondContinuation,
|
||||
},
|
||||
rv: "0",
|
||||
expectedOut: []*example.Pod{preset[2]},
|
||||
expectedOut: []example.Pod{*preset[2]},
|
||||
},
|
||||
{
|
||||
name: "test List with multiple levels of directories and expect flattened result",
|
||||
prefix: "/pods/second/",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []*example.Pod{preset[1], preset[2]},
|
||||
expectedOut: []example.Pod{*preset[1], *preset[2]},
|
||||
},
|
||||
{
|
||||
name: "test List with multiple levels of directories and expect flattened result with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods/second/",
|
||||
pred: storage.Everything,
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectedOut: []example.Pod{*preset[1], *preset[2]},
|
||||
},
|
||||
{
|
||||
name: "test List with filter returning only one item, ensure only a single page returned",
|
||||
@ -749,7 +804,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Label: labels.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[3]},
|
||||
expectedOut: []example.Pod{*preset[3]},
|
||||
expectContinue: true,
|
||||
},
|
||||
{
|
||||
name: "test List with filter returning only one item, ensure only a single page returned with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods",
|
||||
pred: storage.SelectionPredicate{
|
||||
Field: fields.OneTermEqualSelector("metadata.name", "barfoo"),
|
||||
Label: labels.Everything(),
|
||||
Limit: 1,
|
||||
},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectedOut: []example.Pod{*preset[3]},
|
||||
expectContinue: true,
|
||||
},
|
||||
{
|
||||
@ -760,7 +828,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Label: labels.Everything(),
|
||||
Limit: 2,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[3]},
|
||||
expectedOut: []example.Pod{*preset[3]},
|
||||
expectContinue: false,
|
||||
},
|
||||
{
|
||||
name: "test List with filter returning only one item, covers the entire list with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods",
|
||||
pred: storage.SelectionPredicate{
|
||||
Field: fields.OneTermEqualSelector("metadata.name", "barfoo"),
|
||||
Label: labels.Everything(),
|
||||
Limit: 2,
|
||||
},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectedOut: []example.Pod{*preset[3]},
|
||||
expectContinue: false,
|
||||
},
|
||||
{
|
||||
@ -771,9 +852,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Label: labels.Everything(),
|
||||
Limit: 2,
|
||||
},
|
||||
rv: "0",
|
||||
expectedOut: []*example.Pod{preset[3]},
|
||||
expectContinue: false,
|
||||
rv: "0",
|
||||
expectedAlternatives: [][]example.Pod{{}, {*preset[3]}},
|
||||
expectContinue: false,
|
||||
},
|
||||
{
|
||||
name: "test List with filter returning two items, more pages possible",
|
||||
@ -784,7 +865,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Limit: 2,
|
||||
},
|
||||
expectContinue: true,
|
||||
expectedOut: []*example.Pod{preset[0], preset[1]},
|
||||
expectedOut: []example.Pod{*preset[0], *preset[1]},
|
||||
},
|
||||
{
|
||||
name: "test List with filter returning two items, more pages possible with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods",
|
||||
pred: storage.SelectionPredicate{
|
||||
Field: fields.OneTermEqualSelector("metadata.name", "bar"),
|
||||
Label: labels.Everything(),
|
||||
Limit: 2,
|
||||
},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectContinue: true,
|
||||
expectedOut: []example.Pod{*preset[0], *preset[1]},
|
||||
},
|
||||
{
|
||||
name: "filter returns two items split across multiple pages",
|
||||
@ -794,7 +888,19 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Label: labels.Everything(),
|
||||
Limit: 2,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[2], preset[4]},
|
||||
expectedOut: []example.Pod{*preset[2], *preset[4]},
|
||||
},
|
||||
{
|
||||
name: "filter returns two items split across multiple pages with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods",
|
||||
pred: storage.SelectionPredicate{
|
||||
Field: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||
Label: labels.Everything(),
|
||||
Limit: 2,
|
||||
},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectedOut: []example.Pod{*preset[2], *preset[4]},
|
||||
},
|
||||
{
|
||||
name: "filter returns one item for last page, ends on last item, not full",
|
||||
@ -805,7 +911,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Limit: 2,
|
||||
Continue: encodeContinueOrDie("third/barfoo", int64(continueRV)),
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[4]},
|
||||
expectedOut: []example.Pod{*preset[4]},
|
||||
},
|
||||
{
|
||||
name: "filter returns one item for last page, starts on last item, full",
|
||||
@ -816,7 +922,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Limit: 1,
|
||||
Continue: encodeContinueOrDie("third/barfoo", int64(continueRV)),
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[4]},
|
||||
expectedOut: []example.Pod{*preset[4]},
|
||||
},
|
||||
{
|
||||
name: "filter returns one item for last page, starts on last item, partial page",
|
||||
@ -827,7 +933,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Limit: 2,
|
||||
Continue: encodeContinueOrDie("third/barfoo", int64(continueRV)),
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[4]},
|
||||
expectedOut: []example.Pod{*preset[4]},
|
||||
},
|
||||
{
|
||||
name: "filter returns two items, page size equal to total list size",
|
||||
@ -837,7 +943,19 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Label: labels.Everything(),
|
||||
Limit: 5,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[2], preset[4]},
|
||||
expectedOut: []example.Pod{*preset[2], *preset[4]},
|
||||
},
|
||||
{
|
||||
name: "filter returns two items, page size equal to total list size with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods",
|
||||
pred: storage.SelectionPredicate{
|
||||
Field: fields.OneTermEqualSelector("metadata.name", "foo"),
|
||||
Label: labels.Everything(),
|
||||
Limit: 5,
|
||||
},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectedOut: []example.Pod{*preset[2], *preset[4]},
|
||||
},
|
||||
{
|
||||
name: "filter returns one item, page size equal to total list size",
|
||||
@ -847,12 +965,52 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Label: labels.Everything(),
|
||||
Limit: 5,
|
||||
},
|
||||
expectedOut: []*example.Pod{preset[3]},
|
||||
expectedOut: []example.Pod{*preset[3]},
|
||||
},
|
||||
{
|
||||
name: "filter returns one item, page size equal to total list size with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods",
|
||||
pred: storage.SelectionPredicate{
|
||||
Field: fields.OneTermEqualSelector("metadata.name", "barfoo"),
|
||||
Label: labels.Everything(),
|
||||
Limit: 5,
|
||||
},
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectedOut: []example.Pod{*preset[3]},
|
||||
},
|
||||
{
|
||||
name: "list all items",
|
||||
prefix: "/pods",
|
||||
pred: storage.Everything,
|
||||
expectedOut: []example.Pod{*preset[0], *preset[1], *preset[2], *preset[3], *preset[4]},
|
||||
},
|
||||
{
|
||||
name: "list all items with current resource version and match=NotOlderThan",
|
||||
prefix: "/pods",
|
||||
pred: storage.Everything,
|
||||
rv: list.ResourceVersion,
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
expectedOut: []example.Pod{*preset[0], *preset[1], *preset[2], *preset[3], *preset[4]},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// For some asynchronous implementations of storage interface (in particular watchcache),
|
||||
// certain requests may impact result of further requests. As an example, if we first
|
||||
// ensure that watchcache is synchronized up to ResourceVersion X (using Get/List requests
|
||||
// with NotOlderThan semantic), the further requests (even specifying earlier resource
|
||||
// version) will also return the result synchronized to at least ResourceVersion X.
|
||||
// By parallelizing test cases we ensure that the order in which test cases are defined
|
||||
// doesn't automatically preclude some scenarios from happening.
|
||||
t.Parallel()
|
||||
|
||||
if ignoreWatchCacheTests && tt.ignoreForWatchCache {
|
||||
t.Skip()
|
||||
}
|
||||
|
||||
if tt.pred.GetAttrs == nil {
|
||||
tt.pred.GetAttrs = getAttrs
|
||||
}
|
||||
@ -864,9 +1022,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
Predicate: tt.pred,
|
||||
Recursive: true,
|
||||
}
|
||||
err = store.GetList(ctx, tt.prefix, storageOpts, out)
|
||||
err := store.GetList(ctx, tt.prefix, storageOpts, out)
|
||||
if tt.expectRVTooLarge {
|
||||
if err == nil || !storage.IsTooLargeResourceVersion(err) {
|
||||
if err == nil || !apierrors.IsTimeout(err) || !storage.IsTooLargeResourceVersion(err) {
|
||||
t.Fatalf("expecting resource version too high error, but get: %s", err)
|
||||
}
|
||||
return
|
||||
@ -896,15 +1054,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) {
|
||||
t.Errorf("resourceVersion in list response invalid: %v", err)
|
||||
}
|
||||
}
|
||||
if len(tt.expectedOut) != len(out.Items) {
|
||||
t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items))
|
||||
}
|
||||
if diff := cmp.Diff(tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount()); diff != "" {
|
||||
t.Errorf("incorrect remainingItemCount: %s", diff)
|
||||
}
|
||||
for j, wantPod := range tt.expectedOut {
|
||||
getPod := &out.Items[j]
|
||||
ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod)
|
||||
|
||||
if tt.expectedAlternatives == nil {
|
||||
sort.Sort(sortablePodList(tt.expectedOut))
|
||||
ExpectNoDiff(t, "incorrect list pods", tt.expectedOut, out.Items)
|
||||
} else {
|
||||
toInterfaceSlice := func(podLists [][]example.Pod) []interface{} {
|
||||
result := make([]interface{}, 0, len(podLists))
|
||||
for i := range podLists {
|
||||
sort.Sort(sortablePodList(podLists[i]))
|
||||
result = append(result, podLists[i])
|
||||
}
|
||||
return result
|
||||
}
|
||||
ExpectContains(t, "incorrect list pods", toInterfaceSlice(tt.expectedAlternatives), out.Items)
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -1088,13 +1251,13 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
|
||||
name: "existing key, resourceVersion=0",
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedAlternatives: [][]example.Pod{nil, {*storedObj}},
|
||||
expectedAlternatives: [][]example.Pod{{}, {*storedObj}},
|
||||
rv: "0",
|
||||
}, {
|
||||
name: "existing key, resourceVersion=0, resourceVersionMatch=notOlderThan",
|
||||
key: key,
|
||||
pred: storage.Everything,
|
||||
expectedAlternatives: [][]example.Pod{nil, {*storedObj}},
|
||||
expectedAlternatives: [][]example.Pod{{}, {*storedObj}},
|
||||
rv: "0",
|
||||
rvMatch: metav1.ResourceVersionMatchNotOlderThan,
|
||||
}, {
|
||||
@ -1142,7 +1305,7 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
|
||||
name: "non-existing key",
|
||||
key: "/non-existing",
|
||||
pred: storage.Everything,
|
||||
expectedOut: nil,
|
||||
expectedOut: []example.Pod{},
|
||||
}, {
|
||||
name: "with matching pod name",
|
||||
key: "/non-existing",
|
||||
@ -1154,12 +1317,19 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage
|
||||
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||
},
|
||||
},
|
||||
expectedOut: nil,
|
||||
expectedOut: []example.Pod{},
|
||||
}}
|
||||
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
// For some asynchronous implementations of storage interface (in particular watchcache),
|
||||
// certain requests may impact result of further requests. As an example, if we first
|
||||
// ensure that watchcache is synchronized up to ResourceVersion X (using Get/List requests
|
||||
// with NotOlderThan semantic), the further requests (even specifying earlier resource
|
||||
// version) will also return the result synchronized to at least ResourceVersion X.
|
||||
// By parallelizing test cases we ensure that the order in which test cases are defined
|
||||
// doesn't automatically preclude some scenarios from happening.
|
||||
t.Parallel()
|
||||
|
||||
out := &example.PodList{}
|
||||
|
@ -291,3 +291,17 @@ func (ft *failingTransformer) TransformFromStorage(ctx context.Context, data []b
|
||||
func (ft *failingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) {
|
||||
return data, nil
|
||||
}
|
||||
|
||||
type sortablePodList []example.Pod
|
||||
|
||||
func (s sortablePodList) Len() int {
|
||||
return len(s)
|
||||
}
|
||||
|
||||
func (s sortablePodList) Less(i, j int) bool {
|
||||
return computePodKey(&s[i]) < computePodKey(&s[j])
|
||||
}
|
||||
|
||||
func (s sortablePodList) Swap(i, j int) {
|
||||
s[i], s[j] = s[j], s[i]
|
||||
}
|
||||
|
@ -176,6 +176,14 @@ func TestGetListNonRecursive(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestList(t *testing.T) {
|
||||
ctx, cacher, terminate := testSetup(t)
|
||||
t.Cleanup(terminate)
|
||||
storagetesting.RunTestList(ctx, t, cacher, true)
|
||||
}
|
||||
|
||||
// TODO(wojtek-t): We should extend the generic RunTestList test to cover the
|
||||
// scenarios that are not yet covered by it and get rid of this test.
|
||||
func TestListDeprecated(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, _, err := newTestCacher(etcdStorage)
|
||||
@ -268,42 +276,6 @@ func TestList(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestTooLargeResourceVersionList ensures that a list request for a resource version higher than available
|
||||
// in the watch cache completes (does not wait indefinitely) and results in a ResourceVersionTooLarge error.
|
||||
func TestTooLargeResourceVersionList(t *testing.T) {
|
||||
server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix())
|
||||
defer server.Terminate(t)
|
||||
cacher, v, err := newTestCacher(etcdStorage)
|
||||
if err != nil {
|
||||
t.Fatalf("Couldn't create cacher: %v", err)
|
||||
}
|
||||
defer cacher.Stop()
|
||||
|
||||
podFoo := makeTestPod("foo")
|
||||
fooCreated := updatePod(t, etcdStorage, podFoo, nil)
|
||||
|
||||
// Set up List at fooCreated.ResourceVersion + 10
|
||||
rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
listRV := strconv.Itoa(int(rv + 10))
|
||||
|
||||
result := &example.PodList{}
|
||||
options := storage.ListOptions{
|
||||
ResourceVersion: listRV,
|
||||
Predicate: storage.Everything,
|
||||
Recursive: true,
|
||||
}
|
||||
err = cacher.GetList(context.TODO(), "pods/ns", options, result)
|
||||
if !errors.IsTimeout(err) {
|
||||
t.Errorf("Unexpected error: %v", err)
|
||||
}
|
||||
if !storage.IsTooLargeResourceVersion(err) {
|
||||
t.Errorf("expected 'Too large resource version' cause in error but got: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) {
|
||||
_, _, line, _ := goruntime.Caller(1)
|
||||
select {
|
||||
|
@ -147,8 +147,11 @@ func key(requestInfo *apirequest.RequestInfo) string {
|
||||
// staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
|
||||
func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool {
|
||||
resourceVersion := opts.ResourceVersion
|
||||
match := opts.ResourceVersionMatch
|
||||
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
|
||||
hasContinuation := pagingEnabled && len(opts.Continue) > 0
|
||||
hasLimit := pagingEnabled && opts.Limit > 0 && resourceVersion != "0"
|
||||
return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact
|
||||
unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan
|
||||
|
||||
return resourceVersion == "" || hasContinuation || hasLimit || unsupportedMatch
|
||||
}
|
||||
|
@ -19,6 +19,7 @@ package apiserver
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -164,3 +165,68 @@ func TestWatchCacheUpdatedByEtcd(t *testing.T) {
|
||||
t.Errorf("Events watchcache unexpected synced: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkListFromWatchCache(b *testing.B) {
|
||||
c, _, tearDownFn := framework.StartTestServer(b, framework.TestServerSetup{
|
||||
ModifyServerConfig: func(config *controlplane.Config) {
|
||||
// Switch off endpoints reconciler to avoid unnecessary operations.
|
||||
config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
|
||||
},
|
||||
})
|
||||
defer tearDownFn()
|
||||
|
||||
namespaces, secretsPerNamespace := 100, 1000
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
errCh := make(chan error, namespaces)
|
||||
for i := 0; i < namespaces; i++ {
|
||||
wg.Add(1)
|
||||
index := i
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
ctx := context.Background()
|
||||
ns := &v1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("namespace-%d", index)},
|
||||
}
|
||||
ns, err := c.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
for j := 0; j < secretsPerNamespace; j++ {
|
||||
secret := &v1.Secret{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("secret-%d", j),
|
||||
},
|
||||
}
|
||||
_, err := c.CoreV1().Secrets(ns.Name).Create(ctx, secret, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
for err := range errCh {
|
||||
b.Error(err)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
|
||||
ctx := context.Background()
|
||||
opts := metav1.ListOptions{
|
||||
ResourceVersion: "0",
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
secrets, err := c.CoreV1().Secrets("").List(ctx, opts)
|
||||
if err != nil {
|
||||
b.Errorf("failed to list secrets: %v", err)
|
||||
}
|
||||
b.Logf("Number of secrets: %d", len(secrets.Items))
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user