diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index f367d8327b7..c0536fa4b57 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -22,9 +22,6 @@ import ( "io/ioutil" "os" "reflect" - "strconv" - "strings" - "sync" "sync/atomic" "testing" @@ -33,10 +30,7 @@ import ( "google.golang.org/grpc/grpclog" "k8s.io/apimachinery/pkg/api/apitesting" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" @@ -253,296 +247,41 @@ func TestListContinuationWithFilter(t *testing.T) { storagetesting.RunTestListContinuationWithFilter(ctx, t, store, validation) } +func compactStorage(etcdClient *clientv3.Client) storagetesting.Compaction { + return func(ctx context.Context, t *testing.T, resourceVersion string) { + versioner := storage.APIObjectVersioner{} + rv, err := versioner.ParseResourceVersion(resourceVersion) + if err != nil { + t.Fatal(err) + } + if _, err := etcdClient.KV.Compact(ctx, int64(rv), clientv3.WithCompactPhysical()); err != nil { + t.Fatalf("Unable to compact, %v", err) + } + } +} + func TestListInconsistentContinuation(t *testing.T) { ctx, store, client := testSetup(t) - - // Setup storage with the following structure: - // / - // - one-level/ - // | - test - // | - // - two-level/ - // - 1/ - // | - test - // | - // - 2/ - // - test - // - preset := []struct { - key string - obj *example.Pod - storedObj *example.Pod - }{ - { - key: "/one-level/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/1/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, - }, - { - key: "/two-level/2/test", - obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, - }, - } - - for i, ps := range preset { - preset[i].storedObj = &example.Pod{} - err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) - if err != nil { - t.Fatalf("Set failed: %v", err) - } - } - - pred := func(limit int64, continueValue string) storage.SelectionPredicate { - return storage.SelectionPredicate{ - Limit: limit, - Continue: continueValue, - Label: labels.Everything(), - Field: fields.Everything(), - GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - }, - } - } - - out := &example.PodList{} - options := storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, ""), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get initial list: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) - - continueFromSecondItem := out.Continue - - // update /two-level/2/test/bar - oldName := preset[2].obj.Name - newPod := &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: oldName, - Labels: map[string]string{ - "state": "new", - }, - }, - } - if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil, - func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { - return newPod, nil, nil - }, newPod); err != nil { - t.Fatalf("update failed: %v", err) - } - - // compact to latest revision. - versioner := storage.APIObjectVersioner{} - lastRVString := preset[2].storedObj.ResourceVersion - lastRV, err := versioner.ParseResourceVersion(lastRVString) - if err != nil { - t.Fatal(err) - } - if _, err := client.KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil { - t.Fatalf("Unable to compact, %v", err) - } - - // The old continue token should have expired - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(0, continueFromSecondItem), - Recursive: true, - } - err = store.GetList(ctx, "/", options, out) - if err == nil { - t.Fatalf("unexpected no error") - } - if !strings.Contains(err.Error(), inconsistentContinue) { - t.Fatalf("unexpected error message %v", err) - } - status, ok := err.(apierrors.APIStatus) - if !ok { - t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err)) - } - inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue - if len(inconsistentContinueFromSecondItem) == 0 { - t.Fatalf("expect non-empty continue token") - } - - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, inconsistentContinueFromSecondItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) == 0 { - t.Fatalf("No continuation token set") - } - validateResourceVersion := storagetesting.ResourceVersionNotOlderThan(lastRVString) - storagetesting.ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) - if err := validateResourceVersion(out.ResourceVersion); err != nil { - t.Fatal(err) - } - continueFromThirdItem := out.Continue - resolvedResourceVersionFromThirdItem := out.ResourceVersion - out = &example.PodList{} - options = storage.ListOptions{ - ResourceVersion: "0", - Predicate: pred(1, continueFromThirdItem), - Recursive: true, - } - if err := store.GetList(ctx, "/", options, out); err != nil { - t.Fatalf("Unable to get second page: %v", err) - } - if len(out.Continue) != 0 { - t.Fatalf("Unexpected continuation token set") - } - storagetesting.ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) - if out.ResourceVersion != resolvedResourceVersionFromThirdItem { - t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) - } + storagetesting.RunTestListInconsistentContinuation(ctx, t, store, compactStorage(client)) } -func newTestLeaseManagerConfig() LeaseManagerConfig { - cfg := NewDefaultLeaseManagerConfig() - // As 30s is the default timeout for testing in global configuration, - // we cannot wait longer than that in a single time: change it to 1s - // for testing purposes. See wait.ForeverTestTimeout - cfg.ReuseDurationSeconds = 1 - return cfg +func TestConsistentList(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestConsistentList(ctx, t, &storeWithPrefixTransformer{store}) } -func newTestTransformer() value.Transformer { - return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false) +func TestCount(t *testing.T) { + ctx, store, _ := testSetup(t) + storagetesting.RunTestCount(ctx, t, store) } -type clientRecorder struct { - reads uint64 - clientv3.KV -} - -func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { - atomic.AddUint64(&r.reads, 1) - return r.KV.Get(ctx, key, opts...) -} - -func (r *clientRecorder) GetReadsAndReset() uint64 { - return atomic.SwapUint64(&r.reads, 0) -} - -type setupOptions struct { - client func(*testing.T) *clientv3.Client - codec runtime.Codec - newFunc func() runtime.Object - prefix string - groupResource schema.GroupResource - transformer value.Transformer - pagingEnabled bool - leaseConfig LeaseManagerConfig - - recorderEnabled bool -} - -type setupOption func(*setupOptions) - -func withClient(client *clientv3.Client) setupOption { - return func(options *setupOptions) { - options.client = func(t *testing.T) *clientv3.Client { - return client - } - } -} - -func withClientConfig(config *embed.Config) setupOption { - return func(options *setupOptions) { - options.client = func(t *testing.T) *clientv3.Client { - return testserver.RunEtcd(t, config) - } - } -} - -func withCodec(codec runtime.Codec) setupOption { - return func(options *setupOptions) { - options.codec = codec - } -} - -func withPrefix(prefix string) setupOption { - return func(options *setupOptions) { - options.prefix = prefix - } -} - -func withoutPaging() setupOption { - return func(options *setupOptions) { - options.pagingEnabled = false - } -} - -func withTransformer(transformer value.Transformer) setupOption { - return func(options *setupOptions) { - options.transformer = transformer - } -} - -func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption { - return func(options *setupOptions) { - options.leaseConfig = leaseConfig - } -} - -func withRecorder() setupOption { - return func(options *setupOptions) { - options.recorderEnabled = true - } -} - -func withDefaults(options *setupOptions) { - options.client = func(t *testing.T) *clientv3.Client { - return testserver.RunEtcd(t, nil) - } - options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) - options.newFunc = newPod - options.prefix = "" - options.groupResource = schema.GroupResource{Resource: "pods"} - options.transformer = newTestTransformer() - options.pagingEnabled = true - options.leaseConfig = newTestLeaseManagerConfig() -} - -var _ setupOption = withDefaults - -func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *clientv3.Client) { - setupOpts := setupOptions{} - opts = append([]setupOption{withDefaults}, opts...) - for _, opt := range opts { - opt(&setupOpts) - } - client := setupOpts.client(t) - if setupOpts.recorderEnabled { - client.KV = &clientRecorder{KV: client.KV} - } - store := newStore( - client, - setupOpts.codec, - setupOpts.newFunc, - setupOpts.prefix, - setupOpts.groupResource, - setupOpts.transformer, - setupOpts.pagingEnabled, - setupOpts.leaseConfig, - ) - ctx := context.Background() - return ctx, store, client -} +// ======================================================================= +// Implementation-specific tests are following. +// The following tests are exercising the details of the implementation +// not the actual user-facing contract of storage interface. +// As such, they may focus e.g. on non-functional aspects like performance +// impact. +// ======================================================================= func TestPrefix(t *testing.T) { testcases := map[string]string{ @@ -651,119 +390,6 @@ func Test_growSlice(t *testing.T) { } } -// fancyTransformer creates next object on each call to -// TransformFromStorage call. -type fancyTransformer struct { - transformer value.Transformer - store *store - - lock sync.Mutex - index int -} - -func (t *fancyTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { - if err := t.createObject(ctx); err != nil { - return nil, false, err - } - return t.transformer.TransformFromStorage(ctx, data, dataCtx) -} - -func (t *fancyTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { - return t.transformer.TransformToStorage(ctx, data, dataCtx) -} - -func (t *fancyTransformer) createObject(ctx context.Context) error { - t.lock.Lock() - defer t.lock.Unlock() - - t.index++ - key := fmt.Sprintf("pod-%d", t.index) - obj := &example.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: key, - Labels: map[string]string{ - "even": strconv.FormatBool(t.index%2 == 0), - }, - }, - } - out := &example.Pod{} - return t.store.Create(ctx, key, obj, out, 0) -} - -func TestConsistentList(t *testing.T) { - transformer := &fancyTransformer{ - transformer: newTestTransformer(), - } - ctx, store, _ := testSetup(t, withTransformer(transformer)) - transformer.store = store - - for i := 0; i < 5; i++ { - if err := transformer.createObject(ctx); err != nil { - t.Fatalf("failed to create object: %v", err) - } - } - - getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod, ok := obj.(*example.Pod) - if !ok { - return nil, nil, fmt.Errorf("invalid object") - } - return labels.Set(pod.Labels), nil, nil - } - predicate := storage.SelectionPredicate{ - Label: labels.Set{"even": "true"}.AsSelector(), - GetAttrs: getAttrs, - Limit: 4, - } - - result1 := example.PodList{} - options := storage.ListOptions{ - Predicate: predicate, - Recursive: true, - } - if err := store.GetList(ctx, "/", options, &result1); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - // List objects from the returned resource version. - options = storage.ListOptions{ - Predicate: predicate, - ResourceVersion: result1.ResourceVersion, - ResourceVersionMatch: metav1.ResourceVersionMatchExact, - Recursive: true, - } - - result2 := example.PodList{} - if err := store.GetList(ctx, "/", options, &result2); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - storagetesting.ExpectNoDiff(t, "incorrect lists", result1, result2) - - // Now also verify the ResourceVersionMatchNotOlderThan. - options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan - - result3 := example.PodList{} - if err := store.GetList(ctx, "/", options, &result3); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - options.ResourceVersion = result3.ResourceVersion - options.ResourceVersionMatch = metav1.ResourceVersionMatchExact - - result4 := example.PodList{} - if err := store.GetList(ctx, "/", options, &result4); err != nil { - t.Fatalf("failed to list objects: %v", err) - } - - storagetesting.ExpectNoDiff(t, "incorrect lists", result3, result4) -} - -func TestCount(t *testing.T) { - ctx, store, _ := testSetup(t) - storagetesting.RunTestCount(ctx, t, store) -} - func TestLeaseMaxObjectCount(t *testing.T) { ctx, store, _ := testSetup(t, withLeaseConfig(LeaseManagerConfig{ ReuseDurationSeconds: defaultLeaseReuseDurationSeconds, @@ -803,3 +429,134 @@ func TestLeaseMaxObjectCount(t *testing.T) { } } } + +// =================================================== +// Test-setup related function are following. +// =================================================== + +func newTestLeaseManagerConfig() LeaseManagerConfig { + cfg := NewDefaultLeaseManagerConfig() + // As 30s is the default timeout for testing in global configuration, + // we cannot wait longer than that in a single time: change it to 1s + // for testing purposes. See wait.ForeverTestTimeout + cfg.ReuseDurationSeconds = 1 + return cfg +} + +func newTestTransformer() value.Transformer { + return storagetesting.NewPrefixTransformer([]byte(defaultTestPrefix), false) +} + +type clientRecorder struct { + reads uint64 + clientv3.KV +} + +func (r *clientRecorder) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + atomic.AddUint64(&r.reads, 1) + return r.KV.Get(ctx, key, opts...) +} + +func (r *clientRecorder) GetReadsAndReset() uint64 { + return atomic.SwapUint64(&r.reads, 0) +} + +type setupOptions struct { + client func(*testing.T) *clientv3.Client + codec runtime.Codec + newFunc func() runtime.Object + prefix string + groupResource schema.GroupResource + transformer value.Transformer + pagingEnabled bool + leaseConfig LeaseManagerConfig + + recorderEnabled bool +} + +type setupOption func(*setupOptions) + +func withClient(client *clientv3.Client) setupOption { + return func(options *setupOptions) { + options.client = func(t *testing.T) *clientv3.Client { + return client + } + } +} + +func withClientConfig(config *embed.Config) setupOption { + return func(options *setupOptions) { + options.client = func(t *testing.T) *clientv3.Client { + return testserver.RunEtcd(t, config) + } + } +} + +func withCodec(codec runtime.Codec) setupOption { + return func(options *setupOptions) { + options.codec = codec + } +} + +func withPrefix(prefix string) setupOption { + return func(options *setupOptions) { + options.prefix = prefix + } +} + +func withoutPaging() setupOption { + return func(options *setupOptions) { + options.pagingEnabled = false + } +} + +func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption { + return func(options *setupOptions) { + options.leaseConfig = leaseConfig + } +} + +func withRecorder() setupOption { + return func(options *setupOptions) { + options.recorderEnabled = true + } +} + +func withDefaults(options *setupOptions) { + options.client = func(t *testing.T) *clientv3.Client { + return testserver.RunEtcd(t, nil) + } + options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + options.newFunc = newPod + options.prefix = "" + options.groupResource = schema.GroupResource{Resource: "pods"} + options.transformer = newTestTransformer() + options.pagingEnabled = true + options.leaseConfig = newTestLeaseManagerConfig() +} + +var _ setupOption = withDefaults + +func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *clientv3.Client) { + setupOpts := setupOptions{} + opts = append([]setupOption{withDefaults}, opts...) + for _, opt := range opts { + opt(&setupOpts) + } + client := setupOpts.client(t) + if setupOpts.recorderEnabled { + client.KV = &clientRecorder{KV: client.KV} + } + store := newStore( + client, + setupOpts.codec, + setupOpts.newFunc, + setupOpts.prefix, + setupOpts.groupResource, + setupOpts.transformer, + setupOpts.pagingEnabled, + setupOpts.leaseConfig, + ) + ctx := context.Background() + return ctx, store, client +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go index cf76a162719..cadd90cc3bf 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/watcher_test.go @@ -49,64 +49,9 @@ func TestDeleteTriggerWatch(t *testing.T) { storagetesting.RunTestDeleteTriggerWatch(ctx, t, store) } -// TestWatchFromZero tests that -// - watch from 0 should sync up and grab the object added before -// - watch from 0 is able to return events for objects whose previous version has been compacted func TestWatchFromZero(t *testing.T) { ctx, store, client := testSetup(t) - key, storedObj := storagetesting.TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) - - w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, storedObj) - w.Stop() - - // Update - out := &example.Pod{} - err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil - }), nil) - if err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) - } - - // Make sure when we watch from 0 we receive an ADDED event - w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, out) - w.Stop() - - // Update again - out = &example.Pod{} - err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( - func(runtime.Object) (runtime.Object, error) { - return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil - }), nil) - if err != nil { - t.Fatalf("GuaranteedUpdate failed: %v", err) - } - - // Compact previous versions - revToCompact, err := store.versioner.ParseResourceVersion(out.ResourceVersion) - if err != nil { - t.Fatalf("Error converting %q to an int: %v", storedObj.ResourceVersion, err) - } - _, err = client.Compact(ctx, int64(revToCompact), clientv3.WithCompactPhysical()) - if err != nil { - t.Fatalf("Error compacting: %v", err) - } - - // Make sure we can still watch from 0 and receive an ADDED event - w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) - if err != nil { - t.Fatalf("Watch failed: %v", err) - } - storagetesting.TestCheckResult(t, watch.Added, w, out) + storagetesting.RunTestWatchFromZero(ctx, t, store, compactStorage(client)) } // TestWatchFromNoneZero tests that diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index f7c92a3ff81..e6d98a0ecc8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -23,6 +23,7 @@ import ( "math" "reflect" "strconv" + "strings" "sync" "testing" @@ -1410,6 +1411,161 @@ func RunTestListContinuationWithFilter(ctx context.Context, t *testing.T, store } } +type Compaction func(ctx context.Context, t *testing.T, resourceVersion string) + +func RunTestListInconsistentContinuation(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { + if compaction == nil { + t.Skipf("compaction callback not provided") + } + + // Setup storage with the following structure: + // / + // - one-level/ + // | - test + // | + // - two-level/ + // - 1/ + // | - test + // | + // - 2/ + // - test + // + preset := []struct { + key string + obj *example.Pod + storedObj *example.Pod + }{ + { + key: "/one-level/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/1/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, + }, + { + key: "/two-level/2/test", + obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}, + }, + } + + for i, ps := range preset { + preset[i].storedObj = &example.Pod{} + err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + } + + pred := func(limit int64, continueValue string) storage.SelectionPredicate { + return storage.SelectionPredicate{ + Limit: limit, + Continue: continueValue, + Label: labels.Everything(), + Field: fields.Everything(), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + } + } + + out := &example.PodList{} + options := storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, ""), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + ExpectNoDiff(t, "incorrect first page", []example.Pod{*preset[0].storedObj}, out.Items) + + continueFromSecondItem := out.Continue + + // update /two-level/2/test/bar + oldName := preset[2].obj.Name + newPod := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: oldName, + Labels: map[string]string{ + "state": "new", + }, + }, + } + if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil, + func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) { + return newPod, nil, nil + }, newPod); err != nil { + t.Fatalf("update failed: %v", err) + } + + // compact to latest revision. + lastRVString := preset[2].storedObj.ResourceVersion + compaction(ctx, t, lastRVString) + + // The old continue token should have expired + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(0, continueFromSecondItem), + Recursive: true, + } + err := store.GetList(ctx, "/", options, out) + if err == nil { + t.Fatalf("unexpected no error") + } + if !strings.Contains(err.Error(), "The provided continue parameter is too old ") { + t.Fatalf("unexpected error message %v", err) + } + status, ok := err.(apierrors.APIStatus) + if !ok { + t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err)) + } + inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue + if len(inconsistentContinueFromSecondItem) == 0 { + t.Fatalf("expect non-empty continue token") + } + + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, inconsistentContinueFromSecondItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) == 0 { + t.Fatalf("No continuation token set") + } + validateResourceVersion := ResourceVersionNotOlderThan(lastRVString) + ExpectNoDiff(t, "incorrect second page", []example.Pod{*preset[1].storedObj}, out.Items) + if err := validateResourceVersion(out.ResourceVersion); err != nil { + t.Fatal(err) + } + continueFromThirdItem := out.Continue + resolvedResourceVersionFromThirdItem := out.ResourceVersion + out = &example.PodList{} + options = storage.ListOptions{ + ResourceVersion: "0", + Predicate: pred(1, continueFromThirdItem), + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get second page: %v", err) + } + if len(out.Continue) != 0 { + t.Fatalf("Unexpected continuation token set") + } + ExpectNoDiff(t, "incorrect third page", []example.Pod{*preset[2].storedObj}, out.Items) + if out.ResourceVersion != resolvedResourceVersionFromThirdItem { + t.Fatalf("Expected list resource version to be %s, got %s", resolvedResourceVersionFromThirdItem, out.ResourceVersion) + } +} + type PrefixTransformerModifier func(*PrefixTransformer) value.Transformer type InterfaceWithPrefixTransformer interface { @@ -1418,6 +1574,94 @@ type InterfaceWithPrefixTransformer interface { UpdatePrefixTransformer(PrefixTransformerModifier) func() } +func RunTestConsistentList(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer) { + nextPod := func(index uint32) (string, *example.Pod) { + key := fmt.Sprintf("pod-%d", index) + obj := &example.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: key, + Labels: map[string]string{ + "even": strconv.FormatBool(index%2 == 0), + }, + }, + } + return key, obj + } + + transformer := &reproducingTransformer{ + store: store, + nextObject: nextPod, + } + + revertTransformer := store.UpdatePrefixTransformer( + func(previousTransformer *PrefixTransformer) value.Transformer { + transformer.wrapped = previousTransformer + return transformer + }) + defer revertTransformer() + + for i := 0; i < 5; i++ { + if err := transformer.createObject(ctx); err != nil { + t.Fatalf("failed to create object: %v", err) + } + } + + getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod, ok := obj.(*example.Pod) + if !ok { + return nil, nil, fmt.Errorf("invalid object") + } + return labels.Set(pod.Labels), nil, nil + } + predicate := storage.SelectionPredicate{ + Label: labels.Set{"even": "true"}.AsSelector(), + GetAttrs: getAttrs, + Limit: 4, + } + + result1 := example.PodList{} + options := storage.ListOptions{ + Predicate: predicate, + Recursive: true, + } + if err := store.GetList(ctx, "/", options, &result1); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + // List objects from the returned resource version. + options = storage.ListOptions{ + Predicate: predicate, + ResourceVersion: result1.ResourceVersion, + ResourceVersionMatch: metav1.ResourceVersionMatchExact, + Recursive: true, + } + + result2 := example.PodList{} + if err := store.GetList(ctx, "/", options, &result2); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + ExpectNoDiff(t, "incorrect lists", result1, result2) + + // Now also verify the ResourceVersionMatchNotOlderThan. + options.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan + + result3 := example.PodList{} + if err := store.GetList(ctx, "/", options, &result3); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + options.ResourceVersion = result3.ResourceVersion + options.ResourceVersionMatch = metav1.ResourceVersionMatchExact + + result4 := example.PodList{} + if err := store.GetList(ctx, "/", options, &result4); err != nil { + t.Fatalf("failed to list objects: %v", err) + } + + ExpectNoDiff(t, "incorrect lists", result3, result4) +} + func RunTestGuaranteedUpdate(ctx context.Context, t *testing.T, store InterfaceWithPrefixTransformer, validation KeyValidation) { key := "/testkey" diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go index c6bca507b8f..6c595589839 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/utils.go @@ -235,3 +235,33 @@ func (p *PrefixTransformer) TransformToStorage(ctx context.Context, data []byte, func (p *PrefixTransformer) GetReadsAndReset() uint64 { return atomic.SwapUint64(&p.reads, 0) } + +// reproducingTransformer is a custom test-only transformer used purely +// for testing consistency. +// It allows for creating predefined objects on TransformFromStorage operations, +// which allows for precise in time injection of new objects in the middle of +// read operations. +type reproducingTransformer struct { + wrapped value.Transformer + store storage.Interface + + index uint32 + nextObject func(uint32) (string, *example.Pod) +} + +func (rt *reproducingTransformer) TransformFromStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, bool, error) { + if err := rt.createObject(ctx); err != nil { + return nil, false, err + } + return rt.wrapped.TransformFromStorage(ctx, data, dataCtx) +} + +func (rt *reproducingTransformer) TransformToStorage(ctx context.Context, data []byte, dataCtx value.Context) ([]byte, error) { + return rt.wrapped.TransformToStorage(ctx, data, dataCtx) +} + +func (rt *reproducingTransformer) createObject(ctx context.Context) error { + key, obj := rt.nextObject(atomic.AddUint32(&rt.index, 1)) + out := &example.Pod{} + return rt.store.Create(ctx, key, obj, out, 0) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go index bbdf5c4adff..d8823db90ac 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/watcher_tests.go @@ -121,6 +121,62 @@ func testWatch(ctx context.Context, t *testing.T, store storage.Interface, recur } } +// RunTestWatchFromZero tests that +// - watch from 0 should sync up and grab the object added before +// - watch from 0 is able to return events for objects whose previous version has been compacted +func RunTestWatchFromZero(ctx context.Context, t *testing.T, store storage.Interface, compaction Compaction) { + key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}) + + w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, storedObj) + w.Stop() + + // Update + out := &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns", Annotations: map[string]string{"a": "1"}}}, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Make sure when we watch from 0 we receive an ADDED event + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, out) + w.Stop() + + if compaction == nil { + t.Skip("compaction callback not provided") + } + + // Update again + out = &example.Pod{} + err = store.GuaranteedUpdate(ctx, key, out, true, nil, storage.SimpleUpdate( + func(runtime.Object) (runtime.Object, error) { + return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}}, nil + }), nil) + if err != nil { + t.Fatalf("GuaranteedUpdate failed: %v", err) + } + + // Compact previous versions + compaction(ctx, t, out.ResourceVersion) + + // Make sure we can still watch from 0 and receive an ADDED event + w, err = store.Watch(ctx, key, storage.ListOptions{ResourceVersion: "0", Predicate: storage.Everything}) + if err != nil { + t.Fatalf("Watch failed: %v", err) + } + TestCheckResult(t, watch.Added, w, out) +} + func RunTestDeleteTriggerWatch(ctx context.Context, t *testing.T, store storage.Interface) { key, storedObj := TestPropagateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) w, err := store.Watch(ctx, key, storage.ListOptions{ResourceVersion: storedObj.ResourceVersion, Predicate: storage.Everything})