Ensure that initial events are sorted for WatchList

This commit is contained in:
Wojciech Tyczyński 2023-09-26 18:39:44 +02:00
parent d2b4928669
commit 92bdc7b387
4 changed files with 65 additions and 64 deletions

View File

@ -340,13 +340,13 @@ func TestSendInitialEventsBackwardCompatibility(t *testing.T) {
storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store) storagetesting.RunSendInitialEventsBackwardCompatibility(ctx, t, store)
} }
func TestCacherWatchSemantics(t *testing.T) { func TestWatchSemantics(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t) store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate) t.Cleanup(terminate)
storagetesting.RunWatchSemantics(context.TODO(), t, store) storagetesting.RunWatchSemantics(context.TODO(), t, store)
} }
func TestCacherWatchSemanticInitialEventsExtended(t *testing.T) { func TestWatchSemanticInitialEventsExtended(t *testing.T) {
store, terminate := testSetupWithEtcdAndCreateWrapper(t) store, terminate := testSetupWithEtcdAndCreateWrapper(t)
t.Cleanup(terminate) t.Cleanup(terminate)
storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store) storagetesting.RunWatchSemanticInitialEventsExtended(context.TODO(), t, store)

View File

@ -18,6 +18,7 @@ package cacher
import ( import (
"fmt" "fmt"
"sort"
"sync" "sync"
"k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/fields"
@ -114,9 +115,24 @@ func newCacheInterval(startIndex, endIndex int, indexer indexerFunc, indexValida
} }
} }
type sortableWatchCacheEvents []*watchCacheEvent
func (s sortableWatchCacheEvents) Len() int {
return len(s)
}
func (s sortableWatchCacheEvents) Less(i, j int) bool {
return s[i].Key < s[j].Key
}
func (s sortableWatchCacheEvents) Swap(i, j int) {
s[i], s[j] = s[j], s[i]
}
// newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events // newCacheIntervalFromStore is meant to handle the case of rv=0, such that the events
// returned by Next() need to be events from a List() done on the underlying store of // returned by Next() need to be events from a List() done on the underlying store of
// the watch cache. // the watch cache.
// The items returned in the interval will be sorted by Key.
func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) { func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getAttrsFunc attrFunc) (*watchCacheInterval, error) {
buffer := &watchCacheIntervalBuffer{} buffer := &watchCacheIntervalBuffer{}
allItems := store.List() allItems := store.List()
@ -140,6 +156,7 @@ func newCacheIntervalFromStore(resourceVersion uint64, store cache.Indexer, getA
} }
buffer.endIndex++ buffer.endIndex++
} }
sort.Sort(sortableWatchCacheEvents(buffer.buffer))
ci := &watchCacheInterval{ ci := &watchCacheInterval{
startIndex: 0, startIndex: 0,
// Simulate that we already have all the events we're looking for. // Simulate that we already have all the events we're looking for.

View File

@ -193,14 +193,6 @@ func testCheckResultsInStrictOrder(t *testing.T, w watch.Interface, expectedEven
} }
} }
func testCheckResultsInRandomOrder(t *testing.T, w watch.Interface, expectedEvents []watch.Event) {
for range expectedEvents {
testCheckResultFunc(t, w, func(actualEvent watch.Event) {
ExpectContains(t, "unexpected event", toInterfaceSlice(expectedEvents), actualEvent)
})
}
}
func testCheckNoMoreResults(t *testing.T, w watch.Interface) { func testCheckNoMoreResults(t *testing.T, w watch.Interface) {
select { select {
case e := <-w.ResultChan(): case e := <-w.ResultChan():

View File

@ -1244,8 +1244,8 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
} }
return ret return ret
} }
initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) watch.Event { initialEventsEndFromLastCreatedPod := func(createdInitialPods []*example.Pod) []watch.Event {
return watch.Event{ return []watch.Event{{
Type: watch.Bookmark, Type: watch.Bookmark,
Object: &example.Pod{ Object: &example.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -1253,7 +1253,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
Annotations: map[string]string{"k8s.io/initial-events-end": "true"}, Annotations: map[string]string{"k8s.io/initial-events-end": "true"},
}, },
}, },
} }}
} }
scenarios := []struct { scenarios := []struct {
name string name string
@ -1267,19 +1267,17 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
initialPods []*example.Pod initialPods []*example.Pod
podsAfterEstablishingWatch []*example.Pod podsAfterEstablishingWatch []*example.Pod
expectedInitialEventsInRandomOrder func(createdInitialPods []*example.Pod) []watch.Event expectedInitialEvents func(createdInitialPods []*example.Pod) []watch.Event
expectedInitialEventsInStrictOrder func(createdInitialPods []*example.Pod) []watch.Event expectedInitialEventsBookmark func(createdInitialPods []*example.Pod) []watch.Event
expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event expectedEventsAfterEstablishingWatch func(createdPodsAfterWatch []*example.Pod) []watch.Event
}{ }{
{ {
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset", name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=unset",
allowWatchBookmarks: true, allowWatchBookmarks: true,
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1302,21 +1300,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset", name: "allowWatchBookmarks=false, sendInitialEvents=true, RV=unset",
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
{ {
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0", name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=0",
allowWatchBookmarks: true, allowWatchBookmarks: true,
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "0", resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1342,21 +1338,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "0", resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
{ {
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1", name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=1",
allowWatchBookmarks: true, allowWatchBookmarks: true,
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "1", resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1366,7 +1360,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &falseVal, sendInitialEvents: &falseVal,
resourceVersion: "1", resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInStrictOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1375,7 +1369,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &falseVal, sendInitialEvents: &falseVal,
resourceVersion: "1", resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInStrictOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1384,21 +1378,19 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
resourceVersion: "1", resourceVersion: "1",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
{ {
name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV", name: "allowWatchBookmarks=true, sendInitialEvents=true, RV=useCurrentRV",
allowWatchBookmarks: true, allowWatchBookmarks: true,
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
useCurrentRV: true, useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
expectedInitialEventsInStrictOrder: func(createdInitialPods []*example.Pod) []watch.Event { expectedInitialEventsBookmark: initialEventsEndFromLastCreatedPod,
return []watch.Event{initialEventsEndFromLastCreatedPod(createdInitialPods)}
},
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1424,7 +1416,7 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
sendInitialEvents: &trueVal, sendInitialEvents: &trueVal,
useCurrentRV: true, useCurrentRV: true,
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1433,14 +1425,14 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
name: "legacy, RV=0", name: "legacy, RV=0",
resourceVersion: "0", resourceVersion: "0",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
{ {
name: "legacy, RV=unset", name: "legacy, RV=unset",
initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")}, initialPods: []*example.Pod{makePod("1"), makePod("2"), makePod("3")},
expectedInitialEventsInRandomOrder: addEventsFromCreatedPods, expectedInitialEvents: addEventsFromCreatedPods,
podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")}, podsAfterEstablishingWatch: []*example.Pod{makePod("4"), makePod("5")},
expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods, expectedEventsAfterEstablishingWatch: addEventsFromCreatedPods,
}, },
@ -1449,11 +1441,11 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
t.Run(scenario.name, func(t *testing.T) { t.Run(scenario.name, func(t *testing.T) {
// set up env // set up env
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.WatchList, true)()
if scenario.expectedInitialEventsInStrictOrder == nil { if scenario.expectedInitialEvents == nil {
scenario.expectedInitialEventsInStrictOrder = func(_ []*example.Pod) []watch.Event { return nil } scenario.expectedInitialEvents = func(_ []*example.Pod) []watch.Event { return nil }
} }
if scenario.expectedInitialEventsInRandomOrder == nil { if scenario.expectedInitialEventsBookmark == nil {
scenario.expectedInitialEventsInRandomOrder = func(_ []*example.Pod) []watch.Event { return nil } scenario.expectedInitialEventsBookmark = func(_ []*example.Pod) []watch.Event { return nil }
} }
if scenario.expectedEventsAfterEstablishingWatch == nil { if scenario.expectedEventsAfterEstablishingWatch == nil {
scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil } scenario.expectedEventsAfterEstablishingWatch = func(_ []*example.Pod) []watch.Event { return nil }
@ -1487,8 +1479,8 @@ func RunWatchSemantics(ctx context.Context, t *testing.T, store storage.Interfac
defer w.Stop() defer w.Stop()
// make sure we only get initial events // make sure we only get initial events
testCheckResultsInRandomOrder(t, w, scenario.expectedInitialEventsInRandomOrder(createdPods)) testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEvents(createdPods))
testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsInStrictOrder(createdPods)) testCheckResultsInStrictOrder(t, w, scenario.expectedInitialEventsBookmark(createdPods))
testCheckNoMoreResults(t, w) testCheckNoMoreResults(t, w)
createdPods = []*example.Pod{} createdPods = []*example.Pod{}