diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go index 8cf76f76567..1956e5fc030 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go @@ -665,11 +665,6 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio return c.storage.GetList(ctx, key, opts, listObj) } - match := opts.ResourceVersionMatch - if match != metav1.ResourceVersionMatchNotOlderThan && match != "" { - return fmt.Errorf("unknown ResourceVersionMatch value: %v", match) - } - // If resourceVersion is specified, serve it from cache. // It's guaranteed that the returned value is at least that // fresh as the given resourceVersion. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index 7d94355a0fc..5d7207f5f77 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -22,6 +22,7 @@ import ( "fmt" "path" "reflect" + "sync" "sync/atomic" "testing" "time" @@ -163,7 +164,11 @@ func testCheckResultFunc(t *testing.T, expectEventType watch.EventType, w watch. t.Errorf("event type want=%v, get=%v", expectEventType, res.Type) return } - if err := check(res.Object); err != nil { + obj := res.Object + if co, ok := obj.(runtime.CacheableObject); ok { + obj = co.GetObject() + } + if err := check(obj); err != nil { t.Error(err) } case <-time.After(wait.ForeverTestTimeout): @@ -210,6 +215,36 @@ func resourceVersionNotOlderThan(sentinel string) func(string) error { } } +// StorageInjectingListErrors injects a dummy error for first N GetList calls. +type StorageInjectingListErrors struct { + storage.Interface + + lock sync.Mutex + Errors int +} + +func (s *StorageInjectingListErrors) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + err := func() error { + s.lock.Lock() + defer s.lock.Unlock() + if s.Errors > 0 { + s.Errors-- + return fmt.Errorf("injected error") + } + return nil + }() + if err != nil { + return err + } + return s.Interface.GetList(ctx, key, opts, listObj) +} + +func (s *StorageInjectingListErrors) ErrorsConsumed() (bool, error) { + s.lock.Lock() + defer s.lock.Unlock() + return s.Errors == 0, nil +} + // PrefixTransformer adds and verifies that all data has the correct prefix on its way in and out. type PrefixTransformer struct { prefix []byte diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 4455f574851..30558e9ff58 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -70,10 +70,10 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur watchTests: []*testWatchStruct{{basePod, false, ""}, {basePodAssigned, true, watch.Added}}, pred: storage.SelectionPredicate{ Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("spec.nodename=bar"), + Field: fields.ParseSelectorOrDie("spec.nodeName=bar"), GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"spec.nodename": pod.Spec.NodeName}, nil + return nil, fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil }, }, }, { @@ -87,22 +87,28 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur watchTests: []*testWatchStruct{{basePod, true, watch.Added}, {basePodAssigned, true, watch.Deleted}}, pred: storage.SelectionPredicate{ Label: labels.Everything(), - Field: fields.ParseSelectorOrDie("spec.nodename!=bar"), + Field: fields.ParseSelectorOrDie("spec.nodeName!=bar"), GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { pod := obj.(*example.Pod) - return nil, fields.Set{"spec.nodename": pod.Spec.NodeName}, nil + return nil, fields.Set{"spec.nodeName": pod.Spec.NodeName}, nil }, }, }} for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - watchKey := fmt.Sprintf("pods/%s", tt.namespace) + watchKey := fmt.Sprintf("/pods/%s", tt.namespace) key := watchKey + "/foo" if !recursive { watchKey = key } - w, err := store.Watch(ctx, watchKey, storage.ListOptions{ResourceVersion: "0", Predicate: tt.pred, Recursive: recursive}) + // Get the current RV from which we can start watching. + out := &example.PodList{} + if err := store.GetList(ctx, watchKey, storage.ListOptions{ResourceVersion: "", Predicate: tt.pred, Recursive: recursive}, out); err != nil { + t.Fatalf("List failed: %v", err) + } + + w, err := store.Watch(ctx, watchKey, storage.ListOptions{ResourceVersion: out.ResourceVersion, Predicate: tt.pred, Recursive: recursive}) if err != nil { t.Fatalf("Watch failed: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 7d343fd0249..0e4b39e04e0 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -201,23 +201,16 @@ func verifyWatchEvent(t *testing.T, w watch.Interface, eventType watch.EventType } } -type injectListError struct { - errors int - storage.Interface -} - -func (self *injectListError) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - if self.errors > 0 { - self.errors-- - return fmt.Errorf("injected error") - } - return self.Interface.GetList(ctx, key, opts, listObj) -} - func TestWatch(t *testing.T) { + ctx, cacher, terminate := testSetup(t) + t.Cleanup(terminate) + storagetesting.RunTestWatch(ctx, t, cacher) +} + +// TODO(wojtek-t): We should extend the generic RunTestWatch test to cover the +// scenarios that are not yet covered by it and get rid of this test. +func TestWatchDeprecated(t *testing.T) { server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) - // Inject one list error to make sure we test the relist case. - etcdStorage = &injectListError{errors: 1, Interface: etcdStorage} defer server.Terminate(t) fakeClock := testingclock.NewFakeClock(time.Now()) cacher, _, err := newTestCacherWithClock(etcdStorage, fakeClock) @@ -787,8 +780,14 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora } server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) + // Inject one list error to make sure we test the relist case. + wrappedStorage := &storagetesting.StorageInjectingListErrors{ + Interface: etcdStorage, + Errors: 1, + } + config := cacherstorage.Config{ - Storage: etcdStorage, + Storage: wrappedStorage, Versioner: storage.APIObjectVersioner{}, GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: setupOpts.resourcePrefix, @@ -809,5 +808,11 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *cacherstora server.Terminate(t) } + // Since some tests depend on the fact that GetList shouldn't fail, + // we wait until the error from the underlying storage is consumed. + if err := wait.PollInfinite(100*time.Millisecond, wrappedStorage.ErrorsConsumed); err != nil { + t.Fatalf("Failed to inject list errors: %v", err) + } + return ctx, cacher, terminate }