Add GetOptions and ListOptions to storage interface

This commit is contained in:
Joe Betz 2020-05-29 10:48:33 -07:00
parent b618411f1e
commit 4c99949ae6
15 changed files with 346 additions and 196 deletions

View File

@ -63,7 +63,11 @@ var _ Leases = &storageLeases{}
// ListLeases retrieves a list of the current master IPs from storage // ListLeases retrieves a list of the current master IPs from storage
func (s *storageLeases) ListLeases() ([]string, error) { func (s *storageLeases) ListLeases() ([]string, error) {
ipInfoList := &corev1.EndpointsList{} ipInfoList := &corev1.EndpointsList{}
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil { storageOpts := storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
}
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, storageOpts, ipInfoList); err != nil {
return nil, err return nil, err
} }

View File

@ -14,6 +14,7 @@ go_test(
"//pkg/apis/core:go_default_library", "//pkg/apis/core:go_default_library",
"//pkg/registry/core/service/allocator:go_default_library", "//pkg/registry/core/service/allocator:go_default_library",
"//pkg/registry/registrytest:go_default_library", "//pkg/registry/registrytest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
], ],

View File

@ -190,7 +190,7 @@ func (e *Etcd) tryUpdate(fn func() error) error {
// etcd. If the key does not exist, the object will have an empty ResourceVersion. // etcd. If the key does not exist, the object will have an empty ResourceVersion.
func (e *Etcd) Get() (*api.RangeAllocation, error) { func (e *Etcd) Get() (*api.RangeAllocation, error) {
existing := &api.RangeAllocation{} existing := &api.RangeAllocation{}
if err := e.storage.Get(context.TODO(), e.baseKey, "", existing, true); err != nil { if err := e.storage.Get(context.TODO(), e.baseKey, storage.GetOptions{IgnoreNotFound: true}, existing); err != nil {
return nil, storeerr.InterpretGetError(err, e.resource, "") return nil, storeerr.InterpretGetError(err, e.resource, "")
} }
return existing, nil return existing, nil

View File

@ -21,6 +21,7 @@ import (
"strings" "strings"
"testing" "testing"
apiserverstorage "k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storage/storagebackend"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
@ -80,7 +81,7 @@ func TestStore(t *testing.T) {
other := allocator.NewAllocationMap(100, "rangeSpecValue") other := allocator.NewAllocationMap(100, "rangeSpecValue")
allocation := &api.RangeAllocation{} allocation := &api.RangeAllocation{}
if err := storage.storage.Get(context.TODO(), key(), "", allocation, false); err != nil { if err := storage.storage.Get(context.TODO(), key(), apiserverstorage.GetOptions{}, allocation); err != nil {
t.Fatal(err) t.Fatal(err)
} }
if allocation.Range != "rangeSpecValue" { if allocation.Range != "rangeSpecValue" {

View File

@ -35,7 +35,7 @@ func (s *DryRunnableStorage) Versioner() storage.Versioner {
func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error { func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
if dryRun { if dryRun {
if err := s.Storage.Get(ctx, key, "", out, false); err == nil { if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err == nil {
return storage.NewKeyExistsError(key, 0) return storage.NewKeyExistsError(key, 0)
} }
return s.copyInto(obj, out) return s.copyInto(obj, out)
@ -45,7 +45,7 @@ func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out ru
func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool) error { func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool) error {
if dryRun { if dryRun {
if err := s.Storage.Get(ctx, key, "", out, false); err != nil { if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
return err return err
} }
if err := preconditions.Check(key, out); err != nil { if err := preconditions.Check(key, out); err != nil {
@ -56,31 +56,31 @@ func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime
return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation) return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation)
} }
func (s *DryRunnableStorage) Watch(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) { func (s *DryRunnableStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.Storage.Watch(ctx, key, resourceVersion, p) return s.Storage.Watch(ctx, key, opts)
} }
func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) { func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.Storage.WatchList(ctx, key, resourceVersion, p) return s.Storage.WatchList(ctx, key, opts)
} }
func (s *DryRunnableStorage) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) return s.Storage.Get(ctx, key, opts, objPtr)
} }
func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error { func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return s.Storage.GetToList(ctx, key, resourceVersion, p, listObj) return s.Storage.GetToList(ctx, key, opts, listObj)
} }
func (s *DryRunnableStorage) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error { func (s *DryRunnableStorage) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return s.Storage.List(ctx, key, resourceVersion, p, listObj) return s.Storage.List(ctx, key, opts, listObj)
} }
func (s *DryRunnableStorage) GuaranteedUpdate( func (s *DryRunnableStorage) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, dryRun bool, suggestion ...runtime.Object) error { preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, dryRun bool, suggestion ...runtime.Object) error {
if dryRun { if dryRun {
err := s.Storage.Get(ctx, key, "", ptrToType, ignoreNotFound) err := s.Storage.Get(ctx, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, ptrToType)
if err != nil { if err != nil {
return err return err
} }

View File

@ -69,7 +69,7 @@ func TestDryRunCreateDoesntCreate(t *testing.T) {
t.Fatalf("Failed to create new dry-run object: %v", err) t.Fatalf("Failed to create new dry-run object: %v", err)
} }
err = s.Get(context.Background(), "key", "", out, false) err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
if e, ok := err.(*storage.StorageError); !ok || e.Code != storage.ErrCodeKeyNotFound { if e, ok := err.(*storage.StorageError); !ok || e.Code != storage.ErrCodeKeyNotFound {
t.Errorf("Expected key to be not found, error: %v", err) t.Errorf("Expected key to be not found, error: %v", err)
} }
@ -185,7 +185,7 @@ func TestDryRunUpdateDoesntUpdate(t *testing.T) {
t.Fatalf("Failed to dry-run update: %v", err) t.Fatalf("Failed to dry-run update: %v", err)
} }
out := UnstructuredOrDie(`{}`) out := UnstructuredOrDie(`{}`)
err = s.Get(context.Background(), "key", "", out, false) err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
if !reflect.DeepEqual(created, out) { if !reflect.DeepEqual(created, out) {
t.Fatalf("Returned object %q different from expected %q", created, out) t.Fatalf("Returned object %q different from expected %q", created, out)
} }
@ -239,7 +239,7 @@ func TestDryRunDeleteDoesntDelete(t *testing.T) {
t.Fatalf("Failed to dry-run delete the object: %v", err) t.Fatalf("Failed to dry-run delete the object: %v", err)
} }
err = s.Get(context.Background(), "key", "", out, false) err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
if err != nil { if err != nil {
t.Fatalf("Failed to retrieve dry-run deleted object: %v", err) t.Fatalf("Failed to retrieve dry-run deleted object: %v", err)
} }

View File

@ -322,15 +322,16 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
p.Continue = options.Continue p.Continue = options.Continue
list := e.NewListFunc() list := e.NewListFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx) qualifiedResource := e.qualifiedResourceFromContext(ctx)
storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: p}
if name, ok := p.MatchesSingle(); ok { if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil { if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list) err := e.Storage.GetToList(ctx, key, storageOpts, list)
return list, storeerr.InterpretListError(err, qualifiedResource) return list, storeerr.InterpretListError(err, qualifiedResource)
} }
// if we cannot extract a key based on the current context, the optimization is skipped // if we cannot extract a key based on the current context, the optimization is skipped
} }
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list) err := e.Storage.List(ctx, e.KeyRootFunc(ctx), storageOpts, list)
return list, storeerr.InterpretListError(err, qualifiedResource) return list, storeerr.InterpretListError(err, qualifiedResource)
} }
@ -367,7 +368,7 @@ func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
return nil, err return nil, err
} }
if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil { if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
return nil, err return nil, err
} }
accessor, errGetAcc := meta.Accessor(out) accessor, errGetAcc := meta.Accessor(out)
@ -608,7 +609,7 @@ func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil { if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name) return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
} }
if e.Decorator != nil { if e.Decorator != nil {
@ -892,7 +893,7 @@ func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.V
} }
obj := e.NewFunc() obj := e.NewFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx) qualifiedResource := e.qualifiedResourceFromContext(ctx)
if err = e.Storage.Get(ctx, key, "", obj, false); err != nil { if err = e.Storage.Get(ctx, key, storage.GetOptions{}, obj); err != nil {
return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name) return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
} }
@ -1126,9 +1127,10 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
// WatchPredicate starts a watch for the items that matches. // WatchPredicate starts a watch for the items that matches.
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) { func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p}
if name, ok := p.MatchesSingle(); ok { if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil { if key, err := e.KeyFunc(ctx, name); err == nil {
w, err := e.Storage.Watch(ctx, key, resourceVersion, p) w, err := e.Storage.Watch(ctx, key, storageOpts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -1141,7 +1143,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate
// optimization is skipped // optimization is skipped
} }
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p) w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), storageOpts)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -431,8 +431,9 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
} }
// Watch implements storage.Interface. // Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion) pred := opts.Predicate
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -515,22 +516,22 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
} }
// WatchList implements storage.Interface. // WatchList implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, pred) return c.Watch(ctx, key, opts)
} }
// Get implements storage.Interface. // Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
if resourceVersion == "" { if opts.ResourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying // If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility). // storage (for backward compatibility).
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) return c.storage.Get(ctx, key, opts, objPtr)
} }
// If resourceVersion is specified, serve it from cache. // If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that // It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion. // fresh as the given resourceVersion.
getRV, err := c.versioner.ParseResourceVersion(resourceVersion) getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil { if err != nil {
return err return err
} }
@ -538,7 +539,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
if getRV == 0 && !c.ready.check() { if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific // If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage. // minimal resource version, simply forward the request to storage.
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) return c.storage.Get(ctx, key, opts, objPtr)
} }
// Do not create a trace - it's not for free and there are tons // Do not create a trace - it's not for free and there are tons
@ -563,7 +564,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
objVal.Set(reflect.ValueOf(elem.Object).Elem()) objVal.Set(reflect.ValueOf(elem.Object).Elem())
} else { } else {
objVal.Set(reflect.Zero(objVal.Type())) objVal.Set(reflect.Zero(objVal.Type()))
if !ignoreNotFound { if !opts.IgnoreNotFound {
return storage.NewKeyNotFoundError(key, int64(readResourceVersion)) return storage.NewKeyNotFoundError(key, int64(readResourceVersion))
} }
} }
@ -571,7 +572,9 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
} }
// GetToList implements storage.Interface. // GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0 hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@ -582,7 +585,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
// Limits are only sent to storage when resourceVersion is non-zero // Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and // since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero // limits are ignored when resource version is zero
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj) return c.storage.GetToList(ctx, key, opts, listObj)
} }
// If resourceVersion is specified, serve it from cache. // If resourceVersion is specified, serve it from cache.
@ -596,7 +599,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
if listRV == 0 && !c.ready.check() { if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific // If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage. // minimal resource version, simply forward the request to storage.
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj) return c.storage.GetToList(ctx, key, opts, listObj)
} }
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@ -643,7 +646,9 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
} }
// List implements storage.Interface. // List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0 hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@ -654,7 +659,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
// Limits are only sent to storage when resourceVersion is non-zero // Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and // since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero. // limits are ignored when resource version is zero.
return c.storage.List(ctx, key, resourceVersion, pred, listObj) return c.storage.List(ctx, key, opts, listObj)
} }
// If resourceVersion is specified, serve it from cache. // If resourceVersion is specified, serve it from cache.
@ -668,7 +673,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
if listRV == 0 && !c.ready.check() { if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific // If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage. // minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, resourceVersion, pred, listObj) return c.storage.List(ctx, key, opts, listObj)
} }
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@ -1083,7 +1088,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
Continue: options.Continue, Continue: options.Continue,
} }
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil { if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{Predicate: pred}, list); err != nil {
return nil, err return nil, err
} }
return list, nil return list, nil
@ -1091,7 +1096,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface. // Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything) return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
} }
// errWatcher implements watch.Interface to return a single error // errWatcher implements watch.Interface to return a single error

