Merge pull request #91595 from jpbetz/get-list-storage-options

Introduce GetOptions and ListOptions to storage interface
This commit is contained in:
Kubernetes Prow Robot 2020-06-03 12:18:11 -07:00 committed by GitHub
commit e422e9a3f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 346 additions and 196 deletions

View File

@ -63,7 +63,11 @@ var _ Leases = &storageLeases{}
// ListLeases retrieves a list of the current master IPs from storage
func (s *storageLeases) ListLeases() ([]string, error) {
ipInfoList := &corev1.EndpointsList{}
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil {
storageOpts := storage.ListOptions{
ResourceVersion: "0",
Predicate: storage.Everything,
}
if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, storageOpts, ipInfoList); err != nil {
return nil, err
}

View File

@ -14,6 +14,7 @@ go_test(
"//pkg/apis/core:go_default_library",
"//pkg/registry/core/service/allocator:go_default_library",
"//pkg/registry/registrytest:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
],

View File

@ -190,7 +190,7 @@ func (e *Etcd) tryUpdate(fn func() error) error {
// etcd. If the key does not exist, the object will have an empty ResourceVersion.
func (e *Etcd) Get() (*api.RangeAllocation, error) {
existing := &api.RangeAllocation{}
if err := e.storage.Get(context.TODO(), e.baseKey, "", existing, true); err != nil {
if err := e.storage.Get(context.TODO(), e.baseKey, storage.GetOptions{IgnoreNotFound: true}, existing); err != nil {
return nil, storeerr.InterpretGetError(err, e.resource, "")
}
return existing, nil

View File

@ -21,6 +21,7 @@ import (
"strings"
"testing"
apiserverstorage "k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend"
api "k8s.io/kubernetes/pkg/apis/core"
@ -80,7 +81,7 @@ func TestStore(t *testing.T) {
other := allocator.NewAllocationMap(100, "rangeSpecValue")
allocation := &api.RangeAllocation{}
if err := storage.storage.Get(context.TODO(), key(), "", allocation, false); err != nil {
if err := storage.storage.Get(context.TODO(), key(), apiserverstorage.GetOptions{}, allocation); err != nil {
t.Fatal(err)
}
if allocation.Range != "rangeSpecValue" {

View File

@ -35,7 +35,7 @@ func (s *DryRunnableStorage) Versioner() storage.Versioner {
func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error {
if dryRun {
if err := s.Storage.Get(ctx, key, "", out, false); err == nil {
if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err == nil {
return storage.NewKeyExistsError(key, 0)
}
return s.copyInto(obj, out)
@ -45,7 +45,7 @@ func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out ru
func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool) error {
if dryRun {
if err := s.Storage.Get(ctx, key, "", out, false); err != nil {
if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err != nil {
return err
}
if err := preconditions.Check(key, out); err != nil {
@ -56,31 +56,31 @@ func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime
return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation)
}
func (s *DryRunnableStorage) Watch(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) {
return s.Storage.Watch(ctx, key, resourceVersion, p)
func (s *DryRunnableStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.Storage.Watch(ctx, key, opts)
}
func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) {
return s.Storage.WatchList(ctx, key, resourceVersion, p)
func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.Storage.WatchList(ctx, key, opts)
}
func (s *DryRunnableStorage) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return s.Storage.Get(ctx, key, opts, objPtr)
}
func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error {
return s.Storage.GetToList(ctx, key, resourceVersion, p, listObj)
func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return s.Storage.GetToList(ctx, key, opts, listObj)
}
func (s *DryRunnableStorage) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error {
return s.Storage.List(ctx, key, resourceVersion, p, listObj)
func (s *DryRunnableStorage) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return s.Storage.List(ctx, key, opts, listObj)
}
func (s *DryRunnableStorage) GuaranteedUpdate(
ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, dryRun bool, suggestion ...runtime.Object) error {
if dryRun {
err := s.Storage.Get(ctx, key, "", ptrToType, ignoreNotFound)
err := s.Storage.Get(ctx, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, ptrToType)
if err != nil {
return err
}

View File

@ -69,7 +69,7 @@ func TestDryRunCreateDoesntCreate(t *testing.T) {
t.Fatalf("Failed to create new dry-run object: %v", err)
}
err = s.Get(context.Background(), "key", "", out, false)
err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
if e, ok := err.(*storage.StorageError); !ok || e.Code != storage.ErrCodeKeyNotFound {
t.Errorf("Expected key to be not found, error: %v", err)
}
@ -185,7 +185,7 @@ func TestDryRunUpdateDoesntUpdate(t *testing.T) {
t.Fatalf("Failed to dry-run update: %v", err)
}
out := UnstructuredOrDie(`{}`)
err = s.Get(context.Background(), "key", "", out, false)
err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
if !reflect.DeepEqual(created, out) {
t.Fatalf("Returned object %q different from expected %q", created, out)
}
@ -239,7 +239,7 @@ func TestDryRunDeleteDoesntDelete(t *testing.T) {
t.Fatalf("Failed to dry-run delete the object: %v", err)
}
err = s.Get(context.Background(), "key", "", out, false)
err = s.Get(context.Background(), "key", storage.GetOptions{}, out)
if err != nil {
t.Fatalf("Failed to retrieve dry-run deleted object: %v", err)
}

View File

@ -322,15 +322,16 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate,
p.Continue = options.Continue
list := e.NewListFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx)
storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: p}
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list)
err := e.Storage.GetToList(ctx, key, storageOpts, list)
return list, storeerr.InterpretListError(err, qualifiedResource)
}
// if we cannot extract a key based on the current context, the optimization is skipped
}
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list)
err := e.Storage.List(ctx, e.KeyRootFunc(ctx), storageOpts, list)
return list, storeerr.InterpretListError(err, qualifiedResource)
}
@ -367,7 +368,7 @@ func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation
if !apierrors.IsAlreadyExists(err) {
return nil, err
}
if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil {
if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil {
return nil, err
}
accessor, errGetAcc := meta.Accessor(out)
@ -608,7 +609,7 @@ func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions
if err != nil {
return nil, err
}
if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil {
if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil {
return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name)
}
if e.Decorator != nil {
@ -892,7 +893,7 @@ func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.V
}
obj := e.NewFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx)
if err = e.Storage.Get(ctx, key, "", obj, false); err != nil {
if err = e.Storage.Get(ctx, key, storage.GetOptions{}, obj); err != nil {
return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name)
}
@ -1126,9 +1127,10 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti
// WatchPredicate starts a watch for the items that matches.
func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) {
storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p}
if name, ok := p.MatchesSingle(); ok {
if key, err := e.KeyFunc(ctx, name); err == nil {
w, err := e.Storage.Watch(ctx, key, resourceVersion, p)
w, err := e.Storage.Watch(ctx, key, storageOpts)
if err != nil {
return nil, err
}
@ -1141,7 +1143,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate
// optimization is skipped
}
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p)
w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), storageOpts)
if err != nil {
return nil, err
}

View File

@ -431,8 +431,9 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre
}
// Watch implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
watchRV, err := c.versioner.ParseResourceVersion(resourceVersion)
func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
pred := opts.Predicate
watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
@ -515,22 +516,22 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string,
}
// WatchList implements storage.Interface.
func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return c.Watch(ctx, key, resourceVersion, pred)
func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return c.Watch(ctx, key, opts)
}
// Get implements storage.Interface.
func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error {
if resourceVersion == "" {
func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
if opts.ResourceVersion == "" {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
return c.storage.Get(ctx, key, opts, objPtr)
}
// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
getRV, err := c.versioner.ParseResourceVersion(resourceVersion)
getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return err
}
@ -538,7 +539,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
if getRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound)
return c.storage.Get(ctx, key, opts, objPtr)
}
// Do not create a trace - it's not for free and there are tons
@ -563,7 +564,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
objVal.Set(reflect.ValueOf(elem.Object).Elem())
} else {
objVal.Set(reflect.Zero(objVal.Type()))
if !ignoreNotFound {
if !opts.IgnoreNotFound {
return storage.NewKeyNotFoundError(key, int64(readResourceVersion))
}
}
@ -571,7 +572,9 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
}
// GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@ -582,7 +585,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
return c.storage.GetToList(ctx, key, opts, listObj)
}
// If resourceVersion is specified, serve it from cache.
@ -596,7 +599,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
return c.storage.GetToList(ctx, key, opts, listObj)
}
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@ -643,7 +646,9 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
}
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
hasContinuation := pagingEnabled && len(pred.Continue) > 0
hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0"
@ -654,7 +659,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
// Limits are only sent to storage when resourceVersion is non-zero
// since the watch cache isn't able to perform continuations, and
// limits are ignored when resource version is zero.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
return c.storage.List(ctx, key, opts, listObj)
}
// If resourceVersion is specified, serve it from cache.
@ -668,7 +673,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
return c.storage.List(ctx, key, opts, listObj)
}
trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
@ -1083,7 +1088,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
Continue: options.Continue,
}
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil {
if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{Predicate: pred}, list); err != nil {
return nil, err
}
return list, nil
@ -1091,7 +1096,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object,
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything)
return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything})
}
// errWatcher implements watch.Interface to return a single error

