Merge pull request #116374 from sxllwx/optimize/etcd3-getlist

ftr(etcd): add benchmarks for getlist
This commit is contained in:
Kubernetes Prow Robot 2023-04-11 15:36:18 -07:00 committed by GitHub
commit 62d00139e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 138 additions and 6 deletions

View File

@ -18,6 +18,7 @@ package etcd3
import ( import (
"context" "context"
"crypto/rand"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
@ -32,6 +33,8 @@ import (
"k8s.io/apimachinery/pkg/api/apitesting" "k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/runtime/serializer"
@ -463,7 +466,7 @@ func (r *clientRecorder) GetReadsAndReset() uint64 {
} }
type setupOptions struct { type setupOptions struct {
client func(*testing.T) *clientv3.Client client func(testing.TB) *clientv3.Client
codec runtime.Codec codec runtime.Codec
newFunc func() runtime.Object newFunc func() runtime.Object
prefix string prefix string
@ -479,7 +482,7 @@ type setupOption func(*setupOptions)
func withClientConfig(config *embed.Config) setupOption { func withClientConfig(config *embed.Config) setupOption {
return func(options *setupOptions) { return func(options *setupOptions) {
options.client = func(t *testing.T) *clientv3.Client { options.client = func(t testing.TB) *clientv3.Client {
return testserver.RunEtcd(t, config) return testserver.RunEtcd(t, config)
} }
} }
@ -510,7 +513,7 @@ func withRecorder() setupOption {
} }
func withDefaults(options *setupOptions) { func withDefaults(options *setupOptions) {
options.client = func(t *testing.T) *clientv3.Client { options.client = func(t testing.TB) *clientv3.Client {
return testserver.RunEtcd(t, nil) return testserver.RunEtcd(t, nil)
} }
options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion) options.codec = apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
@ -524,7 +527,7 @@ func withDefaults(options *setupOptions) {
var _ setupOption = withDefaults var _ setupOption = withDefaults
func testSetup(t *testing.T, opts ...setupOption) (context.Context, *store, *clientv3.Client) { func testSetup(t testing.TB, opts ...setupOption) (context.Context, *store, *clientv3.Client) {
setupOpts := setupOptions{} setupOpts := setupOptions{}
opts = append([]setupOption{withDefaults}, opts...) opts = append([]setupOption{withDefaults}, opts...)
for _, opt := range opts { for _, opt := range opts {
@ -622,3 +625,132 @@ func TestInvalidKeys(t *testing.T) {
_, countErr := store.Count(invalidKey) _, countErr := store.Count(invalidKey)
expectInvalidKey("Count", countErr) expectInvalidKey("Count", countErr)
} }
func BenchmarkStore_GetList(b *testing.B) {
generateBigPod := func(index int, total int, expect int) runtime.Object {
l := map[string]string{}
if index%(total/expect) == 0 {
l["foo"] = "bar"
}
terminationGracePeriodSeconds := int64(42)
activeDeadlineSeconds := int64(42)
pod := &examplev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Labels: l,
},
Spec: examplev1.PodSpec{
RestartPolicy: examplev1.RestartPolicy("Always"),
TerminationGracePeriodSeconds: &terminationGracePeriodSeconds,
ActiveDeadlineSeconds: &activeDeadlineSeconds,
NodeSelector: map[string]string{},
ServiceAccountName: "demo-sa",
},
}
pod.Name = fmt.Sprintf("object-%d", index)
data := make([]byte, 1024*2, 1024*2) // 2k labels
rand.Read(data)
pod.Spec.NodeSelector["key"] = string(data)
return pod
}
testCases := []struct {
name string
objectNum int
expectNum int
selector labels.Selector
newObjectFunc func(index int, total int, expect int) runtime.Object
newListObjectFunc func() runtime.Object
}{
{
name: "pick 50 pods out of 5000 pod",
objectNum: 5000,
expectNum: 50,
selector: labels.SelectorFromSet(map[string]string{"foo": "bar"}),
newObjectFunc: generateBigPod,
newListObjectFunc: func() runtime.Object { return &examplev1.PodList{} },
},
{
name: "pick 500 pods out of 5000 pod",
objectNum: 5000,
expectNum: 500,
selector: labels.SelectorFromSet(map[string]string{"foo": "bar"}),
newObjectFunc: generateBigPod,
newListObjectFunc: func() runtime.Object { return &examplev1.PodList{} },
},
{
name: "pick 1000 pods out of 5000 pod",
objectNum: 5000,
expectNum: 1000,
selector: labels.SelectorFromSet(map[string]string{"foo": "bar"}),
newObjectFunc: generateBigPod,
newListObjectFunc: func() runtime.Object { return &examplev1.PodList{} },
},
{
name: "pick 2500 pods out of 5000 pod",
objectNum: 5000,
expectNum: 2500,
selector: labels.SelectorFromSet(map[string]string{"foo": "bar"}),
newObjectFunc: generateBigPod,
newListObjectFunc: func() runtime.Object { return &examplev1.PodList{} },
},
{
name: "pick 5000 pods out of 5000 pod",
objectNum: 5000,
expectNum: 5000,
selector: labels.SelectorFromSet(map[string]string{"foo": "bar"}),
newObjectFunc: generateBigPod,
newListObjectFunc: func() runtime.Object { return &examplev1.PodList{} },
},
}
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
// booting etcd instance
ctx, store, etcdClient := testSetup(b)
defer etcdClient.Close()
// make fake objects..
dir := "/testing"
originalRevision := ""
for i := 0; i < tc.objectNum; i++ {
obj := tc.newObjectFunc(i, tc.objectNum, tc.expectNum)
o := obj.(metav1.Object)
key := fmt.Sprintf("/testing/testkey/%s", o.GetName())
out := tc.newObjectFunc(i, tc.objectNum, tc.expectNum)
if err := store.Create(ctx, key, obj, out, 0); err != nil {
b.Fatalf("Set failed: %v", err)
}
originalRevision = out.(metav1.Object).GetResourceVersion()
}
// prepare result and pred
pred := storage.SelectionPredicate{
Label: tc.selector,
Field: fields.Everything(),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, error) {
pod, ok := obj.(*examplev1.Pod)
if !ok {
return nil, nil, fmt.Errorf("not a pod")
}
return pod.ObjectMeta.Labels, fields.Set{
"metadata.name": pod.Name,
}, nil
},
}
// now we start benchmarking
b.ResetTimer()
for i := 0; i < b.N; i++ {
list := tc.newListObjectFunc()
if err := store.GetList(ctx, dir, storage.ListOptions{Predicate: pred, Recursive: true}, list); err != nil {
b.Errorf("Unexpected List error: %v", err)
}
listObject := list.(*examplev1.PodList)
if originalRevision != listObject.GetResourceVersion() {
b.Fatalf("original revision (%s) did not match final revision after linearized reads (%s)", originalRevision, listObject.GetResourceVersion())
}
if len(listObject.Items) != tc.expectNum {
b.Fatalf("expect (%d) items but got (%d)", tc.expectNum, len(listObject.Items))
}
}
})
}
}

View File

@ -54,7 +54,7 @@ func getAvailablePorts(count int) ([]int, error) {
// - uses free ports for client and peer listeners // - uses free ports for client and peer listeners
// - cleans up the data directory on test termination // - cleans up the data directory on test termination
// - silences server logs other than errors // - silences server logs other than errors
func NewTestConfig(t *testing.T) *embed.Config { func NewTestConfig(t testing.TB) *embed.Config {
cfg := embed.NewConfig() cfg := embed.NewConfig()
cfg.UnsafeNoFsync = true cfg.UnsafeNoFsync = true
@ -81,7 +81,7 @@ func NewTestConfig(t *testing.T) *embed.Config {
// RunEtcd starts an embedded etcd server with the provided config // RunEtcd starts an embedded etcd server with the provided config
// (or NewTestConfig(t) if nil), and returns a client connected to the server. // (or NewTestConfig(t) if nil), and returns a client connected to the server.
// The server is terminated when the test ends. // The server is terminated when the test ends.
func RunEtcd(t *testing.T, cfg *embed.Config) *clientv3.Client { func RunEtcd(t testing.TB, cfg *embed.Config) *clientv3.Client {
t.Helper() t.Helper()
if cfg == nil { if cfg == nil {