From afbb1a6ef98b548b9e57b168614ca6e15fd0034c Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Thu, 14 Sep 2023 11:19:36 +0200 Subject: [PATCH] storage/testing/watcher_tests: move TestCacherWatchSemantics (no-op) --- .../pkg/storage/testing/watcher_tests.go | 202 ++++++++++++++++++ 1 file changed, 202 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index 01f659decf5..eb7acf83676 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -36,9 +36,12 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/apis/example" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/value" + utilfeature "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/utils/pointer" ) @@ -1219,6 +1222,205 @@ func RunSendInitialEventsBackwardCompatibility(ctx context.Context, t *testing.T w.Stop() } +func TestCacherWatchSemantics(t *testing.T) { + trueVal, falseVal := true, false + makePod := func(rv uint64) *example.Pod { + return &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%d", rv), + Namespace: "ns", + ResourceVersion: fmt.Sprintf("%d", rv), + Annotations: map[string]string{}, + }, + } + } + + scenarios := []struct { + name string + allowWatchBookmarks bool + sendInitialEvents *bool + resourceVersion string + storageResourceVersion string + + initialPods []*example.Pod + podsAfterEstablishingWatch []*example.Pod + + expectedInitialEventsInStrictOrder []watch.Event + expectedInitialEventsInRandomOrder []watch.Event + expectedEventsAfterEstablishingWatch []watch.Event + }{ + { + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset, storageRV=102", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + storageResourceVersion: "102", + initialPods: []*example.Pod{makePod(101)}, + podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, + expectedEventsAfterEstablishingWatch: []watch.Event{ + {Type: watch.Added, Object: makePod(102)}, + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + }, + { + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0, storageRV=105", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{ + {Type: watch.Added, Object: makePod(101)}, + {Type: watch.Added, Object: makePod(102)}, + }, + expectedInitialEventsInStrictOrder: []watch.Event{ + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + }, + { + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=101, storageRV=105", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + resourceVersion: "101", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + expectedInitialEventsInStrictOrder: []watch.Event{ + {Type: watch.Bookmark, Object: &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "102", + Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, + }, + }}, + }, + }, + { + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset, storageRV=102", + sendInitialEvents: &trueVal, + storageResourceVersion: "102", + initialPods: []*example.Pod{makePod(101)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}}, + podsAfterEstablishingWatch: []*example.Pod{makePod(102)}, + expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(102)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=0, storageRV=105", + sendInitialEvents: &trueVal, + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=101, storageRV=105", + sendInitialEvents: &trueVal, + resourceVersion: "101", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + // make sure we only get initial events that are > initial RV (101) + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + }, + { + name: "sendInitialEvents=false, RV=unset, storageRV=103", + sendInitialEvents: &falseVal, + storageResourceVersion: "103", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + podsAfterEstablishingWatch: []*example.Pod{makePod(104)}, + expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(104)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "sendInitialEvents=false, RV=0, storageRV=105", + sendInitialEvents: &falseVal, + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + podsAfterEstablishingWatch: []*example.Pod{makePod(103)}, + expectedEventsAfterEstablishingWatch: []watch.Event{{Type: watch.Added, Object: makePod(103)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "legacy, RV=0, storageRV=105", + resourceVersion: "0", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + expectedInitialEventsInRandomOrder: []watch.Event{{Type: watch.Added, Object: makePod(101)}, {Type: watch.Added, Object: makePod(102)}}, + }, + { + // note we set storage's RV to some future value, mustn't be used by this scenario + name: "legacy, RV=unset, storageRV=105", + storageResourceVersion: "105", + initialPods: []*example.Pod{makePod(101), makePod(102)}, + // no events because the watch is delegated to the underlying storage + }, + } + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + // set up env + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() + storageListMetaResourceVersion := "" + backingStorage := &dummyStorage{getListFn: func(_ context.Context, _ string, _ storage.ListOptions, listObj runtime.Object) error { + podList := listObj.(*example.PodList) + podList.ListMeta = metav1.ListMeta{ResourceVersion: storageListMetaResourceVersion} + return nil + }} + + cacher, _, err := newTestCacher(backingStorage) + if err != nil { + t.Fatalf("falied to create cacher: %v", err) + } + defer cacher.Stop() + if err := cacher.ready.wait(context.TODO()); err != nil { + t.Fatalf("unexpected error waiting for the cache to be ready") + } + + // now, run a scenario + // but first let's add some initial data + for _, obj := range scenario.initialPods { + err = cacher.watchCache.Add(obj) + require.NoError(t, err, "failed to add a pod: %v") + } + // read request params + opts := storage.ListOptions{Predicate: storage.Everything} + opts.SendInitialEvents = scenario.sendInitialEvents + opts.Predicate.AllowWatchBookmarks = scenario.allowWatchBookmarks + if len(scenario.resourceVersion) > 0 { + opts.ResourceVersion = scenario.resourceVersion + } + // before starting a new watch set a storage RV to some future value + storageListMetaResourceVersion = scenario.storageResourceVersion + + w, err := cacher.Watch(context.Background(), "pods/ns", opts) + require.NoError(t, err, "failed to create watch: %v") + defer w.Stop() + + // make sure we only get initial events + verifyEvents(t, w, scenario.expectedInitialEventsInRandomOrder, false) + verifyEvents(t, w, scenario.expectedInitialEventsInStrictOrder, true) + verifyNoEvents(t, w) + // add a pod that is greater than the storage's RV when the watch was started + for _, obj := range scenario.podsAfterEstablishingWatch { + err = cacher.watchCache.Add(obj) + require.NoError(t, err, "failed to add a pod: %v") + } + verifyEvents(t, w, scenario.expectedEventsAfterEstablishingWatch, true) + verifyNoEvents(t, w) + }) + } +} + type testWatchStruct struct { obj *example.Pod expectEvent bool