diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index dfa21157906..bc6500909c3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -621,9 +621,11 @@ func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, o func shouldDelegateList(opts storage.ListOptions) bool { resourceVersion := opts.ResourceVersion pred := opts.Predicate + match := opts.ResourceVersionMatch pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) hasContinuation := pagingEnabled && len(pred.Continue) > 0 hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" + unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan // If resourceVersion is not specified, serve it from underlying // storage (for backward compatibility). If a continuation is @@ -631,7 +633,7 @@ func shouldDelegateList(opts storage.ListOptions) bool { // Limits are only sent to storage when resourceVersion is non-zero // since the watch cache isn't able to perform continuations, and // limits are ignored when resource version is zero - return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact + return resourceVersion == "" || hasContinuation || hasLimit || unsupportedMatch } func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) { @@ -657,6 +659,11 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return c.storage.GetList(ctx, key, opts, listObj) } + match := opts.ResourceVersionMatch + if match != metav1.ResourceVersionMatchNotOlderThan && match != "" { + return fmt.Errorf("unknown ResourceVersionMatch value: %v", match) + } + // If resourceVersion is specified, serve it from cache. // It's guaranteed that the returned value is at least that // fresh as the given resourceVersion. @@ -715,6 +722,10 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) } } + if listVal.IsNil() { + // Ensure that we never return a nil Items pointer in the result for consistency. + listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0)) + } span.AddEvent("Filtered items", attribute.Int("count", listVal.Len())) if c.versioner != nil { if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index bbbeee361f8..3e67f513285 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -463,6 +463,20 @@ func (w *watchCache) waitUntilFreshAndBlock(ctx context.Context, resourceVersion return nil } +type sortableStoreElements []interface{} + +func (s sortableStoreElements) Len() int { + return len(s) +} + +func (s sortableStoreElements) Less(i, j int) bool { + return s[i].(*storeElement).Key < s[j].(*storeElement).Key +} + +func (s sortableStoreElements) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, matchValues []storage.MatchValue) ([]interface{}, uint64, string, error) { @@ -472,16 +486,21 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion return nil, 0, "", err } - // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only - // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we - // want - they will be filtered out later. The fact that we return less things is only further performance improvement. - // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. - for _, matchValue := range matchValues { - if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { - return result, w.resourceVersion, matchValue.IndexName, nil + result, rv, index, err := func() ([]interface{}, uint64, string, error) { + // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only + // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we + // want - they will be filtered out later. The fact that we return less things is only further performance improvement. + // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. + for _, matchValue := range matchValues { + if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { + return result, w.resourceVersion, matchValue.IndexName, nil + } } - } - return w.store.List(), w.resourceVersion, "", nil + return w.store.List(), w.resourceVersion, "", nil + }() + + sort.Sort(sortableStoreElements(result)) + return result, rv, index, err } // WaitUntilFreshAndGet returns a pointers to object. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 51e5b9012b9..463a84d7b91 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -782,6 +782,10 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption options = append(options, clientv3.WithRev(withRev)) } } + if v.IsNil() { + // Ensure that we never return a nil Items pointer in the result for consistency. + v.Set(reflect.MakeSlice(v.Type(), 0, 0)) + } // instruct the client to begin querying from immediately after the last key we returned // we never return a key that the client wouldn't be allowed to see diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index ef2c0b0675a..d9df9c3d3a5 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -189,7 +189,7 @@ func TestTransformationFailure(t *testing.T) { func TestList(t *testing.T) { ctx, store, _ := testSetup(t) - storagetesting.RunTestList(ctx, t, store) + storagetesting.RunTestList(ctx, t, store, false) } func TestListWithoutPaging(t *testing.T) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index f68a7df4fa5..75d17ecacea 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -22,6 +22,7 @@ import ( "fmt" "math" "reflect" + "sort" "strconv" "strings" "sync" @@ -188,6 +189,13 @@ func RunTestGet(ctx context.Context, t *testing.T, store storage.Interface) { for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { + // For some asynchronous implementations of storage interface (in particular watchcache), + // certain requests may impact result of further requests. As an example, if we first + // ensure that watchcache is synchronized up to ResourceVersion X (using Get/List requests + // with NotOlderThan semantic), the further requests (even specifying earlier resource + // version) will also return the result synchronized to at least ResourceVersion X. + // By parallelizing test cases we ensure that the order in which test cases are defined + // doesn't automatically preclude some scenarios from happening. t.Parallel() out := &example.Pod{} @@ -468,7 +476,7 @@ func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T } } -func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { +func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ignoreWatchCacheTests bool) { defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)() initialRV, preset, err := seedMultiLevelData(ctx, store) @@ -478,7 +486,8 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { list := &example.PodList{} storageOpts := storage.ListOptions{ - ResourceVersion: "0", + // Ensure we're listing from "now". + ResourceVersion: "", Predicate: storage.Everything, Recursive: true, } @@ -502,7 +511,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { rvMatch metav1.ResourceVersionMatch prefix string pred storage.SelectionPredicate - expectedOut []*example.Pod + ignoreForWatchCache bool + expectedOut []example.Pod + expectedAlternatives [][]example.Pod expectContinue bool expectedRemainingItemCount *int64 expectError bool @@ -539,31 +550,31 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { name: "test List on existing key", prefix: "/pods/first/", pred: storage.Everything, - expectedOut: []*example.Pod{preset[0]}, + expectedOut: []example.Pod{*preset[0]}, }, { - name: "test List on existing key with resource version set to 0", - prefix: "/pods/first/", - pred: storage.Everything, - expectedOut: []*example.Pod{preset[0]}, - rv: "0", + name: "test List on existing key with resource version set to 0", + prefix: "/pods/first/", + pred: storage.Everything, + expectedAlternatives: [][]example.Pod{{}, {*preset[0]}}, + rv: "0", }, { name: "test List on existing key with resource version set before first write, match=Exact", prefix: "/pods/first/", pred: storage.Everything, - expectedOut: []*example.Pod{}, + expectedOut: []example.Pod{}, rv: initialRV, rvMatch: metav1.ResourceVersionMatchExact, expectRV: initialRV, }, { - name: "test List on existing key with resource version set to 0, match=NotOlderThan", - prefix: "/pods/first/", - pred: storage.Everything, - expectedOut: []*example.Pod{preset[0]}, - rv: "0", - rvMatch: metav1.ResourceVersionMatchNotOlderThan, + name: "test List on existing key with resource version set to 0, match=NotOlderThan", + prefix: "/pods/first/", + pred: storage.Everything, + expectedAlternatives: [][]example.Pod{{}, {*preset[0]}}, + rv: "0", + rvMatch: metav1.ResourceVersionMatchNotOlderThan, }, { name: "test List on existing key with resource version set to 0, match=Invalid", @@ -574,12 +585,12 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { expectError: true, }, { - name: "test List on existing key with resource version set before first write, match=NotOlderThan", - prefix: "/pods/first/", - pred: storage.Everything, - expectedOut: []*example.Pod{preset[0]}, - rv: initialRV, - rvMatch: metav1.ResourceVersionMatchNotOlderThan, + name: "test List on existing key with resource version set before first write, match=NotOlderThan", + prefix: "/pods/first/", + pred: storage.Everything, + expectedAlternatives: [][]example.Pod{{}, {*preset[0]}}, + rv: initialRV, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, }, { name: "test List on existing key with resource version set before first write, match=Invalid", @@ -593,14 +604,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { name: "test List on existing key with resource version set to current resource version", prefix: "/pods/first/", pred: storage.Everything, - expectedOut: []*example.Pod{preset[0]}, + expectedOut: []example.Pod{*preset[0]}, rv: list.ResourceVersion, }, { name: "test List on existing key with resource version set to current resource version, match=Exact", prefix: "/pods/first/", pred: storage.Everything, - expectedOut: []*example.Pod{preset[0]}, + expectedOut: []example.Pod{*preset[0]}, rv: list.ResourceVersion, rvMatch: metav1.ResourceVersionMatchExact, expectRV: list.ResourceVersion, @@ -609,7 +620,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { name: "test List on existing key with resource version set to current resource version, match=NotOlderThan", prefix: "/pods/first/", pred: storage.Everything, - expectedOut: []*example.Pod{preset[0]}, + expectedOut: []example.Pod{*preset[0]}, rv: list.ResourceVersion, rvMatch: metav1.ResourceVersionMatchNotOlderThan, }, @@ -617,7 +628,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { name: "test List on non-existing key", prefix: "/pods/non-existing/", pred: storage.Everything, - expectedOut: nil, + expectedOut: []example.Pod{}, }, { name: "test List with pod name matching", @@ -626,7 +637,18 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Label: labels.Everything(), Field: fields.ParseSelectorOrDie("metadata.name!=bar"), }, - expectedOut: nil, + expectedOut: []example.Pod{}, + }, + { + name: "test List with pod name matching with resource version set to current resource version, match=NotOlderThan", + prefix: "/pods/first/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.ParseSelectorOrDie("metadata.name!=bar"), + }, + expectedOut: []example.Pod{}, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, }, { name: "test List with limit", @@ -636,7 +658,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Field: fields.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{preset[1]}, + expectedOut: []example.Pod{*preset[1]}, expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), }, @@ -648,7 +670,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Field: fields.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{preset[1]}, + expectedOut: []example.Pod{*preset[1]}, expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: list.ResourceVersion, @@ -662,13 +684,28 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Field: fields.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{preset[1]}, + expectedOut: []example.Pod{*preset[1]}, expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: list.ResourceVersion, rvMatch: metav1.ResourceVersionMatchExact, expectRV: list.ResourceVersion, }, + { + name: "test List with limit at current resource version and match=NotOlderThan", + prefix: "/pods/second/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + }, + expectedOut: []example.Pod{*preset[1]}, + expectContinue: true, + expectedRemainingItemCount: utilpointer.Int64Ptr(1), + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectRV: list.ResourceVersion, + }, { name: "test List with limit at resource version 0", prefix: "/pods/second/", @@ -677,7 +714,12 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Field: fields.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{preset[1]}, + // TODO(#108003): As of now, watchcache is deliberately ignoring + // limit if RV=0 is specified, returning whole list of objects. + // While this should eventually get fixed, for now we're explicitly + // ignoring this testcase for watchcache. + ignoreForWatchCache: true, + expectedOut: []example.Pod{*preset[1]}, expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: "0", @@ -691,7 +733,12 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Field: fields.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{preset[1]}, + // TODO(#108003): As of now, watchcache is deliberately ignoring + // limit if RV=0 is specified, returning whole list of objects. + // While this should eventually get fixed, for now we're explicitly + // ignoring this testcase for watchcache. + ignoreForWatchCache: true, + expectedOut: []example.Pod{*preset[1]}, expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), rv: "0", @@ -706,7 +753,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Field: fields.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{}, + expectedOut: []example.Pod{}, expectContinue: false, rv: initialRV, rvMatch: metav1.ResourceVersionMatchExact, @@ -721,7 +768,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Limit: 1, Continue: secondContinuation, }, - expectedOut: []*example.Pod{preset[2]}, + expectedOut: []example.Pod{*preset[2]}, }, { name: "ignores resource version 0 for List with pregenerated continue token", @@ -733,13 +780,21 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Continue: secondContinuation, }, rv: "0", - expectedOut: []*example.Pod{preset[2]}, + expectedOut: []example.Pod{*preset[2]}, }, { name: "test List with multiple levels of directories and expect flattened result", prefix: "/pods/second/", pred: storage.Everything, - expectedOut: []*example.Pod{preset[1], preset[2]}, + expectedOut: []example.Pod{*preset[1], *preset[2]}, + }, + { + name: "test List with multiple levels of directories and expect flattened result with current resource version and match=NotOlderThan", + prefix: "/pods/second/", + pred: storage.Everything, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectedOut: []example.Pod{*preset[1], *preset[2]}, }, { name: "test List with filter returning only one item, ensure only a single page returned", @@ -749,7 +804,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Label: labels.Everything(), Limit: 1, }, - expectedOut: []*example.Pod{preset[3]}, + expectedOut: []example.Pod{*preset[3]}, + expectContinue: true, + }, + { + name: "test List with filter returning only one item, ensure only a single page returned with current resource version and match=NotOlderThan", + prefix: "/pods", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "barfoo"), + Label: labels.Everything(), + Limit: 1, + }, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectedOut: []example.Pod{*preset[3]}, expectContinue: true, }, { @@ -760,7 +828,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Label: labels.Everything(), Limit: 2, }, - expectedOut: []*example.Pod{preset[3]}, + expectedOut: []example.Pod{*preset[3]}, + expectContinue: false, + }, + { + name: "test List with filter returning only one item, covers the entire list with current resource version and match=NotOlderThan", + prefix: "/pods", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "barfoo"), + Label: labels.Everything(), + Limit: 2, + }, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectedOut: []example.Pod{*preset[3]}, expectContinue: false, }, { @@ -771,9 +852,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Label: labels.Everything(), Limit: 2, }, - rv: "0", - expectedOut: []*example.Pod{preset[3]}, - expectContinue: false, + rv: "0", + expectedAlternatives: [][]example.Pod{{}, {*preset[3]}}, + expectContinue: false, }, { name: "test List with filter returning two items, more pages possible", @@ -784,7 +865,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Limit: 2, }, expectContinue: true, - expectedOut: []*example.Pod{preset[0], preset[1]}, + expectedOut: []example.Pod{*preset[0], *preset[1]}, + }, + { + name: "test List with filter returning two items, more pages possible with current resource version and match=NotOlderThan", + prefix: "/pods", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "bar"), + Label: labels.Everything(), + Limit: 2, + }, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectContinue: true, + expectedOut: []example.Pod{*preset[0], *preset[1]}, }, { name: "filter returns two items split across multiple pages", @@ -794,7 +888,19 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Label: labels.Everything(), Limit: 2, }, - expectedOut: []*example.Pod{preset[2], preset[4]}, + expectedOut: []example.Pod{*preset[2], *preset[4]}, + }, + { + name: "filter returns two items split across multiple pages with current resource version and match=NotOlderThan", + prefix: "/pods", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "foo"), + Label: labels.Everything(), + Limit: 2, + }, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectedOut: []example.Pod{*preset[2], *preset[4]}, }, { name: "filter returns one item for last page, ends on last item, not full", @@ -805,7 +911,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Limit: 2, Continue: encodeContinueOrDie("third/barfoo", int64(continueRV)), }, - expectedOut: []*example.Pod{preset[4]}, + expectedOut: []example.Pod{*preset[4]}, }, { name: "filter returns one item for last page, starts on last item, full", @@ -816,7 +922,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Limit: 1, Continue: encodeContinueOrDie("third/barfoo", int64(continueRV)), }, - expectedOut: []*example.Pod{preset[4]}, + expectedOut: []example.Pod{*preset[4]}, }, { name: "filter returns one item for last page, starts on last item, partial page", @@ -827,7 +933,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Limit: 2, Continue: encodeContinueOrDie("third/barfoo", int64(continueRV)), }, - expectedOut: []*example.Pod{preset[4]}, + expectedOut: []example.Pod{*preset[4]}, }, { name: "filter returns two items, page size equal to total list size", @@ -837,7 +943,19 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Label: labels.Everything(), Limit: 5, }, - expectedOut: []*example.Pod{preset[2], preset[4]}, + expectedOut: []example.Pod{*preset[2], *preset[4]}, + }, + { + name: "filter returns two items, page size equal to total list size with current resource version and match=NotOlderThan", + prefix: "/pods", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "foo"), + Label: labels.Everything(), + Limit: 5, + }, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectedOut: []example.Pod{*preset[2], *preset[4]}, }, { name: "filter returns one item, page size equal to total list size", @@ -847,12 +965,52 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Label: labels.Everything(), Limit: 5, }, - expectedOut: []*example.Pod{preset[3]}, + expectedOut: []example.Pod{*preset[3]}, + }, + { + name: "filter returns one item, page size equal to total list size with current resource version and match=NotOlderThan", + prefix: "/pods", + pred: storage.SelectionPredicate{ + Field: fields.OneTermEqualSelector("metadata.name", "barfoo"), + Label: labels.Everything(), + Limit: 5, + }, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectedOut: []example.Pod{*preset[3]}, + }, + { + name: "list all items", + prefix: "/pods", + pred: storage.Everything, + expectedOut: []example.Pod{*preset[0], *preset[1], *preset[2], *preset[3], *preset[4]}, + }, + { + name: "list all items with current resource version and match=NotOlderThan", + prefix: "/pods", + pred: storage.Everything, + rv: list.ResourceVersion, + rvMatch: metav1.ResourceVersionMatchNotOlderThan, + expectedOut: []example.Pod{*preset[0], *preset[1], *preset[2], *preset[3], *preset[4]}, }, } for _, tt := range tests { + tt := tt t.Run(tt.name, func(t *testing.T) { + // For some asynchronous implementations of storage interface (in particular watchcache), + // certain requests may impact result of further requests. As an example, if we first + // ensure that watchcache is synchronized up to ResourceVersion X (using Get/List requests + // with NotOlderThan semantic), the further requests (even specifying earlier resource + // version) will also return the result synchronized to at least ResourceVersion X. + // By parallelizing test cases we ensure that the order in which test cases are defined + // doesn't automatically preclude some scenarios from happening. + t.Parallel() + + if ignoreWatchCacheTests && tt.ignoreForWatchCache { + t.Skip() + } + if tt.pred.GetAttrs == nil { tt.pred.GetAttrs = getAttrs } @@ -864,9 +1022,9 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { Predicate: tt.pred, Recursive: true, } - err = store.GetList(ctx, tt.prefix, storageOpts, out) + err := store.GetList(ctx, tt.prefix, storageOpts, out) if tt.expectRVTooLarge { - if err == nil || !storage.IsTooLargeResourceVersion(err) { + if err == nil || !apierrors.IsTimeout(err) || !storage.IsTooLargeResourceVersion(err) { t.Fatalf("expecting resource version too high error, but get: %s", err) } return @@ -896,15 +1054,20 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface) { t.Errorf("resourceVersion in list response invalid: %v", err) } } - if len(tt.expectedOut) != len(out.Items) { - t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items)) - } - if diff := cmp.Diff(tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount()); diff != "" { - t.Errorf("incorrect remainingItemCount: %s", diff) - } - for j, wantPod := range tt.expectedOut { - getPod := &out.Items[j] - ExpectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod) + + if tt.expectedAlternatives == nil { + sort.Sort(sortablePodList(tt.expectedOut)) + ExpectNoDiff(t, "incorrect list pods", tt.expectedOut, out.Items) + } else { + toInterfaceSlice := func(podLists [][]example.Pod) []interface{} { + result := make([]interface{}, 0, len(podLists)) + for i := range podLists { + sort.Sort(sortablePodList(podLists[i])) + result = append(result, podLists[i]) + } + return result + } + ExpectContains(t, "incorrect list pods", toInterfaceSlice(tt.expectedAlternatives), out.Items) } }) } @@ -1088,13 +1251,13 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage name: "existing key, resourceVersion=0", key: key, pred: storage.Everything, - expectedAlternatives: [][]example.Pod{nil, {*storedObj}}, + expectedAlternatives: [][]example.Pod{{}, {*storedObj}}, rv: "0", }, { name: "existing key, resourceVersion=0, resourceVersionMatch=notOlderThan", key: key, pred: storage.Everything, - expectedAlternatives: [][]example.Pod{nil, {*storedObj}}, + expectedAlternatives: [][]example.Pod{{}, {*storedObj}}, rv: "0", rvMatch: metav1.ResourceVersionMatchNotOlderThan, }, { @@ -1142,7 +1305,7 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage name: "non-existing key", key: "/non-existing", pred: storage.Everything, - expectedOut: nil, + expectedOut: []example.Pod{}, }, { name: "with matching pod name", key: "/non-existing", @@ -1154,12 +1317,19 @@ func RunTestGetListNonRecursive(ctx context.Context, t *testing.T, store storage return nil, fields.Set{"metadata.name": pod.Name}, nil }, }, - expectedOut: nil, + expectedOut: []example.Pod{}, }} for _, tt := range tests { tt := tt t.Run(tt.name, func(t *testing.T) { + // For some asynchronous implementations of storage interface (in particular watchcache), + // certain requests may impact result of further requests. As an example, if we first + // ensure that watchcache is synchronized up to ResourceVersion X (using Get/List requests + // with NotOlderThan semantic), the further requests (even specifying earlier resource + // version) will also return the result synchronized to at least ResourceVersion X. + // By parallelizing test cases we ensure that the order in which test cases are defined + // doesn't automatically preclude some scenarios from happening. t.Parallel() out := &example.PodList{} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index 3532ccea522..7d94355a0fc 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -291,3 +291,17 @@ func (ft *failingTransformer) TransformFromStorage(ctx context.Context, data []b func (ft *failingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { return data, nil } + +type sortablePodList []example.Pod + +func (s sortablePodList) Len() int { + return len(s) +} + +func (s sortablePodList) Less(i, j int) bool { + return computePodKey(&s[i]) < computePodKey(&s[j]) +} + +func (s sortablePodList) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index a1437403da1..b731d2c5326 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -176,6 +176,14 @@ func TestGetListNonRecursive(t *testing.T) { } func TestList(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestList(ctx, t, cacher, true) +} + +// TODO(wojtek-t): We should extend the generic RunTestList test to cover the +// scenarios that are not yet covered by it and get rid of this test. +func TestListDeprecated(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) cacher, _, err := newTestCacher(etcdStorage) @@ -268,42 +276,6 @@ func TestList(t *testing.T) { } } -// TestTooLargeResourceVersionList ensures that a list request for a resource version higher than available -// in the watch cache completes (does not wait indefinitely) and results in a ResourceVersionTooLarge error. -func TestTooLargeResourceVersionList(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - defer server.Terminate(t) - cacher, v, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - podFoo := makeTestPod("foo") - fooCreated := updatePod(t, etcdStorage, podFoo, nil) - - // Set up List at fooCreated.ResourceVersion + 10 - rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - listRV := strconv.Itoa(int(rv + 10)) - - result := &example.PodList{} - options := storage.ListOptions{ - ResourceVersion: listRV, - Predicate: storage.Everything, - Recursive: true, - } - err = cacher.GetList(context.TODO(), "pods/ns", options, result) - if !errors.IsTimeout(err) { - t.Errorf("Unexpected error: %v", err) - } - if !storage.IsTooLargeResourceVersion(err) { - t.Errorf("expected 'Too large resource version' cause in error but got: %v", err) - } -} - func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType, eventObject runtime.Object) { _, _, line, _ := goruntime.Caller(1) select { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go index 75d70a0ad46..130746a411e 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/request/list_work_estimator.go @@ -147,8 +147,11 @@ func key(requestInfo *apirequest.RequestInfo) string { // staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go func shouldListFromStorage(query url.Values, opts *metav1.ListOptions) bool { resourceVersion := opts.ResourceVersion + match := opts.ResourceVersionMatch pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) hasContinuation := pagingEnabled && len(opts.Continue) > 0 hasLimit := pagingEnabled && opts.Limit > 0 && resourceVersion != "0" - return resourceVersion == "" || hasContinuation || hasLimit || opts.ResourceVersionMatch == metav1.ResourceVersionMatchExact + unsupportedMatch := match != "" && match != metav1.ResourceVersionMatchNotOlderThan + + return resourceVersion == "" || hasContinuation || hasLimit || unsupportedMatch } diff --git a/test/integration/apiserver/watchcache_test.go b/test/integration/apiserver/watchcache_test.go index f98343e31d2..b5abfb368eb 100644 --- a/test/integration/apiserver/watchcache_test.go +++ b/test/integration/apiserver/watchcache_test.go @@ -19,6 +19,7 @@ package apiserver import ( "context" "fmt" + "sync" "testing" "time" @@ -164,3 +165,68 @@ func TestWatchCacheUpdatedByEtcd(t *testing.T) { t.Errorf("Events watchcache unexpected synced: %v", err) } } + +func BenchmarkListFromWatchCache(b *testing.B) { + c, _, tearDownFn := framework.StartTestServer(b, framework.TestServerSetup{ + ModifyServerConfig: func(config *controlplane.Config) { + // Switch off endpoints reconciler to avoid unnecessary operations. + config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType + }, + }) + defer tearDownFn() + + namespaces, secretsPerNamespace := 100, 1000 + wg := sync.WaitGroup{} + + errCh := make(chan error, namespaces) + for i := 0; i < namespaces; i++ { + wg.Add(1) + index := i + go func() { + defer wg.Done() + + ctx := context.Background() + ns := &v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("namespace-%d", index)}, + } + ns, err := c.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{}) + if err != nil { + errCh <- err + return + } + + for j := 0; j < secretsPerNamespace; j++ { + secret := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("secret-%d", j), + }, + } + _, err := c.CoreV1().Secrets(ns.Name).Create(ctx, secret, metav1.CreateOptions{}) + if err != nil { + errCh <- err + return + } + } + }() + } + + wg.Wait() + close(errCh) + for err := range errCh { + b.Error(err) + } + + b.ResetTimer() + + ctx := context.Background() + opts := metav1.ListOptions{ + ResourceVersion: "0", + } + for i := 0; i < b.N; i++ { + secrets, err := c.CoreV1().Secrets("").List(ctx, opts) + if err != nil { + b.Errorf("failed to list secrets: %v", err) + } + b.Logf("Number of secrets: %d", len(secrets.Items)) + } +}