From 92bdc7b3873800e6130176e49acdf5e17110e5b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Tue, 26 Sep 2023 18:39:44 +0200 Subject: [PATCH] Ensure that initial events are sorted for WatchList --- .../pkg/storage/cacher/cacher_test.go | 4 +- .../storage/cacher/watch_cache_interval.go | 17 +++ .../apiserver/pkg/storage/testing/utils.go | 8 -- .../pkg/storage/testing/watcher_tests.go | 100 ++++++++---------- 4 files changed, 65 insertions(+), 64 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index e13ffbd03e1..3cb4eff846c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -340,13 +340,13 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) { storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) } -func TestCacherWatchSemantics(t *testing.T) { +func TestWatchSemantics(t *testing.T) { store, terminate := testSetupWithEtcdAndCreateWrapper(t) t.Cleanup(terminate) storagetesting.RunWatchSemantics(context.TODO(), t, store) } -func TestCacherWatchSemanticInitialEventsExtended(t *testing.T) { +func TestWatchSemanticInitialEventsExtended(t *testing.T) { store, terminate := testSetupWithEtcdAndCreateWrapper(t) t.Cleanup(terminate) storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go index c455357e04d..2b57dd16509 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/watch_cache_interval.go @@ -18,6 +18,7 @@ package cacher import ( "fmt" + "sort" "sync" "k8s.io/apimachinery/pkg/fields" @@ -114,9 +115,24 @@ func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValida } } +type sortableWatchCacheEvents []*watchCacheEvent + +func (s sortableWatchCacheEvents) Len() int { + return len(s) +} + +func (s sortableWatchCacheEvents) Less(i, j int) bool { + return s[i].Key < s[j].Key +} + +func (s sortableWatchCacheEvents) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} + // newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events // returned by Next() need to be events from a List() done on the underlying store of // the watch cache. +// The items returned in the interval will be sorted by Key. func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) { buffer := &watchCacheIntervalBuffer{} allItems := store.List() @@ -140,6 +156,7 @@ func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getA } buffer.endIndex++ } + sort.Sort(sortableWatchCacheEvents(buffer.buffer)) ci := &watchCacheInterval{ startIndex: 0, // Simulate that we already have all the events we're looking for. diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index 5d1fb3aa8b5..5564f3c86f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -193,14 +193,6 @@ func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEven } } -func testCheckResultsInRandomOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) { - for range expectedEvents { - testCheckResultFunc(t, w, func(actualEvent watch.Event) { - ExpectContains(t, "unexpected event", toInterfaceSlice(expectedEvents), actualEvent) - }) - } -} - func testCheckNoMoreResults(t *testing.T, w watch.Interface) { select { case e := <-w.ResultChan(): diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index e24dea401fb..9829194f939 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -1244,8 +1244,8 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac } return ret } - initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) watch.Event { - return watch.Event{ + initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) []watch.Event { + return []watch.Event{{ Type: watch.Bookmark, Object: &example.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -1253,7 +1253,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, }, }, - } + }} } scenarios := []struct { name string @@ -1267,19 +1267,17 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac initialPods []*example.Pod podsAfterEstablishingWatch []*example.Pod - expectedInitialEventsInRandomOrder func(createdInitialPods []*example.Pod) []watch.Event - expectedInitialEventsInStrictOrder func(createdInitialPods []*example.Pod) []watch.Event + expectedInitialEvents func(createdInitialPods []*example.Pod) []watch.Event + expectedInitialEventsBookmark func(createdInitialPods []*example.Pod) []watch.Event expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event }{ { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, - expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { - return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)} - }, + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, + expectedInitialEvents: addEventsFromCreatedPods, + expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1302,21 +1300,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset", sendInitialEvents: &trueVal, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - resourceVersion: "0", - initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, - expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { - return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)} - }, + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + resourceVersion: "0", + initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, + expectedInitialEvents: addEventsFromCreatedPods, + expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1342,21 +1338,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac sendInitialEvents: &trueVal, resourceVersion: "0", initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - resourceVersion: "1", - initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, - expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { - return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)} - }, + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + resourceVersion: "1", + initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, + expectedInitialEvents: addEventsFromCreatedPods, + expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1366,7 +1360,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac sendInitialEvents: &falseVal, resourceVersion: "1", initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInStrictOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1375,7 +1369,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac sendInitialEvents: &falseVal, resourceVersion: "1", initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInStrictOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1384,21 +1378,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac sendInitialEvents: &trueVal, resourceVersion: "1", initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, { - name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV", - allowWatchBookmarks: true, - sendInitialEvents: &trueVal, - useCurrentRV: true, - initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, - expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { - return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)} - }, + name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV", + allowWatchBookmarks: true, + sendInitialEvents: &trueVal, + useCurrentRV: true, + initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, + expectedInitialEvents: addEventsFromCreatedPods, + expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1424,7 +1416,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac sendInitialEvents: &trueVal, useCurrentRV: true, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1433,14 +1425,14 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac name: "legacy, RV=0", resourceVersion: "0", initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, { name: "legacy, RV=unset", initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, - expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, + expectedInitialEvents: addEventsFromCreatedPods, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, }, @@ -1449,11 +1441,11 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac t.Run(scenario.name, func(t *testing.T) { // set up env defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() - if scenario.expectedInitialEventsInStrictOrder == nil { - scenario.expectedInitialEventsInStrictOrder = func(_ []*example.Pod) []watch.Event { return nil } + if scenario.expectedInitialEvents == nil { + scenario.expectedInitialEvents = func(_ []*example.Pod) []watch.Event { return nil } } - if scenario.expectedInitialEventsInRandomOrder == nil { - scenario.expectedInitialEventsInRandomOrder = func(_ []*example.Pod) []watch.Event { return nil } + if scenario.expectedInitialEventsBookmark == nil { + scenario.expectedInitialEventsBookmark = func(_ []*example.Pod) []watch.Event { return nil } } if scenario.expectedEventsAfterEstablishingWatch == nil { scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil } @@ -1487,8 +1479,8 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac defer w.Stop() // make sure we only get initial events - testCheckResultsInRandomOrder(t, w, scenario.expectedInitialEventsInRandomOrder(createdPods)) - testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsInStrictOrder(createdPods)) + testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods)) + testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsBookmark(createdPods)) testCheckNoMoreResults(t, w) createdPods = []*example.Pod{}