From 75f17eb38fc8bbcb360d43dffce6e27a7159d43f Mon Sep 17 00:00:00 2001
From: wangxiang
Date: Wed, 12 Apr 2023 06:36:03 +0800
Subject: [PATCH] optimize watch-cache getlist (#116327)

* ftr(watch-cache): add benchmarks

* ftr(kube-apiserver): faster watch-cache getlist

* refine: testcase name

* - refine var name to make it easier to convey meaning
  - add comment to explain why we allocate a slice of runtime.Object
    instead of making a slice of ListObject.Items directly

---
 .../apiserver/pkg/storage/cacher/cacher.go    |  22 +--
 .../storage/cacher/cacher_whitebox_test.go    | 126 +++++++++++++++++-
 2 files changed, 135 insertions(+), 13 deletions(-)

diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index eada35b1d0a..33c404633c7 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -795,24 +795,30 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptio
         return err
     }
     span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
-    if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
-        // Resize the slice appropriately, since we already know that none
-        // of the elements will be filtered out.
-        listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
-        span.AddEvent("Resized result")
-    }
+    // Collect pointers to the eligible objects first.
+    // Why not put the objects into listObj.Items directly? The elements of
+    // the list object are structs, and growing that slice element by element
+    // copies them repeatedly, so we delay building it until we know its size.
+    var selectedObjects []runtime.Object
     for _, obj := range objs {
         elem, ok := obj.(*storeElement)
         if !ok {
             return fmt.Errorf("non *storeElement returned from storage: %v", obj)
         }
         if filter(elem.Key, elem.Labels, elem.Fields) {
-            listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
+            selectedObjects = append(selectedObjects, elem.Object)
         }
     }
-    if listVal.IsNil() {
+    if len(selectedObjects) == 0 {
         // Ensure that we never return a nil Items pointer in the result for consistency.
         listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
+    } else {
+        // Resize the slice appropriately, since we already know the size of the result set.
+        listVal.Set(reflect.MakeSlice(listVal.Type(), len(selectedObjects), len(selectedObjects)))
+        span.AddEvent("Resized result")
+        for i, o := range selectedObjects {
+            listVal.Index(i).Set(reflect.ValueOf(o).Elem())
+        }
     }
     span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
     if c.versioner != nil {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
index 152d92b9aea..7847c1eaa4f 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher_whitebox_test.go
@@ -18,6 +18,7 @@ package cacher
 
 import (
     "context"
+    "crypto/rand"
     "errors"
     "fmt"
     "reflect"
@@ -114,11 +115,22 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
         GroupResource:  schema.GroupResource{Resource: "pods"},
         ResourcePrefix: prefix,
         KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
-        GetAttrsFunc:   storage.DefaultNamespaceScopedAttr,
-        NewFunc:        func() runtime.Object { return &example.Pod{} },
-        NewListFunc:    func() runtime.Object { return &example.PodList{} },
-        Codec:          codecs.LegacyCodec(examplev1.SchemeGroupVersion),
-        Clock:          clock.RealClock{},
+        GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) {
+            pod, ok := obj.(*example.Pod)
+            if !ok {
+                return storage.DefaultNamespaceScopedAttr(obj)
+            }
+            labelsSet, fieldsSet, err := storage.DefaultNamespaceScopedAttr(obj)
+            if err != nil {
+                return nil, nil, err
+            }
+            fieldsSet["spec.nodeName"] = pod.Spec.NodeName
+            return labelsSet, fieldsSet, nil
+        },
+        NewFunc:     func() runtime.Object { return &example.Pod{} },
+        NewListFunc: func() runtime.Object { return &example.PodList{} },
+        Codec:       codecs.LegacyCodec(examplev1.SchemeGroupVersion),
+        Clock:       clock.RealClock{},
     }
     cacher, err := NewCacherFromConfig(config)
     return cacher, testVersioner{}, err
@@ -1830,3 +1842,107 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
     require.NoError(t, err)
     require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
 }
+
+type fakeStorage struct {
+    pods []example.Pod
+    storage.Interface
+}
+
+func newObjectStorage(fakePods []example.Pod) *fakeStorage {
+    return &fakeStorage{
+        pods: fakePods,
+    }
+}
+
+func (m fakeStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
+    podList := listObj.(*example.PodList)
+    podList.ListMeta = metav1.ListMeta{ResourceVersion: "12345"}
+    podList.Items = m.pods
+    return nil
+}
+func (m fakeStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
+    return newDummyWatch(), nil
+}
+
+func BenchmarkCacher_GetList(b *testing.B) {
+    testCases := []struct {
+        totalObjectNum  int
+        expectObjectNum int
+    }{
+        {
+            totalObjectNum:  5000,
+            expectObjectNum: 50,
+        },
+        {
+            totalObjectNum:  5000,
+            expectObjectNum: 500,
+        },
+        {
+            totalObjectNum:  5000,
+            expectObjectNum: 1000,
+        },
+        {
+            totalObjectNum:  5000,
+            expectObjectNum: 2500,
+        },
+        {
+            totalObjectNum:  5000,
+            expectObjectNum: 5000,
+        },
+    }
+    for _, tc := range testCases {
+        b.Run(
+            fmt.Sprintf("totalObjectNum=%d, expectObjectNum=%d", tc.totalObjectNum, tc.expectObjectNum),
+            func(b *testing.B) {
+                // create sample pods
+                fakePods := make([]example.Pod, tc.totalObjectNum, tc.totalObjectNum)
+                for i := range fakePods {
+                    fakePods[i].Namespace = "default"
+                    fakePods[i].Name = fmt.Sprintf("pod-%d", i)
+                    fakePods[i].ResourceVersion = strconv.Itoa(i)
+                    if i%(tc.totalObjectNum/tc.expectObjectNum) == 0 {
+                        fakePods[i].Spec.NodeName = "node-0"
+                    }
+                    data := make([]byte, 1024*2, 1024*2) // 2 KiB of random data per pod
+                    rand.Read(data)
+                    fakePods[i].Spec.NodeSelector = map[string]string{
+                        "key": string(data),
+                    }
+                }
+
+                // build test cacher
+                cacher, _, err := newTestCacher(newObjectStorage(fakePods))
+                if err != nil {
+                    b.Fatalf("new cacher: %v", err)
+                }
+                defer cacher.Stop()
+
+                // prepare result and pred
+                parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
+                if err != nil {
+                    b.Fatalf("parse selector: %v", err)
+                }
+                pred := storage.SelectionPredicate{
+                    Label: labels.Everything(),
+                    Field: parsedField,
+                }
+
+                // now we start benchmarking
+                b.ResetTimer()
+                for i := 0; i < b.N; i++ {
+                    result := &example.PodList{}
+                    err = cacher.GetList(context.TODO(), "pods", storage.ListOptions{
+                        Predicate:       pred,
+                        Recursive:       true,
+                        ResourceVersion: "12345",
+                    }, result)
+                    if err != nil {
+                        b.Fatalf("GetList cache: %v", err)
+                    }
+                    if len(result.Items) != tc.expectObjectNum {
+                        b.Fatalf("expect %d but got %d", tc.expectObjectNum, len(result.Items))
+                    }
+                }
+            })
+    }
+}
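
Reviewer note (not part of the patch): below is a minimal, self-contained sketch
of the pattern this commit adopts, for anyone who wants to reproduce the effect
outside the apiserver. The names Item, appendPerElement, and presizeThenFill are
hypothetical; only the reflection calls mirror the old and new paths in
Cacher.GetList.

package main

import (
    "fmt"
    "reflect"
)

// Item stands in for a large API struct (e.g. a Pod); the payload makes
// per-element copying measurably expensive.
type Item struct {
    Name    string
    Payload [2048]byte
}

// appendPerElement mirrors the old path: reflect.Append grows the slice one
// element at a time, re-copying the whole backing array on every reallocation.
func appendPerElement(list *[]Item, selected []*Item) {
    listVal := reflect.ValueOf(list).Elem()
    for _, it := range selected {
        listVal.Set(reflect.Append(listVal, reflect.ValueOf(it).Elem()))
    }
}

// presizeThenFill mirrors the new path: allocate the result slice once at its
// final size, then copy each element into place exactly once.
func presizeThenFill(list *[]Item, selected []*Item) {
    listVal := reflect.ValueOf(list).Elem()
    listVal.Set(reflect.MakeSlice(listVal.Type(), len(selected), len(selected)))
    for i, it := range selected {
        listVal.Index(i).Set(reflect.ValueOf(it).Elem())
    }
}

func main() {
    selected := make([]*Item, 1000)
    for i := range selected {
        selected[i] = &Item{Name: fmt.Sprintf("item-%d", i)}
    }
    var oldResult, newResult []Item
    appendPerElement(&oldResult, selected)
    presizeThenFill(&newResult, selected)
    fmt.Println(len(oldResult), len(newResult)) // 1000 1000
}

Wrapping the two functions in Go benchmarks shows the difference the commit
targets: the append path re-copies the 2 KiB structs on each growth step, while
the pre-sized path performs a single allocation and one copy per element.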