mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 18:02:01 +00:00
Merge pull request #113326 from mborsz/bench3
Add benchmark for json.compact high cpu usage in watch
This commit is contained in:
commit
08644a12b3
@ -17,6 +17,7 @@ limitations under the License.
|
||||
package endpoints
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
@ -38,6 +39,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
@ -878,7 +880,7 @@ func BenchmarkWatchHTTP(b *testing.B) {
|
||||
item.Name = fmt.Sprintf("reasonable-name-%d", i)
|
||||
}
|
||||
|
||||
runWatchHTTPBenchmark(b, items)
|
||||
runWatchHTTPBenchmark(b, toObjectSlice(items), "")
|
||||
}
|
||||
|
||||
func BenchmarkWatchHTTP_UTF8(b *testing.B) {
|
||||
@ -891,10 +893,18 @@ func BenchmarkWatchHTTP_UTF8(b *testing.B) {
|
||||
item.Name = fmt.Sprintf("翏Ŏ熡韐-%d", i)
|
||||
}
|
||||
|
||||
runWatchHTTPBenchmark(b, items)
|
||||
runWatchHTTPBenchmark(b, toObjectSlice(items), "")
|
||||
}
|
||||
|
||||
func runWatchHTTPBenchmark(b *testing.B, items []example.Pod) {
|
||||
func toObjectSlice(in []example.Pod) []runtime.Object {
|
||||
var res []runtime.Object
|
||||
for _, pod := range in {
|
||||
res = append(res, &pod)
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func runWatchHTTPBenchmark(b *testing.B, items []runtime.Object, contentType string) {
|
||||
simpleStorage := &SimpleRESTStorage{}
|
||||
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
|
||||
server := httptest.NewServer(handler)
|
||||
@ -909,6 +919,8 @@ func runWatchHTTPBenchmark(b *testing.B, items []example.Pod) {
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
request.Header.Add("Accept", contentType)
|
||||
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error: %v", err)
|
||||
@ -931,7 +943,7 @@ func runWatchHTTPBenchmark(b *testing.B, items []example.Pod) {
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)])
|
||||
simpleStorage.fakeWatch.Action(actions[i%len(actions)], items[i%len(items)])
|
||||
}
|
||||
simpleStorage.fakeWatch.Stop()
|
||||
wg.Wait()
|
||||
@ -982,47 +994,63 @@ func BenchmarkWatchWebsocket(b *testing.B) {
|
||||
func BenchmarkWatchProtobuf(b *testing.B) {
|
||||
items := benchmarkItems(b)
|
||||
|
||||
simpleStorage := &SimpleRESTStorage{}
|
||||
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{}
|
||||
|
||||
dest, _ := url.Parse(server.URL)
|
||||
dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples"
|
||||
dest.RawQuery = ""
|
||||
|
||||
request, err := http.NewRequest("GET", dest.String(), nil)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
request.Header.Set("Accept", "application/vnd.kubernetes.protobuf")
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if response.StatusCode != http.StatusOK {
|
||||
body, _ := ioutil.ReadAll(response.Body)
|
||||
b.Fatalf("Unexpected response %#v\n%s", response, body)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer response.Body.Close()
|
||||
if _, err := io.Copy(ioutil.Discard, response.Body); err != nil {
|
||||
b.Error(err)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)])
|
||||
}
|
||||
simpleStorage.fakeWatch.Stop()
|
||||
wg.Wait()
|
||||
b.StopTimer()
|
||||
runWatchHTTPBenchmark(b, toObjectSlice(items), "application/vnd.kubernetes.protobuf")
|
||||
}
|
||||
|
||||
type fakeCachingObject struct {
|
||||
obj runtime.Object
|
||||
|
||||
once sync.Once
|
||||
raw []byte
|
||||
err error
|
||||
}
|
||||
|
||||
func (f *fakeCachingObject) CacheEncode(_ runtime.Identifier, encode func(runtime.Object, io.Writer) error, w io.Writer) error {
|
||||
f.once.Do(func() {
|
||||
buffer := bytes.NewBuffer(nil)
|
||||
f.err = encode(f.obj, buffer)
|
||||
f.raw = buffer.Bytes()
|
||||
})
|
||||
|
||||
if f.err != nil {
|
||||
return f.err
|
||||
}
|
||||
|
||||
_, err := w.Write(f.raw)
|
||||
return err
|
||||
}
|
||||
|
||||
func (f *fakeCachingObject) GetObject() runtime.Object {
|
||||
return f.obj
|
||||
}
|
||||
|
||||
func (f *fakeCachingObject) GetObjectKind() schema.ObjectKind {
|
||||
return f.obj.GetObjectKind()
|
||||
}
|
||||
|
||||
func (f *fakeCachingObject) DeepCopyObject() runtime.Object {
|
||||
return &fakeCachingObject{obj: f.obj.DeepCopyObject()}
|
||||
}
|
||||
|
||||
var _ runtime.CacheableObject = &fakeCachingObject{}
|
||||
var _ runtime.Object = &fakeCachingObject{}
|
||||
|
||||
func wrapCachingObject(in []example.Pod) []runtime.Object {
|
||||
var res []runtime.Object
|
||||
for _, pod := range in {
|
||||
res = append(res, &fakeCachingObject{obj: &pod})
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
func BenchmarkWatchCachingObjectJSON(b *testing.B) {
|
||||
items := benchmarkItems(b)
|
||||
|
||||
runWatchHTTPBenchmark(b, wrapCachingObject(items), "")
|
||||
}
|
||||
|
||||
func BenchmarkWatchCachingObjectProtobuf(b *testing.B) {
|
||||
items := benchmarkItems(b)
|
||||
|
||||
runWatchHTTPBenchmark(b, wrapCachingObject(items), "application/vnd.kubernetes.protobuf")
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user