mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-10 04:27:54 +00:00
Pass storage.ListOptions to WaitUntilFreshAndList
This commit is contained in:
parent
30ea0d13cd
commit
e6cf9dd166
@ -709,8 +709,8 @@ func computeListLimit(opts storage.ListOptions) int64 {
|
|||||||
return opts.Predicate.Limit
|
return opts.Predicate.Limit
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred storage.SelectionPredicate, recursive bool) (listResp, string, error) {
|
func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, opts storage.ListOptions) (listResp, string, error) {
|
||||||
if !recursive {
|
if !opts.Recursive {
|
||||||
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
|
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(ctx, listRV, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return listResp{}, "", err
|
return listResp{}, "", err
|
||||||
@ -720,7 +720,7 @@ func (c *Cacher) listItems(ctx context.Context, listRV uint64, key string, pred
|
|||||||
}
|
}
|
||||||
return listResp{ResourceVersion: readResourceVersion}, "", nil
|
return listResp{ResourceVersion: readResourceVersion}, "", nil
|
||||||
}
|
}
|
||||||
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, pred.MatcherIndex(ctx))
|
return c.watchCache.WaitUntilFreshAndList(ctx, listRV, key, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
type listResp struct {
|
type listResp struct {
|
||||||
@ -730,8 +730,6 @@ type listResp struct {
|
|||||||
|
|
||||||
// GetList implements storage.Interface
|
// GetList implements storage.Interface
|
||||||
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
|
func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, listRV uint64) error {
|
||||||
recursive := opts.Recursive
|
|
||||||
pred := opts.Predicate
|
|
||||||
// For recursive lists, we need to make sure the key ended with "/" so that we only
|
// For recursive lists, we need to make sure the key ended with "/" so that we only
|
||||||
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
|
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
|
||||||
// with prefix "/a" will return all three, while with prefix "/a/" will return only
|
// with prefix "/a" will return all three, while with prefix "/a/" will return only
|
||||||
@ -772,7 +770,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, pred, recursive)
|
resp, indexUsed, err := c.listItems(ctx, listRV, preparedKey, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -790,7 +788,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
|
||||||
}
|
}
|
||||||
if pred.MatchesObjectAttributes(elem.Labels, elem.Fields) {
|
if opts.Predicate.MatchesObjectAttributes(elem.Labels, elem.Fields) {
|
||||||
selectedObjects = append(selectedObjects, elem.Object)
|
selectedObjects = append(selectedObjects, elem.Object)
|
||||||
lastSelectedObjectKey = elem.Key
|
lastSelectedObjectKey = elem.Key
|
||||||
}
|
}
|
||||||
|
@ -3172,7 +3172,7 @@ func TestListIndexer(t *testing.T) {
|
|||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields)
|
pred := storagetesting.CreatePodPredicate(tt.fieldSelector, true, tt.indexFields)
|
||||||
_, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, pred, tt.recursive)
|
_, usedIndex, err := cacher.cacher.listItems(ctx, 0, "/pods/"+tt.requestedNamespace, storage.ListOptions{Predicate: pred, Recursive: tt.recursive})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error: %v", err)
|
t.Errorf("Unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -495,7 +495,7 @@ func (s sortableStoreElements) Swap(i, j int) {
|
|||||||
|
|
||||||
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
|
// WaitUntilFreshAndList returns list of pointers to `storeElement` objects along
|
||||||
// with their ResourceVersion and the name of the index, if any, that was used.
|
// with their ResourceVersion and the name of the index, if any, that was used.
|
||||||
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, matchValues []storage.MatchValue) (resp listResp, index string, err error) {
|
func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion uint64, key string, opts storage.ListOptions) (resp listResp, index string, err error) {
|
||||||
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
requestWatchProgressSupported := etcdfeature.DefaultFeatureSupportChecker.Supports(storage.RequestWatchProgress)
|
||||||
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
|
if utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) && requestWatchProgressSupported && w.notFresh(resourceVersion) {
|
||||||
w.waitingUntilFresh.Add()
|
w.waitingUntilFresh.Add()
|
||||||
@ -513,7 +513,7 @@ func (w *watchCache) WaitUntilFreshAndList(ctx context.Context, resourceVersion
|
|||||||
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
|
// requirement here is to NOT miss anything that should be returned. We can return as many non-matching items as we
|
||||||
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
|
// want - they will be filtered out later. The fact that we return less things is only further performance improvement.
|
||||||
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
|
// TODO: if multiple indexes match, return the one with the fewest items, so as to do as much filtering as possible.
|
||||||
for _, matchValue := range matchValues {
|
for _, matchValue := range opts.Predicate.MatcherIndex(ctx) {
|
||||||
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
|
if result, err := w.store.ByIndex(matchValue.IndexName, matchValue.Value); err == nil {
|
||||||
result, err = filterPrefixAndOrder(key, result)
|
result, err = filterPrefixAndOrder(key, result)
|
||||||
return listResp{
|
return listResp{
|
||||||
|
@ -287,7 +287,7 @@ func TestEvents(t *testing.T) {
|
|||||||
|
|
||||||
// Test for Added event.
|
// Test for Added event.
|
||||||
{
|
{
|
||||||
_, err := store.getAllEventsSince(1, storage.ListOptions{})
|
_, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error too old")
|
t.Errorf("expected error too old")
|
||||||
}
|
}
|
||||||
@ -296,7 +296,7 @@ func TestEvents(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
result, err := store.getAllEventsSince(2, storage.ListOptions{})
|
result, err := store.getAllEventsSince(2, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -320,13 +320,13 @@ func TestEvents(t *testing.T) {
|
|||||||
|
|
||||||
// Test with not full cache.
|
// Test with not full cache.
|
||||||
{
|
{
|
||||||
_, err := store.getAllEventsSince(1, storage.ListOptions{})
|
_, err := store.getAllEventsSince(1, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error too old")
|
t.Errorf("expected error too old")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
result, err := store.getAllEventsSince(3, storage.ListOptions{})
|
result, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -354,13 +354,13 @@ func TestEvents(t *testing.T) {
|
|||||||
|
|
||||||
// Test with full cache - there should be elements from 5 to 9.
|
// Test with full cache - there should be elements from 5 to 9.
|
||||||
{
|
{
|
||||||
_, err := store.getAllEventsSince(3, storage.ListOptions{})
|
_, err := store.getAllEventsSince(3, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
t.Errorf("expected error too old")
|
t.Errorf("expected error too old")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
result, err := store.getAllEventsSince(4, storage.ListOptions{})
|
result, err := store.getAllEventsSince(4, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -379,7 +379,7 @@ func TestEvents(t *testing.T) {
|
|||||||
store.Delete(makeTestPod("pod", uint64(10)))
|
store.Delete(makeTestPod("pod", uint64(10)))
|
||||||
|
|
||||||
{
|
{
|
||||||
result, err := store.getAllEventsSince(9, storage.ListOptions{})
|
result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -410,13 +410,13 @@ func TestMarker(t *testing.T) {
|
|||||||
makeTestPod("pod2", 9),
|
makeTestPod("pod2", 9),
|
||||||
}, "9")
|
}, "9")
|
||||||
|
|
||||||
_, err := store.getAllEventsSince(8, storage.ListOptions{})
|
_, err := store.getAllEventsSince(8, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
|
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
// Getting events from 8 should return no events,
|
// Getting events from 8 should return no events,
|
||||||
// even though there is a marker there.
|
// even though there is a marker there.
|
||||||
result, err := store.getAllEventsSince(9, storage.ListOptions{})
|
result, err := store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -427,7 +427,7 @@ func TestMarker(t *testing.T) {
|
|||||||
pod := makeTestPod("pods", 12)
|
pod := makeTestPod("pods", 12)
|
||||||
store.Add(pod)
|
store.Add(pod)
|
||||||
// Getting events from 8 should still work and return one event.
|
// Getting events from 8 should still work and return one event.
|
||||||
result, err = store.getAllEventsSince(9, storage.ListOptions{})
|
result, err = store.getAllEventsSince(9, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -466,7 +466,7 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// list by empty MatchValues.
|
// list by empty MatchValues.
|
||||||
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", nil)
|
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -481,11 +481,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// list by label index.
|
// list by label index.
|
||||||
matchValues := []storage.MatchValue{
|
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{
|
||||||
{IndexName: "l:label", Value: "value1"},
|
Label: labels.SelectorFromSet(map[string]string{
|
||||||
{IndexName: "f:spec.nodeName", Value: "node2"},
|
"label": "value1",
|
||||||
}
|
}),
|
||||||
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
|
Field: fields.SelectorFromSet(map[string]string{
|
||||||
|
"spec.nodeName": "node2",
|
||||||
|
}),
|
||||||
|
IndexLabels: []string{"label"},
|
||||||
|
}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -500,11 +504,15 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// list with spec.nodeName index.
|
// list with spec.nodeName index.
|
||||||
matchValues = []storage.MatchValue{
|
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{
|
||||||
{IndexName: "l:not-exist-label", Value: "whatever"},
|
Label: labels.SelectorFromSet(map[string]string{
|
||||||
{IndexName: "f:spec.nodeName", Value: "node2"},
|
"not-exist-label": "whatever",
|
||||||
}
|
}),
|
||||||
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
|
Field: fields.SelectorFromSet(map[string]string{
|
||||||
|
"spec.nodeName": "node2",
|
||||||
|
}),
|
||||||
|
IndexFields: []string{"spec.nodeName"},
|
||||||
|
}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -519,10 +527,13 @@ func TestWaitUntilFreshAndList(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// list with index not exists.
|
// list with index not exists.
|
||||||
matchValues = []storage.MatchValue{
|
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", storage.ListOptions{Predicate: storage.SelectionPredicate{
|
||||||
{IndexName: "l:not-exist-label", Value: "whatever"},
|
Label: labels.SelectorFromSet(map[string]string{
|
||||||
}
|
"not-exist-label": "whatever",
|
||||||
resp, indexUsed, err = store.WaitUntilFreshAndList(ctx, 5, "prefix/", matchValues)
|
}),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
IndexLabels: []string{"label"},
|
||||||
|
}})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -550,7 +561,7 @@ func TestWaitUntilFreshAndListFromCache(t *testing.T) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// list from future revision. Requires watch cache to request bookmark to get it.
|
// list from future revision. Requires watch cache to request bookmark to get it.
|
||||||
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", nil)
|
resp, indexUsed, err := store.WaitUntilFreshAndList(ctx, 3, "prefix/", storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -630,7 +641,7 @@ func TestWaitUntilFreshAndListTimeout(t *testing.T) {
|
|||||||
store.Add(makeTestPod("bar", 4))
|
store.Add(makeTestPod("bar", 4))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, _, err := store.WaitUntilFreshAndList(ctx, 4, "", nil)
|
_, _, err := store.WaitUntilFreshAndList(ctx, 4, "", storage.ListOptions{Predicate: storage.Everything})
|
||||||
if !errors.IsTimeout(err) {
|
if !errors.IsTimeout(err) {
|
||||||
t.Errorf("expected timeout error but got: %v", err)
|
t.Errorf("expected timeout error but got: %v", err)
|
||||||
}
|
}
|
||||||
@ -659,7 +670,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||||||
defer store.Stop()
|
defer store.Stop()
|
||||||
|
|
||||||
{
|
{
|
||||||
resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", nil)
|
resp, _, err := store.WaitUntilFreshAndList(ctx, 0, "", storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -682,7 +693,7 @@ func TestReflectorForWatchCache(t *testing.T) {
|
|||||||
r.ListAndWatch(wait.NeverStop)
|
r.ListAndWatch(wait.NeverStop)
|
||||||
|
|
||||||
{
|
{
|
||||||
resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", nil)
|
resp, _, err := store.WaitUntilFreshAndList(ctx, 10, "", storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
@ -994,7 +1005,7 @@ func TestCacheIncreaseDoesNotBreakWatch(t *testing.T) {
|
|||||||
// Force cache resize.
|
// Force cache resize.
|
||||||
addEvent("key4", 50, later.Add(time.Second))
|
addEvent("key4", 50, later.Add(time.Second))
|
||||||
|
|
||||||
_, err := store.getAllEventsSince(15, storage.ListOptions{})
|
_, err := store.getAllEventsSince(15, storage.ListOptions{Predicate: storage.Everything})
|
||||||
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
|
if err == nil || !strings.Contains(err.Error(), "too old resource version") {
|
||||||
t.Errorf("unexpected error: %v", err)
|
t.Errorf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user