Test consistent List

This commit is contained in:
Marek Siarkowicz 2023-07-03 17:22:22 +02:00
parent 8f79a3d91e
commit b36fdd68b7
4 changed files with 37 additions and 15 deletions

View File

@ -145,9 +145,9 @@ func TestPreconditionalDeleteWithSuggestion(t *testing.T) {
} }
func TestList(t *testing.T) { func TestList(t *testing.T) {
ctx, cacher, terminate := testSetup(t) ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate) t.Cleanup(terminate)
storagetesting.RunTestList(ctx, t, cacher, true) storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true)
} }
func TestListWithoutPaging(t *testing.T) { func TestListWithoutPaging(t *testing.T) {
@ -238,9 +238,9 @@ func TestWatch(t *testing.T) {
} }
func TestWatchFromZero(t *testing.T) { func TestWatchFromZero(t *testing.T) {
ctx, cacher, terminate := testSetup(t) ctx, cacher, server, terminate := testSetupWithEtcdServer(t)
t.Cleanup(terminate) t.Cleanup(terminate)
storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher)) storagetesting.RunTestWatchFromZero(ctx, t, cacher, compactStorage(cacher, server.V3Client))
} }
func TestDeleteTriggerWatch(t *testing.T) { func TestDeleteTriggerWatch(t *testing.T) {
@ -365,6 +365,11 @@ func withoutPaging(options *setupOptions) {
} }
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) { func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) {
ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...)
return ctx, cacher, tearDown
}
func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context, *Cacher, *etcd3testing.EtcdTestServer, tearDownFunc) {
setupOpts := setupOptions{} setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...) opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts { for _, opt := range opts {
@ -407,5 +412,5 @@ func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tea
t.Fatalf("Failed to inject list errors: %v", err) t.Fatalf("Failed to inject list errors: %v", err)
} }
return ctx, cacher, terminate return ctx, cacher, server, terminate
} }

View File

@ -21,6 +21,8 @@ import (
"fmt" "fmt"
"testing" "testing"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -71,7 +73,7 @@ func computePodKey(obj *example.Pod) string {
return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name) return fmt.Sprintf("/pods/%s/%s", obj.Namespace, obj.Name)
} }
func compactStorage(c *Cacher) storagetesting.Compaction { func compactStorage(c *Cacher, client *clientv3.Client) storagetesting.Compaction {
return func(ctx context.Context, t *testing.T, resourceVersion string) { return func(ctx context.Context, t *testing.T, resourceVersion string) {
versioner := storage.APIObjectVersioner{} versioner := storage.APIObjectVersioner{}
rv, err := versioner.ParseResourceVersion(resourceVersion) rv, err := versioner.ParseResourceVersion(resourceVersion)
@ -93,9 +95,6 @@ func compactStorage(c *Cacher) storagetesting.Compaction {
if c.watchCache.resourceVersion < rv { if c.watchCache.resourceVersion < rv {
t.Fatalf("Can't compact into a future version: %v", resourceVersion) t.Fatalf("Can't compact into a future version: %v", resourceVersion)
} }
if rv < c.watchCache.listResourceVersion {
t.Fatalf("Can't compact into a past version: %v", resourceVersion)
}
if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 { if len(c.watchers.allWatchers) > 0 || len(c.watchers.valueWatchers) > 0 {
// We could consider terminating those watchers, but given // We could consider terminating those watchers, but given
@ -114,6 +113,11 @@ func compactStorage(c *Cacher) storagetesting.Compaction {
} }
c.watchCache.listResourceVersion = rv c.watchCache.listResourceVersion = rv
// TODO(wojtek-t): We should also compact the underlying etcd storage. if _, err = client.KV.Put(ctx, "compact_rev_key", resourceVersion); err != nil {
t.Fatalf("Could not update compact_rev_key: %v", err)
}
if _, err = client.Compact(ctx, int64(rv)); err != nil {
t.Fatalf("Could not compact: %v", err)
}
} }
} }

View File

@ -191,8 +191,8 @@ func TestTransformationFailure(t *testing.T) {
} }
func TestList(t *testing.T) { func TestList(t *testing.T) {
ctx, store, _ := testSetup(t) ctx, store, client := testSetup(t)
storagetesting.RunTestList(ctx, t, store, false) storagetesting.RunTestList(ctx, t, store, compactStorage(client), false)
} }
func TestListWithoutPaging(t *testing.T) { func TestListWithoutPaging(t *testing.T) {
@ -258,7 +258,7 @@ func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil { if _, _, err = compact(ctx, etcdClient, 0, int64(rv)); err != nil {
t.Fatalf("Unable to compact, %v", err) t.Fatalf("Unable to compact, %v", err)
} }
} }

View File

@ -478,7 +478,7 @@ func RunTestPreconditionalDeleteWithSuggestion(ctx context.Context, t *testing.T
} }
} }
func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ignoreWatchCacheTests bool) { func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction, ignoreWatchCacheTests bool) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)() defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.RemainingItemCount, true)()
initialRV, preset, err := seedMultiLevelData(ctx, store) initialRV, preset, err := seedMultiLevelData(ctx, store)
@ -506,6 +506,11 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign
pod := obj.(*example.Pod) pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil return nil, fields.Set{"metadata.name": pod.Name, "spec.nodeName": pod.Spec.NodeName}, nil
} }
// Use compact to increase etcd global revision without changes to any resources.
// The increase in resources version comes from Kubernetes compaction updating hidden key.
// Used to test consistent List to confirm it returns latest etcd revision.
compaction(ctx, t, initialRV)
currentRV := fmt.Sprintf("%d", continueRV+1)
tests := []struct { tests := []struct {
name string name string
@ -706,7 +711,7 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign
expectedRemainingItemCount: utilpointer.Int64(1), expectedRemainingItemCount: utilpointer.Int64(1),
rv: list.ResourceVersion, rv: list.ResourceVersion,
rvMatch: metav1.ResourceVersionMatchNotOlderThan, rvMatch: metav1.ResourceVersionMatchNotOlderThan,
expectRV: list.ResourceVersion, expectRVFunc: resourceVersionNotOlderThan(list.ResourceVersion),
}, },
{ {
name: "test List with limit at resource version 0", name: "test List with limit at resource version 0",
@ -1019,6 +1024,14 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, ign
rvMatch: metav1.ResourceVersionMatchNotOlderThan, rvMatch: metav1.ResourceVersionMatchNotOlderThan,
expectedOut: []example.Pod{}, expectedOut: []example.Pod{},
}, },
{
name: "test consistent List",
prefix: "/pods/empty",
pred: storage.Everything,
rv: "",
expectRV: currentRV,
expectedOut: []example.Pod{},
},
} }
for _, tt := range tests { for _, tt := range tests {