diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD
index c041b9e81ba..af3a14a8fba 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/BUILD
@@ -50,10 +50,14 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
+        "//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
         "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index f5e2c4d0eeb..e3515148c35 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -404,10 +404,15 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
 // GetToList implements storage.Interface.
 func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
 	pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
-	if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
+	hasContinuation := pagingEnabled && len(pred.Continue) > 0
+	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
+	if resourceVersion == "" || hasContinuation || hasLimit {
 		// If resourceVersion is not specified, serve it from underlying
-		// storage (for backward compatibility). If a continuation or limit is
+		// storage (for backward compatibility). If a continuation is
 		// requested, serve it from the underlying storage as well.
+		// Limits are only sent to storage when resourceVersion is non-zero
+		// since the watch cache isn't able to perform continuations, and
+		// limits are ignored when resource version is zero
 		return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
 	}
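Note on the GetToList change above (the tests below exercise the same rule for List): with API list chunking enabled, a request is delegated to the underlying storage when it has no resourceVersion, carries a continue token, or combines a limit with a non-zero resourceVersion; a limit combined with resourceVersion="0" stays in the watch cache, which cannot paginate and simply ignores the limit. A minimal sketch of that routing predicate, factored into a hypothetical helper that is not part of this patch:

    // shouldDelegateList reports whether a list request must bypass the watch
    // cache and be served from the underlying storage (etcd).
    func shouldDelegateList(resourceVersion string, pred storage.SelectionPredicate, pagingEnabled bool) bool {
    	// Continue tokens can only be resolved by the storage layer.
    	hasContinuation := pagingEnabled && len(pred.Continue) > 0
    	// Limits are only honored by storage; with resourceVersion "0" the
    	// watch cache answers the request and ignores the limit.
    	hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
    	// An empty resourceVersion means "read from storage" for backward
    	// compatibility.
    	return resourceVersion == "" || hasContinuation || hasLimit
    }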
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 11e43f4d746..780af4b2d10 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -17,6 +17,7 @@ limitations under the License.
 package cacher
 
 import (
+	"context"
 	"fmt"
 	"reflect"
 	"strconv"
@@ -30,9 +31,14 @@ import (
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/serializer"
 	"k8s.io/apimachinery/pkg/util/diff"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/apiserver/pkg/apis/example"
+	examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
+	"k8s.io/apiserver/pkg/storage"
 )
 
 // verifies the cacheWatcher.process goroutine is properly cleaned up even if
@@ -195,7 +201,13 @@ func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) er
 	return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
 }
 func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error {
-	return fmt.Errorf("unimplemented")
+	listAccessor, err := meta.ListAccessor(obj)
+	if err != nil || listAccessor == nil {
+		return err
+	}
+	listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10))
+	listAccessor.SetContinue(continueValue)
+	return nil
 }
 func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
 	return fmt.Errorf("unimplemented")
@@ -206,3 +218,136 @@ func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
 func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
 	return strconv.ParseUint(resourceVersion, 10, 64)
 }
+
+var (
+	scheme   = runtime.NewScheme()
+	codecs   = serializer.NewCodecFactory(scheme)
+	errDummy = fmt.Errorf("dummy error")
+)
+
+func init() {
+	metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
+	utilruntime.Must(example.AddToScheme(scheme))
+	utilruntime.Must(examplev1.AddToScheme(scheme))
+}
+
+func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
+	prefix := "pods"
+	config := Config{
+		CacheCapacity:  cap,
+		Storage:        s,
+		Versioner:      testVersioner{},
+		Type:           &example.Pod{},
+		ResourcePrefix: prefix,
+		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
+		GetAttrsFunc:   func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { return nil, nil, true, nil },
+		NewListFunc:    func() runtime.Object { return &example.PodList{} },
+		Codec:          codecs.LegacyCodec(examplev1.SchemeGroupVersion),
+	}
+	return NewCacherFromConfig(config), testVersioner{}
+}
+
+type dummyStorage struct {
+	err error
+}
+
+type dummyWatch struct {
+	ch chan watch.Event
+}
+
+func (w *dummyWatch) ResultChan() <-chan watch.Event {
+	return w.ch
+}
+
+func (w *dummyWatch) Stop() {
+	close(w.ch)
+}
+
+func newDummyWatch() watch.Interface {
+	return &dummyWatch{
+		ch: make(chan watch.Event),
+	}
+}
+
+func (d *dummyStorage) Versioner() storage.Versioner { return nil }
+func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, _ uint64) error {
+	return fmt.Errorf("unimplemented")
+}
+func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions) error {
+	return fmt.Errorf("unimplemented")
+}
+func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
+	return newDummyWatch(), nil
+}
+func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
+	return newDummyWatch(), nil
+}
+func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error {
+	return fmt.Errorf("unimplemented")
+}
+func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error {
+	return d.err
+}
+func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error {
+	podList := listObj.(*example.PodList)
+	podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
+	return d.err
+}
+func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ ...runtime.Object) error {
+	return fmt.Errorf("unimplemented")
+}
+func (d *dummyStorage) Count(_ string) (int64, error) {
+	return 0, fmt.Errorf("unimplemented")
+}
+
+func TestListWithLimitAndRV0(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _ := newTestCacher(backingStorage, 0)
+	defer cacher.Stop()
+
+	pred := storage.SelectionPredicate{
+		Limit: 500,
+	}
+	result := &example.PodList{}
+
+	// Wait until cacher is initialized.
+	cacher.ready.wait()
+
+	// Inject error to underlying layer and check if cacher is not bypassed.
+	backingStorage.err = errDummy
+	err := cacher.List(context.TODO(), "pods/ns", "0", pred, result)
+	if err != nil {
+		t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
+	}
+
+	err = cacher.List(context.TODO(), "pods/ns", "", pred, result)
+	if err != errDummy {
+		t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
+	}
+}
+
+func TestGetToListWithLimitAndRV0(t *testing.T) {
+	backingStorage := &dummyStorage{}
+	cacher, _ := newTestCacher(backingStorage, 0)
+	defer cacher.Stop()
+
+	pred := storage.SelectionPredicate{
+		Limit: 500,
+	}
+	result := &example.PodList{}
+
+	// Wait until cacher is initialized.
+	cacher.ready.wait()
+
+	// Inject error to underlying layer and check if cacher is not bypassed.
+	backingStorage.err = errDummy
+	err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
+	if err != nil {
+		t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
+	}
+
+	err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result)
+	if err != errDummy {
+		t.Errorf("GetToList with Limit without RV=0 should bypass cacher: %v", err)
+	}
+}
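Illustrative client-side sketch (not part of this patch; it assumes the era-appropriate client-go List signature without a context argument): after this change, a list that sets both ResourceVersion "0" and a Limit is answered from the apiserver's watch cache, and the Limit is ignored rather than paginated, so the whole collection comes back in a single response.

    import (
    	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/kubernetes"
    )

    // listFromWatchCache lists pods with ResourceVersion "0" and a Limit set.
    // The apiserver serves this from its watch cache and ignores the Limit.
    func listFromWatchCache(client kubernetes.Interface, namespace string) error {
    	_, err := client.CoreV1().Pods(namespace).List(metav1.ListOptions{
    		ResourceVersion: "0",
    		Limit:           500,
    	})
    	return err
    }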