View File

@ -305,19 +305,19 @@ func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object,
func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions, _ storage.ValidateObjectFunc) error { func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions, _ storage.ValidateObjectFunc) error {
return fmt.Errorf("unimplemented") return fmt.Errorf("unimplemented")
} }
func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
return newDummyWatch(), nil return newDummyWatch(), nil
} }
func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { func (d *dummyStorage) WatchList(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
return newDummyWatch(), nil return newDummyWatch(), nil
} }
func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error { func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error {
return d.err return d.err
} }
func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error { func (d *dummyStorage) GetToList(_ context.Context, _ string, _ storage.ListOptions, _ runtime.Object) error {
return d.err
}
func (d *dummyStorage) List(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
podList := listObj.(*example.PodList) podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"} podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
return d.err return d.err
@ -329,7 +329,7 @@ func (d *dummyStorage) Count(_ string) (int64, error) {
return 0, fmt.Errorf("unimplemented") return 0, fmt.Errorf("unimplemented")
} }
func TestListWithLimitAndRV0(t *testing.T) { func TestListCacheBypass(t *testing.T) {
backingStorage := &dummyStorage{} backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 0) cacher, _, err := newTestCacher(backingStorage, 0)
if err != nil { if err != nil {
@ -347,18 +347,24 @@ func TestListWithLimitAndRV0(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed. // Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy backingStorage.err = errDummy
err = cacher.List(context.TODO(), "pods/ns", "0", pred, result) err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: pred,
}, result)
if err != nil { if err != nil {
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err) t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
} }
err = cacher.List(context.TODO(), "pods/ns", "", pred, result) err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: pred,
}, result)
if err != errDummy { if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
} }
} }
func TestGetToListWithLimitAndRV0(t *testing.T) { func TestGetToListCacheBypass(t *testing.T) {
backingStorage := &dummyStorage{} backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 0) cacher, _, err := newTestCacher(backingStorage, 0)
if err != nil { if err != nil {
@ -376,17 +382,55 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed. // Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy backingStorage.err = errDummy
err = cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result) err = cacher.GetToList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: pred,
}, result)
if err != nil { if err != nil {
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err) t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
} }
err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result) err = cacher.GetToList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: pred,
}, result)
if err != errDummy { if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
} }
} }
func TestGetCacheBypass(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 1)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
result := &example.Pod{}
// Wait until cacher is initialized.
cacher.ready.wait()
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "0",
}, result)
if err != nil {
t.Errorf("Get with RV=0 should be served from cache: %v", err)
}
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "",
}, result)
if err != errDummy {
t.Errorf("Get without RV=0 should bypass cacher: %v", err)
}
}
func TestWatcherNotGoingBackInTime(t *testing.T) { func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{} backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 1000) cacher, _, err := newTestCacher(backingStorage, 1000)
@ -417,7 +461,10 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
totalPods := 100 totalPods := 100
// Create watcher that will be slowing down reading. // Create watcher that will be slowing down reading.
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) w1, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "999",
Predicate: storage.Everything,
})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -439,7 +486,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
} }
// Create fast watcher and ensure it will get each object exactly once. // Create fast watcher and ensure it will get each object exactly once.
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) w2, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -519,7 +566,7 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
// Wait until cacher is initialized. // Wait until cacher is initialized.
cacher.ready.wait() cacher.ready.wait()
w, err := cacher.Watch(context.Background(), "pods/ns", "0", storage.Everything) w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -618,7 +665,7 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
return return
default: default:
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "0", pred) w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -672,7 +719,7 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
pred.AllowWatchBookmarks = allowWatchBookmarks pred.AllowWatchBookmarks = allowWatchBookmarks
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "0", pred) w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -779,7 +826,7 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
} }
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "100", pred) w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -847,7 +894,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
pred := storage.Everything pred := storage.Everything
pred.AllowWatchBookmarks = true pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), time.Second) ctx, _ := context.WithTimeout(context.Background(), time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "999", pred) w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -921,14 +968,14 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
totalPods := 50 totalPods := 50
// Create watcher that will be blocked. // Create watcher that will be blocked.
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) w1, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
defer w1.Stop() defer w1.Stop()
// Create fast watcher and ensure it will get all objects. // Create fast watcher and ensure it will get all objects.
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) w2, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -1011,7 +1058,7 @@ func TestCachingDeleteEvents(t *testing.T) {
} }
createWatch := func(pred storage.SelectionPredicate) watch.Interface { createWatch := func(pred storage.SelectionPredicate) watch.Interface {
w, err := cacher.Watch(context.TODO(), "pods/ns", "999", pred) w, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }
@ -1089,7 +1136,7 @@ func testCachingObjects(t *testing.T, watchersCount int) {
watchers := make([]watch.Interface, 0, watchersCount) watchers := make([]watch.Interface, 0, watchersCount)
for i := 0; i < watchersCount; i++ { for i := 0; i < watchersCount; i++ {
w, err := cacher.Watch(context.TODO(), "pods/ns", "1000", storage.Everything) w, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "1000", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Failed to create watch: %v", err) t.Fatalf("Failed to create watch: %v", err)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd3 package etcd3
import ( import (
"fmt"
"strconv" "strconv"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
@ -45,14 +46,14 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin
// UpdateList implements Versioner // UpdateList implements Versioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string, count *int64) error { func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string, count *int64) error {
if resourceVersion == 0 {
return fmt.Errorf("illegal resource version from storage: %d", resourceVersion)
}
listAccessor, err := meta.ListAccessor(obj) listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil { if err != nil || listAccessor == nil {
return err return err
} }
versionString := "" versionString := strconv.FormatUint(resourceVersion, 10)
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
listAccessor.SetResourceVersion(versionString) listAccessor.SetResourceVersion(versionString)
listAccessor.SetContinue(nextKey) listAccessor.SetContinue(nextKey)
listAccessor.SetRemainingItemCount(count) listAccessor.SetRemainingItemCount(count)

View File

@ -63,7 +63,7 @@ var _ value.Context = authenticatedDataString("")
type store struct { type store struct {
client *clientv3.Client client *clientv3.Client
// getOpts contains additional options that should be passed // getOps contains additional options that should be passed
// to all Get() calls. // to all Get() calls.
getOps []clientv3.OpOption getOps []clientv3.OpOption
codec runtime.Codec codec runtime.Codec
@ -112,20 +112,21 @@ func (s *store) Versioner() storage.Versioner {
} }
// Get implements storage.Interface.Get. // Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
key = path.Join(s.pathPrefix, key) key = path.Join(s.pathPrefix, key)
startTime := time.Now() startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...) callOpts := s.getOps
getResp, err := s.client.KV.Get(ctx, key, callOpts...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil { if err != nil {
return err return err
} }
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { if err = s.ensureMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err return err
} }
if len(getResp.Kvs) == 0 { if len(getResp.Kvs) == 0 {
if ignoreNotFound { if opts.IgnoreNotFound {
return runtime.SetZeroValue(out) return runtime.SetZeroValue(out)
} }
return storage.NewKeyNotFoundError(key, 0) return storage.NewKeyNotFoundError(key, 0)
@ -379,7 +380,9 @@ func (s *store) GuaranteedUpdate(
} }
// GetToList implements storage.Interface.GetToList. // GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := listOpts.ResourceVersion
pred := listOpts.Predicate
trace := utiltrace.New("GetToList etcd3", trace := utiltrace.New("GetToList etcd3",
utiltrace.Field{"key", key}, utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion}, utiltrace.Field{"resourceVersion", resourceVersion},
@ -510,7 +513,9 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
} }
// List implements storage.Interface.List. // List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
trace := utiltrace.New("List etcd3", trace := utiltrace.New("List etcd3",
utiltrace.Field{"key", key}, utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion}, utiltrace.Field{"resourceVersion", resourceVersion},
@ -585,7 +590,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd)) options = append(options, clientv3.WithRange(rangeEnd))
default: default:
options = append(options, clientv3.WithPrefix()) options = append(options, clientv3.WithPrefix())
} }
@ -709,22 +713,22 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
} }
// Watch implements storage.Interface.Watch. // Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, false) return s.watch(ctx, key, opts, false)
} }
// WatchList implements storage.Interface.WatchList. // WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { func (s *store) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, true) return s.watch(ctx, key, opts, true)
} }
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) { func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(rv) rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil { if err != nil {
return nil, err return nil, err
} }
key = path.Join(s.pathPrefix, key) key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred) return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate)
} }
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {

View File

@ -168,7 +168,7 @@ func TestCreateWithTTL(t *testing.T) {
t.Fatalf("Create failed: %v", err) t.Fatalf("Create failed: %v", err)
} }
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -191,7 +191,19 @@ func TestGet(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
// create an object to test // create an object to test
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, createdObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
// update the object once to allow get by exact resource version to be tested
updateObj := createdObj.DeepCopy()
updateObj.Annotations = map[string]string{"test-annotation": "1"}
storedObj := &example.Pod{}
err := store.GuaranteedUpdate(ctx, key, storedObj, true, nil,
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
ttl := uint64(1)
return updateObj, &ttl, nil
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
// create an additional object to increment the resource version for pods above the resource version of the foo object // create an additional object to increment the resource version for pods above the resource version of the foo object
lastUpdatedObj := &example.Pod{} lastUpdatedObj := &example.Pod{}
if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil { if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil {
@ -201,6 +213,7 @@ func TestGet(t *testing.T) {
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion) currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion) lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion)
// TODO(jpbetz): Add exact test cases
tests := []struct { tests := []struct {
name string name string
key string key string
@ -209,28 +222,34 @@ func TestGet(t *testing.T) {
expectRVTooLarge bool expectRVTooLarge bool
expectedOut *example.Pod expectedOut *example.Pod
rv string rv string
expectedRV string
}{{ // test get on existing item }{{ // test get on existing item
name: "get existing", name: "get existing",
key: key, key: key,
ignoreNotFound: false, ignoreNotFound: false,
expectNotFoundErr: false, expectNotFoundErr: false,
expectedOut: storedObj, expectedOut: storedObj,
}, { // test get on existing item with minimum resource version set to 0 }, { // test get on existing item with resource version set to 0
name: "resource version 0", name: "resource version 0",
key: key, key: key,
expectedOut: storedObj, expectedOut: storedObj,
rv: "0", rv: "0",
}, { // test get on existing item with minimum resource version set to current resource version of the object }, { // test get on existing item with resource version set to the resource version is was created on
name: "current object resource version", name: "object created resource version",
key: key,
expectedOut: storedObj,
rv: createdObj.ResourceVersion,
}, { // test get on existing item with resource version set to current resource version of the object
name: "current object resource version, match=NotOlderThan",
key: key, key: key,
expectedOut: storedObj, expectedOut: storedObj,
rv: fmt.Sprintf("%d", currentRV), rv: fmt.Sprintf("%d", currentRV),
}, { // test get on existing item with minimum resource version set to latest pod resource version }, { // test get on existing item with resource version set to latest pod resource version
name: "latest resource version", name: "latest resource version",
key: key, key: key,
expectedOut: storedObj, expectedOut: storedObj,
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV), rv: fmt.Sprintf("%d", lastUpdatedCurrentRV),
}, { // test get on existing item with minimum resource version set too high }, { // test get on existing item with resource version set too high
name: "too high resource version", name: "too high resource version",
key: key, key: key,
expectRVTooLarge: true, expectRVTooLarge: true,
@ -251,7 +270,7 @@ func TestGet(t *testing.T) {
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
out := &example.Pod{} out := &example.Pod{}
err := store.Get(ctx, tt.key, tt.rv, out, tt.ignoreNotFound) err := store.Get(ctx, tt.key, storage.GetOptions{IgnoreNotFound: tt.ignoreNotFound, ResourceVersion: tt.rv}, out)
if tt.expectNotFoundErr { if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) { if err == nil || !storage.IsNotFound(err) {
t.Errorf("expecting not found error, but get: %v", err) t.Errorf("expecting not found error, but get: %v", err)
@ -264,11 +283,16 @@ func TestGet(t *testing.T) {
} }
return return
} }
if tt.expectedRV != "" {
if tt.expectedRV != out.ResourceVersion {
t.Errorf("expecting resource version want=%s, got=%s", tt.expectedRV, out.ResourceVersion)
}
}
if err != nil { if err != nil {
t.Fatalf("Get failed: %v", err) t.Fatalf("Get failed: %v", err)
} }
if !reflect.DeepEqual(tt.expectedOut, out) { if !reflect.DeepEqual(tt.expectedOut, out) {
t.Errorf("pod want=%#v, get=%#v", tt.expectedOut, out) t.Errorf("pod want=\n%#v\nget=\n%#v", tt.expectedOut, out)
} }
}) })
} }
@ -398,7 +422,7 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
out := &example.PodList{} out := &example.PodList{}
err := store.GetToList(ctx, tt.key, tt.rv, tt.pred, out) err := store.GetToList(ctx, tt.key, storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}, out)
if tt.expectRVTooLarge { if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) { if err == nil || !storage.IsTooLargeResourceVersion(err) {
@ -587,7 +611,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
t.Fatalf("Create failed: %v", err) t.Fatalf("Create failed: %v", err)
} }
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -801,12 +825,12 @@ func TestTransformationFailure(t *testing.T) {
// List should fail // List should fail
var got example.PodList var got example.PodList
if err := store.List(ctx, "/", "", storage.Everything, &got); !storage.IsInternalError(err) { if err := store.List(ctx, "/", storage.ListOptions{Predicate: storage.Everything}, &got); !storage.IsInternalError(err) {
t.Errorf("Unexpected error %v", err) t.Errorf("Unexpected error %v", err)
} }
// Get should fail // Get should fail
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) { if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
// GuaranteedUpdate without suggestion should return an error // GuaranteedUpdate without suggestion should return an error
@ -826,7 +850,7 @@ func TestTransformationFailure(t *testing.T) {
if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil, storage.ValidateAllObjectFunc); !storage.IsInternalError(err) { if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil, storage.ValidateAllObjectFunc); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) { if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
} }
@ -894,7 +918,7 @@ func TestList(t *testing.T) {
} }
list := &example.PodList{} list := &example.PodList{}
store.List(ctx, "/two-level", "0", storage.Everything, list) store.List(ctx, "/two-level", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}, list)
continueRV, _ := strconv.Atoi(list.ResourceVersion) continueRV, _ := strconv.Atoi(list.ResourceVersion)
secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV)) secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV))
if err != nil { if err != nil {
@ -917,6 +941,7 @@ func TestList(t *testing.T) {
expectedRemainingItemCount *int64 expectedRemainingItemCount *int64
expectError bool expectError bool
expectRVTooLarge bool expectRVTooLarge bool
expectRV string
}{ }{
{ {
name: "rejects invalid resource version", name: "rejects invalid resource version",
@ -950,14 +975,14 @@ func TestList(t *testing.T) {
expectedOut: []*example.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
}, },
{ {
name: "test List on existing key with minimum resource version set to 0", name: "test List on existing key with resource version set to 0",
prefix: "/one-level/", prefix: "/one-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
rv: "0", rv: "0",
}, },
{ {
name: "test List on existing key with minimum resource version set to current resource version", name: "test List on existing key with resource version set to current resource version",
prefix: "/one-level/", prefix: "/one-level/",
pred: storage.Everything, pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj}, expectedOut: []*example.Pod{preset[0].storedObj},
@ -990,6 +1015,34 @@ func TestList(t *testing.T) {
expectContinue: true, expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1), expectedRemainingItemCount: utilpointer.Int64Ptr(1),
}, },
{
name: "test List with limit at current resource version",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
rv: list.ResourceVersion,
expectRV: list.ResourceVersion,
},
{
name: "test List with limit at resource version 0",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
rv: "0",
expectRV: list.ResourceVersion,
},
{ {
name: "test List with limit when paging disabled", name: "test List with limit when paging disabled",
disablePaging: true, disablePaging: true,
@ -1142,46 +1195,58 @@ func TestList(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
if tt.pred.GetAttrs == nil { t.Run(tt.name, func(t *testing.T) {
tt.pred.GetAttrs = getAttrs if tt.pred.GetAttrs == nil {
} tt.pred.GetAttrs = getAttrs
out := &example.PodList{}
var err error
if tt.disablePaging {
err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out)
} else {
err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out)
}
if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
t.Errorf("(%s): expecting resource version too high error, but get: %s", tt.name, err)
} }
continue
}
if (err != nil) != tt.expectError { out := &example.PodList{}
t.Errorf("(%s): List failed: %v", tt.name, err) storageOpts := storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}
} var err error
if err != nil { if tt.disablePaging {
continue err = disablePagingStore.List(ctx, tt.prefix, storageOpts, out)
} } else {
if (len(out.Continue) > 0) != tt.expectContinue { err = store.List(ctx, tt.prefix, storageOpts, out)
t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue)
}
if len(tt.expectedOut) != len(out.Items) {
t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items))
continue
}
if e, a := tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount(); (e == nil) != (a == nil) || (e != nil && a != nil && *e != *a) {
t.Errorf("(%s): remainingItemCount want=%#v, got=%#v", tt.name, e, a)
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod)
} }
} if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
t.Fatalf("expecting resource version too high error, but get: %s", err)
}
return
}
if err != nil {
if !tt.expectError {
t.Fatalf("List failed: %v", err)
}
return
}
if tt.expectError {
t.Fatalf("expected error but got none")
}
if (len(out.Continue) > 0) != tt.expectContinue {
t.Errorf("unexpected continue token: %q", out.Continue)
}
// If a client requests an exact resource version, it must be echoed back to them.
if tt.expectRV != "" {
if tt.expectRV != out.ResourceVersion {
t.Errorf("resourceVersion in list response want=%s, got=%s", tt.expectRV, out.ResourceVersion)
}
}
if len(tt.expectedOut) != len(out.Items) {
t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items))
}
if e, a := tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount(); (e == nil) != (a == nil) || (e != nil && a != nil && *e != *a) {
t.Errorf("remainingItemCount want=%#v, got=%#v", e, a)
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("pod want=%#v, got=%#v", wantPod, getPod)
}
}
})
} }
} }
@ -1249,7 +1314,7 @@ func TestListContinuation(t *testing.T) {
}, },
} }
} }
if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, "")}, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err) t.Fatalf("Unable to get initial list: %v", err)
} }
if len(out.Continue) == 0 { if len(out.Continue) == 0 {
@ -1271,7 +1336,7 @@ func TestListContinuation(t *testing.T) {
// no limit, should get two items // no limit, should get two items
out = &example.PodList{} out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(0, continueFromSecondItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err) t.Fatalf("Unable to get second page: %v", err)
} }
if len(out.Continue) != 0 { if len(out.Continue) != 0 {
@ -1293,7 +1358,7 @@ func TestListContinuation(t *testing.T) {
// limit, should get two more pages // limit, should get two more pages
out = &example.PodList{} out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromSecondItem), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromSecondItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err) t.Fatalf("Unable to get second page: %v", err)
} }
if len(out.Continue) == 0 { if len(out.Continue) == 0 {
@ -1314,7 +1379,7 @@ func TestListContinuation(t *testing.T) {
continueFromThirdItem := out.Continue continueFromThirdItem := out.Continue
out = &example.PodList{} out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromThirdItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err) t.Fatalf("Unable to get second page: %v", err)
} }
if len(out.Continue) != 0 { if len(out.Continue) != 0 {
@ -1406,7 +1471,7 @@ func TestListContinuationWithFilter(t *testing.T) {
}, },
} }
} }
if err := store.List(ctx, "/", "0", pred(2, ""), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(2, "")}, out); err != nil {
t.Errorf("Unable to get initial list: %v", err) t.Errorf("Unable to get initial list: %v", err)
} }
if len(out.Continue) == 0 { if len(out.Continue) == 0 {
@ -1435,7 +1500,7 @@ func TestListContinuationWithFilter(t *testing.T) {
// but since there is only one item left, that is all we should get with no continueValue // but since there is only one item left, that is all we should get with no continueValue
// both read counters should be incremented for the singular calls they make in this case // both read counters should be incremented for the singular calls they make in this case
out = &example.PodList{} out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(2, cont), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(2, cont)}, out); err != nil {
t.Errorf("Unable to get second page: %v", err) t.Errorf("Unable to get second page: %v", err)
} }
if len(out.Continue) != 0 { if len(out.Continue) != 0 {
@ -1514,7 +1579,7 @@ func TestListInconsistentContinuation(t *testing.T) {
} }
out := &example.PodList{} out := &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, "")}, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err) t.Fatalf("Unable to get initial list: %v", err)
} }
if len(out.Continue) == 0 { if len(out.Continue) == 0 {
@ -1555,7 +1620,7 @@ func TestListInconsistentContinuation(t *testing.T) {
} }
// The old continue token should have expired // The old continue token should have expired
err = store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out) err = store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(0, continueFromSecondItem)}, out)
if err == nil { if err == nil {
t.Fatalf("unexpected no error") t.Fatalf("unexpected no error")
} }
@ -1572,7 +1637,7 @@ func TestListInconsistentContinuation(t *testing.T) {
} }
out = &example.PodList{} out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, inconsistentContinueFromSecondItem), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, inconsistentContinueFromSecondItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err) t.Fatalf("Unable to get second page: %v", err)
} }
if len(out.Continue) == 0 { if len(out.Continue) == 0 {
@ -1586,7 +1651,7 @@ func TestListInconsistentContinuation(t *testing.T) {
} }
continueFromThirdItem := out.Continue continueFromThirdItem := out.Continue
out = &example.PodList{} out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil { if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromThirdItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err) t.Fatalf("Unable to get second page: %v", err)
} }
if len(out.Continue) != 0 { if len(out.Continue) != 0 {

View File

@ -94,7 +94,7 @@ func testWatch(t *testing.T, recursive bool) {
}, },
}} }}
for i, tt := range tests { for i, tt := range tests {
w, err := store.watch(ctx, tt.key, "0", tt.pred, recursive) w, err := store.watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred}, recursive)
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -131,7 +131,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
ctx, store, cluster := testSetup(t) ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -149,7 +149,7 @@ func TestWatchFromZero(t *testing.T) {
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
w, err := store.Watch(ctx, key, "0", storage.Everything) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -167,7 +167,7 @@ func TestWatchFromZero(t *testing.T) {
} }
// Make sure when we watch from 0 we receive an ADDED event // Make sure when we watch from 0 we receive an ADDED event
w, err = store.Watch(ctx, key, "0", storage.Everything) w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -195,7 +195,7 @@ func TestWatchFromZero(t *testing.T) {
} }
// Make sure we can still watch from 0 and receive an ADDED event // Make sure we can still watch from 0 and receive an ADDED event
w, err = store.Watch(ctx, key, "0", storage.Everything) w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -209,7 +209,7 @@ func TestWatchFromNoneZero(t *testing.T) {
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -227,7 +227,7 @@ func TestWatchError(t *testing.T) {
defer cluster.Terminate(t) defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background() ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }
@ -289,7 +289,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
defer cluster.Terminate(t) defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Watch failed: %v", err) t.Fatalf("Watch failed: %v", err)
} }