View File

@ -305,19 +305,19 @@ func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object,
func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions, _ storage.ValidateObjectFunc) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) {
func (d *dummyStorage) WatchList(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
return newDummyWatch(), nil
}
func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error {
return fmt.Errorf("unimplemented")
}
func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error {
func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error {
return d.err
}
func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error {
func (d *dummyStorage) GetToList(_ context.Context, _ string, _ storage.ListOptions, _ runtime.Object) error {
return d.err
}
func (d *dummyStorage) List(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error {
podList := listObj.(*example.PodList)
podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"}
return d.err
@ -329,7 +329,7 @@ func (d *dummyStorage) Count(_ string) (int64, error) {
return 0, fmt.Errorf("unimplemented")
}
func TestListWithLimitAndRV0(t *testing.T) {
func TestListCacheBypass(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 0)
if err != nil {
@ -347,18 +347,24 @@ func TestListWithLimitAndRV0(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err = cacher.List(context.TODO(), "pods/ns", "0", pred, result)
err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: pred,
}, result)
if err != nil {
t.Errorf("List with Limit and RV=0 should be served from cache: %v", err)
}
err = cacher.List(context.TODO(), "pods/ns", "", pred, result)
err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: pred,
}, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}
func TestGetToListWithLimitAndRV0(t *testing.T) {
func TestGetToListCacheBypass(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 0)
if err != nil {
@ -376,17 +382,55 @@ func TestGetToListWithLimitAndRV0(t *testing.T) {
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err = cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result)
err = cacher.GetToList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "0",
Predicate: pred,
}, result)
if err != nil {
t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err)
}
err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result)
err = cacher.GetToList(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "",
Predicate: pred,
}, result)
if err != errDummy {
t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err)
}
}
func TestGetCacheBypass(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 1)
if err != nil {
t.Fatalf("Couldn't create cacher: %v", err)
}
defer cacher.Stop()
result := &example.Pod{}
// Wait until cacher is initialized.
cacher.ready.wait()
// Inject error to underlying layer and check if cacher is not bypassed.
backingStorage.err = errDummy
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "0",
}, result)
if err != nil {
t.Errorf("Get with RV=0 should be served from cache: %v", err)
}
err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{
IgnoreNotFound: true,
ResourceVersion: "",
}, result)
if err != errDummy {
t.Errorf("Get without RV=0 should bypass cacher: %v", err)
}
}
func TestWatcherNotGoingBackInTime(t *testing.T) {
backingStorage := &dummyStorage{}
cacher, _, err := newTestCacher(backingStorage, 1000)
@ -417,7 +461,10 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
totalPods := 100
// Create watcher that will be slowing down reading.
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
w1, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{
ResourceVersion: "999",
Predicate: storage.Everything,
})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -439,7 +486,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) {
}
// Create fast watcher and ensure it will get each object exactly once.
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
w2, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -519,7 +566,7 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) {
// Wait until cacher is initialized.
cacher.ready.wait()
w, err := cacher.Watch(context.Background(), "pods/ns", "0", storage.Everything)
w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -618,7 +665,7 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) {
return
default:
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "0", pred)
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -672,7 +719,7 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo
pred.AllowWatchBookmarks = allowWatchBookmarks
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "0", pred)
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -779,7 +826,7 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) {
}
ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "100", pred)
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -847,7 +894,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) {
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), time.Second)
w, err := cacher.Watch(ctx, "pods/ns", "999", pred)
w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -921,14 +968,14 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) {
totalPods := 50
// Create watcher that will be blocked.
w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
w1, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
defer w1.Stop()
// Create fast watcher and ensure it will get all objects.
w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything)
w2, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -1011,7 +1058,7 @@ func TestCachingDeleteEvents(t *testing.T) {
}
createWatch := func(pred storage.SelectionPredicate) watch.Interface {
w, err := cacher.Watch(context.TODO(), "pods/ns", "999", pred)
w, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}
@ -1089,7 +1136,7 @@ func testCachingObjects(t *testing.T, watchersCount int) {
watchers := make([]watch.Interface, 0, watchersCount)
for i := 0; i < watchersCount; i++ {
w, err := cacher.Watch(context.TODO(), "pods/ns", "1000", storage.Everything)
w, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "1000", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Failed to create watch: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package etcd3
import (
"fmt"
"strconv"
"k8s.io/apimachinery/pkg/api/meta"
@ -45,14 +46,14 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin
// UpdateList implements Versioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string, count *int64) error {
if resourceVersion == 0 {
return fmt.Errorf("illegal resource version from storage: %d", resourceVersion)
}
listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil {
return err
}
versionString := ""
if resourceVersion != 0 {
versionString = strconv.FormatUint(resourceVersion, 10)
}
versionString := strconv.FormatUint(resourceVersion, 10)
listAccessor.SetResourceVersion(versionString)
listAccessor.SetContinue(nextKey)
listAccessor.SetRemainingItemCount(count)

View File

@ -63,7 +63,7 @@ var _ value.Context = authenticatedDataString("")
type store struct {
client *clientv3.Client
// getOpts contains additional options that should be passed
// getOps contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec
@ -112,20 +112,21 @@ func (s *store) Versioner() storage.Versioner {
}
// Get implements storage.Interface.Get.
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error {
key = path.Join(s.pathPrefix, key)
startTime := time.Now()
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
callOpts := s.getOps
getResp, err := s.client.KV.Get(ctx, key, callOpts...)
metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
if err != nil {
return err
}
if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
if err = s.ensureMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}
if len(getResp.Kvs) == 0 {
if ignoreNotFound {
if opts.IgnoreNotFound {
return runtime.SetZeroValue(out)
}
return storage.NewKeyNotFoundError(key, 0)
@ -379,7 +380,9 @@ func (s *store) GuaranteedUpdate(
}
// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := listOpts.ResourceVersion
pred := listOpts.Predicate
trace := utiltrace.New("GetToList etcd3",
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
@ -510,7 +513,9 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
trace := utiltrace.New("List etcd3",
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
@ -585,7 +590,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
options = append(options, clientv3.WithRange(rangeEnd))
default:
options = append(options, clientv3.WithPrefix())
}
@ -709,22 +713,22 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
}
// Watch implements storage.Interface.Watch.
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, false)
func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, opts, false)
}
// WatchList implements storage.Interface.WatchList.
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
return s.watch(ctx, key, resourceVersion, pred, true)
func (s *store) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
return s.watch(ctx, key, opts, true)
}
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(rv)
func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, recursive bool) (watch.Interface, error) {
rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion)
if err != nil {
return nil, err
}
key = path.Join(s.pathPrefix, key)
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate)
}
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {

View File

@ -168,7 +168,7 @@ func TestCreateWithTTL(t *testing.T) {
t.Fatalf("Create failed: %v", err)
}
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -191,7 +191,19 @@ func TestGet(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
// create an object to test
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
key, createdObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
// update the object once to allow get by exact resource version to be tested
updateObj := createdObj.DeepCopy()
updateObj.Annotations = map[string]string{"test-annotation": "1"}
storedObj := &example.Pod{}
err := store.GuaranteedUpdate(ctx, key, storedObj, true, nil,
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
ttl := uint64(1)
return updateObj, &ttl, nil
})
if err != nil {
t.Fatalf("Update failed: %v", err)
}
// create an additional object to increment the resource version for pods above the resource version of the foo object
lastUpdatedObj := &example.Pod{}
if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil {
@ -201,6 +213,7 @@ func TestGet(t *testing.T) {
currentRV, _ := strconv.Atoi(storedObj.ResourceVersion)
lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion)
// TODO(jpbetz): Add exact test cases
tests := []struct {
name string
key string
@ -209,28 +222,34 @@ func TestGet(t *testing.T) {
expectRVTooLarge bool
expectedOut *example.Pod
rv string
expectedRV string
}{{ // test get on existing item
name: "get existing",
key: key,
ignoreNotFound: false,
expectNotFoundErr: false,
expectedOut: storedObj,
}, { // test get on existing item with minimum resource version set to 0
}, { // test get on existing item with resource version set to 0
name: "resource version 0",
key: key,
expectedOut: storedObj,
rv: "0",
}, { // test get on existing item with minimum resource version set to current resource version of the object
name: "current object resource version",
}, { // test get on existing item with resource version set to the resource version is was created on
name: "object created resource version",
key: key,
expectedOut: storedObj,
rv: createdObj.ResourceVersion,
}, { // test get on existing item with resource version set to current resource version of the object
name: "current object resource version, match=NotOlderThan",
key: key,
expectedOut: storedObj,
rv: fmt.Sprintf("%d", currentRV),
}, { // test get on existing item with minimum resource version set to latest pod resource version
}, { // test get on existing item with resource version set to latest pod resource version
name: "latest resource version",
key: key,
expectedOut: storedObj,
rv: fmt.Sprintf("%d", lastUpdatedCurrentRV),
}, { // test get on existing item with minimum resource version set too high
}, { // test get on existing item with resource version set too high
name: "too high resource version",
key: key,
expectRVTooLarge: true,
@ -251,7 +270,7 @@ func TestGet(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
out := &example.Pod{}
err := store.Get(ctx, tt.key, tt.rv, out, tt.ignoreNotFound)
err := store.Get(ctx, tt.key, storage.GetOptions{IgnoreNotFound: tt.ignoreNotFound, ResourceVersion: tt.rv}, out)
if tt.expectNotFoundErr {
if err == nil || !storage.IsNotFound(err) {
t.Errorf("expecting not found error, but get: %v", err)
@ -264,11 +283,16 @@ func TestGet(t *testing.T) {
}
return
}
if tt.expectedRV != "" {
if tt.expectedRV != out.ResourceVersion {
t.Errorf("expecting resource version want=%s, got=%s", tt.expectedRV, out.ResourceVersion)
}
}
if err != nil {
t.Fatalf("Get failed: %v", err)
}
if !reflect.DeepEqual(tt.expectedOut, out) {
t.Errorf("pod want=%#v, get=%#v", tt.expectedOut, out)
t.Errorf("pod want=\n%#v\nget=\n%#v", tt.expectedOut, out)
}
})
}
@ -398,7 +422,7 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests {
out := &example.PodList{}
err := store.GetToList(ctx, tt.key, tt.rv, tt.pred, out)
err := store.GetToList(ctx, tt.key, storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}, out)
if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
@ -587,7 +611,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) {
t.Fatalf("Create failed: %v", err)
}
w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything)
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -801,12 +825,12 @@ func TestTransformationFailure(t *testing.T) {
// List should fail
var got example.PodList
if err := store.List(ctx, "/", "", storage.Everything, &got); !storage.IsInternalError(err) {
if err := store.List(ctx, "/", storage.ListOptions{Predicate: storage.Everything}, &got); !storage.IsInternalError(err) {
t.Errorf("Unexpected error %v", err)
}
// Get should fail
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) {
if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
// GuaranteedUpdate without suggestion should return an error
@ -826,7 +850,7 @@ func TestTransformationFailure(t *testing.T) {
if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil, storage.ValidateAllObjectFunc); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) {
if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) {
t.Errorf("Unexpected error: %v", err)
}
}
@ -894,7 +918,7 @@ func TestList(t *testing.T) {
}
list := &example.PodList{}
store.List(ctx, "/two-level", "0", storage.Everything, list)
store.List(ctx, "/two-level", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}, list)
continueRV, _ := strconv.Atoi(list.ResourceVersion)
secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV))
if err != nil {
@ -917,6 +941,7 @@ func TestList(t *testing.T) {
expectedRemainingItemCount *int64
expectError bool
expectRVTooLarge bool
expectRV string
}{
{
name: "rejects invalid resource version",
@ -950,14 +975,14 @@ func TestList(t *testing.T) {
expectedOut: []*example.Pod{preset[0].storedObj},
},
{
name: "test List on existing key with minimum resource version set to 0",
name: "test List on existing key with resource version set to 0",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
rv: "0",
},
{
name: "test List on existing key with minimum resource version set to current resource version",
name: "test List on existing key with resource version set to current resource version",
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
@ -990,6 +1015,34 @@ func TestList(t *testing.T) {
expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
},
{
name: "test List with limit at current resource version",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
rv: list.ResourceVersion,
expectRV: list.ResourceVersion,
},
{
name: "test List with limit at resource version 0",
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
expectedRemainingItemCount: utilpointer.Int64Ptr(1),
rv: "0",
expectRV: list.ResourceVersion,
},
{
name: "test List with limit when paging disabled",
disablePaging: true,
@ -1142,46 +1195,58 @@ func TestList(t *testing.T) {
}
for _, tt := range tests {
if tt.pred.GetAttrs == nil {
tt.pred.GetAttrs = getAttrs
}
out := &example.PodList{}
var err error
if tt.disablePaging {
err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out)
} else {
err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out)
}
if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
t.Errorf("(%s): expecting resource version too high error, but get: %s", tt.name, err)
t.Run(tt.name, func(t *testing.T) {
if tt.pred.GetAttrs == nil {
tt.pred.GetAttrs = getAttrs
}
continue
}
if (err != nil) != tt.expectError {
t.Errorf("(%s): List failed: %v", tt.name, err)
}
if err != nil {
continue
}
if (len(out.Continue) > 0) != tt.expectContinue {
t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue)
}
if len(tt.expectedOut) != len(out.Items) {
t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items))
continue
}
if e, a := tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount(); (e == nil) != (a == nil) || (e != nil && a != nil && *e != *a) {
t.Errorf("(%s): remainingItemCount want=%#v, got=%#v", tt.name, e, a)
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod)
out := &example.PodList{}
storageOpts := storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}
var err error
if tt.disablePaging {
err = disablePagingStore.List(ctx, tt.prefix, storageOpts, out)
} else {
err = store.List(ctx, tt.prefix, storageOpts, out)
}
}
if tt.expectRVTooLarge {
if err == nil || !storage.IsTooLargeResourceVersion(err) {
t.Fatalf("expecting resource version too high error, but get: %s", err)
}
return
}
if err != nil {
if !tt.expectError {
t.Fatalf("List failed: %v", err)
}
return
}
if tt.expectError {
t.Fatalf("expected error but got none")
}
if (len(out.Continue) > 0) != tt.expectContinue {
t.Errorf("unexpected continue token: %q", out.Continue)
}
// If a client requests an exact resource version, it must be echoed back to them.
if tt.expectRV != "" {
if tt.expectRV != out.ResourceVersion {
t.Errorf("resourceVersion in list response want=%s, got=%s", tt.expectRV, out.ResourceVersion)
}
}
if len(tt.expectedOut) != len(out.Items) {
t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items))
}
if e, a := tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount(); (e == nil) != (a == nil) || (e != nil && a != nil && *e != *a) {
t.Errorf("remainingItemCount want=%#v, got=%#v", e, a)
}
for j, wantPod := range tt.expectedOut {
getPod := &out.Items[j]
if !reflect.DeepEqual(wantPod, getPod) {
t.Errorf("pod want=%#v, got=%#v", wantPod, getPod)
}
}
})
}
}
@ -1249,7 +1314,7 @@ func TestListContinuation(t *testing.T) {
},
}
}
if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, "")}, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
@ -1271,7 +1336,7 @@ func TestListContinuation(t *testing.T) {
// no limit, should get two items
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(0, continueFromSecondItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
@ -1293,7 +1358,7 @@ func TestListContinuation(t *testing.T) {
// limit, should get two more pages
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromSecondItem), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromSecondItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) == 0 {
@ -1314,7 +1379,7 @@ func TestListContinuation(t *testing.T) {
continueFromThirdItem := out.Continue
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromThirdItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
@ -1406,7 +1471,7 @@ func TestListContinuationWithFilter(t *testing.T) {
},
}
}
if err := store.List(ctx, "/", "0", pred(2, ""), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(2, "")}, out); err != nil {
t.Errorf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
@ -1435,7 +1500,7 @@ func TestListContinuationWithFilter(t *testing.T) {
// but since there is only one item left, that is all we should get with no continueValue
// both read counters should be incremented for the singular calls they make in this case
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(2, cont), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(2, cont)}, out); err != nil {
t.Errorf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
@ -1514,7 +1579,7 @@ func TestListInconsistentContinuation(t *testing.T) {
}
out := &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, "")}, out); err != nil {
t.Fatalf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
@ -1555,7 +1620,7 @@ func TestListInconsistentContinuation(t *testing.T) {
}
// The old continue token should have expired
err = store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out)
err = store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(0, continueFromSecondItem)}, out)
if err == nil {
t.Fatalf("unexpected no error")
}
@ -1572,7 +1637,7 @@ func TestListInconsistentContinuation(t *testing.T) {
}
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, inconsistentContinueFromSecondItem), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, inconsistentContinueFromSecondItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) == 0 {
@ -1586,7 +1651,7 @@ func TestListInconsistentContinuation(t *testing.T) {
}
continueFromThirdItem := out.Continue
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromThirdItem)}, out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {

View File

@ -94,7 +94,7 @@ func testWatch(t *testing.T, recursive bool) {
},
}}
for i, tt := range tests {
w, err := store.watch(ctx, tt.key, "0", tt.pred, recursive)
w, err := store.watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred}, recursive)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -131,7 +131,7 @@ func TestDeleteTriggerWatch(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -149,7 +149,7 @@ func TestWatchFromZero(t *testing.T) {
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}})
w, err := store.Watch(ctx, key, "0", storage.Everything)
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -167,7 +167,7 @@ func TestWatchFromZero(t *testing.T) {
}
// Make sure when we watch from 0 we receive an ADDED event
w, err = store.Watch(ctx, key, "0", storage.Everything)
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -195,7 +195,7 @@ func TestWatchFromZero(t *testing.T) {
}
// Make sure we can still watch from 0 and receive an ADDED event
w, err = store.Watch(ctx, key, "0", storage.Everything)
w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -209,7 +209,7 @@ func TestWatchFromNoneZero(t *testing.T) {
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -227,7 +227,7 @@ func TestWatchError(t *testing.T) {
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
@ -289,7 +289,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) {
defer cluster.Terminate(t)
key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything)
w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Watch failed: %v", err)
}

