mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 04:52:08 +00:00
Merge pull request #123676 from serathius/rv0
Fix enabling consistent list from watch cache also works for resourceVersion=0
This commit is contained in:
commit
89b1db79d7
@ -773,7 +773,7 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
|
|||||||
// minimal resource version, simply forward the request to storage.
|
// minimal resource version, simply forward the request to storage.
|
||||||
return c.storage.GetList(ctx, key, opts, listObj)
|
return c.storage.GetList(ctx, key, opts, listObj)
|
||||||
}
|
}
|
||||||
if listRV == 0 && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
|
if resourceVersion == "" && utilfeature.DefaultFeatureGate.Enabled(features.ConsistentListFromCache) {
|
||||||
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
|
listRV, err = storage.GetCurrentResourceVersionFromStorage(ctx, c.storage, c.newListFunc, c.resourcePrefix, c.objectType.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -167,20 +167,47 @@ func TestPreconditionalDeleteWithSuggestionPass(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
|
||||||
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
||||||
t.Cleanup(terminate)
|
t.Cleanup(terminate)
|
||||||
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
|
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestListWithListFromCache(t *testing.T) {
|
func TestListWithConsistentListFromCache(t *testing.T) {
|
||||||
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove skip.
|
|
||||||
t.Skip("This test flakes flakes due to https://github.com/etcd-io/etcd/issues/17507")
|
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
|
||||||
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
||||||
t.Cleanup(terminate)
|
t.Cleanup(terminate)
|
||||||
|
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
|
||||||
|
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix.
|
||||||
|
err := cacher.ready.wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
|
storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsistentList(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
|
||||||
|
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
||||||
|
t.Cleanup(terminate)
|
||||||
|
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestConsistentListWithConsistentListFromCache(t *testing.T) {
|
||||||
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, true)()
|
||||||
|
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
||||||
|
t.Cleanup(terminate)
|
||||||
|
// Wait before sending watch progress request to avoid https://github.com/etcd-io/etcd/issues/17507
|
||||||
|
// TODO(https://github.com/etcd-io/etcd/issues/17507): Remove the wait when etcd is upgraded to version with fix.
|
||||||
|
err := cacher.ready.wait(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
storagetesting.RunTestConsistentList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true, true)
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetListNonRecursive(t *testing.T) {
|
func TestGetListNonRecursive(t *testing.T) {
|
||||||
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
|
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ConsistentListFromCache, false)()
|
||||||
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
|
||||||
@ -227,7 +254,7 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil)
|
storagetesting.RunTestListInconsistentContinuation(ctx, t, cacher, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConsistentList(t *testing.T) {
|
func TestListResourceVersionMatch(t *testing.T) {
|
||||||
// TODO(#109831): Enable use of this test and run it.
|
// TODO(#109831): Enable use of this test and run it.
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -216,6 +216,11 @@ func TestList(t *testing.T) {
|
|||||||
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
|
storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConsistentList(t *testing.T) {
|
||||||
|
ctx, store, client := testSetup(t)
|
||||||
|
storagetesting.RunTestConsistentList(ctx, t, store, compactStorage(client), false, true)
|
||||||
|
}
|
||||||
|
|
||||||
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
|
func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation {
|
||||||
return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
|
return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) {
|
||||||
if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects {
|
if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects {
|
||||||
@ -285,9 +290,9 @@ func TestListInconsistentContinuation(t *testing.T) {
|
|||||||
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
|
storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client))
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestConsistentList(t *testing.T) {
|
func TestListResourceVersionMatch(t *testing.T) {
|
||||||
ctx, store, _ := testSetup(t)
|
ctx, store, _ := testSetup(t)
|
||||||
storagetesting.RunTestConsistentList(ctx, t, &storeWithPrefixTransformer{store})
|
storagetesting.RunTestListResourceVersionMatch(ctx, t, &storeWithPrefixTransformer{store})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCount(t *testing.T) {
|
func TestCount(t *testing.T) {
|
||||||
|
@ -1170,6 +1170,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
|
|||||||
expectRV: currentRV,
|
expectRV: currentRV,
|
||||||
expectedOut: []example.Pod{},
|
expectedOut: []example.Pod{},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "test non-consistent List",
|
||||||
|
prefix: "/pods/empty",
|
||||||
|
pred: storage.Everything,
|
||||||
|
rv: "0",
|
||||||
|
expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion),
|
||||||
|
expectedOut: []example.Pod{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
@ -1242,6 +1250,67 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func RunTestConsistentList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, cacheEnabled, consistentReadsSupported bool) {
|
||||||
|
outPod := &example.Pod{}
|
||||||
|
inPod := &example.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "foo"}}
|
||||||
|
err := store.Create(ctx, computePodKey(inPod), inPod, outPod, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
lastObjecRV := outPod.ResourceVersion
|
||||||
|
compaction(ctx, t, outPod.ResourceVersion)
|
||||||
|
parsedRV, _ := strconv.Atoi(outPod.ResourceVersion)
|
||||||
|
currentRV := fmt.Sprintf("%d", parsedRV+1)
|
||||||
|
|
||||||
|
firstNonConsistentReadRV := lastObjecRV
|
||||||
|
if consistentReadsSupported && !cacheEnabled {
|
||||||
|
firstNonConsistentReadRV = currentRV
|
||||||
|
}
|
||||||
|
|
||||||
|
secondNonConsistentReadRV := lastObjecRV
|
||||||
|
if consistentReadsSupported {
|
||||||
|
secondNonConsistentReadRV = currentRV
|
||||||
|
}
|
||||||
|
|
||||||
|
tcs := []struct {
|
||||||
|
name string
|
||||||
|
requestRV string
|
||||||
|
expectResponseRV string
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "Non-consistent list before sync",
|
||||||
|
requestRV: "0",
|
||||||
|
expectResponseRV: firstNonConsistentReadRV,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Consistent request returns currentRV",
|
||||||
|
requestRV: "",
|
||||||
|
expectResponseRV: currentRV,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Non-consistent request after sync returns currentRV",
|
||||||
|
requestRV: "0",
|
||||||
|
expectResponseRV: secondNonConsistentReadRV,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
for _, tc := range tcs {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
out := &example.PodList{}
|
||||||
|
opts := storage.ListOptions{
|
||||||
|
ResourceVersion: tc.requestRV,
|
||||||
|
Predicate: storage.Everything,
|
||||||
|
}
|
||||||
|
err = store.GetList(ctx, "/pods/empty", opts, out)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("GetList failed: %v", err)
|
||||||
|
}
|
||||||
|
if out.ResourceVersion != tc.expectResponseRV {
|
||||||
|
t.Errorf("resourceVersion in list response want=%s, got=%s", tc.expectResponseRV, out.ResourceVersion)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion
|
// seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion
|
||||||
// from before any were created along with the full set of objects that were persisted
|
// from before any were created along with the full set of objects that were persisted
|
||||||
func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) {
|
func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) {
|
||||||
@ -1956,7 +2025,7 @@ type InterfaceWithPrefixTransformer interface {
|
|||||||
UpdatePrefixTransformer(PrefixTransformerModifier) func()
|
UpdatePrefixTransformer(PrefixTransformerModifier) func()
|
||||||
}
|
}
|
||||||
|
|
||||||
func RunTestConsistentList(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
|
func RunTestListResourceVersionMatch(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) {
|
||||||
nextPod := func(index uint32) (string, *example.Pod) {
|
nextPod := func(index uint32) (string, *example.Pod) {
|
||||||
obj := &example.Pod{
|
obj := &example.Pod{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Loading…
Reference in New Issue
Block a user