From 3f4d3b67682335db510f85deb65b322127a3a0a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Fri, 20 Oct 2023 15:38:59 +0200 Subject: [PATCH] Cleanup paging parameters in etcd3 store --- .../pkg/storage/cacher/cacher_test.go | 14 +--- .../cacher/cacher_testing_utils_test.go | 3 +- .../storage/cacher/cacher_whitebox_test.go | 2 +- .../pkg/storage/cacher/lister_watcher_test.go | 4 +- .../apiserver/pkg/storage/etcd3/store.go | 14 ++-- .../apiserver/pkg/storage/etcd3/store_test.go | 14 ---- .../storage/storagebackend/factory/etcd3.go | 2 +- .../pkg/storage/testing/store_tests.go | 74 ------------------- .../k8s.io/apiserver/pkg/storage/util_test.go | 2 +- 9 files changed, 13 insertions(+), 116 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go index 64c4b9c8d49..e13ffbd03e1 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_test.go @@ -179,12 +179,6 @@ func TestListWithListFromCache(t *testing.T) { storagetesting.RunTestList(ctx, t, cacher, compactStorage(cacher, server.V3Client), true) } -func TestListWithoutPaging(t *testing.T) { - ctx, cacher, terminate := testSetup(t, withoutPaging) - t.Cleanup(terminate) - storagetesting.RunTestListWithoutPaging(ctx, t, cacher) -} - func TestGetListNonRecursive(t *testing.T) { ctx, cacher, terminate := testSetup(t) t.Cleanup(terminate) @@ -368,7 +362,6 @@ type setupOptions struct { resourcePrefix string keyFunc func(runtime.Object) (string, error) indexerFuncs map[string]storage.IndexerFunc - pagingEnabled bool clock clock.WithTicker } @@ -379,7 +372,6 @@ func withDefaults(options *setupOptions) { options.resourcePrefix = prefix options.keyFunc = func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) } - options.pagingEnabled = true options.clock = clock.RealClock{} } @@ -401,10 +393,6 @@ func withSpecNodeNameIndexerFuncs(options *setupOptions) { } } -func withoutPaging(options *setupOptions) { - options.pagingEnabled = false -} - func testSetup(t *testing.T, opts ...setupOption) (context.Context, *Cacher, tearDownFunc) { ctx, cacher, _, tearDown := testSetupWithEtcdServer(t, opts...) return ctx, cacher, tearDown @@ -417,7 +405,7 @@ func testSetupWithEtcdServer(t *testing.T, opts ...setupOption) (context.Context opt(&setupOpts) } - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), setupOpts.pagingEnabled) + server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) // Inject one list error to make sure we test the relist case. wrappedStorage := &storagetesting.StorageInjectingListErrors{ Interface: etcdStorage, diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go index 3eb757c42a6..4fe2e759ae7 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_testing_utils_test.go @@ -55,7 +55,7 @@ func init() { func newPod() runtime.Object { return &example.Pod{} } func newPodList() runtime.Object { return &example.PodList{} } -func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3testing.EtcdTestServer, storage.Interface) { +func newEtcdTestStorage(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) storage := etcd3.New( server.V3Client, @@ -66,7 +66,6 @@ func newEtcdTestStorage(t *testing.T, prefix string, pagingEnabled bool) (*etcd3 "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), - pagingEnabled, etcd3.NewDefaultLeaseManagerConfig()) return server, storage } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go index fcef65efb52..11220e721f3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go @@ -342,7 +342,7 @@ func TestWatchCacheBypass(t *testing.T) { } func TestEmptyWatchEventCache(t *testing.T) { - server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix(), true) + server, etcdStorage := newEtcdTestStorage(t, etcd3testing.PathPrefix()) defer server.Terminate(t) // add a few objects diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go index 6506f1ff6aa..6c89ee3038e 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher_test.go @@ -28,7 +28,7 @@ import ( func TestCacherListerWatcher(t *testing.T) { prefix := "pods" fn := func() runtime.Object { return &example.PodList{} } - server, store := newEtcdTestStorage(t, prefix, true) + server, store := newEtcdTestStorage(t, prefix) defer server.Terminate(t) objects := []*example.Pod{ @@ -62,7 +62,7 @@ func TestCacherListerWatcher(t *testing.T) { func TestCacherListerWatcherPagination(t *testing.T) { prefix := "pods" fn := func() runtime.Object { return &example.PodList{} } - server, store := newEtcdTestStorage(t, prefix, true) + server, store := newEtcdTestStorage(t, prefix) defer server.Terminate(t) // We need the list to be sorted by name to later check the alphabetical order of diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index ad2896f7957..9c52ce17eb9 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -77,7 +77,6 @@ type store struct { groupResource schema.GroupResource groupResourceString string watcher *watcher - pagingEnabled bool leaseManager *leaseManager } @@ -96,11 +95,11 @@ type objState struct { } // New returns an etcd3 implementation of storage.Interface. -func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) storage.Interface { - return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, pagingEnabled, leaseManagerConfig) +func New(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) storage.Interface { + return newStore(c, codec, newFunc, newListFunc, prefix, resourcePrefix, groupResource, transformer, leaseManagerConfig) } -func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, pagingEnabled bool, leaseManagerConfig LeaseManagerConfig) *store { +func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func() runtime.Object, prefix, resourcePrefix string, groupResource schema.GroupResource, transformer value.Transformer, leaseManagerConfig LeaseManagerConfig) *store { versioner := storage.APIObjectVersioner{} // for compatibility with etcd2 impl. // no-op for default prefix of '/registry'. @@ -129,7 +128,6 @@ func newStore(c *clientv3.Client, codec runtime.Codec, newFunc, newListFunc func codec: codec, versioner: versioner, transformer: transformer, - pagingEnabled: pagingEnabled, pathPrefix: pathPrefix, groupResource: groupResource, groupResourceString: groupResource.String(), @@ -623,7 +621,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption limit := opts.Predicate.Limit var paging bool options := make([]clientv3.OpOption, 0, 4) - if s.pagingEnabled && opts.Predicate.Limit > 0 { + if opts.Predicate.Limit > 0 { paging = true options = append(options, clientv3.WithLimit(limit)) limitOption = &options[len(options)-1] @@ -643,7 +641,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption var continueRV, withRev int64 var continueKey string switch { - case opts.Recursive && s.pagingEnabled && len(opts.Predicate.Continue) > 0: + case opts.Recursive && len(opts.Predicate.Continue) > 0: continueKey, continueRV, err = storage.DecodeContinue(opts.Predicate.Continue, keyPrefix) if err != nil { return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err)) @@ -668,7 +666,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption case metav1.ResourceVersionMatchExact: withRev = int64(*fromRV) case "": // legacy case - if opts.Recursive && s.pagingEnabled && opts.Predicate.Limit > 0 && *fromRV > 0 { + if opts.Recursive && opts.Predicate.Limit > 0 && *fromRV > 0 { withRev = int64(*fromRV) } default: diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 4b874061316..dcf6c9d0f4d 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -216,11 +216,6 @@ func TestList(t *testing.T) { storagetesting.RunTestList(ctx, t, store, compactStorage(client), false) } -func TestListWithoutPaging(t *testing.T) { - ctx, store, _ := testSetup(t, withoutPaging()) - storagetesting.RunTestListWithoutPaging(ctx, t, store) -} - func checkStorageCallsInvariants(transformer *storagetesting.PrefixTransformer, recorder *clientRecorder) storagetesting.CallsValidation { return func(t *testing.T, pageSize, estimatedProcessedObjects uint64) { if reads := transformer.GetReadsAndReset(); reads != estimatedProcessedObjects { @@ -495,7 +490,6 @@ type setupOptions struct { resourcePrefix string groupResource schema.GroupResource transformer value.Transformer - pagingEnabled bool leaseConfig LeaseManagerConfig recorderEnabled bool @@ -517,12 +511,6 @@ func withPrefix(prefix string) setupOption { } } -func withoutPaging() setupOption { - return func(options *setupOptions) { - options.pagingEnabled = false - } -} - func withLeaseConfig(leaseConfig LeaseManagerConfig) setupOption { return func(options *setupOptions) { options.leaseConfig = leaseConfig @@ -546,7 +534,6 @@ func withDefaults(options *setupOptions) { options.resourcePrefix = "/pods" options.groupResource = schema.GroupResource{Resource: "pods"} options.transformer = newTestTransformer() - options.pagingEnabled = true options.leaseConfig = newTestLeaseManagerConfig() } @@ -571,7 +558,6 @@ func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *cli setupOpts.resourcePrefix, setupOpts.groupResource, setupOpts.transformer, - setupOpts.pagingEnabled, setupOpts.leaseConfig, ) ctx := context.Background() diff --git a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go index ed9640bb9ea..2aab5c76d21 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go @@ -454,7 +454,7 @@ func newETCD3Storage(c storagebackend.ConfigForResource, newFunc, newListFunc fu if transformer == nil { transformer = identity.NewEncryptCheckTransformer() } - return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, true, c.LeaseManagerConfig), destroyFunc, nil + return etcd3.New(client, c.Codec, newFunc, newListFunc, c.Prefix, resourcePrefix, c.GroupResource, transformer, c.LeaseManagerConfig), destroyFunc, nil } // startDBSizeMonitorPerEndpoint starts a loop to monitor etcd database size and update the diff --git a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go index 9deda51d162..c5a1d98ec7c 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/testing/store_tests.go @@ -28,7 +28,6 @@ import ( "sync" "testing" - "github.com/google/go-cmp/cmp" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -1243,79 +1242,6 @@ func RunTestList(ctx context.Context, t *testing.T, store storage.Interface, com } } -func RunTestListWithoutPaging(ctx context.Context, t *testing.T, store storage.Interface) { - _, preset, err := seedMultiLevelData(ctx, store) - if err != nil { - t.Fatal(err) - } - - getAttrs := func(obj runtime.Object) (labels.Set, fields.Set, error) { - pod := obj.(*example.Pod) - return nil, fields.Set{"metadata.name": pod.Name}, nil - } - - tests := []struct { - name string - disablePaging bool - rv string - rvMatch metav1.ResourceVersionMatch - prefix string - pred storage.SelectionPredicate - expectedOut []*example.Pod - expectContinue bool - expectedRemainingItemCount *int64 - expectError bool - }{ - { - name: "test List with limit when paging disabled", - disablePaging: true, - prefix: "/pods/second/", - pred: storage.SelectionPredicate{ - Label: labels.Everything(), - Field: fields.Everything(), - Limit: 1, - }, - expectedOut: []*example.Pod{preset[1], preset[2]}, - expectContinue: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if tt.pred.GetAttrs == nil { - tt.pred.GetAttrs = getAttrs - } - - out := &example.PodList{} - storageOpts := storage.ListOptions{ - ResourceVersion: tt.rv, - ResourceVersionMatch: tt.rvMatch, - Predicate: tt.pred, - Recursive: true, - } - - if err := store.GetList(ctx, tt.prefix, storageOpts, out); err != nil { - t.Fatalf("GetList failed: %v", err) - return - } - if (len(out.Continue) > 0) != tt.expectContinue { - t.Errorf("unexpected continue token: %q", out.Continue) - } - - if len(tt.expectedOut) != len(out.Items) { - t.Fatalf("length of list want=%d, got=%d", len(tt.expectedOut), len(out.Items)) - } - if diff := cmp.Diff(tt.expectedRemainingItemCount, out.ListMeta.GetRemainingItemCount()); diff != "" { - t.Errorf("incorrect remainingItemCount: %s", diff) - } - for j, wantPod := range tt.expectedOut { - getPod := &out.Items[j] - expectNoDiff(t, fmt.Sprintf("%s: incorrect pod", tt.name), wantPod, getPod) - } - }) - } -} - // seedMultiLevelData creates a set of keys with a multi-level structure, returning a resourceVersion // from before any were created along with the full set of objects that were persisted func seedMultiLevelData(ctx context.Context, store storage.Interface) (string, []*example.Pod, error) { diff --git a/staging/src/k8s.io/apiserver/pkg/storage/util_test.go b/staging/src/k8s.io/apiserver/pkg/storage/util_test.go index ef13dc3c52d..8f44d44411a 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/util_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/util_test.go @@ -85,7 +85,7 @@ func TestGetCurrentResourceVersionFromStorage(t *testing.T) { // test data newEtcdTestStorage := func(t *testing.T, prefix string) (*etcd3testing.EtcdTestServer, storage.Interface) { server, _ := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) - storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), true, etcd3.NewDefaultLeaseManagerConfig()) + storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion, example2v1.SchemeGroupVersion), func() runtime.Object { return &example.Pod{} }, func() runtime.Object { return &example.PodList{} }, prefix, "/pods", schema.GroupResource{Resource: "pods"}, identity.NewEncryptCheckTransformer(), etcd3.NewDefaultLeaseManagerConfig()) return server, storage } server, etcdStorage := newEtcdTestStorage(t, "")