View File

@ -175,7 +175,7 @@ type Interface interface {
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will get current object at given key
// and send it in an "ADDED" event, before watch starts.
Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item selected by 'p' are sent down to returned watch.Interface.
@ -184,26 +184,23 @@ type Interface interface {
// (e.g. reconnecting without missing any updates).
// If resource version is "0", this interface will list current objects directory defined by key
// and send them in "ADDED" events, before watch starts.
WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error)
WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error)
// Get unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
// Treats empty responses and nil response nodes exactly like a not found error.
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error
// The returned contents may be delayed according to the semantics of GetOptions.ResourceVersion.
Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error
// GetToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion.
GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
// List unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
// The returned contents may be delayed, but it is guaranteed that they will
// be have at least 'resourceVersion'.
List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error
// The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion.
List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is index conflict.
@ -243,3 +240,26 @@ type Interface interface {
// Count returns number of different entries under the key (generally being path prefix).
Count(key string) (int64, error)
}
// GetOptions provides the options that may be provided for storage get operations.
type GetOptions struct {
// IgnoreNotFound determines what is returned if the requested object is not found. If
// true, a zero object is returned. If false, an error is returned.
IgnoreNotFound bool
// ResourceVersion provides a resource version constraint to apply to the get operation
// as a "not older than" constraint: the result contains data at least as new as the provided
// ResourceVersion. The newest available data is preferred, but any data not older than this
// ResourceVersion may be served.
ResourceVersion string
}
// ListOptions provides the options that may be provided for storage list operations.
type ListOptions struct {
// ResourceVersion provides a resource version constraint to apply to the list operation
// as a "not older than" constraint: the result contains data at least as new as the provided
// ResourceVersion. The newest available data is preferred, but any data not older than this
// ResourceVersion may be served.
ResourceVersion string
// Predicate provides the selection rules for the list operation.
Predicate SelectionPredicate
}

