Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-08-08 03:33:56 +00:00
optimize watch-cache getlist (#116327)
* ftr(watch-cache): add benchmarks
* ftr(kube-apiserver): faster watch-cache getlist
* refine: testcase name
* refine: rename variables so they convey their meaning more clearly; add a comment explaining why we allocate a slice of runtime.Object pointers instead of building the slice of ListObject.Items directly.
This commit is contained in:
parent 4c022ceb2c
commit 75f17eb38f
@@ -795,24 +795,30 @@ func (c *Cacher) GetList(ctx context.Context, key string, opts storage.ListOptions
 		return err
 	}
 	span.AddEvent("Listed items from cache", attribute.Int("count", len(objs)))
-	if len(objs) > listVal.Cap() && pred.Label.Empty() && pred.Field.Empty() {
-		// Resize the slice appropriately, since we already know that none
-		// of the elements will be filtered out.
-		listVal.Set(reflect.MakeSlice(reflect.SliceOf(c.objectType.Elem()), 0, len(objs)))
-		span.AddEvent("Resized result")
-	}
+	// Collect pointers to the eligible objects first.
+	// Why not put the objects into the items of listObj directly?
+	// The elements of the list object are structs; growing that slice repeatedly
+	// would copy whole objects, so we delay building it until the result size is known.
+	var selectedObjects []runtime.Object
 	for _, obj := range objs {
 		elem, ok := obj.(*storeElement)
 		if !ok {
 			return fmt.Errorf("non *storeElement returned from storage: %v", obj)
 		}
 		if filter(elem.Key, elem.Labels, elem.Fields) {
-			listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
+			selectedObjects = append(selectedObjects, elem.Object)
 		}
 	}
-	if listVal.IsNil() {
+	if len(selectedObjects) == 0 {
 		// Ensure that we never return a nil Items pointer in the result for consistency.
 		listVal.Set(reflect.MakeSlice(listVal.Type(), 0, 0))
-	}
+	} else {
+		// Resize the slice appropriately, since we already know the size of the result set.
+		listVal.Set(reflect.MakeSlice(listVal.Type(), len(selectedObjects), len(selectedObjects)))
+		span.AddEvent("Resized result")
+		for i, o := range selectedObjects {
+			listVal.Index(i).Set(reflect.ValueOf(o).Elem())
+		}
+	}
 	span.AddEvent("Filtered items", attribute.Int("count", listVal.Len()))
 	if c.versioner != nil {
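What follows is a minimal, standalone sketch of the allocation pattern this hunk adopts, not the apiserver code itself: filter into a slice of cheap pointers first, then allocate the value slice exactly once at its final size. Previously, reflect.Append on a slice of structs grew the backing array repeatedly and copied whole objects each time (the old pre-sizing shortcut only fired when no selectors were set). The item type and filter function below are illustrative assumptions.

package main

import "fmt"

// item stands in for a large API object; appending values of this type to a
// slice copies the whole payload every time the backing array grows.
type item struct {
	name    string
	payload [2048]byte
}

// filterToPointers appends only pointers, so the grow-and-copy cost of the
// intermediate slice is a few words per element regardless of object size.
func filterToPointers(src []*item, keep func(*item) bool) []*item {
	var selected []*item
	for _, it := range src {
		if keep(it) {
			selected = append(selected, it)
		}
	}
	return selected
}

// materialize performs the single large allocation, mirroring the one
// reflect.MakeSlice call in the patched GetList.
func materialize(selected []*item) []item {
	out := make([]item, len(selected))
	for i, p := range selected {
		out[i] = *p
	}
	return out
}

func main() {
	src := make([]*item, 5000)
	for i := range src {
		src[i] = &item{name: fmt.Sprintf("pod-%d", i)}
	}
	keep := func(it *item) bool { return it.name == "pod-42" }
	fmt.Println(len(materialize(filterToPointers(src, keep)))) // prints 1
}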
@@ -18,6 +18,7 @@ package cacher

 import (
 	"context"
+	"crypto/rand"
 	"errors"
 	"fmt"
 	"reflect"
@@ -114,11 +115,22 @@ func newTestCacher(s storage.Interface) (*Cacher, storage.Versioner, error) {
 		GroupResource:  schema.GroupResource{Resource: "pods"},
 		ResourcePrefix: prefix,
 		KeyFunc:        func(obj runtime.Object) (string, error) { return storage.NamespaceKeyFunc(prefix, obj) },
-		GetAttrsFunc:   storage.DefaultNamespaceScopedAttr,
-		NewFunc:        func() runtime.Object { return &example.Pod{} },
-		NewListFunc:    func() runtime.Object { return &example.PodList{} },
-		Codec:          codecs.LegacyCodec(examplev1.SchemeGroupVersion),
-		Clock:          clock.RealClock{},
+		GetAttrsFunc: func(obj runtime.Object) (labels.Set, fields.Set, error) {
+			pod, ok := obj.(*example.Pod)
+			if !ok {
+				return storage.DefaultNamespaceScopedAttr(obj)
+			}
+			labelsSet, fieldsSet, err := storage.DefaultNamespaceScopedAttr(obj)
+			if err != nil {
+				return nil, nil, err
+			}
+			fieldsSet["spec.nodeName"] = pod.Spec.NodeName
+			return labelsSet, fieldsSet, nil
+		},
+		NewFunc:     func() runtime.Object { return &example.Pod{} },
+		NewListFunc: func() runtime.Object { return &example.PodList{} },
+		Codec:       codecs.LegacyCodec(examplev1.SchemeGroupVersion),
+		Clock:       clock.RealClock{},
 	}
 	cacher, err := NewCacherFromConfig(config)
 	return cacher, testVersioner{}, err
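The hunk above expands the test cacher's GetAttrsFunc so that it also publishes spec.nodeName: a field selector can only match keys that GetAttrsFunc exposes, and the benchmark below relies on its "spec.nodeName=node-0" predicate. As a standalone illustration (the attribute values here are made up, not taken from the test), apimachinery's fields package shows the matching step in isolation:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	// Attributes a GetAttrsFunc might expose for one pod: the defaults plus
	// the extra spec.nodeName entry the test adds.
	attrs := fields.Set{
		"metadata.name":      "pod-0",
		"metadata.namespace": "default",
		"spec.nodeName":      "node-0",
	}

	sel, err := fields.ParseSelector("spec.nodeName=node-0")
	if err != nil {
		panic(err)
	}
	fmt.Println(sel.Matches(attrs)) // prints true
}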
@@ -1830,3 +1842,107 @@ func TestWaitUntilWatchCacheFreshAndForceAllEvents(t *testing.T) {
 	require.NoError(t, err)
 	require.True(t, forceAllEvents, "the target method should instruct the caller to ask for all events in the cache (full state)")
 }
+
+type fakeStorage struct {
+	pods []example.Pod
+	storage.Interface
+}
+
+func newObjectStorage(fakePods []example.Pod) *fakeStorage {
+	return &fakeStorage{
+		pods: fakePods,
+	}
+}
+
+func (m fakeStorage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
+	podList := listObj.(*example.PodList)
+	podList.ListMeta = metav1.ListMeta{ResourceVersion: "12345"}
+	podList.Items = m.pods
+	return nil
+}
+func (m fakeStorage) Watch(_ context.Context, _ string, _ storage.ListOptions) (watch.Interface, error) {
+	return newDummyWatch(), nil
+}
+
+func BenchmarkCacher_GetList(b *testing.B) {
+	testCases := []struct {
+		totalObjectNum  int
+		expectObjectNum int
+	}{
+		{
+			totalObjectNum:  5000,
+			expectObjectNum: 50,
+		},
+		{
+			totalObjectNum:  5000,
+			expectObjectNum: 500,
+		},
+		{
+			totalObjectNum:  5000,
+			expectObjectNum: 1000,
+		},
+		{
+			totalObjectNum:  5000,
+			expectObjectNum: 2500,
+		},
+		{
+			totalObjectNum:  5000,
+			expectObjectNum: 5000,
+		},
+	}
+	for _, tc := range testCases {
+		b.Run(
+			fmt.Sprintf("totalObjectNum=%d, expectObjectNum=%d", tc.totalObjectNum, tc.expectObjectNum),
+			func(b *testing.B) {
+				// create sample pods
+				fakePods := make([]example.Pod, tc.totalObjectNum, tc.totalObjectNum)
+				for i := range fakePods {
+					fakePods[i].Namespace = "default"
+					fakePods[i].Name = fmt.Sprintf("pod-%d", i)
+					fakePods[i].ResourceVersion = strconv.Itoa(i)
+					if i%(tc.totalObjectNum/tc.expectObjectNum) == 0 {
+						fakePods[i].Spec.NodeName = "node-0"
+					}
+					data := make([]byte, 1024*2, 1024*2) // 2 KiB of random data to give each object realistic bulk
+					rand.Read(data)
+					fakePods[i].Spec.NodeSelector = map[string]string{
+						"key": string(data),
+					}
+				}
+
+				// build test cacher
+				cacher, _, err := newTestCacher(newObjectStorage(fakePods))
+				if err != nil {
+					b.Fatalf("new cacher: %v", err)
+				}
+				defer cacher.Stop()
+
+				// prepare result and pred
+				parsedField, err := fields.ParseSelector("spec.nodeName=node-0")
+				if err != nil {
+					b.Fatalf("parse selector: %v", err)
+				}
+				pred := storage.SelectionPredicate{
+					Label: labels.Everything(),
+					Field: parsedField,
+				}
+
+				// now we start benchmarking
+				b.ResetTimer()
+				for i := 0; i < b.N; i++ {
+					result := &example.PodList{}
+					err = cacher.GetList(context.TODO(), "pods", storage.ListOptions{
+						Predicate:       pred,
+						Recursive:       true,
+						ResourceVersion: "12345",
+					}, result)
+					if err != nil {
+						b.Fatalf("GetList cache: %v", err)
+					}
+					if len(result.Items) != tc.expectObjectNum {
+						b.Fatalf("expect %d but got %d", tc.expectObjectNum, len(result.Items))
+					}
+				}
+			})
+	}
+}
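A note on reproducing the numbers: assuming this file sits in its usual place in the kubernetes tree, staging/src/k8s.io/apiserver/pkg/storage/cacher/ (an assumption, not something this page states), the benchmark can be run on its own with

go test -run '^$' -bench BenchmarkCacher_GetList -benchmem ./staging/src/k8s.io/apiserver/pkg/storage/cacher/

where -run '^$' skips the unit tests and -benchmem reports allocations per operation, the metric this optimization targets.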