mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Implement page size progressing for rare selectors.
This commit is contained in:
parent
95e30f66c3
commit
a2ad9f9e4a
@ -48,6 +48,13 @@ import (
|
|||||||
utiltrace "k8s.io/utils/trace"
|
utiltrace "k8s.io/utils/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// maxLimit is a maximum page limit increase used when fetching objects from etcd.
|
||||||
|
// This limit is used only for increasing page size by kube-apiserver. If request
|
||||||
|
// specifies larger limit initially, it won't be changed.
|
||||||
|
maxLimit = 10000
|
||||||
|
)
|
||||||
|
|
||||||
// authenticatedDataString satisfies the value.Context interface. It uses the key to
|
// authenticatedDataString satisfies the value.Context interface. It uses the key to
|
||||||
// authenticate the stored data. This does not defend against reuse of previously
|
// authenticate the stored data. This does not defend against reuse of previously
|
||||||
// encrypted values under the same key, but will prevent an attacker from using an
|
// encrypted values under the same key, but will prevent an attacker from using an
|
||||||
@ -585,11 +592,12 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
|||||||
keyPrefix := key
|
keyPrefix := key
|
||||||
|
|
||||||
// set the appropriate clientv3 options to filter the returned data set
|
// set the appropriate clientv3 options to filter the returned data set
|
||||||
var paging bool
|
var limitOption *clientv3.OpOption
|
||||||
|
var limit int64 = pred.Limit
|
||||||
options := make([]clientv3.OpOption, 0, 4)
|
options := make([]clientv3.OpOption, 0, 4)
|
||||||
if s.pagingEnabled && pred.Limit > 0 {
|
if s.pagingEnabled && pred.Limit > 0 {
|
||||||
paging = true
|
options = append(options, clientv3.WithLimit(limit))
|
||||||
options = append(options, clientv3.WithLimit(pred.Limit))
|
limitOption = &options[len(options)-1]
|
||||||
}
|
}
|
||||||
|
|
||||||
newItemFunc := getNewItemFunc(listObj, v)
|
newItemFunc := getNewItemFunc(listObj, v)
|
||||||
@ -714,7 +722,7 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
|||||||
|
|
||||||
// take items from the response until the bucket is full, filtering as we go
|
// take items from the response until the bucket is full, filtering as we go
|
||||||
for i, kv := range getResp.Kvs {
|
for i, kv := range getResp.Kvs {
|
||||||
if paging && int64(v.Len()) >= pred.Limit {
|
if limitOption != nil && int64(v.Len()) >= pred.Limit {
|
||||||
hasMore = true
|
hasMore = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -740,13 +748,23 @@ func (s *store) GetList(ctx context.Context, key string, opts storage.ListOption
|
|||||||
}
|
}
|
||||||
|
|
||||||
// no more results remain or we didn't request paging
|
// no more results remain or we didn't request paging
|
||||||
if !hasMore || !paging {
|
if !hasMore || limitOption == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// we're paging but we have filled our bucket
|
// we're paging but we have filled our bucket
|
||||||
if int64(v.Len()) >= pred.Limit {
|
if int64(v.Len()) >= pred.Limit {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if limit < maxLimit {
|
||||||
|
// We got incomplete result due to field/label selector dropping the object.
|
||||||
|
// Double page size to reduce total number of calls to etcd.
|
||||||
|
limit *= 2
|
||||||
|
if limit > maxLimit {
|
||||||
|
limit = maxLimit
|
||||||
|
}
|
||||||
|
*limitOption = clientv3.WithLimit(limit)
|
||||||
|
}
|
||||||
key = string(lastKey) + "\x00"
|
key = string(lastKey) + "\x00"
|
||||||
if withRev == 0 {
|
if withRev == 0 {
|
||||||
withRev = returnedRV
|
withRev = returnedRV
|
||||||
|
@ -1783,6 +1783,67 @@ func TestListContinuation(t *testing.T) {
|
|||||||
recorder.resetReads()
|
recorder.resetReads()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestListPaginationRareObject(t *testing.T) {
|
||||||
|
etcdClient := testserver.RunEtcd(t, nil)
|
||||||
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
|
transformer := &prefixTransformer{prefix: []byte(defaultTestPrefix)}
|
||||||
|
recorder := &clientRecorder{KV: etcdClient.KV}
|
||||||
|
etcdClient.KV = recorder
|
||||||
|
store := newStore(etcdClient, codec, newPod, "", schema.GroupResource{Resource: "pods"}, transformer, true, NewDefaultLeaseManagerConfig())
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
podCount := 1000
|
||||||
|
var pods []*example.Pod
|
||||||
|
for i := 0; i < podCount; i++ {
|
||||||
|
key := fmt.Sprintf("/one-level/pod-%d", i)
|
||||||
|
obj := &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i)}}
|
||||||
|
storedObj := &example.Pod{}
|
||||||
|
err := store.Create(ctx, key, obj, storedObj, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
pods = append(pods, storedObj)
|
||||||
|
}
|
||||||
|
|
||||||
|
out := &example.PodList{}
|
||||||
|
options := storage.ListOptions{
|
||||||
|
Predicate: storage.SelectionPredicate{
|
||||||
|
Limit: 1,
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.OneTermEqualSelector("metadata.name", "pod-999"),
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{"metadata.name": pod.Name}, nil
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Recursive: true,
|
||||||
|
}
|
||||||
|
if err := store.GetList(ctx, "/", options, out); err != nil {
|
||||||
|
t.Fatalf("Unable to get initial list: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Continue) != 0 {
|
||||||
|
t.Errorf("Unexpected continuation token set")
|
||||||
|
}
|
||||||
|
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], pods[999]) {
|
||||||
|
t.Fatalf("Unexpected first page: %#v", out.Items)
|
||||||
|
}
|
||||||
|
if transformer.reads != uint64(podCount) {
|
||||||
|
t.Errorf("unexpected reads: %d", transformer.reads)
|
||||||
|
}
|
||||||
|
// We expect that kube-apiserver will be increasing page sizes
|
||||||
|
// if not full pages are received, so we should see significantly less
|
||||||
|
// than 1000 pages (which would be result of talking to etcd with page size
|
||||||
|
// copied from pred.Limit).
|
||||||
|
// The expected number of calls is n+1 where n is the smallest n so that:
|
||||||
|
// pageSize + pageSize * 2 + pageSize * 4 + ... + pageSize * 2^n >= podCount.
|
||||||
|
// For pageSize = 1, podCount = 1000, we get n+1 = 10, 2 ^ 10 = 1024.
|
||||||
|
if recorder.reads != 10 {
|
||||||
|
t.Errorf("unexpected reads: %d", recorder.reads)
|
||||||
|
}
|
||||||
|
transformer.resetReads()
|
||||||
|
recorder.resetReads()
|
||||||
|
}
|
||||||
|
|
||||||
type clientRecorder struct {
|
type clientRecorder struct {
|
||||||
reads uint64
|
reads uint64
|
||||||
clientv3.KV
|
clientv3.KV
|
||||||
|
Loading…
Reference in New Issue
Block a user