From e577a77eb8160a5f70668a5d538ab5a1498ba0db Mon Sep 17 00:00:00 2001 From: Maciej Borsz Date: Tue, 25 Oct 2022 12:57:24 +0000 Subject: [PATCH] Add benchmark for caching object --- .../apiserver/pkg/endpoints/watch_test.go | 122 +++++++++++------- 1 file changed, 75 insertions(+), 47 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go index c5cce9ff840..c85d207eab8 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/watch_test.go @@ -17,6 +17,7 @@ limitations under the License. package endpoints import ( + "bytes" "context" "encoding/json" "fmt" @@ -38,6 +39,7 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/util/diff" "k8s.io/apimachinery/pkg/util/wait" @@ -878,7 +880,7 @@ func BenchmarkWatchHTTP(b *testing.B) { item.Name = fmt.Sprintf("reasonable-name-%d", i) } - runWatchHTTPBenchmark(b, items) + runWatchHTTPBenchmark(b, toObjectSlice(items), "") } func BenchmarkWatchHTTP_UTF8(b *testing.B) { @@ -891,10 +893,18 @@ func BenchmarkWatchHTTP_UTF8(b *testing.B) { item.Name = fmt.Sprintf("翏Ŏ熡韐-%d", i) } - runWatchHTTPBenchmark(b, items) + runWatchHTTPBenchmark(b, toObjectSlice(items), "") } -func runWatchHTTPBenchmark(b *testing.B, items []example.Pod) { +func toObjectSlice(in []example.Pod) []runtime.Object { + var res []runtime.Object + for _, pod := range in { + res = append(res, &pod) + } + return res +} + +func runWatchHTTPBenchmark(b *testing.B, items []runtime.Object, contentType string) { simpleStorage := &SimpleRESTStorage{} handler := handle(map[string]rest.Storage{"simples": simpleStorage}) server := httptest.NewServer(handler) @@ -909,6 +919,8 @@ func runWatchHTTPBenchmark(b *testing.B, items []example.Pod) { if err != nil { b.Fatalf("unexpected error: %v", err) } + request.Header.Add("Accept", contentType) + response, err := client.Do(request) if err != nil { b.Fatalf("unexpected error: %v", err) @@ -931,7 +943,7 @@ func runWatchHTTPBenchmark(b *testing.B, items []example.Pod) { b.ResetTimer() for i := 0; i < b.N; i++ { - simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)]) + simpleStorage.fakeWatch.Action(actions[i%len(actions)], items[i%len(items)]) } simpleStorage.fakeWatch.Stop() wg.Wait() @@ -982,47 +994,63 @@ func BenchmarkWatchWebsocket(b *testing.B) { func BenchmarkWatchProtobuf(b *testing.B) { items := benchmarkItems(b) - simpleStorage := &SimpleRESTStorage{} - handler := handle(map[string]rest.Storage{"simples": simpleStorage}) - server := httptest.NewServer(handler) - defer server.Close() - client := http.Client{} - - dest, _ := url.Parse(server.URL) - dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples" - dest.RawQuery = "" - - request, err := http.NewRequest("GET", dest.String(), nil) - if err != nil { - b.Fatalf("unexpected error: %v", err) - } - request.Header.Set("Accept", "application/vnd.kubernetes.protobuf") - response, err := client.Do(request) - if err != nil { - b.Fatalf("unexpected error: %v", err) - } - if response.StatusCode != http.StatusOK { - body, _ := ioutil.ReadAll(response.Body) - b.Fatalf("Unexpected response %#v\n%s", response, body) - } - - wg := sync.WaitGroup{} - wg.Add(1) - go func() { - defer response.Body.Close() - if _, err := io.Copy(ioutil.Discard, response.Body); err != nil { - b.Error(err) - } - wg.Done() - }() - - actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted} - - b.ResetTimer() - for i := 0; i < b.N; i++ { - simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)]) - } - simpleStorage.fakeWatch.Stop() - wg.Wait() - b.StopTimer() + runWatchHTTPBenchmark(b, toObjectSlice(items), "application/vnd.kubernetes.protobuf") +} + +type fakeCachingObject struct { + obj runtime.Object + + once sync.Once + raw []byte + err error +} + +func (f *fakeCachingObject) CacheEncode(_ runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error { + f.once.Do(func() { + buffer := bytes.NewBuffer(nil) + f.err = encode(f.obj, buffer) + f.raw = buffer.Bytes() + }) + + if f.err != nil { + return f.err + } + + _, err := w.Write(f.raw) + return err +} + +func (f *fakeCachingObject) GetObject() runtime.Object { + return f.obj +} + +func (f *fakeCachingObject) GetObjectKind() schema.ObjectKind { + return f.obj.GetObjectKind() +} + +func (f *fakeCachingObject) DeepCopyObject() runtime.Object { + return &fakeCachingObject{obj: f.obj.DeepCopyObject()} +} + +var _ runtime.CacheableObject = &fakeCachingObject{} +var _ runtime.Object = &fakeCachingObject{} + +func wrapCachingObject(in []example.Pod) []runtime.Object { + var res []runtime.Object + for _, pod := range in { + res = append(res, &fakeCachingObject{obj: &pod}) + } + return res +} + +func BenchmarkWatchCachingObjectJSON(b *testing.B) { + items := benchmarkItems(b) + + runWatchHTTPBenchmark(b, wrapCachingObject(items), "") +} + +func BenchmarkWatchCachingObjectProtobuf(b *testing.B) { + items := benchmarkItems(b) + + runWatchHTTPBenchmark(b, wrapCachingObject(items), "application/vnd.kubernetes.protobuf") }