diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go index d58ecfdfe4e..b47fe0ed4b9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher_test.go @@ -62,7 +62,7 @@ func TestCacheWatcherCleanupNotBlockedByResult(t *testing.T) { } // set the size of the buffer of w.result to 0, so that the writes to // w.result is blocked. - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") + w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) w.Stop() if err := wait.PollImmediate(1*time.Second, 5*time.Second, func() (bool, error) { @@ -182,7 +182,7 @@ TestCase: testCase.events[j].ResourceVersion = uint64(j) + 1 } - w := newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") + w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(testCase.events), 0) ch := w.ResultChan() @@ -219,7 +219,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { // timeout to zero and run the Stop goroutine concurrently. // May sure that the watch will not be blocked on Stop. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") + w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), false, schema.GroupResource{Resource: "pods"}, "") go w.Stop() select { case <-done: @@ -231,7 +231,7 @@ func TestCacheWatcherStoppedInAnotherGoroutine(t *testing.T) { deadline := time.Now().Add(time.Hour) // After that, verifies the cacheWatcher.process goroutine works correctly. for i := 0; i < maxRetriesToProduceTheRaceCondition; i++ { - w = newCacheWatcher(2, filter, emptyFunc, testVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "") + w = newCacheWatcher(2, filter, emptyFunc, storage.APIObjectVersioner{}, deadline, false, schema.GroupResource{Resource: "pods"}, "") w.input <- &watchCacheEvent{Object: &v1.Pod{}, ResourceVersion: uint64(i + 1)} ctx, cancel := context.WithDeadline(context.Background(), deadline) defer cancel() @@ -308,7 +308,7 @@ func TestResourceVersionAfterInitEvents(t *testing.T) { filter := func(_ string, _ labels.Set, _ fields.Set) bool { return true } forget := func(_ bool) {} deadline := time.Now().Add(time.Minute) - w := newCacheWatcher(numObjects+1, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") + w := newCacheWatcher(numObjects+1, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") // Simulate a situation when the last event will that was already in // the state, wasn't yet processed by cacher and will be delivered @@ -351,7 +351,7 @@ func TestTimeBucketWatchersBasic(t *testing.T) { forget := func(bool) {} newWatcher := func(deadline time.Time) *cacheWatcher { - w := newCacheWatcher(0, filter, forget, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") + w := newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, "") w.setBookmarkAfterResourceVersion(0) return w } @@ -418,7 +418,7 @@ func TestCacheWatcherDraining(t *testing.T) { makeWatchCacheEvent(5), makeWatchCacheEvent(6), } - w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") @@ -459,7 +459,7 @@ func TestCacheWatcherDrainingRequestedButNotDrained(t *testing.T) { makeWatchCacheEvent(5), makeWatchCacheEvent(6), } - w = newCacheWatcher(1, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + w = newCacheWatcher(1, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") go w.processInterval(context.Background(), intervalFromEvents(initEvents), 1) if !w.add(makeWatchCacheEvent(7), time.NewTimer(1*time.Second)) { t.Fatal("failed adding an even to the watcher") @@ -496,7 +496,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionReceived(t *testing.T {Object: &v1.Pod{}}, {Object: &v1.Pod{}}, } - w = newCacheWatcher(0, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + w = newCacheWatcher(0, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") w.setBookmarkAfterResourceVersion(10) go w.processInterval(context.Background(), intervalFromEvents(initEvents), 0) if w.add(&watchCacheEvent{Object: &v1.Pod{}}, time.NewTimer(1*time.Second)) { @@ -542,7 +542,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) { w.stopLocked() } initEvents := []*watchCacheEvent{{Object: makePod(1)}, {Object: makePod(2)}} - w = newCacheWatcher(2, filter, forget, testVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") + w = newCacheWatcher(2, filter, forget, storage.APIObjectVersioner{}, time.Now(), true, schema.GroupResource{Resource: "pods"}, "") w.setBookmarkAfterResourceVersion(10) go w.processInterval(ctx, intervalFromEvents(initEvents), 0) watchInitializationSignal.Wait() @@ -596,7 +596,7 @@ func TestCacheWatcherDrainingNoBookmarkAfterResourceVersionSent(t *testing.T) { func TestBookmarkAfterResourceVersionWatchers(t *testing.T) { newWatcher := func(id string, deadline time.Time) *cacheWatcher { - w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, testVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id) + w := newCacheWatcher(0, func(_ string, _ labels.Set, _ fields.Set) bool { return true }, func(bool) {}, storage.APIObjectVersioner{}, deadline, true, schema.GroupResource{Resource: "pods"}, id) w.setBookmarkAfterResourceVersion(10) return w } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index 86d184e136c..be3edd6fcbf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -33,7 +33,6 @@ import ( "k8s.io/apimachinery/pkg/api/apitesting" apiequality "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -58,42 +57,6 @@ import ( "k8s.io/utils/pointer" ) -type testVersioner struct{} - -func (testVersioner) UpdateObject(obj runtime.Object, resourceVersion uint64) error { - return meta.NewAccessor().SetResourceVersion(obj, strconv.FormatUint(resourceVersion, 10)) -} -func (testVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string, count *int64) error { - listAccessor, err := meta.ListAccessor(obj) - if err != nil || listAccessor == nil { - return err - } - listAccessor.SetResourceVersion(strconv.FormatUint(resourceVersion, 10)) - listAccessor.SetContinue(continueValue) - listAccessor.SetRemainingItemCount(count) - return nil -} -func (testVersioner) PrepareObjectForStorage(obj runtime.Object) error { - return fmt.Errorf("unimplemented") -} -func (testVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { - accessor, err := meta.Accessor(obj) - if err != nil { - return 0, err - } - version := accessor.GetResourceVersion() - if len(version) == 0 { - return 0, nil - } - return strconv.ParseUint(version, 10, 64) -} -func (testVersioner) ParseResourceVersion(resourceVersion string) (uint64, error) { - if len(resourceVersion) == 0 { - return 0, nil - } - return strconv.ParseUint(resourceVersion, 10, 64) -} - var ( scheme = runtime.NewScheme() codecs = serializer.NewCodecFactory(scheme) @@ -111,7 +74,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { prefix := "pods" config := Config{ Storage: s, - Versioner: testVersioner{}, + Versioner: storage.APIObjectVersioner{}, GroupResource: schema.GroupResource{Resource: "pods"}, ResourcePrefix: prefix, KeyFunc: func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) }, @@ -133,7 +96,7 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) { Clock: clock.RealClock{}, } cacher, err := NewCacherFromConfig(config) - return cacher, testVersioner{}, err + return cacher, storage.APIObjectVersioner{}, err } type dummyStorage struct { @@ -348,6 +311,90 @@ func TestWatchCacheBypass(t *testing.T) { } } +func TestEmptyWatchEventCache(t *testing.T) { + server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) + defer server.Terminate(t) + + // add a few objects + v := storage.APIObjectVersioner{} + lastRV := uint64(0) + for i := 0; i < 5; i++ { + pod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("foo-%d", i), Namespace: "test-ns"}} + out := &example.Pod{} + key := computePodKey(pod) + if err := etcdStorage.Create(context.Background(), key, pod, out, 0); err != nil { + t.Fatalf("Create failed: %v", err) + } + var err error + if lastRV, err = v.ParseResourceVersion(out.ResourceVersion); err != nil { + t.Fatalf("Unexpected error: %v", err) + } + } + + cacher, _, err := newTestCacher(etcdStorage) + if err != nil { + t.Fatalf("Couldn't create cacher: %v", err) + } + defer cacher.Stop() + + // Given that cacher is always initialized from the "current" version of etcd, + // we now have a cacher with an empty cache of watch events and a resourceVersion of rv. + // It should support establishing watches from rv and higher, but not older. + + expectedResourceExpiredError := apierrors.NewResourceExpired("").ErrStatus + tests := []struct { + name string + resourceVersion uint64 + expectedEvent *watch.Event + }{ + { + name: "RV-1", + resourceVersion: lastRV - 1, + expectedEvent: &watch.Event{Type: watch.Error, Object: &expectedResourceExpiredError}, + }, + { + name: "RV", + resourceVersion: lastRV, + }, + { + name: "RV+1", + resourceVersion: lastRV + 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := storage.ListOptions{ + ResourceVersion: strconv.Itoa(int(tt.resourceVersion)), + Predicate: storage.Everything, + } + watcher, err := cacher.Watch(context.Background(), "/pods/test-ns", opts) + if err != nil { + t.Fatalf("Failed to create watch: %v", err) + } + defer watcher.Stop() + select { + case event := <-watcher.ResultChan(): + if tt.expectedEvent == nil { + t.Errorf("Unexpected event: type=%#v, object=%#v", event.Type, event.Object) + break + } + if e, a := tt.expectedEvent.Type, event.Type; e != a { + t.Errorf("Expected: %s, got: %s", e, a) + } + if e, a := tt.expectedEvent.Object, event.Object; !apiequality.Semantic.DeepDerivative(e, a) { + t.Errorf("Expected: %#v, got: %#v", e, a) + } + case <-time.After(3 * time.Second): + if tt.expectedEvent != nil { + t.Errorf("Failed to get an event") + } + // watch remained established successfully + } + }) + } +} + func TestWatchNotHangingOnStartupFailure(t *testing.T) { // Configure cacher so that it can't initialize, because of // constantly failing lists to the underlying storage. @@ -378,7 +425,7 @@ func TestWatchNotHangingOnStartupFailure(t *testing.T) { func TestWatcherNotGoingBackInTime(t *testing.T) { backingStorage := &dummyStorage{} - cacher, _, err := newTestCacher(backingStorage) + cacher, v, err := newTestCacher(backingStorage) if err != nil { t.Fatalf("Couldn't create cacher: %v", err) } @@ -448,7 +495,7 @@ func TestWatcherNotGoingBackInTime(t *testing.T) { shouldContinue = false break } - rv, err := testVersioner{}.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion()) + rv, err := v.ParseResourceVersion(event.Object.(metaRuntimeInterface).GetResourceVersion()) if err != nil { t.Errorf("unexpected parsing error: %v", err) } else { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go index 44a55877879..8a2a8ab37e1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go @@ -35,6 +35,10 @@ import ( func newPod() runtime.Object { return &example.Pod{} } +func computePodKey(obj *example.Pod) string { + return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) +} + func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) storage := etcd3.New( @@ -61,7 +65,7 @@ func TestCacherListerWatcher(t *testing.T) { } for _, obj := range objects { out := &example.Pod{} - key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) + key := computePodKey(obj) if err := store.Create(context.Background(), key, obj, out, 0); err != nil { t.Fatalf("Create failed: %v", err) } @@ -97,7 +101,7 @@ func TestCacherListerWatcherPagination(t *testing.T) { } for _, obj := range objects { out := &example.Pod{} - key := fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) + key := computePodKey(obj) if err := store.Create(context.Background(), key, obj, out, 0); err != nil { t.Fatalf("Create failed: %v", err) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go index 9e23687cf8d..5c2577c5597 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/tests/cacher_test.go @@ -112,10 +112,6 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3 return server, storage } -func newTestCacher(s storage.Interface) (*cacherstorage.Cacher, storage.Versioner, error) { - return newTestCacherWithClock(s, clock.RealClock{}) -} - func newTestCacherWithClock(s storage.Interface, clock clock.Clock) (*cacherstorage.Cacher, storage.Versioner, error) { prefix := "pods" v := storage.APIObjectVersioner{} @@ -490,73 +486,6 @@ func TestWatchDeprecated(t *testing.T) { verifyWatchEvent(t, tooOldWatcher, watch.Error, &expectedResourceExpiredError) } -func TestEmptyWatchEventCache(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true) - defer server.Terminate(t) - - // add a few objects - updatePod(t, etcdStorage, makeTestPod("pod1"), nil) - updatePod(t, etcdStorage, makeTestPod("pod2"), nil) - updatePod(t, etcdStorage, makeTestPod("pod3"), nil) - updatePod(t, etcdStorage, makeTestPod("pod4"), nil) - updatePod(t, etcdStorage, makeTestPod("pod5"), nil) - - fooCreated := updatePod(t, etcdStorage, makeTestPod("foo"), nil) - - cacher, v, err := newTestCacher(etcdStorage) - if err != nil { - t.Fatalf("Couldn't create cacher: %v", err) - } - defer cacher.Stop() - - // get rv of last pod created - rv, err := v.ParseResourceVersion(fooCreated.ResourceVersion) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // We now have a cacher with an empty cache of watch events and a resourceVersion of rv. - // It should support establishing watches from rv and higher, but not older. - - { - watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv - 1)), Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - expectedResourceExpiredError := errors.NewResourceExpired("").ErrStatus - verifyWatchEvent(t, watcher, watch.Error, &expectedResourceExpiredError) - } - - { - watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv + 1)), Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - select { - case e := <-watcher.ResultChan(): - t.Errorf("unexpected event %#v", e) - case <-time.After(3 * time.Second): - // watch from rv+1 remained established successfully - } - } - - { - watcher, err := cacher.Watch(context.TODO(), "pods/ns", storage.ListOptions{ResourceVersion: strconv.Itoa(int(rv)), Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer watcher.Stop() - select { - case e := <-watcher.ResultChan(): - t.Errorf("unexpected event %#v", e) - case <-time.After(3 * time.Second): - // watch from rv remained established successfully - } - } -} - func TestWatchDispatchBookmarkEvents(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate)