diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index e6f01384c9c..8377c7c7387 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -825,20 +825,25 @@ func shouldDelegateListOnNotReadyCache(opts storage.ListOptions) bool { return noLabelSelector && noFieldSelector && hasLimit } -func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) ([]interface{}, uint64, string, error) { +func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) { if !recursive { obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key) if err != nil { - return nil, 0, "", err + return listResp{}, "", err } if exists { - return []interface{}{obj}, readResourceVersion, "", nil + return listResp{Items: []interface{}{obj}, ResourceVersion: readResourceVersion}, "", nil } - return nil, readResourceVersion, "", nil + return listResp{ResourceVersion: readResourceVersion}, "", nil } return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx)) } +type listResp struct { + Items []interface{} + ResourceVersion uint64 +} + // GetList implements storage.Interface func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { recursive := opts.Recursive @@ -914,7 +919,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - objs, readResourceVersion, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) + resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) success := "true" fallback := "false" if err != nil { @@ -933,7 +938,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio if consistentRead { metrics.ConsistentReadTotal.WithLabelValues(c.resourcePrefix, success, fallback).Add(1) } - span.AddEvent("Listed items from cache", attribute.Int("count", len(objs))) + span.AddEvent("Listed items from cache", attribute.Int("count", len(resp.Items))) // store pointer of eligible objects, // Why not directly put object in the items of listObj? // the elements in ListObject are Struct type, making slice will bring excessive memory consumption. @@ -942,7 +947,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio var lastSelectedObjectKey string var hasMoreListItems bool limit := computeListLimit(opts) - for i, obj := range objs { + for i, obj := range resp.Items { elem, ok := obj.(*storeElement) if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) @@ -952,7 +957,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio lastSelectedObjectKey = elem.Key } if limit > 0 && int64(len(selectedObjects)) >= limit { - hasMoreListItems = i < len(objs)-1 + hasMoreListItems = i < len(resp.Items)-1 break } } @@ -969,16 +974,16 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio } span.AddEvent("Filtered items", attribute.Int("count", listVal.Len())) if c.versioner != nil { - continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(readResourceVersion), int64(len(objs)), hasMoreListItems, opts) + continueValue, remainingItemCount, err := storage.PrepareContinueToken(lastSelectedObjectKey, key, int64(resp.ResourceVersion), int64(len(resp.Items)), hasMoreListItems, opts) if err != nil { return err } - if err = c.versioner.UpdateList(listObj, readResourceVersion, continueValue, remainingItemCount); err != nil { + if err = c.versioner.UpdateList(listObj, resp.ResourceVersion, continueValue, remainingItemCount); err != nil { return err } } - metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(objs), listVal.Len()) + metrics.RecordListCacheMetrics(c.resourcePrefix, indexUsed, len(resp.Items), listVal.Len()) return nil } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 14c22015f4e..961935d3b1e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -3079,7 +3079,7 @@ func TestListIndexer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields) - _, _, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive) + _, usedIndex, err := cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index ae1506d033b..dc02c86d0df 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -451,7 +451,7 @@ func (s sortableStoreElements) Swap(i, j int) { // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. -func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (result []interface{}, rv uint64, index string, err error) { +func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (resp listResp, index string, err error) { requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { w.waitingUntilFresh.Add() @@ -463,32 +463,34 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion defer w.RUnlock() if err != nil { - return result, rv, index, err + return listResp{}, "", err } - var prefixFilteredAndOrdered bool - result, rv, index, prefixFilteredAndOrdered, err = func() ([]interface{}, uint64, string, bool, error) { - // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only - // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we - // want - they will be filtered out later. The fact that we return less things is only further performance improvement. - // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. - for _, matchValue := range matchValues { - if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { - return result, w.resourceVersion, matchValue.IndexName, false, nil - } - } - if store, ok := w.store.(orderedLister); ok { - result, _ := store.ListPrefix(key, "", 0) - return result, w.resourceVersion, "", true, nil - } - return w.store.List(), w.resourceVersion, "", false, nil - }() - if !prefixFilteredAndOrdered { - result, err = filterPrefixAndOrder(key, result) - if err != nil { - return nil, 0, "", err + // This isn't the place where we do "final filtering" - only some "prefiltering" is happening here. So the only + // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we + // want - they will be filtered out later. The fact that we return less things is only further performance improvement. + // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. + for _, matchValue := range matchValues { + if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { + result, err = filterPrefixAndOrder(key, result) + return listResp{ + Items: result, + ResourceVersion: w.resourceVersion, + }, matchValue.IndexName, err } } - return result, w.resourceVersion, index, nil + if store, ok := w.store.(orderedLister); ok { + result, _ := store.ListPrefix(key, "", 0) + return listResp{ + Items: result, + ResourceVersion: w.resourceVersion, + }, "", nil + } + result := w.store.List() + result, err = filterPrefixAndOrder(key, result) + return listResp{ + Items: result, + ResourceVersion: w.resourceVersion, + }, "", err } func filterPrefixAndOrder(prefix string, items []interface{}) ([]interface{}, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 221767e63a8..21e21d04fc3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -462,15 +462,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { }() // list by empty MatchValues. - list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil) + resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil) if err != nil { t.Fatalf("unexpected error: %v", err) } - if resourceVersion != 5 { - t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + if resp.ResourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } - if len(list) != 3 { - t.Errorf("unexpected list returned: %#v", list) + if len(resp.Items) != 3 { + t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "" { t.Errorf("Used index %q but expected none to be used", indexUsed) @@ -481,15 +481,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { {IndexName: "l:label", Value: "value1"}, {IndexName: "f:spec.nodeName", Value: "node2"}, } - list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) + resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) if err != nil { t.Fatalf("unexpected error: %v", err) } - if resourceVersion != 5 { - t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + if resp.ResourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } - if len(list) != 2 { - t.Errorf("unexpected list returned: %#v", list) + if len(resp.Items) != 2 { + t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "l:label" { t.Errorf("Used index %q but expected %q", indexUsed, "l:label") @@ -500,15 +500,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { {IndexName: "l:not-exist-label", Value: "whatever"}, {IndexName: "f:spec.nodeName", Value: "node2"}, } - list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) + resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) if err != nil { t.Fatalf("unexpected error: %v", err) } - if resourceVersion != 5 { - t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + if resp.ResourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } - if len(list) != 1 { - t.Errorf("unexpected list returned: %#v", list) + if len(resp.Items) != 1 { + t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "f:spec.nodeName" { t.Errorf("Used index %q but expected %q", indexUsed, "f:spec.nodeName") @@ -518,15 +518,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { matchValues = []storage.MatchValue{ {IndexName: "l:not-exist-label", Value: "whatever"}, } - list, resourceVersion, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) + resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) if err != nil { t.Fatalf("unexpected error: %v", err) } - if resourceVersion != 5 { - t.Errorf("unexpected resourceVersion: %v, expected: 5", resourceVersion) + if resp.ResourceVersion != 5 { + t.Errorf("unexpected resourceVersion: %v, expected: 5", resp.ResourceVersion) } - if len(list) != 3 { - t.Errorf("unexpected list returned: %#v", list) + if len(resp.Items) != 3 { + t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "" { t.Errorf("Used index %q but expected none to be used", indexUsed) @@ -546,15 +546,15 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { }() // list from future revision. Requires watch cache to request bookmark to get it. - list, resourceVersion, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil) + resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil) if err != nil { t.Fatalf("unexpected error: %v", err) } - if resourceVersion != 3 { - t.Errorf("unexpected resourceVersion: %v, expected: 6", resourceVersion) + if resp.ResourceVersion != 3 { + t.Errorf("unexpected resourceVersion: %v, expected: 6", resp.ResourceVersion) } - if len(list) != 1 { - t.Errorf("unexpected list returned: %#v", list) + if len(resp.Items) != 1 { + t.Errorf("unexpected list returned: %#v", resp) } if indexUsed != "" { t.Errorf("Used index %q but expected none to be used", indexUsed) @@ -626,7 +626,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { store.Add(makeTestPod("bar", 4)) }() - _, _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil) + _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil) if !errors.IsTimeout(err) { t.Errorf("expected timeout error but got: %v", err) } @@ -655,12 +655,12 @@ func TestReflectorForWatchCache(t *testing.T) { defer store.Stop() { - _, version, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil) + resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil) if err != nil { t.Fatalf("unexpected error: %v", err) } - if version != 0 { - t.Errorf("unexpected resource version: %d", version) + if resp.ResourceVersion != 0 { + t.Errorf("unexpected resource version: %d", resp.ResourceVersion) } } @@ -678,12 +678,12 @@ func TestReflectorForWatchCache(t *testing.T) { r.ListAndWatch(wait.NeverStop) { - _, version, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil) + resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil) if err != nil { t.Fatalf("unexpected error: %v", err) } - if version != 10 { - t.Errorf("unexpected resource version: %d", version) + if resp.ResourceVersion != 10 { + t.Errorf("unexpected resource version: %d", resp.ResourceVersion) } } }