diff --git a/pkg/master/reconcilers/lease.go b/pkg/master/reconcilers/lease.go index 9fa26a52738..cdfccb2df05 100644 --- a/pkg/master/reconcilers/lease.go +++ b/pkg/master/reconcilers/lease.go @@ -63,7 +63,11 @@ var _ Leases = &storageLeases{} // ListLeases retrieves a list of the current master IPs from storage func (s *storageLeases) ListLeases() ([]string, error) { ipInfoList := &corev1.EndpointsList{} - if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, "0", storage.Everything, ipInfoList); err != nil { + storageOpts := storage.ListOptions{ + ResourceVersion: "0", + Predicate: storage.Everything, + } + if err := s.storage.List(apirequest.NewDefaultContext(), s.baseKey, storageOpts, ipInfoList); err != nil { return nil, err } diff --git a/pkg/registry/core/service/allocator/storage/BUILD b/pkg/registry/core/service/allocator/storage/BUILD index 49eadbe9ad1..242669b6960 100644 --- a/pkg/registry/core/service/allocator/storage/BUILD +++ b/pkg/registry/core/service/allocator/storage/BUILD @@ -14,6 +14,7 @@ go_test( "//pkg/apis/core:go_default_library", "//pkg/registry/core/service/allocator:go_default_library", "//pkg/registry/registrytest:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/etcd3/testing:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", ], diff --git a/pkg/registry/core/service/allocator/storage/storage.go b/pkg/registry/core/service/allocator/storage/storage.go index be93bbee34d..ed9cbfdb8c0 100644 --- a/pkg/registry/core/service/allocator/storage/storage.go +++ b/pkg/registry/core/service/allocator/storage/storage.go @@ -190,7 +190,7 @@ func (e *Etcd) tryUpdate(fn func() error) error { // etcd. If the key does not exist, the object will have an empty ResourceVersion. func (e *Etcd) Get() (*api.RangeAllocation, error) { existing := &api.RangeAllocation{} - if err := e.storage.Get(context.TODO(), e.baseKey, "", existing, true); err != nil { + if err := e.storage.Get(context.TODO(), e.baseKey, storage.GetOptions{IgnoreNotFound: true}, existing); err != nil { return nil, storeerr.InterpretGetError(err, e.resource, "") } return existing, nil diff --git a/pkg/registry/core/service/allocator/storage/storage_test.go b/pkg/registry/core/service/allocator/storage/storage_test.go index a4a24f8b299..bfb39cef84f 100644 --- a/pkg/registry/core/service/allocator/storage/storage_test.go +++ b/pkg/registry/core/service/allocator/storage/storage_test.go @@ -21,6 +21,7 @@ import ( "strings" "testing" + apiserverstorage "k8s.io/apiserver/pkg/storage" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" "k8s.io/apiserver/pkg/storage/storagebackend" api "k8s.io/kubernetes/pkg/apis/core" @@ -80,7 +81,7 @@ func TestStore(t *testing.T) { other := allocator.NewAllocationMap(100, "rangeSpecValue") allocation := &api.RangeAllocation{} - if err := storage.storage.Get(context.TODO(), key(), "", allocation, false); err != nil { + if err := storage.storage.Get(context.TODO(), key(), apiserverstorage.GetOptions{}, allocation); err != nil { t.Fatal(err) } if allocation.Range != "rangeSpecValue" { diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go index c32de6f8ff9..65a533102f9 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun.go @@ -35,7 +35,7 @@ func (s *DryRunnableStorage) Versioner() storage.Versioner { func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64, dryRun bool) error { if dryRun { - if err := s.Storage.Get(ctx, key, "", out, false); err == nil { + if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err == nil { return storage.NewKeyExistsError(key, 0) } return s.copyInto(obj, out) @@ -45,7 +45,7 @@ func (s *DryRunnableStorage) Create(ctx context.Context, key string, obj, out ru func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, deleteValidation storage.ValidateObjectFunc, dryRun bool) error { if dryRun { - if err := s.Storage.Get(ctx, key, "", out, false); err != nil { + if err := s.Storage.Get(ctx, key, storage.GetOptions{}, out); err != nil { return err } if err := preconditions.Check(key, out); err != nil { @@ -56,31 +56,31 @@ func (s *DryRunnableStorage) Delete(ctx context.Context, key string, out runtime return s.Storage.Delete(ctx, key, out, preconditions, deleteValidation) } -func (s *DryRunnableStorage) Watch(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) { - return s.Storage.Watch(ctx, key, resourceVersion, p) +func (s *DryRunnableStorage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return s.Storage.Watch(ctx, key, opts) } -func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate) (watch.Interface, error) { - return s.Storage.WatchList(ctx, key, resourceVersion, p) +func (s *DryRunnableStorage) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return s.Storage.WatchList(ctx, key, opts) } -func (s *DryRunnableStorage) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { - return s.Storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) +func (s *DryRunnableStorage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + return s.Storage.Get(ctx, key, opts, objPtr) } -func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error { - return s.Storage.GetToList(ctx, key, resourceVersion, p, listObj) +func (s *DryRunnableStorage) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return s.Storage.GetToList(ctx, key, opts, listObj) } -func (s *DryRunnableStorage) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error { - return s.Storage.List(ctx, key, resourceVersion, p, listObj) +func (s *DryRunnableStorage) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return s.Storage.List(ctx, key, opts, listObj) } func (s *DryRunnableStorage) GuaranteedUpdate( ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, dryRun bool, suggestion ...runtime.Object) error { if dryRun { - err := s.Storage.Get(ctx, key, "", ptrToType, ignoreNotFound) + err := s.Storage.Get(ctx, key, storage.GetOptions{IgnoreNotFound: ignoreNotFound}, ptrToType) if err != nil { return err } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go index 7a04ec03b87..2703ef0df97 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/dryrun_test.go @@ -69,7 +69,7 @@ func TestDryRunCreateDoesntCreate(t *testing.T) { t.Fatalf("Failed to create new dry-run object: %v", err) } - err = s.Get(context.Background(), "key", "", out, false) + err = s.Get(context.Background(), "key", storage.GetOptions{}, out) if e, ok := err.(*storage.StorageError); !ok || e.Code != storage.ErrCodeKeyNotFound { t.Errorf("Expected key to be not found, error: %v", err) } @@ -185,7 +185,7 @@ func TestDryRunUpdateDoesntUpdate(t *testing.T) { t.Fatalf("Failed to dry-run update: %v", err) } out := UnstructuredOrDie(`{}`) - err = s.Get(context.Background(), "key", "", out, false) + err = s.Get(context.Background(), "key", storage.GetOptions{}, out) if !reflect.DeepEqual(created, out) { t.Fatalf("Returned object %q different from expected %q", created, out) } @@ -239,7 +239,7 @@ func TestDryRunDeleteDoesntDelete(t *testing.T) { t.Fatalf("Failed to dry-run delete the object: %v", err) } - err = s.Get(context.Background(), "key", "", out, false) + err = s.Get(context.Background(), "key", storage.GetOptions{}, out) if err != nil { t.Fatalf("Failed to retrieve dry-run deleted object: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go index 150daad0450..f50d1a0b32a 100644 --- a/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go +++ b/staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go @@ -322,15 +322,16 @@ func (e *Store) ListPredicate(ctx context.Context, p storage.SelectionPredicate, p.Continue = options.Continue list := e.NewListFunc() qualifiedResource := e.qualifiedResourceFromContext(ctx) + storageOpts := storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: p} if name, ok := p.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { - err := e.Storage.GetToList(ctx, key, options.ResourceVersion, p, list) + err := e.Storage.GetToList(ctx, key, storageOpts, list) return list, storeerr.InterpretListError(err, qualifiedResource) } // if we cannot extract a key based on the current context, the optimization is skipped } - err := e.Storage.List(ctx, e.KeyRootFunc(ctx), options.ResourceVersion, p, list) + err := e.Storage.List(ctx, e.KeyRootFunc(ctx), storageOpts, list) return list, storeerr.InterpretListError(err, qualifiedResource) } @@ -367,7 +368,7 @@ func (e *Store) Create(ctx context.Context, obj runtime.Object, createValidation if !apierrors.IsAlreadyExists(err) { return nil, err } - if errGet := e.Storage.Get(ctx, key, "", out, false); errGet != nil { + if errGet := e.Storage.Get(ctx, key, storage.GetOptions{}, out); errGet != nil { return nil, err } accessor, errGetAcc := meta.Accessor(out) @@ -608,7 +609,7 @@ func (e *Store) Get(ctx context.Context, name string, options *metav1.GetOptions if err != nil { return nil, err } - if err := e.Storage.Get(ctx, key, options.ResourceVersion, obj, false); err != nil { + if err := e.Storage.Get(ctx, key, storage.GetOptions{ResourceVersion: options.ResourceVersion}, obj); err != nil { return nil, storeerr.InterpretGetError(err, e.qualifiedResourceFromContext(ctx), name) } if e.Decorator != nil { @@ -892,7 +893,7 @@ func (e *Store) Delete(ctx context.Context, name string, deleteValidation rest.V } obj := e.NewFunc() qualifiedResource := e.qualifiedResourceFromContext(ctx) - if err = e.Storage.Get(ctx, key, "", obj, false); err != nil { + if err = e.Storage.Get(ctx, key, storage.GetOptions{}, obj); err != nil { return nil, false, storeerr.InterpretDeleteError(err, qualifiedResource, name) } @@ -1126,9 +1127,10 @@ func (e *Store) Watch(ctx context.Context, options *metainternalversion.ListOpti // WatchPredicate starts a watch for the items that matches. func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate, resourceVersion string) (watch.Interface, error) { + storageOpts := storage.ListOptions{ResourceVersion: resourceVersion, Predicate: p} if name, ok := p.MatchesSingle(); ok { if key, err := e.KeyFunc(ctx, name); err == nil { - w, err := e.Storage.Watch(ctx, key, resourceVersion, p) + w, err := e.Storage.Watch(ctx, key, storageOpts) if err != nil { return nil, err } @@ -1141,7 +1143,7 @@ func (e *Store) WatchPredicate(ctx context.Context, p storage.SelectionPredicate // optimization is skipped } - w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), resourceVersion, p) + w, err := e.Storage.WatchList(ctx, e.KeyRootFunc(ctx), storageOpts) if err != nil { return nil, err } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 273fcf847d3..b1a2204c7b9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -431,8 +431,9 @@ func (c *Cacher) Delete(ctx context.Context, key string, out runtime.Object, pre } // Watch implements storage.Interface. -func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { - watchRV, err := c.versioner.ParseResourceVersion(resourceVersion) +func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + pred := opts.Predicate + watchRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err } @@ -515,22 +516,22 @@ func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, } // WatchList implements storage.Interface. -func (c *Cacher) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { - return c.Watch(ctx, key, resourceVersion, pred) +func (c *Cacher) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return c.Watch(ctx, key, opts) } // Get implements storage.Interface. -func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error { - if resourceVersion == "" { +func (c *Cacher) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { + if opts.ResourceVersion == "" { // If resourceVersion is not specified, serve it from underlying // storage (for backward compatibility). - return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) + return c.storage.Get(ctx, key, opts, objPtr) } // If resourceVersion is specified, serve it from cache. // It's guaranteed that the returned value is at least that // fresh as the given resourceVersion. - getRV, err := c.versioner.ParseResourceVersion(resourceVersion) + getRV, err := c.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return err } @@ -538,7 +539,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob if getRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. - return c.storage.Get(ctx, key, resourceVersion, objPtr, ignoreNotFound) + return c.storage.Get(ctx, key, opts, objPtr) } // Do not create a trace - it's not for free and there are tons @@ -563,7 +564,7 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob objVal.Set(reflect.ValueOf(elem.Object).Elem()) } else { objVal.Set(reflect.Zero(objVal.Type())) - if !ignoreNotFound { + if !opts.IgnoreNotFound { return storage.NewKeyNotFoundError(key, int64(readResourceVersion)) } } @@ -571,7 +572,9 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob } // GetToList implements storage.Interface. -func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { +func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + resourceVersion := opts.ResourceVersion + pred := opts.Predicate pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) hasContinuation := pagingEnabled && len(pred.Continue) > 0 hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" @@ -582,7 +585,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri // Limits are only sent to storage when resourceVersion is non-zero // since the watch cache isn't able to perform continuations, and // limits are ignored when resource version is zero - return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj) + return c.storage.GetToList(ctx, key, opts, listObj) } // If resourceVersion is specified, serve it from cache. @@ -596,7 +599,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri if listRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. - return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj) + return c.storage.GetToList(ctx, key, opts, listObj) } trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) @@ -643,7 +646,9 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri } // List implements storage.Interface. -func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { +func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + resourceVersion := opts.ResourceVersion + pred := opts.Predicate pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking) hasContinuation := pagingEnabled && len(pred.Continue) > 0 hasLimit := pagingEnabled && pred.Limit > 0 && resourceVersion != "0" @@ -654,7 +659,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p // Limits are only sent to storage when resourceVersion is non-zero // since the watch cache isn't able to perform continuations, and // limits are ignored when resource version is zero. - return c.storage.List(ctx, key, resourceVersion, pred, listObj) + return c.storage.List(ctx, key, opts, listObj) } // If resourceVersion is specified, serve it from cache. @@ -668,7 +673,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p if listRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. - return c.storage.List(ctx, key, resourceVersion, pred, listObj) + return c.storage.List(ctx, key, opts, listObj) } trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) @@ -1083,7 +1088,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, Continue: options.Continue, } - if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", pred, list); err != nil { + if err := lw.storage.List(context.TODO(), lw.resourcePrefix, storage.ListOptions{Predicate: pred}, list); err != nil { return nil, err } return list, nil @@ -1091,7 +1096,7 @@ func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, // Implements cache.ListerWatcher interface. func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) { - return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, storage.Everything) + return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, storage.ListOptions{ResourceVersion: options.ResourceVersion, Predicate: storage.Everything}) } // errWatcher implements watch.Interface to return a single error diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 18abd0a602f..933241448c4 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -305,19 +305,19 @@ func (d *dummyStorage) Create(_ context.Context, _ string, _, _ runtime.Object, func (d *dummyStorage) Delete(_ context.Context, _ string, _ runtime.Object, _ *storage.Preconditions, _ storage.ValidateObjectFunc) error { return fmt.Errorf("unimplemented") } -func (d *dummyStorage) Watch(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { +func (d *dummyStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { return newDummyWatch(), nil } -func (d *dummyStorage) WatchList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate) (watch.Interface, error) { +func (d *dummyStorage) WatchList(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) { return newDummyWatch(), nil } -func (d *dummyStorage) Get(_ context.Context, _ string, _ string, _ runtime.Object, _ bool) error { - return fmt.Errorf("unimplemented") -} -func (d *dummyStorage) GetToList(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, _ runtime.Object) error { +func (d *dummyStorage) Get(_ context.Context, _ string, _ storage.GetOptions, _ runtime.Object) error { return d.err } -func (d *dummyStorage) List(_ context.Context, _ string, _ string, _ storage.SelectionPredicate, listObj runtime.Object) error { +func (d *dummyStorage) GetToList(_ context.Context, _ string, _ storage.ListOptions, _ runtime.Object) error { + return d.err +} +func (d *dummyStorage) List(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { podList := listObj.(*example.PodList) podList.ListMeta = metav1.ListMeta{ResourceVersion: "100"} return d.err @@ -329,7 +329,7 @@ func (d *dummyStorage) Count(_ string) (int64, error) { return 0, fmt.Errorf("unimplemented") } -func TestListWithLimitAndRV0(t *testing.T) { +func TestListCacheBypass(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage, 0) if err != nil { @@ -347,18 +347,24 @@ func TestListWithLimitAndRV0(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy - err = cacher.List(context.TODO(), "pods/ns", "0", pred, result) + err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred, + }, result) if err != nil { t.Errorf("List with Limit and RV=0 should be served from cache: %v", err) } - err = cacher.List(context.TODO(), "pods/ns", "", pred, result) + err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ + ResourceVersion: "", + Predicate: pred, + }, result) if err != errDummy { t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) } } -func TestGetToListWithLimitAndRV0(t *testing.T) { +func TestGetToListCacheBypass(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage, 0) if err != nil { @@ -376,17 +382,55 @@ func TestGetToListWithLimitAndRV0(t *testing.T) { // Inject error to underlying layer and check if cacher is not bypassed. backingStorage.err = errDummy - err = cacher.GetToList(context.TODO(), "pods/ns", "0", pred, result) + err = cacher.GetToList(context.TODO(), "pods/ns", storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred, + }, result) if err != nil { t.Errorf("GetToList with Limit and RV=0 should be served from cache: %v", err) } - err = cacher.GetToList(context.TODO(), "pods/ns", "", pred, result) + err = cacher.GetToList(context.TODO(), "pods/ns", storage.ListOptions{ + ResourceVersion: "", + Predicate: pred, + }, result) if err != errDummy { t.Errorf("List with Limit without RV=0 should bypass cacher: %v", err) } } +func TestGetCacheBypass(t *testing.T) { + backingStorage := &dummyStorage{} + cacher, _, err := newTestCacher(backingStorage, 1) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + result := &example.Pod{} + + // Wait until cacher is initialized. + cacher.ready.wait() + + // Inject error to underlying layer and check if cacher is not bypassed. + backingStorage.err = errDummy + err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + IgnoreNotFound: true, + ResourceVersion: "0", + }, result) + if err != nil { + t.Errorf("Get with RV=0 should be served from cache: %v", err) + } + + err = cacher.Get(context.TODO(), "pods/ns/pod-0", storage.GetOptions{ + IgnoreNotFound: true, + ResourceVersion: "", + }, result) + if err != errDummy { + t.Errorf("Get without RV=0 should bypass cacher: %v", err) + } +} + func TestWatcherNotGoingBackInTime(t *testing.T) { backingStorage := &dummyStorage{} cacher, _, err := newTestCacher(backingStorage, 1000) @@ -417,7 +461,10 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { totalPods := 100 // Create watcher that will be slowing down reading. - w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) + w1, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ + ResourceVersion: "999", + Predicate: storage.Everything, + }) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -439,7 +486,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { } // Create fast watcher and ensure it will get each object exactly once. - w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) + w2, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -519,7 +566,7 @@ func TestCacheWatcherStoppedOnDestroy(t *testing.T) { // Wait until cacher is initialized. cacher.ready.wait() - w, err := cacher.Watch(context.Background(), "pods/ns", "0", storage.Everything) + w, err := cacher.Watch(context.Background(), "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -618,7 +665,7 @@ func TestCacherNoLeakWithMultipleWatchers(t *testing.T) { return default: ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) - w, err := cacher.Watch(ctx, "pods/ns", "0", pred) + w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -672,7 +719,7 @@ func testCacherSendBookmarkEvents(t *testing.T, allowWatchBookmarks, expectedBoo pred.AllowWatchBookmarks = allowWatchBookmarks ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) - w, err := cacher.Watch(ctx, "pods/ns", "0", pred) + w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -779,7 +826,7 @@ func TestCacherSendsMultipleWatchBookmarks(t *testing.T) { } ctx, _ := context.WithTimeout(context.Background(), 10*time.Second) - w, err := cacher.Watch(ctx, "pods/ns", "100", pred) + w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "100", Predicate: pred}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -847,7 +894,7 @@ func TestDispatchingBookmarkEventsWithConcurrentStop(t *testing.T) { pred := storage.Everything pred.AllowWatchBookmarks = true ctx, _ := context.WithTimeout(context.Background(), time.Second) - w, err := cacher.Watch(ctx, "pods/ns", "999", pred) + w, err := cacher.Watch(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -921,14 +968,14 @@ func TestDispatchEventWillNotBeBlockedByTimedOutWatcher(t *testing.T) { totalPods := 50 // Create watcher that will be blocked. - w1, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) + w1, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } defer w1.Stop() // Create fast watcher and ensure it will get all objects. - w2, err := cacher.Watch(context.TODO(), "pods/ns", "999", storage.Everything) + w2, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: storage.Everything}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -1011,7 +1058,7 @@ func TestCachingDeleteEvents(t *testing.T) { } createWatch := func(pred storage.SelectionPredicate) watch.Interface { - w, err := cacher.Watch(context.TODO(), "pods/ns", "999", pred) + w, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "999", Predicate: pred}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } @@ -1089,7 +1136,7 @@ func testCachingObjects(t *testing.T, watchersCount int) { watchers := make([]watch.Interface, 0, watchersCount) for i := 0; i < watchersCount; i++ { - w, err := cacher.Watch(context.TODO(), "pods/ns", "1000", storage.Everything) + w, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: "1000", Predicate: storage.Everything}) if err != nil { t.Fatalf("Failed to create watch: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go index 321cab04d11..c42fc6e08ea 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/api_object_versioner.go @@ -17,6 +17,7 @@ limitations under the License. package etcd3 import ( + "fmt" "strconv" "k8s.io/apimachinery/pkg/api/meta" @@ -45,14 +46,14 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin // UpdateList implements Versioner func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string, count *int64) error { + if resourceVersion == 0 { + return fmt.Errorf("illegal resource version from storage: %d", resourceVersion) + } listAccessor, err := meta.ListAccessor(obj) if err != nil || listAccessor == nil { return err } - versionString := "" - if resourceVersion != 0 { - versionString = strconv.FormatUint(resourceVersion, 10) - } + versionString := strconv.FormatUint(resourceVersion, 10) listAccessor.SetResourceVersion(versionString) listAccessor.SetContinue(nextKey) listAccessor.SetRemainingItemCount(count) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index 1cc68d19b60..1e5230cdb13 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -63,7 +63,7 @@ var _ value.Context = authenticatedDataString("") type store struct { client *clientv3.Client - // getOpts contains additional options that should be passed + // getOps contains additional options that should be passed // to all Get() calls. getOps []clientv3.OpOption codec runtime.Codec @@ -112,20 +112,21 @@ func (s *store) Versioner() storage.Versioner { } // Get implements storage.Interface.Get. -func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error { +func (s *store) Get(ctx context.Context, key string, opts storage.GetOptions, out runtime.Object) error { key = path.Join(s.pathPrefix, key) startTime := time.Now() - getResp, err := s.client.KV.Get(ctx, key, s.getOps...) + callOpts := s.getOps + getResp, err := s.client.KV.Get(ctx, key, callOpts...) metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime) if err != nil { return err } - if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil { + if err = s.ensureMinimumResourceVersion(opts.ResourceVersion, uint64(getResp.Header.Revision)); err != nil { return err } if len(getResp.Kvs) == 0 { - if ignoreNotFound { + if opts.IgnoreNotFound { return runtime.SetZeroValue(out) } return storage.NewKeyNotFoundError(key, 0) @@ -379,7 +380,9 @@ func (s *store) GuaranteedUpdate( } // GetToList implements storage.Interface.GetToList. -func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { +func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error { + resourceVersion := listOpts.ResourceVersion + pred := listOpts.Predicate trace := utiltrace.New("GetToList etcd3", utiltrace.Field{"key", key}, utiltrace.Field{"resourceVersion", resourceVersion}, @@ -510,7 +513,9 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error } // List implements storage.Interface.List. -func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error { +func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + resourceVersion := opts.ResourceVersion + pred := opts.Predicate trace := utiltrace.New("List etcd3", utiltrace.Field{"key", key}, utiltrace.Field{"resourceVersion", resourceVersion}, @@ -585,7 +590,6 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix) options = append(options, clientv3.WithRange(rangeEnd)) - default: options = append(options, clientv3.WithPrefix()) } @@ -709,22 +713,22 @@ func growSlice(v reflect.Value, maxCapacity int, sizes ...int) { } // Watch implements storage.Interface.Watch. -func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { - return s.watch(ctx, key, resourceVersion, pred, false) +func (s *store) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return s.watch(ctx, key, opts, false) } // WatchList implements storage.Interface.WatchList. -func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) { - return s.watch(ctx, key, resourceVersion, pred, true) +func (s *store) WatchList(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { + return s.watch(ctx, key, opts, true) } -func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) { - rev, err := s.versioner.ParseResourceVersion(rv) +func (s *store) watch(ctx context.Context, key string, opts storage.ListOptions, recursive bool) (watch.Interface, error) { + rev, err := s.versioner.ParseResourceVersion(opts.ResourceVersion) if err != nil { return nil, err } key = path.Join(s.pathPrefix, key) - return s.watcher.Watch(ctx, key, int64(rev), recursive, pred) + return s.watcher.Watch(ctx, key, int64(rev), recursive, opts.Predicate) } func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 51364728317..0553a691493 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -168,7 +168,7 @@ func TestCreateWithTTL(t *testing.T) { t.Fatalf("Create failed: %v", err) } - w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -191,7 +191,19 @@ func TestGet(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) // create an object to test - key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + key, createdObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + // update the object once to allow get by exact resource version to be tested + updateObj := createdObj.DeepCopy() + updateObj.Annotations = map[string]string{"test-annotation": "1"} + storedObj := &example.Pod{} + err := store.GuaranteedUpdate(ctx, key, storedObj, true, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + ttl := uint64(1) + return updateObj, &ttl, nil + }) + if err != nil { + t.Fatalf("Update failed: %v", err) + } // create an additional object to increment the resource version for pods above the resource version of the foo object lastUpdatedObj := &example.Pod{} if err := store.Create(ctx, "bar", &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, lastUpdatedObj, 0); err != nil { @@ -201,6 +213,7 @@ func TestGet(t *testing.T) { currentRV, _ := strconv.Atoi(storedObj.ResourceVersion) lastUpdatedCurrentRV, _ := strconv.Atoi(lastUpdatedObj.ResourceVersion) + // TODO(jpbetz): Add exact test cases tests := []struct { name string key string @@ -209,28 +222,34 @@ func TestGet(t *testing.T) { expectRVTooLarge bool expectedOut *example.Pod rv string + expectedRV string }{{ // test get on existing item name: "get existing", key: key, ignoreNotFound: false, expectNotFoundErr: false, expectedOut: storedObj, - }, { // test get on existing item with minimum resource version set to 0 + }, { // test get on existing item with resource version set to 0 name: "resource version 0", key: key, expectedOut: storedObj, rv: "0", - }, { // test get on existing item with minimum resource version set to current resource version of the object - name: "current object resource version", + }, { // test get on existing item with resource version set to the resource version is was created on + name: "object created resource version", + key: key, + expectedOut: storedObj, + rv: createdObj.ResourceVersion, + }, { // test get on existing item with resource version set to current resource version of the object + name: "current object resource version, match=NotOlderThan", key: key, expectedOut: storedObj, rv: fmt.Sprintf("%d", currentRV), - }, { // test get on existing item with minimum resource version set to latest pod resource version + }, { // test get on existing item with resource version set to latest pod resource version name: "latest resource version", key: key, expectedOut: storedObj, rv: fmt.Sprintf("%d", lastUpdatedCurrentRV), - }, { // test get on existing item with minimum resource version set too high + }, { // test get on existing item with resource version set too high name: "too high resource version", key: key, expectRVTooLarge: true, @@ -251,7 +270,7 @@ func TestGet(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { out := &example.Pod{} - err := store.Get(ctx, tt.key, tt.rv, out, tt.ignoreNotFound) + err := store.Get(ctx, tt.key, storage.GetOptions{IgnoreNotFound: tt.ignoreNotFound, ResourceVersion: tt.rv}, out) if tt.expectNotFoundErr { if err == nil || !storage.IsNotFound(err) { t.Errorf("expecting not found error, but get: %v", err) @@ -264,11 +283,16 @@ func TestGet(t *testing.T) { } return } + if tt.expectedRV != "" { + if tt.expectedRV != out.ResourceVersion { + t.Errorf("expecting resource version want=%s, got=%s", tt.expectedRV, out.ResourceVersion) + } + } if err != nil { t.Fatalf("Get failed: %v", err) } if !reflect.DeepEqual(tt.expectedOut, out) { - t.Errorf("pod want=%#v, get=%#v", tt.expectedOut, out) + t.Errorf("pod want=\n%#v\nget=\n%#v", tt.expectedOut, out) } }) } @@ -398,7 +422,7 @@ func TestGetToList(t *testing.T) { for i, tt := range tests { out := &example.PodList{} - err := store.GetToList(ctx, tt.key, tt.rv, tt.pred, out) + err := store.GetToList(ctx, tt.key, storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred}, out) if tt.expectRVTooLarge { if err == nil || !storage.IsTooLargeResourceVersion(err) { @@ -587,7 +611,7 @@ func TestGuaranteedUpdateWithTTL(t *testing.T) { t.Fatalf("Create failed: %v", err) } - w, err := store.Watch(ctx, key, out.ResourceVersion, storage.Everything) + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -801,12 +825,12 @@ func TestTransformationFailure(t *testing.T) { // List should fail var got example.PodList - if err := store.List(ctx, "/", "", storage.Everything, &got); !storage.IsInternalError(err) { + if err := store.List(ctx, "/", storage.ListOptions{Predicate: storage.Everything}, &got); !storage.IsInternalError(err) { t.Errorf("Unexpected error %v", err) } // Get should fail - if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) { + if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } // GuaranteedUpdate without suggestion should return an error @@ -826,7 +850,7 @@ func TestTransformationFailure(t *testing.T) { if err := store.Delete(ctx, preset[1].key, &example.Pod{}, nil, storage.ValidateAllObjectFunc); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } - if err := store.Get(ctx, preset[1].key, "", &example.Pod{}, false); !storage.IsInternalError(err) { + if err := store.Get(ctx, preset[1].key, storage.GetOptions{}, &example.Pod{}); !storage.IsInternalError(err) { t.Errorf("Unexpected error: %v", err) } } @@ -894,7 +918,7 @@ func TestList(t *testing.T) { } list := &example.PodList{} - store.List(ctx, "/two-level", "0", storage.Everything, list) + store.List(ctx, "/two-level", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}, list) continueRV, _ := strconv.Atoi(list.ResourceVersion) secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV)) if err != nil { @@ -917,6 +941,7 @@ func TestList(t *testing.T) { expectedRemainingItemCount *int64 expectError bool expectRVTooLarge bool + expectRV string }{ { name: "rejects invalid resource version", @@ -950,14 +975,14 @@ func TestList(t *testing.T) { expectedOut: []*example.Pod{preset[0].storedObj}, }, { - name: "test List on existing key with minimum resource version set to 0", + name: "test List on existing key with resource version set to 0", prefix: "/one-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[0].storedObj}, rv: "0", }, { - name: "test List on existing key with minimum resource version set to current resource version", + name: "test List on existing key with resource version set to current resource version", prefix: "/one-level/", pred: storage.Everything, expectedOut: []*example.Pod{preset[0].storedObj}, @@ -990,6 +1015,34 @@ func TestList(t *testing.T) { expectContinue: true, expectedRemainingItemCount: utilpointer.Int64Ptr(1), }, + { + name: "test List with limit at current resource version", + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + }, + expectedOut: []*example.Pod{preset[1].storedObj}, + expectContinue: true, + expectedRemainingItemCount: utilpointer.Int64Ptr(1), + rv: list.ResourceVersion, + expectRV: list.ResourceVersion, + }, + { + name: "test List with limit at resource version 0", + prefix: "/two-level/", + pred: storage.SelectionPredicate{ + Label: labels.Everything(), + Field: fields.Everything(), + Limit: 1, + }, + expectedOut: []*example.Pod{preset[1].storedObj}, + expectContinue: true, + expectedRemainingItemCount: utilpointer.Int64Ptr(1), + rv: "0", + expectRV: list.ResourceVersion, + }, { name: "test List with limit when paging disabled", disablePaging: true, @@ -1142,46 +1195,58 @@ func TestList(t *testing.T) { } for _, tt := range tests { - if tt.pred.GetAttrs == nil { - tt.pred.GetAttrs = getAttrs - } - - out := &example.PodList{} - var err error - if tt.disablePaging { - err = disablePagingStore.List(ctx, tt.prefix, tt.rv, tt.pred, out) - } else { - err = store.List(ctx, tt.prefix, tt.rv, tt.pred, out) - } - if tt.expectRVTooLarge { - if err == nil || !storage.IsTooLargeResourceVersion(err) { - t.Errorf("(%s): expecting resource version too high error, but get: %s", tt.name, err) + t.Run(tt.name, func(t *testing.T) { + if tt.pred.GetAttrs == nil { + tt.pred.GetAttrs = getAttrs } - continue - } - if (err != nil) != tt.expectError { - t.Errorf("(%s): List failed: %v", tt.name, err) - } - if err != nil { - continue - } - if (len(out.Continue) > 0) != tt.expectContinue { - t.Errorf("(%s): unexpected continue token: %q", tt.name, out.Continue) - } - if len(tt.expectedOut) != len(out.Items) { - t.Errorf("(%s): length of list want=%d, got=%d", tt.name, len(tt.expectedOut), len(out.Items)) - continue - } - if e, a := tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount(); (e == nil) != (a == nil) || (e != nil && a != nil && *e != *a) { - t.Errorf("(%s): remainingItemCount want=%#v, got=%#v", tt.name, e, a) - } - for j, wantPod := range tt.expectedOut { - getPod := &out.Items[j] - if !reflect.DeepEqual(wantPod, getPod) { - t.Errorf("(%s): pod want=%#v, got=%#v", tt.name, wantPod, getPod) + out := &example.PodList{} + storageOpts := storage.ListOptions{ResourceVersion: tt.rv, Predicate: tt.pred} + var err error + if tt.disablePaging { + err = disablePagingStore.List(ctx, tt.prefix, storageOpts, out) + } else { + err = store.List(ctx, tt.prefix, storageOpts, out) } - } + if tt.expectRVTooLarge { + if err == nil || !storage.IsTooLargeResourceVersion(err) { + t.Fatalf("expecting resource version too high error, but get: %s", err) + } + return + } + + if err != nil { + if !tt.expectError { + t.Fatalf("List failed: %v", err) + } + return + } + if tt.expectError { + t.Fatalf("expected error but got none") + } + if (len(out.Continue) > 0) != tt.expectContinue { + t.Errorf("unexpected continue token: %q", out.Continue) + } + + // If a client requests an exact resource version, it must be echoed back to them. + if tt.expectRV != "" { + if tt.expectRV != out.ResourceVersion { + t.Errorf("resourceVersion in list response want=%s, got=%s", tt.expectRV, out.ResourceVersion) + } + } + if len(tt.expectedOut) != len(out.Items) { + t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items)) + } + if e, a := tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount(); (e == nil) != (a == nil) || (e != nil && a != nil && *e != *a) { + t.Errorf("remainingItemCount want=%#v, got=%#v", e, a) + } + for j, wantPod := range tt.expectedOut { + getPod := &out.Items[j] + if !reflect.DeepEqual(wantPod, getPod) { + t.Errorf("pod want=%#v, got=%#v", wantPod, getPod) + } + } + }) } } @@ -1249,7 +1314,7 @@ func TestListContinuation(t *testing.T) { }, } } - if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, "")}, out); err != nil { t.Fatalf("Unable to get initial list: %v", err) } if len(out.Continue) == 0 { @@ -1271,7 +1336,7 @@ func TestListContinuation(t *testing.T) { // no limit, should get two items out = &example.PodList{} - if err := store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(0, continueFromSecondItem)}, out); err != nil { t.Fatalf("Unable to get second page: %v", err) } if len(out.Continue) != 0 { @@ -1293,7 +1358,7 @@ func TestListContinuation(t *testing.T) { // limit, should get two more pages out = &example.PodList{} - if err := store.List(ctx, "/", "0", pred(1, continueFromSecondItem), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromSecondItem)}, out); err != nil { t.Fatalf("Unable to get second page: %v", err) } if len(out.Continue) == 0 { @@ -1314,7 +1379,7 @@ func TestListContinuation(t *testing.T) { continueFromThirdItem := out.Continue out = &example.PodList{} - if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromThirdItem)}, out); err != nil { t.Fatalf("Unable to get second page: %v", err) } if len(out.Continue) != 0 { @@ -1406,7 +1471,7 @@ func TestListContinuationWithFilter(t *testing.T) { }, } } - if err := store.List(ctx, "/", "0", pred(2, ""), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(2, "")}, out); err != nil { t.Errorf("Unable to get initial list: %v", err) } if len(out.Continue) == 0 { @@ -1435,7 +1500,7 @@ func TestListContinuationWithFilter(t *testing.T) { // but since there is only one item left, that is all we should get with no continueValue // both read counters should be incremented for the singular calls they make in this case out = &example.PodList{} - if err := store.List(ctx, "/", "0", pred(2, cont), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(2, cont)}, out); err != nil { t.Errorf("Unable to get second page: %v", err) } if len(out.Continue) != 0 { @@ -1514,7 +1579,7 @@ func TestListInconsistentContinuation(t *testing.T) { } out := &example.PodList{} - if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, "")}, out); err != nil { t.Fatalf("Unable to get initial list: %v", err) } if len(out.Continue) == 0 { @@ -1555,7 +1620,7 @@ func TestListInconsistentContinuation(t *testing.T) { } // The old continue token should have expired - err = store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out) + err = store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(0, continueFromSecondItem)}, out) if err == nil { t.Fatalf("unexpected no error") } @@ -1572,7 +1637,7 @@ func TestListInconsistentContinuation(t *testing.T) { } out = &example.PodList{} - if err := store.List(ctx, "/", "0", pred(1, inconsistentContinueFromSecondItem), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, inconsistentContinueFromSecondItem)}, out); err != nil { t.Fatalf("Unable to get second page: %v", err) } if len(out.Continue) == 0 { @@ -1586,7 +1651,7 @@ func TestListInconsistentContinuation(t *testing.T) { } continueFromThirdItem := out.Continue out = &example.PodList{} - if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil { + if err := store.List(ctx, "/", storage.ListOptions{ResourceVersion: "0", Predicate: pred(1, continueFromThirdItem)}, out); err != nil { t.Fatalf("Unable to get second page: %v", err) } if len(out.Continue) != 0 { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index 9e820cf557d..ad3209ccd47 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -94,7 +94,7 @@ func testWatch(t *testing.T, recursive bool) { }, }} for i, tt := range tests { - w, err := store.watch(ctx, tt.key, "0", tt.pred, recursive) + w, err := store.watch(ctx, tt.key, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred}, recursive) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -131,7 +131,7 @@ func TestDeleteTriggerWatch(t *testing.T) { ctx, store, cluster := testSetup(t) defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) - w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -149,7 +149,7 @@ func TestWatchFromZero(t *testing.T) { defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) - w, err := store.Watch(ctx, key, "0", storage.Everything) + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -167,7 +167,7 @@ func TestWatchFromZero(t *testing.T) { } // Make sure when we watch from 0 we receive an ADDED event - w, err = store.Watch(ctx, key, "0", storage.Everything) + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -195,7 +195,7 @@ func TestWatchFromZero(t *testing.T) { } // Make sure we can still watch from 0 and receive an ADDED event - w, err = store.Watch(ctx, key, "0", storage.Everything) + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -209,7 +209,7 @@ func TestWatchFromNoneZero(t *testing.T) { defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) - w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -227,7 +227,7 @@ func TestWatchError(t *testing.T) { defer cluster.Terminate(t) invalidStore := newStore(cluster.RandClient(), true, codec, "", &prefixTransformer{prefix: []byte("test!")}) ctx := context.Background() - w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything) + w, err := invalidStore.Watch(ctx, "/abc", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } @@ -289,7 +289,7 @@ func TestWatchDeleteEventObjectHaveLatestRV(t *testing.T) { defer cluster.Terminate(t) key, storedObj := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) - w, err := store.Watch(ctx, key, storedObj.ResourceVersion, storage.Everything) + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Watch failed: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go index 31d4b2a2859..bd0632717ef 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/interfaces.go @@ -175,7 +175,7 @@ type Interface interface { // (e.g. reconnecting without missing any updates). // If resource version is "0", this interface will get current object at given key // and send it in an "ADDED" event, before watch starts. - Watch(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) + Watch(ctx context.Context, key string, opts ListOptions) (watch.Interface, error) // WatchList begins watching the specified key's items. Items are decoded into API // objects and any item selected by 'p' are sent down to returned watch.Interface. @@ -184,26 +184,23 @@ type Interface interface { // (e.g. reconnecting without missing any updates). // If resource version is "0", this interface will list current objects directory defined by key // and send them in "ADDED" events, before watch starts. - WatchList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate) (watch.Interface, error) + WatchList(ctx context.Context, key string, opts ListOptions) (watch.Interface, error) // Get unmarshals json found at key into objPtr. On a not found error, will either - // return a zero object of the requested type, or an error, depending on ignoreNotFound. + // return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'. // Treats empty responses and nil response nodes exactly like a not found error. - // The returned contents may be delayed, but it is guaranteed that they will - // be have at least 'resourceVersion'. - Get(ctx context.Context, key string, resourceVersion string, objPtr runtime.Object, ignoreNotFound bool) error + // The returned contents may be delayed according to the semantics of GetOptions.ResourceVersion. + Get(ctx context.Context, key string, opts GetOptions, objPtr runtime.Object) error // GetToList unmarshals json found at key and opaque it into *List api object // (an object that satisfies the runtime.IsList definition). - // The returned contents may be delayed, but it is guaranteed that they will - // be have at least 'resourceVersion'. - GetToList(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error + // The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion. + GetToList(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error // List unmarshalls jsons found at directory defined by key and opaque them // into *List api object (an object that satisfies runtime.IsList definition). - // The returned contents may be delayed, but it is guaranteed that they will - // be have at least 'resourceVersion'. - List(ctx context.Context, key string, resourceVersion string, p SelectionPredicate, listObj runtime.Object) error + // The returned contents may be delayed according to the semantics of ListOptions.ResourceVersion. + List(ctx context.Context, key string, opts ListOptions, listObj runtime.Object) error // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') // retrying the update until success if there is index conflict. @@ -243,3 +240,26 @@ type Interface interface { // Count returns number of different entries under the key (generally being path prefix). Count(key string) (int64, error) } + +// GetOptions provides the options that may be provided for storage get operations. +type GetOptions struct { + // IgnoreNotFound determines what is returned if the requested object is not found. If + // true, a zero object is returned. If false, an error is returned. + IgnoreNotFound bool + // ResourceVersion provides a resource version constraint to apply to the get operation + // as a "not older than" constraint: the result contains data at least as new as the provided + // ResourceVersion. The newest available data is preferred, but any data not older than this + // ResourceVersion may be served. + ResourceVersion string +} + +// ListOptions provides the options that may be provided for storage list operations. +type ListOptions struct { + // ResourceVersion provides a resource version constraint to apply to the list operation + // as a "not older than" constraint: the result contains data at least as new as the provided + // ResourceVersion. The newest available data is preferred, but any data not older than this + // ResourceVersion may be served. + ResourceVersion string + // Predicate provides the selection rules for the list operation. + Predicate SelectionPredicate +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index c9e0a78bd1f..1f1b273c766 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -140,7 +140,7 @@ func updatePod(t *testing.T, s storage.Interface, obj, old *example.Pod) *exampl } obj.ResourceVersion = "" result := &example.Pod{} - if err := s.Get(context.TODO(), key, "", result, false); err != nil { + if err := s.Get(context.TODO(), key, storage.GetOptions{}, result); err != nil { t.Errorf("unexpected error: %v", err) } return result @@ -160,14 +160,14 @@ func TestGet(t *testing.T) { // We pass the ResourceVersion from the above Create() operation. result := &example.Pod{} - if err := cacher.Get(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, result, true); err != nil { + if err := cacher.Get(context.TODO(), "pods/ns/foo", storage.GetOptions{IgnoreNotFound: true, ResourceVersion: fooCreated.ResourceVersion}, result); err != nil { t.Errorf("Unexpected error: %v", err) } if e, a := *fooCreated, *result; !reflect.DeepEqual(e, a) { t.Errorf("Expected: %#v, got: %#v", e, a) } - if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, true); err != nil { + if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion, IgnoreNotFound: true}, result); err != nil { t.Errorf("Unexpected error: %v", err) } emptyPod := example.Pod{} @@ -175,7 +175,7 @@ func TestGet(t *testing.T) { t.Errorf("Expected: %#v, got: %#v", e, a) } - if err := cacher.Get(context.TODO(), "pods/ns/bar", fooCreated.ResourceVersion, result, false); !storage.IsNotFound(err) { + if err := cacher.Get(context.TODO(), "pods/ns/bar", storage.GetOptions{ResourceVersion: fooCreated.ResourceVersion}, result); !storage.IsNotFound(err) { t.Errorf("Unexpected error: %v", err) } } @@ -219,7 +219,7 @@ func TestGetToList(t *testing.T) { for i, tt := range tests { out := &example.PodList{} - err := cacher.GetToList(context.TODO(), tt.key, "", tt.pred, out) + err := cacher.GetToList(context.TODO(), tt.key, storage.ListOptions{Predicate: tt.pred}, out) if err != nil { t.Fatalf("GetToList failed: %v", err) } @@ -275,7 +275,7 @@ func TestList(t *testing.T) { // We first List directly from etcd by passing empty resourceVersion, // to get the current etcd resourceVersion. rvResult := &example.PodList{} - if err := cacher.List(context.TODO(), "pods/ns", "", storage.Everything, rvResult); err != nil { + if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{Predicate: storage.Everything}, rvResult); err != nil { t.Errorf("Unexpected error: %v", err) } deletedPodRV := rvResult.ListMeta.ResourceVersion @@ -283,7 +283,7 @@ func TestList(t *testing.T) { result := &example.PodList{} // We pass the current etcd ResourceVersion received from the above List() operation, // since there is not easy way to get ResourceVersion of barPod deletion operation. - if err := cacher.List(context.TODO(), "pods/ns", deletedPodRV, storage.Everything, result); err != nil { + if err := cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: deletedPodRV, Predicate: storage.Everything}, result); err != nil { t.Errorf("Unexpected error: %v", err) } if result.ListMeta.ResourceVersion != deletedPodRV { @@ -345,7 +345,7 @@ func TestTooLargeResourceVersionList(t *testing.T) { listRV := strconv.Itoa(int(rv + 10)) result := &example.PodList{} - err = cacher.List(context.TODO(), "pods/ns", listRV, storage.Everything, result) + err = cacher.List(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: listRV, Predicate: storage.Everything}, result) if !errors.IsTimeout(err) { t.Errorf("Unexpected error: %v", err) } @@ -381,12 +381,12 @@ type injectListError struct { storage.Interface } -func (self *injectListError) List(ctx context.Context, key string, resourceVersion string, p storage.SelectionPredicate, listObj runtime.Object) error { +func (self *injectListError) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { if self.errors > 0 { self.errors-- return fmt.Errorf("injected error") } - return self.Interface.List(ctx, key, resourceVersion, p, listObj) + return self.Interface.List(ctx, key, opts, listObj) } func TestWatch(t *testing.T) { @@ -421,7 +421,7 @@ func TestWatch(t *testing.T) { startVersion := strconv.Itoa(int(initialVersion)) // Set up Watch for object "podFoo". - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -438,7 +438,7 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, watcher, watch.Modified, podFooPrime) // Check whether we get too-old error via the watch channel - tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything) + tooOldWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "1", Predicate: storage.Everything}) if err != nil { t.Fatalf("Expected no direct error, got %v", err) } @@ -446,7 +446,7 @@ func TestWatch(t *testing.T) { // Events happens in eventFreshDuration, cache expand without event "Gone". verifyWatchEvent(t, tooOldWatcher, watch.Added, podFoo) - initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, storage.Everything) + initialWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -455,7 +455,7 @@ func TestWatch(t *testing.T) { verifyWatchEvent(t, initialWatcher, watch.Modified, podFooPrime) // Now test watch from "now". - nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything) + nowWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -488,7 +488,7 @@ func TestWatcherTimeout(t *testing.T) { // Create a number of watchers that will not be reading any result. nonReadingWatchers := 50 for i := 0; i < nonReadingWatchers; i++ { - watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) + watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -496,7 +496,7 @@ func TestWatcherTimeout(t *testing.T) { } // Create a second watcher that will be reading result. - readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) + readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -524,7 +524,7 @@ func TestFiltering(t *testing.T) { // Ensure that the cacher is initialized, before creating any pods, // so that we are sure that all events will be present in cacher. - syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "0", storage.Everything) + syncWatcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -566,7 +566,7 @@ func TestFiltering(t *testing.T) { return labels.Set(metadata.GetLabels()), nil, nil }, } - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", fooCreated.ResourceVersion, pred) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: fooCreated.ResourceVersion, Predicate: pred}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -599,7 +599,7 @@ func TestStartingResourceVersion(t *testing.T) { rv += 10 startVersion := strconv.Itoa(int(rv)) - watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -662,7 +662,7 @@ func TestEmptyWatchEventCache(t *testing.T) { // It should support establishing watches from rv and higher, but not older. { - watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv-1)), storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -672,7 +672,7 @@ func TestEmptyWatchEventCache(t *testing.T) { } { - watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv+1)), storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -686,7 +686,7 @@ func TestEmptyWatchEventCache(t *testing.T) { } { - watcher, err := cacher.Watch(context.TODO(), "pods/ns", strconv.Itoa(int(rv)), storage.Everything) + watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -716,7 +716,7 @@ func TestRandomWatchDeliver(t *testing.T) { } startVersion := strconv.Itoa(int(rv)) - watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything) + watcher, err := cacher.WatchList(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: startVersion, Predicate: storage.Everything}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -870,7 +870,7 @@ func TestWatchDispatchBookmarkEvents(t *testing.T) { pred := storage.Everything pred.AllowWatchBookmarks = c.allowWatchBookmark ctx, _ := context.WithTimeout(context.Background(), c.timeout) - watcher, err := cacher.Watch(ctx, "pods/ns/foo", startVersion, pred) + watcher, err := cacher.Watch(ctx, "pods/ns/foo", storage.ListOptions{ResourceVersion: startVersion, Predicate: pred}) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -910,7 +910,7 @@ func TestWatchBookmarksWithCorrectResourceVersion(t *testing.T) { pred := storage.Everything pred.AllowWatchBookmarks = true ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) - watcher, err := cacher.WatchList(ctx, "pods/ns", "0", pred) + watcher, err := cacher.WatchList(ctx, "pods/ns", storage.ListOptions{ResourceVersion: "0", Predicate: pred}) if err != nil { t.Fatalf("Unexpected error: %v", err) }