diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 1e43aaef388..b56a59cf553 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -709,8 +709,8 @@ func computeListLimit(opts storage.ListOptions) int64 { return opts.Predicate.Limit } -func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) { - if !recursive { +func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, opts storage.ListOptions) (listResp, string, error) { + if !opts.Recursive { obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key) if err != nil { return listResp{}, "", err @@ -720,7 +720,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred } return listResp{ResourceVersion: readResourceVersion}, "", nil } - return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx)) + return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, opts) } type listResp struct { @@ -730,8 +730,6 @@ type listResp struct { // GetList implements storage.Interface func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error { - recursive := opts.Recursive - pred := opts.Predicate // For recursive lists, we need to make sure the key ended with "/" so that we only // get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys // with prefix "/a" will return all three, while with prefix "/a/" will return only @@ -772,7 +770,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) } - resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive) + resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, opts) if err != nil { return err } @@ -790,7 +788,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio if !ok { return fmt.Errorf("non *storeElement returned from storage: %v", obj) } - if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) { + if opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) { selectedObjects = append(selectedObjects, elem.Object) lastSelectedObjectKey = elem.Key } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index fd81aa682f6..4daaf2814d8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -3172,7 +3172,7 @@ func TestListIndexer(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields) - _, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive) + _, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, storage.ListOptions{Predicate: pred, Recursive: tt.recursive}) if err != nil { t.Errorf("Unexpected error: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go index 7db92d7a37a..1cd37a9fbaa 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache.go @@ -495,7 +495,7 @@ func (s sortableStoreElements) Swap(i, j int) { // WaitUntilFreshAndList returns list of pointers to `storeElement` objects along // with their ResourceVersion and the name of the index, if any, that was used. -func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (resp listResp, index string, err error) { +func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) { requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress) if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) { w.waitingUntilFresh.Add() @@ -513,7 +513,7 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion // requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we // want - they will be filtered out later. The fact that we return less things is only further performance improvement. // TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible. - for _, matchValue := range matchValues { + for _, matchValue := range opts.Predicate.MatcherIndex(ctx) { if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil { result, err = filterPrefixAndOrder(key, result) return listResp{ diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go index 623e2c7afd5..5c1db0e4540 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_test.go @@ -287,7 +287,7 @@ func TestEvents(t *testing.T) { // Test for Added event. { - _, err := store.getAllEventsSince(1, storage.ListOptions{}) + _, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything}) if err == nil { t.Errorf("expected error too old") } @@ -296,7 +296,7 @@ func TestEvents(t *testing.T) { } } { - result, err := store.getAllEventsSince(2, storage.ListOptions{}) + result, err := store.getAllEventsSince(2, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -320,13 +320,13 @@ func TestEvents(t *testing.T) { // Test with not full cache. { - _, err := store.getAllEventsSince(1, storage.ListOptions{}) + _, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything}) if err == nil { t.Errorf("expected error too old") } } { - result, err := store.getAllEventsSince(3, storage.ListOptions{}) + result, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -354,13 +354,13 @@ func TestEvents(t *testing.T) { // Test with full cache - there should be elements from 5 to 9. { - _, err := store.getAllEventsSince(3, storage.ListOptions{}) + _, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything}) if err == nil { t.Errorf("expected error too old") } } { - result, err := store.getAllEventsSince(4, storage.ListOptions{}) + result, err := store.getAllEventsSince(4, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -379,7 +379,7 @@ func TestEvents(t *testing.T) { store.Delete(makeTestPod("pod", uint64(10))) { - result, err := store.getAllEventsSince(9, storage.ListOptions{}) + result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -410,13 +410,13 @@ func TestMarker(t *testing.T) { makeTestPod("pod2", 9), }, "9") - _, err := store.getAllEventsSince(8, storage.ListOptions{}) + _, err := store.getAllEventsSince(8, storage.ListOptions{Predicate: storage.Everything}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) } // Getting events from 8 should return no events, // even though there is a marker there. - result, err := store.getAllEventsSince(9, storage.ListOptions{}) + result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -427,7 +427,7 @@ func TestMarker(t *testing.T) { pod := makeTestPod("pods", 12) store.Add(pod) // Getting events from 8 should still work and return one event. - result, err = store.getAllEventsSince(9, storage.ListOptions{}) + result, err = store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -466,7 +466,7 @@ func TestWaitUntilFreshAndList(t *testing.T) { }() // list by empty MatchValues. - resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil) + resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -481,11 +481,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { } // list by label index. - matchValues := []storage.MatchValue{ - {IndexName: "l:label", Value: "value1"}, - {IndexName: "f:spec.nodeName", Value: "node2"}, - } - resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) + resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{ + Label: labels.SelectorFromSet(map[string]string{ + "label": "value1", + }), + Field: fields.SelectorFromSet(map[string]string{ + "spec.nodeName": "node2", + }), + IndexLabels: []string{"label"}, + }}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -500,11 +504,15 @@ func TestWaitUntilFreshAndList(t *testing.T) { } // list with spec.nodeName index. - matchValues = []storage.MatchValue{ - {IndexName: "l:not-exist-label", Value: "whatever"}, - {IndexName: "f:spec.nodeName", Value: "node2"}, - } - resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) + resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{ + Label: labels.SelectorFromSet(map[string]string{ + "not-exist-label": "whatever", + }), + Field: fields.SelectorFromSet(map[string]string{ + "spec.nodeName": "node2", + }), + IndexFields: []string{"spec.nodeName"}, + }}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -519,10 +527,13 @@ func TestWaitUntilFreshAndList(t *testing.T) { } // list with index not exists. - matchValues = []storage.MatchValue{ - {IndexName: "l:not-exist-label", Value: "whatever"}, - } - resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues) + resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{ + Label: labels.SelectorFromSet(map[string]string{ + "not-exist-label": "whatever", + }), + Field: fields.Everything(), + IndexLabels: []string{"label"}, + }}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -550,7 +561,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) { }() // list from future revision. Requires watch cache to request bookmark to get it. - resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil) + resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -630,7 +641,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) { store.Add(makeTestPod("bar", 4)) }() - _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil) + _, _, err := store.WaitUntilFreshAndList(ctx, 4, "", storage.ListOptions{Predicate: storage.Everything}) if !errors.IsTimeout(err) { t.Errorf("expected timeout error but got: %v", err) } @@ -659,7 +670,7 @@ func TestReflectorForWatchCache(t *testing.T) { defer store.Stop() { - resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil) + resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -682,7 +693,7 @@ func TestReflectorForWatchCache(t *testing.T) { r.ListAndWatch(wait.NeverStop) { - resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil) + resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", storage.ListOptions{Predicate: storage.Everything}) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -994,7 +1005,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) { // Force cache resize. addEvent("key4", 50, later.Add(time.Second)) - _, err := store.getAllEventsSince(15, storage.ListOptions{}) + _, err := store.getAllEventsSince(15, storage.ListOptions{Predicate: storage.Everything}) if err == nil || !strings.Contains(err.Error(), "too old resource version") { t.Errorf("unexpected error: %v", err) }