Merge pull request #70735 from wojtek-t/use_watch-cache_when_rv=0

Use watch cache when rv=0 even when limit is set
This commit is contained in:
k8s-ci-robot 2018-11-13 13:45:13 -08:00 committed by GitHub
commit 1777be5fbe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 157 additions and 3 deletions

View File

@ -50,10 +50,14 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/example/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",

View File

@ -404,10 +404,15 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
// GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
if resourceVersion == "" || hasContinuation || hasLimit {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). If a continuation or limit is
// storage (for backward compatibility). If a continuation is
// requested, serve it from the underlying storage as well.
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cacher
import (
"context"
"fmt"
"reflect"
"strconv"
@ -30,9 +31,14 @@ import (
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/diff"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/apis/example"
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
"k8s.io/apiserver/pkg/storage"
)
// verifies the cacheWatcher.process goroutine is properly cleaned up even if
@ -195,7 +201,13 @@ func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) er
return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10))
}
func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error {
return fmt.Errorf("unimplemented")
listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil {
return err
}
listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10))
listAccessor.SetContinue(continueValue)
return nil
}
func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error {
return fmt.Errorf("unimplemented")
@ -206,3 +218,136 @@ func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) {
return strconv.ParseUint(resourceVersion, 10, 64)
}
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
errDummy = fmt.Errorf("dummy error")
)
func init() {
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
utilruntime.Must(example.AddToScheme(scheme))
utilruntime.Must(examplev1.AddToScheme(scheme))
}
func newTestCacher(s storage.Interface, cap int) (*Cacher, storage.Versioner) {
prefix := "pods"
config := Config{
CacheCapacity: cap,
Storage: s,
Versioner: testVersioner{},
Type: &example.Pod{},
ResourcePrefix: prefix,
KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) { return nil, nil, true, nil },
NewListFunc: func() runtime.Object { return &example.PodList{} },
Codec: codecs.LegacyCodec(examplev1.SchemeGroupVersion),
}
return NewCacherFromConfig(config), testVersioner{}
}
type dummyStorage struct {
err error
}
type dummyWatch struct {
ch chan watch.Event
}
func (w *dummyWatch) ResultChan() <-chan watch.Event {
return w.ch
}
func (w *dummyWatch) Stop() {
close(w.ch)
}
func newDummyWatch() watch.Interface {
return &dummyWatch{
ch: make(chan watch.Event),
}
}
func (d *dummyStorage) Versioner() storage.Versioner { return nil }
func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, _ uint64) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error {
return d.err
}
func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
return d.err
}
func (d *dummyStorage) GuaranteedUpdate(_ context.Context, _ string, _ runtime.Object, _ bool, _ *storage.Preconditions, _ storage.UpdateFunc, _ ...runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Count(_ string) (int64, error) {
return 0, fmt.Errorf("unimplemented")
}
func TestListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
defer cacher.Stop()
pred := storage.SelectionPredicate{
Limit: 500,
}
result := &example.PodList{}
// Wait until cacher is initialized.
cacher.ready.wait()
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.List(context.TODO(), "pods/ns", "0", pred, result)
if err != nil {
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
}
err = cacher.List(context.TODO(), "pods/ns", "", pred, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}
func TestGetToListWithLimitAndRV0(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _ := newTestCacher(backingStorage, 0)
defer cacher.Stop()
pred := storage.SelectionPredicate{
Limit: 500,
}
result := &example.PodList{}
// Wait until cacher is initialized.
cacher.ready.wait()
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err := cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
if err != nil {
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
}
err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}