From a2ad9f9e4aba6aae6657a3189bdced6dbc8ba4b5 Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Mon, 7 Mar 2022 15:44:59 +0000 Subject: [PATCH] Implement page size progressing for rare selectors. --- .../apiserver/pkg/storage/etcd3/store.go | 28 +++++++-- .../apiserver/pkg/storage/etcd3/store_test.go | 61 +++++++++++++++++++ 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go index c4e80aa8e4e..75a6c3b58b8 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go @@ -48,6 +48,13 @@ import ( utiltrace "k8s.io/utils/trace" ) +const ( + // maxLimit is a maximum page limit increase used when fetching objects from etcd. + // This limit is used only for increasing page size by kube-apiserver. If request + // specifies larger limit initially, it won't be changed. + maxLimit = 10000 +) + // authenticatedDataString satisfies the value.Context interface. It uses the key to // authenticate the stored data. This does not defend against reuse of previously // encrypted values under the same key, but will prevent an attacker from using an @@ -585,11 +592,12 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption keyPrefix := key // set the appropriate clientv3 options to filter the returned data set - var paging bool + var limitOption *clientv3.OpOption + var limit int64 = pred.Limit options := make([]clientv3.OpOption, 0, 4) if s.pagingEnabled && pred.Limit > 0 { - paging = true - options = append(options, clientv3.WithLimit(pred.Limit)) + options = append(options, clientv3.WithLimit(limit)) + limitOption = &options[len(options)-1] } newItemFunc := getNewItemFunc(listObj, v) @@ -714,7 +722,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption // take items from the response until the bucket is full, filtering as we go for i, kv := range getResp.Kvs { - if paging && int64(v.Len()) >= pred.Limit { + if limitOption != nil && int64(v.Len()) >= pred.Limit { hasMore = true break } @@ -740,13 +748,23 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption } // no more results remain or we didn't request paging - if !hasMore || !paging { + if !hasMore || limitOption == nil { break } // we're paging but we have filled our bucket if int64(v.Len()) >= pred.Limit { break } + + if limit < maxLimit { + // We got incomplete result due to field/label selector dropping the object. + // Double page size to reduce total number of calls to etcd. + limit *= 2 + if limit > maxLimit { + limit = maxLimit + } + *limitOption = clientv3.WithLimit(limit) + } key = string(lastKey) + "\x00" if withRev == 0 { withRev = returnedRV diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go index 025970e7185..168a21d7d32 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go @@ -1783,6 +1783,67 @@ func TestListContinuation(t *testing.T) { recorder.resetReads() } +func TestListPaginationRareObject(t *testing.T) { + etcdClient := testserver.RunEtcd(t, nil) + codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) + transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)} + recorder := &clientRecorder{KV: etcdClient.KV} + etcdClient.KV = recorder + store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig()) + ctx := context.Background() + + podCount := 1000 + var pods []*example.Pod + for i := 0; i < podCount; i++ { + key := fmt.Sprintf("/one-level/pod-%d", i) + obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}} + storedObj := &example.Pod{} + err := store.Create(ctx, key, obj, storedObj, 0) + if err != nil { + t.Fatalf("Set failed: %v", err) + } + pods = append(pods, storedObj) + } + + out := &example.PodList{} + options := storage.ListOptions{ + Predicate: storage.SelectionPredicate{ + Limit: 1, + Label: labels.Everything(), + Field: fields.OneTermEqualSelector("metadata.name", "pod-999"), + GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) { + pod := obj.(*example.Pod) + return nil, fields.Set{"metadata.name": pod.Name}, nil + }, + }, + Recursive: true, + } + if err := store.GetList(ctx, "/", options, out); err != nil { + t.Fatalf("Unable to get initial list: %v", err) + } + if len(out.Continue) != 0 { + t.Errorf("Unexpected continuation token set") + } + if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) { + t.Fatalf("Unexpected first page: %#v", out.Items) + } + if transformer.reads != uint64(podCount) { + t.Errorf("unexpected reads: %d", transformer.reads) + } + // We expect that kube-apiserver will be increasing page sizes + // if not full pages are received, so we should see significantly less + // than 1000 pages (which would be result of talking to etcd with page size + // copied from pred.Limit). + // The expected number of calls is n+1 where n is the smallest n so that: + // pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount. + // For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024. + if recorder.reads != 10 { + t.Errorf("unexpected reads: %d", recorder.reads) + } + transformer.resetReads() + recorder.resetReads() +} + type clientRecorder struct { reads uint64 clientv3.KV