View File

@ -140,7 +140,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl
}
obj.ResourceVersion = ""
result := &example.Pod{}
if err := s.Get(context.TODO(), key, "", result, false); err != nil {
if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil {
t.Errorf("unexpected error: %v", err)
}
return result
@ -160,14 +160,14 @@ func TestGet(t *testing.T) {
// We pass the ResourceVersion from the above Create() operation.
result := &example.Pod{}
if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil {
if err := cacher.Get(context.TODO(), "pods/ns/foo", storage.GetOptions{IgnoreNotFound: true, ResourceVersion: fooCreated.ResourceVersion}, result); err != nil {
t.Errorf("Unexpected error: %v", err)
}
if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) {
t.Errorf("Expected: %#v, got: %#v", e, a)
}
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil {
if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion, IgnoreNotFound: true}, result); err != nil {
t.Errorf("Unexpected error: %v", err)
}
emptyPod := example.Pod{}
@ -175,7 +175,7 @@ func TestGet(t *testing.T) {
t.Errorf("Expected: %#v, got: %#v", e, a)
}
if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, false); !storage.IsNotFound(err) {
if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion}, result); !storage.IsNotFound(err) {
t.Errorf("Unexpected error: %v", err)
}
}
@ -219,7 +219,7 @@ func TestGetToList(t *testing.T) {
for i, tt := range tests {
out := &example.PodList{}
err := cacher.GetToList(context.TODO(), tt.key, "", tt.pred, out)
err := cacher.GetToList(context.TODO(), tt.key, storage.ListOptions{Predicate: tt.pred}, out)
if err != nil {
t.Fatalf("GetToList failed: %v", err)
}
@ -275,7 +275,7 @@ func TestList(t *testing.T) {
// We first List directly from etcd by passing empty resourceVersion,
// to get the current etcd resourceVersion.
rvResult := &example.PodList{}
if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil {
if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{Predicate: storage.Everything}, rvResult); err != nil {
t.Errorf("Unexpected error: %v", err)
}
deletedPodRV := rvResult.ListMeta.ResourceVersion
@ -283,7 +283,7 @@ func TestList(t *testing.T) {
result := &example.PodList{}
// We pass the current etcd ResourceVersion received from the above List() operation,
// since there is not easy way to get ResourceVersion of barPod deletion operation.
if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil {
if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: deletedPodRV, Predicate: storage.Everything}, result); err != nil {
t.Errorf("Unexpected error: %v", err)
}
if result.ListMeta.ResourceVersion != deletedPodRV {
@ -345,7 +345,7 @@ func TestTooLargeResourceVersionList(t *testing.T) {
listRV := strconv.Itoa(int(rv + 10))
result := &example.PodList{}
err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result)
err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: listRV, Predicate: storage.Everything}, result)
if !errors.IsTimeout(err) {
t.Errorf("Unexpected error: %v", err)
}
@ -381,12 +381,12 @@ type injectListError struct {
storage.Interface
}
func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error {
func (self *injectListError) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
if self.errors > 0 {
self.errors--
return fmt.Errorf("injected error")
}
return self.Interface.List(ctx, key, resourceVersion, p, listObj)
return self.Interface.List(ctx, key, opts, listObj)
}
func TestWatch(t *testing.T) {
@ -421,7 +421,7 @@ func TestWatch(t *testing.T) {
startVersion := strconv.Itoa(int(initialVersion))
// Set up Watch for object "podFoo".
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -438,7 +438,7 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, watcher, watch.Modified, podFooPrime)
// Check whether we get too-old error via the watch channel
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Expected no direct error, got %v", err)
}
@ -446,7 +446,7 @@ func TestWatch(t *testing.T) {
// Events happens in eventFreshDuration, cache expand without event "Gone".
verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything)
initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -455,7 +455,7 @@ func TestWatch(t *testing.T) {
verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime)
// Now test watch from "now".
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -488,7 +488,7 @@ func TestWatcherTimeout(t *testing.T) {
// Create a number of watchers that will not be reading any result.
nonReadingWatchers := 50
for i := 0; i < nonReadingWatchers; i++ {
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -496,7 +496,7 @@ func TestWatcherTimeout(t *testing.T) {
}
// Create a second watcher that will be reading result.
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -524,7 +524,7 @@ func TestFiltering(t *testing.T) {
// Ensure that the cacher is initialized, before creating any pods,
// so that we are sure that all events will be present in cacher.
syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything)
syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -566,7 +566,7 @@ func TestFiltering(t *testing.T) {
return labels.Set(metadata.GetLabels()), nil, nil
},
}
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred)
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: pred})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -599,7 +599,7 @@ func TestStartingResourceVersion(t *testing.T) {
rv += 10
startVersion := strconv.Itoa(int(rv))
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -662,7 +662,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
// It should support establishing watches from rv and higher, but not older.
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv-1)), storage.Everything)
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -672,7 +672,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
}
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv+1)), storage.Everything)
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -686,7 +686,7 @@ func TestEmptyWatchEventCache(t *testing.T) {
}
{
watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv)), storage.Everything)
watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -716,7 +716,7 @@ func TestRandomWatchDeliver(t *testing.T) {
}
startVersion := strconv.Itoa(int(rv))
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -870,7 +870,7 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) {
pred := storage.Everything
pred.AllowWatchBookmarks = c.allowWatchBookmark
ctx, _ := context.WithTimeout(context.Background(), c.timeout)
watcher, err := cacher.Watch(ctx, "pods/ns/foo", startVersion, pred)
watcher, err := cacher.Watch(ctx, "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: pred})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -910,7 +910,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) {
pred := storage.Everything
pred.AllowWatchBookmarks = true
ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred)
watcher, err := cacher.WatchList(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}