View File

@ -175,7 +175,7 @@ type Interface interface {
// (e.g. reconnecting without missing any updates). // (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key // If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts. // and send it in an "ADDED" event, before watch starts.
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API // WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item selected by 'p' are sent down to returned watch.Interface. // objects and any item selected by 'p' are sent down to returned watch.Interface.
@ -184,26 +184,23 @@ type Interface interface {
// (e.g. reconnecting without missing any updates). // (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will list current objects directory defined by key // If resource version is "0", this interface will list current objects directory defined by key
// and send them in "ADDED" events, before watch starts. // and send them in "ADDED" events, before watch starts.
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either // Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound. // return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error. // Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will // The returned contents may be delayed according to the semantics of GetOptions.ResourceVersion.
// be have at least 'resourceVersion'. Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
// GetToList unmarshals json found at key and opaque it into *List api object // GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition). // (an object that satisfies the runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will // The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion.
// be have at least 'resourceVersion'. GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them // List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition). // into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will // The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion.
// be have at least 'resourceVersion'. List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict. // retrying the update until success if there is index conflict.
@ -243,3 +240,26 @@ type Interface interface {
// Count returns number of different entries under the key (generally being path prefix). // Count returns number of different entries under the key (generally being path prefix).
Count(key string) (int64, error) Count(key string) (int64, error)
} }
// GetOptions provides the options that may be provided for storage get operations.
type GetOptions struct {
// IgnoreNotFound determines what is returned if the requested object is not found. If
// true, a zero object is returned. If false, an error is returned.
IgnoreNotFound bool
// ResourceVersion provides a resource version constraint to apply to the get operation
// as a "not older than" constraint: the result contains data at least as new as the provided
// ResourceVersion. The newest available data is preferred, but any data not older than this
// ResourceVersion may be served.
ResourceVersion string
}
// ListOptions provides the options that may be provided for storage list operations.
type ListOptions struct {
// ResourceVersion provides a resource version constraint to apply to the list operation
// as a "not older than" constraint: the result contains data at least as new as the provided
// ResourceVersion. The newest available data is preferred, but any data not older than this
// ResourceVersion may be served.
ResourceVersion string
// Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate
}

View File

@ -140,7 +140,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
} }
obj.ResourceVersion = "" obj.ResourceVersion = ""
result := &example.Pod{} result := &example.Pod{}
if err := s.Get(context.TODO(), key, "", result, false); err != nil { if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
return result return result
@ -160,14 +160,14 @@ func TestGet(t *testing.T) {
// We pass the ResourceVersion from the above Create() operation. // We pass the ResourceVersion from the above Create() operation.
result := &example.Pod{} result := &example.Pod{}
if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil { if err := cacher.Get(context.TODO(), "pods/ns/foo", storage.GetOptions{IgnoreNotFound: true, ResourceVersion: fooCreated.ResourceVersion}, result); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) { if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %#v, got: %#v", e, a) t.Errorf("Expected: %#v, got: %#v", e, a)
} }
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil { if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion, IgnoreNotFound: true}, result); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
emptyPod := example.Pod{} emptyPod := example.Pod{}
@ -175,7 +175,7 @@ func TestGet(t *testing.T) {
t.Errorf("Expected: %#v, got: %#v", e, a) t.Errorf("Expected: %#v, got: %#v", e, a)
} }
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, false); !storage.IsNotFound(err) { if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion}, result); !storage.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
} }
@ -219,7 +219,7 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests { for i, tt := range tests {
out := &example.PodList{} out := &example.PodList{}
err := cacher.GetToList(context.TODO(), tt.key, "", tt.pred, out) err := cacher.GetToList(context.TODO(), tt.key, storage.ListOptions{Predicate: tt.pred}, out)
if err != nil { if err != nil {
t.Fatalf("GetToList failed: %v", err) t.Fatalf("GetToList failed: %v", err)
} }
@ -275,7 +275,7 @@ func TestList(t *testing.T) {
// We first List directly from etcd by passing empty resourceVersion, // We first List directly from etcd by passing empty resourceVersion,
// to get the current etcd resourceVersion. // to get the current etcd resourceVersion.
rvResult := &example.PodList{} rvResult := &example.PodList{}
if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil { if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{Predicate: storage.Everything}, rvResult); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
deletedPodRV := rvResult.ListMeta.ResourceVersion deletedPodRV := rvResult.ListMeta.ResourceVersion
@ -283,7 +283,7 @@ func TestList(t *testing.T) {
result := &example.PodList{} result := &example.PodList{}
// We pass the current etcd ResourceVersion received from the above List() operation, // We pass the current etcd ResourceVersion received from the above List() operation,
// since there is not easy way to get ResourceVersion of barPod deletion operation. // since there is not easy way to get ResourceVersion of barPod deletion operation.
if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil { if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: deletedPodRV, Predicate: storage.Everything}, result); err != nil {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
if result.ListMeta.ResourceVersion != deletedPodRV { if result.ListMeta.ResourceVersion != deletedPodRV {
@ -345,7 +345,7 @@ func TestTooLargeResourceVersionList(t *testing.T) {
listRV := strconv.Itoa(int(rv + 10)) listRV := strconv.Itoa(int(rv + 10))
result := &example.PodList{} result := &example.PodList{}
err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result) err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: listRV, Predicate: storage.Everything}, result)
if !errors.IsTimeout(err) { if !errors.IsTimeout(err) {
t.Errorf("Unexpected error: %v", err) t.Errorf("Unexpected error: %v", err)
} }
@ -381,12 +381,12 @@ type injectListError struct {
storage.Interface storage.Interface
} }
func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error { func (self *injectListError) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if self.errors > 0 { if self.errors > 0 {
self.errors-- self.errors--
return fmt.Errorf("injected error") return fmt.Errorf("injected error")
} }
return self.Interface.List(ctx, key, resourceVersion, p, listObj) return self.Interface.List(ctx, key, opts, listObj)
} }
func TestWatch(t *testing.T) { func TestWatch(t *testing.T) {
@ -421,7 +421,7 @@ func TestWatch(t *testing.T) {
startVersion := strconv.Itoa(int(initialVersion)) startVersion := strconv.Itoa(int(initialVersion))
// Set up Watch for object "podFoo". // Set up Watch for object "podFoo".
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -438,7 +438,7 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
// Check whether we get too-old error via the watch channel // Check whether we get too-old error via the watch channel
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything) tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Expected no direct error, got %v", err) t.Fatalf("Expected no direct error, got %v", err)
} }
@ -446,7 +446,7 @@ func TestWatch(t *testing.T) {
// Events happens in eventFreshDuration, cache expand without event "Gone". // Events happens in eventFreshDuration, cache expand without event "Gone".
verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo) verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything) initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -455,7 +455,7 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
// Now test watch from "now". // Now test watch from "now".
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything) nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -488,7 +488,7 @@ func TestWatcherTimeout(t *testing.T) {
// Create a number of watchers that will not be reading any result. // Create a number of watchers that will not be reading any result.
nonReadingWatchers := 50 nonReadingWatchers := 50
for i := 0; i < nonReadingWatchers; i++ { for i := 0; i < nonReadingWatchers; i++ {
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -496,7 +496,7 @@ func TestWatcherTimeout(t *testing.T) {
} }
// Create a second watcher that will be reading result. // Create a second watcher that will be reading result.
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -524,7 +524,7 @@ func TestFiltering(t *testing.T) {
// Ensure that the cacher is initialized, before creating any pods, // Ensure that the cacher is initialized, before creating any pods,
// so that we are sure that all events will be present in cacher. // so that we are sure that all events will be present in cacher.
syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything) syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -566,7 +566,7 @@ func TestFiltering(t *testing.T) {
return labels.Set(metadata.GetLabels()), nil, nil return labels.Set(metadata.GetLabels()), nil, nil
}, },
} }
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -599,7 +599,7 @@ func TestStartingResourceVersion(t *testing.T) {
rv += 10 rv += 10
startVersion := strconv.Itoa(int(rv)) startVersion := strconv.Itoa(int(rv))
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -662,7 +662,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
// It should support establishing watches from rv and higher, but not older. // It should support establishing watches from rv and higher, but not older.
{ {
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv-1)), storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -672,7 +672,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
} }
{ {
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv+1)), storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -686,7 +686,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
} }
{ {
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv)), storage.Everything) watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -716,7 +716,7 @@ func TestRandomWatchDeliver(t *testing.T) {
} }
startVersion := strconv.Itoa(int(rv)) startVersion := strconv.Itoa(int(rv))
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -870,7 +870,7 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
pred := storage.Everything pred := storage.Everything
pred.AllowWatchBookmarks = c.allowWatchBookmark pred.AllowWatchBookmarks = c.allowWatchBookmark
ctx, _ := context.WithTimeout(context.Background(), c.timeout) ctx, _ := context.WithTimeout(context.Background(), c.timeout)
watcher, err := cacher.Watch(ctx, "pods/ns/foo", startVersion, pred) watcher, err := cacher.Watch(ctx, "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@ -910,7 +910,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
pred := storage.Everything pred := storage.Everything
pred.AllowWatchBookmarks = true pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred) watcher, err := cacher.WatchList(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }